diff options
Diffstat (limited to 'src/transport/net.cc')
-rw-r--r-- | src/transport/net.cc | 573 |
1 files changed, 315 insertions, 258 deletions
diff --git a/src/transport/net.cc b/src/transport/net.cc index 87fc9ce..86c43f8 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -7,24 +7,27 @@ #include "comm.h" #include "net.h" #include "graph.h" +#include "collectives.h" struct netConnectInfo { ncclNetHandle_t netHandle; }; +#define LOC_HOSTMEM 0 +#define LOC_DEVMEM 1 +#define LOC_COUNT 2 + struct netSendResources { void* netSendComm; - struct ncclSendMem* hostSendMem; - struct ncclRecvMem* hostRecvMem; - struct ncclSendMem* devHostSendMem; - struct ncclRecvMem* devHostRecvMem; + struct ncclSendMem* sendMem; + struct ncclRecvMem* recvMem; int netDev; int useGdr; - int buffSize; - void* mhandle; - void* llMhandle; - void* ll128Mhandle; - struct ncclRecvMem* devRecvMem; + int shared; + char* buffers[LOC_COUNT]; + int buffSizes[LOC_COUNT]; + void* mhandles[LOC_COUNT]; + void** mhandlesProto[NCCL_NUM_PROTOCOLS]; uint64_t step; uint64_t llLastCleaning; }; @@ -32,17 +35,15 @@ struct netSendResources { struct netRecvResources { void* netListenComm; void* netRecvComm; - struct ncclSendMem* hostSendMem; - struct ncclRecvMem* hostRecvMem; - struct ncclSendMem* devHostSendMem; - struct ncclRecvMem* devHostRecvMem; + struct ncclSendMem* sendMem; + struct ncclRecvMem* recvMem; int netDev; int useGdr; - int buffSize; - void* mhandle; - void* llMhandle; - void* ll128Mhandle; - struct ncclRecvMem* devRecvMem; + int shared; + char* buffers[LOC_COUNT]; + int buffSizes[LOC_COUNT]; + void* mhandles[LOC_COUNT]; + void** mhandlesProto[NCCL_NUM_PROTOCOLS]; uint64_t step; uint64_t llLastCleaning; }; @@ -53,162 +54,178 @@ ncclResult_t netCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTop return ncclSuccess; } -NCCL_PARAM(NetGdrRead, "NET_GDR_READ", -2); -NCCL_PARAM(NetGdrLevel, "NET_GDR_LEVEL", PATH_PHB); - -static ncclResult_t netGetGdrSupport(struct ncclTopoSystem* topo, int64_t busId, int netDev, int read, int* useGdr) { - *useGdr = 0; - - if (read) { // For reads (sends) only enable under certain conditions - int gdrReadParam = ncclParamNetGdrRead(); - if (gdrReadParam == 0) return ncclSuccess; - if (gdrReadParam < 0) { - int nvlink; - NCCLCHECK(ncclTopoHasNvlink(topo, busId, &nvlink)); - if (!nvlink) return ncclSuccess; - } - } - - // Check if we are close enough that it makes sense to enable GDR - int netGdrLevel = ncclParamNetGdrLevel(); - int distance; - NCCLCHECK(ncclTopoNetDistance(topo, busId, netDev, &distance)); - if (distance >= netGdrLevel) { - INFO(NCCL_NET,"NET/%s : GPU Direct RDMA Disabled for GPU %lx / HCA %d (distance %d >= %d)", ncclNetName(), busId, netDev, distance, netGdrLevel); - return ncclSuccess; - } - - // Finally, check if the NIC supports it - int flags; - NCCLCHECK(ncclNetPtrSupport(netDev, &flags)); - if ((flags & NCCL_PTR_CUDA) == 0) return ncclSuccess; - *useGdr = 1; - INFO(NCCL_NET,"NET/%s : GPU Direct RDMA Enabled for GPU %lx / HCA %d (distance %d < %d), read %d", ncclNetName(), busId, netDev, distance, netGdrLevel, read); - return ncclSuccess; -} +NCCL_PARAM(NetSharedBuffers, "NET_SHARED_BUFFERS", -2); /* Determine if we will use this transport for this peer and return connect * information for this peer */ -ncclResult_t netSendSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int buffSize, int channelId) { +ncclResult_t netSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) { struct netSendResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); send->transportResources = resources; + send->conn.shared = resources->shared = ncclParamNetSharedBuffers() != -2 ? ncclParamNetSharedBuffers() : graph ? 0 : 1; + + NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev)); + NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 1, &resources->useGdr)); + + NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1)); + NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1)); + + send->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0; + send->conn.tail = &resources->recvMem->tail; + send->conn.sizesFifo = resources->recvMem->sizesFifo; + // Only fuse P2P buffers, continue to allocate dedicated buffers for ring/tree + send->conn.ptrsFifo = resources->shared ? resources->recvMem->ptrsFifo : NULL; + send->conn.head = &resources->sendMem->head; + resources->sendMem->head = resources->shared ? -NCCL_STEPS : 0; // Don't give any credit yet when sharing buffers + for (int i=0; i<NCCL_STEPS; i++) send->conn.sizesFifo[i] = -1; + + if (resources->shared == 0) { + int protoLoc[NCCL_NUM_PROTOCOLS]; + for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { + protoLoc[p] = p != NCCL_PROTO_LL && resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM; + } + int buffSizes[NCCL_NUM_PROTOCOLS]; + for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { + buffSizes[p] = send->comm->buffSizes[p]; + resources->buffSizes[protoLoc[p]] += buffSizes[p]; + } - NCCLCHECK(ncclTopoGetNetDev(graph, 1, channelId, &resources->netDev)); - NCCLCHECK(netGetGdrSupport(topo, myInfo->busId, resources->netDev, 1, &resources->useGdr)); - - int sendSize = sizeof(struct ncclSendMem); - NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostSendMem, (void**)&resources->devHostSendMem, sendSize)); + if (resources->buffSizes[LOC_DEVMEM]) { + NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM])); + } + if (resources->buffSizes[LOC_HOSTMEM]) { + NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM])); + } - int recvSize = offsetof(struct ncclRecvMem, buff)+buffSize; - if (resources->useGdr) { - NCCLCHECK(ncclCudaCalloc((char**)(&resources->devRecvMem), recvSize)); + int offsets[LOC_COUNT]; + offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0; + for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { + resources->mhandlesProto[p] = resources->mhandles+protoLoc[p]; + send->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]]; + offsets[protoLoc[p]] += buffSizes[p]; + } } - NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostRecvMem, (void**)&resources->devHostRecvMem, recvSize)); - resources->buffSize = buffSize; - INFO(NCCL_INIT|NCCL_NET,"Ring %02d : %d[%lx] -> %d[%lx] [send] via NET/%s/%d%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, ncclNetName(), resources->netDev, - resources->useGdr ? "/GDRDMA" : ""); + INFO(NCCL_INIT|NCCL_NET,"Channel %02d : %d[%lx] -> %d[%lx] [send] via NET/%s/%d%s%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, ncclNetName(), resources->netDev, + resources->useGdr ? "/GDRDMA" : "", resources->shared ? "/Shared" : ""); return ncclSuccess; } -ncclResult_t netRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int buffSize, int channelId) { +ncclResult_t netRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) { struct netRecvResources* resources; NCCLCHECK(ncclCalloc(&resources, 1)); recv->transportResources = resources; + recv->conn.shared = resources->shared = ncclParamNetSharedBuffers() != -2 ? ncclParamNetSharedBuffers() : graph ? 0 : 1; - NCCLCHECK(ncclTopoGetNetDev(graph, 0, channelId, &resources->netDev)); - NCCLCHECK(netGetGdrSupport(topo, myInfo->busId, resources->netDev, 0, &resources->useGdr)); + NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev)); + NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 0, &resources->useGdr)); - int sendSize = sizeof(struct ncclSendMem); - NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostSendMem, (void**)&resources->devHostSendMem, sendSize)); + NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1)); + NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1)); - int recvSize = offsetof(struct ncclRecvMem, buff)+buffSize; - if (resources->useGdr) { - NCCLCHECK(ncclCudaCalloc((char**)(&resources->devRecvMem), recvSize)); + recv->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0; + recv->conn.tail = &resources->recvMem->tail; + // Only fuse P2P buffers, continue to allocate dedicated buffers for ring/tree + recv->conn.ptrsFifo = resources->shared ? resources->recvMem->ptrsFifo : NULL; + recv->conn.head = &resources->sendMem->head; + + if (resources->shared == 0) { // Only allocate dedicated buffers for ring/tree not for p2p + int protoLoc[NCCL_NUM_PROTOCOLS]; + for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { + protoLoc[p] = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM; + } + + int buffSizes[NCCL_NUM_PROTOCOLS]; + for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { + buffSizes[p] = recv->comm->buffSizes[p]; + resources->buffSizes[protoLoc[p]] += buffSizes[p]; + } + + if (resources->buffSizes[LOC_DEVMEM]) { + NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM])); + } + if (resources->buffSizes[LOC_HOSTMEM]) { + NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM])); + } + + int offsets[LOC_COUNT]; + offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0; + for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) { + resources->mhandlesProto[p] = resources->mhandles+protoLoc[p]; + recv->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]]; + offsets[protoLoc[p]] += buffSizes[p]; + } } - NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostRecvMem, (void**)&resources->devHostRecvMem, recvSize)); - resources->buffSize = buffSize; - INFO(NCCL_INIT|NCCL_NET,"Ring %02d : %d[%lx] -> %d[%lx] [receive] via NET/%s/%d%s", channelId, peerInfo->rank, peerInfo->busId, myInfo->rank, myInfo->busId, ncclNetName(), resources->netDev, - resources->useGdr ? "/GDRDMA" : ""); + INFO(NCCL_INIT|NCCL_NET,"Channel %02d : %d[%lx] -> %d[%lx] [receive] via NET/%s/%d%s%s", channelId, peerInfo->rank, peerInfo->busId, myInfo->rank, myInfo->busId, ncclNetName(), resources->netDev, + resources->useGdr ? "/GDRDMA" : "", resources->shared ? "/Shared" : ""); struct netConnectInfo* info = (struct netConnectInfo*) connectInfo; NCCLCHECK(ncclNetListen(resources->netDev, &info->netHandle, &resources->netListenComm)); + return ncclSuccess; } -ncclResult_t netSendConnect(struct ncclConnect* connectInfo, struct ncclConnector* send) { +ncclResult_t netSendConnect(struct ncclComm* comm, struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) { // Setup device pointers struct netSendResources* resources = (struct netSendResources*)send->transportResources; - - // Intermediate buffering on GPU for GPU Direct RDMA, but LL buffer is always on host - struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->devHostRecvMem; - send->conn.buff = recvMem->buff; - send->conn.llBuff = resources->devHostRecvMem->llBuff; - send->conn.ll128Buff = recvMem->ll128Buff; - - // Head/Tail/Opcount/Fifos are always on host - send->conn.tail = &resources->devHostRecvMem->tail; - send->conn.opCountRem = &resources->devHostRecvMem->opCount; - send->conn.fifo = resources->devHostRecvMem->sizesFifo; - send->conn.head = &resources->devHostSendMem->head; - send->conn.opCountLoc = &resources->devHostSendMem->opCount; - for (int i=0; i<NCCL_STEPS; i++) send->conn.fifo[i] = -1; + struct netConnectInfo* info = (struct netConnectInfo*)connectInfo; // Connect to remote peer - struct netConnectInfo* info = (struct netConnectInfo*)connectInfo; NCCLCHECK(ncclNetConnect(resources->netDev, info->netHandle, &resources->netSendComm)); - NCCLCHECK(ncclNetRegMr(resources->netSendComm, recvMem->buff, resources->buffSize, - resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandle)); - NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->devHostRecvMem->llBuff, - NCCL_LL_BUFF_SIZE, NCCL_PTR_HOST, &resources->llMhandle)); - NCCLCHECK(ncclNetRegMr(resources->netSendComm, recvMem->ll128Buff, NCCL_LL128_BUFF_SIZE, - resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->ll128Mhandle)); + if (resources->shared) { + // Get shared buffers + int loc = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM; + NCCLCHECK(ncclProxySharedBuffersInit(send->comm, resources->useGdr, resources->buffSizes+loc, resources->buffers+loc)); + resources->mhandlesProto[NCCL_PROTO_SIMPLE] = resources->mhandles+loc; + } + if (resources->buffSizes[LOC_DEVMEM]) { + NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM])); + } + if (resources->buffSizes[LOC_HOSTMEM]) { + NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM])); + } return ncclSuccess; } /* Connect to this peer */ -ncclResult_t netRecvConnect(struct ncclConnect* connectInfo, struct ncclConnector* recv) { +ncclResult_t netRecvConnect(struct ncclComm* comm, struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* recv) { // Setup device pointers struct netRecvResources* resources = (struct netRecvResources*)recv->transportResources; - // Intermediate buffering on GPU for GPU Direct RDMA - struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->devHostRecvMem; - recv->conn.buff = recvMem->buff; - recv->conn.llBuff = recvMem->llBuff; - recv->conn.ll128Buff = recvMem->ll128Buff; - - // Head/Tail/Opcount are always on host - recv->conn.tail = &resources->devHostRecvMem->tail; - recv->conn.opCountLoc = &resources->devHostRecvMem->opCount; - recv->conn.head = &resources->devHostSendMem->head; - recv->conn.opCountRem = &resources->devHostSendMem->opCount; - // Finish connection establishment from remote peer NCCLCHECK(ncclNetAccept(resources->netListenComm, &resources->netRecvComm)); NCCLCHECK(ncclNetCloseListen(resources->netListenComm)); - NCCLCHECK(ncclNetRegMr(resources->netRecvComm, recvMem->buff, resources->buffSize, - resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandle)); - NCCLCHECK(ncclNetRegMr(resources->netRecvComm, recvMem->llBuff, NCCL_LL_BUFF_SIZE, - resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->llMhandle)); - NCCLCHECK(ncclNetRegMr(resources->netRecvComm, recvMem->ll128Buff, NCCL_LL128_BUFF_SIZE, - resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->ll128Mhandle)); + if (resources->shared) { + // Get shared buffers + int loc = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM; + NCCLCHECK(ncclProxySharedBuffersInit(recv->comm, resources->useGdr, resources->buffSizes+loc, resources->buffers+loc)); + resources->mhandlesProto[NCCL_PROTO_SIMPLE] = resources->mhandles+loc; + } + if (resources->buffSizes[LOC_DEVMEM]) { + NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM])); + } + if (resources->buffSizes[LOC_HOSTMEM]) { + NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM])); + } return ncclSuccess; } ncclResult_t netSendFree(void* transportResources) { struct netSendResources* resources = (struct netSendResources*)transportResources; - NCCLCHECK(ncclCudaHostFree(resources->hostSendMem)); - NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->mhandle)); - NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->llMhandle)); - NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->ll128Mhandle)); - NCCLCHECK(ncclCudaHostFree(resources->hostRecvMem)); - if (resources->useGdr) - CUDACHECK(cudaFree(resources->devRecvMem)); + NCCLCHECK(ncclCudaHostFree(resources->sendMem)); + NCCLCHECK(ncclCudaHostFree(resources->recvMem)); + for (int l=0; l<LOC_COUNT; l++) { + if (resources->buffers[l]) + NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->mhandles[l])); + } + if (resources->shared == 0) { + NCCLCHECK(ncclCudaHostFree(resources->buffers[LOC_HOSTMEM])); + CUDACHECK(cudaFree(resources->buffers[LOC_DEVMEM])); + } NCCLCHECK(ncclNetCloseSend(resources->netSendComm)); free(resources); return ncclSuccess; @@ -216,127 +233,128 @@ ncclResult_t netSendFree(void* transportResources) { ncclResult_t netRecvFree(void* transportResources) { struct netRecvResources* resources = (struct netRecvResources*)transportResources; - NCCLCHECK(ncclCudaHostFree(resources->hostSendMem)); - NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->mhandle)); - NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->llMhandle)); - NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->ll128Mhandle)); - NCCLCHECK(ncclCudaHostFree(resources->hostRecvMem)); - if (resources->useGdr) - CUDACHECK(cudaFree(resources->devRecvMem)); + NCCLCHECK(ncclCudaHostFree(resources->sendMem)); + NCCLCHECK(ncclCudaHostFree(resources->recvMem)); + for (int l=0; l<LOC_COUNT; l++) { + if (resources->buffers[l]) + NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->mhandles[l])); + } + if (resources->shared == 0) { + NCCLCHECK(ncclCudaHostFree(resources->buffers[LOC_HOSTMEM])); + CUDACHECK(cudaFree(resources->buffers[LOC_DEVMEM])); + } NCCLCHECK(ncclNetCloseRecv(resources->netRecvComm)); free(resources); return ncclSuccess; } +static_assert(NCCL_STEPS <= NCCL_NET_MAX_REQUESTS, "Not enough net requests to cover for steps"); + ncclResult_t netSendProxy(struct ncclProxyArgs* args) { struct netSendResources* resources = (struct netSendResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { - // Update opCount - resources->hostRecvMem->opCount = args->opCount; - // Round to next multiple of sliceSteps resources->step = ROUNDUP(resources->step, args->chunkSteps); - args->head = resources->step; - args->tail = resources->step; - args->end = args->head + args->nsteps; + args->posted = args->transmitted = args->done = resources->step; + args->end = resources->step + args->nsteps; args->state = ncclProxyOpProgress; } + args->idle = 1; if (args->state == ncclProxyOpProgress) { - args->idle = 1; - if (args->head < args->end) { - if (args->tail < args->end && args->tail < args->head + NCCL_STEPS) { - volatile int* sizesFifo = resources->hostRecvMem->sizesFifo; - volatile uint64_t* recvTail = &resources->hostRecvMem->tail; + int p = args->protocol; + int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS; + char* localBuff = args->connector->conn.buffs[p]; + void* mhandle = *(resources->mhandlesProto[p]); + int buffSize = stepSize*args->sliceSteps; + if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR; + if (args->sendbytes < buffSize) buffSize = args->sendbytes; + // Post buffers to the GPU + if (args->posted < args->end && args->posted < args->done + NCCL_STEPS) { + if (resources->shared) { + char* ptr; + NCCLCHECK(ncclProxySharedBuffersAlloc(args->connector->comm, resources->useGdr, 0, args->channel->id, buffSize, &ptr)); + if (ptr == NULL) return ncclInternalError; + resources->recvMem->ptrsFifo[args->posted%NCCL_STEPS] = ptr; + __sync_synchronize(); + volatile uint64_t* sendHead = &resources->sendMem->head; + args->posted += args->sliceSteps; + *sendHead = args->posted - NCCL_STEPS; + } else args->posted += args->sliceSteps; + args->idle = 0; + return ncclSuccess; + } + // Check whether we received data from the GPU and send it to the network + int buffSlot = args->transmitted%NCCL_STEPS; + if (args->transmitted < args->posted && args->transmitted < args->done + NCCL_STEPS) { + volatile int* sizesFifo = resources->recvMem->sizesFifo; + volatile uint64_t* recvTail = &resources->recvMem->tail; + if (sizesFifo[buffSlot] != -1 && (*recvTail > args->transmitted || args->protocol == NCCL_PROTO_LL)) { + // We have something to receive, let's check if it's completely ready. + int size = sizesFifo[buffSlot]; + char* buff = resources->shared ? (char*)resources->recvMem->ptrsFifo[buffSlot] : localBuff+buffSlot*stepSize; + int ready = 1; if (args->protocol == NCCL_PROTO_LL128) { - int stepSize = NCCL_LL128_BUFF_SIZE/NCCL_STEPS; - if (args->tail < *recvTail) { - int buffSlot = args->tail%NCCL_STEPS; - if (sizesFifo[buffSlot] != -1) { - struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem; - char* localBuff = (char*)localMem->ll128Buff; - int ready = resources->useGdr; - if (!ready) { - // When data is in sysmem, we need to wait until all flags are correct since the GPU only - // called threadfence() - uint64_t flag = args->tail + 1; - int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS); - volatile uint64_t* lines = (volatile uint64_t*)(localBuff+buffSlot*stepSize); - ready = 1; - for (int i=0; i<nFifoLines; i++) { - if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; } - } - } - if (ready) { - // Send through network - NCCLCHECK(ncclNetIsend(resources->netSendComm, localBuff+buffSlot*stepSize, sizesFifo[buffSlot], resources->ll128Mhandle, args->requests+buffSlot)); - if (args->requests[buffSlot] != NULL) { - sizesFifo[buffSlot] = -1; - // Make sure size is reset to zero before we update the head. - __sync_synchronize(); - args->tail += args->sliceSteps; - args->idle = 0; - } - } + int ready = resources->useGdr; + if (!ready) { + // When data is in sysmem, we need to wait until all flags are correct since the GPU only + // called threadfence() + uint64_t flag = args->transmitted + 1; + int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS); + volatile uint64_t* lines = (volatile uint64_t*)buff; + ready = 1; + for (int i=0; i<nFifoLines; i++) { + if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; } } } } else if (args->protocol == NCCL_PROTO_LL) { - int buffSlot = args->tail%NCCL_STEPS; - int size = sizesFifo[buffSlot]; - if (size != -1) { - uint32_t flag = NCCL_LL_FLAG(args->tail + 1); - int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine)); - size = nFifoLines * sizeof(union ncclLLFifoLine); - union ncclLLFifoLine* lines = resources->hostRecvMem->llBuff+buffSlot*NCCL_LL_SLICE_LINES; - int ready = 1; - for (int i=0; i<nFifoLines; i++) { - volatile uint32_t *f1 = &lines[i].flag1; - volatile uint32_t *f2 = &lines[i].flag2; - if (f1[0] != flag || f2[0] != flag) { ready = 0; break; } - } - if (ready) { - NCCLCHECK(ncclNetIsend(resources->netSendComm, lines, size, resources->llMhandle, args->requests+buffSlot)); - if (args->requests[buffSlot] != NULL) { - sizesFifo[buffSlot] = -1; - // Make sure size is reset to zero before we update the head. - __sync_synchronize(); - args->tail += args->sliceSteps; - args->idle = 0; - } - } + uint32_t flag = NCCL_LL_FLAG(args->transmitted + 1); + int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine)); + union ncclLLFifoLine* lines = (union ncclLLFifoLine*)buff; + for (int i=0; i<nFifoLines; i++) { + volatile uint32_t *f1 = &lines[i].flag1; + volatile uint32_t *f2 = &lines[i].flag2; + if (f1[0] != flag || f2[0] != flag) { ready = 0; break; } } - } else if (args->tail < *recvTail) { - int stepSize = args->channel->buffSize/NCCL_STEPS; - struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem; - // Send through network - int buffSlot = args->tail%NCCL_STEPS; - if (sizesFifo[buffSlot] != -1) { - NCCLCHECK(ncclNetIsend(resources->netSendComm, localMem->buff+buffSlot*stepSize, sizesFifo[buffSlot], resources->mhandle, args->requests+buffSlot)); - if (args->requests[buffSlot] != NULL) { - sizesFifo[buffSlot] = -1; - // Make sure size is reset to zero before we update the head. - __sync_synchronize(); - args->tail += args->sliceSteps; - args->idle = 0; - } + } + if (ready) { + // Data is ready, try to send. + NCCLCHECK(ncclNetIsend(resources->netSendComm, buff, size, mhandle, args->requests+buffSlot)); + if (args->requests[buffSlot] != NULL) { + TRACE(NCCL_NET, "sendProxy [%d/%d] Isend (LL) posted, req %p", args->transmitted, buffSlot, args->requests[buffSlot]); + sizesFifo[buffSlot] = -1; + // Make sure size is reset to zero before we update the head. + __sync_synchronize(); + args->transmitted += args->sliceSteps; + args->idle = 0; + return ncclSuccess; } } } - if (args->head < args->tail) { - int done; - int buffSlot = args->head%NCCL_STEPS; - NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL)); - if (done) { - args->head += args->sliceSteps; - resources->hostSendMem->head = args->head; - args->idle = 0; + } + // Check whether the network has completed some send operations. + if (args->done < args->transmitted) { + int done; + int buffSlot = args->done%NCCL_STEPS; + NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL)); + if (done) { + TRACE(NCCL_NET, "sendProxy [%d/%d] request %p done, size %d", args->done, buffSlot, args->requests[buffSlot]); + if (resources->shared) { + char* ptr = (char*)resources->recvMem->ptrsFifo[args->done%NCCL_STEPS]; + NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 0, args->channel->id, buffSize, ptr)); + } + args->done += args->sliceSteps; + + if (resources->shared == 0) { + resources->sendMem->head = args->done; } + args->idle = 0; + if (args->done == args->end) { + resources->step = args->end; + args->state = ncclProxyOpNone; + } + return ncclSuccess; } } - if (args->head == args->end) { - resources->step = args->end; - args->idle = 0; - args->state = ncclProxyOpNone; - } } return ncclSuccess; } @@ -344,51 +362,90 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) { ncclResult_t netRecvProxy(struct ncclProxyArgs* args) { struct netRecvResources* resources = (struct netRecvResources*) (args->connector->transportResources); if (args->state == ncclProxyOpReady) { - // Update opCount - resources->hostSendMem->opCount = args->opCount; - // Round to next multiple of sliceSteps resources->step = ROUNDUP(resources->step, args->chunkSteps); - args->head = resources->step; - args->tail = resources->step; - args->end = args->head + args->nsteps; + args->posted = args->received = args->transmitted = args->done = resources->step; + args->end = resources->step + args->nsteps; args->state = ncclProxyOpProgress; } + args->idle = 1; if (args->state == ncclProxyOpProgress) { - args->idle = 1; - int stepSize = ( args->protocol == NCCL_PROTO_LL ? NCCL_LL_BUFF_SIZE : args->protocol == NCCL_PROTO_LL128 ? NCCL_LL128_BUFF_SIZE : args->channel->buffSize ) / NCCL_STEPS; - if (args->head < args->end) { - struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem; - char* localBuff = args->protocol == NCCL_PROTO_LL ? (char*)localMem->llBuff : args->protocol == NCCL_PROTO_LL128 ? (char*)localMem->ll128Buff : localMem->buff; - void* mhandle = args->protocol == NCCL_PROTO_LL ? resources->llMhandle : args->protocol == NCCL_PROTO_LL128 ? resources->ll128Mhandle : resources->mhandle; - volatile uint64_t* sendHead = &resources->hostSendMem->head; - if ((args->tail < args->head + NCCL_STEPS) && (args->tail < *sendHead + NCCL_STEPS) && (args->tail < args->end)) { - int buffSlot = args->tail%NCCL_STEPS; - int sliceSize = stepSize * args->sliceSteps; - NCCLCHECK(ncclNetIrecv(resources->netRecvComm, localBuff+buffSlot*stepSize, sliceSize, mhandle, args->requests+buffSlot)); - if (args->requests[buffSlot] != NULL) { - args->tail += args->sliceSteps; - args->idle = 0; - } + int p = args->protocol; + int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS; + char* localBuff = args->connector->conn.buffs[p]; + void* mhandle = *(resources->mhandlesProto[p]); + int buffSize = stepSize*args->sliceSteps; + if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR; + if (args->recvbytes < buffSize) buffSize = args->recvbytes; + if ((args->posted < args->done + NCCL_STEPS) && (args->posted < args->end)) { + int buffSlot = args->posted%NCCL_STEPS; + char* ptr; + if (resources->shared) { + NCCLCHECK(ncclProxySharedBuffersAlloc(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, &ptr)); + if (ptr == NULL) return ncclInternalError; + volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo; + ptrsFifo[buffSlot] = ptr; + } else { + ptr = localBuff+buffSlot*stepSize; } - if (args->tail > args->head) { - int buffSlot = args->head%NCCL_STEPS; - int done, size; - NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, &size)); - if (done) { - args->head += args->sliceSteps; - if (args->protocol == NCCL_PROTO_SIMPLE) { - if (resources->useGdr) ncclNetFlush(resources->netRecvComm, localBuff+buffSlot*stepSize, size, mhandle); - resources->hostRecvMem->tail = args->head; - } - args->idle = 0; + NCCLCHECK(ncclNetIrecv(resources->netRecvComm, ptr, buffSize, mhandle, args->requests+buffSlot)); + if (args->requests[buffSlot] != NULL) { + TRACE(NCCL_NET, "recvProxy [%d/%d] posted recv request %p", args->posted, buffSlot, args->requests[buffSlot]); + args->posted += args->sliceSteps; + args->idle = 0; + return ncclSuccess; + } else if (resources->shared) { + NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, ptr)); + } + } + if (args->posted > args->received) { + int buffSlot = args->received%NCCL_STEPS; + int done, size; + NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, &size)); + if (done) { + args->received += args->sliceSteps; + if (size > 0 && args->protocol == NCCL_PROTO_SIMPLE && resources->useGdr) { + // Don't pass data to the GPU yet, flush first. + volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo; + char* ptr = resources->shared ? (char*)(ptrsFifo[buffSlot]) : localBuff+buffSlot*stepSize; + NCCLCHECK(ncclNetIflush(resources->netRecvComm, ptr, size, mhandle, args->requests+buffSlot)); + } else { + args->requests[buffSlot] = NULL; } + args->idle = 0; + return ncclSuccess; } } - if (args->head == args->end) { - resources->step = args->end; - args->idle = 0; - args->state = ncclProxyOpNone; + if (args->received > args->transmitted) { + // Progress flush operations + int buffSlot = args->transmitted%NCCL_STEPS; + int done = 1; + if (args->requests[buffSlot]) NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL)); + if (done) { + args->transmitted += args->sliceSteps; + __sync_synchronize(); + resources->recvMem->tail = args->transmitted; + args->idle = 0; + return ncclSuccess; + } + } + if (args->transmitted > args->done) { + volatile uint64_t* sendHead = &resources->sendMem->head; + uint64_t done = *sendHead; + while (done > args->done && + // LL and LL128 can acknowledge 0-bytes send before they even happen. Don't go past what we transmitted. + args->transmitted > args->done) { + if (resources->shared) { + char* ptr = (char*)resources->recvMem->ptrsFifo[args->done%NCCL_STEPS]; + NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, ptr)); + } + args->done += args->sliceSteps; + args->idle = 0; + if (args->done == args->end) { + resources->step = args->end; + args->state = ncclProxyOpNone; + } + } } } return ncclSuccess; |