Diffstat (limited to 'src/collectives')
26 files changed, 1817 insertions, 1445 deletions
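The host-side entry points touched by this change all follow the same pattern: each collective opens an NVTX3 profiling range and tags its ncclInfo descriptor with the renamed ncclFunc* enum (previously ncclColl*). A minimal sketch of that pattern, assembled from the ncclAllGather hunk below (field order taken verbatim from the diff, not re-checked against the headers):

// Sketch only: the updated ncclAllGather entry point as shown in the hunk below.
// NVTX3_FUNC_RANGE_IN(nccl_domain) opens a profiling range for the call, and the
// collective is described by an ncclInfo tagged with ncclFuncAllGather (formerly
// ncclCollAllGather) before being handed to ncclEnqueueCheck().
ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount,
    ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream) {
  NVTX3_FUNC_RANGE_IN(nccl_domain);
  struct ncclInfo info = { ncclFuncAllGather, "AllGather",
    sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */
    ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS };
  return ncclEnqueueCheck(&info);
}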
diff --git a/src/collectives/all_gather.cc b/src/collectives/all_gather.cc
index 348c176..266fd5a 100644
--- a/src/collectives/all_gather.cc
+++ b/src/collectives/all_gather.cc
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved.
  *
  * See LICENSE.txt for license information
  ************************************************************************/
@@ -11,7 +11,8 @@ NCCL_API(ncclResult_t, ncclAllGather, const void* sendbuff, void* recvbuff, size
     ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream);
 ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount,
     ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream) {
-  struct ncclInfo info = { ncclCollAllGather, "AllGather",
+  NVTX3_FUNC_RANGE_IN(nccl_domain);
+  struct ncclInfo info = { ncclFuncAllGather, "AllGather",
     sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */
     ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS };
   return ncclEnqueueCheck(&info);
diff --git a/src/collectives/all_reduce.cc b/src/collectives/all_reduce.cc
index 7796d5b..b67f3be 100644
--- a/src/collectives/all_reduce.cc
+++ b/src/collectives/all_reduce.cc
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved.
  *
  * See LICENSE.txt for license information
  ************************************************************************/
@@ -10,7 +10,8 @@ NCCL_API(ncclResult_t, ncclAllReduce, const void* sendbuff, void* recvbuff, size
     ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream);
 ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count,
     ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream) {
-  struct ncclInfo info = { ncclCollAllReduce, "AllReduce",
+  NVTX3_FUNC_RANGE_IN(nccl_domain);
+  struct ncclInfo info = { ncclFuncAllReduce, "AllReduce",
     sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */
     ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS };
   return ncclEnqueueCheck(&info);
diff --git a/src/collectives/broadcast.cc b/src/collectives/broadcast.cc
index 042301b..db0fb49 100644
--- a/src/collectives/broadcast.cc
+++ b/src/collectives/broadcast.cc
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved.
  *
  * See LICENSE.txt for license information
  ************************************************************************/
@@ -11,7 +11,8 @@ NCCL_API(ncclResult_t, ncclBroadcast, const void* sendbuff, void* recvbuff, size
     ncclComm_t comm, cudaStream_t stream);
 ncclResult_t ncclBroadcast(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, int root,
     ncclComm_t comm, cudaStream_t stream) {
-  struct ncclInfo info = { ncclCollBroadcast, "Broadcast",
+  NVTX3_FUNC_RANGE_IN(nccl_domain);
+  struct ncclInfo info = { ncclFuncBroadcast, "Broadcast",
     sendbuff, recvbuff, count, datatype, ncclSum, root, comm, stream, /* Args */
     BROADCAST_CHUNKSTEPS, BROADCAST_SLICESTEPS };
   return ncclEnqueueCheck(&info);
diff --git a/src/collectives/device/Makefile b/src/collectives/device/Makefile
index 001059c..3796fb1 100644
--- a/src/collectives/device/Makefile
+++ b/src/collectives/device/Makefile
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved.
 #
 # See LICENSE.txt for license information
 #
@@ -10,7 +10,7 @@ include ../../../makefiles/version.mk
 BUILDDIR ?= $(abspath ../../../build)
 OBJDIR := $(BUILDDIR)/obj/collectives/device
-LIBSRCFILES := all_reduce.cu broadcast.cu reduce.cu all_gather.cu reduce_scatter.cu
+LIBSRCFILES := all_reduce.cu broadcast.cu reduce.cu all_gather.cu reduce_scatter.cu sendrecv.cu
 LIBSRCFILES += functions.cu
diff --git a/src/collectives/device/all_gather.cu b/src/collectives/device/all_gather.cu
index 109c341..4022e2e 100644
--- a/src/collectives/device/all_gather.cu
+++ b/src/collectives/device/all_gather.cu
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved.
  *
  * See LICENSE.txt for license information
  ************************************************************************/
@@ -8,4 +8,4 @@
 #include "common.h"
 #include "collectives.h"
-IMPL_COLL_C(ncclAllGather, ncclCollAllGather);
+IMPL_COLL_C(AllGather);
diff --git a/src/collectives/device/all_gather.h b/src/collectives/device/all_gather.h
index 0ad5ba9..e057dc8 100644
--- a/src/collectives/device/all_gather.h
+++ b/src/collectives/device/all_gather.h
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved.
* * See LICENSE.txt for license information ************************************************************************/ @@ -8,188 +8,201 @@ #include "primitives.h" #include "collectives.h" -template<int UNROLL, class FUNC, typename T> -__device__ void ncclAllGatherRingKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads-WARP_SIZE; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->N; - const int nranks = comm->nRanks; - const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * ALLGATHER_CHUNKSTEPS; - const ssize_t loopSize = args->nChannels*(ssize_t)chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - ncclPrimitives<UNROLL, ALLGATHER_CHUNKSTEPS/ALLGATHER_SLICESTEPS, ALLGATHER_SLICESTEPS, T, 1, 1, FUNC> - prims(tid, args->nThreads, &ring->prev, &ring->next, thisOutput, stepSize, channel, comm, args->opCount); - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,args->nChannels)); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + bid*realChunkSize; - - /////////////// begin AllGather steps /////////////// - ssize_t offset; - int nelem = min(realChunkSize, size-chunkOffset); - int rankDest; - - // step 0: push data to next GPU - rankDest = ring->devUserRanks[0]; - offset = chunkOffset + rankDest * size; - - if (thisInput + chunkOffset == thisOutput + offset) { // In place - prims.directSend(thisInput+chunkOffset, offset, nelem); - } else { - prims.directCopySend(thisInput+chunkOffset, thisOutput+offset, offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllGather, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads-WARP_SIZE; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * ALLGATHER_CHUNKSTEPS; + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*(ssize_t)chunkSize; + const ssize_t size = args->coll.count; + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + ncclPrimitives<UNROLL, ALLGATHER_CHUNKSTEPS/ALLGATHER_SLICESTEPS, ALLGATHER_SLICESTEPS, T, 1, 1, 1, FUNC> + prims(tid, nthreads, &ring->prev, &ring->next, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0); + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,nChannels)); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + bid*realChunkSize; + + /////////////// begin AllGather steps /////////////// + ssize_t offset; + int nelem = min(realChunkSize, size-chunkOffset); + int rankDest; + + // step 0: push data to next GPU + rankDest = ring->devUserRanks[0]; + offset 
= chunkOffset + rankDest * size; + + if (thisInput + chunkOffset == thisOutput + offset) { // In place + prims.directSend(thisInput+chunkOffset, offset, nelem); + } else { + prims.directCopySend(thisInput+chunkOffset, thisOutput+offset, offset, nelem); + } + + // k-2 steps: copy to next GPU + for (int j=1; j<nranks-1; ++j) { + rankDest = ring->devUserRanks[nranks-j]; + offset = chunkOffset + rankDest * size; + + prims.directRecvCopySend(thisOutput+offset, offset, nelem); + } + + // Make final copy from buffer to dest. + rankDest = ring->devUserRanks[1]; + offset = chunkOffset + rankDest * size; + + // Final wait/copy. + prims.directRecv(thisOutput+offset, offset, nelem); + } } - - // k-2 steps: copy to next GPU - for (int j=1; j<nranks-1; ++j) { - rankDest = ring->devUserRanks[nranks-j]; - offset = chunkOffset + rankDest * size; - - prims.directRecvCopySend(thisOutput+offset, offset, nelem); +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllGather, NCCL_ALGO_RING, NCCL_PROTO_LL, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); + ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepLines, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + if (size-gridOffset < loopSize) { + chunkSize = args->coll.lastChunkSize; + } + ssize_t chunkOffset = gridOffset + bid*chunkSize; + + /////////////// begin AllGather steps /////////////// + ssize_t offset; + int nelem = min(chunkSize, size-chunkOffset); + int rankDest; + + // step 0: push data to next GPU + rankDest = ring->devUserRanks[0]; + offset = chunkOffset + rankDest * size; + + if (thisInput + chunkOffset == thisOutput + offset) { // In place + LLprims.send(thisInput+chunkOffset, nelem); + } else { + LLprims.copySend(thisInput+chunkOffset, thisOutput+offset, nelem); + } + + // k-2 steps: copy to next GPU + for (int j=1; j<nranks-1; ++j) { + rankDest = ring->devUserRanks[nranks-j]; + offset = chunkOffset + rankDest * size; + + LLprims.recvCopySend(thisOutput+offset, nelem); + } + + // step k-1: final store + rankDest = ring->devUserRanks[1]; + offset = chunkOffset + rankDest * size; + + LLprims.recv(thisOutput+offset, nelem); + } } - - // Make final copy from buffer to dest. - rankDest = ring->devUserRanks[1]; - offset = chunkOffset + rankDest * size; - - // Final wait/copy. 
- prims.directRecv(thisOutput+offset, offset, nelem); - } -} - -template<int UNROLL, class FUNC, typename T> -__device__ void ncclAllGatherTreeKernel(struct CollectiveArgs* args) { } - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllGatherRingLLKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - //const int rank = comm->rank; - const int nranks = comm->nRanks; - ssize_t chunkSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - if (size-gridOffset < loopSize) { - chunkSize = args->lastChunkSize; - } - ssize_t chunkOffset = gridOffset + bid*chunkSize; - - /////////////// begin AllGather steps /////////////// - ssize_t offset; - int nelem = min(chunkSize, size-chunkOffset); - int rankDest; - - // step 0: push data to next GPU - rankDest = ring->devUserRanks[0]; - offset = chunkOffset + rankDest * size; - - if (thisInput + chunkOffset == thisOutput + offset) { // In place - LLprims.send(thisInput+chunkOffset, nelem); - } else { - LLprims.copySend(thisInput+chunkOffset, thisOutput+offset, nelem); - } - - // k-2 steps: copy to next GPU - for (int j=1; j<nranks-1; ++j) { - rankDest = ring->devUserRanks[nranks-j]; - offset = chunkOffset + rankDest * size; - - LLprims.recvCopySend(thisOutput+offset, nelem); - } - - // step k-1: final store - rankDest = ring->devUserRanks[1]; - offset = chunkOffset + rankDest * size; - - LLprims.recv(thisOutput+offset, nelem); - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllGatherTreeLLKernel(struct CollectiveArgs* args) { } +}; #include "prims_ll128.h" -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllGatherRingLL128Kernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - //const int rank = comm->rank; - const int nranks = comm->nRanks; - ssize_t chunkSize = (NCCL_LL128_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); - // We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere. 
- const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2; - - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - chunkSize = min(DIVUP(size-gridOffset, args->nChannels*minChunkSize)*minChunkSize, chunkSize); - - ssize_t chunkOffset = gridOffset + bid*chunkSize; - - /////////////// begin AllGather steps /////////////// - ssize_t offset; - int nelem = min(chunkSize, size-chunkOffset); - int rankDest; - - // step 0: push data to next GPU - rankDest = ring->devUserRanks[0]; - offset = chunkOffset + rankDest * size; - - if (thisInput + chunkOffset == thisOutput + offset) { // In place - LLprims.send(thisInput+chunkOffset, nelem); - } else { - LLprims.copySend(thisInput+chunkOffset, thisOutput+offset, nelem); - } - - // k-2 steps: copy to next GPU - for (int j=1; j<nranks-1; ++j) { - rankDest = ring->devUserRanks[nranks-j]; - offset = chunkOffset + rankDest * size; - - LLprims.recvCopySend(thisOutput+offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllGather, NCCL_ALGO_RING, NCCL_PROTO_LL128, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS); + ssize_t chunkSize = stepSize*NCCL_LL128_DATAELEMS*sizeof(uint64_t) / (NCCL_LL128_LINEELEMS*sizeof(T)); + // We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere. 
+ const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2; + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepSize, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + chunkSize = min(DIVUP(size-gridOffset, nChannels*minChunkSize)*minChunkSize, chunkSize); + + ssize_t chunkOffset = gridOffset + bid*chunkSize; + + /////////////// begin AllGather steps /////////////// + ssize_t offset; + int nelem = min(chunkSize, size-chunkOffset); + int rankDest; + + // step 0: push data to next GPU + rankDest = ring->devUserRanks[0]; + offset = chunkOffset + rankDest * size; + + if (thisInput + chunkOffset == thisOutput + offset) { // In place + LLprims.send(thisInput+chunkOffset, nelem); + } else { + LLprims.copySend(thisInput+chunkOffset, thisOutput+offset, nelem); + } + + // k-2 steps: copy to next GPU + for (int j=1; j<nranks-1; ++j) { + rankDest = ring->devUserRanks[nranks-j]; + offset = chunkOffset + rankDest * size; + + LLprims.recvCopySend(thisOutput+offset, nelem); + } + + // step k-1: final store + rankDest = ring->devUserRanks[1]; + offset = chunkOffset + rankDest * size; + + LLprims.recv(thisOutput+offset, nelem); + } } +}; - // step k-1: final store - rankDest = ring->devUserRanks[1]; - offset = chunkOffset + rankDest * size; +template<int PROTO, class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllGather, NCCL_ALGO_TREE, PROTO, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; - LLprims.recv(thisOutput+offset, nelem); - } -} +template<int PROTO, class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllGather, NCCL_ALGO_COLLNET, PROTO, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllGatherTreeLL128Kernel(struct CollectiveArgs* args) { } diff --git a/src/collectives/device/all_reduce.cu b/src/collectives/device/all_reduce.cu index 85d007e..e7c3c28 100644 --- a/src/collectives/device/all_reduce.cu +++ b/src/collectives/device/all_reduce.cu @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -8,4 +8,4 @@ #include "common.h" #include "collectives.h" -IMPL_COLL_R(ncclAllReduce, ncclCollAllReduce); +IMPL_COLL_R(AllReduce); diff --git a/src/collectives/device/all_reduce.h b/src/collectives/device/all_reduce.h index 2449c2b..fe2e6fc 100644 --- a/src/collectives/device/all_reduce.h +++ b/src/collectives/device/all_reduce.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -8,398 +8,442 @@ #include "primitives.h" #include "collectives.h" -template<int UNROLL, class FUNC, typename T> -__device__ void ncclAllReduceRingKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads-WARP_SIZE; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->N; - const int nranks = comm->nRanks; - const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * ALLREDUCE_CHUNKSTEPS; - const ssize_t loopSize = args->nChannels*(ssize_t)chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - ncclPrimitives<UNROLL, ALLREDUCE_CHUNKSTEPS/ALLREDUCE_SLICESTEPS, ALLREDUCE_SLICESTEPS, T, 1, 1, FUNC> - prims(tid, args->nThreads, &ring->prev, &ring->next, thisOutput, stepSize, channel, comm, args->opCount); - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += nranks*loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,nranks*args->nChannels)); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + bid*nranks*realChunkSize; - - /////////////// begin AllReduce steps /////////////// - ssize_t offset; - int nelem; - int slice; - - // step 0: push data to next GPU - slice = ring->devUserRanks[nranks-1]; - offset = chunkOffset + slice * realChunkSize; - nelem = min(realChunkSize, size-offset); - - prims.send(thisInput+offset, nelem); - - // k-2 steps: reduce and copy to next GPU - for (int j=2; j<nranks; ++j) { - slice = ring->devUserRanks[nranks-j]; - offset = chunkOffset + slice * realChunkSize; +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads-WARP_SIZE; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * ALLREDUCE_CHUNKSTEPS; + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*(ssize_t)chunkSize; + const ssize_t size = args->coll.count; + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + ncclPrimitives<UNROLL, ALLREDUCE_CHUNKSTEPS/ALLREDUCE_SLICESTEPS, ALLREDUCE_SLICESTEPS, T, 1, 1, 1, FUNC> + prims(tid, nthreads, &ring->prev, &ring->next, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0); + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += nranks*loopSize) { + ssize_t realChunkSize = min(chunkSize, DIVUP(size-gridOffset,nranks*nChannels)); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + bid*nranks*realChunkSize; + + /////////////// begin AllReduce steps /////////////// + ssize_t offset; + int nelem; + int chunk; + + // step 0: push data to next GPU + chunk = ring->devUserRanks[nranks-1]; + 
offset = chunkOffset + chunk * realChunkSize; nelem = min(realChunkSize, size-offset); - prims.recvReduceSend(thisInput+offset, nelem); - } + prims.send(thisInput+offset, nelem); - // step k-1: reduce this buffer and data, which will produce the final - // result that we store in this data and push to the next GPU - slice = ring->devUserRanks[0]; - offset = chunkOffset + slice * realChunkSize; - nelem = min(realChunkSize, size-offset); + // k-2 steps: reduce and copy to next GPU + for (int j=2; j<nranks; ++j) { + chunk = ring->devUserRanks[nranks-j]; + offset = chunkOffset + chunk * realChunkSize; + nelem = min(realChunkSize, size-offset); - prims.directRecvReduceCopySend(thisInput+offset, thisOutput+offset, offset, nelem); + prims.recvReduceSend(thisInput+offset, nelem); + } - // k-2 steps: copy to next GPU - for (int j=1; j<nranks-1; ++j) { - slice = ring->devUserRanks[nranks-j]; - offset = chunkOffset + slice * realChunkSize; + // step k-1: reduce this buffer and data, which will produce the final + // result that we store in this data and push to the next GPU + chunk = ring->devUserRanks[0]; + offset = chunkOffset + chunk * realChunkSize; nelem = min(realChunkSize, size-offset); - prims.directRecvCopySend(thisOutput+offset, offset, nelem); - } + prims.directRecvReduceCopySend(thisInput+offset, thisOutput+offset, offset, nelem); - // Make final copy from buffer to dest. - slice = ring->devUserRanks[1]; - offset = chunkOffset + slice * realChunkSize; - nelem = min(realChunkSize, size-offset); + // k-2 steps: copy to next GPU + for (int j=1; j<nranks-1; ++j) { + chunk = ring->devUserRanks[nranks-j]; + offset = chunkOffset + chunk * realChunkSize; + nelem = min(realChunkSize, size-offset); - // Final wait/copy. - prims.directRecv(thisOutput+offset, offset, nelem); - } -} - -template<int UNROLL, class FUNC, typename T> -__device__ void ncclAllReduceTreeKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads-WARP_SIZE; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - const ssize_t size = args->N; - const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS); - int chunkSize = args->lastChunkSize; - const ssize_t minChunkSize = nthreads*8*sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = args->nChannels*chunkSize; - - if (loopSize > size) { - chunkSize = DIVUP(size, args->nChannels*minChunkSize)*minChunkSize; + prims.directRecvCopySend(thisOutput+offset, offset, nelem); + } + + // Make final copy from buffer to dest. + chunk = ring->devUserRanks[1]; + offset = chunkOffset + chunk * realChunkSize; + nelem = min(realChunkSize, size-offset); + + // Final wait/copy. 
+ prims.directRecv(thisOutput+offset, offset, nelem); + } } +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_TREE, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads-2*WARP_SIZE; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclTree* tree = &channel->tree; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + int chunkSize = args->coll.lastChunkSize; + const ssize_t minChunkSize = nthreads*8*sizeof(uint64_t) / sizeof(T); + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + if (loopSize > size) { + chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; + } - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; - do { - struct ncclTree* tree = &channel->treeUp; - // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local) - ncclPrimitives<UNROLL, 1, 1, T, NCCL_MAX_TREE_ARITY, 1, FUNC> prims(tid, args->nThreads, tree->down, &tree->up, NULL, stepSize, channel, comm, args->opCount); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Up - ssize_t offset = gridOffset + bid*chunkSize; - int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - prims.send(thisInput+offset, nelem); - } else { - prims.recvReduceSend(thisInput+offset, nelem); +#if 1 + if (tid < nthreads+WARP_SIZE) { + // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local) + ncclPrimitives<UNROLL, 1, 1, T, NCCL_MAX_DEV_ARITY, 1, 0, FUNC> + prims(tid, nthreads, tree->down, &tree->up, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Up + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->up == -1) { + prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); + } else if (tree->down[0] == -1) { + prims.send(thisInput+offset, nelem); + } else { + prims.recvReduceSend(thisInput+offset, nelem); + } } } - } while(0); - do { - struct ncclTree* tree = &channel->treeDn; - // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local) - ncclPrimitives<UNROLL, 1, 1, T, 1, NCCL_MAX_TREE_ARITY, FUNC> prims(tid, args->nThreads, &tree->up, tree->down, NULL, stepSize, channel, comm, args->opCount); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Down - ssize_t offset = gridOffset + bid*chunkSize; - int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - prims.send(thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - prims.recv(thisOutput+offset, nelem); + if (tid < nthreads+WARP_SIZE) { + // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local) + ncclPrimitives<UNROLL, 1, 1, T, 1, NCCL_MAX_DEV_ARITY, 1, FUNC> + prims(tid, nthreads, &tree->up, tree->down, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0); + 
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Down + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->up == -1) { + prims.directSend(thisOutput+offset, offset, nelem); + } else if (tree->down[0] == -1) { + prims.directRecv(thisOutput+offset, offset, nelem); + } else { + prims.directRecvCopySend(thisOutput+offset, offset, nelem); + } + } + } +#else + int nthreadsSplit = nthreads/2; + if (nthreadsSplit == 256) nthreadsSplit += 64; + if (tree->up == -1) { + if (tid < nthreads+WARP_SIZE) { + // ReduceAndBroadcast : max number of recv is 3, max number of send is 3 + ncclPrimitives<UNROLL, 1, 1, T, NCCL_MAX_DEV_ARITY, NCCL_MAX_DEV_ARITY, 1, FUNC> + prims(tid, nthreads, tree->down, tree->down, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + prims.directRecvReduceCopySend(thisInput+offset, thisOutput+offset, offset, nelem); + } + } + } else { + if (tid < nthreadsSplit + WARP_SIZE) { + // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local) + ncclPrimitives<UNROLL, 1, 1, T, NCCL_MAX_DEV_ARITY, 1, 0, FUNC> + prims(tid, nthreadsSplit, tree->down, &tree->up, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Up + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->down[0] == -1) { + prims.send(thisInput+offset, nelem); + } else { + prims.recvReduceSend(thisInput+offset, nelem); + } + } } else { - prims.recvCopySend(thisOutput+offset, nelem); + // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local) + ncclPrimitives<UNROLL, 1, 1, T, 1, NCCL_MAX_DEV_ARITY, 1, FUNC> + prims(tid-nthreadsSplit-WARP_SIZE, nthreads-nthreadsSplit, &tree->up, tree->down, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 2); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Down + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->down[0] == -1) { + prims.directRecv(thisOutput+offset, offset, nelem); + } else { + prims.directRecvCopySend(thisOutput+offset, offset, nelem); + } + } } } - } while(0); -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllReduceRingLLKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - //const int rank = comm->rank; - const int nranks = comm->nRanks; - ssize_t chunkSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t) / sizeof(T); - const ssize_t minChunkSize = nthreads * (sizeof(uint64_t)) / sizeof(T); - - const ssize_t loopSize = args->nChannels*nranks*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - chunkSize = min(DIVUP(size-gridOffset, args->nChannels*nranks*minChunkSize)*minChunkSize, chunkSize); - - 
/////////////// begin AllReduce steps /////////////// - ssize_t offset; - int nelem; - int slice; - - // step 0: push data to next GPU - slice = ring->devUserRanks[nranks-1]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.send(thisInput+offset, nelem); - - // k-2 steps: reduce and copy to next GPU - for (int j=2; j<nranks; ++j) { - slice = ring->devUserRanks[nranks-j]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.recvReduceSend(thisInput+offset, nelem); +#endif + } +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_COLLNET, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads-WARP_SIZE; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclTree* tree = &channel->collTree; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + int chunkSize = args->coll.lastChunkSize; + const ssize_t minChunkSize = nthreads*8*sizeof(uint64_t) / sizeof(T); + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + if (loopSize > size) { + chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; } - // step k-1: reduce this buffer and data, which will produce the final - // result that we store in this data and push to the next GPU - slice = ring->devUserRanks[0]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem); + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; - // k-2 steps: copy to next GPU - for (int j=1; j<nranks-1; ++j) { - slice = ring->devUserRanks[nranks-j]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); + if (blockIdx.x < nChannels) { // first half of the channels do reduce + ncclPrimitives<UNROLL, 1, 1, T, 1, 1, 0, FUNC> + prims(tid, nthreads, tree->down, &tree->up, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Up + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->up == -1) { + prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); + } else if (tree->down[0] == -1) { + prims.send(thisInput+offset, nelem); + } else { + prims.recvReduceSend(thisInput+offset, nelem); + } + } + } - LLprims.recvCopySend(thisOutput+offset, nelem); + if (blockIdx.x >= nChannels) { // second half of the channels do broadcast + ncclPrimitives<UNROLL, 1, 1, T, 1, 1, 0, FUNC> + prims(tid, nthreads, &tree->up, tree->down, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Down + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->up == -1) { + prims.send(thisOutput+offset, nelem); + } else if (tree->down[0] == -1) { + prims.recv(thisOutput+offset, nelem); + } else { + prims.recvCopySend(thisOutput+offset, nelem); + } + } } + } +}; + +template<class FUNC, typename T, int UNROLL> 
+class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_RING, NCCL_PROTO_LL, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); + ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); + const ssize_t minChunkSize = nthreads * (sizeof(uint64_t)) / sizeof(T); + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*nranks*chunkSize; + const ssize_t size = args->coll.count; + + ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepLines, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; - // Make final copy from buffer to dest. - slice = ring->devUserRanks[1]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + chunkSize = min(DIVUP(size-gridOffset, nChannels*nranks*minChunkSize)*minChunkSize, chunkSize); - // Here we need to copy from buffer to this output. - LLprims.recv(thisOutput+offset, nelem); - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllReduceTreeLLKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - const ssize_t size = args->N; - ssize_t chunkSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t) / sizeof(T); - const ssize_t minChunkSize = nthreads*sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = args->nChannels*chunkSize; - - if (loopSize > size) { - chunkSize = DIVUP(size, args->nChannels*minChunkSize)*minChunkSize; - } + /////////////// begin AllReduce steps /////////////// + ssize_t offset; + int nelem; + int chunk; - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; + // step 0: push data to next GPU + chunk = ring->devUserRanks[nranks-1]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); - do { - struct ncclTree* tree = &channel->treeUp; - // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local) - ncclLLPrimitives<T, FUNC, NCCL_MAX_TREE_ARITY, 1> LLprims(tid, nthreads, tree->down, &tree->up, channel, comm, args->opCount); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Up - ssize_t offset = gridOffset + bid*chunkSize; - int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - LLprims.send(thisInput+offset, nelem); - } else { - LLprims.recvReduceSend(thisInput+offset, nelem); - } - } - } while(0); + LLprims.send(thisInput+offset, nelem); - do { - struct ncclTree* tree = &channel->treeDn; - // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local) - ncclLLPrimitives<T, FUNC, 1, NCCL_MAX_TREE_ARITY> LLprims(tid, nthreads, &tree->up, tree->down, 
channel, comm, args->opCount); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - // Down - ssize_t offset = gridOffset + bid*chunkSize; - int nelem = min(chunkSize, size-offset); - if (tree->up == -1) { - LLprims.send(thisOutput+offset, nelem); - } else if (tree->down[0] == -1) { - LLprims.recv(thisOutput+offset, nelem); - } else { - LLprims.recvCopySend(thisOutput+offset, nelem); + // k-2 steps: reduce and copy to next GPU + for (int j=2; j<nranks; ++j) { + chunk = ring->devUserRanks[nranks-j]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); + + LLprims.recvReduceSend(thisInput+offset, nelem); } - } - } while(0); -} -#include "prims_ll128.h" -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllReduceRingLL128Kernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - //const int rank = comm->rank; - const int nranks = comm->nRanks; - ssize_t chunkSize = (NCCL_LL128_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); - // We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere. - const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2; - - const ssize_t loopSize = args->nChannels*nranks*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - chunkSize = min(DIVUP(size-gridOffset, args->nChannels*nranks*minChunkSize)*minChunkSize, chunkSize); - - /////////////// begin AllReduce steps /////////////// - ssize_t offset; - int nelem; - int slice; - - // step 0: push data to next GPU - slice = ring->devUserRanks[nranks-1]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.send(thisInput+offset, nelem); - - // k-2 steps: reduce and copy to next GPU - for (int j=2; j<nranks; ++j) { - slice = ring->devUserRanks[nranks-j]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; + // step k-1: reduce this buffer and data, which will produce the final + // result that we store in this data and push to the next GPU + chunk = ring->devUserRanks[0]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; nelem = min(chunkSize, size-offset); - LLprims.recvReduceSend(thisInput+offset, nelem); - } + LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem); - // step k-1: reduce this buffer and data, which will produce the final - // result that we store in this data and push to the next GPU - slice = ring->devUserRanks[0]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); + // k-2 steps: copy to next GPU + for (int j=1; j<nranks-1; ++j) { + chunk = ring->devUserRanks[nranks-j]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); - LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem); + 
LLprims.recvCopySend(thisOutput+offset, nelem); + } - // k-2 steps: copy to next GPU - for (int j=1; j<nranks-1; ++j) { - slice = ring->devUserRanks[nranks-j]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; + // Make final copy from buffer to dest. + chunk = ring->devUserRanks[1]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; nelem = min(chunkSize, size-offset); - LLprims.recvCopySend(thisOutput+offset, nelem); + // Here we need to copy from buffer to this output. + LLprims.recv(thisOutput+offset, nelem); } - - // Make final copy from buffer to dest. - slice = ring->devUserRanks[1]; - offset = gridOffset + (slice*args->nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - // Here we need to copy from buffer to this output. - LLprims.recv(thisOutput+offset, nelem); - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclAllReduceTreeLL128Kernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclTree* treeUp = &channel->treeUp; - struct ncclTree* treeDn = &channel->treeDn; - const ssize_t size = args->N; - ssize_t chunkSize = args->lastChunkSize; - const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/8; - const ssize_t loopSize = args->nChannels*chunkSize; - int nthreadsSplit = NCCL_LL128_SPLIT(nthreads); - - if (loopSize > size) { - chunkSize = DIVUP(size, args->nChannels*minChunkSize)*minChunkSize; } +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_TREE, NCCL_PROTO_LL, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclTree* tree = &channel->tree; + const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); + ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); + const ssize_t minChunkSize = nthreads*sizeof(uint64_t) / sizeof(T); + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + if (loopSize > size) { + chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; + } - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; - if (treeUp->up == -1) { - // ReduceAndBroadcast : max number of recv is 3, max number of send is 3 - ncclLL128Primitives<T, FUNC, NCCL_MAX_TREE_ARITY, NCCL_MAX_TREE_ARITY> LLprims(tid, nthreads, treeUp->down, treeDn->down, channel, comm, args->opCount); - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - ssize_t offset = gridOffset + bid*chunkSize; - int nelem = min(chunkSize, size-offset); - LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem); - } - } else { - if (tid < nthreadsSplit) { + do { // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local) - ncclLL128Primitives<T, FUNC, NCCL_MAX_TREE_ARITY, 1> 
LLprims(tid, nthreadsSplit, treeUp->down, &treeUp->up, channel, comm, args->opCount); + ncclLLPrimitives<T, FUNC, NCCL_MAX_DEV_ARITY, 1> LLprims(tid, nthreads, tree->down, &tree->up, stepLines, channel, comm); for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { // Up ssize_t offset = gridOffset + bid*chunkSize; int nelem = min(chunkSize, size-offset); - if (treeUp->down[0] == -1) { + if (tree->up == -1) { + LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); + } else if (tree->down[0] == -1) { LLprims.send(thisInput+offset, nelem); } else { LLprims.recvReduceSend(thisInput+offset, nelem); } } - } else { + } while(0); + + do { // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local) - ncclLL128Primitives<T, FUNC, 1, NCCL_MAX_TREE_ARITY> LLprims(tid-nthreadsSplit, nthreads-nthreadsSplit, &treeDn->up, treeDn->down, channel, comm, args->opCount); + ncclLLPrimitives<T, FUNC, 1, NCCL_MAX_DEV_ARITY> LLprims(tid, nthreads, &tree->up, tree->down, stepLines, channel, comm); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Down + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->up == -1) { + LLprims.send(thisOutput+offset, nelem); + } else if (tree->down[0] == -1) { + LLprims.recv(thisOutput+offset, nelem); + } else { + LLprims.recvCopySend(thisOutput+offset, nelem); + } + } + } while(0); + } +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_COLLNET, NCCL_PROTO_LL, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclTree* tree = &channel->collTree; + const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); + ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); + const ssize_t minChunkSize = nthreads*sizeof(uint64_t) / sizeof(T); + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + if (loopSize > size) { + chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; + } + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + if (blockIdx.x < nChannels) { // first half of the channels do reduce + ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, tree->down, &tree->up, stepLines, channel, comm); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Up + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->up == -1) { + LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); + } else if (tree->down[0] == -1) { + LLprims.send(thisInput+offset, nelem); + } else { + LLprims.recvReduceSend(thisInput+offset, nelem); + } + } + } + + if (blockIdx.x >= nChannels) { // second half of the channels do broadcast + ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &tree->up, tree->down, stepLines, channel, comm); for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { // Down ssize_t offset = gridOffset + bid*chunkSize; int nelem = min(chunkSize, size-offset); - if (treeDn->down[0] == -1) { + if (tree->up == -1) { + LLprims.send(thisOutput+offset, 
nelem); + } else if (tree->down[0] == -1) { LLprims.recv(thisOutput+offset, nelem); } else { LLprims.recvCopySend(thisOutput+offset, nelem); @@ -407,4 +451,154 @@ __device__ void ncclAllReduceTreeLL128Kernel(struct CollectiveArgs* args) { } } } -} +}; + +#include "prims_ll128.h" +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_RING, NCCL_PROTO_LL128, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS); + ssize_t chunkSize = stepSize*NCCL_LL128_DATAELEMS*sizeof(uint64_t) / (NCCL_LL128_LINEELEMS*sizeof(T)); + // We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere. + const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2; + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*nranks*chunkSize; + const ssize_t size = args->coll.count; + + ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepSize, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + chunkSize = min(DIVUP(size-gridOffset, nChannels*nranks*minChunkSize)*minChunkSize, chunkSize); + + /////////////// begin AllReduce steps /////////////// + ssize_t offset; + int nelem; + int chunk; + + // step 0: push data to next GPU + chunk = ring->devUserRanks[nranks-1]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); + + LLprims.send(thisInput+offset, nelem); + + // k-2 steps: reduce and copy to next GPU + for (int j=2; j<nranks; ++j) { + chunk = ring->devUserRanks[nranks-j]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); + + LLprims.recvReduceSend(thisInput+offset, nelem); + } + + // step k-1: reduce this buffer and data, which will produce the final + // result that we store in this data and push to the next GPU + chunk = ring->devUserRanks[0]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); + + LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem); + + // k-2 steps: copy to next GPU + for (int j=1; j<nranks-1; ++j) { + chunk = ring->devUserRanks[nranks-j]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); + + LLprims.recvCopySend(thisOutput+offset, nelem); + } + + // Make final copy from buffer to dest. + chunk = ring->devUserRanks[1]; + offset = gridOffset + (chunk*nChannels+bid) * chunkSize; + nelem = min(chunkSize, size-offset); + + // Here we need to copy from buffer to this output. 
+ LLprims.recv(thisOutput+offset, nelem); + } + } +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_TREE, NCCL_PROTO_LL128, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclTree* tree = &channel->tree; + const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS); + ssize_t chunkSize = args->coll.lastChunkSize; + const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/8; + const ssize_t loopSize = nChannels*chunkSize; + int nthreadsSplit = NCCL_LL128_SPLIT(nthreads); + const ssize_t size = args->coll.count; + + if (loopSize > size) { + chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; + } + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + if (tree->up == -1) { + // ReduceAndBroadcast : max number of recv is 3, max number of send is 3 + ncclLL128Primitives<T, FUNC, NCCL_MAX_DEV_ARITY, NCCL_MAX_DEV_ARITY> LLprims(tid, nthreads, tree->down, tree->down, stepSize, channel, comm); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem); + } + } else { + if (tid < nthreadsSplit) { + // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local) + ncclLL128Primitives<T, FUNC, NCCL_MAX_DEV_ARITY, 1> LLprims(tid, nthreadsSplit, tree->down, &tree->up, stepSize, channel, comm); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Up + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->down[0] == -1) { + LLprims.send(thisInput+offset, nelem); + } else { + LLprims.recvReduceSend(thisInput+offset, nelem); + } + } + } else { + // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local) + ncclLL128Primitives<T, FUNC, 1, NCCL_MAX_DEV_ARITY> LLprims(tid-nthreadsSplit, nthreads-nthreadsSplit, &tree->up, tree->down, stepSize, channel, comm); + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + // Down + ssize_t offset = gridOffset + bid*chunkSize; + int nelem = min(chunkSize, size-offset); + if (tree->down[0] == -1) { + LLprims.recv(thisOutput+offset, nelem); + } else { + LLprims.recvCopySend(thisOutput+offset, nelem); + } + } + } + } + } +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_COLLNET, NCCL_PROTO_LL128, FUNC, T, UNROLL> { + public: +__device__ void run(struct ncclWorkElem* args) { } +}; diff --git a/src/collectives/device/broadcast.cu b/src/collectives/device/broadcast.cu index 8c8dbb6..7759585 100644 --- a/src/collectives/device/broadcast.cu +++ b/src/collectives/device/broadcast.cu @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -8,4 +8,4 @@ #include "common.h" #include "collectives.h" -IMPL_COLL_C(ncclBroadcast, ncclCollBroadcast); +IMPL_COLL_C(Broadcast); diff --git a/src/collectives/device/broadcast.h b/src/collectives/device/broadcast.h index de8b989..72216ac 100644 --- a/src/collectives/device/broadcast.h +++ b/src/collectives/device/broadcast.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -8,143 +8,155 @@ #include "primitives.h" #include "collectives.h" -template<int UNROLL, class FUNC, typename T> -__device__ void ncclBroadcastRingKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads-WARP_SIZE; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->N; - const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * BROADCAST_CHUNKSTEPS; - const ssize_t loopSize = args->nChannels*(ssize_t)chunkSize; - const int rank = ring->devUserRanks[0]; - const int nextRank = ring->devUserRanks[1]; - const int root = args->root; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - ncclPrimitives<UNROLL, BROADCAST_CHUNKSTEPS/BROADCAST_SLICESTEPS, BROADCAST_SLICESTEPS, T, 1, 1, FUNC> - prims(tid, args->nThreads, &ring->prev, &ring->next, NULL, stepSize, channel, comm, args->opCount); - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,args->nChannels)); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t offset = gridOffset + bid*realChunkSize; - int nelem = min(realChunkSize, size-offset); - - if (rank == root) { - if (thisInput == thisOutput) { - prims.send(thisInput+offset, nelem); - } else { - prims.copySend(thisInput+offset, thisOutput+offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncBroadcast, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads-WARP_SIZE; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * BROADCAST_CHUNKSTEPS; + const ssize_t loopSize = nChannels*(ssize_t)chunkSize; + const ssize_t size = args->coll.count; + const int rank = ring->devUserRanks[0]; + const int nextRank = ring->devUserRanks[1]; + const int root = args->coll.root; + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + ncclPrimitives<UNROLL, BROADCAST_CHUNKSTEPS/BROADCAST_SLICESTEPS, BROADCAST_SLICESTEPS, T, 1, 1, 0, FUNC> + prims(tid, nthreads, 
&ring->prev, &ring->next, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,nChannels)); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t offset = gridOffset + bid*realChunkSize; + int nelem = min(realChunkSize, size-offset); + + if (rank == root) { + if (thisInput == thisOutput) { + prims.send(thisInput+offset, nelem); + } else { + prims.copySend(thisInput+offset, thisOutput+offset, nelem); + } + } else if (nextRank == root) { + prims.recv(thisOutput+offset, nelem); + } else { + prims.recvCopySend(thisOutput+offset, nelem); + } } - } else if (nextRank == root) { - prims.recv(thisOutput+offset, nelem); - } else { - prims.recvCopySend(thisOutput+offset, nelem); } - } -} - -template<int UNROLL, class FUNC, typename T> -__device__ void ncclBroadcastTreeKernel(struct CollectiveArgs* args) { } - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclBroadcastRingLLKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - const int rank = ring->devUserRanks[0]; - const int nextRank = ring->devUserRanks[1]; - const int root = args->root; - - ssize_t chunkSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - if (size-gridOffset < loopSize) { - chunkSize = args->lastChunkSize; - } - ssize_t offset = gridOffset + bid*chunkSize; - - int nelem = min(chunkSize, size-offset); - if (rank == root) { - if (thisInput == thisOutput) { - LLprims.send(thisInput+offset, nelem); - } else { - LLprims.copySend(thisInput + offset, thisOutput + offset, nelem); +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncBroadcast, NCCL_ALGO_RING, NCCL_PROTO_LL, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); + ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + const int rank = ring->devUserRanks[0]; + const int nextRank = ring->devUserRanks[1]; + const int root = args->coll.root; + + ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepLines, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + if (size-gridOffset < loopSize) { + chunkSize = 
args->coll.lastChunkSize; + } + ssize_t offset = gridOffset + bid*chunkSize; + + int nelem = min(chunkSize, size-offset); + if (rank == root) { + if (thisInput == thisOutput) { + LLprims.send(thisInput+offset, nelem); + } else { + LLprims.copySend(thisInput + offset, thisOutput + offset, nelem); + } + } else if (nextRank == root) { + LLprims.recv(thisOutput + offset, nelem); + } else { + LLprims.recvCopySend(thisOutput + offset, nelem); + } } - } else if (nextRank == root) { - LLprims.recv(thisOutput + offset, nelem); - } else { - LLprims.recvCopySend(thisOutput + offset, nelem); } - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclBroadcastTreeLLKernel(struct CollectiveArgs* args) { } +}; #include "prims_ll128.h" -template<int UNUSED, class FUNC, typename T> -__device__ void ncclBroadcastRingLL128Kernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - const int rank = ring->devUserRanks[0]; - const int nextRank = ring->devUserRanks[1]; - const int root = args->root; - - ssize_t chunkSize = (NCCL_LL128_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); - const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); - - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - chunkSize = min(DIVUP(size-gridOffset, args->nChannels*minChunkSize)*minChunkSize, chunkSize); - ssize_t offset = gridOffset + bid*chunkSize; - - int nelem = min(chunkSize, size-offset); - if (rank == root) { - if (thisInput == thisOutput) { - LLprims.send(thisInput+offset, nelem); - } else { - LLprims.copySend(thisInput + offset, thisOutput + offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncBroadcast, NCCL_ALGO_RING, NCCL_PROTO_LL128, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS); + ssize_t chunkSize = stepSize*NCCL_LL128_DATAELEMS*sizeof(uint64_t) / (NCCL_LL128_LINEELEMS*sizeof(T)); + const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + const int rank = ring->devUserRanks[0]; + const int nextRank = ring->devUserRanks[1]; + const int root = args->coll.root; + + ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepSize, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const 
T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + chunkSize = min(DIVUP(size-gridOffset, nChannels*minChunkSize)*minChunkSize, chunkSize); + ssize_t offset = gridOffset + bid*chunkSize; + + int nelem = min(chunkSize, size-offset); + if (rank == root) { + if (thisInput == thisOutput) { + LLprims.send(thisInput+offset, nelem); + } else { + LLprims.copySend(thisInput + offset, thisOutput + offset, nelem); + } + } else if (nextRank == root) { + LLprims.recv(thisOutput + offset, nelem); + } else { + LLprims.recvCopySend(thisOutput + offset, nelem); + } } - } else if (nextRank == root) { - LLprims.recv(thisOutput + offset, nelem); - } else { - LLprims.recvCopySend(thisOutput + offset, nelem); } - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclBroadcastTreeLL128Kernel(struct CollectiveArgs* args) { } +}; + +template<int PROTO, class REDOP, typename T, int UNROLL> +class ncclFunction<ncclFuncBroadcast, NCCL_ALGO_TREE, PROTO, REDOP, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; + +template<int PROTO, class REDOP, typename T, int UNROLL> +class ncclFunction<ncclFuncBroadcast, NCCL_ALGO_COLLNET, PROTO, REDOP, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; diff --git a/src/collectives/device/common.h b/src/collectives/device/common.h index 46eb9f5..265218a 100644 --- a/src/collectives/device/common.h +++ b/src/collectives/device/common.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2017-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2017-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -10,6 +10,15 @@ #include "collectives.h" #include "devcomm.h" + +#if __CUDA_ARCH__ >= 800 +#define COLL_UNROLL 8 +#define NCCL_MAX_DEV_ARITY (NCCL_MAX_TREE_ARITY-1) // Using balanced tree instead of split tree +#else +#define COLL_UNROLL 4 +#define NCCL_MAX_DEV_ARITY NCCL_MAX_TREE_ARITY +#endif + // Exit If Abort Barrier across CTA: make sure all threads exit consistently // Each thread sets a predicate to true if abort == 1 // all CTA's threads enter the barrier and do a popc on their predicates being True @@ -19,12 +28,12 @@ static inline __device__ void exitIfAbortBarrier(int abort) { asm ("{"); asm volatile (" .reg .pred barr_pred;"); asm volatile (" setp.eq.u32 barr_pred,%0,1;" :: "r"(abort)); - asm volatile (" bar.red.popc.u32 %0, 13, barr_pred;" : "=r"(popc)); + asm volatile (" bar.red.popc.u32 %0, 0, barr_pred;" : "=r"(popc)); asm ("}"); if (popc) { asm volatile ("exit;"); } } -typedef void(*ncclKern_t)(struct CollectiveArgs* args); +typedef void(*ncclKern_t)(struct ncclWorkElem* args); extern __device__ ncclKern_t ncclFuncs[]; static __device__ void load_parallel(void* dst, void* src, size_t size, int tid) { @@ -32,130 +41,143 @@ static __device__ void load_parallel(void* dst, void* src, size_t size, int tid) int* s = (int*)src; for (int o = tid; o < (size/sizeof(int)); o += blockDim.x) d[o] = s[o]; } -static __device__ void load_coll(struct ncclColl* localColl, struct ncclColl* hostColl, int tid, struct ncclDevComm* comm) { +static __device__ void load_coll(struct ncclWork* localWork, struct ncclWork* hostWork, int tid, struct ncclDevComm* comm) { + __syncthreads(); + load_parallel(localWork, hostWork, sizeof(struct 
ncclWork), tid); // Check whether the last operation was aborted and make sure all threads exit int abort = tid == 0 ? *(comm->abortFlag) : 0; exitIfAbortBarrier(abort); - load_parallel(localColl, hostColl, sizeof(struct ncclColl), tid); - __syncthreads(); - if (tid == 0) hostColl->active = 0; + if (tid == 0) hostWork->elems[0].active = 0; } -extern __device__ volatile uint64_t* ncclShmem; +template <ncclFunc_t FUNCTION, int ALGO, int PROTO, class REDOP, typename T, int UNROLL> +class ncclFunction { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; + +struct ncclShmemPtrs { + void* srcs[NCCL_MAX_DEV_ARITY+1]; + void* dsts[NCCL_MAX_DEV_ARITY+1]; +}; + +struct ncclShmemData { + union { + volatile uint64_t data[NCCL_LL128_SHMEM_SIZE]; + struct ncclShmemPtrs ptrs[NCCL_MAX_GROUPS]; + }; + struct ncclWork localWork; +}; -/* Functions for aggregation case */ -#define IMPL_COLL_FUNC(coll, op, ncclFunc, dtype, ctype) \ -__device__ void NCCL_COLL_NAME(coll, op, dtype)(struct CollectiveArgs* args) { \ - coll##Kernel<COLL_UNROLL, ncclFunc<ctype>, ctype>(args); \ +extern __device__ struct ncclShmemData *ncclShmem; +template <ncclFunc_t FUNCTION, int ALGO, int PROTO, class REDOP, typename T, int UNROLL, int FINDEX> +__device__ void ncclKernel(struct ncclWorkElem first) { + int tid = threadIdx.x; + int bid = blockIdx.x; + __shared__ struct ncclShmemData shmem; + ncclShmem = &shmem; + + auto f = ncclFunction<FUNCTION, ALGO, PROTO, REDOP, T, UNROLL>(); + + struct ncclDevComm* comm = first.comm; + struct ncclChannel* channel = comm->channels+bid; + struct ncclWorkElem* w = NULL; + uint16_t index = first.index; + + /* To optimize for latency, (only) the first operation is passed as argument.*/ + if (bid == 0 && first.funcIndex != FUNC_INDEX_P2P) w = &first; + + while (1) { + if (w == NULL) { + w = shmem.localWork.elems; + load_coll(&shmem.localWork, channel->workFifo+index, tid, comm); + } + if (tid < w->nThreads) { + if (w->funcIndex == FINDEX) { + f.run(w); + } else { + ncclFuncs[w->funcIndex](w); + } + } + index = (index+1) % NCCL_MAX_OPS; + if (w->active == 2) { + return; + } + w = NULL; + } } +// Only generate kernels for SUM #if NCCL_OP == 0 -/* Kernels with the first operation inlined */ -#define IMPL_COLL_KERN(coll, op, ncclFunc, dtype, ctype, fIndex) \ -__global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclColl firstColl) { \ - int tid = threadIdx.x; \ - int bid = blockIdx.x; \ - __shared__ volatile uint64_t shmem[NCCL_LL128_SHMEM_SIZE]; \ - ncclShmem = shmem; \ - __shared__ struct ncclColl localColl; \ - \ - struct ncclDevComm* comm = firstColl.args.comm; \ - struct ncclChannel* channel = comm->channels+bid; \ - struct ncclColl* c; \ - if (bid == 0) { \ - /* To optimize for latency, (only) the first operation is passed as argument.*/ \ - c = &firstColl; \ - } else { \ - c = &localColl; \ - load_coll(c, channel->devCollectives+channel->collFifoHead, tid, comm); \ - } \ - while (1) { \ - if (tid < c->args.nThreads) { \ - if (c->funcIndex == fIndex) { \ - coll##Kernel<COLL_UNROLL, ncclFunc<ctype>, ctype>(&c->args); \ - } else { \ - ncclFuncs[c->funcIndex](&c->args); \ - } \ - } \ - int nextIndex = c->nextIndex; \ - if (tid == 0) channel->collFifoHead = nextIndex; \ - \ - if (c->active == 2) { \ - return; \ - } \ - \ - /* Load next collective operation*/ \ - c = &localColl; /* for bid 0 */ \ - load_coll(c, channel->devCollectives+nextIndex, tid, comm); \ - } \ +#define IMPL_COLL_KERN(func, algo, proto, redop, type, fIndex) \ +__global__ void NCCL_KERN_NAME(func, algo, proto, redop, 
type)(struct ncclWorkElem first) { \ + ncclKernel<ncclFunc##func, NCCL_ALGO_##algo, NCCL_PROTO_##proto, Func##redop<type>, type, COLL_UNROLL, fIndex>(first); \ } #else -#define IMPL_COLL_KERN(coll, op, ncclFunc, dtype, ctype, fIndex) +#define IMPL_COLL_KERN(func, algo, proto, redop, type, fInded) #endif +// Examples : AllReduce, RING, LL, Sum, uint8 +#define IMPL_COLL_FUNC(func, algo, proto, redop, type) \ +__device__ void NCCL_FUNC_NAME(func, algo, proto, redop, type)(struct ncclWorkElem* args) { \ + auto f = ncclFunction<ncclFunc##func, NCCL_ALGO_##algo, NCCL_PROTO_##proto, Func##redop<type>, type, COLL_UNROLL>(); \ + f.run(args); \ +} + // Only generate inline kernels for LL -#define IMPL_COLL4(coll, op, ncclFunc, dtype, ctype, ncclColl, ncclOp, ncclType, al) \ - IMPL_COLL_FUNC(coll##LL, op, ncclFunc, dtype, ctype) \ - IMPL_COLL_FUNC(coll##LL128, op, ncclFunc, dtype, ctype) \ - IMPL_COLL_FUNC(coll, op, ncclFunc, dtype, ctype) \ - IMPL_COLL_KERN(coll##LL, op, ncclFunc, dtype, ctype, FUNC_INDEX(ncclColl, ncclOp, ncclType, al, NCCL_PROTO_LL)) \ +#define IMPL_COLL4(func, algo, redop, type, ncclType) \ + IMPL_COLL_FUNC(func, algo, LL, redop, type) \ + IMPL_COLL_FUNC(func, algo, LL128, redop, type) \ + IMPL_COLL_FUNC(func, algo, SIMPLE, redop, type) \ + IMPL_COLL_KERN(func, algo, LL, redop, type, FUNC_INDEX(ncclFunc##func, nccl##redop, ncclType, NCCL_ALGO_##algo, NCCL_PROTO_LL)) \ -#define IMPL_COLL3(coll, op, ncclFunc, dtype, ctype, ncclColl, ncclOp, ncclType) \ - IMPL_COLL4(coll##Tree, op, ncclFunc, dtype, ctype, ncclColl, ncclOp, ncclType, NCCL_ALGO_TREE) \ - IMPL_COLL4(coll##Ring, op, ncclFunc, dtype, ctype, ncclColl, ncclOp, ncclType, NCCL_ALGO_RING) +#define IMPL_COLL3(func, redop, type, ncclType) \ + IMPL_COLL4(func, TREE, redop, type, ncclType) \ + IMPL_COLL4(func, RING, redop, type, ncclType) \ + IMPL_COLL4(func, COLLNET, redop, type, ncclType) #if NCCL_TYPE == 0 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, i8, int8_t, ncclColl, ncclOp, ncclInt8) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, int8_t, ncclInt8) #elif NCCL_TYPE == 1 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, u8, uint8_t, ncclColl, ncclOp, ncclUint8) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, uint8_t, ncclUint8) #elif NCCL_TYPE == 2 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, i32, int32_t, ncclColl, ncclOp, ncclInt32) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, int32_t, ncclInt32) #elif NCCL_TYPE == 3 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, u32, uint32_t, ncclColl, ncclOp, ncclUint32) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, uint32_t, ncclUint32) #elif NCCL_TYPE == 4 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, i64, int64_t, ncclColl, ncclOp, ncclInt64) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, int64_t, ncclInt64) #elif NCCL_TYPE == 5 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, u64, uint64_t, ncclColl, ncclOp, ncclUint64) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, uint64_t, ncclUint64) #elif NCCL_TYPE == 6 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, f16, half, ncclColl, ncclOp, ncclFloat16) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, half, ncclFloat16) #elif NCCL_TYPE == 7 -#define 
IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, f32, float, ncclColl, ncclOp, ncclFloat32) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, float, ncclFloat32) #elif NCCL_TYPE == 8 -#define IMPL_COLL2(coll, op, ncclFunc, ncclColl, ncclOp) \ - IMPL_COLL3(coll, op, ncclFunc, f64, double, ncclColl, ncclOp, ncclFloat64) +#define IMPL_COLL2(func, redop) IMPL_COLL3(func, redop, double, ncclFloat64) #endif // Reduction define all functions #if NCCL_OP == 0 -#define IMPL_COLL_R(collf, colln) \ - IMPL_COLL2(collf, sum, FuncSum, colln, ncclSum); +#define IMPL_COLL_R(func) IMPL_COLL2(func, Sum); #elif NCCL_OP == 1 -#define IMPL_COLL_R(collf, colln) \ - IMPL_COLL2(collf, prod, FuncProd, colln, ncclProd); +#define IMPL_COLL_R(func) IMPL_COLL2(func, Prod); #elif NCCL_OP == 2 -#define IMPL_COLL_R(collf, colln) \ - IMPL_COLL2(collf, min, FuncMin, colln, ncclMin); +#define IMPL_COLL_R(func) IMPL_COLL2(func, Min); #elif NCCL_OP == 3 -#define IMPL_COLL_R(collf, colln) \ - IMPL_COLL2(collf, max, FuncMax, colln, ncclMax); +#define IMPL_COLL_R(func) IMPL_COLL2(func, Max); #endif -// Copy primitives only define one #if NCCL_OP == 0 && NCCL_TYPE == 0 -#define IMPL_COLL_C(collf, colln) \ - IMPL_COLL3(collf, copy, FuncSum, i8, int8_t, colln, ncclSum, ncclInt8); +// Copy primitives only define one function for copy +#define IMPL_COLL_C(func) IMPL_COLL3(func, Sum, int8_t, ncclInt8); + +// Point-to-point primitives only have one function/kernel. +#define IMPL_COLL_P(func) \ + IMPL_COLL_FUNC(func, RING, SIMPLE, Sum, int8_t); \ + IMPL_COLL_KERN(func, RING, SIMPLE, Sum, int8_t, 0); #else -#define IMPL_COLL_C(collf, colln) +#define IMPL_COLL_C(func) +#define IMPL_COLL_P(func) #endif -#define COLL_UNROLL 4 - #endif diff --git a/src/collectives/device/common_kernel.h b/src/collectives/device/common_kernel.h index aa1e936..ff466a0 100644 --- a/src/collectives/device/common_kernel.h +++ b/src/collectives/device/common_kernel.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -16,6 +16,12 @@ // Define min for ssize_t static __device__ int min(int a, ssize_t b) { return (a < b) ? 
a : b; } +template <typename T> +inline __device__ void loadPtr(void** ptr, T* &v) { + asm volatile("ld.volatile.global.u64 %0, [%1];" + : "=l"(v) : "l"(ptr)); +} + typedef uint64_t PackType; // unpack x and y to elements of type T and apply FUNC to each element @@ -245,28 +251,57 @@ inline __device__ void Store128(Pack128* p, Pack128& v) { asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" :: "l"(p), "l"(v.x), "l"(v.y) : "memory"); } -template<class FUNC, typename T, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS> -__device__ __forceinline__ void ReduceCopyMulti(const int tid, const int nthreads, - int nsrcs, const T* srcs[MAXSRCS], int ndsts, T* dsts[MAXDSTS], - const int offset, const int N) { - for (int idx = offset+tid; idx < offset+N; idx += nthreads) { - T val = vFetch(srcs[0]+idx); +template<class FUNC, typename T, int UNROLL, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS> +__device__ __forceinline__ void ReduceCopyMulti(const int w, const int nw, const int t, + int nsrcs, const T** s, int ndsts, T** d, const int elemOffset, const int Nelem) { + const int inc = nw * UNROLL * WARP_SIZE; + int offset = w * UNROLL * WARP_SIZE + t; + + const T* srcs[MAXSRCS]; + for (int i=0; i<MAXSRCS; i++) srcs[i] = s[i]+elemOffset+offset; + T* dsts[MAXDSTS]; + for (int i=0; i<MAXDSTS; i++) dsts[i] = d[i]+elemOffset+offset; + + while (offset < Nelem) { + T vals[UNROLL]; + // Load and reduce + for (int u = 0; u < UNROLL; ++u) vals[u] = vFetch(srcs[0]+u*WARP_SIZE); + + #pragma unroll + for (int i=1; i<MINSRCS; i++) { + T vals2[UNROLL]; + for (int u = 0; u < UNROLL; ++u) vals2[u] = vFetch(srcs[i]+u*WARP_SIZE); + for (int u = 0; u < UNROLL; ++u) vals[u] = FUNC()(vals[u], vals2[u]); + } #pragma unroll - for (int i=1; i<MINSRCS; i++) val = FUNC()(val, vFetch(srcs[i]+idx)); - #pragma unroll 1 - for (int i=MINSRCS; i<MAXSRCS && i<nsrcs; i++) val = FUNC()(val, vFetch(srcs[i]+idx)); + for (int i=MINSRCS; i<MAXSRCS; i++) { + if (i<nsrcs) { + T vals2[UNROLL]; + for (int u = 0; u < UNROLL; ++u) vals2[u] = vFetch(srcs[i]+u*WARP_SIZE); + for (int u = 0; u < UNROLL; ++u) vals[u] = FUNC()(vals[u], vals2[u]); + } + } + // Store #pragma unroll - for (int i=0; i<MINDSTS; i++) vStore(dsts[i]+idx, val); - #pragma unroll 1 - for (int i=MINDSTS; i<MAXDSTS && i<ndsts; i++) vStore(dsts[i]+idx, val); + for (int i = 0; i < MINDSTS; i++) { + for (int u = 0; u < UNROLL; ++u) vStore(dsts[i]+u*WARP_SIZE, vals[u]); + } + #pragma unroll + for (int i=MINDSTS; i<MAXDSTS; i++) { + if (i<ndsts) { + for (int u = 0; u < UNROLL; ++u) vStore(dsts[i]+u*WARP_SIZE, vals[u]); + } + } + for (int i=0; i<MAXSRCS; i++) srcs[i] += inc; + for (int i=0; i<MAXDSTS; i++) dsts[i] += inc; + offset += inc; } } template<class FUNC, typename T, int UNROLL, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS> -__device__ __forceinline__ void ReduceCopy128bMulti( const int w, const int nw, const int t, - int nsrcs, const T* s[MAXSRCS], int ndsts, T* d[MAXDSTS], - const int elemOffset, const int Npack) { +__device__ __forceinline__ void ReduceCopy128bMulti(const int w, const int nw, const int t, + int nsrcs, const T** s, int ndsts, T** d, const int elemOffset, const int Npack) { const int inc = nw * UNROLL * WARP_SIZE; int offset = w * UNROLL * WARP_SIZE + t; @@ -280,25 +315,31 @@ __device__ __forceinline__ void ReduceCopy128bMulti( const int w, const int nw, // Load and reduce for (int u = 0; u < UNROLL; ++u) Fetch128(vals[u], srcs[0]+u*WARP_SIZE); + #pragma unroll for (int i=1; i<MINSRCS; i++) { Pack128 vals2[UNROLL]; for (int u = 0; u < UNROLL; 
++u) Fetch128(vals2[u], srcs[i]+u*WARP_SIZE); for (int u = 0; u < UNROLL; ++u) MULTI128<FUNC, T>()(vals[u], vals2[u]); } - #pragma unroll 1 - for (int i=MINSRCS; i<MAXSRCS && i<nsrcs; i++) { - Pack128 vals2[UNROLL]; - for (int u = 0; u < UNROLL; ++u) Fetch128(vals2[u], srcs[i]+u*WARP_SIZE); - for (int u = 0; u < UNROLL; ++u) MULTI128<FUNC, T>()(vals[u], vals2[u]); + #pragma unroll + for (int i=MINSRCS; i<MAXSRCS; i++) { + if (i<nsrcs) { + Pack128 vals2[UNROLL]; + for (int u = 0; u < UNROLL; ++u) Fetch128(vals2[u], srcs[i]+u*WARP_SIZE); + for (int u = 0; u < UNROLL; ++u) MULTI128<FUNC, T>()(vals[u], vals2[u]); + } } // Store + #pragma unroll for (int i = 0; i < MINDSTS; i++) { for (int u = 0; u < UNROLL; ++u) Store128(dsts[i]+u*WARP_SIZE, vals[u]); } - #pragma unroll 1 - for (int i=MINDSTS; i<MAXDSTS && i<ndsts; i++) { - for (int u = 0; u < UNROLL; ++u) Store128(dsts[i]+u*WARP_SIZE, vals[u]); + #pragma unroll + for (int i=MINDSTS; i<MAXDSTS; i++) { + if (i<ndsts) { + for (int u = 0; u < UNROLL; ++u) Store128(dsts[i]+u*WARP_SIZE, vals[u]); + } } for (int i=0; i<MAXSRCS; i++) srcs[i] += inc; for (int i=0; i<MAXDSTS; i++) dsts[i] += inc; @@ -309,72 +350,65 @@ __device__ __forceinline__ void ReduceCopy128bMulti( const int w, const int nw, template <typename T> __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(Pack128); } -// Try to limit consecutive load/stores to 8. -// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise -#define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS))) +#define PACKELEMS (sizeof(Pack128) / sizeof(T)) template<int UNROLL, class FUNC, typename T, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS> __device__ __forceinline__ void ReduceOrCopyMulti(const int tid, const int nthreads, - int nsrcs, const T* srcs[MAXSRCS], int ndsts, T* dsts[MAXDSTS], + int nsrcs, const T** srcs, int ndsts, T** dsts, int N) { int Nrem = N; if (Nrem <= 0) return; - int alignDiff = 0; - int align = ptrAlign128(srcs[0]); - #pragma unroll - for (int i=1; i<MINSRCS; i++) alignDiff |= (align ^ ptrAlign128(srcs[i])); - for (int i=MINSRCS; i<MAXSRCS && i<nsrcs; i++) alignDiff |= (align ^ ptrAlign128(srcs[i])); - #pragma unroll - for (int i=0; i<MINDSTS; i++) alignDiff |= (align ^ ptrAlign128(dsts[i])); - for (int i=MINDSTS; i<MAXDSTS && i<ndsts; i++) alignDiff |= (align ^ ptrAlign128(dsts[i])); - - int Npreamble = alignDiff ? Nrem : - N < alignof(Pack128) ? N : - (alignof(Pack128) - align) % alignof(Pack128); - - // stage 1: preamble: handle any elements up to the point of everything coming - // into alignment - if (Npreamble) { - ReduceCopyMulti<FUNC, T, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(tid, nthreads, nsrcs, srcs, ndsts, dsts, 0, Npreamble); - Nrem -= Npreamble; - if (Nrem == 0) return; - } - int offset = Npreamble; - - // stage 2: fast path: use 128b loads/stores to do the bulk of the work, - // assuming the pointers we have are all 128-bit alignable. int w = tid / WARP_SIZE; // Warp number int nw = nthreads / WARP_SIZE; // Number of warps int t = tid % WARP_SIZE; // Thread (inside the warp) - const int packFactor = sizeof(Pack128) / sizeof(T); + // Check that all is 16B aligned. If not don't use 16B load/stores. 
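+  // ptrAlign128() reports each pointer's offset from Pack128 alignment, so
+  // OR-ing the results over every source and destination means that a single
+  // misaligned buffer leaves align != 0 and routes the work through the
+  // element-wise ReduceCopyMulti path instead of the 128-bit fast path.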
+ int align = 0; + #pragma unroll + for (int i=0; i<MINSRCS; i++) align |= ptrAlign128(srcs[i]); + for (int i=MINSRCS; i<MAXSRCS && i<nsrcs; i++) align |= ptrAlign128(srcs[i]); + #pragma unroll + for (int i=0; i<MINDSTS; i++) align |= ptrAlign128(dsts[i]); + for (int i=MINDSTS; i<MAXDSTS && i<ndsts; i++) align |= ptrAlign128(dsts[i]); - // stage 2a: main loop - int Npack2a = (Nrem / (packFactor * AUTOUNROLL * WARP_SIZE)) - * (AUTOUNROLL * WARP_SIZE); // round down - int Nelem2a = Npack2a * packFactor; + int offset = 0; + if (align == 0) { + // fast path: use 128b loads/stores to do the bulk of the work, + // assuming the pointers we have are all 128-bit aligned. - ReduceCopy128bMulti<FUNC, T, AUTOUNROLL, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(w, nw, t, nsrcs, srcs, ndsts, dsts, offset, Npack2a); + // main loop + int Npack = (Nrem / (PACKELEMS*UNROLL*WARP_SIZE)) * (UNROLL*WARP_SIZE); // round down + int Nelem = Npack * PACKELEMS; - Nrem -= Nelem2a; - if (Nrem == 0) return; - offset += Nelem2a; + ReduceCopy128bMulti<FUNC, T, UNROLL, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(w, nw, t, nsrcs, srcs, ndsts, dsts, offset, Npack); - // stage 2b: slightly less optimized for section when we don't have full - // unrolling + Nrem -= Nelem; + if (Nrem == 0) return; + offset += Nelem; + + // slightly less optimized for section when we don't have full unrolling + Npack = Nrem / PACKELEMS; + Nelem = Npack * PACKELEMS; + + ReduceCopy128bMulti<FUNC, T, 1, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(w, nw, t, nsrcs, srcs, ndsts, dsts, offset, Npack); + + Nrem -= Nelem; + if (Nrem == 0) return; + offset += Nelem; + } - int Npack2b = Nrem / packFactor; - int Nelem2b = Npack2b * packFactor; + // unrolled, by-type (mostly for unaligned buffers) + int Nelem = (Nrem / (UNROLL*PACKELEMS/2*WARP_SIZE)) * (UNROLL*PACKELEMS/2*WARP_SIZE); // round down - ReduceCopy128bMulti<FUNC, T, 1, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(w, nw, t, nsrcs, srcs, ndsts, dsts, offset, Npack2b); + ReduceCopyMulti<FUNC, T, UNROLL*PACKELEMS/2, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(w, nw, t, nsrcs, srcs, ndsts, dsts, offset, Nelem); - Nrem -= Nelem2b; + Nrem -= Nelem; if (Nrem == 0) return; - offset += Nelem2b; + offset += Nelem; - // stage 2c: tail - ReduceCopyMulti<FUNC, T, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(tid, nthreads, nsrcs, srcs, ndsts, dsts, offset, Nrem); + // no unroll, by type. Should finish what's remaining. + ReduceCopyMulti<FUNC, T, 1, MINSRCS, MAXSRCS, MINDSTS, MAXDSTS>(w, nw, t, nsrcs, srcs, ndsts, dsts, offset, Nrem); } #endif // COMMON_KERNEL_H_ diff --git a/src/collectives/device/functions.cu b/src/collectives/device/functions.cu index 034fe96..553a882 100644 --- a/src/collectives/device/functions.cu +++ b/src/collectives/device/functions.cu @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -8,70 +8,73 @@ #include "collectives.h" #include "common.h" -__device__ volatile uint64_t* ncclShmem; +__device__ struct ncclShmemData* ncclShmem; -#define NCCL_FUNC5(coll, op, dtype) \ - NCCL_COLL_NAME(coll##LL, op, dtype), \ - NCCL_COLL_NAME(coll##LL128, op, dtype), \ - NCCL_COLL_NAME(coll, op, dtype) +#define NCCL_FUNC5(func, algo, redop, type) \ + NCCL_FUNC_NAME(func, algo, LL, redop, type), \ + NCCL_FUNC_NAME(func, algo, LL128, redop, type), \ + NCCL_FUNC_NAME(func, algo, SIMPLE, redop, type) -#define NCCL_FUNC4(coll, op, dtype) \ - NCCL_FUNC5(coll##Tree, op, dtype), \ - NCCL_FUNC5(coll##Ring, op, dtype) +#define NCCL_FUNC4(func, redop, type) \ + NCCL_FUNC5(func, TREE, redop, type), \ + NCCL_FUNC5(func, RING, redop, type), \ + NCCL_FUNC5(func, COLLNET, redop, type) // Must be consistent with ncclDataType_t -#define NCCL_FUNCS3A(coll, op) \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, u8), \ - NCCL_FUNC4(coll, op, i32), \ - NCCL_FUNC4(coll, op, u32), \ - NCCL_FUNC4(coll, op, i64), \ - NCCL_FUNC4(coll, op, u64), \ - NCCL_FUNC4(coll, op, f16), \ - NCCL_FUNC4(coll, op, f32), \ - NCCL_FUNC4(coll, op, f64) -#define NCCL_FUNCS3B(coll, op) \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8), \ - NCCL_FUNC4(coll, op, i8) +#define NCCL_FUNCS3A(func, redop) \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, uint8_t), \ + NCCL_FUNC4(func, redop, int32_t), \ + NCCL_FUNC4(func, redop, uint32_t), \ + NCCL_FUNC4(func, redop, int64_t), \ + NCCL_FUNC4(func, redop, uint64_t), \ + NCCL_FUNC4(func, redop, half), \ + NCCL_FUNC4(func, redop, float), \ + NCCL_FUNC4(func, redop, double) +#define NCCL_FUNCS3B(func, redop) \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t), \ + NCCL_FUNC4(func, redop, int8_t) // Must be consistent with ncclRedOp_t -#define NCCL_FUNCS2A(coll) \ - NCCL_FUNCS3A(coll, sum ), \ - NCCL_FUNCS3A(coll, prod), \ - NCCL_FUNCS3A(coll, max ), \ - NCCL_FUNCS3A(coll, min ) -#define NCCL_FUNCS2B(coll) \ - NCCL_FUNCS3B(coll, copy), \ - NCCL_FUNCS3B(coll, copy), \ - NCCL_FUNCS3B(coll, copy), \ - NCCL_FUNCS3B(coll, copy) +#define NCCL_FUNCS2A(func) \ + NCCL_FUNCS3A(func, Sum ), \ + NCCL_FUNCS3A(func, Prod), \ + NCCL_FUNCS3A(func, Max ), \ + NCCL_FUNCS3A(func, Min ) +#define NCCL_FUNCS2B(func) \ + NCCL_FUNCS3B(func, Sum), \ + NCCL_FUNCS3B(func, Sum), \ + NCCL_FUNCS3B(func, Sum), \ + NCCL_FUNCS3B(func, Sum) // Must be consistent with ncclFunc_t #define NCCL_FUNCS() { \ - NCCL_FUNCS2B(ncclBroadcast), \ - NCCL_FUNCS2A(ncclReduce), \ - NCCL_FUNCS2B(ncclAllGather), \ - NCCL_FUNCS2A(ncclReduceScatter), \ - NCCL_FUNCS2A(ncclAllReduce) } + NCCL_FUNC_NAME(SendRecv, RING, SIMPLE, Sum, int8_t),\ + NCCL_FUNCS2B(Broadcast), \ + NCCL_FUNCS2A(Reduce), \ + NCCL_FUNCS2B(AllGather), \ + NCCL_FUNCS2A(ReduceScatter), \ + NCCL_FUNCS2A(AllReduce) } // Must be consistent with the ncclFuncSet enum -__device__ ncclKern_t ncclFuncs[NCCL_NUM_FUNCTIONS*ncclNumOps*ncclNumTypes*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS] = { +__device__ ncclKern_t 
ncclFuncs[1+NCCL_NUM_FUNCTIONS*ncclNumOps*ncclNumTypes*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS] = { // Don't try to initialize the host shadow copy of this device-side global // variable. There is no host pointer to a device-side function, which // confuses clang. This will be fixed in the next clang release. #if __CUDA_ARCH__ - NCCL_FUNCS2B(ncclBroadcast), - NCCL_FUNCS2A(ncclReduce), - NCCL_FUNCS2B(ncclAllGather), - NCCL_FUNCS2A(ncclReduceScatter), - NCCL_FUNCS2A(ncclAllReduce) + NCCL_FUNC_NAME(SendRecv, RING, SIMPLE, Sum, int8_t), + NCCL_FUNCS2B(Broadcast), + NCCL_FUNCS2A(Reduce), + NCCL_FUNCS2B(AllGather), + NCCL_FUNCS2A(ReduceScatter), + NCCL_FUNCS2A(AllReduce) #endif }; diff --git a/src/collectives/device/gen_rules.sh b/src/collectives/device/gen_rules.sh index 4413213..97dc0ae 100755 --- a/src/collectives/device/gen_rules.sh +++ b/src/collectives/device/gen_rules.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2018-2019, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved. # # See LICENSE.txt for license information # @@ -9,7 +9,7 @@ dir=$1 targets="GENOBJS := \\\\\n" -for base in all_reduce all_gather broadcast reduce reduce_scatter; do +for base in sendrecv all_reduce all_gather broadcast reduce reduce_scatter; do opn=0 for op in sum prod min max; do dtn=0 diff --git a/src/collectives/device/primitives.h b/src/collectives/device/primitives.h index aa3d20d..69348db 100644 --- a/src/collectives/device/primitives.h +++ b/src/collectives/device/primitives.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -31,278 +31,235 @@ } \ } while (0) +#define ROLE_SRC 0x01 +#define ROLE_DST 0x02 +#define ROLE_WAIT_RECV 0x04 +#define ROLE_WAIT_SEND 0x08 +#define ROLE_POST_SEND 0x10 +#define ROLE_POST_RECV 0x20 + // Implementation of primitive types -template <int UNROLL, int SLICESPERCHUNK, int SLICESTEPS, typename T, int NRECV, int NSEND, class FUNC> +template <int UNROLL, int SLICESPERCHUNK, int SLICESTEPS, typename T, int NRECV, int NSEND, int DIRECT, class FUNC> class ncclPrimitives { private: const int tid; - const int nthreads; - const int wid; + int nthreads; + int nworkers; const int stepSize; int nrecv = 0; int nsend = 0; - struct ncclConnInfo* recvConn = NULL; - volatile uint64_t* recvConnHeadPtr = NULL; - uint64_t recvConnHead; - volatile uint64_t* recvConnTailPtr = NULL; - uint64_t recvConnTail; - uint64_t recvConnTailCache; // Cache last seen value - - struct ncclConnInfo* sendConn = NULL; - volatile int* sendConnFifoPtr = NULL; - volatile uint64_t* sendConnTailPtr = NULL; - uint64_t sendConnTail; - volatile uint64_t* sendConnHeadPtr = NULL; - uint64_t sendConnHead; - uint64_t sendConnHeadCache; // Cache last seen value - - uint64_t recvStep[NRECV]; - uint64_t sendStep[NSEND]; - const T* recvDirectBuff[NRECV]; - T* sendDirectBuff[NSEND]; - const T* recvBuff[NRECV]; - T* sendBuff[NSEND]; + struct ncclConnInfo* conn = NULL; + volatile int* connSizesFifoPtr = NULL; + void** connPtrsFifoPtr = NULL; + volatile uint64_t* connHeadPtr = NULL; + volatile uint64_t* connTailPtr = NULL; + uint64_t connTailCache; // Cache last seen value + uint64_t connHeadCache; // Cache last seen value + + int index; // Peer index I'm responsible for + int 
peer = -1; + int role = 0; + int group; + uint64_t step; + T* direct = NULL; + T* buff; struct ncclDevComm* comm; - inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; } - inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; } - inline __device__ const T* recvPtr(int i) { return ((const T*)recvBuff[i])+recvOffset(i); } - inline __device__ T* sendPtr(int i) { return ((T*)sendBuff[i])+sendOffset(i); } + const T** srcs; + T** dsts; + // Don't use barrier 0 as it's used by the final sync inline __device__ void barrier() { - asm volatile ("bar.sync 1, %0;" :: "r"(nthreads)); + if (nthreads == WARP_SIZE) __syncwarp(); + else asm volatile ("bar.sync %0, %1;" :: "r"(group+1), "r"(nthreads)); } inline __device__ void subBarrier() { - asm volatile ("bar.sync 2, %0;" :: "r"(nthreads-WARP_SIZE)); - } - - uint32_t mismatch = 0; - const uint64_t opCount; - - inline __device__ void checkMismatch(struct ncclConnInfo* conn) { - if (mismatch) { - // In non-LL, we use _threadfence_system before incrementing opCount, yet we are still waiting for credits here, so there must be a size mismatch - *(comm->fatalDevError) = ncclDevAssertedMismatch; - } else if (conn && *conn->opCountRem > opCount) { - mismatch += 1; - } + if (nworkers == nthreads) barrier(); + else asm volatile ("bar.sync %0, %1;" :: "r"(group+2), "r"(nworkers)); } uint32_t spins = 0; uint32_t abort = 0; - inline __device__ int checkAbort(int i, int send) { + inline __device__ int checkAbort() { spins++; if (abort == 0 && spins == SPINS_BEFORE_CHECK_ABORT) { abort = *(comm->abortFlag); - if (wid == i) checkMismatch(send ? sendConn : recvConn); spins = 0; } return abort; } - inline __device__ void waitSend(int nbytes) { + template <int DIRECTPTR> + inline __device__ T* directPtr(ssize_t directOffset) { + return DIRECTPTR && direct ? 
direct+directOffset : buff+(step%NCCL_STEPS)*stepSize; + } + + template <int DST, int DIRECTSEND> + inline __device__ void waitSend(ssize_t directOffset, int nbytes) { spins = 0; - mismatch = 0; - if (sendConnHeadPtr) { - while (sendConnHeadCache + NCCL_STEPS < sendConnHead + SLICESTEPS) { - sendConnHeadCache = *sendConnHeadPtr; - if (checkAbort(wid, 1)) break; - } - if (sendConnFifoPtr) { - sendConnFifoPtr[sendConnHead%NCCL_STEPS] = nbytes; - } - sendConnHead += SLICESTEPS; + while (connHeadCache + NCCL_STEPS < step + SLICESTEPS) { + connHeadCache = *connHeadPtr; + if (checkAbort()) break; + } + if (connSizesFifoPtr) { + connSizesFifoPtr[step%NCCL_STEPS] = nbytes; } + + if (connPtrsFifoPtr) loadPtr(connPtrsFifoPtr+step%NCCL_STEPS, dsts[DST+index]); + else dsts[DST+index] = directPtr<DIRECTSEND>(directOffset); + step += SLICESTEPS; } - inline __device__ void waitRecv() { + template <int SRC, int DIRECTRECV> + inline __device__ void waitRecv(ssize_t directOffset) { spins = 0; - mismatch = 0; - if (recvConnTailPtr) { - while (recvConnTailCache < recvConnTail + SLICESTEPS) { - recvConnTailCache = *recvConnTailPtr; - if (checkAbort(wid, 0)) break; - } - recvConnTail += SLICESTEPS; + while (connTailCache < step + SLICESTEPS) { + connTailCache = *connTailPtr; + if (checkAbort()) break; } + if (connPtrsFifoPtr) loadPtr(connPtrsFifoPtr+step%NCCL_STEPS, srcs[SRC+index]); + else srcs[SRC+index] = directPtr<DIRECTRECV>(directOffset); + step += SLICESTEPS; } - inline __device__ void incRecv(int i) { - recvStep[i] += SLICESTEPS; - } inline __device__ void postRecv() { - if (recvConnHeadPtr) *recvConnHeadPtr = recvConnHead += SLICESTEPS; + *connHeadPtr = step += SLICESTEPS; } - inline __device__ void incSend(int i) { - sendStep[i] += SLICESTEPS; - } inline __device__ void postSend() { - if (sendConnTailPtr) *sendConnTailPtr = sendConnTail += SLICESTEPS; - } - - template <int DIRECTRECV> - inline __device__ const T* directRecvPtr(int i, int directOffset) { - return DIRECTRECV && recvDirectBuff[i] ? recvDirectBuff[i]+directOffset : recvPtr(i); - } - - template <int DIRECTSEND> - inline __device__ T* directSendPtr(int i, int directOffset) { - return DIRECTSEND && sendDirectBuff[i] ? sendDirectBuff[i]+directOffset : sendPtr(i); - } - - template <int DIRECTRECV> - inline __device__ int directRecvInc(int i, int directInc, int sliceInc) { - return DIRECTRECV && recvDirectBuff[i] ? directInc : sliceInc; - } - - template <int DIRECTSEND> - inline __device__ int directSendInc(int i, int directInc, int sliceInc) { - return DIRECTSEND && sendDirectBuff[i] ? directInc : sliceInc; + *connTailPtr = step += SLICESTEPS; } template <int DIRECTRECV, int DIRECTSEND, int RECV, int SEND, int SRC, int DST> inline __device__ void - GenericOp(const T* srcPtr, T* dstPtr, int nelem, int directOffset) { + GenericOp(const T* srcPtr, T* dstPtr, int nelem, ssize_t directOffset) { int offset = 0; int sliceSize = stepSize*SLICESTEPS; int dataSize = max(DIVUP(nelem, 16*SLICESPERCHUNK)*16, sliceSize/32); - const T* srcs[RECV*NRECV+SRC]; - srcs[0] = SRC ? srcPtr : directRecvPtr<DIRECTRECV>(0, directOffset); - if (RECV) { - if (SRC) srcs[1] = recvPtr(0); - for (int i=1; i<NRECV && i<nrecv; i++) srcs[SRC+i] = recvPtr(i); - } - - T* dsts[SEND*NSEND+DST]; - dsts[0] = DST ? 
dstPtr : directSendPtr<DIRECTSEND>(0, directOffset); - if (SEND) { - if (DST) dsts[1] = directSendPtr<DIRECTSEND>(0, directOffset); - for (int i=1; i<NSEND && i<nsend; i++) dsts[DST+i] = directSendPtr<DIRECTSEND>(i, directOffset); - } - - bool syncThread = tid >= nthreads-WARP_SIZE; - #pragma unroll for (int slice=0; slice<SLICESPERCHUNK; ++slice) { int realSize = max(0, min(dataSize, nelem-offset)); - if (!syncThread) { - if (SEND) waitSend(realSize*sizeof(T)); - if (RECV) waitRecv(); + if (tid < nworkers) { + if (SRC && (role & ROLE_SRC)) srcs[0] = srcPtr+offset; + if (RECV && (role & ROLE_WAIT_RECV)) waitRecv<SRC, DIRECTRECV>(directOffset+offset); + if (DST && (role & ROLE_DST)) dsts[0] = dstPtr+offset; + if (SEND && (role & ROLE_WAIT_SEND)) waitSend<DST, DIRECTSEND>(directOffset+offset, realSize*sizeof(T)); if (realSize > 0) { subBarrier(); - if (DIRECTRECV && recvDirectBuff[0]) { + if (DIRECTRECV && srcs[0] == dsts[0]) { // We can only have one direct receive. Since srcs[0] == dstPtr+offset, skip one copy if (SEND) { - ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, NSEND>(tid, nthreads-WARP_SIZE, 1, srcs, nsend, dsts+1, realSize); + // (1-SEND) is only there to avoid compilation errors in case NSEND=0 (and SEND=0). + ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, (1-SEND)+NSEND>(tid, nworkers, 1, srcs, nsend, dsts+1, realSize); } } else { - ReduceOrCopyMulti<UNROLL, FUNC, T, RECV+SRC, RECV*NRECV+SRC, SEND+DST, SEND*NSEND+DST>(tid, nthreads-WARP_SIZE, RECV*nrecv+SRC, srcs, SEND*nsend+DST, dsts, realSize); + ReduceOrCopyMulti<UNROLL, FUNC, T, RECV+SRC, RECV*NRECV+SRC, SEND+DST, SEND*NSEND+DST>(tid, nworkers, RECV*nrecv+SRC, srcs, SEND*nsend+DST, dsts, realSize); } } } barrier(); - FOR_SEND(incSend); - FOR_RECV(incRecv); - if (syncThread) { - if (SEND) { - if (realSize > 0 && wid == 0) __threadfence_system(); - __syncwarp(); - postSend(); - } - if (RECV) postRecv(); - } - srcs[0] += SRC ? realSize : directRecvInc<DIRECTRECV>(0, realSize, sliceSize); - for (int i=1-SRC; i<RECV*NRECV; i++) srcs[SRC+i] += sliceSize; - dsts[0] += DST ? realSize : directSendInc<DIRECTSEND>(0, realSize, sliceSize); - for (int i=1-DST; i<SEND*NSEND; i++) dsts[DST+i] += directSendInc<DIRECTSEND>(i, realSize, sliceSize); + if (SEND && (role & ROLE_POST_SEND) && realSize > 0 && index == 0) __threadfence_system(); + __syncwarp(); + if (SEND && (role & ROLE_POST_SEND)) postSend(); + if (RECV && (role & ROLE_POST_RECV)) postRecv(); offset += realSize; } } - __device__ __forceinline__ void loadRecvConn(struct ncclConnInfo* conn, int i, T* directBuff) { - recvBuff[i] = (const T*)conn->buff; - recvStep[i] = conn->step; - recvStep[i] = ROUNDUP(recvStep[i], SLICESPERCHUNK*SLICESTEPS); - recvDirectBuff[i] = NULL; - if (directBuff && conn->direct) { - recvDirectBuff[i] = directBuff; - if (tid == 0) *conn->ptrExchange = directBuff; - } - if (wid == i) recvConn = conn; - if (wid == i) recvConnTail = recvConnHead = recvStep[i]; // Make sure we set this after rounding up - nrecv++; - } - __device__ __forceinline__ void loadRecvSync() { - if (tid >= WARP_SIZE && tid < 2*WARP_SIZE && wid<nrecv) { - recvConnTailPtr = recvConn->tail; - recvConnTailCache = *recvConnTailPtr; - } - if (tid >= nthreads-WARP_SIZE && wid < nrecv) { - recvConnHeadPtr = recvConn->head; - // Return credits in case we rounded up. 
- *recvConnHeadPtr = recvConnHead; - // Update opCount in case we skipped some operations - *(recvConn->opCountLoc) = opCount; - } - } - - __device__ __forceinline__ void loadSendConn(struct ncclConnInfo* conn, int i, T* directBuff) { - sendBuff[i] = (T*)conn->buff; - sendStep[i] = conn->step; - sendStep[i] = ROUNDUP(sendStep[i], SLICESPERCHUNK*SLICESTEPS); - sendDirectBuff[i] = NULL; - if (directBuff && conn->direct) { - void* volatile* ptr = conn->ptrExchange; - while ((sendDirectBuff[i] = (T*)(*ptr)) == NULL); - barrier(); - if (tid == 0) *ptr = NULL; - } - if (wid == i) sendConn = conn; - if (wid == i) sendConnTail = sendConnHead = sendStep[i]; // Make sure we set this after rounding up - nsend++; - } - __device__ __forceinline__ void loadSendSync() { - if (tid < nsend) { - sendConnHeadPtr = sendConn->head; - sendConnHeadCache = *sendConnHeadPtr; - sendConnFifoPtr = sendConn->fifo; - *(sendConn->opCountLoc) = opCount; - } - if (tid >= nthreads-WARP_SIZE && wid<nsend) { - sendConnTailPtr = sendConn->tail; + __device__ __forceinline__ void loadRecvConn(struct ncclChannel* channel, T* directBuff) { + if (role & (ROLE_WAIT_RECV|ROLE_POST_RECV)) { + conn = &channel->devPeers[peer].recv.conn; + step = conn->step; + step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS); + if (role & ROLE_POST_RECV) { + connHeadPtr = conn->head; + // Return credits in case we rounded up. + *connHeadPtr = step; + } + if (role & ROLE_WAIT_RECV) { + buff = (T*)conn->buffs[NCCL_PROTO_SIMPLE]; + if (DIRECT && (conn->direct & NCCL_DIRECT_GPU)) { + direct = directBuff; + *conn->ptrExchange = directBuff; + } + connTailPtr = conn->tail; + connTailCache = *connTailPtr; + connPtrsFifoPtr = conn->ptrsFifo; + } } } - __device__ __forceinline__ void saveRecvSync() { - if (tid >= nthreads-WARP_SIZE && wid < nrecv) { - recvConn->step = recvConnHead; - *(recvConn->opCountLoc) = opCount+1; - __threadfence_system(); + __device__ __forceinline__ void loadSendConn(struct ncclChannel* channel) { + if (role & (ROLE_WAIT_SEND|ROLE_POST_SEND)) { + conn = &channel->devPeers[peer].send.conn; + step = conn->step; + step = ROUNDUP(step, SLICESPERCHUNK*SLICESTEPS); + if (role & ROLE_POST_SEND) { + connTailPtr = conn->tail; + } + if (role & ROLE_WAIT_SEND) { + buff = (T*)conn->buffs[NCCL_PROTO_SIMPLE]; + if (DIRECT && (conn->direct & NCCL_DIRECT_GPU)) { + void* volatile* ptr = conn->ptrExchange; + while ((direct = (T*)(*ptr)) == NULL); + *ptr = NULL; + } + connHeadPtr = conn->head; + connHeadCache = *connHeadPtr; + connSizesFifoPtr = conn->sizesFifo; + connPtrsFifoPtr = conn->ptrsFifo; + } } } - __device__ __forceinline__ void saveSendSync() { - if (tid < nsend) { - sendConn->step = sendConnHead; - *(sendConn->opCountLoc) = opCount+1; + __device__ __forceinline__ void saveSync() { + if (role & (ROLE_POST_SEND|ROLE_POST_RECV)) { + conn->step = step; __threadfence_system(); } } public: __device__ __forceinline__ - ncclPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, const uint64_t opCount) - : comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), stepSize(stepSize), opCount(opCount) { + ncclPrimitives(const int tid, const int nworkers, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, struct ncclShmemPtrs* ptrs, int group) + : comm(comm), tid(tid), nworkers(nworkers), stepSize(stepSize), srcs((const T**)ptrs[group].srcs), dsts((T**)ptrs[group].dsts), group(group) { 
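+    // Threads are split into groups of SYNC_GROUP below: the first group waits
+    // on receive buffers (ROLE_WAIT_RECV) and owns srcs[0] (ROLE_SRC), the
+    // second waits on send buffers (ROLE_WAIT_SEND) and owns dsts[0] (ROLE_DST),
+    // and the last two groups post completed receive/send steps
+    // (ROLE_POST_RECV / ROLE_POST_SEND) on behalf of the worker threads.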
+ nthreads = nworkers; + // For send operations, we need an extra warp to overlap the threadfence and the copy + int postThreads = NSEND && nworkers >= 64 ? WARP_SIZE : 0; + nthreads += postThreads; + // Make sure step is updated before we read it. barrier(); - for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, directBuff); - for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i, directBuff); - loadRecvSync(); - loadSendSync(); + for (int i=0; i<NRECV; i++) if (recvPeers[i] != -1) nrecv++; + for (int i=0; i<NSEND; i++) if (sendPeers[i] != -1) nsend++; + + #define SYNC_GROUP 8 + static_assert(NSEND < SYNC_GROUP && NRECV < SYNC_GROUP, "Not enough threads to cover all peers"); + + int g = tid / SYNC_GROUP; + int ng = nthreads / SYNC_GROUP; + index = tid % SYNC_GROUP; + + if (g == 0) { + if (index < nrecv) role |= ROLE_WAIT_RECV; + if (index == nrecv) role |= ROLE_SRC; + } else if (g == 1) { + if (index < nsend) role |= ROLE_WAIT_SEND; + if (index == nsend) role |= ROLE_DST; + } else if (g == ng - 2) { + if (index < nrecv) role |= ROLE_POST_RECV; + } else if (g == ng - 1) { + if (index < nsend) role |= ROLE_POST_SEND; + } + + if (role & (ROLE_WAIT_RECV|ROLE_POST_RECV)) peer = recvPeers[index]; + if (role & (ROLE_WAIT_SEND|ROLE_POST_SEND)) peer = sendPeers[index]; + + loadRecvConn(channel, directBuff); + loadSendConn(channel); } __device__ __forceinline__ void @@ -310,7 +267,7 @@ class ncclPrimitives { GenericOp<0, 0, 0, 1, 1, 0>(src, NULL, nelem, 0); } __device__ __forceinline__ void - directSend(const T* src, int directOffset, int nelem) { + directSend(const T* src, ssize_t directOffset, int nelem) { GenericOp<0, 1, 0, 1, 1, 0>(src, NULL, nelem, directOffset); } @@ -319,7 +276,7 @@ class ncclPrimitives { GenericOp<0, 0, 1, 0, 0, 1>(NULL, dst, nelem, 0); } __device__ __forceinline__ void - directRecv(T* dst, int directOffset, int nelem) { + directRecv(T* dst, ssize_t directOffset, int nelem) { GenericOp<1, 0, 1, 0, 0, 1>(NULL, dst, nelem, directOffset); } @@ -328,7 +285,7 @@ class ncclPrimitives { GenericOp<0, 0, 0, 1, 1, 1>(src, dst, nelem, 0); } __device__ __forceinline__ void - directCopySend(const T* src, T* dst, int directOffset, int nelem) { + directCopySend(const T* src, T* dst, ssize_t directOffset, int nelem) { GenericOp<0, 1, 0, 1, 1, 1>(src, dst, nelem, directOffset); } @@ -337,7 +294,7 @@ class ncclPrimitives { GenericOp<0, 0, 1, 1, 0, 1>(NULL, dst, nelem, 0); } __device__ __forceinline__ void - directRecvCopySend(T* dst, int directOffset, int nelem) { + directRecvCopySend(T* dst, ssize_t directOffset, int nelem) { GenericOp<1, 1, 1, 1, 0, 1>(NULL, dst, nelem, directOffset); } @@ -356,15 +313,14 @@ class ncclPrimitives { GenericOp<0, 0, 1, 1, 1, 1>(src, dst, nelem, 0); } __device__ __forceinline__ void - directRecvReduceCopySend(const T* src, T* dst, int directOffset, int nelem) { + directRecvReduceCopySend(const T* src, T* dst, ssize_t directOffset, int nelem) { // Direct is only for the send part GenericOp<0, 1, 1, 1, 1, 1>(src, dst, nelem, directOffset); } __device__ __forceinline__ ~ncclPrimitives() { // Save steps for the next operation - saveRecvSync(); - saveSendSync(); + saveSync(); } }; diff --git a/src/collectives/device/prims_ll.h b/src/collectives/device/prims_ll.h index f919493..9e362f9 100644 --- a/src/collectives/device/prims_ll.h +++ b/src/collectives/device/prims_ll.h @@ -1,9 +1,16 @@ 
+/************************************************************************* + * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + template <typename T, class FUNC, int NRECV, int NSEND> class ncclLLPrimitives { private: const int tid; const int nthreads; const int wid; + const int stepLines; int nrecv = 0; int nsend = 0; struct ncclConnInfo* recvConn = NULL; @@ -22,8 +29,8 @@ class ncclLLPrimitives { union ncclLLFifoLine* sendBuff[NSEND]; struct ncclDevComm* comm; - inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*NCCL_LL_SLICE_LINES; } - inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*NCCL_LL_SLICE_LINES; } + inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepLines; } + inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepLines; } inline __device__ union ncclLLFifoLine* recvPtr(int i) { return recvBuff[i]+recvOffset(i); } inline __device__ union ncclLLFifoLine* sendPtr(int i) { return sendBuff[i]+sendOffset(i); } inline __device__ uint32_t recvFlag(int i) { return NCCL_LL_FLAG(recvStep[i]+1); } @@ -33,19 +40,6 @@ class ncclLLPrimitives { asm volatile ("bar.sync 1, %0;" :: "r"(nthreads)); } - uint32_t mismatch = 0; - const uint64_t opCount; - - inline __device__ void checkMismatch(struct ncclConnInfo* conn) { - if (mismatch > 20) { - // We have seen that the peer advanced opcount so many times yet we are still waiting for credit of current op, so it is _most likely_ a mismatch - // Note that we are not using _threadfence_system in LL so the error cannot be asserted - *(comm->fatalDevError) = ncclDevSuspectedMismatch; - } else if (conn && *conn->opCountRem > opCount) { - mismatch += 1; - } - } - uint32_t spins = 0; uint32_t abort = 0; @@ -53,7 +47,6 @@ class ncclLLPrimitives { spins++; if (abort == 0 && spins == SPINS_BEFORE_CHECK_ABORT) { abort = *(comm->abortFlag); - if (wid == i) checkMismatch(send ? sendConn : recvConn); spins = 0; } return abort; @@ -61,14 +54,13 @@ class ncclLLPrimitives { inline __device__ void waitSend(int nbytes) { spins = 0; - mismatch = 0; if (sendConnHeadPtr) { while (sendConnHeadCache + NCCL_STEPS < sendConnHead + 1) { sendConnHeadCache = *sendConnHeadPtr; if (checkAbort(wid, 1)) break; } if (sendConnFifoPtr) { - int size = ((sendConnHead & NCCL_LL_CLEAN_MASK) == NCCL_LL_CLEAN_MASK) ? NCCL_LL_SLICE_LINES*sizeof(union ncclLLFifoLine) : nbytes; + int size = ((sendConnHead & NCCL_LL_CLEAN_MASK) == NCCL_LL_CLEAN_MASK) ? stepLines*sizeof(union ncclLLFifoLine) : nbytes; sendConnFifoPtr[sendConnHead%NCCL_STEPS] = size; } sendConnHead += 1; @@ -88,7 +80,7 @@ class ncclLLPrimitives { // LL Cleanup : write all flags in the slice to make sure we don't have // data corruption when flag loops over. 
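    // The flag is derived from the step counter (NCCL_LL_FLAG(step+1)), so once
    // the counter wraps a stale line could look valid again; rewriting the whole
    // slice with zero data and the current flag keeps a receiver from consuming
    // leftover lines from a previous pass.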
if ((sendStep[i] & NCCL_LL_CLEAN_MASK) == NCCL_LL_CLEAN_MASK) { - for (int o = offset; o<NCCL_LL_SLICE_LINES; o+=nthreads) storeLL(sendPtr(i)+o, 0, sendFlag(i)); + for (int o = offset; o<stepLines; o+=nthreads) storeLL(sendPtr(i)+o, 0, sendFlag(i)); } sendStep[i]++; } @@ -98,7 +90,6 @@ class ncclLLPrimitives { uint32_t flag = recvFlag(i); uint32_t data1, flag1, data2, flag2; spins = 0; - mismatch = 0; do { asm volatile("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];" : "=r"(data1), "=r"(flag1), "=r"(data2), "=r"(flag2) : "l"(&src->i4)); if (checkAbort(i, 0)) break; @@ -164,7 +155,7 @@ class ncclLLPrimitives { } __device__ __forceinline__ void loadRecvConn(struct ncclConnInfo* conn, int i) { - recvBuff[i] = conn->llBuff; + recvBuff[i] = (union ncclLLFifoLine*)conn->buffs[NCCL_PROTO_LL]; recvStep[i] = conn->step; if (wid == i) recvConn = conn; nrecv++; @@ -173,13 +164,11 @@ class ncclLLPrimitives { if (tid >= nthreads-WARP_SIZE && wid < nrecv) { recvConnHeadPtr = recvConn->head; recvConnHead = recvConn->step; - // Update opCount in case we skipped some operations - *(recvConn->opCountLoc) = opCount; } } __device__ __forceinline__ void loadSendConn(struct ncclConnInfo* conn, int i) { - sendBuff[i] = conn->llBuff; + sendBuff[i] = (union ncclLLFifoLine*)conn->buffs[NCCL_PROTO_LL]; sendStep[i] = conn->step; if (wid == i) sendConn = conn; nsend++; @@ -189,15 +178,13 @@ class ncclLLPrimitives { sendConnHeadPtr = sendConn->head; sendConnHeadCache = *sendConnHeadPtr; sendConnHead = sendConn->step; - sendConnFifoPtr = sendConn->fifo; - *(sendConn->opCountLoc) = opCount; + sendConnFifoPtr = sendConn->sizesFifo; } } __device__ __forceinline__ void saveRecvSync() { if (tid >= nthreads-WARP_SIZE && wid < nrecv) { recvConn->step = recvConnHead; - *(recvConn->opCountLoc) = opCount+1; __threadfence_block(); } } @@ -205,15 +192,14 @@ class ncclLLPrimitives { __device__ __forceinline__ void saveSendSync() { if (tid < nsend) { sendConn->step = sendConnHead; - *(sendConn->opCountLoc) = opCount+1; __threadfence_block(); } } public: __device__ __forceinline__ - ncclLLPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, struct ncclChannel* channel, struct ncclDevComm* comm, const uint64_t opCount) - : comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), opCount(opCount) { + ncclLLPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, int stepLines, struct ncclChannel* channel, struct ncclDevComm* comm) + : comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), stepLines(stepLines) { // Make sure step is updated before we read it. barrier(); diff --git a/src/collectives/device/prims_ll128.h b/src/collectives/device/prims_ll128.h index 40a8cff..999d0d5 100644 --- a/src/collectives/device/prims_ll128.h +++ b/src/collectives/device/prims_ll128.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -14,6 +14,7 @@ class ncclLL128Primitives { const int tid; const int nthreads; const int wid; + const int stepSize; const int warp; const bool flagThread; int nrecv = 0; @@ -38,8 +39,8 @@ class ncclLL128Primitives { volatile uint64_t* shmem; - inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*NCCL_LL128_SLICE_ELEMS; } - inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*NCCL_LL128_SLICE_ELEMS; } + inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; } + inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; } inline __device__ uint64_t* recvPtr(int i) { return recvBuff[i]+recvOffset(i); } inline __device__ uint64_t* sendPtr(int i) { return sendBuff[i]+sendOffset(i); } inline __device__ uint64_t recvFlag(int i) { return recvStep[i]+1; } @@ -47,22 +48,9 @@ class ncclLL128Primitives { inline __device__ void barrier() { if (NSEND>NRECV) { - asm volatile ("bar.sync 2, %0;" :: "r"(nthreads)); + asm volatile ("bar.sync 1, %0;" :: "r"(nthreads)); } else { - asm volatile ("bar.sync 3, %0;" :: "r"(nthreads)); - } - } - - uint32_t mismatch = 0; - const uint64_t opCount; - - inline __device__ void checkMismatch(struct ncclConnInfo* conn) { - if (mismatch > 20) { - // We have seen that the peer advanced opcount so many times yet we are still waiting for credit of current op, so it is _most likely_ a mismatch - // Note that we are not using _threadfence_system in LL so the error cannot be asserted - *(comm->fatalDevError) = ncclDevSuspectedMismatch; - } else if (conn && *conn->opCountRem > opCount) { - mismatch += 1; + asm volatile ("bar.sync 2, %0;" :: "r"(nthreads)); } } @@ -73,7 +61,6 @@ class ncclLL128Primitives { spins++; if (abort == 0 && spins == SPINS_BEFORE_CHECK_ABORT) { abort = *(comm->abortFlag); - if (wid == i) checkMismatch(send ? sendConn : recvConn); spins = 0; } return abort; @@ -81,7 +68,6 @@ class ncclLL128Primitives { inline __device__ void waitSend(int nbytes) { spins = 0; - mismatch = 0; if (sendConnHeadPtr) { while (sendConnHeadCache + NCCL_STEPS < sendConnHead + 1) { sendConnHeadCache = *sendConnHeadPtr; @@ -225,14 +211,14 @@ class ncclLL128Primitives { /************************ Send **************************/ if (SEND) { for (int i=1; i<NSEND && i<nsend; i++) { - int flag = sendFlag(i); + uint64_t flag = sendFlag(i); uint64_t* ptr = sendPtr(i)+ll128Offset; #pragma unroll for (int u=0; u<ELEMS_PER_THREAD; u+=2) { store128(ptr+u*WARP_SIZE, v[u], flagThread ? 
flag : v[u+1]); } } - int flag = sendFlag(0); + uint64_t flag = sendFlag(0); uint64_t* ptr = sendPtr(0)+ll128Offset; #pragma unroll for (int u=0; u<ELEMS_PER_THREAD; u+=2) { @@ -309,7 +295,7 @@ class ncclLL128Primitives { } __device__ __forceinline__ void loadRecvConn(struct ncclConnInfo* conn, int i) { - recvBuff[i] = conn->ll128Buff; + recvBuff[i] = (uint64_t*)conn->buffs[NCCL_PROTO_LL128]; recvStep[i] = conn->step; if (wid == i) recvConn = conn; nrecv++; @@ -318,13 +304,11 @@ class ncclLL128Primitives { if (tid >= nthreads-WARP_SIZE && wid < nrecv) { recvConnHeadPtr = recvConn->head; recvConnHead = recvConn->step; - // Update opCount in case we skipped some operations - *(recvConn->opCountLoc) = opCount; } } __device__ __forceinline__ void loadSendConn(struct ncclConnInfo* conn, int i) { - sendBuff[i] = conn->ll128Buff; + sendBuff[i] = (uint64_t*)conn->buffs[NCCL_PROTO_LL128]; sendStep[i] = conn->step; if (wid == i) sendConn = conn; nsend++; @@ -334,11 +318,10 @@ class ncclLL128Primitives { sendConnHeadPtr = sendConn->head; sendConnHeadCache = *sendConnHeadPtr; sendConnHead = sendConn->step; - sendConnFifoPtr = sendConn->fifo; - *(sendConn->opCountLoc) = opCount; + sendConnFifoPtr = sendConn->sizesFifo; } if (tid >= nthreads-WARP_SIZE && wid<nsend) { - if (sendConn->fifo) { + if (sendConn->sizesFifo) { sendConnTailPtr = sendConn->tail; sendConnTail = sendConn->step; } @@ -348,7 +331,6 @@ class ncclLL128Primitives { __device__ __forceinline__ void saveRecvSync() { if (tid >= nthreads-WARP_SIZE && wid < nrecv) { recvConn->step = recvConnHead; - *(recvConn->opCountLoc) = opCount+1; __threadfence_block(); } } @@ -356,15 +338,14 @@ class ncclLL128Primitives { __device__ __forceinline__ void saveSendSync() { if (tid < nsend) { sendConn->step = sendConnHead; - *(sendConn->opCountLoc) = opCount+1; __threadfence_block(); } } public: __device__ __forceinline__ - ncclLL128Primitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, struct ncclChannel* channel, struct ncclDevComm* comm, const uint64_t opCount) - : comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), warp(tid/WARP_SIZE), flagThread((tid%8)==7), opCount(opCount), shmem(ncclShmem+(threadIdx.x/WARP_SIZE)*NCCL_LL128_SHMEM_ELEMS_PER_THREAD*WARP_SIZE+2*wid) { + ncclLL128Primitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm) + : comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), warp(tid/WARP_SIZE), flagThread((tid%8)==7), stepSize(stepSize), shmem(ncclShmem->data+(threadIdx.x/WARP_SIZE)*NCCL_LL128_SHMEM_ELEMS_PER_THREAD*WARP_SIZE+2*wid) { // Make sure step is updated before we read it. barrier(); diff --git a/src/collectives/device/reduce.cu b/src/collectives/device/reduce.cu index a2caac5..66f1bb2 100644 --- a/src/collectives/device/reduce.cu +++ b/src/collectives/device/reduce.cu @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -8,4 +8,4 @@ #include "common.h" #include "collectives.h" -IMPL_COLL_R(ncclReduce, ncclCollReduce); +IMPL_COLL_R(Reduce); diff --git a/src/collectives/device/reduce.h b/src/collectives/device/reduce.h index 0680abe..313209d 100644 --- a/src/collectives/device/reduce.h +++ b/src/collectives/device/reduce.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -8,133 +8,145 @@ #include "primitives.h" #include "collectives.h" -template<int UNROLL, class FUNC, typename T> -__device__ void ncclReduceRingKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads-WARP_SIZE; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->N; - const int nranks = comm->nRanks; - const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * REDUCE_CHUNKSTEPS; - const ssize_t loopSize = args->nChannels*(ssize_t)chunkSize; - const int rank = ring->devUserRanks[0]; - const int prevRank = ring->devUserRanks[nranks-1]; - const int root = args->root; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - ncclPrimitives<UNROLL, REDUCE_CHUNKSTEPS/REDUCE_SLICESTEPS, REDUCE_SLICESTEPS, T, 1, 1, FUNC> - prims(tid, args->nThreads, &ring->prev, &ring->next, NULL, stepSize, channel, comm, args->opCount); - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,args->nChannels)); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t offset = gridOffset + bid*realChunkSize; - int nelem = min(realChunkSize, size-offset); - if (prevRank == root) { - prims.send(thisInput+offset, nelem); - } else if (rank == root) { - prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); - } else { - prims.recvReduceSend(thisInput+offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncReduce, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads-WARP_SIZE; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * REDUCE_CHUNKSTEPS; + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*(ssize_t)chunkSize; + const ssize_t size = args->coll.count; + const int rank = ring->devUserRanks[0]; + const int prevRank = ring->devUserRanks[nranks-1]; + const int root = args->coll.root; + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + ncclPrimitives<UNROLL, 
REDUCE_CHUNKSTEPS/REDUCE_SLICESTEPS, REDUCE_SLICESTEPS, T, 1, 1, 0, FUNC> + prims(tid, nthreads, &ring->prev, &ring->next, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,nChannels)); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t offset = gridOffset + bid*realChunkSize; + int nelem = min(realChunkSize, size-offset); + if (prevRank == root) { + prims.send(thisInput+offset, nelem); + } else if (rank == root) { + prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); + } else { + prims.recvReduceSend(thisInput+offset, nelem); + } + } } - } -} - -template<int UNROLL, class FUNC, typename T> -__device__ void ncclReduceTreeKernel(struct CollectiveArgs* args) { } - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceRingLLKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - const int rank = comm->rank; - const int nranks = comm->nRanks; - const int prevRank = ring->devUserRanks[nranks-1]; - const int root = args->root; - - ssize_t chunkSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - if (size-gridOffset < loopSize) { - chunkSize = args->lastChunkSize; +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncReduce, NCCL_ALGO_RING, NCCL_PROTO_LL, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); + ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + const int rank = comm->rank; + const int prevRank = ring->devUserRanks[nranks-1]; + const int root = args->coll.root; + + ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepLines, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + if (size-gridOffset < loopSize) { + chunkSize = args->coll.lastChunkSize; + } + ssize_t offset = gridOffset + bid*chunkSize; + + int nelem = min(chunkSize, size-offset); + if (prevRank == root) { + LLprims.send(thisInput+offset, nelem); + } else if (rank == root) { + LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); + } else { + LLprims.recvReduceSend(thisInput+offset, 
nelem); + } + } } - ssize_t offset = gridOffset + bid*chunkSize; - - int nelem = min(chunkSize, size-offset); - if (prevRank == root) { - LLprims.send(thisInput+offset, nelem); - } else if (rank == root) { - LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); - } else { - LLprims.recvReduceSend(thisInput+offset, nelem); - } - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceTreeLLKernel(struct CollectiveArgs* args) { } +}; #include "prims_ll128.h" -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceRingLL128Kernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - const int rank = comm->rank; - const int nranks = comm->nRanks; - const int prevRank = ring->devUserRanks[nranks-1]; - const int root = args->root; - - ssize_t chunkSize = (NCCL_LL128_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); - const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); - - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - chunkSize = min(DIVUP(size-gridOffset, args->nChannels*minChunkSize)*minChunkSize, chunkSize); - ssize_t offset = gridOffset + bid*chunkSize; - - int nelem = min(chunkSize, size-offset); - if (prevRank == root) { - LLprims.send(thisInput+offset, nelem); - } else if (rank == root) { - LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); - } else { - LLprims.recvReduceSend(thisInput+offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncReduce, NCCL_ALGO_RING, NCCL_PROTO_LL128, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS); + ssize_t chunkSize = stepSize*NCCL_LL128_DATAELEMS*sizeof(uint64_t) / (NCCL_LL128_LINEELEMS*sizeof(T)); + const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + const int rank = comm->rank; + const int prevRank = ring->devUserRanks[nranks-1]; + const int root = args->coll.root; + + ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepSize, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + 
chunkSize = min(DIVUP(size-gridOffset, nChannels*minChunkSize)*minChunkSize, chunkSize); + ssize_t offset = gridOffset + bid*chunkSize; + + int nelem = min(chunkSize, size-offset); + if (prevRank == root) { + LLprims.send(thisInput+offset, nelem); + } else if (rank == root) { + LLprims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem); + } else { + LLprims.recvReduceSend(thisInput+offset, nelem); + } + } } - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceTreeLL128Kernel(struct CollectiveArgs* args) { } +}; + +template<int PROTO, class REDOP, typename T, int UNROLL> +class ncclFunction<ncclFuncReduce, NCCL_ALGO_TREE, PROTO, REDOP, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; + +template<int PROTO, class REDOP, typename T, int UNROLL> +class ncclFunction<ncclFuncReduce, NCCL_ALGO_COLLNET, PROTO, REDOP, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; diff --git a/src/collectives/device/reduce_scatter.cu b/src/collectives/device/reduce_scatter.cu index 8b45299..c2c6d42 100644 --- a/src/collectives/device/reduce_scatter.cu +++ b/src/collectives/device/reduce_scatter.cu @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -8,4 +8,4 @@ #include "common.h" #include "collectives.h" -IMPL_COLL_R(ncclReduceScatter, ncclCollReduceScatter); +IMPL_COLL_R(ReduceScatter); diff --git a/src/collectives/device/reduce_scatter.h b/src/collectives/device/reduce_scatter.h index 1985148..a0d45dc 100644 --- a/src/collectives/device/reduce_scatter.h +++ b/src/collectives/device/reduce_scatter.h @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. 
* * See LICENSE.txt for license information ************************************************************************/ @@ -8,177 +8,189 @@ #include "primitives.h" #include "collectives.h" -template<int UNROLL, class FUNC, typename T> -__device__ void ncclReduceScatterRingKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->nThreads-WARP_SIZE; - const int bid = args->bid; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const ssize_t size = args->N; - const int nranks = comm->nRanks; - const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS); - const int chunkSize = stepSize * REDUCESCATTER_CHUNKSTEPS; - const ssize_t loopSize = args->nChannels*(ssize_t)chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - ncclPrimitives<UNROLL, REDUCESCATTER_CHUNKSTEPS/REDUCESCATTER_SLICESTEPS, REDUCESCATTER_SLICESTEPS, T, 1, 1, FUNC> - prims(tid, args->nThreads, &ring->prev, &ring->next, NULL, stepSize, channel, comm, args->opCount); - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,args->nChannels)); - ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); - ssize_t chunkOffset = gridOffset + bid*realChunkSize; - - /////////////// begin ReduceScatter steps /////////////// - ssize_t offset; - int nelem = min(realChunkSize, size-chunkOffset); - int rankDest; - - // step 0: push data to next GPU - rankDest = ring->devUserRanks[nranks-1]; - offset = chunkOffset + rankDest * size; - - prims.send(thisInput+offset, nelem); - - // k-2 steps: reduce and copy to next GPU - for (int j=2; j<nranks; ++j) { - rankDest = ring->devUserRanks[nranks-j]; - offset = chunkOffset + rankDest * size; - - prims.recvReduceSend(thisInput+offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncReduceScatter, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads-WARP_SIZE; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize * REDUCESCATTER_CHUNKSTEPS; + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*(ssize_t)chunkSize; + const ssize_t size = args->coll.count; + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + ncclPrimitives<UNROLL, REDUCESCATTER_CHUNKSTEPS/REDUCESCATTER_SLICESTEPS, REDUCESCATTER_SLICESTEPS, T, 1, 1, 0, FUNC> + prims(tid, nthreads, &ring->prev, &ring->next, NULL, stepSize, channel, comm, ncclShmem->ptrs, 0); + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,nChannels)); + ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T)); + ssize_t chunkOffset = gridOffset + bid*realChunkSize; + + /////////////// begin ReduceScatter steps /////////////// + ssize_t offset; + int nelem = min(realChunkSize, size-chunkOffset); + int rankDest; + + 
// step 0: push data to next GPU + rankDest = ring->devUserRanks[nranks-1]; + offset = chunkOffset + rankDest * size; + + prims.send(thisInput+offset, nelem); + + // k-2 steps: reduce and copy to next GPU + for (int j=2; j<nranks; ++j) { + rankDest = ring->devUserRanks[nranks-j]; + offset = chunkOffset + rankDest * size; + + prims.recvReduceSend(thisInput+offset, nelem); + } + + // step k-1: reduce this buffer and data, which will produce the final result + rankDest = ring->devUserRanks[0]; + offset = chunkOffset + rankDest * size; + + prims.recvReduceCopy(thisInput+offset, thisOutput+chunkOffset, nelem); + } } - - // step k-1: reduce this buffer and data, which will produce the final result - rankDest = ring->devUserRanks[0]; - offset = chunkOffset + rankDest * size; - - prims.recvReduceCopy(thisInput+offset, thisOutput+chunkOffset, nelem); - } -} - -template<int UNROLL, class FUNC, typename T> -__device__ void ncclReduceScatterTreeKernel(struct CollectiveArgs* args) { } - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceScatterRingLLKernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - //const int rank = comm->rank; - const int nranks = comm->nRanks; - ssize_t chunkSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t) / sizeof(T); - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - if (size-gridOffset < loopSize) { - chunkSize = args->lastChunkSize; - } - ssize_t chunkOffset = gridOffset + bid*chunkSize; - - /////////////// begin ReduceScatter steps /////////////// - ssize_t offset; - int nelem = min(chunkSize, size-chunkOffset); - int rankDest; - - // step 0: push data to next GPU - rankDest = ring->devUserRanks[nranks-1]; - offset = chunkOffset + rankDest * size; - - LLprims.send(thisInput+offset, nelem); - - // k-2 steps: reduce and copy to next GPU - for (int j=2; j<nranks; ++j) { - rankDest = ring->devUserRanks[nranks-j]; - offset = chunkOffset + rankDest * size; - - LLprims.recvReduceSend(thisInput+offset, nelem); +}; + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncReduceScatter, NCCL_ALGO_RING, NCCL_PROTO_LL, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepLines = comm->buffSizes[NCCL_PROTO_LL] / (sizeof(union ncclLLFifoLine)*NCCL_STEPS); + ssize_t chunkSize = stepLines * sizeof(uint64_t) / sizeof(T); + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + ncclLLPrimitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepLines, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const 
T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + if (size-gridOffset < loopSize) { + chunkSize = args->coll.lastChunkSize; + } + ssize_t chunkOffset = gridOffset + bid*chunkSize; + + /////////////// begin ReduceScatter steps /////////////// + ssize_t offset; + int nelem = min(chunkSize, size-chunkOffset); + int rankDest; + + // step 0: push data to next GPU + rankDest = ring->devUserRanks[nranks-1]; + offset = chunkOffset + rankDest * size; + + LLprims.send(thisInput+offset, nelem); + + // k-2 steps: reduce and copy to next GPU + for (int j=2; j<nranks; ++j) { + rankDest = ring->devUserRanks[nranks-j]; + offset = chunkOffset + rankDest * size; + + LLprims.recvReduceSend(thisInput+offset, nelem); + } + + // step k-1: reduce this buffer and data, which will produce the final + // result that we store in this data + rankDest = ring->devUserRanks[0]; + offset = chunkOffset + rankDest * size; + + LLprims.recvReduceCopy(thisInput+offset, thisOutput+chunkOffset, nelem); + } } - - // step k-1: reduce this buffer and data, which will produce the final - // result that we store in this data - rankDest = ring->devUserRanks[0]; - offset = chunkOffset + rankDest * size; - - LLprims.recvReduceCopy(thisInput+offset, thisOutput+chunkOffset, nelem); - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceScatterTreeLLKernel(struct CollectiveArgs* args) { } +}; #include "prims_ll128.h" -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceScatterRingLL128Kernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int bid = args->bid; - const int nthreads = args->nThreads; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - - ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, channel, comm, args->opCount); - - const ssize_t size = args->N; - //const int rank = comm->rank; - const int nranks = comm->nRanks; - ssize_t chunkSize = (NCCL_LL128_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T)); - // We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere. 
- const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2; - - const ssize_t loopSize = args->nChannels*chunkSize; - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->ThisInput; - T * __restrict__ thisOutput = (T*)args->ThisOutput; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - chunkSize = min(DIVUP(size-gridOffset, args->nChannels*minChunkSize)*minChunkSize, chunkSize); - - ssize_t chunkOffset = gridOffset + bid*chunkSize; - - /////////////// begin ReduceScatter steps /////////////// - ssize_t offset; - int nelem = min(chunkSize, size-chunkOffset); - int rankDest; - - // step 0: push data to next GPU - rankDest = ring->devUserRanks[nranks-1]; - offset = chunkOffset + rankDest * size; - - LLprims.send(thisInput+offset, nelem); - - // k-2 steps: reduce and copy to next GPU - for (int j=2; j<nranks; ++j) { - rankDest = ring->devUserRanks[nranks-j]; - offset = chunkOffset + rankDest * size; - - LLprims.recvReduceSend(thisInput+offset, nelem); +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncReduceScatter, NCCL_ALGO_RING, NCCL_PROTO_LL128, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) { + const int tid = threadIdx.x; + const int nthreads = args->nThreads; + const int bid = args->coll.bid; + const int nChannels = args->coll.nChannels; + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + struct ncclRing* ring = &channel->ring; + const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS); + ssize_t chunkSize = stepSize*NCCL_LL128_DATAELEMS*sizeof(uint64_t) / (NCCL_LL128_LINEELEMS*sizeof(T)); + // We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere. 
+ const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2; + const int nranks = comm->nRanks; + const ssize_t loopSize = nChannels*chunkSize; + const ssize_t size = args->coll.count; + + ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepSize, channel, comm); + + // Compute pointers + const T * __restrict__ thisInput = (const T*)args->sendbuff; + T * __restrict__ thisOutput = (T*)args->recvbuff; + + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { + chunkSize = min(DIVUP(size-gridOffset, nChannels*minChunkSize)*minChunkSize, chunkSize); + + ssize_t chunkOffset = gridOffset + bid*chunkSize; + + /////////////// begin ReduceScatter steps /////////////// + ssize_t offset; + int nelem = min(chunkSize, size-chunkOffset); + int rankDest; + + // step 0: push data to next GPU + rankDest = ring->devUserRanks[nranks-1]; + offset = chunkOffset + rankDest * size; + + LLprims.send(thisInput+offset, nelem); + + // k-2 steps: reduce and copy to next GPU + for (int j=2; j<nranks; ++j) { + rankDest = ring->devUserRanks[nranks-j]; + offset = chunkOffset + rankDest * size; + + LLprims.recvReduceSend(thisInput+offset, nelem); + } + + // step k-1: reduce this buffer and data, which will produce the final + // result that we store in this data + rankDest = ring->devUserRanks[0]; + offset = chunkOffset + rankDest * size; + + LLprims.recvReduceCopy(thisInput+offset, thisOutput+chunkOffset, nelem); + } } - - // step k-1: reduce this buffer and data, which will produce the final - // result that we store in this data - rankDest = ring->devUserRanks[0]; - offset = chunkOffset + rankDest * size; - - LLprims.recvReduceCopy(thisInput+offset, thisOutput+chunkOffset, nelem); - } -} - -template<int UNUSED, class FUNC, typename T> -__device__ void ncclReduceScatterTreeLL128Kernel(struct CollectiveArgs* args) { } +}; + +template<int PROTO, class REDOP, typename T, int UNROLL> +class ncclFunction<ncclFuncReduceScatter, NCCL_ALGO_TREE, PROTO, REDOP, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; + +template<int PROTO, class REDOP, typename T, int UNROLL> +class ncclFunction<ncclFuncReduceScatter, NCCL_ALGO_COLLNET, PROTO, REDOP, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* args) {} +}; diff --git a/src/collectives/device/sendrecv.cu b/src/collectives/device/sendrecv.cu new file mode 100644 index 0000000..59e38b5 --- /dev/null +++ b/src/collectives/device/sendrecv.cu @@ -0,0 +1,11 @@ +/************************************************************************* + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "sendrecv.h" +#include "common.h" +#include "collectives.h" + +IMPL_COLL_P(SendRecv); diff --git a/src/collectives/device/sendrecv.h b/src/collectives/device/sendrecv.h new file mode 100644 index 0000000..1cb34f3 --- /dev/null +++ b/src/collectives/device/sendrecv.h @@ -0,0 +1,92 @@ +/************************************************************************* + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. 
+ * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "devcomm.h" +#include "primitives.h" +#include "collectives.h" + +template<class FUNC, typename T, int UNROLL> +class ncclFunction<ncclFuncSendRecv, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, FUNC, T, UNROLL> { + public: + __device__ void run(struct ncclWorkElem* firstArgs) { + struct ncclWorkElem* args = firstArgs; + int tid = threadIdx.x; + int group = 0; + for (int s=0; s<NCCL_MAX_WORK_ELEMENTS; s++) { + int nThreadsSegment = args->p2p.nThreads; + if (nThreadsSegment == 0) return; // Nothing else to do + int groupRecv = group; + group += 1; + int groupSend = group; + group += nThreadsSegment > 128 ? 2 : 1; + if (tid < nThreadsSegment) { + const int nThreads = nThreadsSegment > 128 ? nThreadsSegment-WARP_SIZE : nThreadsSegment; + + // Compute pointers + const T* sendbuff = (const T*)args->sendbuff; + T* recvbuff = (T*)args->recvbuff; + const ssize_t sendCount = args->p2p.sendCount; + const ssize_t recvCount = args->p2p.recvCount; + + const int delta = args->p2p.delta; + if (delta == 0) { + if (tid < nThreads && sendbuff != recvbuff) { + // local copy : ReduceOrCopyMulti takes an int as number of elements, + // so we split it in blocks of 1G elements. + int blockSize = 1<<30; + for (size_t offset=0; offset<sendCount; offset += blockSize) { + size_t remaining = sendCount - offset; + if (remaining < blockSize) blockSize = remaining; + ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, 1>(tid, nThreads, 1, &sendbuff, 1, &recvbuff, blockSize); + sendbuff += blockSize; recvbuff += blockSize; + } + } + } else { + struct ncclDevComm* comm = args->comm; + struct ncclChannel* channel = comm->channels+blockIdx.x; + + const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/(sizeof(T)*NCCL_STEPS); + const int chunkSize = stepSize/SENDRECV_SLICEFACTOR; + + int nThreadsSplit = nThreads/2; + if ((tid < nThreadsSplit) && recvCount >= 0) { + int peer = (comm->rank-delta+comm->nRanks)%comm->nRanks; + int nt = nThreadsSplit; + ncclPrimitives<UNROLL, 1, 1, T, 1, 0, 1, FUNC> + prims(tid, nt, &peer, NULL, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupRecv); + + if (recvCount == 0) { + prims.recv(recvbuff, 0); + } else for (ssize_t offset = 0; offset < recvCount; offset += chunkSize) { + int realChunkSize = min(chunkSize, recvCount-offset); + ALIGN_SIZE(realChunkSize, nt*sizeof(uint64_t)/sizeof(T)); + int nelem = min(realChunkSize, recvCount-offset); + prims.directRecv(recvbuff+offset, offset, nelem); + } + } + if ((tid >= nThreadsSplit) && sendCount >= 0) { + int peer = (comm->rank+delta)%comm->nRanks; + int nt = nThreads-nThreadsSplit; + ncclPrimitives<UNROLL, 1, 1, T, 0, 1, 1, FUNC> + prims(tid-nThreadsSplit, nt, NULL, &peer, recvbuff, stepSize, channel, comm, ncclShmem->ptrs, groupSend); + + if (sendCount == 0) { + prims.send(sendbuff, 0); + } else for (ssize_t offset = 0; offset < sendCount; offset += chunkSize) { + int realChunkSize = min(chunkSize, sendCount-offset); + ALIGN_SIZE(realChunkSize, nt*sizeof(uint64_t)/sizeof(T)); + int nelem = min(realChunkSize, sendCount-offset); + prims.directSend(sendbuff+offset, offset, nelem); + } + } + } + } + tid -= nThreadsSegment; + if (tid < 0) return; + args++; + } + } +}; diff --git a/src/collectives/reduce.cc b/src/collectives/reduce.cc index 67f2fae..86388df 100644 --- a/src/collectives/reduce.cc +++ b/src/collectives/reduce.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright 
(c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -11,7 +11,8 @@ NCCL_API(ncclResult_t, ncclReduce, const void* sendbuff, void* recvbuff, size_t ncclDataType_t datatype, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream); ncclResult_t ncclReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - struct ncclInfo info = { ncclCollReduce, "Reduce", + NVTX3_FUNC_RANGE_IN(nccl_domain); + struct ncclInfo info = { ncclFuncReduce, "Reduce", sendbuff, recvbuff, count, datatype, op, root, comm, stream, /* Args */ REDUCE_CHUNKSTEPS, REDUCE_SLICESTEPS }; return ncclEnqueueCheck(&info); diff --git a/src/collectives/reduce_scatter.cc b/src/collectives/reduce_scatter.cc index 5ad7f5f..57c67bf 100644 --- a/src/collectives/reduce_scatter.cc +++ b/src/collectives/reduce_scatter.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -11,7 +11,8 @@ NCCL_API(ncclResult_t, ncclReduceScatter, const void* sendbuff, void* recvbuff, ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream); ncclResult_t ncclReduceScatter(const void* sendbuff, void* recvbuff, size_t recvcount, ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream) { - struct ncclInfo info = { ncclCollReduceScatter, "ReduceScatter", + NVTX3_FUNC_RANGE_IN(nccl_domain); + struct ncclInfo info = { ncclFuncReduceScatter, "ReduceScatter", sendbuff, recvbuff, recvcount, datatype, op, 0, comm, stream, /* Args */ REDUCESCATTER_CHUNKSTEPS, REDUCESCATTER_SLICESTEPS }; return ncclEnqueueCheck(&info); diff --git a/src/collectives/sendrecv.cc b/src/collectives/sendrecv.cc new file mode 100644 index 0000000..65222a5 --- /dev/null +++ b/src/collectives/sendrecv.cc @@ -0,0 +1,39 @@ +/************************************************************************* + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. 
+ * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "enqueue.h" +#include "collectives.h" +#include "argcheck.h" // Need some checks here since we access comm + +NCCL_API(ncclResult_t, ncclSend, const void* sendbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, cudaStream_t stream); +ncclResult_t ncclSend(const void* sendbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, cudaStream_t stream) { + NVTX3_FUNC_RANGE_IN(nccl_domain); + struct ncclInfo info = { ncclFuncSendRecv, "Send", + sendbuff, NULL, count, datatype, ncclSum, peer, comm, stream, /* Args */ + 1, 1 }; + ncclResult_t ret; + NCCLCHECK(ncclGroupStart()); + ret = ncclEnqueueCheck(&info); + NCCLCHECK(ncclGroupEnd()); + return ret; +} + +NCCL_API(ncclResult_t, ncclRecv, void* recvbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, cudaStream_t stream); +ncclResult_t ncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, cudaStream_t stream) { + NVTX3_FUNC_RANGE_IN(nccl_domain); + struct ncclInfo info = { ncclFuncSendRecv, "Recv", + NULL, recvbuff, count, datatype, ncclSum, peer, comm, stream, /* Args */ + 1, 1 }; + ncclResult_t ret; + NCCLCHECK(ncclGroupStart()); + ret = ncclEnqueueCheck(&info); + NCCLCHECK(ncclGroupEnd()); + return ret; +} |
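The primitives.h hunk above drops the per-warp loadRecvSync/loadSendSync bookkeeping and instead assigns each thread a role when the ncclPrimitives object is constructed: threads are split into groups of SYNC_GROUP (8), group 0 waits on receives, group 1 waits on sends, and the last two groups post receive/send credits. A distilled, stand-alone sketch of that assignment, with illustrative ROLE_* bit values rather than the real constants from primitives.h:

// Illustrative role bits; the real values live in primitives.h.
#define ROLE_WAIT_RECV 0x01
#define ROLE_WAIT_SEND 0x02
#define ROLE_SRC       0x04
#define ROLE_DST       0x08
#define ROLE_POST_RECV 0x10
#define ROLE_POST_SEND 0x20
#define SYNC_GROUP 8

// Given a thread id and the number of active peers, compute its role,
// mirroring the constructor logic in the diff: group 0 waits on receives,
// group 1 waits on sends, the last two groups post receive/send credits.
int computeRole(int tid, int nthreads, int nrecv, int nsend) {
  int role = 0;
  int g = tid / SYNC_GROUP;
  int ng = nthreads / SYNC_GROUP;
  int index = tid % SYNC_GROUP;
  if (g == 0) {
    if (index < nrecv) role |= ROLE_WAIT_RECV;
    if (index == nrecv) role |= ROLE_SRC;
  } else if (g == 1) {
    if (index < nsend) role |= ROLE_WAIT_SEND;
    if (index == nsend) role |= ROLE_DST;
  } else if (g == ng - 2) {
    if (index < nrecv) role |= ROLE_POST_RECV;
  } else if (g == ng - 1) {
    if (index < nsend) role |= ROLE_POST_SEND;
  }
  return role;
}

The static_assert in the diff guarantees that NSEND and NRECV both fit inside a single group, so one group of eight threads is always enough to cover every peer.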
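Throughout the prims_ll.h and prims_ll128.h hunks, the compile-time NCCL_LL_SLICE_LINES and NCCL_LL128_SLICE_ELEMS constants give way to stepLines / stepSize values that every kernel derives from comm->buffSizes for its protocol, and the per-protocol buffers now come from conn->buffs[] instead of the dedicated llBuff / ll128Buff fields. A minimal sketch of the slice arithmetic, assuming illustrative buffer sizes and placeholder type definitions (the real defaults come from devcomm.h and the transport setup, which are not part of this diff):

#include <cstdio>

// Illustrative constants and types; the real ones are defined in devcomm.h.
enum { NCCL_PROTO_LL = 0, NCCL_PROTO_LL128 = 1, NCCL_PROTO_SIMPLE = 2 };
const int NCCL_STEPS = 8;
struct ncclLLFifoLine { unsigned int i4[4]; };  // one 16-byte LL line

int main() {
  // Assume the communicator was set up with these per-protocol buffer sizes (bytes).
  size_t buffSizes[3] = { 2*1024*1024, 4*1024*1024, 4*1024*1024 };

  // LL: the buffer is a ring of NCCL_STEPS slices of ncclLLFifoLine entries.
  int stepLines = buffSizes[NCCL_PROTO_LL] / (sizeof(ncclLLFifoLine)*NCCL_STEPS);
  // LL128: the buffer is a ring of NCCL_STEPS slices of uint64_t elements.
  int stepSize = buffSizes[NCCL_PROTO_LL128] / (sizeof(unsigned long long)*NCCL_STEPS);

  printf("stepLines=%d stepSize(LL128)=%d\n", stepLines, stepSize);
  return 0;
}

recvOffset() and sendOffset() then index the ring exactly as before, only with the runtime-derived slice length: offset = (step % NCCL_STEPS) * stepLines.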
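In reduce.h and reduce_scatter.h, the per-algorithm free kernels (ncclReduceRingKernel, ncclReduceRingLLKernel, and so on) become partial specializations of a single ncclFunction class template keyed on (function, algorithm, protocol); combinations that are not implemented, such as Tree or CollNet for Reduce, simply get an empty run(). A stripped-down sketch of the pattern, with placeholder tag enums and an empty work element standing in for the real devcomm.h types:

// Placeholder tags and argument struct; the real ones are in devcomm.h.
enum { ncclFuncReduce, ncclFuncReduceScatter };
enum { NCCL_ALGO_TREE, NCCL_ALGO_RING, NCCL_ALGO_COLLNET };
enum { NCCL_PROTO_LL, NCCL_PROTO_LL128, NCCL_PROTO_SIMPLE };
struct ncclWorkElem { /* ... */ };

// Primary template: one run() method per (function, algorithm, protocol) combination.
template<int Func, int Algo, int Proto, class RedOp, typename T, int Unroll>
class ncclFunction {
 public:
  __device__ void run(struct ncclWorkElem* args) {}  // unimplemented combination: no-op
};

// A specialization carries the actual kernel body for one combination,
// e.g. ring+simple Reduce in the diff; shown here as a stub.
template<class RedOp, typename T, int Unroll>
class ncclFunction<ncclFuncReduce, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE, RedOp, T, Unroll> {
 public:
  __device__ void run(struct ncclWorkElem* args) { /* ring + simple implementation */ }
};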
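In sendrecv.h, the delta == 0 case (a rank sending to itself) is a plain local copy, but ReduceOrCopyMulti takes an int element count, so the copy is chunked into blocks of at most 1<<30 elements. The same blocking pattern in isolation, with a host-side placeholder standing in for the device copy routine:

#include <cstddef>
#include <cstring>

// Placeholder for the device-side ReduceOrCopyMulti call, which is limited to int counts.
static void copyBlock(const float* src, float* dst, int count) {
  memcpy(dst, src, (size_t)count * sizeof(float));
}

void localCopy(const float* sendbuff, float* recvbuff, size_t sendCount) {
  int blockSize = 1 << 30;  // at most 1G elements per call keeps the count within int range
  for (size_t offset = 0; offset < sendCount; offset += blockSize) {
    size_t remaining = sendCount - offset;
    if (remaining < (size_t)blockSize) blockSize = (int)remaining;
    copyBlock(sendbuff, recvbuff, blockSize);
    sendbuff += blockSize; recvbuff += blockSize;
  }
}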
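Finally, sendrecv.cc adds the host-side ncclSend/ncclRecv entry points; each one wraps its enqueue in ncclGroupStart()/ncclGroupEnd(), and the same grouping is how applications are expected to pair a send with the matching receive so both progress as one fused operation. A minimal usage sketch (error checking elided; the buffers, communicator and stream are assumed to be set up by the caller):

#include <nccl.h>
#include <cuda_runtime.h>

// Exchange 'count' floats with the ring neighbors: rank r sends to r+1 and receives from r-1.
void ringExchange(float* sendbuf, float* recvbuf, size_t count,
                  int rank, int nranks, ncclComm_t comm, cudaStream_t stream) {
  int next = (rank + 1) % nranks;
  int prev = (rank - 1 + nranks) % nranks;
  // Group the two calls so NCCL can schedule them together on the stream.
  ncclGroupStart();
  ncclSend(sendbuf, count, ncclFloat, next, comm, stream);
  ncclRecv(recvbuf, count, ncclFloat, prev, comm, stream);
  ncclGroupEnd();
}

Issuing the two calls without the surrounding group leaves each one as a separate operation, which serializes them and may hang if every rank posts its send before any matching receive.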