author     Sylvain Jeaugey <sjeaugey@nvidia.com>    2021-04-13 02:00:11 +0300
committer  Sylvain Jeaugey <sjeaugey@nvidia.com>    2021-04-13 02:00:46 +0300
commit     a46ea105830fbc3fbb222a00a82862df49cc9a9f (patch)
tree       d21bca14da82ead582403ccedec50d165e3a5ee9
parent     911d61f214d45c98df1ee8c0ac23c33fb94b63de (diff)
tag        v2.9.6-1
Add support for CUDA graphs.
Fuse BCM Gen4 switches to avoid suboptimal performance on some platforms. Issue #439.
Fix bootstrap issue caused by connection reordering.
Fix CPU locking block.
Improve CollNet algorithm.
Improve performance on DGX A100 for communicators with only one GPU per node.
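
For context on the headline item: CUDA graph support lets an application capture NCCL collectives issued on a stream and replay them with low launch overhead. The following is a minimal application-side sketch (not part of this commit); it assumes an already-initialized ncclComm_t, device buffers, and a non-default stream on a CUDA 11.3+ runtime, and omits error checking.

    // Sketch only: capture one ncclAllReduce into a CUDA graph, then replay it.
    #include <cuda_runtime.h>
    #include <nccl.h>

    void allreduce_with_graph(ncclComm_t comm, const float* sendbuf, float* recvbuf,
                              size_t count, cudaStream_t stream) {
      cudaGraph_t graph;
      cudaGraphExec_t graphExec;

      // Capture the collective on the user stream.
      cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
      ncclAllReduce(sendbuf, recvbuf, count, ncclFloat, ncclSum, comm, stream);
      cudaStreamEndCapture(stream, &graph);

      // Instantiate once, then replay many times with low launch overhead
      // (5-argument form as in CUDA 11.x).
      cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0);
      for (int iter = 0; iter < 100; iter++) {
        cudaGraphLaunch(graphExec, stream);
      }
      cudaStreamSynchronize(stream);

      cudaGraphExecDestroy(graphExec);
      cudaGraphDestroy(graph);
    }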
43 files changed, 2680 insertions(+), 1237 deletions(-)
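
The bootstrap.cc changes below add a tag to every bootstrap message so that connections arriving out of order can be matched to the correct pending receive (the "connection reordering" fix listed above). As a rough illustration only, not NCCL's actual code, the idea is an "unexpected connection" list keyed by (peer, tag): a connection whose key does not match the current receive is parked and dequeued later when a matching receive is posted.

    // Simplified sketch of (peer, tag) matching; names and structure are illustrative.
    #include <stdlib.h>

    struct UnexConn {
      int peer;               // sender rank
      int tag;                // message tag added by this commit
      int fd;                 // accepted socket
      struct UnexConn* next;
    };

    // Park a connection that does not match the receive currently being waited on.
    static void enqueueUnexpected(struct UnexConn** head, int peer, int tag, int fd) {
      struct UnexConn* e = (struct UnexConn*)calloc(1, sizeof(struct UnexConn));
      e->peer = peer; e->tag = tag; e->fd = fd;
      e->next = *head;        // list order does not matter; matching is by (peer, tag)
      *head = e;
    }

    // Return the fd for (peer, tag) if already accepted, or -1 if not seen yet.
    static int dequeueUnexpected(struct UnexConn** head, int peer, int tag) {
      struct UnexConn** prev = head;
      for (struct UnexConn* e = *head; e; prev = &e->next, e = e->next) {
        if (e->peer == peer && e->tag == tag) {
          int fd = e->fd;
          *prev = e->next;    // unlink the matched entry
          free(e);
          return fd;
        }
      }
      return -1;
    }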
diff --git a/makefiles/version.mk b/makefiles/version.mk index f64e8ad..6a40a92 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 -NCCL_MINOR := 8 -NCCL_PATCH := 4 +NCCL_MINOR := 9 +NCCL_PATCH := 6 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/Makefile b/src/Makefile index 487f790..a548840 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,5 +1,5 @@ # -# Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. # # See LICENSE.txt for license information # @@ -10,7 +10,7 @@ include ../makefiles/version.mk ##### src files INCEXPORTS := nccl.h nccl_net.h LIBSRCFILES := init.cc channel.cc bootstrap.cc transport.cc enqueue.cc group.cc debug.cc proxy.cc \ - misc/nvmlwrap.cc misc/ibvwrap.cc misc/utils.cc misc/argcheck.cc \ + misc/nvmlwrap.cc misc/ibvwrap.cc misc/gdrwrap.cc misc/utils.cc misc/argcheck.cc \ transport/p2p.cc transport/shm.cc transport/net.cc transport/net_socket.cc transport/net_ib.cc transport/coll_net.cc \ collectives/sendrecv.cc collectives/all_reduce.cc collectives/all_gather.cc collectives/broadcast.cc collectives/reduce.cc collectives/reduce_scatter.cc \ graph/topo.cc graph/paths.cc graph/search.cc graph/connect.cc graph/rings.cc graph/trees.cc graph/tuning.cc graph/xml.cc @@ -56,8 +56,8 @@ ALWAYS_REBUILD: $(LIBDIR)/$(LIBTARGET) $(LIBDIR)/$(STATICLIBTARGET) : $(LIBOBJ) $(INCDIR)/nccl.h : nccl.h.in -# NCCL_VERSION(X,Y,Z) ((X) * 1000 + (Y) * 100 + (Z)) - @$(eval NCCL_VERSION := $(shell printf "%d%d%02d" $(NCCL_MAJOR) $(NCCL_MINOR) $(NCCL_PATCH))) +# NCCL_VERSION(X,Y,Z) ((X) * 10000 + (Y) * 100 + (Z)) + @$(eval NCCL_VERSION := $(shell printf "%d%02d%02d" $(NCCL_MAJOR) $(NCCL_MINOR) $(NCCL_PATCH))) mkdir -p $(INCDIR) @printf "Generating %-35s > %s\n" $< $@ sed -e "s/\$${nccl:Major}/$(NCCL_MAJOR)/g" \ @@ -129,7 +129,7 @@ install : lib cp -P -v $(BUILDDIR)/lib/pkgconfig/* $(PREFIX)/lib/pkgconfig/ cp -v $(BUILDDIR)/include/* $(PREFIX)/include/ -FILESTOFORMAT := $(shell find . -name ".\#*" -prune -o \( -name "*.cc" -o -name "*.h" \) -print | grep -v -E 'ibvwrap.h|nvmlwrap.h|nccl.h') +FILESTOFORMAT := $(shell find . -name ".\#*" -prune -o \( -name "*.cc" -o -name "*.h" \) -print | grep -v -E 'ibvwrap.h|nvmlwrap.h|gdrwrap.h|nccl.h') # Note that formatting.mk defines a new target so in order to not overwrite the default target, # it shouldn't be included at the top. Also, it uses the above definition of FILESTOFORMAT as well # as the BUILDDIR variable. diff --git a/src/bootstrap.cc b/src/bootstrap.cc index 6f682f6..d452f91 100644 --- a/src/bootstrap.cc +++ b/src/bootstrap.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -191,6 +191,7 @@ ncclResult_t bootstrapGetUniqueId(ncclUniqueId* id) { struct unexConn { int peer; + int tag; int fd; struct unexConn* next; }; @@ -411,21 +412,23 @@ ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) { return ncclSuccess; } -ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size) { +ncclResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size) { struct extState* state = (struct extState*)commState; int tmpSendFd; NCCLCHECK(connectAddress(&tmpSendFd, state->peerCommAddresses+peer)); NCCLCHECK(bootstrapNetSend(tmpSendFd, &state->rank, sizeof(int))); + NCCLCHECK(bootstrapNetSend(tmpSendFd, &tag, sizeof(int))); NCCLCHECK(bootstrapNetSend(tmpSendFd, data, size)); close(tmpSendFd); return ncclSuccess; } -ncclResult_t unexpectedEnqueue(struct extState* state, int peer, int fd) { +ncclResult_t unexpectedEnqueue(struct extState* state, int peer, int tag, int fd) { // New unex struct unexConn* unex; NCCLCHECK(ncclCalloc(&unex, 1)); unex->peer = peer; + unex->tag = tag; unex->fd = fd; // Enqueue @@ -439,11 +442,11 @@ ncclResult_t unexpectedEnqueue(struct extState* state, int peer, int fd) { return ncclSuccess; } -int unexpectedDequeue(struct extState* state, int peer) { +int unexpectedDequeue(struct extState* state, int peer, int tag) { struct unexConn* elem = state->unexpectedConnections; struct unexConn* prev = NULL; while (elem) { - if (elem->peer == peer) { + if (elem->peer == peer && elem->tag == tag) { if (prev == NULL) { state->unexpectedConnections = elem->next; } else { @@ -460,13 +463,13 @@ int unexpectedDequeue(struct extState* state, int peer) { } // We can't know who we'll receive from, so we need to receive everything at once -ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size) { +ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size) { struct extState* state = (struct extState*)commState; int tmpRecvFd; // Search unexpected connections first - if ((tmpRecvFd = unexpectedDequeue(state, peer)) != -1) { + if ((tmpRecvFd = unexpectedDequeue(state, peer, tag)) != -1) { NCCLCHECK(bootstrapNetRecv(tmpRecvFd, ((char*)data), size)); close(tmpRecvFd); return ncclSuccess; @@ -475,15 +478,16 @@ ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size) { // Then look for new connections while (1) { NCCLCHECK(bootstrapNetAccept(state->extListenFd, &tmpRecvFd)); - int newPeer; + int newPeer, newTag; NCCLCHECK(bootstrapNetRecv(tmpRecvFd, &newPeer, sizeof(int))); - if (newPeer == peer) { + NCCLCHECK(bootstrapNetRecv(tmpRecvFd, &newTag, sizeof(int))); + if (newPeer == peer && newTag == tag) { NCCLCHECK(bootstrapNetRecv(tmpRecvFd, ((char*)data), size)); close(tmpRecvFd); return ncclSuccess; } // Unexpected connection. Save for later. - NCCLCHECK(unexpectedEnqueue(state, newPeer, tmpRecvFd)); + NCCLCHECK(unexpectedEnqueue(state, newPeer, newTag, tmpRecvFd)); } } diff --git a/src/channel.cc b/src/channel.cc index b2cc5b7..a07e38a 100644 --- a/src/channel.cc +++ b/src/channel.cc @@ -1,11 +1,15 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ #include "channel.h" #include "param.h" +#include "gdrwrap.h" + +// GDRCOPY support: FIFO_ENABLE when enabled locates a workFifo in CUDA memory +NCCL_PARAM(GdrCopyFifoEnable, "GDRCOPY_FIFO_ENABLE", 1); ncclResult_t initChannel(struct ncclComm* comm, int channelid) { struct ncclChannel* channel = comm->channels+channelid; @@ -20,12 +24,25 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelid) { NCCLCHECK(ncclCudaCalloc(&channel->devPeers, comm->nRanks+1)); // The extra one rank is for collnet root (i.e. network) NCCLCHECK(ncclCalloc(&channel->peers, comm->nRanks+1)); for (size_t i=0; i<comm->nRanks+1; ++i) { - channel->peers[i].send.comm = comm; - channel->peers[i].recv.comm = comm; + for (int b=0; b<NCCL_MAX_CONNS; b++) { + channel->peers[i].send[b].comm = comm; + channel->peers[i].recv[b].comm = comm; + } } // Per-channel operation list. NCCLCHECK(ncclCudaHostCalloc(&channel->workFifo, NCCL_MAX_OPS)); + if (ncclGdrCopy != NULL && ncclParamGdrCopyFifoEnable() == 1) { + // GDRCOPY support + // We allocate a workFifo in GDR mapped CUDA memory + // But we still allocate the Host workFifo so that we + // can copy the work elements to CUDA memory on kernel launch + NCCLCHECK(ncclGdrCudaCalloc(&channel->workFifoGdr, &channel->workFifoDev, NCCL_MAX_OPS, &channel->gdrMemDesc)); + } else { + // The device workFifo is the Host one + channel->workFifoDev = channel->workFifo; + } + return ncclSuccess; } @@ -33,6 +50,10 @@ ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks) { if (channel->id == -1) return ncclSuccess; // Operation list NCCLCHECK(ncclCudaHostFree(channel->workFifo)); + if (channel->gdrMemDesc) { + // GDRCOPY support + NCCLCHECK(ncclGdrCudaFree(channel->gdrMemDesc)); + } // Free Ring index to rank tables free(channel->ring.userRanks); @@ -42,11 +63,15 @@ ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks) { // Note: free all send resources first due to CollNet arrangement for (int r=0; r<nRanks+1; r++) { struct ncclPeer* peer = channel->peers+r; - if (peer->send.transportResources) NCCLCHECK(peer->send.transportComm->free(peer->send.transportResources)); + for (int b=0; b<NCCL_MAX_CONNS; b++) { + if (peer->send[b].transportResources) NCCLCHECK(peer->send[b].transportComm->free(peer->send[b].transportResources)); + } } for (int r=0; r<nRanks+1; r++) { struct ncclPeer* peer = channel->peers+r; - if (peer->recv.transportResources) NCCLCHECK(peer->recv.transportComm->free(peer->recv.transportResources)); + for (int b=0; b<NCCL_MAX_CONNS; b++) { + if (peer->recv[b].transportResources) NCCLCHECK(peer->recv[b].transportComm->free(peer->recv[b].transportResources)); + } } // Free the peer structures. diff --git a/src/collectives/device/all_reduce.h b/src/collectives/device/all_reduce.h index fe2e6fc..88acb13 100644 --- a/src/collectives/device/all_reduce.h +++ b/src/collectives/device/all_reduce.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -149,7 +149,7 @@ class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_TREE, NCCL_PROTO_SIMPLE, FUNC, T } #else int nthreadsSplit = nthreads/2; - if (nthreadsSplit == 256) nthreadsSplit += 64; + if (nthreadsSplit >= 256) nthreadsSplit += 64; if (tree->up == -1) { if (tid < nthreads+WARP_SIZE) { // ReduceAndBroadcast : max number of recv is 3, max number of send is 3 @@ -198,59 +198,78 @@ class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_TREE, NCCL_PROTO_SIMPLE, FUNC, T template<class FUNC, typename T, int UNROLL> class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_COLLNET, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { +#define COLLNET_COPY_THREADS 96 public: __device__ void run(struct ncclWorkElem* args) { const int tid = threadIdx.x; - const int nthreads = args->nThreads-WARP_SIZE; + //const int nthreads = args->nThreads-3*WARP_SIZE; const int bid = args->coll.bid; const int nChannels = args->coll.nChannels; struct ncclDevComm* comm = args->comm; struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclTree* tree = &channel->collTree; + struct ncclDirect* tree = &channel->collTree; const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); int chunkSize = args->coll.lastChunkSize; - const ssize_t minChunkSize = nthreads*8*sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = nChannels*chunkSize; const ssize_t size = args->coll.count; - - if (loopSize > size) { - chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; - } + const ssize_t loopSize = nChannels*tree->nHeads*chunkSize; // Compute pointers const T * __restrict__ thisInput = (const T*)args->sendbuff; T * __restrict__ thisOutput = (T*)args->recvbuff; - if (blockIdx.x < nChannels) { // first half of the channels do reduce - ncclPrimitives<UNROLL, 1, 1, T, 1, 1, 0, FUNC> - prims(tid, nthreads, tree->down, &tree->up, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + const int hasUp = (tree->up[0] >= 0) ? 1 : 0; + const int hasDn = (tree->down[0] >= 0) ? 1 : 0; + const int nThreadsScatter = (hasUp && hasDn) ? COLLNET_COPY_THREADS : hasUp ? 3*COLLNET_COPY_THREADS : 0; + const int nThreadsGather = (hasUp && hasDn) ? COLLNET_COPY_THREADS : hasUp ? 2*COLLNET_COPY_THREADS : 0; + const int nThreadsBcast = (hasUp && hasDn) ? COLLNET_COPY_THREADS : hasUp ? 
0 : 2*COLLNET_COPY_THREADS; + // Gather does not need sync threads, sparing one more warp for reduce + const int nThreadsReduce = NCCL_SIMPLE_MAX_NTHREADS + WARP_SIZE - nThreadsScatter - nThreadsGather - nThreadsBcast; + const int tidStartBcast = nThreadsGather; + const int tidStartScatter = tidStartBcast + nThreadsBcast + WARP_SIZE; + const int tidStartReduce = tidStartScatter + nThreadsScatter + WARP_SIZE; + + if (tid >= tidStartScatter && tid < tidStartReduce && hasUp) { + // Scatter + ncclPrimitives<UNROLL, 1, 1, T, 0, NCCL_MAX_DIRECT_ARITY, 0, FUNC> + prims(tid-tidStartScatter, nThreadsScatter, NULL, tree->up, NULL, stepSize, channel, comm, ncclShmem->ptrs, 4); for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Up - ssize_t offset = gridOffset + bid*chunkSize; + ssize_t offset = gridOffset + bid*tree->nHeads*chunkSize; + int nelem = min(tree->nHeads*chunkSize, size-offset); + prims.scatter(thisInput+offset, nelem, chunkSize, tree->headRank, tree->shift); + } + } else if (tid >= tidStartReduce && tree->out != -1) { + // Reduce, send to network + ncclPrimitives<UNROLL, 1, 1, T, NCCL_MAX_DIRECT_ARITY, 1, 0, FUNC> + prims(tid-tidStartReduce, nThreadsReduce, tree->down, &tree->out, NULL, stepSize, channel, comm, ncclShmem->ptrs, 6); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + ssize_t offset = gridOffset + (bid*tree->nHeads+tree->headRank)*chunkSize; int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - prims.send(thisInput+offset, nelem); - } else { + if (hasDn) { prims.recvReduceSend(thisInput+offset, nelem); + } else { + prims.send(thisInput+offset, nelem); } } - } - - if (blockIdx.x >= nChannels) { // second half of the channels do broadcast - ncclPrimitives<UNROLL, 1, 1, T, 1, 1, 0, FUNC> - prims(tid, nthreads, &tree->up, tree->down, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + } else if (tid < tidStartBcast && hasUp) { + // Gather + ncclPrimitives<UNROLL, 1, 1, T, NCCL_MAX_DIRECT_ARITY, 0, 0, FUNC> + prims(tid, nThreadsGather, tree->up, NULL, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0); for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Down - ssize_t offset = gridOffset + bid*chunkSize; + ssize_t offset = gridOffset + bid*tree->nHeads*chunkSize; + int nelem = min(tree->nHeads*chunkSize, size-offset); + prims.gather(thisOutput+offset, nelem, chunkSize, tree->headRank, tree->shift); + } + } else if (tid >= tidStartBcast && tid < tidStartScatter && tree->out != -1) { + // Recv from network, broadcast + ncclPrimitives<UNROLL, 1, 1, T, 1, NCCL_MAX_DIRECT_ARITY, 0, FUNC> + prims(tid-tidStartBcast, nThreadsBcast, &tree->out, tree->down, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 2); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + ssize_t offset = gridOffset + (bid*tree->nHeads+tree->headRank)*chunkSize; int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - prims.send(thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - prims.recv(thisOutput+offset, nelem); - } else { + if (hasDn) { prims.recvCopySend(thisOutput+offset, nelem); + } else { + prims.recv(thisOutput+offset, nelem); } } } @@ -397,60 +416,7 @@ class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_TREE, NCCL_PROTO_LL, FUNC, T, UN template<class FUNC, typename T, int UNROLL> class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_COLLNET, NCCL_PROTO_LL, 
FUNC, T, UNROLL> { public: - __device__ void run(struct ncclWorkElem* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads; - const int bid = args->coll.bid; - const int nChannels = args->coll.nChannels; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclTree* tree = &channel->collTree; - const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); - ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); - const ssize_t minChunkSize = nthreads*sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = nChannels*chunkSize; - const ssize_t size = args->coll.count; - - if (loopSize > size) { - chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; - } - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->sendbuff; - T * __restrict__ thisOutput = (T*)args->recvbuff; - - if (blockIdx.x < nChannels) { // first half of the channels do reduce - ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, tree->down, &tree->up, stepLines, channel, comm); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Up - ssize_t offset = gridOffset + bid*chunkSize; - int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - LLprims.send(thisInput+offset, nelem); - } else { - LLprims.recvReduceSend(thisInput+offset, nelem); - } - } - } - - if (blockIdx.x >= nChannels) { // second half of the channels do broadcast - ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &tree->up, tree->down, stepLines, channel, comm); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Down - ssize_t offset = gridOffset + bid*chunkSize; - int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - LLprims.send(thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - LLprims.recv(thisOutput+offset, nelem); - } else { - LLprims.recvCopySend(thisOutput+offset, nelem); - } - } - } - } + __device__ void run(struct ncclWorkElem* args) { } }; #include "prims_ll128.h" diff --git a/src/collectives/device/common.h b/src/collectives/device/common.h index 265218a..2673a0a 100644 --- a/src/collectives/device/common.h +++ b/src/collectives/device/common.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2017-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2017-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -41,9 +41,9 @@ static __device__ void load_parallel(void* dst, void* src, size_t size, int tid) int* s = (int*)src; for (int o = tid; o < (size/sizeof(int)); o += blockDim.x) d[o] = s[o]; } -static __device__ void load_coll(struct ncclWork* localWork, struct ncclWork* hostWork, int tid, struct ncclDevComm* comm) { - __syncthreads(); - load_parallel(localWork, hostWork, sizeof(struct ncclWork), tid); + +static __device__ void load_coll(struct ncclWork* localWork, struct ncclWork *hostWork, struct ncclWork* workFifo, int tid, struct ncclDevComm* comm) { + load_parallel(localWork, workFifo, sizeof(struct ncclWork), tid); // Check whether the last operation was aborted and make sure all threads exit int abort = tid == 0 ? 
*(comm->abortFlag) : 0; exitIfAbortBarrier(abort); @@ -57,8 +57,8 @@ class ncclFunction { }; struct ncclShmemPtrs { - void* srcs[NCCL_MAX_DEV_ARITY+1]; - void* dsts[NCCL_MAX_DEV_ARITY+1]; + void* srcs[NCCL_MAX_DIRECT_ARITY+1]; + void* dsts[NCCL_MAX_DIRECT_ARITY+1]; }; struct ncclShmemData { @@ -82,7 +82,6 @@ __device__ void ncclKernel(struct ncclWorkElem first) { struct ncclDevComm* comm = first.comm; struct ncclChannel* channel = comm->channels+bid; struct ncclWorkElem* w = NULL; - uint16_t index = first.index; /* To optimize for latency, (only) the first operation is passed as argument.*/ if (bid == 0 && first.funcIndex != FUNC_INDEX_P2P) w = &first; @@ -90,7 +89,8 @@ __device__ void ncclKernel(struct ncclWorkElem first) { while (1) { if (w == NULL) { w = shmem.localWork.elems; - load_coll(&shmem.localWork, channel->workFifo+index, tid, comm); + __syncthreads(); + load_coll(&shmem.localWork, channel->workFifo+channel->index, channel->workFifoDev+channel->index, tid, comm); } if (tid < w->nThreads) { if (w->funcIndex == FINDEX) { @@ -99,7 +99,7 @@ __device__ void ncclKernel(struct ncclWorkElem first) { ncclFuncs[w->funcIndex](w); } } - index = (index+1) % NCCL_MAX_OPS; + if (tid == 0) channel->index = (channel->index+1) % NCCL_MAX_OPS; if (w->active == 2) { return; } diff --git a/src/collectives/device/primitives.h b/src/collectives/device/primitives.h index 69348db..605a491 100644 --- a/src/collectives/device/primitives.h +++ b/src/collectives/device/primitives.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -168,9 +168,58 @@ class ncclPrimitives { } } + // Scatter and gather do not support DIRECT + template <int RECV, int SEND> + inline __device__ void + ScatterGatherOp(const T* srcPtr, T* dstPtr, int totalElem, int peerElem, int skip, int shift) { + int offset = 0; // slice offset + int sliceSize = stepSize*SLICESTEPS; + int dataSize = max(DIVUP(peerElem, 16*SLICESPERCHUNK)*16, sliceSize/32); // per-peer slice size + + #pragma unroll + for (int slice=0; slice<SLICESPERCHUNK; ++slice) { + int realSize = max(0, min(dataSize, peerElem-offset)); + if (tid < nworkers) { + if (RECV && (role & ROLE_WAIT_RECV)) waitRecv<0, 0>(0); + // realSize is not accurate here; but intra-node does not rely on sizes FIFO + if (SEND && (role & ROLE_WAIT_SEND)) waitSend<0, 0>(0, realSize*sizeof(T)); + subBarrier(); + if (SEND) { + #pragma unroll + for (int j=0; j<nsend; j++) { + int i = (j+shift)%nsend; + int peerOffset = i*peerElem + offset; + if (skip >=0 && i >= skip) peerOffset += peerElem; + const T* src0 = srcPtr + peerOffset; + int realPeerSize = min(realSize, totalElem-peerOffset); + if (realPeerSize > 0) ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, 1>(tid, nworkers, 1, &src0, 1, dsts+i, realPeerSize); + } + } else if (RECV) { + #pragma unroll + for (int j=0; j<nrecv; j++) { + int i = (j+shift)%nrecv; + int peerOffset = i*peerElem + offset; + if (skip >= 0 && i >= skip) peerOffset += peerElem; + T* dst0 = dstPtr + peerOffset; + int realPeerSize = min(realSize, totalElem-peerOffset); + if (realPeerSize > 0) ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, 1>(tid, nworkers, 1, srcs+i, 1, &dst0, realPeerSize); + } + } + } + barrier(); + if (SEND && (role & ROLE_POST_SEND) && realSize > 0 && index == 0) 
__threadfence_system(); + __syncwarp(); + if (SEND && (role & ROLE_POST_SEND)) postSend(); + if (RECV && (role & ROLE_POST_RECV)) postRecv(); + offset += realSize; + } + } + __device__ __forceinline__ void loadRecvConn(struct ncclChannel* channel, T* directBuff) { if (role & (ROLE_WAIT_RECV|ROLE_POST_RECV)) { - conn = &channel->devPeers[peer].recv.conn; + // For oneshot: groups 0,2 use conn 0, groups 4,6 use conn 1 + const int connIndex = (NSEND == NCCL_MAX_DIRECT_ARITY || NRECV == NCCL_MAX_DIRECT_ARITY) ? group/4 : 0; + conn = &channel->devPeers[peer].recv[connIndex].conn; step = conn->step; step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS); if (role & ROLE_POST_RECV) { @@ -193,7 +242,9 @@ class ncclPrimitives { __device__ __forceinline__ void loadSendConn(struct ncclChannel* channel) { if (role & (ROLE_WAIT_SEND|ROLE_POST_SEND)) { - conn = &channel->devPeers[peer].send.conn; + // For oneshot: groups 0,2 use conn 0, groups 4,6 use conn 1 + const int connIndex = (NSEND == NCCL_MAX_DIRECT_ARITY || NRECV == NCCL_MAX_DIRECT_ARITY) ? group/4 : 0; + conn = &channel->devPeers[peer].send[connIndex].conn; step = conn->step; step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS); if (role & ROLE_POST_SEND) { @@ -203,7 +254,7 @@ class ncclPrimitives { buff = (T*)conn->buffs[NCCL_PROTO_SIMPLE]; if (DIRECT && (conn->direct & NCCL_DIRECT_GPU)) { void* volatile* ptr = conn->ptrExchange; - while ((direct = (T*)(*ptr)) == NULL); + while ((direct = (T*)(*ptr)) == NULL) { if (checkAbort()) break; } *ptr = NULL; } connHeadPtr = conn->head; @@ -318,6 +369,16 @@ class ncclPrimitives { GenericOp<0, 1, 1, 1, 1, 1>(src, dst, nelem, directOffset); } + __device__ __forceinline__ void + scatter(const T* src, int totalElem, int peerElem, int skip, int shift) { + ScatterGatherOp<0, 1>(src, NULL, totalElem, peerElem, skip, shift); + } + + __device__ __forceinline__ void + gather(T* dst, int totalElem, int peerElem, int skip, int shift) { + ScatterGatherOp<1, 0>(NULL, dst, totalElem, peerElem, skip, shift); + } + __device__ __forceinline__ ~ncclPrimitives() { // Save steps for the next operation saveSync(); diff --git a/src/collectives/device/prims_ll.h b/src/collectives/device/prims_ll.h index 9e362f9..48972a9 100644 --- a/src/collectives/device/prims_ll.h +++ b/src/collectives/device/prims_ll.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -203,8 +203,9 @@ class ncclLLPrimitives { // Make sure step is updated before we read it. 
barrier(); - for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i); - for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i); + // If we are going to support oneshot collNet + LL, then we would need to add connector index here + for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv->conn, i); + for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send->conn, i); loadRecvSync(); loadSendSync(); } diff --git a/src/collectives/device/prims_ll128.h b/src/collectives/device/prims_ll128.h index 999d0d5..9ce1195 100644 --- a/src/collectives/device/prims_ll128.h +++ b/src/collectives/device/prims_ll128.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -349,8 +349,8 @@ class ncclLL128Primitives { // Make sure step is updated before we read it. barrier(); - for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i); - for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i); + for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv->conn, i); + for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send->conn, i); loadRecvSync(); loadSendSync(); } diff --git a/src/collectives/device/sendrecv.h b/src/collectives/device/sendrecv.h index 1cb34f3..b489f42 100644 --- a/src/collectives/device/sendrecv.h +++ b/src/collectives/device/sendrecv.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -49,10 +49,10 @@ class ncclFunction<ncclFuncSendRecv, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, struct ncclChannel* channel = comm->channels+blockIdx.x; const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/(sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize/SENDRECV_SLICEFACTOR; int nThreadsSplit = nThreads/2; if ((tid < nThreadsSplit) && recvCount >= 0) { + const int chunkSize = args->p2p.recvChunkSize/sizeof(T); int peer = (comm->rank-delta+comm->nRanks)%comm->nRanks; int nt = nThreadsSplit; ncclPrimitives<UNROLL, 1, 1, T, 1, 0, 1, FUNC> @@ -68,6 +68,7 @@ class ncclFunction<ncclFuncSendRecv, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, } } if ((tid >= nThreadsSplit) && sendCount >= 0) { + const int chunkSize = args->p2p.sendChunkSize/sizeof(T); int peer = (comm->rank+delta)%comm->nRanks; int nt = nThreads-nThreadsSplit; ncclPrimitives<UNROLL, 1, 1, T, 0, 1, 1, FUNC> diff --git a/src/enqueue.cc b/src/enqueue.cc index 4137f61..09da21c 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2017-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2017-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -7,6 +7,7 @@ #include "enqueue.h" #include "argcheck.h" #include "coll_net.h" +#include "gdrwrap.h" // Only generate inline kernels for LL #define NCCL_FUNC5(func, algo, redop, dtype) \ @@ -63,6 +64,21 @@ static void* const ncclKerns[1+NCCL_NUM_FUNCTIONS*ncclNumOps*ncclNumTypes*NCCL_N NCCL_FUNCS2A(AllReduce) }; +// Determine the maximum kernel stack size of all CUDA kernels +size_t ncclKernMaxLocalSize() { + ncclResult_t res = ncclSuccess; + int numNcclKerns = sizeof(ncclKerns)/sizeof(ncclKerns[0]); + cudaFuncAttributes attr = {0}; + size_t max = 0; + for (int i = 0; i < numNcclKerns; i++) { + CUDACHECKGOTO(cudaFuncGetAttributes(&attr, ncclKerns[i]), res, error); + if (attr.localSizeBytes > max) max = attr.localSizeBytes; + } + +error: + return (res != ncclSuccess) ? 0 : max; +} + /*****************************************************************************/ /* Launch system : synchronization and CUDA kernel launch */ /*****************************************************************************/ @@ -108,14 +124,23 @@ static ncclResult_t getNextOp(struct ncclChannel* channel, struct ncclWork** wor return ncclSuccess; } -static ncclResult_t setupLaunch(struct ncclComm* comm, struct cudaLaunchParams* params) { +static ncclResult_t setupLaunch(struct ncclQueueInfo* eqInfo, int usingCudaGraph) { + ncclComm_t comm = eqInfo->comm; + struct cudaLaunchParams* params = comm->myParams; + // Only launch blocks where we have work to do. - for (int c=0; c<comm->p2pnChannels; c++) { - if (comm->channels[c].workCount) params->gridDim.x = c+1; + // This is not supported when we are in cudaGraph mode. + // Because in cudaGraph mode the launch param needs to be determined + // at capture time instead of launch time. + if (!usingCudaGraph) { + for (int c=0; c<comm->p2pnChannels; c++) { + if (comm->channels[c].workCount) params->gridDim.x = c+1; + } + eqInfo->maxChannels = params->gridDim.x; } // Set active = 2 for the last operation and add a no-op on empty channels (p2p case). - for (int c=0; c<params->gridDim.x; c++) { + for (int c=0; c<eqInfo->maxChannels; c++) { struct ncclChannel* channel = comm->channels+c; if (channel->workCount == 0) { struct ncclWork* w; @@ -126,18 +151,35 @@ static ncclResult_t setupLaunch(struct ncclComm* comm, struct cudaLaunchParams* e->p2p.nThreads = 0; } channel->workFifo[(channel->workFifoTail-1)%NCCL_MAX_OPS].elems[0].active = 2; - } - // Find the first operation, choose the kernel accordingly and pass it - // as the first argument. - struct ncclChannel* c0 = comm->channels; - struct ncclWork* work = c0->workFifo+((c0->workFifoTail-c0->workCount)%NCCL_MAX_OPS); - struct ncclWorkElem* elem = work->elems; - memcpy(&comm->args, elem, sizeof(struct ncclWorkElem)); - // As we inline the first coll directly, we can free it immediately. - if (elem->funcIndex != FUNC_INDEX_P2P) elem->active = 0; + if (c == 0) { + // Find the first operation, choose the kernel accordingly and pass it as the first argument. + // Note that changing cuda launch argument after capture is not supported by cudaGraph + struct ncclWork* work = channel->workFifo+((channel->workFifoTail-channel->workCount)%NCCL_MAX_OPS); + struct ncclWorkElem* elem = work->elems; + if (!usingCudaGraph) { + params->func = ncclKerns[elem->funcIndex]; + memcpy(&comm->args, elem, sizeof(struct ncclWorkElem)); + } + // As we inline the first coll directly, we can free it immediately. 
+ if (elem->funcIndex != FUNC_INDEX_P2P) elem->active = 0; + } + + if (channel->gdrMemDesc) { + // GDRCOPY support + uint64_t first = (channel->workFifoTail-channel->workCount)%NCCL_MAX_OPS; + uint64_t nelems = channel->workCount; + TRACE(NCCL_INIT, "GDRCOPY : copy workFifo %p to %p first %ld last %ld nelems %zi", + channel->workFifo, channel->workFifoGdr, first, last, nelems); + + for (int i = 0; i < nelems; i++) { + int elem = (first+i) % NCCL_MAX_OPS; + // Copy Host workFifo to CUDA workFifo via the GDRCOPY mapping + NCCLCHECK(ncclGdrCudaCopy(channel->gdrMemDesc, channel->workFifoGdr+elem, channel->workFifo+elem, 1)); + } + } + } - params->func = ncclKerns[elem->funcIndex]; return ncclSuccess; } @@ -180,21 +222,23 @@ ncclResult_t ncclCpuBarrierOut(struct ncclComm* comm) { return ncclSuccess; } -ncclResult_t ncclBarrierEnqueue(struct ncclComm* comm) { +ncclResult_t ncclLaunchBarrier(struct ncclComm* comm) { struct cudaLaunchParams* params = comm->myParams; if (params->gridDim.x == 0) return ncclSuccess; - NCCLCHECK(setupLaunch(comm, params)); - // Use internal NCCL stream for CGMD/GROUP launch if required or if the user stream is NULL - if (comm->launchMode == ncclComm::GROUP && (comm->groupCudaStream || comm->userStream == NULL)) { + if (comm->launchMode == ncclComm::GROUP && + (comm->groupCudaStream || + comm->userStream == cudaStreamDefault || + comm->userStream == cudaStreamLegacy || + comm->userStream == cudaStreamPerThread)) { // Enqueue event in user stream - CUDACHECK(cudaEventRecord(comm->doneEvent, comm->userStream)); + CUDACHECK(cudaEventRecord(comm->intDoneEvent, comm->userStream)); // Create dependency between user stream and internal NCCL stream - CUDACHECK(cudaStreamWaitEvent(comm->groupStream, comm->doneEvent, 0)); + CUDACHECK(cudaStreamWaitEvent(comm->groupStream, comm->intDoneEvent, 0)); params->stream = comm->groupStream; } else { - if (comm->userStream != params->stream) { + if (comm->userStream != params->stream && !comm->usingCudaGraph) { // Stream changed from last call, create dependency against last NCCL kernel launch CUDACHECK(cudaStreamWaitEvent(comm->userStream, comm->doneEvent, 0)); } @@ -213,7 +257,7 @@ ncclResult_t ncclBarrierEnqueue(struct ncclComm* comm) { return ncclSuccess; } -ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) { +ncclResult_t ncclLaunchKernel(ncclComm_t comm) { struct cudaLaunchParams *params = comm->myParams; if (params->gridDim.x == 0) return ncclSuccess; @@ -226,44 +270,73 @@ ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) { (comm->launchMode == ncclComm::GROUP && comm->groupCudaStream) ? "/Stream" : ""); } - - if (comm->launchMode == ncclComm::PARALLEL) { - CUDACHECK(cudaLaunchKernel(params->func, params->gridDim, params->blockDim, params->args, params->sharedMem, params->stream)); - } else { + if (comm->launchMode == ncclComm::GROUP) { NCCLCHECK(ncclCpuBarrierOut(comm)); + } else { + CUDACHECK(cudaLaunchKernel(params->func, params->gridDim, params->blockDim, params->args, params->sharedMem, params->stream)); } + return ncclSuccess; +} + +static ncclResult_t ncclLaunchProxy(struct ncclQueueInfo* eqInfo) { // Start the network proxies as soon as the kernel has been launched. We can't // perform any CUDA call between the two or having a cudaFree between the CUDA // launch and the ncclProxyStart call could cause a deadlock. // Also, starting the proxies after the CUDA launch seems to be better for // performance (latency). 
- uint64_t max = 0ULL; - for (int r=0; r<params->gridDim.x; r++) { + ncclComm_t comm = eqInfo->comm; + if (eqInfo->maxChannels == 0) return ncclSuccess; + + for (int r=0; r<eqInfo->maxChannels; r++) { struct ncclChannel* channel = comm->channels+r; - max = std::max(max, channel->workFifoTail); channel->workCount = 0; } - for (int r=0; r<comm->p2pnChannels; r++) { - struct ncclChannel* channel = comm->channels+r; - channel->workFifoTail = max; - } - params->gridDim.x = params->blockDim.x = 0; - comm->lastOpCount = max; + comm->lastChannel = 0; NCCLCHECK(ncclProxyStart(comm)); return ncclSuccess; } -ncclResult_t ncclEnqueueEvents(ncclComm_t comm) { +ncclResult_t ncclRecordEvents(ncclComm_t comm) { struct cudaLaunchParams *params = comm->myParams; - // Enqueue event after NCCL kernel - CUDACHECK(cudaEventRecord(comm->doneEvent, params->stream)); + + // Enqueue event after NCCL kernel (only in non-graph mode) + if (!comm->usingCudaGraph) CUDACHECK(cudaEventRecord(comm->doneEvent, params->stream)); // Use internal NCCL stream for CGMD/GROUP launch if required or if the user stream is NULL - if (comm->launchMode == ncclComm::GROUP && (comm->groupCudaStream || comm->userStream == NULL)) { + if (comm->launchMode == ncclComm::GROUP && + (comm->groupCudaStream || + comm->userStream == cudaStreamDefault || + comm->userStream == cudaStreamLegacy || + comm->userStream == cudaStreamPerThread)) { + CUDACHECK(cudaEventRecord(comm->intDoneEvent, params->stream)); // Create dependency between NCCL internal stream and user stream - CUDACHECK(cudaStreamWaitEvent(comm->userStream, comm->doneEvent, 0)); + CUDACHECK(cudaStreamWaitEvent(comm->userStream, comm->intDoneEvent, 0)); } + return ncclSuccess; +} + +ncclResult_t ncclLaunchReset(ncclComm_t comm) { comm->userStreamSet = false; + + // We are finishing capture of the current launch + // But we need to keep the current enqueue info for CUDA graph + // Thus we need to creating a new enqueue info for the next run + if (comm->usingCudaGraph) { + NCCLCHECK(ncclCalloc(&comm->enqueueInfo, 1)); + comm->enqueueInfo->comm = comm; + } else { + // If not in CUDA graph mode, we reuse the same info space + NCCLCHECK(ncclResetQueueInfo(comm->enqueueInfo)); + } + + struct cudaLaunchParams *params = comm->myParams; + params->gridDim.x = params->blockDim.x = 0; + params->func = NULL; + + // Reset launch mode to GROUP if changed + if (comm->launchMode == ncclComm::GROUP_GRAPH) comm->launchMode = ncclComm::GROUP; + comm->usingCudaGraph = 0; + return ncclSuccess; } @@ -280,10 +353,10 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) { int nAlgos = NCCL_NUM_ALGORITHMS; // Check collNet support int collNetTypeSupport = 0; - if (info->comm->collNetSupport) + if (info->comm->collNetSupport > 0) NCCLCHECK(collNetReduceSupport(info->datatype, info->op, &collNetTypeSupport)); - if (collNetTypeSupport != 1) nAlgos--; for (int a=0; a<nAlgos; a++) { + if (a == NCCL_ALGO_COLLNET && collNetTypeSupport != 1) continue; for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { float time; NCCLCHECK(ncclTopoGetAlgoTime(info, a, p, &time)); @@ -301,17 +374,31 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) { //if (comm->rank == 0) INFO(NCCL_TUNING, "%ld Bytes -> Algo %d proto %d time %f", info->nBytes, info->algorithm, info->protocol, minTime); TRACE(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %f", info->nBytes, info->algorithm, info->protocol, minTime); - int nc = (info->nChannels > 0) ? info->nChannels : - (info->algorithm == NCCL_ALGO_COLLNET) ? 
comm->nChannels/2 : comm->nChannels; // CollNet uses one channel for up and one channel for down + int nc = (info->nChannels > 0) ? info->nChannels : comm->nChannels; int nt = comm->maxThreads[info->algorithm][info->protocol]; int threadThreshold = comm->threadThresholds[info->algorithm][info->protocol]; - while (info->nBytes < nc*nt*threadThreshold) { - if (info->algorithm != NCCL_ALGO_COLLNET && nc >= 2) nc--; - else if ((nt % 128) == 0) nt/=2; - else break; + if (info->algorithm == NCCL_ALGO_COLLNET) { + int ncSwitch = 16; + bool flag = true; + while (ncSwitch >= 1 && flag) { + while ((flag = info->nBytes < nc*nt*info->comm->channels[0].collTree.nHeads*threadThreshold) && nc > ncSwitch) { + if (nc == ncSwitch+ncSwitch/2) threadThreshold /= 2; + nc--; + } + ncSwitch /= 2; + } + } else { + while (info->nBytes < nc*nt*threadThreshold) { + if (nc >= 2) nc--; + else if ((nt % 128) == 0) nt/=2; + else break; + } + } + if (info->protocol == NCCL_PROTO_SIMPLE) { + nt += WARP_SIZE; // Extra warp for sync + if (info->algorithm == NCCL_ALGO_TREE) nt += WARP_SIZE; + if (info->algorithm == NCCL_ALGO_COLLNET) nt += 3*WARP_SIZE; } - if (info->protocol == NCCL_PROTO_SIMPLE) nt += WARP_SIZE; // Extra warp for sync - if (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_TREE) nt += WARP_SIZE; info->nChannels = nc; info->nThreads = nt; return ncclSuccess; @@ -327,7 +414,7 @@ static ncclResult_t getPatternInfo(struct ncclInfo* info) { case ncclFuncAllGather: info->pattern = ncclPatternRing; break; case ncclFuncAllReduce: - info->pattern = info->algorithm == NCCL_ALGO_COLLNET ? ncclPatternCollTreeUp : info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUpDown : ncclPatternRingTwice; break; + info->pattern = info->algorithm == NCCL_ALGO_COLLNET ? ncclPatternCollTreeUpDown : info->algorithm == NCCL_ALGO_TREE ? 
ncclPatternTreeUpDown : ncclPatternRingTwice; break; default: WARN("Unknown pattern for collective %d algorithm %d", info->coll, info->algorithm); return ncclInternalError; @@ -342,9 +429,9 @@ static ncclResult_t getLoopInfo(struct ncclInfo* info) { case ncclPatternTreeUpDown: case ncclPatternPipelineFrom: case ncclPatternPipelineTo: - case ncclPatternCollTreeUp: - case ncclPatternCollTreeDown: info->nstepsPerLoop = info-> nchunksPerLoop = 1; break; + case ncclPatternCollTreeUpDown: + info->nstepsPerLoop = 1; info->nchunksPerLoop = info->comm->channels[0].collTree.nHeads; break; case ncclPatternRing: info->nstepsPerLoop = info->comm->nRanks-1; info->nchunksPerLoop = info->comm->nRanks; break; case ncclPatternRingTwice: @@ -390,9 +477,10 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo work->coll.lastChunkSize = chunkSize / ncclTypeSize(info->datatype); } else if (info->algorithm == NCCL_ALGO_COLLNET && info->protocol == NCCL_PROTO_SIMPLE) { // Optimize chunkSize / nSteps - while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].collTree.depth*16 && chunkSize > 131072) chunkSize /= 2; - while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].collTree.depth*4 && chunkSize > 65536) chunkSize /= 2; - while (info->nBytes / (info->nChannels*chunkSize) < info->comm->channels[0].collTree.depth && chunkSize > 32768) chunkSize /= 2; + while (info->nBytes / (info->nChannels*info->comm->channels[0].collTree.nHeads*chunkSize) < info->comm->channels[0].collTree.depth*32 && chunkSize > 262144) chunkSize /= 2; + while (info->nBytes / (info->nChannels*info->comm->channels[0].collTree.nHeads*chunkSize) < info->comm->channels[0].collTree.depth*16 && chunkSize > 131072) chunkSize /= 2; + while (info->nBytes / (info->nChannels*info->comm->channels[0].collTree.nHeads*chunkSize) < info->comm->channels[0].collTree.depth*8 && chunkSize > 32768) chunkSize /= 2; + while (info->nBytes / (info->nChannels*info->comm->channels[0].collTree.nHeads*chunkSize) < info->comm->channels[0].collTree.depth/2 && chunkSize > 16384) chunkSize /= 2; // Use lastChunkSize as chunkSize work->coll.lastChunkSize = chunkSize / ncclTypeSize(info->datatype); } else if (info->protocol == NCCL_PROTO_LL) { @@ -417,20 +505,23 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo if (info->protocol == NCCL_PROTO_LL128) chunkEffectiveSize = (chunkSize / NCCL_LL128_LINEELEMS) * NCCL_LL128_DATAELEMS; //if (info->comm->rank == 0) printf("Coll %d, size %ld -> %dx%d, chunkSize %d (algo %d proto%d)\n", info->coll, info->nBytes, info->nChannels, info->nThreads, chunkSize, info->algorithm, info->protocol); int nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize))); - proxyArgs->nsteps = info->nstepsPerLoop * nLoops * chunkSteps; + proxyArgs->subs[0].nsteps = info->nstepsPerLoop * nLoops * chunkSteps; proxyArgs->sliceSteps = sliceSteps; proxyArgs->chunkSteps = chunkSteps; + proxyArgs->chunkSize = chunkSize; proxyArgs->protocol = info->protocol; proxyArgs->dtype = info->datatype; - proxyArgs->redOp = info->op; + proxyArgs->redOp = (info->algorithm == NCCL_ALGO_COLLNET) ? info->op : ncclNumOps; // Only set redOp when using CollNet + proxyArgs->pattern = info->pattern; + proxyArgs->root = info->root; // This is used by P2P to reduce the receive buffer size. 
We don't use it in collectives // because some protocols need to transmit more than the total size, plus they sometimes // round up - proxyArgs->recvbytes = stepSize*proxyArgs->sliceSteps; + proxyArgs->subs[0].recvbytes = stepSize*proxyArgs->sliceSteps; - TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p", - proxyArgs->opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, info->nBytes, info->protocol, info->nChannels, info->nThreads, - nLoops, proxyArgs->nsteps, info->comm); + TRACE(NCCL_COLL,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d chunksize %d comm %p", + proxyArgs->opCount, sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, info->nBytes, info->protocol, info->nChannels, info->nThreads, + nLoops, proxyArgs->subs[0].nsteps, chunkSize, info->comm); return ncclSuccess; } @@ -445,64 +536,95 @@ static ncclResult_t checkSetStream(struct ncclInfo* info) { return ncclSuccess; } -ncclResult_t ncclSaveKernel(struct ncclInfo* info) { - if (info->comm->nRanks == 1) { +// Compute enqueue element, save it in list +// Compute CUDA launch parameters +// Capture time code in view of CUDA graph +static ncclResult_t ncclSetupCollKernel(struct ncclInfo* info) { + ncclComm_t comm = info->comm; + if (comm->nRanks == 1) { if (info->sendbuff != info->recvbuff) CUDACHECK(cudaMemcpyAsync(info->recvbuff, info->sendbuff, info->nBytes, cudaMemcpyDeviceToDevice, info->stream)); return ncclSuccess; } - struct ncclWorkElem work; - struct ncclProxyArgs proxyArgs; - memset(&proxyArgs, 0, sizeof(struct ncclProxyArgs)); - NCCLCHECK(computeColl(info, &work, &proxyArgs)); + // Compute cuda kernel arg and proxy arg templates + struct ncclQueueElem* eqElem; + NCCLCHECK(ncclAddQueueElem(comm->enqueueInfo, &eqElem)); + struct ncclWorkElem* work = &eqElem->work; + eqElem->proxyArgs.nsubs = 1; + NCCLCHECK(computeColl(info, work, &eqElem->proxyArgs)); - info->comm->myParams->blockDim.x = std::max<unsigned>(info->comm->myParams->blockDim.x, info->nThreads); + // Determine grid size + struct cudaLaunchParams* params = comm->myParams; + params->gridDim.x += info->nChannels; + params->gridDim.x = std::min<unsigned>(params->gridDim.x, comm->nChannels); + params->blockDim.x = std::max<unsigned>(params->blockDim.x, info->nThreads); + comm->enqueueInfo->maxChannels = params->gridDim.x; // params may be varied by a second graph hence we need to capture it here + + // Inline the first kernel + if (params->func == NULL) { + params->func = ncclKerns[work->funcIndex]; + memcpy(&comm->args, work, sizeof(struct ncclWorkElem)); + comm->args.coll.bid = 0; // Only inline for channel 0 + comm->args.active = 2; // I am so far the last element; may be changed later in aggregation mode + } - int nChannels = work.coll.nChannels; - int nSubChannels = (info->pattern == ncclPatternCollTreeUp || info->pattern == ncclPatternCollTreeDown) ? 
2 : 1; + return ncclSuccess; +} - for (int bid=0; bid<nChannels*nSubChannels; bid++) { - int channelId = info->comm->myParams->gridDim.x % info->comm->nChannels; - struct ncclChannel* channel = info->comm->channels+channelId; +// Dynamic enqueue code +static ncclResult_t ncclEnqueueCollKernel(ncclComm_t comm, struct ncclQueueElem* eqElem) { + struct ncclWorkElem* work = &eqElem->work; + struct ncclProxyArgs* proxyArgs = &eqElem->proxyArgs; + + int nChannels = work->coll.nChannels; + for (int bid=0; bid<nChannels; bid++) { + int channelId = comm->lastChannel % comm->nChannels; + struct ncclChannel* channel = comm->channels+channelId; // Proxy - proxyArgs.channel = channel; - // Adjust pattern for CollNet based on channel index - if (nSubChannels == 2) { - info->pattern = (channelId < info->comm->nChannels/nSubChannels) ? ncclPatternCollTreeUp : ncclPatternCollTreeDown; - } + proxyArgs->subs[0].channel = channel; + proxyArgs->opCount = comm->collOpCount; + proxyArgs->commOpCount = comm->opCount; - if (proxyArgs.nsteps) NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks)); + if (proxyArgs->subs[0].nsteps) NCCLCHECK(ncclProxySaveColl(proxyArgs, comm->nRanks)); - info->comm->myParams->gridDim.x++; - work.coll.bid = bid % nChannels; - NCCLCHECK(getNextOp(channel, NULL, &work)); + comm->lastChannel++; + work->coll.bid = bid % nChannels; + NCCLCHECK(getNextOp(channel, NULL, work)); + //INFO(NCCL_COLL, "Host enqueue: bid %d channel %d index %ld nThreads %d funcIndex %d count %ld nChannels %d", + // work->coll.bid, channelId, channel->workFifoTail, work->nThreads, work->funcIndex, work->coll.count, work->coll.nChannels); } + comm->collOpCount++; return ncclSuccess; } #define NCCL_MIN_CHANNEL_SIZE (NCCL_LL_THREAD_THRESHOLD*64) #define NCCL_AGG_CHANNEL_SIZE (1LL << 21) /* 2 MiB, ideal per-channel size to fully utilize bandwidth */ -ncclResult_t ncclSaveCommKernels(ncclComm_t comm) { +ncclResult_t ncclSetupAsyncKernels(ncclComm_t comm) { if (comm->asyncOpCount == 0) { return ncclSuccess; } else if (comm->asyncOpCount == 1) { // No aggregation struct ncclInfo* info = comm->asyncOps; info->nChannels = 0; - NCCLCHECK(ncclSaveKernel(info)); + NCCLCHECK(ncclSetupCollKernel(info)); } else { // Aggregation size_t channelSize = NCCL_AGG_CHANNEL_SIZE * comm->nRanks; // scale channel size based on nranks as latency increases // Reduce the per-channel size if we cannot fully utilize the channels while (comm->asyncTotalSize < channelSize * comm->nChannels && channelSize > NCCL_MIN_CHANNEL_SIZE) channelSize /= 2; + int channelUsed = 0; for (int c = 0; c < comm->asyncOpCount; c++) { struct ncclInfo* info = comm->asyncOps+c; info->nChannels = std::min((int)DIVUP(info->nBytes, channelSize), comm->nChannels); // assign number of channels - NCCLCHECK(ncclSaveKernel(info)); + channelUsed += info->nChannels; + NCCLCHECK(ncclSetupCollKernel(info)); } + // If we wrap around on channels, then the inlined op on channel 0 is not the last one on this channel + // Then we need to change active from 2 to 1 + if (channelUsed > comm->nChannels) comm->args.active = 1; } // Reset counters comm->asyncOpCount = 0; @@ -533,7 +655,7 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { int delta = (comm->nRanks - (comm->rank-peer)) % comm->nRanks; for (int c=0; c<comm->p2pnChannelsPerPeer; c++) { int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].send.connected == 0) { + if (comm->channels[channelId].peers[peer].send[0].connected == 0) { 
// P2P uses only 1 connector comm->connectSend[peer] |= (1<<channelId); comm->connect = 1; } @@ -546,7 +668,7 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { int delta = (comm->nRanks + (comm->rank-peer)) % comm->nRanks; for (int c=0; c<comm->p2pnChannelsPerPeer; c++) { int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].recv.connected == 0) { + if (comm->channels[channelId].peers[peer].recv[0].connected == 0) { // P2P uses only 1 connector comm->connectRecv[peer] |= (1<<channelId); comm->connect = 1; } @@ -558,56 +680,165 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { return ncclSuccess; } -static int getSegment(struct ncclInfo* info, struct ncclWork* work) { - for (int s=0; s<NCCL_MAX_WORK_ELEMENTS && work->elems[s].p2p.delta != info->delta; s++) { +static int getSegment(int delta, struct ncclWork* work) { + for (int s=0; s<NCCL_MAX_WORK_ELEMENTS && work->elems[s].p2p.delta != delta; s++) { if (work->elems[s].p2p.nThreads == 0) return s; } return -1; } -static ncclResult_t saveP2pOp(struct ncclInfo* info /* input */, struct ncclWork* work, int s) { - struct ncclWorkElem* elem = work->elems+s; +static ncclResult_t computeP2pWorkElem(struct ncclInfo* info /* input */, struct ncclWorkElem* elem /* output */) { elem->comm = info->comm->devComm; elem->funcIndex = FUNC_INDEX_P2P; - elem->nThreads = info->nThreads = NCCL_MAX_NTHREADS; + elem->nThreads = NCCL_MAX_NTHREADS; elem->sendbuff = info->sendbuff; elem->recvbuff = info->recvbuff; elem->p2p.sendCount = info->sendbytes; elem->p2p.recvCount = info->recvbytes; + elem->p2p.sendChunkSize = info->sendChunkSize; + elem->p2p.recvChunkSize = info->recvChunkSize; elem->p2p.delta = info->delta; + return ncclSuccess; +} + +static ncclResult_t enqueueP2pOp(struct ncclWorkElem* elem /* input */, struct ncclWork* work, int s) { + // Copy element into corresponding segment of ncclWork + memcpy(work->elems+s, elem, sizeof(struct ncclWorkElem)); + + // Determine nThreads at dynamic time const int nsegments = s+1; int nThreads = 512; while (nsegments*nThreads > 512) nThreads /= 2; if (nThreads >= 128) nThreads += WARP_SIZE; for (int i=0; i<nsegments; i++) work->elems[i].p2p.nThreads = nThreads; + return ncclSuccess; } -ncclResult_t ncclSaveP2pKernel(struct ncclInfo* info) { - int channelId = info->channelId; - struct ncclChannel* channel = info->comm->channels+channelId; +ncclResult_t ncclEnqueueP2pKernel(struct ncclComm* comm, struct ncclQueueElem* eqElem) { + struct ncclWorkElem* workElem = &eqElem->work; + struct ncclProxyArgs* proxyArgs = &eqElem->proxyArgs; // Try to reuse last p2p operation if not full yet + struct ncclChannel* channel = proxyArgs->subs[0].channel; int opIndex = (channel->workFifoTail-1+NCCL_MAX_OPS)%NCCL_MAX_OPS; struct ncclWork* w = channel->workFifo+opIndex; int segment = -1; if (channel->workCount && w->elems[0].funcIndex == FUNC_INDEX_P2P && w->elems[NCCL_MAX_WORK_ELEMENTS-1].p2p.nThreads == 0) { // Try to pack more segments into a single operation - segment = getSegment(info, w); + segment = getSegment(workElem->p2p.delta, w); } if (segment == -1) { NCCLCHECK(getNextOp(channel, &w, NULL)); segment = 0; } - NCCLCHECK(ncclProxySaveP2p(info, channel, segment)); - NCCLCHECK(saveP2pOp(info, w, segment)); - info->comm->myParams->gridDim.x = std::max<unsigned>(info->comm->myParams->gridDim.x, channelId+1); - info->comm->myParams->blockDim.x = std::max<unsigned>(info->comm->myParams->blockDim.x, info->nThreads); + // store work element into FIFO + 
NCCLCHECK(ncclProxySaveP2p(comm, proxyArgs)); + NCCLCHECK(enqueueP2pOp(workElem, w, segment)); + return ncclSuccess; +} + +ncclResult_t ncclSetupP2pKernel(struct ncclInfo* info) { + ncclComm* comm = info->comm; + // Compute cuda kernel arg and proxy arg templates + struct ncclQueueElem* eqElem; + NCCLCHECK(ncclAddQueueElem(comm->enqueueInfo, &eqElem)); + // The proxy code will set and tune the send/recv chunk size, make sure to run it first. + NCCLCHECK(ncclProxyComputeP2p(info, &eqElem->proxyArgs)); + NCCLCHECK(computeP2pWorkElem(info, &eqElem->work)); + int channelId = info->channelId; + struct cudaLaunchParams* params = comm->myParams; + params->gridDim.x = std::max<unsigned>(params->gridDim.x, channelId+1); + params->blockDim.x = std::max<unsigned>(params->blockDim.x, eqElem->work.nThreads); + comm->enqueueInfo->maxChannels = params->gridDim.x; // params may be varied by a second graph hence we need to capture it here + + // Record the first kernel to launch + // Just for CUDA kernel to know this is a P2P operation + // The CUDA kernel does not use the inlined first work element as fastpath argument + if (params->func == NULL) { + params->func = ncclKerns[eqElem->work.funcIndex]; + memcpy(&comm->args, &eqElem->work, sizeof(struct ncclWorkElem)); + } return ncclSuccess; } +template<int USING_CUDA_GRAPH> +void CUDART_CB ncclEnqueueHostSetup(void* arg) { + ncclResult_t ret; + struct ncclQueueInfo* eqInfo = (struct ncclQueueInfo*)arg; + ncclComm_t comm = eqInfo->comm; + + // Iterate through the element list + struct ncclQueueElem* eqElem = eqInfo->elemList.head; + while (eqElem != eqInfo->elemList.tail) { // The queue always has one extra element + if (eqElem->work.funcIndex == FUNC_INDEX_P2P) { + NCCLCHECKGOTO(ncclEnqueueP2pKernel(comm, eqElem), ret, cb_end); + } else { + NCCLCHECKGOTO(ncclEnqueueCollKernel(comm, eqElem), ret, cb_end); + } + eqElem = eqElem->next; + } + + NCCLCHECKGOTO(setupLaunch(eqInfo, USING_CUDA_GRAPH), ret, cb_end); + NCCLCHECKGOTO(ncclLaunchProxy(eqInfo), ret, cb_end); + +cb_end: + if (ret != ncclSuccess) { + WARN("Failure in host setup : %s", ncclGetErrorString(ret)); + } + eqInfo->ret = ret; +} + +template void CUDART_CB ncclEnqueueHostSetup<0>(void*); +template void CUDART_CB ncclEnqueueHostSetup<1>(void*); + +ncclResult_t ncclGetCudaGraph(ncclComm_t comm, cudaGraph_t* graph) { + comm->usingCudaGraph = 0; +#if CUDART_VERSION >= 11030 + cudaStreamCaptureStatus captureStatus; + unsigned long long cudaGraphId; + CUDACHECK(cudaStreamGetCaptureInfo_v2(comm->userStream, &captureStatus, &cudaGraphId, graph, NULL, NULL)); + if (captureStatus == cudaStreamCaptureStatusActive) { + if (cudaGraphId != comm->lastCudaGraphId) { + INFO(NCCL_COLL, "stream is being captured by a new graph, id %llu", cudaGraphId); + // We are in a new graph, hence need to forget the last setup node so that + // the first setup node in the new graph will not have a dependency + comm->lastCudaGraphId = cudaGraphId; + comm->lastSetupNode = NULL; + } + if (comm->launchMode == ncclComm::GROUP) comm->launchMode = ncclComm::GROUP_GRAPH; + comm->usingCudaGraph = 1; + } +#endif + return ncclSuccess; +} + +ncclResult_t ncclCudaGraphHostSetup(ncclComm_t comm, cudaGraph_t graph) { +#if CUDART_VERSION >= 11030 + struct ncclQueueInfo* eqInfo = comm->enqueueInfo; + // Create a CUDA object to wrap around the argument space + // which CUDA graph would manage lifetime of + cudaUserObject_t object; + CUDACHECK(cudaUserObjectCreate(&object, eqInfo, ncclDestroyQueueInfo, 1/*initialRefcount*/, 
cudaUserObjectNoDestructorSync)); + CUDACHECK(cudaGraphRetainUserObject(graph, object, 1, cudaGraphUserObjectMove)); + + cudaHostFn_t fn = ncclEnqueueHostSetup<1>; + // Add a CPU node to the graph + cudaGraphNode_t setupNode; + cudaHostNodeParams setupNodeParams = {fn, eqInfo}; + int numDependencies = comm->lastSetupNode == NULL ? 0 : 1; + CUDACHECK(cudaGraphAddHostNode(&setupNode, graph, &comm->lastSetupNode, numDependencies, &setupNodeParams)); + CUDACHECK(cudaStreamUpdateCaptureDependencies(comm->userStream, &setupNode, 1, cudaStreamAddCaptureDependencies)); + comm->lastSetupNode = setupNode; + return ncclSuccess; +#else + WARN("NCCL does not support this CUDA version for CUDA graph feature"); + return ncclInternalError; +#endif +} + ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) { // Launch asynchronously if needed if (ncclAsyncMode()) { @@ -647,10 +878,27 @@ end: info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream); - NCCLCHECK(ncclSaveKernel(info)); - NCCLCHECK(ncclBarrierEnqueue(info->comm)); - NCCLCHECK(ncclBarrierEnqueueWait(info->comm)); - NCCLCHECK(ncclEnqueueEvents(info->comm)); + // Check whether we are in cuda graph mode + cudaGraph_t graph; + ncclComm_t comm = info->comm; + NCCLCHECK(ncclGetCudaGraph(comm, &graph)); + + // Common part between graph mode and non-graph mode + NCCLCHECK(ncclSetupCollKernel(info)); + + // Host setup + if (comm->usingCudaGraph) { + NCCLCHECK(ncclCudaGraphHostSetup(comm, graph)); + } else { + ncclEnqueueHostSetup<0>(comm->enqueueInfo); + NCCLCHECK(comm->enqueueInfo->ret); + } + + // Common part between graph mode and non-graph mode + NCCLCHECK(ncclLaunchBarrier(comm)); + NCCLCHECK(ncclLaunchKernel(comm)); + NCCLCHECK(ncclRecordEvents(comm)); + NCCLCHECK(ncclLaunchReset(comm)); return ncclSuccess; } } diff --git a/src/graph/connect.cc b/src/graph/connect.cc index a64f9be..b06ea5d 100644 --- a/src/graph/connect.cc +++ b/src/graph/connect.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. 
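The graph path above only engages when the user stream is actively being captured (cudaStreamCaptureStatusActive); otherwise ncclEnqueueHostSetup<0> runs inline on the CPU. A minimal caller-side sketch of how the feature is meant to be used, not taken from this change and assuming CUDA 11.3 or later, an initialized communicator comm, a capturable stream stream, and float buffers sendbuf/recvbuf of count elements:

cudaGraph_t graph;
cudaGraphExec_t graphExec;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);              // error checking omitted
ncclAllReduce(sendbuf, recvbuf, count, ncclFloat, ncclSum, comm, stream); // captured, not launched yet
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0);
for (int iter = 0; iter < niters; iter++) {
  cudaGraphLaunch(graphExec, stream);  // replays the host setup node, the NCCL kernel and the proxy work
}
cudaStreamSynchronize(stream);
cudaGraphExecDestroy(graphExec);
cudaGraphDestroy(graph);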
* * See LICENSE.txt for license information ************************************************************************/ @@ -14,7 +14,7 @@ /******************************************************************/ ncclResult_t ncclTopoPreset(struct ncclComm* comm, - struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph, + struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoRanks* topoRanks) { int rank = comm->rank; int localRanks = comm->localRanks; @@ -25,12 +25,15 @@ ncclResult_t ncclTopoPreset(struct ncclComm* comm, channel->ring.prev = channel->ring.next = -1; channel->tree.up = -1; for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) channel->tree.down[i] = -1; - channel->collTree.up = -1; - for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) channel->collTree.down[i] = -1; + channel->collTree.out = -1; + channel->collTree.headRank = -1; + channel->collTree.nHeads = 0; + channel->collTree.shift = 0; + for (int i=0; i<NCCL_MAX_DIRECT_ARITY; i++) channel->collTree.up[i] = -1; + for (int i=0; i<NCCL_MAX_DIRECT_ARITY; i++) channel->collTree.down[i] = -1; int* ringIntra = ringGraph->intra+c*localRanks; int* treeIntra = treeGraph->intra+c*localRanks; - int* collNetIntra = collNetGraph->intra+c*localRanks; for (int i=0; i<localRanks; i++) { if (ringIntra[i] == rank) { @@ -50,12 +53,6 @@ ncclResult_t ncclTopoPreset(struct ncclComm* comm, channel->tree.up = i == 0 ? -1 : treeIntra[i-1]; channel->tree.down[0] = i == localRanks-1 ? -1 : treeIntra[i+1]; } - if (collNetIntra[i] == rank) { - int prev = (i-1+localRanks)%localRanks, next = (i+1)%localRanks; - - channel->collTree.up = collNetIntra[prev]; - channel->collTree.down[0] = collNetIntra[next]; - } } topoRanks->ringPrev[c] = channel->ring.prev; topoRanks->ringNext[c] = channel->ring.next; @@ -167,36 +164,53 @@ static ncclResult_t connectTrees(struct ncclComm* comm, int* treeToParent, int* return ncclSuccess; } -ncclResult_t ncclTopoConnectCollNet(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, int rank) { - int nranks = comm->nRanks; - int depth = nranks/comm->nNodes; - int sendIndex = collNetGraph->pattern == NCCL_TOPO_PATTERN_TREE ? 
0 : 1; // send GPU index depends on topo pattern - int sendEndIndex = (sendIndex+comm->localRanks-1)%comm->localRanks; - for (int c=0; c<comm->nChannels/2; c++) { - struct ncclChannel* channel = comm->channels+c; - // Set root of collTree to id nranks - if (rank == collNetGraph->intra[sendIndex+c*comm->localRanks]) { // is master - channel->collTree.up = nranks; - } - if (rank == collNetGraph->intra[sendEndIndex+c*comm->localRanks]) { // is bottom of intra-node chain - channel->collTree.down[0] = -1; - } - channel->collTree.depth = depth; - INFO(NCCL_GRAPH, "CollNet Channel %d rank %d up %d down %d", c, rank, channel->collTree.up, channel->collTree.down[0]); +static ncclResult_t connectCollNet(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph) { + int rank = comm->rank; + int localRanks = comm->localRanks; + int nHeads = collNetGraph->nChannels; + int *heads; + NCCLCHECK(ncclCalloc(&heads, nHeads)); + // Find all head ranks + // Head index is always 0 + for (int c=0; c<nHeads; c++) { + int* collNetIntra = collNetGraph->intra+c*localRanks; + heads[c] = collNetIntra[0]; } - int recvIndex = 0; // recv GPU index is always 0 - int recvEndIndex = (recvIndex+comm->localRanks-1)%comm->localRanks; - for (int c=0; c<comm->nChannels/2; c++) { - struct ncclChannel* channel = comm->channels+comm->nChannels/2+c; - // Set root of collTree to id nranks - if (rank == collNetGraph->intra[recvIndex+c*comm->localRanks]) { // is master - channel->collTree.up = nranks; + // For all channels + for (int c=0; c<comm->nChannels; c++) { + struct ncclChannel* channel = comm->channels+c; + char line[1024]; + sprintf(line, "CollNet channel %d rank %d ", c, rank); + int nDown = 0; + for (int i=0; i<nHeads; i++) { + if (rank == heads[i]) { // is head + channel->collTree.headRank = i; // Mark the index for deciding offset in the CUDA kernel + channel->collTree.out = comm->nRanks; // Set root of collTree to id nranks + int* collNetIntra = collNetGraph->intra+i*localRanks; + sprintf(line+strlen(line), "down "); + for (int r=0; r<localRanks; r++) { + if (collNetIntra[r] == rank) continue; + channel->collTree.down[nDown++] = collNetIntra[r]; // connect to all peers + sprintf(line+strlen(line), " %d ", collNetIntra[r]); + } + sprintf(line+strlen(line), "nDown %d ", nDown); + break; + } } - if (rank == collNetGraph->intra[recvEndIndex+c*comm->localRanks]) { // is bottom of intra-node chain - channel->collTree.down[0] = -1; + // Connect to all heads + int nUp = 0; + sprintf(line+strlen(line), "up "); + for (int h=0; h<nHeads; h++) { + if (rank == heads[h]) continue; + channel->collTree.up[nUp++] = heads[h]; + sprintf(line+strlen(line), " %d ", heads[h]); } - channel->collTree.depth = depth; - INFO(NCCL_GRAPH, "CollNet Channel %d rank %d up %d down %d", comm->nChannels/2+c, rank, channel->collTree.up, channel->collTree.down[0]); + channel->collTree.nHeads = nHeads; + channel->collTree.shift = (rank%localRanks)%nHeads; // Shift by intraRank so that leaves don't send to same head simultaneously + channel->collTree.depth = (nUp == 0 && nDown == 0) ? 
1 : 2; + sprintf(line+strlen(line), "nUp %d nHeads %d ", nUp, nHeads); + sprintf(line+strlen(line), "headRank %d out %d shift %d", channel->collTree.headRank, channel->collTree.out, channel->collTree.shift); + INFO(NCCL_GRAPH, "%s", line); } return ncclSuccess; } @@ -231,7 +245,18 @@ int ncclMaxNchannels() { return maxNchannels; } -ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePatterns, struct ncclTopoRanks** allTopoRanks, int* rings) { +static int copyChannels(struct ncclComm* comm, int start, int end, int* ringPrev, int* ringNext) { + int nranks = comm->nRanks; + int c; + for (c=start; c<end; c++) { + memcpy(ringPrev+c*nranks, ringPrev+(c-start)*nranks, nranks*sizeof(int)); + memcpy(ringNext+c*nranks, ringNext+(c-start)*nranks, nranks*sizeof(int)); + memcpy(comm->channels+c, comm->channels+c-start, sizeof(struct ncclChannel)); + } + return c; +} + +ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePatterns, struct ncclTopoRanks** allTopoRanks, int* rings, struct ncclTopoGraph* collNetGraph) { // Gather data from all ranks int *ringRecv, *ringSend, *ringPrev, *ringNext, *treeToParent, *treeToChild0, *treeToChild1; int nranks = comm->nRanks; @@ -266,16 +291,20 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePa // Duplication should be complete now nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2); + // Setup CollNet + if (comm->collNetSupport == 1) { + // Add more channels to saturate intra-node bandwidth, except the 1 PPN case + if (collNetGraph->speedIntra > collNetGraph->speedInter && comm->nRanks > comm->nNodes) { + int collNetNchannels = std::min(MAXCHANNELS, nChannels+nChannels/2); + nChannels = comm->nChannels = copyChannels(comm, nChannels, collNetNchannels, ringPrev, ringNext); + } + NCCLCHECK(connectCollNet(comm, collNetGraph)); + } + // Honor NCCL_MIN_NRINGS/NCCL_MAX_NRINGS. // We permit combining max, then min, to only use the first channels, then duplicate them. nChannels = comm->nChannels = std::min((int)ncclMaxNchannels(), nChannels); - int c; - for (c=nChannels; c<ncclMinNchannels(); c++) { - memcpy(ringPrev+c*nranks, ringPrev+(c-nChannels)*nranks, nranks*sizeof(int)); - memcpy(ringNext+c*nranks, ringNext+(c-nChannels)*nranks, nranks*sizeof(int)); - memcpy(comm->channels+c, comm->channels+c-nChannels, sizeof(struct ncclChannel)); - } - nChannels = comm->nChannels = c; + nChannels = comm->nChannels = copyChannels(comm, nChannels, ncclMinNchannels(), ringPrev, ringNext); // Create rings array and check all is fine NCCLCHECK(ncclBuildRings(nChannels, rings, comm->rank, comm->nRanks, ringPrev, ringNext)); diff --git a/src/graph/paths.cc b/src/graph/paths.cc index f4e331b..079f5d8 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved. 
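In connectCollNet above there is one head per CollNet graph channel (heads[c] = collNetIntra[0]); non-head ranks connect up to all heads, and shift = (rank%localRanks)%nHeads rotates which head a leaf addresses first so that leaves do not all send to the same head simultaneously. A small worked example with assumed values (localRanks = 8, nHeads = 2):

// shift = (rank % localRanks) % nHeads, assumed localRanks = 8, nHeads = 2
int shift(int rank, int localRanks, int nHeads) { return (rank % localRanks) % nHeads; }
// rank 0 -> 0, rank 1 -> 1, rank 2 -> 0, rank 3 -> 1, ...
// even leaves start with head 0, odd leaves start with head 1.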
* * See LICENSE.txt for license information ************************************************************************/ @@ -280,8 +280,7 @@ ncclResult_t ncclTopoCheckP2p(struct ncclTopoSystem* system, int64_t id1, int64_ NCCLCHECK(ncclTopoCpuType(system, &arch, &vendor, &model)); if (arch == NCCL_TOPO_CPU_ARCH_ARM) p2pLevel = PATH_PXB; if (arch == NCCL_TOPO_CPU_ARCH_X86 && vendor == NCCL_TOPO_CPU_VENDOR_INTEL) { - if (model == NCCL_TOPO_CPU_TYPE_BDW) p2pLevel = PATH_PXB; - else p2pLevel = PATH_PHB; + p2pLevel = PATH_PXB; } if (arch == NCCL_TOPO_CPU_ARCH_X86 && vendor == NCCL_TOPO_CPU_VENDOR_ZHAOXIN) { p2pLevel = PATH_PXB; diff --git a/src/graph/search.cc b/src/graph/search.cc index 6e9a208..cc9358b 100644 --- a/src/graph/search.cc +++ b/src/graph/search.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -393,9 +393,67 @@ ncclResult_t ncclTopoSearchRecGpu(struct ncclTopoSystem* system, struct ncclTopo return ncclSuccess; } +// Select only NICs with the maximum bandwidth w.r.t. GPUs, and sort them by distance. +ncclResult_t ncclTopoSelectNets(struct ncclTopoSystem* system, int* nets, int* netcountRet) { + float* maxwidths; + int* minhops; + int netcount = 0; + NCCLCHECK(ncclCalloc(&minhops, system->nodes[NET].count)); + NCCLCHECK(ncclCalloc(&maxwidths, system->nodes[NET].count)); + for (int n=0; n<system->nodes[NET].count; n++) { + maxwidths[n] = 0.0; + minhops[n] = 255; + struct ncclTopoNode* net = system->nodes[NET].nodes+n; + struct ncclTopoLinkList* paths = net->paths[GPU]; + for (int g=0; g<system->nodes[GPU].count; g++) { + if (paths[g].width > maxwidths[n] || (paths[g].width == maxwidths[n] && paths[g].count < minhops[n])) { + maxwidths[n] = paths[g].width; + minhops[n] = paths[g].count; + } + } + if (netcount && maxwidths[nets[0]] > maxwidths[n]) continue; // Do not keep NICs with lower BW + if (netcount && maxwidths[nets[0]] < maxwidths[n]) netcount = 0; // Remove all NICs with lower BW + int index; + for (index = 0; index < netcount; index++) { + if (minhops[n] < minhops[nets[index]]) break; + } + // Insert net at index + // Shift all nets with higher nhops + for (int i = netcount; i>index; i--) nets[i] = nets[i-1]; + // Insert this net at index + nets[index] = n; + netcount++; + } + + *netcountRet = netcount; + + // Then shuffle NICs with the same nhops based on the GPU device number, so that when we have + // 2 NICs and 2 GPUs and create communicators with only one GPU, we will use both NICs. 
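// Illustrative example, assumed values: the sorted list is {net0, net1} with equal nhops
// (netcount == 2). For a communicator whose only GPU is device 1, gpu.dev % (end-start) == 1,
// so the rotation below turns {net0, net1} into {net1, net0}; for a communicator built on
// device 0 the list is left as {net0, net1}. Two single-GPU communicators therefore end up
// preferring different NICs, which is the case the comment above describes.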
+ for (int start = 0; start < netcount;) { + int end = start+1; + while (end < netcount && minhops[nets[end]] == minhops[nets[start]]) end++; + // Shuffle + for (int r=0; r<system->nodes[GPU].nodes[0].gpu.dev % (end-start); r++) { + int netStart = nets[start]; + for (int i=start; i<end-1; i++) nets[i] = nets[i+1]; + nets[end-1] = netStart; + } + start = end; + } + + free(minhops); + free(maxwidths); + return ncclSuccess; +} + ncclResult_t ncclTopoSearchRecNet(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int backToNet, int backToFirstRank, int* time) { const int speed = graph->speedInter; - for (int n=0; n<system->nodes[NET].count; n++) { + int* nets; + NCCLCHECK(ncclCalloc(&nets, system->nodes[NET].count)); + int netcount; + NCCLCHECK(ncclTopoSelectNets(system, nets, &netcount)); + for (int i=0; i<netcount; i++) { + int n = nets[i]; struct ncclTopoNode* net = system->nodes[NET].nodes+n; struct ncclTopoNode* gpu; if (graph->collNet && net->net.collSupport == 0) continue; @@ -463,6 +521,7 @@ ncclResult_t ncclTopoSearchRecNet(struct ncclTopoSystem* system, struct ncclTopo } } } + free(nets); return ncclSuccess; } @@ -705,6 +764,7 @@ search: for (int g=0; g<ngpus; g++) { printf("%d ", graph->intra[c*ngpus+g]); } + printf("[%d %d]", graph->inter[0], graph->inter[1]); printf("\n"); } #endif @@ -845,7 +905,7 @@ ncclResult_t ncclTopoDumpGraphs(struct ncclTopoSystem* system, int ngraphs, stru return ncclSuccess; } -ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct ncclTopoGraph* graph, int channelId, int* dev) { +ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct ncclTopoGraph* graph, int channelId, int rr, int* dev) { if (graph) { // Honor the net device in the graph int channel = channelId%graph->nChannels; @@ -854,7 +914,7 @@ ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct n *dev = graph->inter[channel*2+index]; } else { int64_t id; - NCCLCHECK(ncclTopoGetLocalNet(system, rank, &id, channelId)); + NCCLCHECK(ncclTopoGetLocalNet(system, rank, &id, rr)); *dev = id; } return ncclSuccess; diff --git a/src/graph/topo.cc b/src/graph/topo.cc index b5c5cc8..52d5406 100644 --- a/src/graph/topo.cc +++ b/src/graph/topo.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -172,6 +172,65 @@ ncclResult_t ncclTopoConnectNodes(struct ncclTopoNode* node, struct ncclTopoNode return ncclSuccess; } +// BCM Gen4 Switches present themselves as a two-level hierarchical switch +// even though they're supposed to sustain full BW across all ports. +// Flatten the switch as this extra level can break the search and make +// NCCL take wrong topology decisions. +ncclResult_t ncclTopoFlattenBcmSwitches(struct ncclTopoSystem* system) { + for (int s=0; s<system->nodes[PCI].count; s++) { + struct ncclTopoNode* pciSwitch = system->nodes[PCI].nodes+s; + uint64_t device = pciSwitch->pci.device; + // Only flatten PEX Gen 4 switches in base mode + if ((device & 0xfffffffffffff000) == 0x1000c0101000a000) { + // Find sub switches with the same device ID. 
+ int64_t* subSwIds; + NCCLCHECK(ncclCalloc(&subSwIds, pciSwitch->nlinks)); + int subs = 0; + for (int l=0; l<pciSwitch->nlinks; l++) { + struct ncclTopoNode* sub = pciSwitch->links[l].remNode; + // Only fuse sub switches with the same device ID. + if (sub->type != PCI || sub->pci.device != device) continue; + // Save sub switch for later + subSwIds[subs++] = sub->id; + // Remove link to that sub switch + memmove(pciSwitch->links+l, pciSwitch->links+l+1, (pciSwitch->nlinks-l-1)*(sizeof(struct ncclTopoLink))); + pciSwitch->nlinks--; + // Don't increase l for the next iteration as we just shifted all links by one. + l--; + } + + for (int s=0; s<subs; s++) { + // Find sub switch (system->nodes[PCI].nodes is changing every time we remove a node) + int index; + NCCLCHECK(ncclTopoIdToIndex(system, PCI, subSwIds[s], &index)); + struct ncclTopoNode* sub = system->nodes[PCI].nodes+index; + // Connect all sub PCI devices to the parent switch + for (int l=0; l<sub->nlinks; l++) { + struct ncclTopoNode* remNode = sub->links[l].remNode; + if (remNode == pciSwitch) continue; + // Add link from parent PCI switch -> PCI device + memcpy(pciSwitch->links+pciSwitch->nlinks, sub->links+l, sizeof(struct ncclTopoLink)); + pciSwitch->nlinks++; + // Update link from PCI device -> parent PCI switch + for (int rl=0; rl<remNode->nlinks; rl++) { + if (remNode->links[rl].remNode == sub) { + remNode->links[rl].remNode = pciSwitch; + break; + } + } + } + NCCLCHECK(ncclTopoRemoveNode(system, PCI, index)); + } + // Set subdevice to 0x0000 to make sure we don't merge this switch again. + pciSwitch->pci.device = 0x1000c01010000000; + free(subSwIds); + // Restart, as system->nodes[PCI].nodes has changed. + s = 0; + } + } + return ncclSuccess; +} + ncclResult_t ncclTopoConnectCpus(struct ncclTopoSystem* system) { // And connect all CPU nodes together for (int n=0; n<system->nodes[CPU].count; n++) { @@ -190,6 +249,8 @@ static ncclResult_t ncclTopoPrintRec(struct ncclTopoNode* node, struct ncclTopoN sprintf(line+offset, "%s/%lX (%d)", topoNodeTypeStr[node->type], node->id, node->gpu.rank); } else if (node->type == CPU) { sprintf(line+offset, "%s/%lX (%d/%d/%d)", topoNodeTypeStr[node->type], node->id, node->cpu.arch, node->cpu.vendor, node->cpu.model); + } else if (node->type == PCI) { + sprintf(line+offset, "%s/%lX (%lx)", topoNodeTypeStr[node->type], node->id, node->pci.device); } else { sprintf(line+offset, "%s/%lX", topoNodeTypeStr[node->type], node->id); } @@ -345,6 +406,15 @@ ncclResult_t ncclTopoAddPci(struct ncclXmlNode* xmlPci, struct ncclTopoSystem* s NCCLCHECK(ncclTopoAddNic(xmlNic, system, nicNode)); } else if (type == PCI) { NCCLCHECK(ncclTopoCreateNode(system, &node, type, busId)); + NCCLCHECK(xmlGetAttr(xmlPci, "vendor", &str)); + if (str) node->pci.device += strtol(str, NULL, 0) << 48; + NCCLCHECK(xmlGetAttr(xmlPci, "device", &str)); + if (str) node->pci.device += strtol(str, NULL, 0) << 32; + NCCLCHECK(xmlGetAttr(xmlPci, "subsystem_vendor", &str)); + if (str) node->pci.device += strtol(str, NULL, 0) << 16; + NCCLCHECK(xmlGetAttr(xmlPci, "subsystem_device", &str)); + if (str) node->pci.device += strtol(str, NULL, 0); + for (int s=0; s<xmlPci->nSubs; s++) { struct ncclXmlNode* xmlSubPci = xmlPci->subs[s]; NCCLCHECK(ncclTopoAddPci(xmlSubPci, system, node)); @@ -475,6 +545,7 @@ ncclResult_t ncclTopoGetSystemFromXml(struct ncclXml* xml, struct ncclTopoSystem } NCCLCHECK(ncclTopoAddNvLinks(topNode, *topoSystem, NULL)); + NCCLCHECK(ncclTopoFlattenBcmSwitches(*topoSystem)); NCCLCHECK(ncclTopoConnectCpus(*topoSystem)); 
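The (device & 0xfffffffffffff000) == 0x1000c0101000a000 test in ncclTopoFlattenBcmSwitches above matches the 64-bit key that ncclTopoAddPci assembles from the sysfs vendor, device, subsystem_vendor and subsystem_device attributes. Decomposed, that constant corresponds to:

// How the 64-bit pci.device key is packed (values taken from the constant checked above):
uint64_t key = (0x1000ULL << 48)   // vendor
             + (0xc010ULL << 32)   // device
             + (0x1000ULL << 16)   // subsystem vendor
             +  0xa000ULL;         // subsystem device (its low 12 bits are cleared by the mask)
// key == 0x1000c0101000a000; after flattening, the switch key is rewritten to
// 0x1000c01010000000 so the same node cannot be matched and merged a second time.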
NCCLCHECK(ncclTopoSortSystem(*topoSystem)); @@ -602,7 +673,7 @@ ncclResult_t ncclTopoGetLocalNet(struct ncclTopoSystem* system, int rank, int64_ } if (path->width == maxWidth && path->type == minType) nets[count++] = system->nodes[NET].nodes[n].id; } - *id = nets[rr % count]; + *id = nets[rr%count]; free(nets); return ncclSuccess; } diff --git a/src/graph/topo.h b/src/graph/topo.h index 0cb6d33..1e10bb2 100644 --- a/src/graph/topo.h +++ b/src/graph/topo.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -27,8 +27,7 @@ // Intel CPU convert GPU P2P traffic into 64B PCI TLPs, so GPU // to GPU traffic consumes more PCI bandwidth. -#define INTEL_P2P(speed) (speed*9/12) -#define INTEL_P2P_OVERHEAD(speed) (speed*12/9) +#define INTEL_P2P_OVERHEAD(speed) (speed*6/5) #define NCCL_TOPO_NODE_TYPES 7 #define GPU 0 @@ -105,6 +104,9 @@ struct ncclTopoNode { int model; cpu_set_t affinity; }cpu; + struct { + uint64_t device; + }pci; }; int nlinks; struct ncclTopoLink links[NCCL_TOPO_MAX_LINKS]; diff --git a/src/graph/tuning.cc b/src/graph/tuning.cc index 42a4dde..db085cb 100644 --- a/src/graph/tuning.cc +++ b/src/graph/tuning.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -79,8 +79,10 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom int simpleDefaultThreads = (ringGraph->speedIntra*ringGraph->nChannels <= PCI_WIDTH) ? 256 : NCCL_SIMPLE_MAX_NTHREADS; comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_SIMPLE] = getNthreads("NCCL_NTHREADS", ncclParamNthreads(), 2*WARP_SIZE, NCCL_SIMPLE_MAX_NTHREADS, simpleDefaultThreads); - comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_SIMPLE] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_SIMPLE] = + comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_SIMPLE] = getNthreads("NCCL_NTHREADS", ncclParamNthreads(), 2*WARP_SIZE, NCCL_SIMPLE_MAX_NTHREADS, NCCL_SIMPLE_MAX_NTHREADS); + comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_SIMPLE] = + getNthreads("NCCL_NTHREADS", ncclParamNthreads(), NCCL_SIMPLE_MAX_NTHREADS, NCCL_SIMPLE_MAX_NTHREADS, NCCL_SIMPLE_MAX_NTHREADS); comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_LL] = comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_LL] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_LL] = getNthreads("NCCL_NTHREADS", ncclParamNthreads(), 2*WARP_SIZE, NCCL_LL_MAX_NTHREADS, NCCL_LL_MAX_NTHREADS); comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_LL128] = comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_LL128] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_LL128] = @@ -128,8 +130,7 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL) busBw = std::min(busBw*1.0/3.8, llMaxBw); if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL128) busBw = std::min(busBw * (nNodes == 1 ? 
7.0/9.0 : 0.915 /*120.0/128.0*/), ll128MaxBwPerCh[coll]*graphs[a]->nChannels); if (a == NCCL_ALGO_COLLNET) busBw *= .9; - if (a == NCCL_ALGO_COLLNET && p == NCCL_PROTO_LL) busBw *= 1.0/6.0; // Take into account that GDR read is disabled on both sides - if (a == NCCL_ALGO_COLLNET && p == NCCL_PROTO_LL128) busBw = 0; // CollNet does not support LL128 + if (a == NCCL_ALGO_COLLNET && p != NCCL_PROTO_SIMPLE) busBw = 0; // Oneshot CollNet only supports Simple // Convert bus BW to algorithm BW float ratio = (a != NCCL_ALGO_RING) ? .5 : (1.0 * nRanks) / nsteps; @@ -233,6 +234,7 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom comm->threadThresholds[a][NCCL_PROTO_SIMPLE] = NCCL_SIMPLE_THREAD_THRESHOLD; } comm->threadThresholds[NCCL_ALGO_RING][NCCL_PROTO_LL] *= nRanks; + comm->threadThresholds[NCCL_ALGO_COLLNET][NCCL_PROTO_SIMPLE] = 512; // Override defaults with user env char* str = getenv("NCCL_THREAD_THRESHOLDS"); diff --git a/src/graph/xml.cc b/src/graph/xml.cc index a12865e..91e8f94 100644 --- a/src/graph/xml.cc +++ b/src/graph/xml.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -469,6 +469,26 @@ ncclResult_t ncclTopoGetXmlFromSys(struct ncclXmlNode* pciNode, struct ncclXml* if (path == NULL) NCCLCHECK(getPciPath(busId, &path)); NCCLCHECK(ncclTopoSetAttrFromSys(pciNode, path, "class", "class")); } + NCCLCHECK(xmlGetAttrIndex(pciNode, "vendor", &index)); + if (index == -1) { + if (path == NULL) NCCLCHECK(getPciPath(busId, &path)); + NCCLCHECK(ncclTopoSetAttrFromSys(pciNode, path, "vendor", "vendor")); + } + NCCLCHECK(xmlGetAttrIndex(pciNode, "device", &index)); + if (index == -1) { + if (path == NULL) NCCLCHECK(getPciPath(busId, &path)); + NCCLCHECK(ncclTopoSetAttrFromSys(pciNode, path, "device", "device")); + } + NCCLCHECK(xmlGetAttrIndex(pciNode, "subsystem_vendor", &index)); + if (index == -1) { + if (path == NULL) NCCLCHECK(getPciPath(busId, &path)); + NCCLCHECK(ncclTopoSetAttrFromSys(pciNode, path, "subsystem_vendor", "subsystem_vendor")); + } + NCCLCHECK(xmlGetAttrIndex(pciNode, "subsystem_device", &index)); + if (index == -1) { + if (path == NULL) NCCLCHECK(getPciPath(busId, &path)); + NCCLCHECK(ncclTopoSetAttrFromSys(pciNode, path, "subsystem_device", "subsystem_device")); + } NCCLCHECK(xmlGetAttrIndex(pciNode, "link_speed", &index)); if (index == -1) { if (path == NULL) NCCLCHECK(getPciPath(busId, &path)); diff --git a/src/group.cc b/src/group.cc index 43a9328..b695f3a 100644 --- a/src/group.cc +++ b/src/group.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -125,7 +125,7 @@ static ncclResult_t scheduleSendRecv(struct ncclComm* comm, int delta, int chann info.sendbytes = sendbytes; info.recvbytes = recvbytes; if (delta == 0 && sendbytes != recvbytes) return ncclInvalidUsage; - NCCLCHECK(ncclSaveP2pKernel(&info)); + NCCLCHECK(ncclSetupP2pKernel(&info)); return ncclSuccess; } @@ -133,7 +133,7 @@ void* ncclAsyncThreadPreconnect(void* args_) { struct ncclAsyncArgs* args = (struct ncclAsyncArgs*)args_; struct ncclComm* comm = args->coll.comm; CUDACHECKTHREAD(cudaSetDevice(comm->cudaDev)); - NCCLCHECKTHREAD(ncclTransportP2pSetup(comm, NULL)); + NCCLCHECKTHREAD(ncclTransportP2pSetup(comm, NULL, 0)); return args; } @@ -163,6 +163,7 @@ ncclResult_t ncclGroupEnd() { int doneArray[MAX_ASYNC_OPS]; for (int i=0; i<ncclGroupIndex; i++) doneArray[i] = 1; ncclResult_t ret = ncclGroupError; + int usingCudaGraphAll = -1; if (ret != ncclSuccess) goto group_cleanup; /* Launch async ncclCommInitRank */ @@ -304,34 +305,62 @@ sched_delta: * prevent some ranks from launching their network threads, which would * prevent the NCCL call from completing, blocking the cudaFree call. */ + + // Check whether we are in cuda graph mode + cudaGraph_t* graphs; + NCCLCHECK(ncclCalloc(&graphs, ncclGroupIndex)); + for (int i=0; i<ncclGroupIndex; i++) { + struct ncclAsyncArgs* args = ncclGroupArgs+i; + if (args->funcType == ASYNC_FUNC_COLL) { + ncclComm_t comm = args->coll.comm; + NCCLCHECKGOTO(ncclGetCudaGraph(comm, graphs+i), ret, group_cleanup); + if (usingCudaGraphAll == -1) { + usingCudaGraphAll = comm->usingCudaGraph; + } else if (usingCudaGraphAll != comm->usingCudaGraph) { + WARN("Illegal to have some communicators in graph mode while others not"); + ret = ncclInvalidUsage; + goto group_cleanup; + } + } + } for (int i=0; i<ncclGroupIndex; i++) { struct ncclAsyncArgs* args = ncclGroupArgs+i; if (args->funcType == ASYNC_FUNC_COLL) { ncclComm_t comm = args->coll.comm; - NCCLCHECKGOTO(ncclSaveCommKernels(comm), ret, group_cleanup); + NCCLCHECKGOTO(ncclSetupAsyncKernels(comm), ret, group_cleanup); } } for (int i=0; i<ncclGroupIndex; i++) { struct ncclAsyncArgs* args = ncclGroupArgs+i; if (args->funcType == ASYNC_FUNC_COLL) { - if (args->coll.comm->userStream == NULL) + if (args->coll.comm->userStream == cudaStreamDefault || + args->coll.comm->userStream == cudaStreamPerThread || + args->coll.comm->userStream == cudaStreamLegacy) CUDACHECKGOTO(cudaSetDevice(args->coll.comm->cudaDev), ret, end); - NCCLCHECKGOTO(ncclBarrierEnqueue(args->coll.comm), ret, end); + if (usingCudaGraphAll == 1) { + NCCLCHECKGOTO(ncclCudaGraphHostSetup(args->coll.comm, graphs[i]), ret, end); + } else { + ncclEnqueueHostSetup<0>(args->coll.comm->enqueueInfo); + } + NCCLCHECKGOTO(ncclLaunchBarrier(args->coll.comm), ret, end); } } for (int i=0; i<ncclGroupIndex; i++) { struct ncclAsyncArgs* args = ncclGroupArgs+i; if (args->funcType == ASYNC_FUNC_COLL) { CUDACHECKGOTO(cudaSetDevice(args->coll.comm->cudaDev), ret, end); - NCCLCHECKGOTO(ncclBarrierEnqueueWait(args->coll.comm), ret, end); + NCCLCHECKGOTO(ncclLaunchKernel(args->coll.comm), ret, end); } } for (int i=0; i<ncclGroupIndex; i++) { struct ncclAsyncArgs* args = ncclGroupArgs+i; if (args->funcType == ASYNC_FUNC_COLL) { - if (args->coll.comm->userStream == NULL) + if (args->coll.comm->userStream == cudaStreamDefault || + args->coll.comm->userStream == cudaStreamPerThread || + args->coll.comm->userStream == cudaStreamLegacy) 
CUDACHECKGOTO(cudaSetDevice(args->coll.comm->cudaDev), ret, end); - NCCLCHECKGOTO(ncclEnqueueEvents(args->coll.comm), ret, end); + NCCLCHECKGOTO(ncclRecordEvents(args->coll.comm), ret, end); + NCCLCHECKGOTO(ncclLaunchReset(args->coll.comm), ret, end); } } @@ -370,8 +399,7 @@ group_cleanup: pthread_mutex_unlock(&state->poolMutex); state->nextOps = NULL; - comm->myParams->gridDim.x = comm->myParams->blockDim.x = 0; - comm->userStreamSet = false; + ncclLaunchReset(comm); } } } diff --git a/src/include/alloc.h b/src/include/alloc.h index cc652ce..08c63e9 100644 --- a/src/include/alloc.h +++ b/src/include/alloc.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -38,8 +38,13 @@ static ncclResult_t ncclCalloc(T** ptr, size_t nelem) { template <typename T> static ncclResult_t ncclCudaCalloc(T** ptr, size_t nelem) { + // Need async stream for P2P pre-connect + CUDA Graph + cudaStream_t stream; + CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); CUDACHECK(cudaMalloc(ptr, nelem*sizeof(T))); - CUDACHECK(cudaMemset(*ptr, 0, nelem*sizeof(T))); + CUDACHECK(cudaMemsetAsync(*ptr, 0, nelem*sizeof(T), stream)); + CUDACHECK(cudaStreamSynchronize(stream)); + CUDACHECK(cudaStreamDestroy(stream)); return ncclSuccess; } diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h index dbe4320..fff8e26 100644 --- a/src/include/bootstrap.h +++ b/src/include/bootstrap.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -14,8 +14,8 @@ ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv); ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out); ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState); ncclResult_t bootstrapAllGather(void* commState, void* allData, int size); -ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size); -ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size); +ncclResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size); +ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size); ncclResult_t bootstrapRemAlloc(size_t size, int rank, void* commState, int* id, cudaIpcMemHandle_t* ipc, void** ptr); ncclResult_t bootstrapRemFree(int id, int rank, void* commState); ncclResult_t bootstrapClose(void* commState); diff --git a/src/include/checks.h b/src/include/checks.h index ce81312..131c079 100644 --- a/src/include/checks.h +++ b/src/include/checks.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -27,6 +27,15 @@ } \ } while(false) +// Report failure but clear error and continue +#define CUDACHECKIGNORE(cmd) do { \ + cudaError_t err = cmd; \ + if( err != cudaSuccess ) { \ + INFO(NCCL_ALL,"%s:%d Cuda failure '%s'", __FILE__, __LINE__, cudaGetErrorString(err)); \ + (void) cudaGetLastError(); \ + } \ +} while(false) + #include <errno.h> // Check system calls #define SYSCHECK(call, name) do { \ diff --git a/src/include/comm.h b/src/include/comm.h index 56116e0..640dcd3 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -77,16 +77,17 @@ struct ncclComm { int nNodes; int localRanks; - enum { GROUP, PARALLEL } launchMode; + enum { GROUP, PARALLEL, GROUP_GRAPH } launchMode; cudaStream_t userStream; bool userStreamSet; cudaEvent_t doneEvent; + cudaEvent_t intDoneEvent; bool checkPointers; - // Counter to make sure collectives match (needed for bcast/reduce - // where syncs are not symmetric). + // Counter for tracking CUDA launches (P2P and collectives included) uint64_t opCount; - uint64_t lastOpCount; + // Collective operation counter + uint64_t collOpCount; // Channels for collectives int nChannels; @@ -145,12 +146,19 @@ struct ncclComm { struct ncclInfo* asyncOps; int asyncOpCount; size_t asyncTotalSize; + int lastChannel; //list of async p2p operation queued in a group semantics struct ncclP2Plist* p2pSends; struct ncclP2Plist* p2pRecvs; int p2pSendCount; int p2pRecvCount; + + // Store info for cudaGraph + int usingCudaGraph; // Only use it during capture time, not launch time + struct ncclQueueInfo* enqueueInfo; + cudaGraphNode_t lastSetupNode; + unsigned long long lastCudaGraphId; }; #endif diff --git a/src/include/devcomm.h b/src/include/devcomm.h index 9870117..9071dd1 100644 --- a/src/include/devcomm.h +++ b/src/include/devcomm.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -99,8 +99,9 @@ struct ncclConnInfo { struct ncclConnector { int connected; struct ncclProxyArgs *proxyAppend; + struct ncclProxyArgs **proxyAppendPtr; struct ncclTransportComm* transportComm; - void* transportResources; // Host-side resources + void* transportResources; struct ncclConnInfo conn; struct ncclComm *comm; }; @@ -125,9 +126,21 @@ struct ncclTree { int down[NCCL_MAX_TREE_ARITY]; }; +#define NCCL_MAX_DIRECT_ARITY 7 +struct ncclDirect { + int depth; + int out; + int nHeads; + int headRank; + int shift; + int up[NCCL_MAX_DIRECT_ARITY]; + int down[NCCL_MAX_DIRECT_ARITY]; +}; + +#define NCCL_MAX_CONNS 2 struct ncclPeer { - struct ncclConnector send; - struct ncclConnector recv; + struct ncclConnector send[NCCL_MAX_CONNS]; + struct ncclConnector recv[NCCL_MAX_CONNS]; }; struct ncclDevComm; @@ -161,6 +174,8 @@ struct ncclWorkElem { struct { size_t sendCount; size_t recvCount; + int sendChunkSize; + int recvChunkSize; int32_t delta; uint16_t nThreads; } p2p; @@ -177,7 +192,7 @@ struct ncclChannel { struct { struct ncclRing ring; struct ncclTree tree; - struct ncclTree collTree; + struct ncclDirect collTree; int id; @@ -189,6 +204,12 @@ struct ncclChannel { struct ncclWork* workFifo; int workCount; uint64_t workFifoTail; // Only used by CPU + uint16_t index; // Only used by GPU + + // GDRCOPY support + struct ncclWork* workFifoGdr; + struct ncclWork* workFifoDev; + void* gdrMemDesc; }; int data[0x80]; }; diff --git a/src/include/enqueue.h b/src/include/enqueue.h index 2c2ab1f..6081f85 100644 --- a/src/include/enqueue.h +++ b/src/include/enqueue.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -11,15 +11,82 @@ #include "group.h" #include "collectives.h" +size_t ncclKernMaxLocalSize(); ncclResult_t ncclEnqueueCheck(struct ncclInfo* info); ncclResult_t ncclCpuBarrierIn(struct ncclComm* comm, int* isLast); ncclResult_t ncclCpuBarrierLast(struct ncclComm* comm); ncclResult_t ncclCpuBarrierOut(struct ncclComm* comm); -ncclResult_t ncclBarrierEnqueue(struct ncclComm* comm); -ncclResult_t ncclBarrierEnqueueWait(struct ncclComm* comm); -ncclResult_t ncclEnqueueEvents(struct ncclComm* comm); -ncclResult_t ncclSaveKernel(struct ncclInfo* info); -ncclResult_t ncclSaveP2pKernel(struct ncclInfo* info); -ncclResult_t ncclSaveCommKernels(struct ncclComm* comm); +ncclResult_t ncclLaunchBarrier(struct ncclComm* comm); +ncclResult_t ncclLaunchKernel(ncclComm_t comm); +ncclResult_t ncclRecordEvents(struct ncclComm* comm); +ncclResult_t ncclLaunchReset(ncclComm_t comm); +ncclResult_t ncclSetupP2pKernel(struct ncclInfo* info); +ncclResult_t ncclSetupAsyncKernels(struct ncclComm* comm); +template<int USING_CUDA_GRAPH> +void CUDART_CB ncclEnqueueHostSetup(void* arg); +ncclResult_t ncclGetCudaGraph(ncclComm_t comm, cudaGraph_t* graph); +ncclResult_t ncclCudaGraphHostSetup(ncclComm_t comm, cudaGraph_t graph); +// Enqueue information (for kernel and proxy) for each operation +struct ncclQueueElem { + struct ncclWorkElem work; + struct ncclProxyArgs proxyArgs; + struct ncclQueueElem* next; +}; + +// Store enqueue elements in a list +struct ncclQueueElemList { + struct ncclQueueElem* head; + struct ncclQueueElem* tail; +}; + +// Structure passed to CUDA graph +struct ncclQueueInfo { + ncclComm_t comm; + int maxChannels; // Dynamic version of gridDim + ncclResult_t ret; // Return value of host setup call + struct ncclQueueElemList elemList; +}; + +// Get next element from enqueue list +static ncclResult_t ncclAddQueueElem(struct ncclQueueInfo* eqInfo, struct ncclQueueElem** elemOut) { + if (eqInfo == NULL) return ncclInternalError; + struct ncclQueueElemList* list = &eqInfo->elemList; + if (list->tail != NULL) { + *elemOut = list->tail; + memset(*elemOut, 0, sizeof(struct ncclWorkElem) + sizeof(struct ncclProxyArgs)); + } else { + NCCLCHECK(ncclCalloc(&list->tail, 1)); + *elemOut = list->tail; + list->head = list->tail; + } + if (list->tail->next == NULL) { + NCCLCHECK(ncclCalloc(&list->tail->next, 1)); + } + list->tail = list->tail->next; + return ncclSuccess; +} + +// Reset element queue +static ncclResult_t ncclResetQueueInfo(struct ncclQueueInfo* eqInfo) { + if (eqInfo == NULL) return ncclInternalError; + eqInfo->maxChannels = 0; + eqInfo->ret = ncclSuccess; + eqInfo->elemList.tail = eqInfo->elemList.head; + return ncclSuccess; +} + +// Destroy enqueue info space +// used by both CUDA graph and non CUDA graph +static void ncclDestroyQueueInfo(void* ptr) { + if (ptr == NULL) return; + struct ncclQueueInfo* eqInfo = (struct ncclQueueInfo*)ptr; + struct ncclQueueElem* head = eqInfo->elemList.head; + while (head != NULL) { + struct ncclQueueElem* temp = head; + head = head->next; + free(temp); + } + free(eqInfo); +} #endif // End include guard diff --git a/src/include/gdrwrap.h b/src/include/gdrwrap.h new file mode 100644 index 0000000..a34193c --- /dev/null +++ b/src/include/gdrwrap.h @@ -0,0 +1,251 @@ +/************************************************************************* + * Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. 
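The queue defined in enqueue.h above is built for reuse across launches: ncclAddQueueElem hands back the current tail (with its work/proxyArgs cleared) and lazily allocates one spare element past it, ncclEnqueueHostSetup walks from head up to the spare tail, and ncclResetQueueInfo rewinds tail to head so the same elements are reused on the next launch or graph replay. A hypothetical driver-side sketch of that lifecycle, with the element-filling step elided:

struct ncclQueueElem* e;
NCCLCHECK(ncclAddQueueElem(comm->enqueueInfo, &e));  // element with work/proxyArgs cleared; list grows lazily
// ... fill e->work and e->proxyArgs for one operation ...
ncclEnqueueHostSetup<0>(comm->enqueueInfo);          // iterates from head until the spare tail
NCCLCHECK(comm->enqueueInfo->ret);                   // host setup reports errors through eqInfo->ret
NCCLCHECK(ncclResetQueueInfo(comm->enqueueInfo));    // rewind; memory is kept for the next launch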
+ * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef NCCL_GDRWRAP_H_ +#define NCCL_GDRWRAP_H_ + +#include "nccl.h" +#include <stdint.h> // for standard [u]intX_t types +#include <stdio.h> + +// These can be used if the GDR library isn't thread safe +#include <pthread.h> +extern pthread_mutex_t gdrLock; +#define GDRLOCK() pthread_mutex_lock(&gdrLock) +#define GDRUNLOCK() pthread_mutex_unlock(&gdrLock) +#define GDRLOCKCALL(cmd, ret) do { \ + GDRLOCK(); \ + ret = cmd; \ + GDRUNLOCK(); \ +} while(false) + +#define GDRCHECK(cmd) do { \ + int e; \ + /* GDRLOCKCALL(cmd, e); */ \ + e = cmd; \ + if( e != 0 ) { \ + WARN("GDRCOPY failure %d", e); \ + return ncclSystemError; \ + } \ +} while(false) + +// This is required as the GDR memory is mapped WC +#if !defined(__NVCC__) +#if defined(__PPC__) +static inline void wc_store_fence(void) { asm volatile("sync") ; } +#elif defined(__x86_64__) +#include <immintrin.h> +static inline void wc_store_fence(void) { _mm_sfence(); } +#elif defined(__aarch64__) +#ifdef __cplusplus +#include <atomic> +static inline void wc_store_fence(void) { std::atomic_thread_fence(std::memory_order_release); } +#else +#include <stdatomic.h> +static inline void wc_store_fence(void) { atomic_thread_fence(memory_order_release); } +#endif +#endif +#endif + +//#define GDR_DIRECT 1 +#ifdef GDR_DIRECT +// Call the GDR API library code directly rather than via +// dlopen() wrappers +#include <gdrapi.h> + +static ncclResult_t wrap_gdr_symbols(void) { return ncclSuccess; } +static gdr_t wrap_gdr_open(void) { gdr_t g = gdr_open(); return g; } +static ncclResult_t wrap_gdr_close(gdr_t g) { GDRCHECK(gdr_close(g)); return ncclSuccess; } +static ncclResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle) { + GDRCHECK(gdr_pin_buffer(g, addr, size, p2p_token, va_space, handle)); + return ncclSuccess; +} +static ncclResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle) { + GDRCHECK(gdr_unpin_buffer(g, handle)); + return ncclSuccess; +} +static ncclResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t *info) { + GDRCHECK(gdr_get_info(g, handle, info)); + return ncclSuccess; +} +static ncclResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size) { + GDRCHECK(gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size)); + return ncclSuccess; +} +static ncclResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void *va, size_t size) { + GDRCHECK(gdr_unmap(gdr_t g, gdr_mh_t handle, void **va, size_t size)); + return ncclSuccess; +} +static void wrap_gdr_runtime_get_version(int *major, int *minor) { + gdr_runtime_get_version(major, minor); + return ncclSuccess; +} +static void wrap_gdr_driver_get_version(gdr_t g, int *major, int *minor) { + gdr_driver_get_version(g, major, minor); + return ncclSuccess; +} +static ncclResult_t wrap_gdr_copy_to_mapping(gdr_mh_t handle, void *map_d_ptr, const void *h_ptr, size_t size) { + GDRCHECK(gdr_copy_to_mapping(handle, map_d_ptr, h_ptr, size)); + return ncclSuccess; +} +static ncclResult_t wrap_gdr_copy_from_mapping(gdr_mh_t handle, void *h_ptr, const void *map_d_ptr, size_t size) { + GDRCHECK(gdr_copy_from_mapping(handle, h_ptr, map_d_ptr, size)); + return ncclSuccess; +} + +#else +// Dynamically handle dependency the GDR API library + +/* Extracted from gdrapi.h (v2.1 Nov 2020) */ + +#define GPU_PAGE_SHIFT 16 +#define GPU_PAGE_SIZE (1UL << GPU_PAGE_SHIFT) +#define GPU_PAGE_OFFSET 
(GPU_PAGE_SIZE-1) +#define GPU_PAGE_MASK (~GPU_PAGE_OFFSET) + +struct gdr; +typedef struct gdr *gdr_t; + +typedef struct gdr_mh_s { + unsigned long h; +} gdr_mh_t; + +struct gdr_info { + uint64_t va; + uint64_t mapped_size; + uint32_t page_size; + uint64_t tm_cycles; + uint32_t cycles_per_ms; + unsigned mapped:1; + unsigned wc_mapping:1; +}; +typedef struct gdr_info gdr_info_t; + +/* End of gdrapi.h */ + +ncclResult_t wrap_gdr_symbols(void); + +gdr_t wrap_gdr_open(void); +ncclResult_t wrap_gdr_close(gdr_t g); +ncclResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle); +ncclResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle); +ncclResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t *info); +ncclResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size); +ncclResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void *va, size_t size); +ncclResult_t wrap_gdr_runtime_get_version(int *major, int *minor); +ncclResult_t wrap_gdr_driver_get_version(gdr_t g, int *major, int *minor); +ncclResult_t wrap_gdr_copy_to_mapping(gdr_mh_t handle, void *map_d_ptr, const void *h_ptr, size_t size); +ncclResult_t wrap_gdr_copy_from_mapping(gdr_mh_t handle, void *h_ptr, const void *map_d_ptr, size_t size); + +#endif // GDR_DIRECT + +// Global GDR driver handle +extern gdr_t ncclGdrCopy; + +#include "alloc.h" + +typedef struct gdr_mem_desc { + void *gdrDevMem; + void *gdrMap; + size_t gdrOffset; + size_t gdrMapSize; + gdr_mh_t gdrMh; +} gdr_mem_desc_t; + +static gdr_t ncclGdrInit() { + int libMajor, libMinor, drvMajor, drvMinor; + gdr_t handle = NULL; + // Dynamically load the GDRAPI library symbols + if (wrap_gdr_symbols() == ncclSuccess) { + handle = wrap_gdr_open(); + + if (handle != NULL) { + ncclResult_t res; + + // Query the version of libgdrapi + NCCLCHECKGOTO(wrap_gdr_runtime_get_version(&libMajor, &libMinor), res, error); + + // Query the version of gdrdrv driver + NCCLCHECKGOTO(wrap_gdr_driver_get_version(handle, &drvMajor, &drvMinor), res, error); + + // Only support GDRAPI 2.1 and later + if (libMajor < 2 || (libMajor == 2 && libMinor < 1) || drvMajor < 2 || (drvMajor == 2 && drvMinor < 1)) { + goto error; + } + else + INFO(NCCL_INIT, "GDRCOPY enabled library %d.%d driver %d.%d", libMajor, libMinor, drvMajor, drvMinor); + } + } + return handle; +error: + if (handle != NULL) (void) wrap_gdr_close(handle); + return NULL; +} + +template <typename T> +static ncclResult_t ncclGdrCudaCalloc(T** ptr, T** devPtr, size_t nelem, void** gdrHandle) { + gdr_info_t info; + size_t mapSize; + gdr_mh_t mh; + char *devMem; + void *gdrMap; + + mapSize = sizeof(T)*nelem; + + // GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE + ALIGN_SIZE(mapSize, GPU_PAGE_SIZE); + // GDRCOPY Pinned buffer has to be GPU_PAGE_SIZE aligned too + NCCLCHECK(ncclCudaCalloc(&devMem, mapSize+GPU_PAGE_SIZE-1)); + uint64_t alignedAddr = (((uint64_t) devMem) + GPU_PAGE_OFFSET) & GPU_PAGE_MASK; + size_t align = alignedAddr - (uint64_t)devMem; + + //TRACE(NCCL_INIT, "GDRCOPY: Pin buffer 0x%lx (%p) align %zi size %zi", alignedAddr, devMem, align, mapSize); + NCCLCHECK(wrap_gdr_pin_buffer(ncclGdrCopy, alignedAddr, mapSize, 0, 0, &mh)); + + NCCLCHECK(wrap_gdr_map(ncclGdrCopy, mh, &gdrMap, mapSize)); + //TRACE(NCCL_INIT, "GDRCOPY : mapped %p (0x%lx) at %p", devMem, alignedAddr, gdrMap); + + NCCLCHECK(wrap_gdr_get_info(ncclGdrCopy, mh, &info)); + + // Will offset ever be non zero ? 
+ ssize_t off = info.va - alignedAddr; + + gdr_mem_desc_t* md; + NCCLCHECK(ncclCalloc(&md, 1)); + md->gdrDevMem = devMem; + md->gdrMap = gdrMap; + md->gdrMapSize = mapSize; + md->gdrOffset = off+align; + md->gdrMh = mh; + *gdrHandle = md; + + *ptr = (T *)((char *)gdrMap+off); + if (devPtr) *devPtr = (T *)(devMem+off+align); + + TRACE(NCCL_INIT, "GDRCOPY : allocated devMem %p gdrMap %p offset %lx mh %lx mapSize %zi at %p", + md->gdrDevMem, md->gdrMap, md->gdrOffset, md->gdrMh.h, md->gdrMapSize, *ptr); + + return ncclSuccess; +} + +template <typename T> +static ncclResult_t ncclGdrCudaCopy(void *gdrHandle, T* dst, T* src, size_t nelem) { + gdr_mem_desc_t *md = (gdr_mem_desc_t*)gdrHandle; + NCCLCHECK(wrap_gdr_copy_to_mapping(md->gdrMh, dst, src, nelem*sizeof(T))); + return ncclSuccess; +} + +static ncclResult_t ncclGdrCudaFree(void* gdrHandle) { + gdr_mem_desc_t *md = (gdr_mem_desc_t*)gdrHandle; + NCCLCHECK(wrap_gdr_unmap(ncclGdrCopy, md->gdrMh, md->gdrMap, md->gdrMapSize)); + NCCLCHECK(wrap_gdr_unpin_buffer(ncclGdrCopy, md->gdrMh)); + CUDACHECK(cudaFree(md->gdrDevMem)); + free(md); + + return ncclSuccess; +} + +#endif // End include guard diff --git a/src/include/graph.h b/src/include/graph.h index 0c912eb..892c6d2 100644 --- a/src/include/graph.h +++ b/src/include/graph.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -28,7 +28,7 @@ ncclResult_t ncclTopoTrimSystem(struct ncclTopoSystem* system, struct ncclComm* ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm); // Query topology -ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct ncclTopoGraph* graph, int channelId, int* net); +ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct ncclTopoGraph* graph, int channelId, int rr, int* net); ncclResult_t ncclTopoCheckP2p(struct ncclTopoSystem* system, int64_t id1, int64_t id2, int* p2p, int *read, int* intermediateRank); ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* topo, int64_t busId, int netDev, int read, int* useGdr); @@ -91,13 +91,11 @@ struct ncclTopoRanks { }; ncclResult_t ncclTopoPreset(struct ncclComm* comm, - struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph, + struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoRanks* topoRanks); ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, int* treePatterns, - struct ncclTopoRanks** allTopoRanks, int* rings); - -ncclResult_t ncclTopoConnectCollNet(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, int rank); + struct ncclTopoRanks** allTopoRanks, int* rings, struct ncclTopoGraph* collNetGraph); ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCompCap, struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph); #include "info.h" diff --git a/src/include/ibvwrap.h b/src/include/ibvwrap.h index 0943f99..4ec1ac6 100644 --- a/src/include/ibvwrap.h +++ b/src/include/ibvwrap.h @@ -4,7 +4,7 @@ * Copyright (c) 2005, 2006, 2007 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2005 PathScale, Inc. All rights reserved. * - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. 
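In ncclGdrCudaCalloc above the device buffer is over-allocated by GPU_PAGE_SIZE-1 bytes because the GDRCOPY pinned buffer must be at least one GPU page and GPU-page aligned. A worked numeric example with an assumed cudaMalloc result:

// Assumed: GPU_PAGE_SIZE = 64KB (GPU_PAGE_SHIFT 16) and cudaMalloc returns devMem = 0x7f0000010100.
// alignedAddr = (devMem + GPU_PAGE_OFFSET) & GPU_PAGE_MASK = 0x7f0000020000
// align       = alignedAddr - devMem                       = 0xff00
// The pinned/mapped window starts at alignedAddr; the caller gets *ptr pointing into the
// BAR mapping (gdrMap + off) and, optionally, *devPtr at the matching device address
// (devMem + off + align), so both views address the same GPU memory.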
+ * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -1089,7 +1089,7 @@ static inline int ibv_post_send(struct ibv_qp *qp, struct ibv_send_wr *wr, struc static inline ncclResult_t wrap_ibv_post_send(struct ibv_qp *qp, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr) { int ret = qp->context->ops.post_send(qp, wr, bad_wr); /*returns 0 on success, or the value of errno on failure (which indicates the failure reason)*/ if (ret != IBV_SUCCESS) { - WARN("ibv_post_send() failed with error %s", strerror(ret)); + WARN("ibv_post_send() failed with error %s, Bad WR %p, First WR %p", strerror(ret), wr, *bad_wr); return ncclSystemError; } return ncclSuccess; diff --git a/src/include/info.h b/src/include/info.h index 8f125e1..78a5297 100644 --- a/src/include/info.h +++ b/src/include/info.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -18,8 +18,7 @@ typedef enum { ncclPatternTreeUp, ncclPatternTreeDown, ncclPatternTreeUpDown, - ncclPatternCollTreeUp, - ncclPatternCollTreeDown + ncclPatternCollTreeUpDown } ncclPattern_t; // Used to pass NCCL call information between functions @@ -49,6 +48,8 @@ struct ncclInfo { int nchunksPerLoop; ssize_t sendbytes; ssize_t recvbytes; + int recvChunkSize; + int sendChunkSize; uint32_t delta; int channelId; }; diff --git a/src/include/net.h b/src/include/net.h index 244215e..ef553e2 100644 --- a/src/include/net.h +++ b/src/include/net.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -47,11 +47,12 @@ static ncclResult_t ncclGpuGdrSupport(int* gdrSupport) { ncclNetHandle_t handle; void* gpuPtr = NULL; void* mHandle = NULL; - NCCLCHECK(ncclNetListen(dev, &handle, &lComm)); - NCCLCHECK(ncclNetConnect(dev, &handle, &sComm)); - NCCLCHECK(ncclNetAccept(lComm, &rComm)); - CUDACHECK(cudaMalloc(&gpuPtr, GPU_BUF_SIZE)); + ncclResult_t ret; ncclDebugNoWarn = NCCL_NET; + NCCLCHECKGOTO(ncclNetListen(dev, &handle, &lComm), ret, cleanup1); + NCCLCHECKGOTO(ncclNetConnect(dev, &handle, &sComm), ret, cleanup2); + NCCLCHECKGOTO(ncclNetAccept(lComm, &rComm), ret, cleanup3); + CUDACHECKGOTO(cudaMalloc(&gpuPtr, GPU_BUF_SIZE), ret, cleanup4); if (ncclNetRegMr(sComm, gpuPtr, GPU_BUF_SIZE, NCCL_PTR_CUDA, &mHandle) == ncclSuccess) { NCCLCHECK(ncclNetDeregMr(sComm, mHandle)); NCCLCHECK(ncclNetRegMr(rComm, gpuPtr, GPU_BUF_SIZE, NCCL_PTR_CUDA, &mHandle)); @@ -60,9 +61,13 @@ static ncclResult_t ncclGpuGdrSupport(int* gdrSupport) { } ncclDebugNoWarn = 0; CUDACHECK(cudaFree(gpuPtr)); +cleanup4: NCCLCHECK(ncclNetCloseRecv(rComm)); +cleanup3: NCCLCHECK(ncclNetCloseSend(sComm)); +cleanup2: NCCLCHECK(ncclNetCloseListen(lComm)); +cleanup1: break; } return ncclSuccess; diff --git a/src/include/proxy.h b/src/include/proxy.h index 9796baf..58a58b2 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -14,47 +14,66 @@ enum ncclProxyOpState { ncclProxyOpNone, ncclProxyOpReady, ncclProxyOpProgress } struct ncclProxyArgs; typedef ncclResult_t (*proxyProgressFunc_t)(struct ncclProxyArgs*); -struct ncclProxyArgs { - proxyProgressFunc_t progress; +#define NCCL_PROXY_MAX_SUBS MAXCHANNELS +static_assert(NCCL_MAX_WORK_ELEMENTS <= MAXCHANNELS, "Not enough sub space for max work elements"); + +struct ncclProxySubArgs { struct ncclChannel* channel; struct ncclConnector* connector; - size_t sendbytes; - size_t recvbytes; - int sliceSteps; - int chunkSteps; int nsteps; - uint64_t opCount; - int protocol; - int segment; // Only for profiling - ncclDataType_t dtype; - ncclRedOp_t redOp; - int state; // add component before this line -- it is left out during initialization + ssize_t sendbytes; + ssize_t recvbytes; + int sendChunkSize; + int recvChunkSize; + int delta; // Internal state + uint64_t base; uint64_t posted; - uint64_t received; // Only used by recv proxy to wait for flush. 
+ uint64_t received; + uint64_t flushed; uint64_t transmitted; uint64_t done; uint64_t end; void* requests[NCCL_STEPS]; +}; + +struct ncclProxyArgs { + proxyProgressFunc_t progress; + struct ncclProxySubArgs subs[NCCL_PROXY_MAX_SUBS]; + int nsubs; + int done; + int sliceSteps; + int chunkSteps; + int chunkSize; + uint64_t opCount; + uint64_t commOpCount; + int protocol; + ncclDataType_t dtype; + ncclRedOp_t redOp; + ncclPattern_t pattern; + int root; + int state; + char* sharedBuff[NCCL_STEPS]; + int sharedSize[NCCL_STEPS]; + int idle; // Element linking pthread_mutex_t mutex; struct ncclProxyArgs* next; struct ncclProxyArgs* nextPeer; - struct ncclProxyArgs* nextGroup; struct ncclProxyArgs** proxyAppendPtr; }; struct ncclProxySharedBuffers { - int nslots; - int slotSize; - char* cudaBuff[2*MAXCHANNELS]; - int* cudaUsed[2*MAXCHANNELS]; - char* hostBuff[2*MAXCHANNELS]; - int* hostUsed[2*MAXCHANNELS]; + int size; + char* cudaBuff; + char* hostBuff; struct ncclProxyArgs* proxyAppend[2*MAXCHANNELS]; // Separate send and recv + // Collnet sharing is technically per device, but for now MAXDEVICES == MAXCHANNELS. + struct ncclProxyArgs* proxyAppendCollNet[2*MAXCHANNELS]; + void* collNetResources; }; struct ncclProxyPool; @@ -63,11 +82,16 @@ struct ncclProxyState { pthread_mutex_t opsMutex; pthread_mutex_t poolMutex; bool stop; - struct ncclProxySharedBuffers* sharedBuffs; - struct ncclProxyArgs* ops; - struct ncclProxyArgs* nextOps; + struct ncclProxySharedBuffers sharedBuffs; + struct ncclProxyArgs* ops; // Running operations, used by proxy thread + struct ncclProxyArgs* postedOps; // Posted operations, shared between proxy and main thread, locked with opsMutex + struct ncclProxyArgs* postedOpsEnd; + struct ncclProxyArgs* nextOps; // Pending operations, used by main thread (could still be cancelled) struct ncclProxyArgs* nextOpsEnd; - struct ncclProxyArgs* pool; + struct ncclProxyArgs* pool; // Free operations for main thread + struct ncclProxyArgs* poolFreed; // Freed operations by the progress thread + struct ncclProxyArgs* poolReturned; // Shared between main and progress thread, lock with poolMutex + struct ncclProxyPool* pools; }; @@ -79,15 +103,16 @@ enum proxyMode { proxyTo = 2 }; -ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks); -ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel, int segment); +ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int nranks); +ncclResult_t ncclProxyComputeP2p(struct ncclInfo* info, struct ncclProxyArgs* args); +ncclResult_t ncclProxySaveP2p(struct ncclComm* comm, struct ncclProxyArgs* args); ncclResult_t ncclProxyStart(struct ncclComm* comm); ncclResult_t ncclProxyCreate(struct ncclComm* comm); ncclResult_t ncclProxyDestroy(struct ncclComm* comm); ncclResult_t ncclProxySharedBuffersInit(struct ncclComm* comm, int cuda, int* size, char** ptr); -ncclResult_t ncclProxySharedBuffersAlloc(struct ncclComm* comm, int cuda, int type, int channel, int size, char** ptr); -ncclResult_t ncclProxySharedBuffersFree(struct ncclComm* comm, int cuda, int type, int channel, int size, char* ptr); +ncclResult_t ncclProxySharedBuffersGetP2p(struct ncclComm* comm, int cuda, int type, int channel, int slot, int index, char** ptr); +ncclResult_t ncclProxySharedBuffersGetCollNet(struct ncclComm* comm, int cuda, int type, int slot, int channel, char** ptr); ncclResult_t ncclProxySharedBuffersDestroy(struct ncclComm* comm); #include <unistd.h> diff --git a/src/include/transport.h 
b/src/include/transport.h index 2ecc727..33ff432 100644 --- a/src/include/transport.h +++ b/src/include/transport.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -41,7 +41,7 @@ struct ncclConnect { }; struct ncclTransportComm { - ncclResult_t (*setup)(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo*, struct ncclPeerInfo*, struct ncclConnect*, struct ncclConnector*, int channelId); + ncclResult_t (*setup)(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo*, struct ncclPeerInfo*, struct ncclConnect*, struct ncclConnector*, int channelId, int connIndex); ncclResult_t (*connect)(struct ncclComm* comm, struct ncclConnect*, int nranks, int rank, struct ncclConnector*); ncclResult_t (*free)(void*); ncclResult_t (*proxy)(struct ncclProxyArgs*); @@ -54,7 +54,10 @@ struct ncclTransport { struct ncclTransportComm recv; }; -ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend); -ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph); +ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex); +ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex); +enum { collNetRecv=0, collNetSend=1 }; +int ncclTransportCollNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int masterRank, int masterPeer, int collNetGraphChannelId, int type); +ncclResult_t ncclTransportCollNetCheck(struct ncclComm* comm, int collNetSetupFail); #endif diff --git a/src/init.cc b/src/init.cc index 08bddfd..ea18362 100644 --- a/src/init.cc +++ b/src/init.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. 
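Editor's note: the transport.h hunk above threads a new connIndex argument through setup(), ncclTransportP2pConnect() and ncclTransportP2pSetup(), which lets a rank keep more than one connector per peer per channel (later in init.cc, index 0 is used for rings/trees and index 1 for the CollNet send side). A tiny illustrative sketch of what indexing connectors this way looks like is below; the Peer/Connector layout and the NCCL_MAX_CONNS_EXAMPLE constant are placeholders, not the actual comm structures.

// Illustrative only: peers[peer].send is now an array indexed by connIndex
// rather than a single connector.
constexpr int NCCL_MAX_CONNS_EXAMPLE = 2;  // e.g. 0 = default path, 1 = CollNet send path

struct Connector { bool connected = false; };
struct Peer { Connector send[NCCL_MAX_CONNS_EXAMPLE]; Connector recv[NCCL_MAX_CONNS_EXAMPLE]; };

// Mirrors how selectTransport<> now picks "peers[peer].send + connIndex".
Connector* pickSendConnector(Peer* peers, int peer, int connIndex) {
  return peers[peer].send + connIndex;
}

int main() {
  Peer peers[4];
  Connector* c = pickSendConnector(peers, 1, /*connIndex=*/1);
  c->connected = true;
  return 0;
}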
* * See LICENSE.txt for license information ************************************************************************/ @@ -7,6 +7,7 @@ #include "nccl.h" #include "channel.h" #include "nvmlwrap.h" +#include "gdrwrap.h" #include "bootstrap.h" #include "transport.h" #include "group.h" @@ -111,15 +112,31 @@ ncclResult_t initNet() { return ncclSuccess; } +// GDRCOPY support: Off by default +NCCL_PARAM(GdrCopyEnable, "GDRCOPY_ENABLE", 0); + +// GDRCOPY support +gdr_t ncclGdrCopy = NULL; + +ncclResult_t initGdrCopy() { + if (ncclParamGdrCopyEnable() == 1) { + ncclGdrCopy = ncclGdrInit(); + } + return ncclSuccess; +} + NCCL_PARAM(CollNetEnable, "COLLNET_ENABLE", 0); pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; static bool initialized = false; +static size_t maxLocalSizeBytes = 0; static ncclResult_t ncclInit() { if (initialized) return ncclSuccess; pthread_mutex_lock(&initLock); if (!initialized) { initEnv(); + initGdrCopy(); + maxLocalSizeBytes = ncclKernMaxLocalSize(); NCCLCHECK(initNet()); INFO(NCCL_INIT, "Using network %s", ncclNetName()); initialized = true; @@ -179,10 +196,15 @@ static ncclResult_t commFree(ncclComm_t comm) { if (comm->doneEvent != NULL) CUDACHECK(cudaEventDestroy(comm->doneEvent)); + if (comm->intDoneEvent != NULL) + CUDACHECK(cudaEventDestroy(comm->intDoneEvent)); + if (comm->launchMode == ncclComm::GROUP) { CUDACHECK(cudaStreamDestroy(comm->groupStream)); } + ncclDestroyQueueInfo(comm->enqueueInfo); + // Last rank frees shared resources between threads int isLast; NCCLCHECK(ncclCpuBarrierIn(comm, &isLast)); @@ -216,6 +238,8 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { // the device we're on (failure cause #1) , better know it early. cudaEvent_t doneEvent; CUDACHECK(cudaEventCreateWithFlags(&doneEvent, cudaEventDisableTiming)); + cudaEvent_t intDoneEvent; + CUDACHECK(cudaEventCreateWithFlags(&intDoneEvent, cudaEventDisableTiming)); struct ncclComm* comm; NCCLCHECK(ncclCalloc(&comm, 1)); @@ -227,6 +251,7 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { TRACE(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %x", comm, rank, ndev, comm->cudaDev, comm->busId); comm->doneEvent = doneEvent; + comm->intDoneEvent = intDoneEvent; comm->checkPointers = ncclParamCheckPointers() == 1 ? 
true : false; #if CUDART_VERSION >= 9020 comm->groupCudaStream = ncclParamGroupCudaStream(); @@ -247,6 +272,11 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { comm->asyncOpCount = 0; comm->asyncTotalSize = 0; + NCCLCHECK(ncclCalloc(&comm->enqueueInfo, 1)); + comm->enqueueInfo->comm = comm; + comm->lastSetupNode = NULL; + comm->lastCudaGraphId = -1; + static_assert(MAXCHANNELS <= sizeof(*comm->connectSend)*8, "comm->connectSend must have enough bits for all channels"); static_assert(MAXCHANNELS <= sizeof(*comm->connectRecv)*8, "comm->connectRecv must have enough bits for all channels"); NCCLCHECK(ncclCalloc(&comm->connectSend, comm->nRanks)); @@ -381,11 +411,11 @@ ncclResult_t ncclCommSetIntra(struct ncclComm* comm, int rank, int ranks, struct int cgMdLaunch = 0; // Set CG Mode - comm->launchMode = ncclComm::GROUP; + comm->launchMode = ncclComm::PARALLEL; char* str = getenv("NCCL_LAUNCH_MODE"); if (str) INFO(NCCL_ENV, "NCCL_LAUNCH_MODE set by environment to %s", str); - if (comm->intraRanks == 1 || (str && strcmp(str, "PARALLEL") == 0)) { - comm->launchMode = ncclComm::PARALLEL; + if (str && strcmp(str, "GROUP") == 0) { + comm->launchMode = ncclComm::GROUP; } if (comm->launchMode == ncclComm::GROUP) { CUDACHECK(cudaStreamCreateWithFlags(&comm->groupStream, cudaStreamNonBlocking)); @@ -427,128 +457,6 @@ static ncclResult_t computeBuffSizes(struct ncclComm* comm) { return ncclSuccess; } -extern struct ncclTransport collNetTransport; - -// All ranks must participate in collNetSetup call -// type: 0 for send, 1 for recv -// return: 0 - unsupported, 1 - supported -// We do not NCCLCHECK this call because we would fall back to P2P network in case CollNet setup fails -static int collNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int rank, int nranks, int masterRank, int masterPeer, int nMasters, int type) { - int rankInCollNet = -1; - int supported = 0; - int isMaster = (rank == masterRank) ? 1 : 0; - struct { - int collNetRank; - ncclConnect connect; - } sendrecvExchange; - - // check if we can connect to collnet, whose root is the nranks-th rank - struct ncclPeerInfo *myInfo = comm->peerInfo+rank, *peerInfo = comm->peerInfo+nranks; - peerInfo->rank = nranks; - int ret = 1; - if (isMaster) { - NCCLCHECK(collNetTransport.canConnect(&ret, comm->topo, collNetGraph, myInfo, peerInfo)); - } - - // send master receives connect info from peer recv master - if (isMaster && type == 0) { - NCCLCHECK(bootstrapRecv(comm->bootstrap, masterPeer, &sendrecvExchange, sizeof(sendrecvExchange))); - rankInCollNet = sendrecvExchange.collNetRank; - INFO(NCCL_INIT, "CollNet [send] : rank %d collNetRank %d collNetNranks %d received connect from rank %d", rank, rankInCollNet, nMasters, masterPeer); - } - - // select - struct ncclPeer* root = channel->peers+nranks; - struct ncclConnector* conn = (type == 1) ? &root->recv : &root->send; - struct ncclTransportComm* transportComm = (type == 1) ? 
&(collNetTransport.recv) : &(collNetTransport.send); - conn->transportComm = transportComm; - // setup - struct ncclConnect myConnect; - if (isMaster && ret > 0) { - NCCLCHECK(transportComm->setup(comm, collNetGraph, myInfo, peerInfo, &myConnect, conn, channel->id)); - } - // prepare connect handles - ncclResult_t res; - struct { - int isMaster; - ncclConnect connect; - } *allConnects = NULL; - ncclConnect *masterConnects = NULL; - NCCLCHECK(ncclCalloc(&masterConnects, nMasters)); - if (type == 1) { // recv side: AllGather - // all ranks must participate - NCCLCHECK(ncclCalloc(&allConnects, nranks)); - allConnects[rank].isMaster = isMaster; - memcpy(&(allConnects[rank].connect), &myConnect, sizeof(struct ncclConnect)); - NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allConnects, sizeof(*allConnects)), res, cleanup); - // consolidate - int c = 0; - for (int r = 0; r < nranks; r++) { - if (allConnects[r].isMaster) { - memcpy(masterConnects+c, &(allConnects[r].connect), sizeof(struct ncclConnect)); - if (r == rank) rankInCollNet = c; - c++; - } - } - } else { // send side : copy in connect info received from peer recv master - if (isMaster) memcpy(masterConnects+rankInCollNet, &(sendrecvExchange.connect), sizeof(struct ncclConnect)); - } - // connect - if (isMaster && ret > 0) { - NCCLCHECKGOTO(transportComm->connect(comm, masterConnects, nMasters, rankInCollNet, conn), res, cleanup); - struct ncclPeer* devRoot = channel->devPeers+nranks; - struct ncclConnector* devConn = (type == 1) ? &devRoot->recv : &devRoot->send; - CUDACHECKGOTO(cudaMemcpy(devConn, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice), res, cleanup); - } - // recv side sends connect info to send side - if (isMaster && type == 1) { - sendrecvExchange.collNetRank = rankInCollNet; - memcpy(&sendrecvExchange.connect, masterConnects+rankInCollNet, sizeof(struct ncclConnect)); - NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, masterPeer, &sendrecvExchange, sizeof(sendrecvExchange)), res, cleanup); - INFO(NCCL_INIT, "CollNet [recv] : rank %d collNetRank %d collNetNranks %d sent connect to rank %d", rank, rankInCollNet, nMasters, masterPeer); - } - if (ret > 0) { - supported = 1; - } -cleanup: - if (allConnects != NULL) free(allConnects); - if (masterConnects != NULL) free(masterConnects); - return supported; -} - -static ncclResult_t checkCollNetSetup(struct ncclComm* comm, int rank, int collNetSetupFail) { - int nranks = comm->nRanks; - // AllGather collNet setup results - int* allGatherFailures; - NCCLCHECK(ncclCalloc(&allGatherFailures, nranks)); - allGatherFailures[rank] = collNetSetupFail; - NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGatherFailures, sizeof(int))); - for (int i=0; i<nranks; i++) { - if (allGatherFailures[i] != 0) { - collNetSetupFail = 1; - break; - } - } - free(allGatherFailures); - if (collNetSetupFail) { - if (rank == 0) WARN("Cannot initialize CollNet, using %s instead", ncclNetName()); - // Free collNet resources - for (int r=0; r<comm->nChannels; r++) { - struct ncclChannel* channel = comm->channels+r; - struct ncclPeer* peer = channel->peers+nranks; - if (peer->send.transportResources && peer->send.transportComm) NCCLCHECK(peer->send.transportComm->free(peer->send.transportResources)); - if (peer->recv.transportResources && peer->recv.transportComm) NCCLCHECK(peer->recv.transportComm->free(peer->recv.transportResources)); - peer->send.transportResources = NULL; // avoid double free - peer->recv.transportResources = NULL; // avoid double free - } - // Set support to 0 - 
comm->collNetSupport = 0; - } else { - comm->collNetSupport = 1; - } - return ncclSuccess; -} - NCCL_PARAM(CrossNic, "CROSS_NIC", 2); NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0); @@ -671,9 +579,17 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(ncclTopoDumpGraphs(comm->topo, 3, graphs)); } + // Determine CollNet support + if (tmpNnodes > 1 && ncclParamCollNetEnable() == 1 && collNetSupport() == 1 && collNetGraph.nChannels > 0) comm->collNetSupport = 1; + if (intraRanks > 8) { + if (comm->collNetSupport == 1) WARN("CollNet currently only supports up to 8 GPUs per node"); + comm->collNetSupport = 0; + } + // AllGather3 - begin struct ncclGraphInfo { int pattern; + int nChannels; int sameChannels; float speedIntra; float speedInter; @@ -682,8 +598,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm }; struct { - int cudaCompCap; - int nChannels; + int collNetSupport; struct ncclGraphInfo tree; struct ncclGraphInfo ring; struct ncclGraphInfo collNet; @@ -691,28 +606,31 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm } *allGather3Data; NCCLCHECK(ncclCalloc(&allGather3Data, nranks)); - allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels = - std::min(treeGraph.nChannels, ringGraph.nChannels); allGather3Data[rank].tree.pattern = treeGraph.pattern; + allGather3Data[rank].tree.nChannels = treeGraph.nChannels; allGather3Data[rank].tree.sameChannels = treeGraph.sameChannels; allGather3Data[rank].tree.speedIntra = treeGraph.speedIntra; allGather3Data[rank].tree.speedInter = treeGraph.speedInter; allGather3Data[rank].tree.typeIntra = treeGraph.typeIntra; allGather3Data[rank].tree.typeInter = treeGraph.typeInter; allGather3Data[rank].ring.pattern = ringGraph.pattern; + allGather3Data[rank].ring.nChannels = ringGraph.nChannels; allGather3Data[rank].ring.sameChannels = ringGraph.sameChannels; allGather3Data[rank].ring.speedIntra = ringGraph.speedIntra; allGather3Data[rank].ring.speedInter = ringGraph.speedInter; allGather3Data[rank].ring.typeIntra = ringGraph.typeIntra; allGather3Data[rank].ring.typeInter = ringGraph.typeInter; allGather3Data[rank].collNet.pattern = collNetGraph.pattern; + allGather3Data[rank].collNet.nChannels = collNetGraph.nChannels; allGather3Data[rank].collNet.sameChannels = collNetGraph.sameChannels; allGather3Data[rank].collNet.speedIntra = collNetGraph.speedIntra; allGather3Data[rank].collNet.speedInter = collNetGraph.speedInter; allGather3Data[rank].collNet.typeIntra = collNetGraph.typeIntra; allGather3Data[rank].collNet.typeInter = collNetGraph.typeInter; + allGather3Data[rank].collNetSupport = comm->collNetSupport; - NCCLCHECK(ncclTopoPreset(comm, &treeGraph, &ringGraph, &collNetGraph, &allGather3Data[rank].topoRanks)); + comm->nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels); + NCCLCHECK(ncclTopoPreset(comm, &treeGraph, &ringGraph, &allGather3Data[rank].topoRanks)); NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather3Data, sizeof(*allGather3Data))); @@ -741,24 +659,28 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm for (int i=0; i<nranks; i++) { allTopoRanks[i] = &allGather3Data[i].topoRanks; // Make sure we align all ranks so that the tuning is consistent across ranks - treeGraph.nChannels = ringGraph.nChannels = comm->nChannels = std::min(allGather3Data[i].nChannels, comm->nChannels); + treeGraph.nChannels = std::min(allGather3Data[i].tree.nChannels, 
treeGraph.nChannels); treeGraph.sameChannels = std::min(allGather3Data[i].tree.sameChannels, treeGraph.sameChannels); treeGraph.speedIntra = std::min(allGather3Data[i].tree.speedIntra, treeGraph.speedIntra); treeGraph.speedInter = std::min(allGather3Data[i].tree.speedInter, treeGraph.speedInter); treeGraph.typeIntra = std::min(allGather3Data[i].tree.typeIntra, treeGraph.typeIntra); treeGraph.typeInter = std::min(allGather3Data[i].tree.typeInter, treeGraph.typeInter); + ringGraph.nChannels = std::min(allGather3Data[i].ring.nChannels, ringGraph.nChannels); ringGraph.sameChannels = std::min(allGather3Data[i].ring.sameChannels, ringGraph.sameChannels); ringGraph.speedIntra = std::min(allGather3Data[i].ring.speedIntra, ringGraph.speedIntra); ringGraph.speedInter = std::min(allGather3Data[i].ring.speedInter, ringGraph.speedInter); ringGraph.typeIntra = std::min(allGather3Data[i].ring.typeIntra, ringGraph.typeIntra); ringGraph.typeInter = std::min(allGather3Data[i].ring.typeInter, ringGraph.typeInter); + collNetGraph.nChannels = std::min(allGather3Data[i].collNet.nChannels, collNetGraph.nChannels); collNetGraph.sameChannels = std::min(allGather3Data[i].collNet.sameChannels, collNetGraph.sameChannels); collNetGraph.speedIntra = std::min(allGather3Data[i].collNet.speedIntra, collNetGraph.speedIntra); collNetGraph.speedInter = std::min(allGather3Data[i].collNet.speedInter, collNetGraph.speedInter); collNetGraph.typeIntra = std::min(allGather3Data[i].collNet.typeIntra, collNetGraph.typeIntra); collNetGraph.typeInter = std::min(allGather3Data[i].collNet.typeInter, collNetGraph.typeInter); + comm->collNetSupport = std::min(allGather3Data[i].collNetSupport, comm->collNetSupport); } + comm->nChannels = treeGraph.nChannels = ringGraph.nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels); if (comm->nChannels < nChannelsOrig) { // We started duplicating channels during Preset(), so we need to move the // duplicated channels since we have removed some. 
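Editor's note: the loop above aligns graph tuning across ranks. Each rank all-gathers its per-graph parameters (now including nChannels per graph rather than a single shared channel count), and every field is reduced with std::min so all ranks settle on the same, most conservative values before channels are finalized. A minimal sketch of that reduction, assuming a hypothetical alignAcrossRanks helper; the field names mirror ncclGraphInfo but the code is an illustration, not NCCL's implementation.

#include <algorithm>
#include <vector>

struct GraphInfo { int nChannels; int sameChannels; float speedIntra; float speedInter; };

GraphInfo alignAcrossRanks(const std::vector<GraphInfo>& allGathered) {
  GraphInfo g = allGathered.at(0);                 // assumes at least one rank contributed
  for (const GraphInfo& r : allGathered) {
    g.nChannels    = std::min(g.nChannels,    r.nChannels);
    g.sameChannels = std::min(g.sameChannels, r.sameChannels);
    g.speedIntra   = std::min(g.speedIntra,   r.speedIntra);
    g.speedInter   = std::min(g.speedInter,   r.speedInter);
  }
  return g;
}

int main() {
  std::vector<GraphInfo> all = { {4, 1, 24.0f, 12.0f}, {2, 0, 24.0f, 6.0f} };
  GraphInfo g = alignAcrossRanks(all);   // yields {2, 0, 24.0, 6.0}: the conservative settings
  return g.nChannels;
}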
@@ -767,13 +689,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm int *rings; NCCLCHECK(ncclCalloc(&rings, nranks*MAXCHANNELS)); - - NCCLCHECK(ncclTopoPostset(comm, nodesFirstRank, nodesTreePatterns, allTopoRanks, rings)); - if (comm->nNodes > 1 && - ncclParamCollNetEnable() == 1 && - collNetSupport() && collNetGraph.nChannels) { - NCCLCHECK(ncclTopoConnectCollNet(comm, &collNetGraph, rank)); - } + NCCLCHECK(ncclTopoPostset(comm, nodesFirstRank, nodesTreePatterns, allTopoRanks, rings, &collNetGraph)); free(allTopoRanks); free(nodesTreePatterns); @@ -808,44 +724,58 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm struct ncclChannel* channel = comm->channels+c; NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, affinity_restore); if (comm->nRanks == 1) continue; - NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, 1, &channel->ring.prev, 1, &channel->ring.next), ret, affinity_restore); + NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, 1, &channel->ring.prev, 1, &channel->ring.next, 0), ret, affinity_restore); } - NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &ringGraph), ret, affinity_restore); + NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &ringGraph, 0), ret, affinity_restore); INFO(NCCL_INIT, "Connected all rings"); // Connect Trees for (int c=0; c<comm->nChannels; c++) { struct ncclChannel* channel = comm->channels+c; if (comm->nRanks == 1) continue; - NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, NCCL_MAX_TREE_ARITY, channel->tree.down, 1, &channel->tree.up), ret, affinity_restore); - NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, 1, &channel->tree.up, NCCL_MAX_TREE_ARITY, channel->tree.down), ret, affinity_restore); + NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, NCCL_MAX_TREE_ARITY, channel->tree.down, 1, &channel->tree.up, 0), ret, affinity_restore); + NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, 1, &channel->tree.up, NCCL_MAX_TREE_ARITY, channel->tree.down, 0), ret, affinity_restore); } - NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &treeGraph), ret, affinity_restore); + NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &treeGraph, 0), ret, affinity_restore); INFO(NCCL_INIT, "Connected all trees"); // Check if we can setup CollNet - if (comm->nNodes > 1 && - ncclParamCollNetEnable() == 1 && - collNetSupport() && collNetGraph.nChannels) { - int logicChannels = comm->nChannels/2; + if (comm->collNetSupport > 0) { int collNetSetupFail = 0; - const int recvIndex = 0; // recv GPU index is always 0 - const int sendIndex = collNetGraph.pattern == NCCL_TOPO_PATTERN_TREE ? 
0 : 1; // send GPU index depends on topo pattern - for (int c=0; c<logicChannels; c++) { - struct ncclChannel* channelRecv = comm->channels+logicChannels+c; - struct ncclChannel* channelSend = comm->channels+c; - NCCLCHECK(ncclTransportP2pConnect(comm, channelRecv, 1, &channelRecv->collTree.up, 1, channelRecv->collTree.down)); - NCCLCHECK(ncclTransportP2pConnect(comm, channelSend, 1, channelSend->collTree.down, 1, &channelSend->collTree.up)); - const int recvMaster = collNetGraph.intra[c*comm->localRanks+recvIndex]; - const int sendMaster = collNetGraph.intra[c*comm->localRanks+sendIndex]; - if (collNetSetup(comm, &collNetGraph, channelRecv, rank, nranks, recvMaster, sendMaster, comm->nNodes, 1) != 1) - collNetSetupFail = 1; - else if (collNetSetup(comm, &collNetGraph, channelSend, rank, nranks, sendMaster, recvMaster, comm->nNodes, 0) != 1) - collNetSetupFail = 1; + // Find all head ranks + int nHeads = collNetGraph.nChannels; + int *heads; + NCCLCHECK(ncclCalloc(&heads, nHeads)); + // Head GPU index is always 0 + for (int c=0; c<nHeads; c++) { + heads[c] = collNetGraph.intra[c*comm->localRanks+0]; + } + for (int c=0; c<comm->nChannels; c++) { + struct ncclChannel* channel = comm->channels+c; + for (int h=0; h<nHeads; h++) { + const int head = heads[h]; + if (ncclTransportCollNetSetup(comm, &collNetGraph, channel, head, head, h, collNetRecv) != 1) + collNetSetupFail = 1; + else if (ncclTransportCollNetSetup(comm, &collNetGraph, channel, head, head, h, collNetSend) != 1) + collNetSetupFail = 1; + } } - NCCLCHECK(ncclTransportP2pSetup(comm, &collNetGraph)); // Verify CollNet setup across ranks - NCCLCHECK(checkCollNetSetup(comm, rank, collNetSetupFail)); + NCCLCHECK(ncclTransportCollNetCheck(comm, collNetSetupFail)); + if (comm->collNetSupport) { + TRACE(NCCL_INIT, "rank %d Connected inter-node CollNet", rank); + for (int c=0; c<comm->nChannels; c++) { + struct ncclChannel* channelRecv = comm->channels+c; + NCCLCHECK(ncclTransportP2pConnect(comm, channelRecv, NCCL_MAX_DIRECT_ARITY, channelRecv->collTree.up, NCCL_MAX_DIRECT_ARITY, channelRecv->collTree.down, 0)); + } + NCCLCHECK(ncclTransportP2pSetup(comm, &collNetGraph, 0)); + for (int c=0; c<comm->nChannels; c++) { + struct ncclChannel* channelSend = comm->channels+c; + NCCLCHECK(ncclTransportP2pConnect(comm, channelSend, NCCL_MAX_DIRECT_ARITY, channelSend->collTree.down, NCCL_MAX_DIRECT_ARITY, channelSend->collTree.up, 1)); + } + NCCLCHECK(ncclTransportP2pSetup(comm, &collNetGraph, 1)); + INFO(NCCL_INIT, "rank %d Connected CollNet", rank); + } } TRACE(NCCL_INIT, "rank %d nranks %d - CONNECTED %d RINGS AND TREES", rank, nranks, comm->nChannels); free(rings); @@ -870,10 +800,18 @@ affinity_restore: return ncclSuccess; } +NCCL_PARAM(SetStackSize, "SET_STACK_SIZE", 0); + ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) { ncclResult_t res; CUDACHECK(cudaSetDevice(cudaDev)); + // Set the maximum kernel stack size of all kernels to avoid + // a CUDA memory reconfig on load (c.f. 
NVSHMEM issue) + if (maxLocalSizeBytes > 0 && ncclParamSetStackSize() == 1) { + TRACE(NCCL_INIT, "Setting cudaLimitStackSize to %zi", maxLocalSizeBytes); + CUDACHECKIGNORE(cudaDeviceSetLimit(cudaLimitStackSize, maxLocalSizeBytes)); + } NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup); NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup); NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup); @@ -913,6 +851,7 @@ static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUni } else { NCCLCHECKGOTO(ncclCommInitRankSync(newcomm, nranks, commId, myrank, cudaDev), res, end); } + end: if (ncclAsyncMode()) return ncclAsyncErrCheck(res); else return res; diff --git a/src/misc/gdrwrap.cc b/src/misc/gdrwrap.cc new file mode 100644 index 0000000..ed0c697 --- /dev/null +++ b/src/misc/gdrwrap.cc @@ -0,0 +1,246 @@ +/************************************************************************* + * Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "gdrwrap.h" + +#ifndef GDR_DIRECT +#include "core.h" + +static enum { gdrUninitialized, gdrInitializing, gdrInitialized, gdrError } gdrState = gdrUninitialized; + +/* Function pointers assigned from dlopen() */ +static gdr_t (*gdr_internal_open)(void); +static int (*gdr_internal_close)(gdr_t g); +static int (*gdr_internal_pin_buffer)(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle); +static int (*gdr_internal_unpin_buffer)(gdr_t g, gdr_mh_t handle); +static int (*gdr_internal_get_info)(gdr_t g, gdr_mh_t handle, gdr_info_t *info); +static int (*gdr_internal_map)(gdr_t g, gdr_mh_t handle, void **va, size_t size); +static int (*gdr_internal_unmap)(gdr_t g, gdr_mh_t handle, void *va, size_t size); +static void (*gdr_internal_runtime_get_version)(int *major, int *minor); +static void (*gdr_internal_driver_get_version)(gdr_t g, int *major, int *minor); +static int (*gdr_internal_copy_to_mapping)(gdr_mh_t handle, void *map_d_ptr, const void *h_ptr, size_t size); +static int (*gdr_internal_copy_from_mapping)(gdr_mh_t handle, void *h_ptr, const void *map_d_ptr, size_t size); + + +// Used to make the GDR library calls thread safe +pthread_mutex_t gdrLock = PTHREAD_MUTEX_INITIALIZER; + +#define GDRAPI_LIBNAME "libgdrapi.so" + +#define LOAD_SYM(handle, symbol, funcptr) do { \ + cast = (void**)&funcptr; \ + tmp = dlsym(handle, symbol); \ + if (tmp == NULL) { \ + WARN("dlsym failed on %s - %s", symbol, dlerror());\ + goto teardown; \ + } \ + *cast = tmp; \ + } while (0) + +#define LOAD_SYM_OPTIONAL(handle, symbol, funcptr) do {\ + cast = (void**)&funcptr; \ + tmp = dlsym(handle, symbol); \ + if (tmp == NULL) { \ + INFO(NCCL_INIT,"dlsym failed on %s, ignoring", symbol); \ + } \ + *cast = tmp; \ + } while (0) + +ncclResult_t wrap_gdr_symbols(void) { + if (gdrState == gdrInitialized) + return ncclSuccess; + if (gdrState == gdrError) + return ncclSystemError; + + if (__sync_bool_compare_and_swap(&gdrState, gdrUninitialized, gdrInitializing) == false) { + // Another thread raced in front of us. Wait for it to be done. + while (gdrState == gdrInitializing) pthread_yield(); + return (gdrState == gdrInitialized) ? 
ncclSuccess : ncclSystemError; + } + + static void* gdrhandle = NULL; + void* tmp; + void** cast; + + gdrhandle=dlopen(GDRAPI_LIBNAME, RTLD_NOW); + if (!gdrhandle) { + WARN("Failed to open %s", GDRAPI_LIBNAME); + goto teardown; + } + + /* Load the function pointers from the DL library image */ + LOAD_SYM(gdrhandle, "gdr_open", gdr_internal_open); + LOAD_SYM(gdrhandle, "gdr_close", gdr_internal_close); + LOAD_SYM(gdrhandle, "gdr_pin_buffer", gdr_internal_pin_buffer); + LOAD_SYM(gdrhandle, "gdr_unpin_buffer", gdr_internal_unpin_buffer); + LOAD_SYM(gdrhandle, "gdr_get_info", gdr_internal_get_info); + LOAD_SYM(gdrhandle, "gdr_map", gdr_internal_map); + LOAD_SYM(gdrhandle, "gdr_unmap", gdr_internal_unmap); + LOAD_SYM(gdrhandle, "gdr_runtime_get_version", gdr_internal_runtime_get_version); + LOAD_SYM(gdrhandle, "gdr_driver_get_version", gdr_internal_driver_get_version); + LOAD_SYM(gdrhandle, "gdr_copy_to_mapping", gdr_internal_copy_to_mapping); + LOAD_SYM(gdrhandle, "gdr_copy_from_mapping", gdr_internal_copy_from_mapping); + + gdrState = gdrInitialized; + return ncclSuccess; + +teardown: + gdr_internal_open = NULL; + gdr_internal_close = NULL; + gdr_internal_pin_buffer = NULL; + gdr_internal_unpin_buffer = NULL; + gdr_internal_get_info = NULL; + gdr_internal_map = NULL; + gdr_internal_unmap = NULL; + gdr_internal_runtime_get_version = NULL; + gdr_internal_driver_get_version = NULL; + gdr_internal_copy_to_mapping = NULL; + gdr_internal_copy_from_mapping = NULL; + + if (gdrhandle != NULL) dlclose(gdrhandle); + gdrState = gdrError; + return ncclSystemError; +} + + +gdr_t wrap_gdr_open(void) { + if (gdr_internal_open == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return NULL; + } + return gdr_internal_open(); +} + +ncclResult_t wrap_gdr_close(gdr_t g) { + if (gdr_internal_close == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + int ret = gdr_internal_close(g); + if (ret != 0) { + WARN("gdr_close() failed: %d", ret); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle) { + if (gdr_internal_pin_buffer == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + int ret; + GDRLOCKCALL(gdr_internal_pin_buffer(g, addr, size, p2p_token, va_space, handle), ret); + if (ret != 0) { + WARN("gdr_pin_buffer(addr %lx, size %zi) failed: %d", addr, size, ret); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle) { + if (gdr_internal_unpin_buffer == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + int ret; + GDRLOCKCALL(gdr_internal_unpin_buffer(g, handle), ret); + if (ret != 0) { + WARN("gdr_unpin_buffer(handle %lx) failed: %d", handle.h, ret); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t *info) { + if (gdr_internal_get_info == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + int ret; + GDRLOCKCALL(gdr_internal_get_info(g, handle, info), ret); + if (ret != 0) { + WARN("gdr_get_info(handle %lx) failed: %d", handle.h, ret); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size) { + if (gdr_internal_map == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + 
int ret; + GDRLOCKCALL(gdr_internal_map(g, handle, va, size), ret); + if (ret != 0) { + WARN("gdr_map(handle %lx, size %zi) failed: %d", handle.h, size, ret); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void *va, size_t size) { + if (gdr_internal_unmap == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + int ret; + GDRLOCKCALL(gdr_internal_unmap(g, handle, va, size), ret); + if (ret != 0) { + WARN("gdr_unmap(handle %lx, va %p, size %zi) failed: %d", handle.h, va, size, ret); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t wrap_gdr_runtime_get_version(int *major, int *minor) { + if (gdr_internal_runtime_get_version == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + gdr_internal_runtime_get_version(major, minor); + return ncclSuccess; +} + +ncclResult_t wrap_gdr_driver_get_version(gdr_t g, int *major, int *minor) { + if (gdr_internal_driver_get_version == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + gdr_internal_driver_get_version(g, major, minor); + return ncclSuccess; +} + +ncclResult_t wrap_gdr_copy_to_mapping(gdr_mh_t handle, void *map_d_ptr, const void *h_ptr, size_t size) { + if (gdr_internal_copy_to_mapping == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + int ret; + GDRLOCKCALL(gdr_internal_copy_to_mapping(handle, map_d_ptr, h_ptr, size), ret); + if (ret != 0) { + WARN("gdr_copy_to_mapping(handle %lx, map_d_ptr %p, h_ptr %p, size %zi) failed: %d", handle.h, map_d_ptr, h_ptr, size, ret); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t wrap_gdr_copy_from_mapping(gdr_mh_t handle, void *h_ptr, const void *map_d_ptr, size_t size) { + if (gdr_internal_copy_from_mapping == NULL) { + WARN("GDRCOPY lib wrapper not initialized."); + return ncclInternalError; + } + int ret; + GDRLOCKCALL(gdr_internal_copy_from_mapping(handle, h_ptr, map_d_ptr, size), ret); + if (ret != 0) { + WARN("gdr_copy_from_mapping(handle %lx, h_ptr %p, map_d_ptr %p, size %zi) failed: %d", handle.h, h_ptr, map_d_ptr, size, ret); + return ncclSystemError; + } + return ncclSuccess; +} + +#endif /* !GDR_DIRECT */ diff --git a/src/nccl.h.in b/src/nccl.h.in index b4f34ef..6cb046a 100644 --- a/src/nccl.h.in +++ b/src/nccl.h.in @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -16,7 +16,7 @@ #define NCCL_SUFFIX "${nccl:Suffix}" #define NCCL_VERSION_CODE ${nccl:Version} -#define NCCL_VERSION(X,Y,Z) ((X) * 1000 + (Y) * 100 + (Z)) +#define NCCL_VERSION(X,Y,Z) (((X) >= 2 && (Y) >= 9) ? (X) * 10000 + (Y) * 100 + (Z) : (X) * 1000 + (Y) * 100 + (Z)) #ifdef __cplusplus extern "C" { diff --git a/src/proxy.cc b/src/proxy.cc index 503781e..ed503ac 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1,12 +1,11 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. 
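Editor's note: the nccl.h.in hunk above changes how NCCL_VERSION_CODE is packed. From 2.9 on, the code is X*10000 + Y*100 + Z (so 2.9.6 encodes as 20906), while older versions keep the X*1000 + Y*100 + Z encoding (2.8.4 stays 2804), and the conditional in the macro preserves comparisons against historical codes. The short program below, using the macro exactly as it appears in the diff, just prints both values to make the encoding concrete.

#include <cstdio>

#define NCCL_VERSION(X,Y,Z) (((X) >= 2 && (Y) >= 9) ? (X) * 10000 + (Y) * 100 + (Z) \
                                                    : (X) * 1000 + (Y) * 100 + (Z))

int main() {
  std::printf("%d\n", NCCL_VERSION(2, 9, 6));  // 20906 (new 5-digit encoding)
  std::printf("%d\n", NCCL_VERSION(2, 8, 4));  // 2804  (old 4-digit encoding preserved)
  return 0;
}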
* * See LICENSE.txt for license information ************************************************************************/ #include "comm.h" #include "info.h" -#include "graph.h" #include "collectives.h" enum { proxyRecv=0, proxySend=1 }; @@ -34,26 +33,32 @@ struct ncclProxyPool { static ncclResult_t allocateArgs(struct ncclComm* comm, struct ncclProxyArgs** argsptr) { struct ncclProxyState* state = &comm->proxyState; struct ncclProxyArgs* elem; - pthread_mutex_lock(&state->poolMutex); if (state->pool == NULL) { - // Allocate a new pool of elements - struct ncclProxyPool* newPool; - NCCLCHECK(ncclCalloc(&newPool, 1)); - struct ncclProxyArgs* newElems = newPool->elems; - // Chain newly allocated elements - for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) { - if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1; + // Check whether there are freed elements + if (state->poolReturned) { + pthread_mutex_lock(&state->poolMutex); + state->pool = state->poolReturned; + state->poolReturned = NULL; + pthread_mutex_unlock(&state->poolMutex); + } else { + // Allocate a new pool of elements + struct ncclProxyPool* newPool; + NCCLCHECK(ncclCalloc(&newPool, 1)); + struct ncclProxyArgs* newElems = newPool->elems; + // Chain newly allocated elements + for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) { + if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1; + } + // Add them all to the pool list + state->pool = newElems; + // Save the pool memory block for later resource release + newPool->next = state->pools; + state->pools = newPool; } - // Add them all to the pool list - state->pool = newElems; - // Save the pool memory block for later resource release - newPool->next = state->pools; - state->pools = newPool; } elem = state->pool; state->pool = state->pool->next; - pthread_mutex_unlock(&state->poolMutex); - elem->next = elem->nextPeer = elem->nextGroup = NULL; + elem->next = elem->nextPeer = NULL; *argsptr = elem; return ncclSuccess; } @@ -75,23 +80,18 @@ ncclResult_t dumpProxyState(struct ncclProxyState* state) { WARN("Active list loop at element %ld", OP_INDEX(op)); } op->idle |= OP_SEEN; - printf("[%ld]", OP_INDEX(op)); + printf("[%ld(%ld/%d)]", OP_INDEX(op), op->opCount, op->nsubs); if (op->nextPeer) { printf("(%ld)", OP_INDEX(op->nextPeer)); struct ncclProxyArgs* n = op->nextPeer; n->idle |= OP_SEEN; - while (n->nextGroup || n->nextPeer) { - n = n->nextGroup ? 
n->nextGroup : n->nextPeer; + while (n->nextPeer) { + n = n->nextPeer; n->idle |= OP_SEEN; } } - if (op->nextGroup) { - printf("--G->"); - op = op->nextGroup; - } else { - printf("--N->"); - op = op->next; - } + printf("->"); + op = op->next; } printf("[X]\n"); @@ -128,44 +128,62 @@ ncclResult_t dumpProxyState(struct ncclProxyState* state) { return ncclSuccess; } -static ncclResult_t ProxyAppend(struct ncclProxyState* state, struct ncclProxyArgs* args, int shared) { +static ncclResult_t ProxyAppend(struct ncclProxyState* state, struct ncclProxyArgs* args) { struct ncclProxyArgs* proxyAppend = *args->proxyAppendPtr; + int shared = args->subs[0].connector->conn.shared; if (proxyAppend) { if (shared && proxyAppend->opCount == args->opCount) { + if ((proxyAppend->sliceSteps != args->sliceSteps) || + (proxyAppend->chunkSteps != args->chunkSteps) || + (proxyAppend->protocol != args->protocol) || + (proxyAppend->dtype != args->dtype) || + (proxyAppend->redOp != args->redOp)) { + WARN("Proxy append mismatch"); + return ncclInternalError; + } + if (proxyAppend->nsubs >= NCCL_PROXY_MAX_SUBS) { + WARN("Proxy append out of bound"); + return ncclInternalError; + } + memcpy(proxyAppend->subs+proxyAppend->nsubs, args->subs, sizeof(struct ncclProxySubArgs)); + proxyAppend->nsubs++; args->next = proxyAppend->next; - proxyAppend->next = NULL; - proxyAppend->nextGroup = args; - DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as group, prevGroup %5ld, next %5ld : \n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend), OP_INDEX(args->next)); + // Free args as we merged them + args->next = state->poolFreed; + state->poolFreed = args; + DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as group with %5ld\n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend)); } else { proxyAppend->nextPeer = args; - DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as nextPeer of %5ld : \n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend)); + DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as nextPeer of %5ld\n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend)); + *(args->proxyAppendPtr) = args; } } else { // Nothing running for that peer. Add to the list if (state->ops == NULL) { // Create the list - DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as first element : \n", OP_INDEX(args), shared, args->opCount); + DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as first element\n", OP_INDEX(args), shared, args->opCount); state->ops = args; } else { // Append element at the end of the list struct ncclProxyArgs* last = state->ops; - while (last->nextGroup || last->next) last = last->nextGroup ? last->nextGroup : last->next; + while (last->next) last = last->next; last->next = args; - DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as last element : \n", OP_INDEX(args),shared, args->opCount); + DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as last element\n", OP_INDEX(args),shared, args->opCount); } + *(args->proxyAppendPtr) = args; } - *(args->proxyAppendPtr) = args; return ncclSuccess; } -static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) { +static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args, int connIndex) { if (peer < 0) return ncclSuccess; - struct ncclPeer* peerComm = args->channel->peers+peer; - struct ncclConnector* connector = type == proxyRecv ? 
&peerComm->recv : &peerComm->send; + struct ncclChannel* channel = args->subs[0].channel; + struct ncclPeer* peerComm = channel->peers+peer; + struct ncclConnector* connector = type == proxyRecv ? peerComm->recv+connIndex : peerComm->send+connIndex; if (connector->transportComm == NULL) { - WARN("[%d] Error no transport for %s peer %d on channel %d", connector->comm->rank, - type == proxyRecv ? "recv" : "send", peer, args->channel->id); + WARN("Rank %d has no transport for %s peer %d on channel %d", connector->comm->rank, + type == proxyRecv ? "recv" : "send", peer, channel->id); return ncclInternalError; } if (connector->transportComm->proxy == NULL) return ncclSuccess; @@ -174,14 +192,10 @@ static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) { struct ncclProxyArgs* op; NCCLCHECK(allocateArgs(connector->comm, &op)); memcpy(op, args, sizeof(struct ncclProxyArgs)); - op->connector = connector; + op->subs[0].connector = connector; op->progress = connector->transportComm->proxy; op->state = ncclProxyOpReady; - - op->proxyAppendPtr = - connector->conn.shared ? - state->sharedBuffs->proxyAppend+2*args->channel->id+type : // Shared buffers - &connector->proxyAppend; // Dedicated buffers + op->proxyAppendPtr = connector->proxyAppendPtr; if (state->nextOps == NULL) state->nextOps = op; else state->nextOpsEnd->next = op; @@ -189,120 +203,131 @@ static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) { return ncclSuccess; } -ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks) { +ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int nranks) { + struct ncclChannel* channel = args->subs[0].channel; + int pattern = args->pattern; if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice || pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) { - struct ncclRing* ring = &args->channel->ring; - if (NeedProxy(proxyRecv, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy(proxyRecv, ring->prev, args)); - if (NeedProxy(proxySend, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy(proxySend, ring->next, args)); + struct ncclRing* ring = &channel->ring; + if (NeedProxy(proxyRecv, pattern, args->root, ring, nranks)) NCCLCHECK(SaveProxy(proxyRecv, ring->prev, args, 0)); + if (NeedProxy(proxySend, pattern, args->root, ring, nranks)) NCCLCHECK(SaveProxy(proxySend, ring->next, args, 0)); } if (pattern == ncclPatternTreeUp || pattern == ncclPatternTreeUpDown) { // Tree up - struct ncclTree* tree = &args->channel->tree; - for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxyRecv, tree->down[i], args)); - NCCLCHECK(SaveProxy(proxySend, tree->up, args)); + struct ncclTree* tree = &channel->tree; + for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxyRecv, tree->down[i], args, 0)); + NCCLCHECK(SaveProxy(proxySend, tree->up, args, 0)); } if (pattern == ncclPatternTreeDown || pattern == ncclPatternTreeUpDown) { // Tree down - struct ncclTree* tree = &args->channel->tree; - for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxySend, tree->down[i], args)); - NCCLCHECK(SaveProxy(proxyRecv, tree->up, args)); + struct ncclTree* tree = &channel->tree; + for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxySend, tree->down[i], args, 0)); + NCCLCHECK(SaveProxy(proxyRecv, tree->up, args, 0)); } - if (pattern == ncclPatternCollTreeUp) { + if (pattern == ncclPatternCollTreeUpDown) { // CollTree up - struct ncclTree* tree = &args->channel->collTree; - 
NCCLCHECK(SaveProxy(proxyRecv, tree->down[0], args)); - NCCLCHECK(SaveProxy(proxySend, tree->up, args)); - } - if (pattern == ncclPatternCollTreeDown) { + NCCLCHECK(SaveProxy(proxySend, channel->collTree.out, args, 1)); // For CollTree up, we are using push // CollTree down - struct ncclTree* tree = &args->channel->collTree; - NCCLCHECK(SaveProxy(proxySend, tree->down[0], args)); - NCCLCHECK(SaveProxy(proxyRecv, tree->up, args)); + NCCLCHECK(SaveProxy(proxyRecv, channel->collTree.out, args, 0)); } return ncclSuccess; } -ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel, int segment) { - struct ncclProxyArgs args; - memset(&args, 0, sizeof(struct ncclProxyArgs)); - args.channel = channel; - args.sliceSteps = 1; - args.chunkSteps = 1; - args.protocol = NCCL_PROTO_SIMPLE; - args.segment = segment; - args.opCount = channel->workFifoTail-1; - args.dtype = info->datatype; +ncclResult_t ncclProxyComputeP2p(struct ncclInfo* info, struct ncclProxyArgs* args) { + memset(args, 0, sizeof(struct ncclProxyArgs)); + int channelId = info->channelId; + args->nsubs = 1; + struct ncclProxySubArgs* sub = args->subs; + + struct ncclChannel* channel = info->comm->channels+channelId; + sub->channel = channel; + args->sliceSteps = 1; + args->chunkSteps = 1; + args->protocol = NCCL_PROTO_SIMPLE; + args->dtype = info->datatype; + sub->delta = info->delta; + sub->recvbytes = info->recvbytes; + sub->sendbytes = info->sendbytes; + + int stepSize = info->comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/SENDRECV_SLICEFACTOR; + info->recvChunkSize = stepSize; + info->sendChunkSize = stepSize; + if (info->delta > 0 && info->recvbytes >= 0) { int peerrecv = (info->comm->nRanks+info->comm->rank-info->delta)%info->comm->nRanks; - args.nsteps = DIVUP(info->recvbytes, info->comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/SENDRECV_SLICEFACTOR); - if (args.nsteps == 0) args.nsteps = 1; - args.recvbytes = info->recvbytes; - args.sendbytes = 0; - NCCLCHECK(SaveProxy(proxyRecv, peerrecv, &args)); + if (channel->peers[peerrecv].recv[0].transportComm && channel->peers[peerrecv].recv[0].transportComm->proxy) { + // Tune chunk size for the network + if (info->recvbytes < stepSize) info->recvChunkSize /= 4; + else if (info->recvbytes < 8*stepSize) info->recvChunkSize /= 2; + } + sub->recvChunkSize = info->recvChunkSize; } if (info->delta > 0 && info->sendbytes >= 0) { int peersend = (info->comm->rank+info->delta)%info->comm->nRanks; - args.nsteps = DIVUP(info->sendbytes, info->comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/SENDRECV_SLICEFACTOR); - if (args.nsteps == 0) args.nsteps = 1; - args.sendbytes = info->sendbytes; - args.recvbytes = 0; - NCCLCHECK(SaveProxy(proxySend, peersend, &args)); + if (channel->peers[peersend].send[0].transportComm && channel->peers[peersend].send[0].transportComm->proxy) { + // Tune chunk size for the network + if (info->sendbytes < stepSize) info->sendChunkSize /= 4; + else if (info->sendbytes < 8*stepSize) info->sendChunkSize /= 2; + } + sub->sendChunkSize = info->sendChunkSize; } return ncclSuccess; } -static ncclResult_t removeOp(struct ncclProxyState* state, struct ncclProxyArgs** opPtr, struct ncclProxyArgs** prevOpPtr, struct ncclProxyArgs** prevGroupPtr) { +ncclResult_t ncclProxySaveP2p(struct ncclComm* comm, struct ncclProxyArgs* args) { + struct ncclProxySubArgs* sub = args->subs; + struct ncclChannel* channel = sub->channel; + args->opCount = channel->workFifoTail-1; + args->commOpCount = comm->opCount; + const ssize_t recvbytesOrig = sub->recvbytes; + const ssize_t 
sendbytesOrig = sub->sendbytes; + if (sub->delta > 0 && recvbytesOrig >= ssize_t(0)) { + int peerrecv = (comm->nRanks+comm->rank-sub->delta)%comm->nRanks; + sub->recvbytes = recvbytesOrig; + sub->sendbytes = 0; + sub->nsteps = DIVUP(sub->recvbytes, sub->recvChunkSize); + if (sub->nsteps == 0) sub->nsteps = 1; + NCCLCHECK(SaveProxy(proxyRecv, peerrecv, args, 0)); + } + if (sub->delta > 0 && sendbytesOrig >= ssize_t(0)) { + int peersend = (comm->rank+sub->delta)%comm->nRanks; + sub->sendbytes = sendbytesOrig; + sub->recvbytes = 0; + sub->nsteps = DIVUP(sub->sendbytes, sub->sendChunkSize); + if (sub->nsteps == 0) sub->nsteps = 1; + NCCLCHECK(SaveProxy(proxySend, peersend, args, 0)); + } + // Reset proxy args for potentially multiple cuda graph launches + // It is safe as long as SaveProxy copies contents of args to op + sub->recvbytes = recvbytesOrig; + sub->sendbytes = sendbytesOrig; + return ncclSuccess; +} + +static ncclResult_t removeOp(struct ncclProxyState* state, struct ncclProxyArgs** opPtr, struct ncclProxyArgs** prevOpPtr) { struct ncclProxyArgs* freeOp = *opPtr; - DEBUG_PROXY_PRINT("Remove %ld/%ld -> %ld -> %ld/%ld\n", OP_INDEX(*prevOpPtr), OP_INDEX(*prevGroupPtr), OP_INDEX(freeOp), OP_INDEX(freeOp->next), OP_INDEX(freeOp->nextGroup)); - if (*prevGroupPtr && *prevOpPtr) return ncclInternalError; - if (freeOp->nextGroup) { - // Part of a group : remove the element - struct ncclProxyArgs* next = freeOp->nextGroup; - *opPtr = next; - if (*prevGroupPtr) { - (*prevGroupPtr)->nextGroup = next; - } else if (*prevOpPtr) { - (*prevOpPtr)->next = next; + DEBUG_PROXY_PRINT("Remove %ld -> %ld -> %ld\n", OP_INDEX(*prevOpPtr), OP_INDEX(freeOp), OP_INDEX(freeOp->next)); + struct ncclProxyArgs* next = freeOp->next; + *opPtr = next; + if (freeOp->nextPeer) { + // replace op by nextPeer + struct ncclProxyArgs* nextPeer = freeOp->nextPeer; + if (*prevOpPtr) { + (*prevOpPtr)->next = nextPeer; } else { - state->ops = next; + state->ops = nextPeer; } + nextPeer->next = next; + *(prevOpPtr) = nextPeer; } else { - struct ncclProxyArgs* next = freeOp->next; - *opPtr = next; - if ((*prevGroupPtr)) { - (*prevGroupPtr)->next = next; - (*prevGroupPtr)->nextGroup = NULL; - (*prevGroupPtr)->nextPeer = freeOp->nextPeer; - if (*(freeOp->proxyAppendPtr) == freeOp) *(freeOp->proxyAppendPtr) = *prevGroupPtr; - (*prevOpPtr) = *prevGroupPtr; - (*prevGroupPtr) = NULL; + *(freeOp->proxyAppendPtr) = NULL; + if (*prevOpPtr) { + (*prevOpPtr)->next = next; } else { - if (freeOp->nextPeer) { - // replace op by nextPeer - struct ncclProxyArgs* nextPeer = freeOp->nextPeer; - if (*prevOpPtr) { - (*prevOpPtr)->next = nextPeer; - } else { - state->ops = nextPeer; - } - struct ncclProxyArgs* lastGroup = nextPeer; - while (lastGroup->nextGroup) lastGroup = lastGroup->nextGroup; - lastGroup->next = next; - *(prevOpPtr) = lastGroup; - } else { - *(freeOp->proxyAppendPtr) = NULL; - if (*prevOpPtr) { - (*prevOpPtr)->next = next; - } else { - state->ops = next; - } - } + state->ops = next; } } - pthread_mutex_lock(&state->poolMutex); - freeOp->next = state->pool; - state->pool = freeOp; - pthread_mutex_unlock(&state->poolMutex); + freeOp->next = state->poolFreed; + state->poolFreed = freeOp; DEBUG_PROXY_PRINT("Removed %5ld (%5ld) : ", OP_INDEX(freeOp), OP_INDEX(*freeOp->proxyAppendPtr)); NCCLCHECK(dumpProxyState(state)); return ncclSuccess; @@ -310,33 +335,81 @@ static ncclResult_t removeOp(struct ncclProxyState* state, struct ncclProxyArgs* static ncclResult_t progressOps(struct ncclProxyState* state, struct ncclProxyArgs** opsPtr, int* 
idle, struct ncclComm* comm) { struct ncclProxyArgs* prevOp = NULL; - struct ncclProxyArgs* prevGroup = NULL; struct ncclProxyArgs* op = *opsPtr; while (op) { if (op->state == ncclProxyOpNone) return ncclInternalError; - // opCount >= lastOpCount are part of an ongoing GroupStart/GroupEnd that hasn't started - // yet and might be cancelled before they even start. Hold on on those. - if (op->opCount < comm->lastOpCount) { - NCCLCHECK(op->progress(op)); - *idle &= op->idle; - } + NCCLCHECK(op->progress(op)); + *idle &= op->idle; if (op->state == ncclProxyOpNone) { - NCCLCHECK(removeOp(state, &op, &prevOp, &prevGroup)); + NCCLCHECK(removeOp(state, &op, &prevOp)); } else { - if (op->nextGroup) { - prevGroup = op; - prevOp = NULL; - op = op->nextGroup; - } else { - prevOp = op; - prevGroup = NULL; - op = op->next; - } + prevOp = op; + op = op->next; } } return ncclSuccess; } +ncclResult_t ncclProxyAppendPosted(struct ncclProxyState* state) { + // Return any freed element first + if (state->poolFreed) { + struct ncclProxyArgs* end = state->poolFreed; + while (end->next) end = end->next; + pthread_mutex_lock(&state->poolMutex); + end->next = state->poolReturned; + state->poolReturned = state->poolFreed; + pthread_mutex_unlock(&state->poolMutex); + state->poolFreed = NULL; + } + + // Then wait until we have new work to do + pthread_mutex_lock(&state->opsMutex); + while (state->postedOps == NULL) { + if (state->stop) return ncclSuccess; + pthread_cond_wait(&state->cond, &state->opsMutex); + } + + // Sort operations as we append them : collectives and + // receives first, then sends. + + struct ncclProxyArgs* next, *prev = NULL, *op = state->postedOps; + int commOpCount = op->commOpCount; + while (op && op->commOpCount == commOpCount) { + next = op->next; + if (op->subs[0].sendbytes) { + if (prev) prev->next = next; + else state->postedOps = next; + op->next = NULL; + NCCLCHECK(ProxyAppend(state, op)); + } else prev = op; + op = next; + } + op = state->postedOps; + while (op && op->commOpCount == commOpCount) { + next = op->next; + op->next = NULL; + NCCLCHECK(ProxyAppend(state, op)); + op = next; + } + state->postedOps = op; + if (op == NULL) state->postedOpsEnd = NULL; + NCCLCHECK(dumpProxyState(state)); + pthread_mutex_unlock(&state->opsMutex); + + if (state->poolFreed) { + struct ncclProxyArgs* end = state->poolFreed; + while (end->next) end = end->next; + pthread_mutex_lock(&state->poolMutex); + end->next = state->poolReturned; + state->poolReturned = state->poolFreed; + pthread_mutex_unlock(&state->poolMutex); + state->poolFreed = NULL; + } + + return ncclSuccess; +} + + void* persistentThread(void *comm_) { struct ncclComm* comm = (struct ncclComm*)comm_; struct ncclProxyState* state = &comm->proxyState; @@ -344,158 +417,95 @@ void* persistentThread(void *comm_) { sprintf(threadName, "NCCLproxy %5d", comm->rank); nvtxNameOsThreadA(syscall(SYS_gettid), threadName); - pthread_mutex_lock(&state->opsMutex); struct ncclProxyArgs** opsPtr = &state->ops; while (1) { if (*comm->abortFlag) { - pthread_mutex_unlock(&state->opsMutex); return NULL; } while (*opsPtr == NULL) { if (state->stop) { // No more commands to process and proxy has been requested to stop - pthread_mutex_unlock(&state->opsMutex); return NULL; } - pthread_cond_wait(&state->cond, &state->opsMutex); + ncclResult_t ret = ncclProxyAppendPosted(state); + if (ret != ncclSuccess) { + comm->fatalError = ret; + INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); + return NULL; + } } int idle = 1; ncclResult_t ret = 
progressOps(state, opsPtr, &idle, comm); if (ret != ncclSuccess) { comm->fatalError = ret; INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); - pthread_mutex_unlock(&state->opsMutex); return NULL; } if (idle) { - pthread_mutex_unlock(&state->opsMutex); sched_yield(); // No request progressed. Let others run. - pthread_mutex_lock(&state->opsMutex); } } } ncclResult_t ncclProxyStart(struct ncclComm* comm) { struct ncclProxyState* state = &comm->proxyState; + if (state->nextOps == NULL) return ncclSuccess; pthread_mutex_lock(&state->opsMutex); - - // Sort operations as we append them : collectives and - // receives first, then sends. - ncclProxyArgs* next, *prev = NULL, *op = state->nextOps; - while (op) { - next = op->next; - if (op->sendbytes) { - if (prev) prev->next = next; - else state->nextOps = next; - op->next = NULL; - NCCLCHECK(ProxyAppend(state, op, op->connector->conn.shared)); - } else prev = op; - op = next; - } - op = state->nextOps; - while (op) { - next = op->next; - op->next = NULL; - NCCLCHECK(ProxyAppend(state, op, op->connector->conn.shared)); - op = next; - } + if (state->postedOps) state->postedOpsEnd->next = state->nextOps; + else state->postedOps = state->nextOps; + state->postedOpsEnd = state->nextOpsEnd; state->nextOps = state->nextOpsEnd = NULL; - NCCLCHECK(dumpProxyState(state)); - - if (state->ops != NULL) - pthread_cond_signal(&state->cond); + pthread_cond_signal(&state->cond); pthread_mutex_unlock(&state->opsMutex); + comm->opCount++; return ncclSuccess; } -NCCL_PARAM(ProxySharedBuffersCount, "SHARED_BUFF_COUNT", -2); - ncclResult_t ncclProxySharedBuffersInit(struct ncclComm* comm, int cuda, int* size, char** ptr) { - struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs; - if (state == NULL) { - NCCLCHECK(ncclCalloc(&state, 1)); - comm->proxyState.sharedBuffs = state; - state->nslots = ncclParamProxySharedBuffersCount(); - if (state->nslots == -2) { - state->nslots = NCCL_STEPS*NCCL_MAX_WORK_ELEMENTS; - } - state->slotSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/(NCCL_STEPS*SENDRECV_SLICEFACTOR); + struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs; + if (state->size == 0) { + int p2pnChannels = 1; + while (p2pnChannels < comm->nChannels) p2pnChannels *= 2; + int p2pSize = 2*p2pnChannels*NCCL_MAX_WORK_ELEMENTS*comm->buffSizes[NCCL_PROTO_SIMPLE]/SENDRECV_SLICEFACTOR; + int collNetSize = 2*comm->nChannels*comm->buffSizes[NCCL_PROTO_SIMPLE]; + state->size = std::max(p2pSize, collNetSize); } - char* buff; - int* used; - *size = 2*comm->p2pnChannels*state->slotSize*state->nslots; + *size = state->size; - if (cuda && state->cudaBuff[0] == NULL) { - NCCLCHECK(ncclCudaCalloc(&buff, *size)); - NCCLCHECK(ncclCalloc(&used, 2*comm->p2pnChannels*state->nslots)); - for (int i=0; i<2*comm->p2pnChannels; i++) { - state->cudaBuff[i] = buff + state->nslots*state->slotSize*i; - state->cudaUsed[i] = used + state->nslots*i; - } - } else if (state->hostBuff[0] == NULL) { - NCCLCHECK(ncclCudaHostCalloc(&buff, *size)); - NCCLCHECK(ncclCalloc(&used, 2*comm->p2pnChannels*state->nslots)); - for (int i=0; i<2*comm->p2pnChannels; i++) { - state->hostBuff[i] = buff + state->nslots*state->slotSize*i; - state->hostUsed[i] = used + state->nslots*i; - } + if (cuda && state->cudaBuff == NULL) { + NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, *size)); + } else if (state->hostBuff == NULL) { + NCCLCHECK(ncclCudaHostCalloc(&state->hostBuff, *size)); } - buff = cuda ? state->cudaBuff[0] : state->hostBuff[0]; - - *ptr = buff; + *ptr = cuda ? 
state->cudaBuff : state->hostBuff; return ncclSuccess; } -ncclResult_t ncclProxySharedBuffersAlloc(struct ncclComm* comm, int cuda, int type, int channel, int size, char** ptr) { - struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs; - // Use different pools for different channels and also separate send/recv. - int p = 2*channel+type; - int* used = cuda ? state->cudaUsed[p] : state->hostUsed[p]; - char* buff = cuda ? state->cudaBuff[p] : state->hostBuff[p]; - if (buff == NULL) return ncclInternalError; - int nslots = 1; - while (nslots*state->slotSize < size) nslots *= 2; - for (int s=0; s<state->nslots; s+=nslots) { - int u = 0; - for (int i=0; i<nslots; i++) u += used[s+i]; - if (u == 0) { - for (int i=0; i<nslots; i++) used[s+i] = 1; - *ptr = buff+state->slotSize*s; - return ncclSuccess; - } - } - *ptr = NULL; +ncclResult_t ncclProxySharedBuffersGetP2p(struct ncclComm* comm, int cuda, int type, int channel, int slot, int index, char** ptr) { + struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs; + // Use different pools for separate send/recv. + char* buff = cuda ? state->cudaBuff : state->hostBuff; + int slotSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/(NCCL_STEPS*SENDRECV_SLICEFACTOR); + int globalSlot = (((type*comm->p2pnChannels+channel)*NCCL_STEPS)+slot)*NCCL_MAX_WORK_ELEMENTS+index; + *ptr = buff + slotSize * globalSlot; return ncclSuccess; } - -ncclResult_t ncclProxySharedBuffersFree(struct ncclComm* comm, int cuda, int type, int channel, int size, char* ptr) { - struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs; - int p = 2*channel+type; - int* used = cuda ? state->cudaUsed[p] : state->hostUsed[p]; - char* buff = cuda ? state->cudaBuff[p] : state->hostBuff[p]; - if (buff == NULL) return ncclInternalError; - int nslots = 1; - while (nslots*state->slotSize < size) nslots *= 2; - int s = (ptr-buff)/state->slotSize; - if (s < 0 || s+nslots > state->nslots) { - WARN("Error freeing shared buffer : freeing ptr %p size %d (start %p slot size %d nslots %d)", ptr, size, buff, state->slotSize, state->nslots); - return ncclInternalError; - } - for (int i=0; i<nslots; i++) used[s+i] = 0; +ncclResult_t ncclProxySharedBuffersGetCollNet(struct ncclComm* comm, int cuda, int type, int slot, int channel, char** ptr) { + struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs; + // Use different pools for different channels. + char* buff = cuda ? state->cudaBuff : state->hostBuff; + int slotSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS; + int globalSlot = (type*NCCL_STEPS+slot)*comm->nChannels+channel; + *ptr = buff + slotSize * globalSlot; return ncclSuccess; } ncclResult_t ncclProxySharedBuffersDestroy(struct ncclComm* comm) { - struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs; - if (state) { - CUDACHECK(cudaFree(state->cudaBuff[0])); - free(state->cudaUsed[0]); - NCCLCHECK(ncclCudaHostFree(state->hostBuff[0])); - free(state->hostUsed[0]); - free(state); - } + struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs; + CUDACHECK(cudaFree(state->cudaBuff)); + NCCLCHECK(ncclCudaHostFree(state->hostBuff)); return ncclSuccess; } diff --git a/src/transport.cc b/src/transport.cc index a5af541..c7c841b 100644 --- a/src/transport.cc +++ b/src/transport.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. 
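Editor's note: the proxy.cc hunk above replaces the old per-channel slot allocator (cudaUsed/hostUsed free lists) with direct addressing into a single shared buffer: ncclProxySharedBuffersGetP2p maps each (type, channel, step, work-element) tuple to a fixed slot, so no locking or free tracking is needed. The sketch below reproduces the shape of that index computation; kSteps and kWorkElements stand in for NCCL_STEPS and NCCL_MAX_WORK_ELEMENTS and their values here are placeholders, not NCCL's actual constants.

#include <cstddef>
#include <cstdio>

constexpr int kSteps = 8;          // stands in for NCCL_STEPS
constexpr int kWorkElements = 8;   // stands in for NCCL_MAX_WORK_ELEMENTS

// Mirrors the globalSlot formula in ncclProxySharedBuffersGetP2p:
//   (((type*nChannels + channel)*steps) + step)*workElements + index
size_t p2pSlotOffset(int type, int nChannels, int channel, int step, int index, size_t slotSize) {
  int globalSlot = (((type * nChannels + channel) * kSteps) + step) * kWorkElements + index;
  return slotSize * (size_t)globalSlot;
}

int main() {
  // Recv (type 0) and send (type 1) halves can never overlap because 'type' is the
  // highest-order term in the slot index.
  std::printf("%zu\n", p2pSlotOffset(0, 4, 2, 3, 1, 1024));
  std::printf("%zu\n", p2pSlotOffset(1, 4, 2, 3, 1, 1024));
  return 0;
}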
* * See LICENSE.txt for license information ************************************************************************/ @@ -19,7 +19,11 @@ struct ncclTransport ncclTransports[NTRANSPORTS] = { }; template <int type> -static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) { +static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclConnect* connect, int channelId, int peer, int connIndex) { + struct ncclPeerInfo* myInfo = comm->peerInfo+comm->rank; + struct ncclPeerInfo* peerInfo = comm->peerInfo+peer; + struct ncclConnector* connector = (type == 1) ? comm->channels[channelId].peers[peer].send + connIndex : + comm->channels[channelId].peers[peer].recv + connIndex; for (int t=0; t<NTRANSPORTS; t++) { struct ncclTransport *transport = ncclTransports+t; struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv; @@ -27,7 +31,7 @@ static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* NCCLCHECK(transport->canConnect(&ret, comm->topo, graph, myInfo, peerInfo)); if (ret) { connector->transportComm = transportComm; - NCCLCHECK(transportComm->setup(comm, graph, myInfo, peerInfo, connect, connector, channelId)); + NCCLCHECK(transportComm->setup(comm, graph, myInfo, peerInfo, connect, connector, channelId, connIndex)); return ncclSuccess; } } @@ -35,17 +39,17 @@ static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* return ncclInternalError; } -ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) { +ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) { TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv); uint32_t mask = 1 << channel->id; for (int i=0; i<nrecv; i++) { int peer = peerRecv[i]; - if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].recv.connected) continue; + if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].recv[connIndex].connected) continue; comm->connectRecv[peer] |= mask; } for (int i=0; i<nsend; i++) { int peer = peerSend[i]; - if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].send.connected) continue; + if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].send[connIndex].connected) continue; comm->connectSend[peer] |= mask; } return ncclSuccess; @@ -60,9 +64,14 @@ void dumpData(struct ncclConnect* data, int ndata) { } } -ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph) { +ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex) { + // Stream used during transport setup; need for P2P pre-connect + CUDA Graph + cudaStream_t transportSetupStream; + CUDACHECK(cudaStreamCreateWithFlags(&transportSetupStream, cudaStreamNonBlocking)); + struct ncclConnect data[2*MAXCHANNELS]; for (int i=1; i<comm->nRanks; i++) { + int bootstrapTag = (i<<8) + (graph ? 
graph->id+1 : 0); int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks; int sendPeer = (comm->rank + i) % comm->nRanks; uint32_t recvMask = comm->connectRecv[recvPeer]; @@ -72,50 +81,173 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* int sendChannels = 0, recvChannels = 0; for (int c=0; c<MAXCHANNELS; c++) { if (recvMask & (1<<c)) { - struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv; - NCCLCHECK(selectTransport<0>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c)); + NCCLCHECK(selectTransport<0>(comm, graph, recvData+recvChannels++, c, recvPeer, connIndex)); } } struct ncclConnect* sendData = recvData+recvChannels; for (int c=0; c<MAXCHANNELS; c++) { if (sendMask & (1<<c)) { - struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send; - NCCLCHECK(selectTransport<1>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c)); + NCCLCHECK(selectTransport<1>(comm, graph, sendData+sendChannels++, c, sendPeer, connIndex)); } } if (sendPeer == recvPeer) { if (recvChannels+sendChannels) { - NCCLCHECK(bootstrapSend(comm->bootstrap, recvPeer, data, sizeof(struct ncclConnect)*(recvChannels+sendChannels))); - NCCLCHECK(bootstrapRecv(comm->bootstrap, recvPeer, data, sizeof(struct ncclConnect)*(recvChannels+sendChannels))); + NCCLCHECK(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data, sizeof(struct ncclConnect)*(recvChannels+sendChannels))); + NCCLCHECK(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data, sizeof(struct ncclConnect)*(recvChannels+sendChannels))); sendData = data; recvData = data+sendChannels; } } else { - if (recvChannels) NCCLCHECK(bootstrapSend(comm->bootstrap, recvPeer, recvData, sizeof(struct ncclConnect)*recvChannels)); - if (sendChannels) NCCLCHECK(bootstrapSend(comm->bootstrap, sendPeer, sendData, sizeof(struct ncclConnect)*sendChannels)); - if (sendChannels) NCCLCHECK(bootstrapRecv(comm->bootstrap, sendPeer, sendData, sizeof(struct ncclConnect)*sendChannels)); - if (recvChannels) NCCLCHECK(bootstrapRecv(comm->bootstrap, recvPeer, recvData, sizeof(struct ncclConnect)*recvChannels)); + if (recvChannels) NCCLCHECK(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData, sizeof(struct ncclConnect)*recvChannels)); + if (sendChannels) NCCLCHECK(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData, sizeof(struct ncclConnect)*sendChannels)); + if (sendChannels) NCCLCHECK(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData, sizeof(struct ncclConnect)*sendChannels)); + if (recvChannels) NCCLCHECK(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData, sizeof(struct ncclConnect)*recvChannels)); } for (int c=0; c<MAXCHANNELS; c++) { if (sendMask & (1<<c)) { - struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send; + struct ncclConnector* conn = comm->channels[c].peers[sendPeer].send + connIndex; NCCLCHECK(conn->transportComm->connect(comm, sendData++, 1, comm->rank, conn)); conn->connected = 1; - CUDACHECK(cudaMemcpy(&comm->channels[c].devPeers[sendPeer].send, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice)); + CUDACHECK(cudaMemcpyAsync(comm->channels[c].devPeers[sendPeer].send+connIndex, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice, transportSetupStream)); } } for (int c=0; c<MAXCHANNELS; c++) { if (recvMask & (1<<c)) { - struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv; + struct ncclConnector* conn = 
comm->channels[c].peers[recvPeer].recv + connIndex; NCCLCHECK(conn->transportComm->connect(comm, recvData++, 1, comm->rank, conn)); conn->connected = 1; - CUDACHECK(cudaMemcpy(&comm->channels[c].devPeers[recvPeer].recv, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice)); + CUDACHECK(cudaMemcpyAsync(comm->channels[c].devPeers[recvPeer].recv+connIndex, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice, transportSetupStream)); } } comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0; } + CUDACHECK(cudaStreamSynchronize(transportSetupStream)); + CUDACHECK(cudaStreamDestroy(transportSetupStream)); return ncclSuccess; } +extern struct ncclTransport collNetTransport; + +// All ranks must participate in collNetSetup call +// return: 0 - unsupported, 1 - supported +// We do not NCCLCHECK this call because we would fall back to P2P network in case CollNet setup fails +int ncclTransportCollNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int masterRank, int masterPeer, int collNetGraphChannelId, int type) { + int rank = comm->rank; + int nranks = comm->nRanks; + int nMasters = comm->nNodes; + int rankInCollNet = -1; + int supported = 0; + int isMaster = (rank == masterRank) ? 1 : 0; + struct { + int collNetRank; + ncclConnect connect; + } sendrecvExchange; + + // check if we can connect to collnet, whose root is the nranks-th rank + struct ncclPeerInfo *myInfo = comm->peerInfo+rank, *peerInfo = comm->peerInfo+nranks; + peerInfo->rank = nranks; + int ret = 1; + if (isMaster) { + NCCLCHECK(collNetTransport.canConnect(&ret, comm->topo, collNetGraph, myInfo, peerInfo)); + } + + // send master receives connect info from peer recv master + if (isMaster && type == collNetSend) { + NCCLCHECK(bootstrapRecv(comm->bootstrap, masterPeer, collNetGraph->id, &sendrecvExchange, sizeof(sendrecvExchange))); + rankInCollNet = sendrecvExchange.collNetRank; + TRACE(NCCL_INIT, "CollNet [send] : rank %d collNetRank %d collNetNranks %d received connect from rank %d", rank, rankInCollNet, nMasters, masterPeer); + } + + // select + struct ncclPeer* root = channel->peers+nranks; + // connector index: 0 for recv, 1 for send + struct ncclConnector* conn = (type == collNetRecv) ? root->recv+type : root->send+type; + struct ncclTransportComm* transportComm = (type == collNetRecv) ? 
&(collNetTransport.recv) : &(collNetTransport.send); + conn->transportComm = transportComm; + // setup + struct ncclConnect myConnect; + if (isMaster && ret > 0) { + NCCLCHECK(transportComm->setup(comm, collNetGraph, myInfo, peerInfo, &myConnect, conn, collNetGraphChannelId, type)); + } + // prepare connect handles + ncclResult_t res; + struct { + int isMaster; + ncclConnect connect; + } *allConnects = NULL; + ncclConnect *masterConnects = NULL; + NCCLCHECK(ncclCalloc(&masterConnects, nMasters)); + if (type == collNetRecv) { // recv side: AllGather + // all ranks must participate + NCCLCHECK(ncclCalloc(&allConnects, nranks)); + allConnects[rank].isMaster = isMaster; + memcpy(&(allConnects[rank].connect), &myConnect, sizeof(struct ncclConnect)); + NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allConnects, sizeof(*allConnects)), res, cleanup); + // consolidate + int c = 0; + for (int r = 0; r < nranks; r++) { + if (allConnects[r].isMaster) { + memcpy(masterConnects+c, &(allConnects[r].connect), sizeof(struct ncclConnect)); + if (r == rank) rankInCollNet = c; + c++; + } + } + } else { // send side : copy in connect info received from peer recv master + if (isMaster) memcpy(masterConnects+rankInCollNet, &(sendrecvExchange.connect), sizeof(struct ncclConnect)); + } + // connect + if (isMaster && ret > 0) { + NCCLCHECKGOTO(transportComm->connect(comm, masterConnects, nMasters, rankInCollNet, conn), res, cleanup); + struct ncclPeer* devRoot = channel->devPeers+nranks; + struct ncclConnector* devConn = (type == collNetRecv) ? devRoot->recv+type : devRoot->send+type; + CUDACHECKGOTO(cudaMemcpy(devConn, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice), res, cleanup); + } + // recv side sends connect info to send side + if (isMaster && type == collNetRecv) { + sendrecvExchange.collNetRank = rankInCollNet; + memcpy(&sendrecvExchange.connect, masterConnects+rankInCollNet, sizeof(struct ncclConnect)); + NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, masterPeer, collNetGraph->id, &sendrecvExchange, sizeof(sendrecvExchange)), res, cleanup); + TRACE(NCCL_INIT, "CollNet [recv] : rank %d collNetRank %d collNetNranks %d sent connect to rank %d", rank, rankInCollNet, nMasters, masterPeer); + } + if (ret > 0) { + supported = 1; + } +cleanup: + if (allConnects != NULL) free(allConnects); + if (masterConnects != NULL) free(masterConnects); + return supported; +} + +ncclResult_t ncclTransportCollNetCheck(struct ncclComm* comm, int collNetSetupFail) { + int rank = comm->rank; + int nranks = comm->nRanks; + // AllGather collNet setup results + int* allGatherFailures; + NCCLCHECK(ncclCalloc(&allGatherFailures, nranks)); + allGatherFailures[rank] = collNetSetupFail; + NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGatherFailures, sizeof(int))); + for (int i=0; i<nranks; i++) { + if (allGatherFailures[i] != 0) { + collNetSetupFail = 1; + break; + } + } + free(allGatherFailures); + if (collNetSetupFail) { + if (rank == 0) WARN("Cannot initialize CollNet, using point-to-point network instead"); + // Free collNet resources + for (int r=0; r<comm->nChannels; r++) { + struct ncclChannel* channel = comm->channels+r; + struct ncclPeer* peer = channel->peers+nranks; + if (peer->send->transportResources && peer->send->transportComm) NCCLCHECK(peer->send->transportComm->free(peer->send->transportResources)); + if (peer->recv->transportResources && peer->recv->transportComm) NCCLCHECK(peer->recv->transportComm->free(peer->recv->transportResources)); + peer->send->transportResources = NULL; // avoid double free + 
peer->recv->transportResources = NULL; // avoid double free + } + // Set support to 0 + comm->collNetSupport = 0; + } + return ncclSuccess; +} diff --git a/src/transport/coll_net.cc b/src/transport/coll_net.cc index af865ce..7b7ec56 100644 --- a/src/transport/coll_net.cc +++ b/src/transport/coll_net.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -7,16 +7,17 @@ #include "comm.h" #include "coll_net.h" #include "graph.h" -#include <assert.h> + +#define COLLNET_GROUP_NSUBS 8 +#define COLLNET_MAX_GROUPS (NCCL_PROXY_MAX_SUBS/COLLNET_GROUP_NSUBS) struct collNetRecvConnectInfo { collNetHandle_t collNetHandle; }; struct collNetSendConnectInfo { - void* collNetComm; void* mhandles[NCCL_NUM_PROTOCOLS]; - struct reqSlot* reqFifo; + void* reqFifo; }; struct reqSlot { @@ -25,10 +26,10 @@ struct reqSlot { }; struct collNetSendResources { - void* collNetSendComm; + struct ncclComm* comm; + void* collNetComm; struct ncclSendMem* sendMem; struct ncclRecvMem* recvMem; - uint32_t* llData; int netDev; int useGdr; void* sendMhandles[NCCL_NUM_PROTOCOLS]; @@ -36,82 +37,129 @@ struct collNetSendResources { struct ncclRecvMem* devRecvMem; uint64_t step; uint64_t llLastCleaning; - struct reqSlot* reqFifo; + struct reqSlot (*reqFifo)[NCCL_STEPS]; int collNetRank; }; struct collNetRecvResources { - void* netListenComm; - void* collNetRecvComm; + struct ncclComm* comm; + void* collNetComm; struct ncclSendMem* sendMem; struct ncclRecvMem* recvMem; - uint32_t* llData; int netDev; int useGdr; void* mhandles[NCCL_NUM_PROTOCOLS]; struct ncclRecvMem* devRecvMem; uint64_t step; uint64_t llLastCleaning; - struct reqSlot* reqFifo; + struct reqSlot reqFifo[COLLNET_MAX_GROUPS][NCCL_STEPS]; int collNetRank; }; +struct collNetSharedResources { + void* collNetListenComms[MAXCHANNELS]; + void* collNetComms[MAXCHANNELS]; + int collNetCommRefCount[MAXCHANNELS]; +}; + /* Determine if we can communicate with the peer */ ncclResult_t collNetCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) { *ret = 1; return ncclSuccess; } +ncclResult_t collNetSharedListen(struct ncclComm* comm, int netDev, void* collNetHandle) { + struct collNetSharedResources* resources = (struct collNetSharedResources*)comm->proxyState.sharedBuffs.collNetResources; + if (resources == NULL) { + NCCLCHECK(ncclCalloc(&resources, 1)); + comm->proxyState.sharedBuffs.collNetResources = resources; + } + if (resources->collNetComms[netDev] == NULL) + NCCLCHECK(collNetListen(netDev, collNetHandle, resources->collNetListenComms+netDev)); + return ncclSuccess; +} + /* Setup send connector, and return connect information for others in the coll communicator to connect to me */ -ncclResult_t collNetSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) { +ncclResult_t collNetSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId, int connIndex) { struct collNetSendResources* resources; NCCLCHECK(ncclCalloc(&resources, 
1)); send->transportResources = resources; + send->conn.shared = 1; + resources->comm = comm; - NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev)); + NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, 0, &resources->netDev)); NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 1, &resources->useGdr)); + send->proxyAppendPtr = comm->proxyState.sharedBuffs.proxyAppendCollNet+2*resources->netDev+1; + NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1)); int recvSize = offsetof(struct ncclRecvMem, buff); - for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) recvSize += send->comm->buffSizes[p]; + // Simple uses shared buffers and we don't support LL128 + recvSize += send->comm->buffSizes[NCCL_PROTO_LL]; if (resources->useGdr) { NCCLCHECK(ncclCudaCalloc((char**)(&resources->devRecvMem), recvSize)); } NCCLCHECK(ncclCudaHostCalloc((char**)&resources->recvMem, recvSize)); - NCCLCHECK(ncclIbMalloc((void**)&(resources->llData), send->comm->buffSizes[NCCL_PROTO_LL]/2)); - INFO(NCCL_INIT|NCCL_NET,"Coll %02d : %d [send] via COLLNET/%s/%d%s", channelId, myInfo->rank, collNetName(), resources->netDev, + INFO(NCCL_INIT|NCCL_NET,"CollNet %02d : %d [send] via COLLNET/%s/%d%s", channelId, myInfo->rank, collNetName(), resources->netDev, resources->useGdr ? "/GDRDMA" : ""); return ncclSuccess; } /* Setup recv connector */ -ncclResult_t collNetRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) { +ncclResult_t collNetRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId, int connIndex) { struct collNetRecvResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); recv->transportResources = resources; + recv->conn.shared = 1; + resources->comm = comm; - NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev)); + NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, 0, &resources->netDev)); NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 0, &resources->useGdr)); + recv->proxyAppendPtr = comm->proxyState.sharedBuffs.proxyAppendCollNet+2*resources->netDev; + NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1)); int recvSize = offsetof(struct ncclRecvMem, buff); - for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) recvSize += recv->comm->buffSizes[p]; + // Simple uses shared buffers and we don't support LL128 + recvSize += recv->comm->buffSizes[NCCL_PROTO_LL]; if (resources->useGdr) { NCCLCHECK(ncclCudaCalloc((char**)(&resources->devRecvMem), recvSize)); } NCCLCHECK(ncclCudaHostCalloc((char**)&resources->recvMem, recvSize)); - NCCLCHECK(ncclIbMalloc((void**)&(resources->llData), recv->comm->buffSizes[NCCL_PROTO_LL]/2)); - - INFO(NCCL_INIT|NCCL_NET,"Coll %02d : %d [receive] via COLLNET/%s/%d%s", channelId, myInfo->rank, collNetName(), resources->netDev, + INFO(NCCL_INIT|NCCL_NET,"CollNet %02d : %d [receive] via COLLNET/%s/%d%s", channelId, myInfo->rank, collNetName(), resources->netDev, resources->useGdr ? 
"/GDRDMA" : ""); struct collNetRecvConnectInfo* info = (struct collNetRecvConnectInfo*) connectInfo; - NCCLCHECK(collNetListen(resources->netDev, &info->collNetHandle, &resources->netListenComm)); + + NCCLCHECK(collNetSharedListen(comm, resources->netDev, &info->collNetHandle)); + return ncclSuccess; +} + +ncclResult_t collNetSharedConnect(struct ncclComm* comm, int netDev, struct ncclConnect* connectInfos, int nranks, int rank, void** collNetComm) { + struct collNetSharedResources* resources = (struct collNetSharedResources*)comm->proxyState.sharedBuffs.collNetResources; + if (resources->collNetComms[netDev] == NULL) { + // Connect to coll comm + collNetHandle_t** handlePtrs = NULL; + NCCLCHECK(ncclCalloc(&handlePtrs, nranks)); + for (int i = 0; i < nranks; i++) { + struct collNetRecvConnectInfo* info = (struct collNetRecvConnectInfo*)(connectInfos+i); + handlePtrs[i] = &(info->collNetHandle); + } + ncclResult_t ret = collNetConnect((void**)handlePtrs, nranks, rank, + resources->collNetListenComms[netDev], + resources->collNetComms+netDev); + free(handlePtrs); + NCCLCHECK(ret); + // Close listen comm + NCCLCHECK(collNetCloseListen(resources->collNetListenComms[netDev])); + } + *collNetComm = resources->collNetComms[netDev]; + resources->collNetCommRefCount[netDev]++; return ncclSuccess; } @@ -121,33 +169,40 @@ ncclResult_t collNetSendConnect(struct ncclComm* comm, struct ncclConnect* conne struct collNetSendConnectInfo* info = (struct collNetSendConnectInfo*)(connectInfos+rank); // Intermediate buffering on GPU for GPU Direct RDMA, but LL buffer is always on host - struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->recvMem; - int offset = 0; - for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { - send->conn.buffs[p] = (p == NCCL_PROTO_LL ? resources->recvMem->buff : recvMem->buff) + offset; - offset += send->comm->buffSizes[p]; - } + send->conn.buffs[NCCL_PROTO_LL] = resources->recvMem->buff; + send->conn.buffs[NCCL_PROTO_LL128] = send->conn.buffs[NCCL_PROTO_SIMPLE] = NULL; send->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0; // Head/Tail/Opcount/Fifos are always on host send->conn.tail = &resources->recvMem->tail; send->conn.sizesFifo = resources->recvMem->sizesFifo; + send->conn.ptrsFifo = resources->recvMem->ptrsFifo; send->conn.head = &resources->sendMem->head; + resources->sendMem->head = -NCCL_STEPS; // Don't give any credit yet when sharing buffers for (int i=0; i<NCCL_STEPS; i++) send->conn.sizesFifo[i] = -1; // Get info from recv side resources->collNetRank = rank; - resources->reqFifo = info->reqFifo; - resources->collNetSendComm = info->collNetComm; + resources->reqFifo = (struct reqSlot (*)[NCCL_STEPS])(info->reqFifo); for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) resources->recvMhandles[p] = info->mhandles[p]; - // Register buffers - NCCLCHECK(collNetRegMr(resources->collNetSendComm, send->conn.buffs[NCCL_PROTO_SIMPLE], send->comm->buffSizes[NCCL_PROTO_SIMPLE], - resources->useGdr ? 
NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->sendMhandles[NCCL_PROTO_SIMPLE])); - NCCLCHECK(collNetRegMr(resources->collNetSendComm, resources->llData, send->comm->buffSizes[NCCL_PROTO_LL]/2, - NCCL_PTR_HOST, &resources->sendMhandles[NCCL_PROTO_LL])); + NCCLCHECK(collNetSharedConnect(comm, resources->netDev, connectInfos, nranks, rank, &resources->collNetComm)); + + int size; + char* ptr; + // Allocate & Register shared buffers for the Simple protocol + NCCLCHECK(ncclProxySharedBuffersInit(send->comm, resources->useGdr, &size, &ptr)); + NCCLCHECK(collNetRegMr(resources->collNetComm, ptr, size, + resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, + &resources->sendMhandles[NCCL_PROTO_SIMPLE])); + + // Allocate & Register shared buffers for the LL protocol + NCCLCHECK(ncclProxySharedBuffersInit(send->comm, 0, &size, &ptr)); + NCCLCHECK(collNetRegMr(resources->collNetComm, ptr, size, + NCCL_PTR_HOST, + &resources->sendMhandles[NCCL_PROTO_LL])); return ncclSuccess; } @@ -168,52 +223,57 @@ ncclResult_t collNetRecvConnect(struct ncclComm* comm, struct ncclConnect* conne // Head/Tail/Opcount are always on host recv->conn.tail = &resources->recvMem->tail; + recv->conn.ptrsFifo = resources->recvMem->ptrsFifo; recv->conn.head = &resources->sendMem->head; - // Connect to coll comm - collNetHandle_t** handlePtrs = NULL; - NCCLCHECK(ncclCalloc(&handlePtrs, nranks)); - for (int i = 0; i < nranks; i++) { - struct collNetRecvConnectInfo* info = (struct collNetRecvConnectInfo*)(connectInfos+i); - handlePtrs[i] = &(info->collNetHandle); - } - ncclResult_t res; - NCCLCHECKGOTO(collNetConnect((void**)handlePtrs, nranks, rank, resources->netListenComm, &resources->collNetRecvComm), res, cleanup); + NCCLCHECK(collNetSharedConnect(comm, resources->netDev, connectInfos, nranks, rank, &resources->collNetComm)); - // Register buffers - NCCLCHECK(collNetRegMr(resources->collNetRecvComm, recv->conn.buffs[NCCL_PROTO_SIMPLE], recv->comm->buffSizes[NCCL_PROTO_SIMPLE], - resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandles[NCCL_PROTO_SIMPLE])); - NCCLCHECK(collNetRegMr(resources->collNetRecvComm, resources->llData, recv->comm->buffSizes[NCCL_PROTO_LL]/2, - NCCL_PTR_HOST, &resources->mhandles[NCCL_PROTO_LL])); + int size; + char* ptr; - // Create shared info between send and recv proxies - NCCLCHECK(ncclCalloc(&(resources->reqFifo), NCCL_STEPS)); + // Allocate & Register shared buffers for the Simple protocol + NCCLCHECK(ncclProxySharedBuffersInit(recv->comm, resources->useGdr, &size, &ptr)); + NCCLCHECK(collNetRegMr(resources->collNetComm, ptr, size, + resources->useGdr ? 
NCCL_PTR_CUDA : NCCL_PTR_HOST, + &resources->mhandles[NCCL_PROTO_SIMPLE])); + + // Allocate & Register shared buffers for the LL protocol + NCCLCHECK(ncclProxySharedBuffersInit(recv->comm, 0, &size, &ptr)); + NCCLCHECK(collNetRegMr(resources->collNetComm, ptr, size, + NCCL_PTR_HOST, + &resources->mhandles[NCCL_PROTO_LL])); // Pass info to send side info->reqFifo = resources->reqFifo; - info->collNetComm = resources->collNetRecvComm; for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) info->mhandles[p] = resources->mhandles[p]; -cleanup: - if (handlePtrs != NULL) free(handlePtrs); - // Close listen comm - NCCLCHECK(collNetCloseListen(resources->netListenComm)); + return ncclSuccess; +} - return res; +ncclResult_t collNetSharedFree(struct ncclComm* comm, int netDev) { + struct collNetSharedResources* resources = (struct collNetSharedResources*)comm->proxyState.sharedBuffs.collNetResources; + resources->collNetCommRefCount[netDev]--; + if (resources->collNetCommRefCount[netDev] == 0) { + NCCLCHECK(collNetCloseColl(resources->collNetComms[netDev])); + } + for (int c=0; c<MAXCHANNELS; c++) if (resources->collNetCommRefCount[c]) return ncclSuccess; + comm->proxyState.sharedBuffs.collNetResources = NULL; + free(resources); + return ncclSuccess; } ncclResult_t collNetSendFree(void* sendTransportResources) { struct collNetSendResources* resources = (struct collNetSendResources*)sendTransportResources; NCCLCHECK(ncclCudaHostFree(resources->sendMem)); NCCLCHECK(ncclCudaHostFree(resources->recvMem)); - if (resources->collNetSendComm) { - NCCLCHECK(collNetDeregMr(resources->collNetSendComm, resources->sendMhandles[NCCL_PROTO_LL])); - NCCLCHECK(collNetDeregMr(resources->collNetSendComm, resources->sendMhandles[NCCL_PROTO_SIMPLE])); + if (resources->collNetComm) { + NCCLCHECK(collNetDeregMr(resources->collNetComm, resources->sendMhandles[NCCL_PROTO_LL])); + NCCLCHECK(collNetDeregMr(resources->collNetComm, resources->sendMhandles[NCCL_PROTO_SIMPLE])); } - if (resources->useGdr) - CUDACHECK(cudaFree(resources->devRecvMem)); - free(resources->llData); + if (resources->useGdr) CUDACHECK(cudaFree(resources->devRecvMem)); + + NCCLCHECK(collNetSharedFree(resources->comm, resources->netDev)); free(resources); return ncclSuccess; } @@ -221,110 +281,145 @@ ncclResult_t collNetSendFree(void* sendTransportResources) { ncclResult_t collNetRecvFree(void* recvTransportResources) { struct collNetRecvResources* resources = (struct collNetRecvResources*)recvTransportResources; NCCLCHECK(ncclCudaHostFree(resources->sendMem)); - if (resources->collNetRecvComm) { - NCCLCHECK(collNetDeregMr(resources->collNetRecvComm, resources->mhandles[NCCL_PROTO_LL])); - NCCLCHECK(collNetDeregMr(resources->collNetRecvComm, resources->mhandles[NCCL_PROTO_SIMPLE])); - } NCCLCHECK(ncclCudaHostFree(resources->recvMem)); - if (resources->useGdr) - CUDACHECK(cudaFree(resources->devRecvMem)); - free(resources->llData); - free(resources->reqFifo); - - // Make sure SendFree is called before RecvFree - if (resources->collNetRecvComm) { - NCCLCHECK(collNetCloseColl(resources->collNetRecvComm)); + if (resources->collNetComm) { + NCCLCHECK(collNetDeregMr(resources->collNetComm, resources->mhandles[NCCL_PROTO_LL])); + NCCLCHECK(collNetDeregMr(resources->collNetComm, resources->mhandles[NCCL_PROTO_SIMPLE])); } + if (resources->useGdr) CUDACHECK(cudaFree(resources->devRecvMem)); + + NCCLCHECK(collNetSharedFree(resources->comm, resources->netDev)); free(resources); return ncclSuccess; } +#define LAST_OF_GROUP(s) \ + (s % COLLNET_GROUP_NSUBS == COLLNET_GROUP_NSUBS-1 
|| s == args->nsubs-1) + ncclResult_t collNetSendProxy(struct ncclProxyArgs* args) { if (args->protocol == NCCL_PROTO_LL128) { WARN("CollNet does not support LL128"); return ncclInternalError; } - struct collNetSendResources* resources = (struct collNetSendResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { - // Round to next multiple of sliceSteps - resources->step = ROUNDUP(resources->step, args->chunkSteps); - args->posted = args->transmitted = args->done = resources->step; - args->end = resources->step + args->nsteps; + for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + struct collNetSendResources* resources = (struct collNetSendResources*) (sub->connector->transportResources); + // Round to next multiple of sliceSteps + sub->base = ROUNDUP(resources->step, args->chunkSteps); + sub->posted = sub->received = sub->transmitted = sub->done = 0; + resources->step = sub->base + sub->nsteps; + } args->state = ncclProxyOpProgress; } args->idle = 1; if (args->state == ncclProxyOpProgress) { int p = args->protocol; - int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS; - char* localBuff = args->connector->conn.buffs[p]; - void* sendMhandle = resources->sendMhandles[p]; - void* recvMhandle = resources->recvMhandles[p]; - struct reqSlot* reqFifo = resources->reqFifo; - int buffSlot = args->transmitted%NCCL_STEPS; - if (args->transmitted < args->end && args->transmitted < args->done + NCCL_STEPS - && reqFifo[buffSlot].recvBuff != NULL) { - volatile int* sizesFifo = resources->recvMem->sizesFifo; - volatile uint64_t* recvTail = &resources->recvMem->tail; - if (sizesFifo[buffSlot] != -1 && (*recvTail > args->transmitted || args->protocol == NCCL_PROTO_LL)) { - // We have something to receive, let's check if it's completely ready. 
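The CollNet proxy rewrite above fuses several sub-operations into one proxy op and batches them into groups of COLLNET_GROUP_NSUBS, splitting the NCCL_STEPS credits across groups. Below is a small stand-alone sketch of that mapping (not part of the commit; NCCL_STEPS assumed here to be 8), showing which sub index satisfies LAST_OF_GROUP(s) and how many in-flight steps each group gets.

#include <cstdio>

constexpr int COLLNET_GROUP_NSUBS = 8;  // value from the diff above
constexpr int NCCL_STEPS = 8;           // assumed example value

#define DIVUP(x, y) (((x) + (y) - 1) / (y))

int main() {
  int nsubs = 11;  // e.g. 11 sub-operations fused into one proxy op
  int nGroups = DIVUP(nsubs, COLLNET_GROUP_NSUBS);
  int perGroupSteps = NCCL_STEPS / nGroups;
  printf("nsubs=%d -> nGroups=%d, perGroupSteps=%d\n", nsubs, nGroups, perGroupSteps);
  for (int s = 0; s < nsubs; s++) {
    // Same condition as LAST_OF_GROUP(s) in the diff.
    bool last = (s % COLLNET_GROUP_NSUBS == COLLNET_GROUP_NSUBS - 1) || (s == nsubs - 1);
    if (last) printf("sub %d closes group %d\n", s, s / COLLNET_GROUP_NSUBS);
  }
  return 0;
}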
- int size = sizesFifo[buffSlot]; - char* buff = localBuff+buffSlot*stepSize; - int ready = 1; - if (args->protocol == NCCL_PROTO_LL) { - uint32_t flag = NCCL_LL_FLAG(args->transmitted + 1); - int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine)); - union ncclLLFifoLine* lines = (union ncclLLFifoLine*)buff; - // Pack data into another buffer - int stepLines = stepSize / sizeof(union ncclLLFifoLine); - uint32_t* sendBuff = resources->llData+buffSlot*2*stepLines; // each line has two data elements - buff = (char*)sendBuff; - for (int i=0; i<nFifoLines; i++) { - volatile uint32_t *f1 = &lines[i].flag1; - volatile uint32_t *d1 = &lines[i].data1; - volatile uint32_t *f2 = &lines[i].flag2; - volatile uint32_t *d2 = &lines[i].data2; - if (f1[0] != flag || f2[0] != flag) { ready = 0; break; } - sendBuff[2*i] = d1[0]; - sendBuff[2*i+1] = d2[0]; - } - size = nFifoLines*2*sizeof(uint32_t); + int nGroups = DIVUP(args->nsubs, COLLNET_GROUP_NSUBS); + int perGroupSteps = NCCL_STEPS / nGroups; + for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + struct collNetSendResources* resources = (struct collNetSendResources*) (sub->connector->transportResources); + void* sendMhandle = resources->sendMhandles[p]; + void* recvMhandle = resources->recvMhandles[p]; + int stepSize = sub->connector->comm->buffSizes[p] / NCCL_STEPS; + auto reqFifo = resources->reqFifo; + if (sub->posted < sub->nsteps && sub->posted < sub->done + NCCL_STEPS) { + int buffSlot = (sub->base+sub->posted)%NCCL_STEPS; + if (p == NCCL_PROTO_SIMPLE) { + char* ptr; + int sharedBuffSlot = sub->posted%NCCL_STEPS; + NCCLCHECK(ncclProxySharedBuffersGetCollNet(sub->connector->comm, resources->useGdr, 0, sharedBuffSlot, 0, &ptr)); + resources->recvMem->ptrsFifo[buffSlot] = ptr + s*args->chunkSize; + __sync_synchronize(); } - if (ready) { - // Data is ready, try to send. - int count = size/ncclTypeSize(args->dtype); - NCCLCHECK(collNetIallreduce(resources->collNetSendComm, (void*) buff, (void*)(reqFifo[buffSlot].recvBuff), count, args->dtype, args->redOp, sendMhandle, recvMhandle, args->requests+buffSlot)); - if (args->requests[buffSlot] != NULL) { - TRACE(NCCL_NET, "sendProxy [%d/%d] Iallreduce posted, req %p", args->transmitted, buffSlot, args->requests[buffSlot]); + volatile uint64_t* sendHead = &resources->sendMem->head; + sub->posted += args->sliceSteps; + *sendHead = sub->base + sub->posted - NCCL_STEPS; + } + // Enforce sync between operations of the same group. + bool groupSync = (((s == 0) && ((sub+args->nsubs-1)->received == sub->received)) || (s && (sub-1)->received > sub->received)); + if (groupSync && sub->received < sub->posted && sub->received < sub->done + perGroupSteps) { + int buffSlot = (sub->base+sub->received)%NCCL_STEPS; + int sharedBuffSlot = sub->received%NCCL_STEPS; + volatile int* sizesFifo = resources->recvMem->sizesFifo; + volatile uint64_t* recvTail = &resources->recvMem->tail; + if (sizesFifo[buffSlot] != -1 && ((*recvTail > (sub->base+sub->received)) || p == NCCL_PROTO_LL)) { + // We have something to receive, let's check whether data is ready. + int size = sizesFifo[buffSlot]; + int ready = 1; + if (s == 0) { + NCCLCHECK(ncclProxySharedBuffersGetCollNet(sub->connector->comm, p == NCCL_PROTO_SIMPLE ? resources->useGdr : 0, 0, sharedBuffSlot, 0, &args->sharedBuff[sharedBuffSlot])); + args->sharedSize[sharedBuffSlot] = p == NCCL_PROTO_SIMPLE ? 
args->chunkSize : size/2; + } + if (p == NCCL_PROTO_LL) { + char* localBuff = sub->connector->conn.buffs[p]; + uint32_t flag = NCCL_LL_FLAG(sub->base + sub->received + 1); + int nFifoLines = size / sizeof(union ncclLLFifoLine); + union ncclLLFifoLine* lines = (union ncclLLFifoLine*)(localBuff+buffSlot*stepSize); + // Pack data into the shared buffer + uint32_t* sendBuff = (uint32_t*)(args->sharedBuff[sharedBuffSlot]+args->sharedSize[sharedBuffSlot]*s); + for (int i=0; i<nFifoLines; i++) { + volatile uint32_t *f1 = &lines[i].flag1; + volatile uint32_t *d1 = &lines[i].data1; + volatile uint32_t *f2 = &lines[i].flag2; + volatile uint32_t *d2 = &lines[i].data2; + if (f1[0] != flag || f2[0] != flag) { ready = 0; break; } + sendBuff[2*i] = d1[0]; + sendBuff[2*i+1] = d2[0]; + } + } + if (ready) { sizesFifo[buffSlot] = -1; - // Make sure size is reset to zero before we update the head. - __sync_synchronize(); - args->transmitted += args->sliceSteps; + sub->received += args->sliceSteps; args->idle = 0; - return ncclSuccess; + //continue; } } } - } - // Check whether the network has completed some send operations. - if (args->done < args->transmitted) { - int done, size; - int buffSlot = args->done%NCCL_STEPS; - NCCLCHECK(collNetTest((void*)(args->requests[buffSlot]), &done, &size)); - if (done) { - TRACE(NCCL_NET, "sendProxy [%d/%d] request %p done, size %d", args->done, buffSlot, args->requests[buffSlot], size); - reqFifo[buffSlot].size = size; - // Make sure size is updated before we set recvBuff to NULL (from the view of recv proxy, concerning the flush) - // (reordered store after store is possible on POWER, though not on x86) - __sync_synchronize(); - reqFifo[buffSlot].recvBuff = NULL; // Notify recvProxy - args->done += args->sliceSteps; - resources->sendMem->head = args->done; - args->idle = 0; - if (args->done == args->end) { - resources->step = args->end; - args->state = ncclProxyOpNone; + if (LAST_OF_GROUP(s) && (sub->transmitted < sub->received)) { + int group = s / COLLNET_GROUP_NSUBS; + int buffSlot = (sub->base+sub->transmitted)%NCCL_STEPS; + int sharedBuffSlot = sub->transmitted%NCCL_STEPS; + if (reqFifo[group][buffSlot].recvBuff != NULL) { + int totalSize = (s-group*COLLNET_GROUP_NSUBS+1) * args->sharedSize[sharedBuffSlot]; + int count = totalSize / ncclTypeSize(args->dtype); + reqFifo[group][buffSlot].size = args->sharedSize[sharedBuffSlot]; + char* sendAddress = (char*)args->sharedBuff[sharedBuffSlot] + group*COLLNET_GROUP_NSUBS*args->sharedSize[sharedBuffSlot]; + NCCLCHECK(collNetIallreduce(resources->collNetComm, sendAddress, (void*)(reqFifo[group][buffSlot].recvBuff), count, args->dtype, args->redOp, sendMhandle, recvMhandle, sub->requests+buffSlot)); + if (sub->requests[buffSlot] == NULL) continue; + + TRACE(NCCL_NET, "sendProxy [%d/%d/%d] Iallreduce posted, size %d req %p", sub->transmitted, group, buffSlot, totalSize, sub->requests[buffSlot]); + // Make sure size is reset to zero before we update the head. + __sync_synchronize(); + sub->transmitted += args->sliceSteps; + args->idle = 0; + continue; + } + } + // Check whether the network has completed some send operations. 
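For the LL protocol, the send proxy above copies only the two 32-bit data words of each fifo line into the shared buffer once both per-line flags match, halving the bytes handed to the network. A minimal stand-alone sketch of that packing follows (illustration only, not part of the commit; a simplified struct stands in for the real ncclLLFifoLine union).

#include <cstddef>
#include <cstdint>
#include <cstdio>

// Simplified stand-in for union ncclLLFifoLine (illustration only).
struct LLLine { uint32_t data1, flag1, data2, flag2; };

// Returns packed size in bytes, or 0 if some line does not yet carry the expected flag.
size_t packLL(const LLLine* lines, int nLines, uint32_t flag, uint32_t* out) {
  for (int i = 0; i < nLines; i++) {
    // The GPU writes flags last, so matching flags mean the data words are valid.
    if (lines[i].flag1 != flag || lines[i].flag2 != flag) return 0;
    out[2*i]   = lines[i].data1;
    out[2*i+1] = lines[i].data2;
  }
  return (size_t)nLines * 2 * sizeof(uint32_t);  // flags dropped: half the original bytes
}

int main() {
  uint32_t flag = 7;
  LLLine lines[2] = {{10, flag, 11, flag}, {12, flag, 13, flag}};
  uint32_t packed[4];
  size_t bytes = packLL(lines, 2, flag, packed);
  printf("packed %zu bytes: %u %u %u %u\n", bytes, packed[0], packed[1], packed[2], packed[3]);
  return 0;
}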
+ if (LAST_OF_GROUP(s) && sub->done < sub->transmitted) { + int done, size; + int group = s / COLLNET_GROUP_NSUBS; + int buffSlot = (sub->base+sub->done)%NCCL_STEPS; + NCCLCHECK(collNetTest((void*)(sub->requests[buffSlot]), &done, &size)); + if (done) { + TRACE(NCCL_NET, "sendProxy [%d/%d/%d] request %p done, size %d", sub->done, group, buffSlot, sub->requests[buffSlot], size); + // Make sure size is updated before we set recvBuff to NULL (from the view of recv proxy, concerning the flush) + // (reordered store after store is possible on POWER, though not on x86) + __sync_synchronize(); + reqFifo[group][buffSlot].recvBuff = NULL; // Notify recvProxy + for (int i=group*COLLNET_GROUP_NSUBS; i<=s; i++) args->subs[i].done += args->sliceSteps; + args->idle = 0; + int allDone = 1; + for (int i=0; i<args->nsubs; i++) { + if (args->subs[i].done < args->subs[i].nsteps) { allDone = 0; break; } + } + if (allDone) { + args->state = ncclProxyOpNone; + TRACE(NCCL_NET, "sendProxy [%d/%d] stopped", sub->done, s); + } } - return ncclSuccess; } } } @@ -336,81 +431,115 @@ ncclResult_t collNetRecvProxy(struct ncclProxyArgs* args) { WARN("CollNet does not support LL128"); return ncclInternalError; } - struct collNetRecvResources* resources = (struct collNetRecvResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { - // Round to next multiple of sliceSteps - resources->step = ROUNDUP(resources->step, args->chunkSteps); - args->posted = args->received = args->transmitted = args->done = resources->step; - args->end = resources->step + args->nsteps; + for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + struct collNetRecvResources* resources = (struct collNetRecvResources*) (sub->connector->transportResources); + // Round to next multiple of sliceSteps + sub->base = ROUNDUP(resources->step, args->chunkSteps); + sub->posted = sub->received = sub->flushed = sub->transmitted = sub->done = 0; + resources->step = sub->base + sub->nsteps; + } args->state = ncclProxyOpProgress; } args->idle = 1; if (args->state == ncclProxyOpProgress) { int p = args->protocol; - int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS; - char* localBuff = args->connector->conn.buffs[p]; - void* mhandle = resources->mhandles[p]; - struct reqSlot* reqFifo = resources->reqFifo; - if ((args->posted < args->done + NCCL_STEPS) && (args->posted < args->end)) { - int buffSlot = args->posted%NCCL_STEPS; - char* recvBuff = p == NCCL_PROTO_LL ? (char*)resources->llData : localBuff; - int recvStepSize = p == NCCL_PROTO_LL ? 
stepSize/2 : stepSize; - reqFifo[buffSlot].recvBuff = recvBuff+buffSlot*recvStepSize; - TRACE(NCCL_NET, "recvProxy [%d/%d] posted buffer %p", args->posted, buffSlot, reqFifo[buffSlot].recvBuff); - args->posted += args->sliceSteps; - args->idle = 0; - return ncclSuccess; - } - if (args->posted > args->received) { - int buffSlot = args->received%NCCL_STEPS; - if (reqFifo[buffSlot].recvBuff == NULL) { // Buffer is cleared : coll is complete - TRACE(NCCL_NET, "recvProxy [%d/%d] done, size %d", args->received, buffSlot, reqFifo[buffSlot].size); - if (args->protocol == NCCL_PROTO_LL) { // ll + int nGroups = DIVUP(args->nsubs, COLLNET_GROUP_NSUBS); + int perGroupSteps = NCCL_STEPS / nGroups; + for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + struct collNetRecvResources* resources = (struct collNetRecvResources*) (sub->connector->transportResources); + void* mhandle = resources->mhandles[p]; + int stepSize = sub->connector->comm->buffSizes[p] / NCCL_STEPS; + auto reqFifo = resources->reqFifo; + // Enforce sync between operations of the same group. + if (LAST_OF_GROUP(s) && (sub->posted < sub->done + perGroupSteps) && (sub->posted < sub->nsteps)) { + int group = s / COLLNET_GROUP_NSUBS; + int buffSlot = (sub->base+sub->posted)%NCCL_STEPS; + char* ptr; + int sharedBuffSlot = sub->posted%NCCL_STEPS; + NCCLCHECK(ncclProxySharedBuffersGetCollNet(sub->connector->comm, p == NCCL_PROTO_SIMPLE ? resources->useGdr : 0, 1, sharedBuffSlot, 0, &ptr)); + args->sharedBuff[sharedBuffSlot] = ptr; + int slotSize = sub->connector->comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS; + reqFifo[group][buffSlot].recvBuff = args->sharedBuff[sharedBuffSlot] + group*COLLNET_GROUP_NSUBS*slotSize; + TRACE(NCCL_NET, "recvProxy [%d/%d/%d] posted buffer %p", sub->posted, group, buffSlot, reqFifo[group][buffSlot].recvBuff); + sub->posted += args->sliceSteps; + args->idle = 0; + continue; + } + if (LAST_OF_GROUP(s) && (sub->posted > sub->received)) { + int group = s / COLLNET_GROUP_NSUBS; + int buffSlot = (sub->base+sub->received)%NCCL_STEPS; + int sharedBuffSlot = sub->received%NCCL_STEPS; + if (reqFifo[group][buffSlot].recvBuff == NULL) { // Buffer is cleared : coll is complete + args->sharedSize[sharedBuffSlot] = reqFifo[group][buffSlot].size; + int totalSize = args->sharedSize[sharedBuffSlot]*(s-group*COLLNET_GROUP_NSUBS+1); + TRACE(NCCL_NET, "recvProxy [%d/%d/%d] received, size %d", sub->received, group, buffSlot, totalSize); + sub->received += args->sliceSteps; + if (reqFifo[group][buffSlot].size > 0 && p == NCCL_PROTO_SIMPLE && resources->useGdr) { + int slotSize = sub->connector->comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS; + char* recvAddress = (char*)args->sharedBuff[sharedBuffSlot] + group*COLLNET_GROUP_NSUBS*slotSize; + NCCLCHECK(collNetIflush(resources->collNetComm, recvAddress, totalSize, mhandle, sub->requests+buffSlot)); + } else { + for (int i=group*COLLNET_GROUP_NSUBS; i<=s; i++) args->subs[i].flushed += args->sliceSteps; + } + args->idle = 0; + continue; + } + } + if (LAST_OF_GROUP(s) && (sub->received > sub->flushed)) { + // Progress flush operations + int group = s / COLLNET_GROUP_NSUBS; + int buffSlot = (sub->base + sub->flushed)%NCCL_STEPS; + int done = 1; + if (sub->requests[buffSlot]) NCCLCHECK(collNetTest(sub->requests[buffSlot], &done, NULL)); + if (done) { + TRACE(NCCL_NET, "recvProxy [%d/%d/%d] flushed", sub->flushed, group, buffSlot); + for (int i=group*COLLNET_GROUP_NSUBS; i<=s; i++) args->subs[i].flushed += args->sliceSteps; + args->idle = 0; + //continue; + } + 
} + if (sub->flushed > sub->transmitted) { + int group = s / COLLNET_GROUP_NSUBS; + int buffSlot = (sub->base + sub->transmitted)%NCCL_STEPS; + int sharedBuffSlot = sub->transmitted%NCCL_STEPS; + int slotSize = sub->connector->comm->buffSizes[NCCL_PROTO_SIMPLE] / NCCL_STEPS; + char* ptr = args->sharedBuff[sharedBuffSlot] + group*COLLNET_GROUP_NSUBS*slotSize + (s%COLLNET_GROUP_NSUBS)*args->sharedSize[sharedBuffSlot]; + if (p == NCCL_PROTO_SIMPLE) { + volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo; + ptrsFifo[buffSlot] = ptr; + __sync_synchronize(); + resources->recvMem->tail = sub->base + sub->flushed; + } + if (p == NCCL_PROTO_LL) { // ll // re-attach flag - uint32_t flag = NCCL_LL_FLAG(args->received + 1); - int stepLines = stepSize / sizeof(union ncclLLFifoLine); + char* localBuff = sub->connector->conn.buffs[p]; + uint32_t flag = NCCL_LL_FLAG(sub->base + sub->transmitted + 1); union ncclLLFifoLine* lines = (union ncclLLFifoLine*)(localBuff+buffSlot*stepSize); - uint32_t* recvData = resources->llData+buffSlot*2*stepLines; - int nFifoLines = DIVUP(reqFifo[buffSlot].size, 2*sizeof(uint32_t)); + uint32_t* recvData = (uint32_t*)ptr; + int nFifoLines = DIVUP(args->sharedSize[sharedBuffSlot], 2*sizeof(uint32_t)); for (int i=0; i<nFifoLines; i++) { lines[i].v[0] = ((uint64_t)flag << 32) + recvData[2*i]; lines[i].v[1] = ((uint64_t)flag << 32) + recvData[2*i+1]; } } - args->received += args->sliceSteps; - if (reqFifo[buffSlot].size > 0 && args->protocol == NCCL_PROTO_SIMPLE && resources->useGdr) { - NCCLCHECK(collNetIflush(resources->collNetRecvComm, localBuff+buffSlot*stepSize, reqFifo[buffSlot].size, mhandle, args->requests+buffSlot)); - } else { - args->requests[buffSlot] = NULL; - } + sub->transmitted += args->sliceSteps; args->idle = 0; - return ncclSuccess; + continue; } - } - if (args->received > args->transmitted) { - // Progress flush operations - int buffSlot = args->transmitted%NCCL_STEPS; - int done = 1; - if (args->requests[buffSlot]) NCCLCHECK(collNetTest(args->requests[buffSlot], &done, NULL)); - if (done) { - args->transmitted += args->sliceSteps; - __sync_synchronize(); - resources->recvMem->tail = args->transmitted; - args->idle = 0; - return ncclSuccess; - } - } - if (args->transmitted > args->done) { + // Enforce sync here to make sure the last sub doesn't increase "done" before all others in the group have + // reached the same point, otherwise we would start posting buffers to the send proxy before we're done + // processing all the shared buffer. + bool groupSync = (((s == 0) && ((sub+args->nsubs-1)->done == sub->done)) || (s && (sub-1)->done > sub->done)); volatile uint64_t* sendHead = &resources->sendMem->head; - uint64_t done = *sendHead; - while (done > args->done && - // LL and LL128 can acknowledge 0-bytes send before they even happen. Don't go past what we transmitted. 
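Both new proxy loops keep the subs of a group in lock-step: sub 0 may advance a shared-stage counter only once the last sub has caught up, and any other sub only while its predecessor is strictly ahead. A toy stand-alone simulation of that "groupSync" rule (not part of the commit) shows the strictly round-robin order in which the counters advance.

#include <cstdio>

int main() {
  const int nsubs = 3, steps = 2, slice = 1;
  int done[nsubs] = {0, 0, 0};
  // Sweep the subs the way the proxy loop does and record the order of increments.
  for (int advanced = 0; advanced < nsubs * steps; ) {
    for (int s = 0; s < nsubs; s++) {
      bool groupSync = (s == 0) ? (done[nsubs-1] == done[s])   // sub 0 waits for the last sub
                                : (done[s-1] > done[s]);       // others wait for their predecessor
      if (groupSync && done[s] < steps) {
        done[s] += slice;
        printf("advance sub %d -> done=%d\n", s, done[s]);
        advanced++;
      }
    }
  }
  return 0;
}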
- args->transmitted > args->done) { - args->done += args->sliceSteps; + if (groupSync && sub->done < sub->transmitted && (sub->base+sub->done) < *sendHead) { + sub->done += args->sliceSteps; args->idle = 0; - if (args->done == args->end) { - resources->step = args->end; + if (sub->done == sub->nsteps && s == args->nsubs-1) { args->state = ncclProxyOpNone; + TRACE(NCCL_NET, "recvProxy [%d/%d] stopped", sub->done, s); } } } diff --git a/src/transport/net.cc b/src/transport/net.cc index 86c43f8..391f7cf 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -8,6 +8,7 @@ #include "net.h" #include "graph.h" #include "collectives.h" +#include "gdrwrap.h" struct netConnectInfo { ncclNetHandle_t netHandle; @@ -37,6 +38,13 @@ struct netRecvResources { void* netRecvComm; struct ncclSendMem* sendMem; struct ncclRecvMem* recvMem; + + // GDRCOPY support + void* gdrMemDesc; + struct ncclRecvMem* devRecvMem; + void* gdrFlushDesc; + int* devFlushMem; + int netDev; int useGdr; int shared; @@ -58,13 +66,16 @@ NCCL_PARAM(NetSharedBuffers, "NET_SHARED_BUFFERS", -2); /* Determine if we will use this transport for this peer and return connect * information for this peer */ -ncclResult_t netSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) { +ncclResult_t netSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId, int connIndex) { struct netSendResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); send->transportResources = resources; send->conn.shared = resources->shared = ncclParamNetSharedBuffers() != -2 ? ncclParamNetSharedBuffers() : graph ? 0 : 1; + send->proxyAppendPtr = send->conn.shared ? 
comm->proxyState.sharedBuffs.proxyAppend+2*channelId+1 : &send->proxyAppend; - NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev)); + // Send/Receive: Round-robin NICs based on the receiver's CUDA device + int nicRR = comm->peerInfo[peerInfo->rank].cudaDev; + NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, nicRR, &resources->netDev)); NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 1, &resources->useGdr)); NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1)); @@ -111,20 +122,45 @@ ncclResult_t netSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, st return ncclSuccess; } -ncclResult_t netRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) { +// GDRCOPY support: TAIL_ENABLE When enabled locates the RX proxy tail in CUDA memory +NCCL_PARAM(GdrCopyTailEnable, "GDRCOPY_TAIL_ENABLE", 1); +// GDRCOPY support: FLUSH_ENABLE When enabled uses a PCI-E read to flush GDRDMA buffers +NCCL_PARAM(GdrCopyFlushEnable, "GDRCOPY_FLUSH_ENABLE", 0); + +ncclResult_t netRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId, int connIndex) { struct netRecvResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); recv->transportResources = resources; recv->conn.shared = resources->shared = ncclParamNetSharedBuffers() != -2 ? ncclParamNetSharedBuffers() : graph ? 0 : 1; + recv->proxyAppendPtr = recv->conn.shared ? comm->proxyState.sharedBuffs.proxyAppend+2*channelId : &recv->proxyAppend; - NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev)); + // Send/Receive: Round-robin NICs based on the receiver's CUDA device + int nicRR = comm->cudaDev; + NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, nicRR, &resources->netDev)); NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 0, &resources->useGdr)); NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1)); NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1)); + // GDRCOPY tail support + if (ncclGdrCopy != NULL && ncclParamGdrCopyTailEnable() == 1) { + struct ncclRecvMem* devCudaPtr; + NCCLCHECK(ncclGdrCudaCalloc(&resources->devRecvMem, &devCudaPtr, 1, &resources->gdrMemDesc)); + // The GDR mapped VA doesn't work on the SMs + recv->conn.tail = &((struct ncclRecvMem*)devCudaPtr)->tail; + } else { + recv->conn.tail = &resources->recvMem->tail; + } + + // GDRCOPY flush support +#if defined (__x86_64__) + if (ncclGdrCopy != NULL && ncclParamGdrCopyFlushEnable() == 1) { + int* cudaPtr; + NCCLCHECK(ncclGdrCudaCalloc(&resources->devFlushMem, &cudaPtr, 1, &resources->gdrFlushDesc)); + } +#endif + recv->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0; - recv->conn.tail = &resources->recvMem->tail; // Only fuse P2P buffers, continue to allocate dedicated buffers for ring/tree recv->conn.ptrsFifo = resources->shared ? 
resources->recvMem->ptrsFifo : NULL; recv->conn.head = &resources->sendMem->head; @@ -233,6 +269,14 @@ ncclResult_t netSendFree(void* transportResources) { ncclResult_t netRecvFree(void* transportResources) { struct netRecvResources* resources = (struct netRecvResources*)transportResources; + // GDRCOPY support + if (resources->gdrFlushDesc) { + NCCLCHECK(ncclGdrCudaFree(resources->gdrFlushDesc)); + } + // GDRCOPY support + if (resources->gdrMemDesc) { + NCCLCHECK(ncclGdrCudaFree(resources->gdrMemDesc)); + } NCCLCHECK(ncclCudaHostFree(resources->sendMem)); NCCLCHECK(ncclCudaHostFree(resources->recvMem)); for (int l=0; l<LOC_COUNT; l++) { @@ -251,202 +295,232 @@ ncclResult_t netRecvFree(void* transportResources) { static_assert(NCCL_STEPS <= NCCL_NET_MAX_REQUESTS, "Not enough net requests to cover for steps"); ncclResult_t netSendProxy(struct ncclProxyArgs* args) { - struct netSendResources* resources = (struct netSendResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { - // Round to next multiple of sliceSteps - resources->step = ROUNDUP(resources->step, args->chunkSteps); - args->posted = args->transmitted = args->done = resources->step; - args->end = resources->step + args->nsteps; + for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + struct netSendResources* resources = (struct netSendResources*) (sub->connector->transportResources); + // Round to next multiple of sliceSteps + sub->base = ROUNDUP(resources->step, args->chunkSteps); + sub->posted = sub->transmitted = sub->done = 0; + } args->state = ncclProxyOpProgress; } args->idle = 1; if (args->state == ncclProxyOpProgress) { int p = args->protocol; - int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS; - char* localBuff = args->connector->conn.buffs[p]; - void* mhandle = *(resources->mhandlesProto[p]); - int buffSize = stepSize*args->sliceSteps; - if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR; - if (args->sendbytes < buffSize) buffSize = args->sendbytes; - // Post buffers to the GPU - if (args->posted < args->end && args->posted < args->done + NCCL_STEPS) { - if (resources->shared) { - char* ptr; - NCCLCHECK(ncclProxySharedBuffersAlloc(args->connector->comm, resources->useGdr, 0, args->channel->id, buffSize, &ptr)); - if (ptr == NULL) return ncclInternalError; - resources->recvMem->ptrsFifo[args->posted%NCCL_STEPS] = ptr; - __sync_synchronize(); - volatile uint64_t* sendHead = &resources->sendMem->head; - args->posted += args->sliceSteps; - *sendHead = args->posted - NCCL_STEPS; - } else args->posted += args->sliceSteps; - args->idle = 0; - return ncclSuccess; - } - // Check whether we received data from the GPU and send it to the network - int buffSlot = args->transmitted%NCCL_STEPS; - if (args->transmitted < args->posted && args->transmitted < args->done + NCCL_STEPS) { - volatile int* sizesFifo = resources->recvMem->sizesFifo; - volatile uint64_t* recvTail = &resources->recvMem->tail; - if (sizesFifo[buffSlot] != -1 && (*recvTail > args->transmitted || args->protocol == NCCL_PROTO_LL)) { - // We have something to receive, let's check if it's completely ready. - int size = sizesFifo[buffSlot]; - char* buff = resources->shared ? 
(char*)resources->recvMem->ptrsFifo[buffSlot] : localBuff+buffSlot*stepSize; - int ready = 1; - if (args->protocol == NCCL_PROTO_LL128) { - int ready = resources->useGdr; - if (!ready) { - // When data is in sysmem, we need to wait until all flags are correct since the GPU only - // called threadfence() - uint64_t flag = args->transmitted + 1; - int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS); - volatile uint64_t* lines = (volatile uint64_t*)buff; - ready = 1; + for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + if (sub->done == sub->nsteps) continue; + struct netSendResources* resources = (struct netSendResources*) (sub->connector->transportResources); + void* mhandle = *(resources->mhandlesProto[p]); + int stepSize = sub->connector->comm->buffSizes[p] / NCCL_STEPS; + char* localBuff = sub->connector->conn.buffs[p]; + int buffSize = stepSize*args->sliceSteps; + if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR; + if (sub->sendbytes < buffSize) buffSize = sub->sendbytes; + // Post buffers to the GPU + if (sub->posted < sub->nsteps && sub->posted < sub->done + NCCL_STEPS) { + int buffSlot = (sub->base+sub->posted)%NCCL_STEPS; + if (resources->shared) { + char* ptr; + int sharedBuffSlot = sub->posted%NCCL_STEPS; + NCCLCHECK(ncclProxySharedBuffersGetP2p(sub->connector->comm, resources->useGdr, 0, sub->channel->id, sharedBuffSlot, s, &ptr)); + resources->recvMem->ptrsFifo[buffSlot] = ptr; + __sync_synchronize(); + volatile uint64_t* sendHead = &resources->sendMem->head; + sub->posted += args->sliceSteps; + *sendHead = sub->base + sub->posted - NCCL_STEPS; + } else sub->posted += args->sliceSteps; + args->idle = 0; + continue; + } + // Check whether we received data from the GPU and send it to the network + if (sub->transmitted < sub->posted && sub->transmitted < sub->done + NCCL_STEPS) { + int buffSlot = (sub->base+sub->transmitted)%NCCL_STEPS; + volatile int* sizesFifo = resources->recvMem->sizesFifo; + volatile uint64_t* recvTail = &resources->recvMem->tail; + if (sizesFifo[buffSlot] != -1 && ((*recvTail > (sub->base+sub->transmitted)) || p == NCCL_PROTO_LL)) { + // We have something to receive, let's check if it's completely ready. + int size = sizesFifo[buffSlot]; + char* buff = resources->shared ? 
(char*)resources->recvMem->ptrsFifo[buffSlot] : localBuff+buffSlot*stepSize; + int ready = 1; + if (p == NCCL_PROTO_LL128) { + ready = resources->useGdr; + if (!ready) { + // When data is in sysmem, we need to wait until all flags are correct since the GPU only + // called threadfence() + uint64_t flag = sub->base+sub->transmitted+1; + int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS); + volatile uint64_t* lines = (volatile uint64_t*)buff; + ready = 1; + for (int i=0; i<nFifoLines; i++) { + if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; } + } + } + } else if (p == NCCL_PROTO_LL) { + uint32_t flag = NCCL_LL_FLAG(sub->base+sub->transmitted+1); + int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine)); + union ncclLLFifoLine* lines = (union ncclLLFifoLine*)buff; for (int i=0; i<nFifoLines; i++) { - if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; } + volatile uint32_t *f1 = &lines[i].flag1; + volatile uint32_t *f2 = &lines[i].flag2; + if (f1[0] != flag || f2[0] != flag) { ready = 0; break; } } } - } else if (args->protocol == NCCL_PROTO_LL) { - uint32_t flag = NCCL_LL_FLAG(args->transmitted + 1); - int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine)); - union ncclLLFifoLine* lines = (union ncclLLFifoLine*)buff; - for (int i=0; i<nFifoLines; i++) { - volatile uint32_t *f1 = &lines[i].flag1; - volatile uint32_t *f2 = &lines[i].flag2; - if (f1[0] != flag || f2[0] != flag) { ready = 0; break; } + if (ready) { + // Data is ready, try to send. + NCCLCHECK(ncclNetIsend(resources->netSendComm, buff, size, mhandle, sub->requests+buffSlot)); + if (sub->requests[buffSlot] != NULL) { + TRACE(NCCL_NET, "sendProxy [%d/%d] Isend (LL) posted, req %p", sub->transmitted, buffSlot, sub->requests[buffSlot]); + sizesFifo[buffSlot] = -1; + // Make sure size is reset to zero before we update the head. + __sync_synchronize(); + sub->transmitted += args->sliceSteps; + args->idle = 0; + continue; + } } } - if (ready) { - // Data is ready, try to send. - NCCLCHECK(ncclNetIsend(resources->netSendComm, buff, size, mhandle, args->requests+buffSlot)); - if (args->requests[buffSlot] != NULL) { - TRACE(NCCL_NET, "sendProxy [%d/%d] Isend (LL) posted, req %p", args->transmitted, buffSlot, args->requests[buffSlot]); - sizesFifo[buffSlot] = -1; - // Make sure size is reset to zero before we update the head. - __sync_synchronize(); - args->transmitted += args->sliceSteps; - args->idle = 0; - return ncclSuccess; + } + // Check whether the network has completed some send operations. + if (sub->done < sub->transmitted) { + int done; + int buffSlot = (sub->base+sub->done)%NCCL_STEPS; + NCCLCHECK(ncclNetTest(sub->requests[buffSlot], &done, NULL)); + if (done) { + TRACE(NCCL_NET, "sendProxy [%d/%d] request %p done, size %d", sub->done, buffSlot, sub->requests[buffSlot]); + sub->done += args->sliceSteps; + + if (resources->shared == 0) { + resources->sendMem->head = sub->base + sub->done; + } + args->idle = 0; + if (sub->done == sub->nsteps) { + resources->step = sub->base + sub->nsteps; + args->done++; } } } } - // Check whether the network has completed some send operations. 
- if (args->done < args->transmitted) { - int done; - int buffSlot = args->done%NCCL_STEPS; - NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL)); - if (done) { - TRACE(NCCL_NET, "sendProxy [%d/%d] request %p done, size %d", args->done, buffSlot, args->requests[buffSlot]); - if (resources->shared) { - char* ptr = (char*)resources->recvMem->ptrsFifo[args->done%NCCL_STEPS]; - NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 0, args->channel->id, buffSize, ptr)); - } - args->done += args->sliceSteps; - - if (resources->shared == 0) { - resources->sendMem->head = args->done; - } - args->idle = 0; - if (args->done == args->end) { - resources->step = args->end; - args->state = ncclProxyOpNone; - } - return ncclSuccess; - } + if (args->done == args->nsubs) { + args->state = ncclProxyOpNone; } } return ncclSuccess; } ncclResult_t netRecvProxy(struct ncclProxyArgs* args) { - struct netRecvResources* resources = (struct netRecvResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { - // Round to next multiple of sliceSteps - resources->step = ROUNDUP(resources->step, args->chunkSteps); - args->posted = args->received = args->transmitted = args->done = resources->step; - args->end = resources->step + args->nsteps; + for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + struct netRecvResources* resources = (struct netRecvResources*) (sub->connector->transportResources); + // Round to next multiple of sliceSteps + sub->base = ROUNDUP(resources->step, args->chunkSteps); + sub->posted = sub->received = sub->transmitted = sub->done = 0; + } args->state = ncclProxyOpProgress; } args->idle = 1; if (args->state == ncclProxyOpProgress) { int p = args->protocol; - int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS; - char* localBuff = args->connector->conn.buffs[p]; - void* mhandle = *(resources->mhandlesProto[p]); - int buffSize = stepSize*args->sliceSteps; - if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR; - if (args->recvbytes < buffSize) buffSize = args->recvbytes; - if ((args->posted < args->done + NCCL_STEPS) && (args->posted < args->end)) { - int buffSlot = args->posted%NCCL_STEPS; - char* ptr; - if (resources->shared) { - NCCLCHECK(ncclProxySharedBuffersAlloc(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, &ptr)); - if (ptr == NULL) return ncclInternalError; - volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo; - ptrsFifo[buffSlot] = ptr; - } else { - ptr = localBuff+buffSlot*stepSize; - } - NCCLCHECK(ncclNetIrecv(resources->netRecvComm, ptr, buffSize, mhandle, args->requests+buffSlot)); - if (args->requests[buffSlot] != NULL) { - TRACE(NCCL_NET, "recvProxy [%d/%d] posted recv request %p", args->posted, buffSlot, args->requests[buffSlot]); - args->posted += args->sliceSteps; - args->idle = 0; - return ncclSuccess; - } else if (resources->shared) { - NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, ptr)); - } - } - if (args->posted > args->received) { - int buffSlot = args->received%NCCL_STEPS; - int done, size; - NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, &size)); - if (done) { - args->received += args->sliceSteps; - if (size > 0 && args->protocol == NCCL_PROTO_SIMPLE && resources->useGdr) { - // Don't pass data to the GPU yet, flush first. 
+ for (int s=0; s<args->nsubs; s++) { + struct ncclProxySubArgs* sub = args->subs+s; + if (sub->done == sub->nsteps) continue; + struct netRecvResources* resources = (struct netRecvResources*) (sub->connector->transportResources); + void* mhandle = *(resources->mhandlesProto[p]); + int stepSize = sub->connector->comm->buffSizes[p] / NCCL_STEPS; + char* localBuff = sub->connector->conn.buffs[p]; + int buffSize = stepSize*args->sliceSteps; + if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR; + if (sub->recvbytes < buffSize) buffSize = sub->recvbytes; + + if ((sub->posted < sub->done + NCCL_STEPS) && (sub->posted < sub->nsteps)) { + int buffSlot = (sub->base+sub->posted)%NCCL_STEPS; + char* ptr; + if (resources->shared) { + int sharedBuffSlot = sub->posted%NCCL_STEPS; + NCCLCHECK(ncclProxySharedBuffersGetP2p(sub->connector->comm, resources->useGdr, 1, sub->channel->id, sharedBuffSlot, s, &ptr)); volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo; - char* ptr = resources->shared ? (char*)(ptrsFifo[buffSlot]) : localBuff+buffSlot*stepSize; - NCCLCHECK(ncclNetIflush(resources->netRecvComm, ptr, size, mhandle, args->requests+buffSlot)); + ptrsFifo[buffSlot] = ptr; } else { - args->requests[buffSlot] = NULL; + ptr = localBuff+buffSlot*stepSize; + } + NCCLCHECK(ncclNetIrecv(resources->netRecvComm, ptr, buffSize, mhandle, sub->requests+buffSlot)); + if (sub->requests[buffSlot] != NULL) { + TRACE(NCCL_NET, "recvProxy [%d/%d] posted recv request %p", sub->posted, buffSlot, sub->requests[buffSlot]); + sub->posted += args->sliceSteps; + args->idle = 0; + continue; } - args->idle = 0; - return ncclSuccess; } - } - if (args->received > args->transmitted) { - // Progress flush operations - int buffSlot = args->transmitted%NCCL_STEPS; - int done = 1; - if (args->requests[buffSlot]) NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL)); - if (done) { - args->transmitted += args->sliceSteps; - __sync_synchronize(); - resources->recvMem->tail = args->transmitted; - args->idle = 0; - return ncclSuccess; + if (sub->posted > sub->received) { + int buffSlot = (sub->base+sub->received)%NCCL_STEPS; + int done, size; + NCCLCHECK(ncclNetTest(sub->requests[buffSlot], &done, &size)); + if (done) { + sub->received += args->sliceSteps; + if (size > 0 && p == NCCL_PROTO_SIMPLE && resources->useGdr) { + // Don't pass data to the GPU yet, flush first. + + // GDRCOPY support + if (resources->devFlushMem) { +#if defined (__x86_64__) + // Force a PCI-E read from GPU memory + asm volatile ("mov (%0), %%eax" :: "l"(resources->devFlushMem) : "%eax"); +#else + WARN("NET: GDR Flush only supported on x86_64"); + return ncclInternalError; +#endif + sub->requests[buffSlot] = NULL; + } else { + volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo; + char* ptr = resources->shared ? (char*)(ptrsFifo[buffSlot]) : localBuff+buffSlot*stepSize; + NCCLCHECK(ncclNetIflush(resources->netRecvComm, ptr, size, mhandle, sub->requests+buffSlot)); + } + } else { + sub->requests[buffSlot] = NULL; + } + args->idle = 0; + continue; + } } - } - if (args->transmitted > args->done) { - volatile uint64_t* sendHead = &resources->sendMem->head; - uint64_t done = *sendHead; - while (done > args->done && - // LL and LL128 can acknowledge 0-bytes send before they even happen. Don't go past what we transmitted. 
- args->transmitted > args->done) { - if (resources->shared) { - char* ptr = (char*)resources->recvMem->ptrsFifo[args->done%NCCL_STEPS]; - NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, ptr)); + if (sub->received > sub->transmitted) { + // Progress flush operations + int buffSlot = (sub->base+sub->transmitted)%NCCL_STEPS; + int done = 1; + if (sub->requests[buffSlot]) NCCLCHECK(ncclNetTest(sub->requests[buffSlot], &done, NULL)); + if (done) { + sub->transmitted += args->sliceSteps; + __sync_synchronize(); + if (resources->devRecvMem) { + // GDRCOPY support: Write updated tail directly to the device memory + resources->devRecvMem->tail = sub->base + sub->transmitted; + wc_store_fence(); // Flush out WC write + } else { + resources->recvMem->tail = sub->base + sub->transmitted; + } + args->idle = 0; + continue; } - args->done += args->sliceSteps; - args->idle = 0; - if (args->done == args->end) { - resources->step = args->end; - args->state = ncclProxyOpNone; + } + if (sub->transmitted > sub->done) { + volatile uint64_t* sendHead = &resources->sendMem->head; + uint64_t done = *sendHead; + while (done > sub->base + sub->done && + // LL and LL128 can acknowledge 0-bytes send before they even happen. Don't go past what we transmitted. + sub->transmitted > sub->done) { + sub->done += args->sliceSteps; + args->idle = 0; + if (sub->done == sub->nsteps) { + resources->step = sub->base + sub->nsteps; + args->done++; + } } } } + if (args->done == args->nsubs) { + args->state = ncclProxyOpNone; + } } return ncclSuccess; } diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index 4ed46eb..b399318 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -641,22 +641,22 @@ ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, void* mhandle, vo NCCLCHECK(ncclIbGetRequest(&comm->verbs, &req)); req->size = size; - struct ibv_send_wr wr; - memset(&wr, 0, sizeof(wr)); - wr.wr_id = (uint64_t)req; + struct ibv_send_wr wr[2]; + memset(&wr[0], 0, sizeof(wr[0])); + wr[0].wr_id = (uint64_t)req; struct ibv_sge sge; if (size == 0) { - wr.sg_list = NULL; - wr.num_sge = 0; + wr[0].sg_list = NULL; + wr[0].num_sge = 0; } else { sge.addr=(uintptr_t)data; sge.length=(unsigned int)size; sge.lkey=mr->lkey; - wr.sg_list = &sge; - wr.num_sge = 1; + wr[0].sg_list = &sge; + wr[0].num_sge = 1; } #if USE_RDMA_WRITE == 0 - wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; + wr[0].opcode = IBV_WR_SEND; + wr[0].send_flags = IBV_SEND_SIGNALED; #else __sync_synchronize(); // order the readyPtr load against rkey load below // Sanity checks to catch user collective call count/size mismatches @@ -666,15 +666,11 @@ ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, void* mhandle, vo size, slot->size, slot->addr, slot->rkey, slot->seq, comm->fifoHead); return ncclInternalError; } - int useAr = 0; - if (size > ncclParamIbArThreshold()) { - useAr = 1; - } - wr.opcode = useAr ? IBV_WR_RDMA_WRITE : IBV_WR_RDMA_WRITE_WITH_IMM; - wr.send_flags = useAr ? 
0 : IBV_SEND_SIGNALED; - wr.wr.rdma.remote_addr = slot->addr; - wr.wr.rdma.rkey = slot->rkey; - wr.imm_data = size; // Send the message size via imm_data + wr[0].opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr[0].send_flags = IBV_SEND_SIGNALED; + wr[0].wr.rdma.remote_addr = slot->addr; + wr[0].wr.rdma.rkey = slot->rkey; + wr[0].imm_data = size; // Send the message size via imm_data __sync_synchronize(); #endif // We must clear slot->ready, but reset other fields to aid @@ -684,21 +680,29 @@ ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, void* mhandle, vo slot->rkey = slot->size = slot->seq = 0; comm->fifoHead++; - struct ibv_send_wr* bad_wr; - NCCLCHECK(wrap_ibv_post_send(comm->qp, &wr, &bad_wr)); #if USE_RDMA_WRITE // When using adaptive routing, send the bulk of the data first as an // RDMA_WRITE, then a 0-byte RDMA_WRITE_WITH_IMM to trigger a remote // completion. - if (useAr) { - wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; - wr.sg_list = NULL; - wr.num_sge = 0; - wr.send_flags |= IBV_SEND_SIGNALED; - NCCLCHECK(wrap_ibv_post_send(comm->qp, &wr, &bad_wr)); + if (size > ncclParamIbArThreshold()) { + memset(&wr[1], 0, sizeof(wr[1])); + memcpy(&wr[1], &wr[0], sizeof(wr[0])); + wr[1].sg_list = NULL; + wr[1].num_sge = 0; + wr[0].next = &wr[1]; + + wr[0].opcode = IBV_WR_RDMA_WRITE; + wr[1].opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + + wr[0].send_flags = 0; + wr[1].send_flags = IBV_SEND_SIGNALED; } #endif + + struct ibv_send_wr* bad_wr; + NCCLCHECK(wrap_ibv_post_send(comm->qp, wr, &bad_wr)); + *request = req; return ncclSuccess; } diff --git a/src/transport/net_socket.cc b/src/transport/net_socket.cc index 268fc76..79d991d 100644 --- a/src/transport/net_socket.cc +++ b/src/transport/net_socket.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -10,9 +10,7 @@ #include "net.h" #include "param.h" -#include <assert.h> #include <pthread.h> -#include <stdio.h> #include <stdlib.h> #include <poll.h> #include <limits.h> diff --git a/src/transport/p2p.cc b/src/transport/p2p.cc index e05a433..75fff87 100644 --- a/src/transport/p2p.cc +++ b/src/transport/p2p.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -141,29 +141,30 @@ static ncclResult_t p2pMap(struct ncclPeerInfo* myInfo, struct ncclPeerInfo* pee /* Send: Create and return connect structures for this peer to connect to me */ ncclResult_t p2pSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, - struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) { - + struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId, int connIndex) { struct p2pSendResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); send->transportResources = resources; int useRead, intermediateRank; NCCLCHECK(p2pGetInfo(comm->topo, myInfo, peerInfo, &useRead, &intermediateRank)); - int sendSize = sizeof(struct ncclSendMem); - // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure - if (useRead) sendSize += send->comm->buffSizes[NCCL_PROTO_SIMPLE]; - ALIGN_SIZE(sendSize, CUDA_IPC_MIN); struct p2pConnectInfo info; - info.read = useRead; + // For CollNet, we use write for scatter-reduce (conn 1), read for broadcast-gather (conn 0) + info.read = (connIndex == 0) ? useRead : 0; const char* useReadStr = info.read ? "/read" : ""; + int sendSize = sizeof(struct ncclSendMem); + // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure + if (info.read) sendSize += send->comm->buffSizes[NCCL_PROTO_SIMPLE]; + ALIGN_SIZE(sendSize, CUDA_IPC_MIN); + resources->remoteId = -1; resources->bootstrap = comm->bootstrap; if (intermediateRank == -1) { NCCLCHECK(ncclCudaCalloc((char**)&info.directPtr, sendSize)); info.rank = myInfo->rank; if (myInfo->pidHash == peerInfo->pidHash) { - if (useRead == 0) send->conn.direct |= NCCL_DIRECT_GPU; + if (info.read == 0) send->conn.direct |= NCCL_DIRECT_GPU; INFO(NCCL_INIT|NCCL_P2P, "Channel %02d : %d[%lx] -> %d[%lx] via P2P/direct pointer%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, useReadStr); } else { @@ -189,28 +190,29 @@ ncclResult_t p2pSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, st /* Create and return connect structures for this peer to connect to me */ ncclResult_t p2pRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, - struct ncclConnect* connectInfo, struct ncclConnector * recv, int channelId) { - + struct ncclConnect* connectInfo, struct ncclConnector * recv, int channelId, int connIndex) { struct p2pRecvResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); recv->transportResources = resources; int useRead, intermediateRank; NCCLCHECK(p2pGetInfo(comm->topo, myInfo, peerInfo, &useRead, &intermediateRank)); + + struct p2pConnectInfo info; + // For CollNet, we use write for scatter-reduce (conn 1), read for broadcast-gather (conn 0) + info.read = (connIndex == 0) ? 
useRead : 0; + int recvSize = offsetof(struct ncclRecvMem, buff); // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure - for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) if (!(useRead && p == NCCL_PROTO_SIMPLE)) recvSize += recv->comm->buffSizes[p]; + for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) if (!(info.read && p == NCCL_PROTO_SIMPLE)) recvSize += recv->comm->buffSizes[p]; ALIGN_SIZE(recvSize, CUDA_IPC_MIN); - struct p2pConnectInfo info; - info.read = useRead; - resources->remoteId = -1; resources->bootstrap = comm->bootstrap; if (intermediateRank == -1) { NCCLCHECK(ncclCudaCalloc((char**)&info.directPtr, recvSize)); info.rank = myInfo->rank; if (myInfo->pidHash == peerInfo->pidHash) { - if (useRead == 0) recv->conn.direct |= NCCL_DIRECT_GPU; + if (info.read == 0) recv->conn.direct |= NCCL_DIRECT_GPU; } else { CUDACHECK(cudaIpcGetMemHandle(&info.devIpc, info.directPtr)); } diff --git a/src/transport/shm.cc b/src/transport/shm.cc index d8a5b52..98e25a9 100644 --- a/src/transport/shm.cc +++ b/src/transport/shm.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -57,8 +57,7 @@ ncclResult_t shmCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTop #define MAX_SHM_NAME_LEN 1024 /* Create and return connect structures for this peer to connect to me */ -ncclResult_t shmSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) { - +ncclResult_t shmSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId, int connIndex) { struct shmSendResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); send->transportResources = resources; @@ -81,7 +80,7 @@ ncclResult_t shmSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, st return ncclSuccess; } -ncclResult_t shmRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) { +ncclResult_t shmRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId, int connIndex) { struct shmRecvResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); recv->transportResources = resources; |
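
The netSendProxy/netRecvProxy rewrite above replaces the single set of per-args counters with one set per "sub" operation, so one proxy thread can multiplex several send/recv connections while keeping at most NCCL_STEPS slices in flight per connection. Below is a minimal sketch of that bookkeeping only; the names (proxy_sub, can_post, slot_of) are illustrative and do not exist in NCCL.

#include <stdint.h>
#include <stdbool.h>

#define NCCL_STEPS 8  /* same window size NCCL uses for its per-connection ring */

struct proxy_sub {
  uint64_t base;        /* first step of this operation, rounded to a multiple of chunkSteps */
  uint64_t posted;      /* buffers handed to the GPU / recv requests posted */
  uint64_t transmitted; /* sends posted to the network / recvs flushed */
  uint64_t done;        /* completions confirmed by the network */
  uint64_t nsteps;      /* total steps this sub must perform */
};

/* A sub may post another slice only if it has not reached the end of the
 * operation and stays within NCCL_STEPS of its slowest (done) counter. */
static bool can_post(const struct proxy_sub* s) {
  return s->posted < s->nsteps && s->posted < s->done + NCCL_STEPS;
}

/* Buffer slots are taken modulo NCCL_STEPS including the base, so consecutive
 * operations on the same connection keep rotating through the same ring of
 * slots instead of resetting it. */
static int slot_of(const struct proxy_sub* s, uint64_t step) {
  return (int)((s->base + step) % NCCL_STEPS);
}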
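
The new devFlushMem path in netRecvProxy relies on GDRCOPY: instead of issuing ncclNetIflush, the proxy performs a single CPU read of GPU memory through a gdrcopy mapping, which on x86_64 forces a PCI-E read and makes the NIC's preceding RDMA writes visible before the tail is advanced. A minimal sketch of that flush read follows, assuming the caller already holds a CPU mapping of a small GPU buffer; the gdr_flush helper and its parameter name are illustrative, not NCCL symbols.

#include <stdint.h>

/* Force one real load from GPU memory via the gdrcopy CPU mapping.
 * Mirrors the inline "mov" used in the patch; restricted to x86_64,
 * other platforms keep using the regular network flush path. */
static inline void gdr_flush(volatile uint32_t* gpu_flush_word_cpu_map) {
#if defined(__x86_64__)
  uint32_t dummy;
  /* A load the compiler cannot elide: this is what actually orders the
   * NIC's writes to GPU memory with respect to the subsequent tail update. */
  asm volatile("movl (%1), %0" : "=r"(dummy) : "r"(gpu_flush_word_cpu_map) : "memory");
  (void)dummy;
#else
  (void)gpu_flush_word_cpu_map; /* not supported; fall back to ncclNetIflush */
#endif
}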
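
The ncclIbIsend change posts the adaptive-routing case as two chained work requests in a single ibv_post_send call: an unsignaled RDMA_WRITE carrying the payload, followed by a 0-byte RDMA_WRITE_WITH_IMM that produces the remote completion and carries the message size in imm_data. Below is a compilable sketch of that pattern, assuming a connected RC QP, a registered memory region, and the remote address/rkey exchanged out of band; post_write_with_notify is an illustrative name, not an NCCL function.

#include <infiniband/verbs.h>
#include <string.h>
#include <stdint.h>

static int post_write_with_notify(struct ibv_qp* qp, void* buf, uint32_t lkey,
                                  uint32_t size, uint64_t remote_addr, uint32_t rkey,
                                  uint64_t wr_id) {
  struct ibv_sge sge;
  sge.addr   = (uintptr_t)buf;
  sge.length = size;
  sge.lkey   = lkey;

  struct ibv_send_wr wr[2];
  memset(wr, 0, sizeof(wr));

  /* First WR: the bulk payload, unsignaled so it does not consume a CQE. */
  wr[0].wr_id = wr_id;
  wr[0].sg_list = &sge;
  wr[0].num_sge = 1;
  wr[0].opcode = IBV_WR_RDMA_WRITE;
  wr[0].send_flags = 0;
  wr[0].wr.rdma.remote_addr = remote_addr;
  wr[0].wr.rdma.rkey = rkey;
  wr[0].next = &wr[1];

  /* Second WR: 0-byte write with immediate; generates the remote completion
   * once the payload has landed and carries the size in imm_data. */
  wr[1].wr_id = wr_id;
  wr[1].sg_list = NULL;
  wr[1].num_sge = 0;
  wr[1].opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
  wr[1].send_flags = IBV_SEND_SIGNALED;
  wr[1].wr.rdma.remote_addr = remote_addr;
  wr[1].wr.rdma.rkey = rkey;
  wr[1].imm_data = size;

  /* Both WRs go out with a single doorbell. */
  struct ibv_send_wr* bad_wr;
  return ibv_post_send(qp, wr, &bad_wr);
}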