diff options
author | David Addison <daddison@nvidia.com> | 2018-10-25 00:44:59 +0300 |
---|---|---|
committer | David Addison <daddison@nvidia.com> | 2018-10-25 00:44:59 +0300 |
commit | b56650c7f59b8cd40d18809784a6d6be38ef8acb (patch) | |
tree | ffe7400758df57c9218ed75a6c6bdf255dbcdc4f | |
parent | 3202d6b393c42b1e626fab8eb3caf21b88fb8519 (diff) |
2.3.7-1v2.3.7-1
Improved LL tuning for multi-node jobs.
Improved bootstrap for large job scaling.
Fixed a hang during bootstrap due to socket reuse.
Added operation name to the COLL INFO logging.
-rw-r--r-- | makefiles/version.mk | 4 | ||||
-rw-r--r-- | src/bootstrap.cu | 213 | ||||
-rw-r--r-- | src/collectives/all_gather.cu | 2 | ||||
-rw-r--r-- | src/collectives/all_reduce.cu | 2 | ||||
-rw-r--r-- | src/collectives/broadcast.cu | 2 | ||||
-rw-r--r-- | src/collectives/reduce.cu | 2 | ||||
-rw-r--r-- | src/collectives/reduce_scatter.cu | 2 | ||||
-rw-r--r-- | src/include/bootstrap.h | 1 | ||||
-rw-r--r-- | src/include/core.h | 7 | ||||
-rw-r--r-- | src/include/socket.h | 16 | ||||
-rw-r--r-- | src/init.cu | 69 | ||||
-rw-r--r-- | src/misc/utils.cu | 2 | ||||
-rw-r--r-- | src/transport/net_socket.cu | 1 |
13 files changed, 165 insertions, 158 deletions
diff --git a/makefiles/version.mk b/makefiles/version.mk index c411280..f9cee6a 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 3 -NCCL_PATCH := 5 +NCCL_PATCH := 7 NCCL_SUFFIX := -PKG_REVISION := 5 +PKG_REVISION := 1 diff --git a/src/bootstrap.cu b/src/bootstrap.cu index bfe525a..8593726 100644 --- a/src/bootstrap.cu +++ b/src/bootstrap.cu @@ -40,7 +40,7 @@ static ncclResult_t bootstrapRecv(void* recvComm, void* data, int size) { } struct extId { - ncclNetHandle_t extHandle; + ncclNetHandle_t extHandleRoot; void* extListenComm; uint64_t hostHash; pid_t pid; @@ -48,20 +48,11 @@ struct extId { pthread_t boostrapThread; }; -struct bootstrapOp { - int op; - int size; -}; - struct extInfo { int rank; int nranks; - ncclNetHandle_t extHandle; -}; - -enum { - BOOTSTRAP_ALLGATHER = 1, - BOOTSTRAP_RINGEXCHANGE, + ncclNetHandle_t extHandleListenFromRoot; + ncclNetHandle_t extHandleRing; }; #include <sys/resource.h> @@ -77,10 +68,10 @@ static ncclResult_t setFilesLimit() { static void *bootstrapRoot(void* commId) { struct extInfo info; struct extId* id = (struct extId*)commId; - struct bootstrapOp bop; - void **extSendComm = NULL; - void **extRecvComm = NULL; - int size, alloc_size = 0; + ncclNetHandle_t *extHandleBstrap = NULL; // for initial rank <-> root information exchange + ncclNetHandle_t *extHandleRing = NULL; // for bootstrap ring creation + ncclNetHandle_t zero = { 0 }; // for sanity checking + void* tmpComm; char* data = NULL; ncclResult_t res; setFilesLimit(); @@ -88,13 +79,14 @@ static void *bootstrapRoot(void* commId) { /* Receive addresses from all ranks */ int nranks = 0, c = 0; do { - void* tmpRecvComm; - NCCLCHECKGOTO(bootstrapAccept(id->extListenComm, &tmpRecvComm), res, out); - NCCLCHECKGOTO(bootstrapRecv(tmpRecvComm, &info, sizeof(info)), res, out); - if (!c) { - extSendComm = (void**)calloc(info.nranks, sizeof(void*)); - extRecvComm = (void**)calloc(info.nranks, sizeof(void*)); - if (extSendComm == NULL || extRecvComm == NULL) { + NCCLCHECKGOTO(bootstrapAccept(id->extListenComm, &tmpComm), res, out); + NCCLCHECKGOTO(bootstrapRecv(tmpComm, &info, sizeof(info)), res, out); + NCCLCHECKGOTO(bootstrapCloseRecv(tmpComm), res, out); + + if (c == 0) { + extHandleBstrap = (ncclNetHandle_t *)calloc(info.nranks, sizeof(ncclNetHandle_t)); + extHandleRing = (ncclNetHandle_t *)calloc(info.nranks, sizeof(ncclNetHandle_t)); + if (extHandleBstrap == NULL || extHandleRing == NULL) { WARN("Bootstrap thread : failed to allocate memory"); goto out; } @@ -106,69 +98,39 @@ static void *bootstrapRoot(void* commId) { goto out; } - extRecvComm[info.rank] = tmpRecvComm; - NCCLCHECKGOTO(bootstrapConnect(0, info.extHandle, extSendComm+info.rank), res, out); - c++; - } while (c < nranks); - - do { - NCCLCHECKGOTO(bootstrapRecv(extRecvComm[0], &bop, sizeof(struct bootstrapOp)), res, out); - if (bop.size == -1) { - break; - } else { - size = bop.size; - if (size*nranks*2 > alloc_size) { - if (data) free(data); data = NULL; - NCCLCHECKGOTO(ncclCalloc(&data, size*nranks*2), res, out); - alloc_size = size*nranks*2; - } + if (memcmp(&zero, &extHandleBstrap[info.rank], sizeof(ncclNetHandle_t)) != 0) { + WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks); + goto out; } - if (bop.op == BOOTSTRAP_ALLGATHER) { - for (int r=0; r<nranks; r++) { - NCCLCHECKGOTO(bootstrapRecv(extRecvComm[r], data+size*r, size), res, out); - } + // Save the connection handle for connecting back to the ranks + memcpy(&extHandleBstrap[info.rank], info.extHandleListenFromRoot, sizeof(ncclNetHandle_t)); + // Save the connection handle for the AllGather ring + memcpy(&extHandleRing[info.rank], info.extHandleRing, sizeof(ncclNetHandle_t)); - for (int r=0; r<nranks; r++) { - NCCLCHECKGOTO(bootstrapSend(extSendComm[r], data, size*nranks), res, out); - } - } else if (bop.op == BOOTSTRAP_RINGEXCHANGE) { - // Receive from all and build total table - for (int r=0; r<nranks; r++) { - NCCLCHECKGOTO(bootstrapRecv(extRecvComm[r], data+r*2*size, 2*size), res, out); - } + ++c; + } while (c < nranks); - // Get prev/next request from everyone and answer. - for (int r=0; r<nranks; r++) { - int offset; - NCCLCHECKGOTO(bootstrapRecv(extRecvComm[r], &offset, sizeof(int)), res, out); - NCCLCHECKGOTO(bootstrapSend(extSendComm[r], data+offset, size), res, out); - NCCLCHECKGOTO(bootstrapRecv(extRecvComm[r], &offset, sizeof(int)), res, out); - NCCLCHECKGOTO(bootstrapSend(extSendComm[r], data+offset, size), res, out); - } - } else { - WARN("Bootstrap Root : invalid op type received %d", bop.op); - break; - } - } while (1); + // Send the connect handle for the next rank in the AllGather ring + for (int r=0; r<nranks; ++r) { + int next = (r+1) % nranks; + void *tmpSendComm; + NCCLCHECKGOTO(bootstrapConnect(0, extHandleBstrap[r], &tmpSendComm), res, out); + NCCLCHECKGOTO(bootstrapSend(tmpSendComm, &extHandleRing[next], sizeof(ncclNetHandle_t)), res, out); + NCCLCHECKGOTO(bootstrapCloseSend(tmpSendComm), res, out); + } out: bootstrapCloseListen(id->extListenComm); - for (int r=0; r<nranks; r++) { - if (extSendComm[r]) bootstrapCloseSend(extSendComm[r]); - if (extRecvComm[r]) bootstrapCloseRecv(extRecvComm[r]); - } free(commId); if (data) free(data); - if (extSendComm) free(extSendComm); - if (extRecvComm) free(extRecvComm); return NULL; } ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv) { struct extId* id = (struct extId*)commId; id->hostHash = getHostHash(); - NCCLCHECK(bootstrapListen(idFromEnv ? dontCareIf : 0, &id->extHandle, &id->extListenComm)); + NCCLCHECK(bootstrapListen(idFromEnv ? dontCareIf : 0, &id->extHandleRoot, &id->extListenComm)); ncclUniqueId* threadIdCopy; NCCLCHECK(ncclCalloc(&threadIdCopy, 1)); memcpy(threadIdCopy, id, sizeof(ncclUniqueId)); @@ -182,7 +144,7 @@ ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out) { char* env = getenv("NCCL_COMM_ID"); if (env) { - if (ncclSocketCreateHandle(&id->extHandle, env) != 0) { + if (ncclSocketCreateHandle(&id->extHandleRoot, env) != 0) { WARN("Invalid NCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>"); return ncclInvalidArgument; } @@ -196,10 +158,12 @@ ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out) { } struct extState { - void* extRecvComm; - void* extSendComm; + void* extBstrapRingRecvComm; + void* extBstrapRingSendComm; + ncclNetHandle_t extBstrapRootHandle; int rank; int nranks; + int dev; }; ncclResult_t bootstrapInit(ncclUniqueId* commId, int rank, int nranks, void** commState) { @@ -210,22 +174,39 @@ ncclResult_t bootstrapInit(ncclUniqueId* commId, int rank, int nranks, void** co state->rank = rank; state->nranks = nranks; *commState = state; + void* extBstrapRootListenComm; // comm on which we accept root's connections - struct extInfo info; + struct extInfo info = { 0 }; info.rank = rank; info.nranks = nranks; - void* tmpListenComm; + void *tmpSendComm, *extBstrapRingListenComm, *tmpRecvComm; // Pass the remote address to listen via info if (idFromEnv) { - memcpy(&info.extHandle, &id->extHandle, sizeof(ncclNetHandle_t)); + memcpy(&info.extHandleListenFromRoot, &id->extHandleRoot, sizeof(ncclNetHandle_t)); + memcpy(&info.extHandleRing, &id->extHandleRoot, sizeof(ncclNetHandle_t)); } - // listen will return the local address via info ('findSubnetIf' indicates that the net device is unknown) - int dev = idFromEnv ? findSubnetIf : 0; - NCCLCHECK(bootstrapListen(dev, &info.extHandle, &tmpListenComm)); - NCCLCHECK(bootstrapConnect(dev, id->extHandle, &state->extSendComm)); - NCCLCHECK(bootstrapSend(state->extSendComm, &info, sizeof(info))); - NCCLCHECK(bootstrapAccept(tmpListenComm, &state->extRecvComm)); - NCCLCHECK(bootstrapCloseListen(tmpListenComm)); + // listen will return the local address via info (specify interface type 'findSubnetIf') + state->dev = idFromEnv ? findSubnetIf : 0; + NCCLCHECK(bootstrapListen(state->dev, &info.extHandleListenFromRoot, &extBstrapRootListenComm)); + NCCLCHECK(bootstrapListen(state->dev, &info.extHandleRing, &extBstrapRingListenComm)); // AllGather Ring + + memcpy(&state->extBstrapRootHandle, &id->extHandleRoot, sizeof(ncclNetHandle_t)); + // send info on my listening sockets to root + NCCLCHECK(bootstrapConnect(state->dev, id->extHandleRoot, &tmpSendComm)); + NCCLCHECK(bootstrapSend(tmpSendComm, &info, sizeof(info))); + NCCLCHECK(bootstrapCloseSend(tmpSendComm)); + + // get info on my "next" rank in the bootstrap ring from root + ncclNetHandle_t extHandleNext; + NCCLCHECK(bootstrapAccept(extBstrapRootListenComm, &tmpRecvComm)); + NCCLCHECK(bootstrapRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext))); + NCCLCHECK(bootstrapCloseRecv(tmpRecvComm)); + + NCCLCHECK(bootstrapConnect(state->dev, extHandleNext, &state->extBstrapRingSendComm)); + // Accept the connect request from the previous rank in the AllGather ring + NCCLCHECK(bootstrapAccept(extBstrapRingListenComm, &state->extBstrapRingRecvComm)); + NCCLCHECK(bootstrapCloseListen(extBstrapRingListenComm)); + NCCLCHECK(bootstrapCloseListen(extBstrapRootListenComm)); return ncclSuccess; } @@ -233,58 +214,34 @@ ncclResult_t bootstrapInit(ncclUniqueId* commId, int rank, int nranks, void** co ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) { struct extState* state = (struct extState*)commState; char* data = (char*)allData; - struct bootstrapOp bop; - - bop.op = BOOTSTRAP_ALLGATHER; - bop.size = size; - - if (!state->rank) { - NCCLCHECK(bootstrapSend(state->extSendComm, &bop, sizeof(struct bootstrapOp))); + int rank = state->rank; + int nranks = state->nranks; + + TRACE(INIT, "rank %d nranks %d size %d", rank, nranks, size); + + /* Simple ring based AllGather + * At each step i receive data from (rank-i-1) from left + * and send previous step's data from (rank-i) to right + */ + for (int i=0; i<nranks-1; i++) { + int rslice = (rank - i - 1 + nranks) % nranks; + int sslice = (rank - i + nranks) % nranks; + + // Send slice to the right + NCCLCHECK(bootstrapSend(state->extBstrapRingSendComm, data+sslice*size, size)); + // Recv slice from the left + NCCLCHECK(bootstrapRecv(state->extBstrapRingRecvComm, data+rslice*size, size)); } - NCCLCHECK(bootstrapSend(state->extSendComm, data+state->rank*size, size)); - NCCLCHECK(bootstrapRecv(state->extRecvComm, data, size*state->nranks)); - - return ncclSuccess; -} - -ncclResult_t bootstrapRingExchange(void* commState, void* prevNextData, int prev, int next, int size) { - struct extState* state = (struct extState*)commState; - char* mydata = (char*)prevNextData; - int prev_offset = prev*2*size+size, next_offset = next*2*size; - - struct bootstrapOp bop; - bop.op = BOOTSTRAP_RINGEXCHANGE; - bop.size = size; - - if (!state->rank) { - NCCLCHECK(bootstrapSend(state->extSendComm, &bop, sizeof(struct bootstrapOp))); - } - - // Send data to root - NCCLCHECK(bootstrapSend(state->extSendComm, mydata, 2*size)); - - // Receive prev and next data - NCCLCHECK(bootstrapSend(state->extSendComm, &prev_offset, sizeof(int))); - NCCLCHECK(bootstrapRecv(state->extRecvComm, mydata, size)); - NCCLCHECK(bootstrapSend(state->extSendComm, &next_offset, sizeof(int))); - NCCLCHECK(bootstrapRecv(state->extRecvComm, mydata+size, size)); - - + TRACE(INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size); return ncclSuccess; } ncclResult_t bootstrapClose(void* commState) { struct extState* state = (struct extState*)commState; - struct bootstrapOp bop; - bop.size = -1; - - if (!state->rank) { - NCCLCHECK(bootstrapSend(state->extSendComm, &bop, sizeof(struct bootstrapOp))); - } - NCCLCHECK(bootstrapCloseSend(state->extSendComm)); - NCCLCHECK(bootstrapCloseRecv(state->extRecvComm)); + NCCLCHECK(bootstrapCloseSend(state->extBstrapRingSendComm)); + NCCLCHECK(bootstrapCloseRecv(state->extBstrapRingRecvComm)); free(state); diff --git a/src/collectives/all_gather.cu b/src/collectives/all_gather.cu index fab262d..e19feff 100644 --- a/src/collectives/all_gather.cu +++ b/src/collectives/all_gather.cu @@ -12,7 +12,7 @@ ncclResult_t ncclAllGatherFunc(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { size_t nbytes = count*ncclTypeSize(datatype); - INFO(COLL,"opCount %lx sendbuff %p recvbuff %p count %zi size %zi datatype %d op %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, nbytes, datatype, op, comm, comm->nRanks, stream); + INFO(COLL,"AllGather: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, datatype, op, root, comm, comm->nRanks, stream); if (comm->nRanks == 1) { if (sendbuff != recvbuff) CUDACHECK(cudaMemcpyAsync(recvbuff, sendbuff, nbytes, cudaMemcpyDeviceToDevice, stream)); diff --git a/src/collectives/all_reduce.cu b/src/collectives/all_reduce.cu index cca9886..77ae4c8 100644 --- a/src/collectives/all_reduce.cu +++ b/src/collectives/all_reduce.cu @@ -12,7 +12,7 @@ ncclResult_t ncclAllReduceFunc(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { size_t nbytes = count*ncclTypeSize(datatype); - INFO(COLL,"opCount %lx sendbuff %p recvbuff %p count %zi size %zi datatype %d op %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, nbytes, datatype, op, comm, comm->nRanks, stream); + INFO(COLL,"AllReduce: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, datatype, op, root, comm, comm->nRanks, stream); if (comm->nRanks == 1) { if (sendbuff != recvbuff) CUDACHECK(cudaMemcpyAsync(recvbuff, sendbuff, nbytes, cudaMemcpyDeviceToDevice, stream)); diff --git a/src/collectives/broadcast.cu b/src/collectives/broadcast.cu index fe079b0..0e5ec7b 100644 --- a/src/collectives/broadcast.cu +++ b/src/collectives/broadcast.cu @@ -12,7 +12,7 @@ ncclResult_t ncclBroadcastFunc(const void* sendbuff, void* recvbuff, const size_t count, ncclDataType_t datatype, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { size_t nbytes = count*ncclTypeSize(datatype); - INFO(COLL,"opCount %lx sendbuff %p recvbuff %p count %zi size %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, nbytes, datatype, op, root, comm, comm->nRanks, stream); + INFO(COLL,"Broadcast: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, datatype, op, root, comm, comm->nRanks, stream); if (comm->nRanks == 1) { if (sendbuff != recvbuff) CUDACHECK(cudaMemcpyAsync(recvbuff, sendbuff, nbytes, cudaMemcpyDeviceToDevice, stream)); diff --git a/src/collectives/reduce.cu b/src/collectives/reduce.cu index b7c91e6..76d4a19 100644 --- a/src/collectives/reduce.cu +++ b/src/collectives/reduce.cu @@ -12,7 +12,7 @@ ncclResult_t ncclReduceFunc(const void* sendbuff, void* recvbuff, const size_t count, ncclDataType_t datatype, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { size_t nbytes = count*ncclTypeSize(datatype); - INFO(COLL,"opCount %lx sendbuff %p recvbuff %p count %zi size %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, nbytes, datatype, op, root, comm, comm->nRanks, stream); + INFO(COLL,"Reduce: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, datatype, op, root, comm, comm->nRanks, stream); if (comm->nRanks == 1) { if (sendbuff != recvbuff) CUDACHECK(cudaMemcpyAsync(recvbuff, sendbuff, nbytes, cudaMemcpyDeviceToDevice, stream)); diff --git a/src/collectives/reduce_scatter.cu b/src/collectives/reduce_scatter.cu index 9e052ff..af9d78b 100644 --- a/src/collectives/reduce_scatter.cu +++ b/src/collectives/reduce_scatter.cu @@ -12,7 +12,7 @@ ncclResult_t ncclReduceScatterFunc(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { size_t nbytes = count*ncclTypeSize(datatype); - INFO(COLL,"opCount %lx sendbuff %p recvbuff %p count %zi size %zi datatype %d op %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, nbytes, datatype, op, comm, comm->nRanks, stream); + INFO(COLL,"ReduceScatter: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p", comm->opCount, sendbuff, recvbuff, count, datatype, op, root, comm, comm->nRanks, stream); if (comm->nRanks == 1) { if (sendbuff != recvbuff) CUDACHECK(cudaMemcpyAsync(recvbuff, sendbuff, nbytes, cudaMemcpyDeviceToDevice, stream)); diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h index 81af3a4..278593c 100644 --- a/src/include/bootstrap.h +++ b/src/include/bootstrap.h @@ -13,6 +13,5 @@ ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv); ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out); ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState); ncclResult_t bootstrapAllGather(void* commState, void* allData, int size); -ncclResult_t bootstrapRingExchange(void* commState, void* prevNextData, int prev, int next, int size); ncclResult_t bootstrapClose(void* commState); #endif diff --git a/src/include/core.h b/src/include/core.h index 66b353c..2dd63d6 100644 --- a/src/include/core.h +++ b/src/include/core.h @@ -35,7 +35,8 @@ struct cudaLaunchParams { // Rings / LL tuning #define NCCL_LL_RING_THRESHOLD 8 // Per thread size before we start increasing nrings -#define NCCL_THREAD_THRESHOLD 32 // Per thread size before we switch to non-LL +#define NCCL_THREAD_THRESHOLD 64 // Per thread size before we switch to non-LL for Volta and above +#define NCCL_THREAD_THRESHOLD_PREVOLTA 32 // Per thread size before we switch to non-LL for pre-Volta archs #define NCCL_LL_MAX_NTHREADS 256 #define NCCL_LL_MIN_NTHREADS 64 @@ -95,8 +96,8 @@ struct ncclConnector { #define CUDA_IPC_MIN 2097152UL /* 2MiB - not currently used */ #define NCCL_LL_CHUNKS 8 -#define NUM_LINES_PER_THREAD 2 -#define NCCL_LL_BUFF_SIZE (NUM_LINES_PER_THREAD*NCCL_LL_MAX_NTHREADS*NCCL_LL_CHUNKS*sizeof(union ncclLLFifoLine)) // 64K +#define NUM_LINES_PER_THREAD 8 +#define NCCL_LL_BUFF_SIZE (NUM_LINES_PER_THREAD*NCCL_LL_MAX_NTHREADS*NCCL_LL_CHUNKS*sizeof(union ncclLLFifoLine)) // 256K #define NCCL_LL_BUFF_LINES (NCCL_LL_BUFF_SIZE / (2*sizeof(uint64_t))) #define NCCL_LL_SLICE_LINES (NCCL_LL_BUFF_LINES / NCCL_LL_CHUNKS) #define NCCL_LL_CLEAN_FREQ 0x10000000 diff --git a/src/include/socket.h b/src/include/socket.h index 3321e4d..533cacc 100644 --- a/src/include/socket.h +++ b/src/include/socket.h @@ -16,6 +16,7 @@ #include <net/if.h> #include "utils.h" +#define MAX_IFS 16 #define MAX_IF_NAME_SIZE 16 #define SLEEP_INT 1000 // sleep interval in usec #define RETRY_TIMES 2e4 // retry times before reporting a timeout (20 sec) @@ -40,6 +41,10 @@ static inline const char *socketToString(struct sockaddr *saddr, char *buf) { return buf; } +static inline short socketToPort(struct sockaddr *saddr) { + return ntohs(saddr->sa_family == AF_INET ? ((struct sockaddr_in*)saddr)->sin_port : ((struct sockaddr_in6*)saddr)->sin6_port); +} + /* Allow the user to force the IPv4/IPv6 interface selection */ static inline int envSocketFamily(void) { int family = -1; // Family selection is not forced, will use first one found @@ -56,9 +61,9 @@ static inline int envSocketFamily(void) { static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) { char line[1024]; - struct netIf userIfs[maxIfs]; + struct netIf userIfs[MAX_IFS]; bool searchNot = prefixList && prefixList[0] == '^'; - int nUserIfs = parseStringList(prefixList, userIfs, maxIfs); + int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS); int found = 0; struct ifaddrs *interfaces, *interface; @@ -313,8 +318,11 @@ static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) return ncclSystemError; } - int opt = 1; - SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt"); + if (socketToPort(&localAddr->sa)) { + // Port is forced by env. Make sure we get the port. + int opt = 1; + SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt"); + } // localAddr port should be 0 (Any port) SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind"); diff --git a/src/init.cu b/src/init.cu index 6669251..ed33d1a 100644 --- a/src/init.cu +++ b/src/init.cu @@ -79,7 +79,19 @@ void initNet() { } NCCL_PARAM(LlThreshold, "LL_THRESHOLD", -2); -NCCL_PARAM(ThreadThreshold, "THREAD_THRESHOLD", NCCL_THREAD_THRESHOLD); +NCCL_PARAM(ThreadThreshold, "THREAD_THRESHOLD", -2); + +int ncclThreadThreshold(int minCompCap, int multiNode) { + int threshold = ncclParamThreadThreshold(); + if (threshold == -2) { // user has not set this env variable + threshold = (minCompCap <= 6) ? NCCL_THREAD_THRESHOLD_PREVOLTA : NCCL_THREAD_THRESHOLD; + // multiply by 2 if running on multiple nodes + if (multiNode) { + threshold *= 2; + } + } + return threshold; +} pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; static bool initialized = false; @@ -165,7 +177,6 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { cudaGetDevice(&comm->cudaDev); comm->doneEvent = doneEvent; comm->llThreshold = ncclParamLlThreshold(); - comm->threadThreshold = ncclParamThreadThreshold(); comm->checkPointers = ncclParamCheckPointers() == 1 ? true : false; #if __CUDACC_VER_MAJOR__ >= 10 || (__CUDACC_VER_MAJOR__ >= 9 && __CUDACC_VER_MINOR__ >= 2) comm->groupCudaStream = ncclParamGroupCudaStream(); @@ -277,7 +288,7 @@ static void swap(void* mem1, void* mem2, int size) { #define MAXWIDTH 20 #define PREFIXLEN 15 -#define STRLENGTH (PREFIXLEN+4*MAXWIDTH) +#define STRLENGTH (PREFIXLEN+5*MAXWIDTH) void dumpMatrix(int* connectMatrix, int nranks) { char line[STRLENGTH+1]; line[STRLENGTH] = '\0'; @@ -292,6 +303,21 @@ void dumpMatrix(int* connectMatrix, int nranks) { } } +void dumpMatrixTvalue(ncclTvalue_t* connectMatrix, int nranks) { + char line[STRLENGTH+1]; + line[STRLENGTH] = '\0'; + memset(line, ' ', STRLENGTH); + for (int j=0; j<nranks && j<MAXWIDTH; j++) sprintf(4+line+5*j, " %4d", j); + INFO(INIT,"%s", line); + for (int i=0; i<nranks; i++) { + memset(line, ' ', STRLENGTH); + sprintf(line, "%3d ", i); + for (int j=0; j<nranks && j<MAXWIDTH; j++) sprintf(4+line+5*j, " %4o", (int)connectMatrix[i*nranks+j]); + INFO(INIT,"%s", line); + } +} + + void dumpLine(int* values, int nranks, const char* prefix) { int prefixlen = strlen(prefix); char line[STRLENGTH+1]; @@ -433,7 +459,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(bootstrapAllGather(commState, connectTransport, nranks*(sizeof(int)))); NCCLCHECK(bootstrapAllGather(commState, connectValue, nranks*(sizeof(ncclTvalue_t)))); //if (rank == 0) dumpMatrix(connectTransport, nranks); - //if (rank == 0) dumpMatrix(connectValue, nranks); + //if (rank == 0) dumpMatrixTvalue(connectValue, nranks); // Get my rings int nrings; @@ -481,15 +507,19 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm free(next); // Connect with prev/next for each ring + struct ncclConnect *connectData; + NCCLCHECK(ncclCalloc(&connectData, 2*nranks)); for (int r=0; r<nrings; r++) { int* ringRanks = rings+r*nranks; struct ncclRing *ring = comm->rings+r; - struct ncclConnect connect[2]; - NCCLCHECK(setupRing(comm, r, rank, nranks, ringRanks, allInfo, connect)); - NCCLCHECK(bootstrapRingExchange(commState, connect, ring->userRanks[nranks-1], ring->userRanks[1], sizeof(struct ncclConnect))); - NCCLCHECK(ring->send.transport->send.connect(connect+1, &ring->send)); - NCCLCHECK(ring->recv.transport->recv.connect(connect+0, &ring->recv)); - } + NCCLCHECK(setupRing(comm, r, rank, nranks, ringRanks, allInfo, connectData+2*rank)); + int prev_offset = ring->userRanks[nranks-1]*2+1; + int next_offset = ring->userRanks[1]*2; + NCCLCHECK(bootstrapAllGather(commState, connectData, sizeof(struct ncclConnect)*2)); + NCCLCHECK(ring->send.transport->send.connect(connectData+next_offset, &ring->send)); + NCCLCHECK(ring->recv.transport->recv.connect(connectData+prev_offset, &ring->recv)); + } + free(connectData); free(rings); free(allInfo); @@ -506,12 +536,15 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm // Compute intra ranks int intraRank0 = -1, intraRank = -1, intraRanks = 0; + int multiNode = 0; for (int r=0; r<nranks; r++) { if ((rankInfos[r].hostHash == rankInfos[rank].hostHash) && (rankInfos[r].pidHash == rankInfos[rank].pidHash)) { if (intraRanks == 0) intraRank0 = r; if (r == rank) intraRank = intraRanks; intraRanks++; + } else if (rankInfos[r].hostHash != rankInfos[rank].hostHash) { + multiNode = 1; } } TRACE(INIT,"hostHash[%d] %lx intraRank %d intraRanks %d intraRank0 %d", @@ -523,6 +556,9 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm } NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, rankInfos[intraRank0].comm)); + // Determine thread threshold across all GPUs + comm->threadThreshold = ncclThreadThreshold(minCompCap, multiNode); + // Barrier bootstrapClose(commState); return ncclSuccess; @@ -539,7 +575,7 @@ bool SetCpuAffinity(int cudaDev, nvmlDevice_t* nvmlDevice) { return true; } -ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int ndev, ncclUniqueId commId, int myrank) { +ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) { cpu_set_t affinitySave; sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave); @@ -553,12 +589,15 @@ ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int ndev, ncclUniqueId co SetCpuAffinity(cudaDev, &nvmlDevice); ncclResult_t res; - NCCLCHECKGOTO(commAlloc(newcomm, ndev, myrank), res, cleanup); + NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup); NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup); NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup); sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave); NCCLCHECKGOTO(wrapNvmlShutdown(), res, cleanup); + + INFO(INIT,"comm %p rank %d nranks %d - COMPLETE", *newcomm, myrank, nranks); + return ncclSuccess; cleanup: *newcomm = NULL; @@ -566,7 +605,7 @@ cleanup: return res; } -NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int ndev, ncclUniqueId commId, int myrank); +NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank); ncclResult_t ncclCommInitRank(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) { char* env = getenv("NCCL_COMM_ID"); if (env && myrank == 0) { @@ -649,9 +688,13 @@ static ncclResult_t initTransportsAll(struct ncclComm** comms, const int* devs, free(prevFinal); free(nextFinal); + // Determine thread threshold across all GPUs + int threadThreshold = ncclThreadThreshold(minCompCap, 0); + for (int rank=0; rank<nranks; rank++) { comms[rank]->nRings = nrings; comms[rank]->nThreads = nthreads; + comms[rank]->threadThreshold = threadThreshold; } for (int r=0; r<nrings; r++) { diff --git a/src/misc/utils.cu b/src/misc/utils.cu index e22a2c3..986bd61 100644 --- a/src/misc/utils.cu +++ b/src/misc/utils.cu @@ -99,7 +99,7 @@ int parseStringList(const char* string, struct netIf* ifList, int maxList) { ifC++; } ptr++; - } while (c); + } while (ifNum < maxList && c); return ifNum; } diff --git a/src/transport/net_socket.cu b/src/transport/net_socket.cu index e7682dd..cff1973 100644 --- a/src/transport/net_socket.cu +++ b/src/transport/net_socket.cu @@ -23,7 +23,6 @@ ncclResult_t ncclSocketPtrSupport(int dev, int* supportedTypes) { return ncclSuccess; } -#define MAX_IFS 16 static char ncclNetIfNames[MAX_IF_NAME_SIZE*MAX_IFS]; static union socketAddress ncclNetIfAddrs[MAX_IFS]; static int ncclNetIfs = -1; |