
github.com/marian-nmt/nccl.git
Diffstat (limited to 'src/transport/net.cc')
-rw-r--r--  src/transport/net.cc  573
1 file changed, 315 insertions(+), 258 deletions(-)
diff --git a/src/transport/net.cc b/src/transport/net.cc
index 87fc9ce..86c43f8 100644
--- a/src/transport/net.cc
+++ b/src/transport/net.cc
@@ -1,5 +1,5 @@
/*************************************************************************
- * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
@@ -7,24 +7,27 @@
#include "comm.h"
#include "net.h"
#include "graph.h"
+#include "collectives.h"
struct netConnectInfo {
ncclNetHandle_t netHandle;
};
+#define LOC_HOSTMEM 0
+#define LOC_DEVMEM 1
+#define LOC_COUNT 2
+
struct netSendResources {
void* netSendComm;
- struct ncclSendMem* hostSendMem;
- struct ncclRecvMem* hostRecvMem;
- struct ncclSendMem* devHostSendMem;
- struct ncclRecvMem* devHostRecvMem;
+ struct ncclSendMem* sendMem;
+ struct ncclRecvMem* recvMem;
int netDev;
int useGdr;
- int buffSize;
- void* mhandle;
- void* llMhandle;
- void* ll128Mhandle;
- struct ncclRecvMem* devRecvMem;
+ int shared;
+ char* buffers[LOC_COUNT];
+ int buffSizes[LOC_COUNT];
+ void* mhandles[LOC_COUNT];
+ void** mhandlesProto[NCCL_NUM_PROTOCOLS];
uint64_t step;
uint64_t llLastCleaning;
};
@@ -32,17 +35,15 @@ struct netSendResources {
struct netRecvResources {
void* netListenComm;
void* netRecvComm;
- struct ncclSendMem* hostSendMem;
- struct ncclRecvMem* hostRecvMem;
- struct ncclSendMem* devHostSendMem;
- struct ncclRecvMem* devHostRecvMem;
+ struct ncclSendMem* sendMem;
+ struct ncclRecvMem* recvMem;
int netDev;
int useGdr;
- int buffSize;
- void* mhandle;
- void* llMhandle;
- void* ll128Mhandle;
- struct ncclRecvMem* devRecvMem;
+ int shared;
+ char* buffers[LOC_COUNT];
+ int buffSizes[LOC_COUNT];
+ void* mhandles[LOC_COUNT];
+ void** mhandlesProto[NCCL_NUM_PROTOCOLS];
uint64_t step;
uint64_t llLastCleaning;
};
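
Both resource structs now index buffers by memory location (host vs. device) rather than by protocol, and mhandlesProto adds one level of indirection so each protocol resolves to the registration handle of whichever location holds its buffer. A minimal self-contained sketch of that indexing scheme, with DEMO_* names standing in for the LOC_* and NCCL_NUM_PROTOCOLS identifiers:

enum { DEMO_HOSTMEM = 0, DEMO_DEVMEM = 1, DEMO_LOC_COUNT = 2 };  // stand-ins for LOC_HOSTMEM/LOC_DEVMEM/LOC_COUNT
enum { DEMO_NUM_PROTOCOLS = 3 };                                 // LL, LL128, Simple (NCCL_NUM_PROTOCOLS upstream)

struct demoNetResources {
  char*  buffers[DEMO_LOC_COUNT];            // one allocation per memory location
  int    buffSizes[DEMO_LOC_COUNT];          // total bytes allocated at each location
  void*  mhandles[DEMO_LOC_COUNT];           // one network MR handle per location
  void** mhandlesProto[DEMO_NUM_PROTOCOLS];  // per-protocol pointer into mhandles[]
};

// Mirrors the *(resources->mhandlesProto[p]) lookups in the proxy code:
// a protocol's handle is simply the handle of the location hosting its buffer.
static void* demoProtoMhandle(struct demoNetResources* r, int proto) {
  return *(r->mhandlesProto[proto]);
}
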
@@ -53,162 +54,178 @@ ncclResult_t netCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTop
return ncclSuccess;
}
-NCCL_PARAM(NetGdrRead, "NET_GDR_READ", -2);
-NCCL_PARAM(NetGdrLevel, "NET_GDR_LEVEL", PATH_PHB);
-
-static ncclResult_t netGetGdrSupport(struct ncclTopoSystem* topo, int64_t busId, int netDev, int read, int* useGdr) {
- *useGdr = 0;
-
- if (read) { // For reads (sends) only enable under certain conditions
- int gdrReadParam = ncclParamNetGdrRead();
- if (gdrReadParam == 0) return ncclSuccess;
- if (gdrReadParam < 0) {
- int nvlink;
- NCCLCHECK(ncclTopoHasNvlink(topo, busId, &nvlink));
- if (!nvlink) return ncclSuccess;
- }
- }
-
- // Check if we are close enough that it makes sense to enable GDR
- int netGdrLevel = ncclParamNetGdrLevel();
- int distance;
- NCCLCHECK(ncclTopoNetDistance(topo, busId, netDev, &distance));
- if (distance >= netGdrLevel) {
- INFO(NCCL_NET,"NET/%s : GPU Direct RDMA Disabled for GPU %lx / HCA %d (distance %d >= %d)", ncclNetName(), busId, netDev, distance, netGdrLevel);
- return ncclSuccess;
- }
-
- // Finally, check if the NIC supports it
- int flags;
- NCCLCHECK(ncclNetPtrSupport(netDev, &flags));
- if ((flags & NCCL_PTR_CUDA) == 0) return ncclSuccess;
- *useGdr = 1;
- INFO(NCCL_NET,"NET/%s : GPU Direct RDMA Enabled for GPU %lx / HCA %d (distance %d < %d), read %d", ncclNetName(), busId, netDev, distance, netGdrLevel, read);
- return ncclSuccess;
-}
+NCCL_PARAM(NetSharedBuffers, "NET_SHARED_BUFFERS", -2);
/* Determine if we will use this transport for this peer and return connect
* information for this peer */
-ncclResult_t netSendSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int buffSize, int channelId) {
+ncclResult_t netSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) {
struct netSendResources* resources;
NCCLCHECK(ncclCalloc(&resources, 1));
send->transportResources = resources;
+ send->conn.shared = resources->shared = ncclParamNetSharedBuffers() != -2 ? ncclParamNetSharedBuffers() : graph ? 0 : 1;
+
+ NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev));
+ NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 1, &resources->useGdr));
+
+ NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1));
+ NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1));
+
+ send->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0;
+ send->conn.tail = &resources->recvMem->tail;
+ send->conn.sizesFifo = resources->recvMem->sizesFifo;
+ // Only fuse P2P buffers, continue to allocate dedicated buffers for ring/tree
+ send->conn.ptrsFifo = resources->shared ? resources->recvMem->ptrsFifo : NULL;
+ send->conn.head = &resources->sendMem->head;
+ resources->sendMem->head = resources->shared ? -NCCL_STEPS : 0; // Don't give any credit yet when sharing buffers
+ for (int i=0; i<NCCL_STEPS; i++) send->conn.sizesFifo[i] = -1;
+
+ if (resources->shared == 0) {
+ int protoLoc[NCCL_NUM_PROTOCOLS];
+ for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
+ protoLoc[p] = p != NCCL_PROTO_LL && resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
+ }
+ int buffSizes[NCCL_NUM_PROTOCOLS];
+ for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
+ buffSizes[p] = send->comm->buffSizes[p];
+ resources->buffSizes[protoLoc[p]] += buffSizes[p];
+ }
- NCCLCHECK(ncclTopoGetNetDev(graph, 1, channelId, &resources->netDev));
- NCCLCHECK(netGetGdrSupport(topo, myInfo->busId, resources->netDev, 1, &resources->useGdr));
-
- int sendSize = sizeof(struct ncclSendMem);
- NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostSendMem, (void**)&resources->devHostSendMem, sendSize));
+ if (resources->buffSizes[LOC_DEVMEM]) {
+ NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM]));
+ }
+ if (resources->buffSizes[LOC_HOSTMEM]) {
+ NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM]));
+ }
- int recvSize = offsetof(struct ncclRecvMem, buff)+buffSize;
- if (resources->useGdr) {
- NCCLCHECK(ncclCudaCalloc((char**)(&resources->devRecvMem), recvSize));
+ int offsets[LOC_COUNT];
+ offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0;
+ for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
+ resources->mhandlesProto[p] = resources->mhandles+protoLoc[p];
+ send->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]];
+ offsets[protoLoc[p]] += buffSizes[p];
+ }
}
- NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostRecvMem, (void**)&resources->devHostRecvMem, recvSize));
- resources->buffSize = buffSize;
- INFO(NCCL_INIT|NCCL_NET,"Ring %02d : %d[%lx] -> %d[%lx] [send] via NET/%s/%d%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, ncclNetName(), resources->netDev,
- resources->useGdr ? "/GDRDMA" : "");
+ INFO(NCCL_INIT|NCCL_NET,"Channel %02d : %d[%lx] -> %d[%lx] [send] via NET/%s/%d%s%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, ncclNetName(), resources->netDev,
+ resources->useGdr ? "/GDRDMA" : "", resources->shared ? "/Shared" : "");
return ncclSuccess;
}
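
The shared flag set in netSendSetup/netRecvSetup comes from the new NCCL_NET_SHARED_BUFFERS parameter, with -2 acting as the "unset" sentinel: an explicit user setting wins, otherwise buffers are shared only for p2p connections (graph == NULL) and stay dedicated for ring/tree. A short illustrative sketch of that resolution, with demoResolveShared as a hypothetical helper name:

// Spells out `ncclParamNetSharedBuffers() != -2 ? ncclParamNetSharedBuffers() : graph ? 0 : 1`.
static int demoResolveShared(long long paramValue, bool hasGraph /* ring/tree connection */) {
  if (paramValue != -2) return (int)paramValue;  // explicit NCCL_NET_SHARED_BUFFERS setting wins
  return hasGraph ? 0 : 1;                       // default: dedicated buffers for ring/tree, shared for p2p
}
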
-ncclResult_t netRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int buffSize, int channelId) {
+ncclResult_t netRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) {
struct netRecvResources* resources;
NCCLCHECK(ncclCalloc(&resources, 1));
recv->transportResources = resources;
+ recv->conn.shared = resources->shared = ncclParamNetSharedBuffers() != -2 ? ncclParamNetSharedBuffers() : graph ? 0 : 1;
- NCCLCHECK(ncclTopoGetNetDev(graph, 0, channelId, &resources->netDev));
- NCCLCHECK(netGetGdrSupport(topo, myInfo->busId, resources->netDev, 0, &resources->useGdr));
+ NCCLCHECK(ncclTopoGetNetDev(comm->topo, myInfo->rank, graph, channelId, &resources->netDev));
+ NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, resources->netDev, 0, &resources->useGdr));
- int sendSize = sizeof(struct ncclSendMem);
- NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostSendMem, (void**)&resources->devHostSendMem, sendSize));
+ NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1));
+ NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1));
- int recvSize = offsetof(struct ncclRecvMem, buff)+buffSize;
- if (resources->useGdr) {
- NCCLCHECK(ncclCudaCalloc((char**)(&resources->devRecvMem), recvSize));
+ recv->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0;
+ recv->conn.tail = &resources->recvMem->tail;
+ // Only fuse P2P buffers, continue to allocate dedicated buffers for ring/tree
+ recv->conn.ptrsFifo = resources->shared ? resources->recvMem->ptrsFifo : NULL;
+ recv->conn.head = &resources->sendMem->head;
+
+ if (resources->shared == 0) { // Only allocate dedicated buffers for ring/tree not for p2p
+ int protoLoc[NCCL_NUM_PROTOCOLS];
+ for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
+ protoLoc[p] = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
+ }
+
+ int buffSizes[NCCL_NUM_PROTOCOLS];
+ for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
+ buffSizes[p] = recv->comm->buffSizes[p];
+ resources->buffSizes[protoLoc[p]] += buffSizes[p];
+ }
+
+ if (resources->buffSizes[LOC_DEVMEM]) {
+ NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM]));
+ }
+ if (resources->buffSizes[LOC_HOSTMEM]) {
+ NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM]));
+ }
+
+ int offsets[LOC_COUNT];
+ offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0;
+ for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
+ resources->mhandlesProto[p] = resources->mhandles+protoLoc[p];
+ recv->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]];
+ offsets[protoLoc[p]] += buffSizes[p];
+ }
}
- NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostRecvMem, (void**)&resources->devHostRecvMem, recvSize));
- resources->buffSize = buffSize;
- INFO(NCCL_INIT|NCCL_NET,"Ring %02d : %d[%lx] -> %d[%lx] [receive] via NET/%s/%d%s", channelId, peerInfo->rank, peerInfo->busId, myInfo->rank, myInfo->busId, ncclNetName(), resources->netDev,
- resources->useGdr ? "/GDRDMA" : "");
+ INFO(NCCL_INIT|NCCL_NET,"Channel %02d : %d[%lx] -> %d[%lx] [receive] via NET/%s/%d%s%s", channelId, peerInfo->rank, peerInfo->busId, myInfo->rank, myInfo->busId, ncclNetName(), resources->netDev,
+ resources->useGdr ? "/GDRDMA" : "", resources->shared ? "/Shared" : "");
struct netConnectInfo* info = (struct netConnectInfo*) connectInfo;
NCCLCHECK(ncclNetListen(resources->netDev, &info->netHandle, &resources->netListenComm));
+
return ncclSuccess;
}
-ncclResult_t netSendConnect(struct ncclConnect* connectInfo, struct ncclConnector* send) {
+ncclResult_t netSendConnect(struct ncclComm* comm, struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
// Setup device pointers
struct netSendResources* resources = (struct netSendResources*)send->transportResources;
-
- // Intermediate buffering on GPU for GPU Direct RDMA, but LL buffer is always on host
- struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->devHostRecvMem;
- send->conn.buff = recvMem->buff;
- send->conn.llBuff = resources->devHostRecvMem->llBuff;
- send->conn.ll128Buff = recvMem->ll128Buff;
-
- // Head/Tail/Opcount/Fifos are always on host
- send->conn.tail = &resources->devHostRecvMem->tail;
- send->conn.opCountRem = &resources->devHostRecvMem->opCount;
- send->conn.fifo = resources->devHostRecvMem->sizesFifo;
- send->conn.head = &resources->devHostSendMem->head;
- send->conn.opCountLoc = &resources->devHostSendMem->opCount;
- for (int i=0; i<NCCL_STEPS; i++) send->conn.fifo[i] = -1;
+ struct netConnectInfo* info = (struct netConnectInfo*)connectInfo;
// Connect to remote peer
- struct netConnectInfo* info = (struct netConnectInfo*)connectInfo;
NCCLCHECK(ncclNetConnect(resources->netDev, info->netHandle, &resources->netSendComm));
- NCCLCHECK(ncclNetRegMr(resources->netSendComm, recvMem->buff, resources->buffSize,
- resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandle));
- NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->devHostRecvMem->llBuff,
- NCCL_LL_BUFF_SIZE, NCCL_PTR_HOST, &resources->llMhandle));
- NCCLCHECK(ncclNetRegMr(resources->netSendComm, recvMem->ll128Buff, NCCL_LL128_BUFF_SIZE,
- resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->ll128Mhandle));
+ if (resources->shared) {
+ // Get shared buffers
+ int loc = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
+ NCCLCHECK(ncclProxySharedBuffersInit(send->comm, resources->useGdr, resources->buffSizes+loc, resources->buffers+loc));
+ resources->mhandlesProto[NCCL_PROTO_SIMPLE] = resources->mhandles+loc;
+ }
+ if (resources->buffSizes[LOC_DEVMEM]) {
+ NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM]));
+ }
+ if (resources->buffSizes[LOC_HOSTMEM]) {
+ NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM]));
+ }
return ncclSuccess;
}
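
With buffers grouped by location, connect-time registration drops from one memory region per protocol (mhandle/llMhandle/ll128Mhandle) to at most one host and one device registration per connection. A self-contained sketch of that pattern; demoRegMr is a hypothetical stub standing in for ncclNetRegMr, and the DEMO_PTR_* constants mirror NCCL_PTR_CUDA/NCCL_PTR_HOST:

enum { DEMO_HOSTMEM = 0, DEMO_DEVMEM = 1, DEMO_LOC_COUNT = 2 };
enum { DEMO_PTR_HOST = 1, DEMO_PTR_CUDA = 2 };

// Stub: a real net plugin would pin/register the memory range here.
static int demoRegMr(void* /*netComm*/, void* /*data*/, int /*size*/, int /*type*/, void** mhandle) {
  *mhandle = nullptr;
  return 0;
}

static int demoRegisterAll(void* netComm, char* buffers[DEMO_LOC_COUNT],
                           int buffSizes[DEMO_LOC_COUNT], void* mhandles[DEMO_LOC_COUNT]) {
  for (int l = 0; l < DEMO_LOC_COUNT; l++) {
    if (buffSizes[l] == 0) continue;                                // nothing allocated at this location
    int type = (l == DEMO_DEVMEM) ? DEMO_PTR_CUDA : DEMO_PTR_HOST;  // device memory registers as CUDA
    int rc = demoRegMr(netComm, buffers[l], buffSizes[l], type, &mhandles[l]);
    if (rc != 0) return rc;
  }
  return 0;
}
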
/* Connect to this peer */
-ncclResult_t netRecvConnect(struct ncclConnect* connectInfo, struct ncclConnector* recv) {
+ncclResult_t netRecvConnect(struct ncclComm* comm, struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* recv) {
// Setup device pointers
struct netRecvResources* resources = (struct netRecvResources*)recv->transportResources;
- // Intermediate buffering on GPU for GPU Direct RDMA
- struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->devHostRecvMem;
- recv->conn.buff = recvMem->buff;
- recv->conn.llBuff = recvMem->llBuff;
- recv->conn.ll128Buff = recvMem->ll128Buff;
-
- // Head/Tail/Opcount are always on host
- recv->conn.tail = &resources->devHostRecvMem->tail;
- recv->conn.opCountLoc = &resources->devHostRecvMem->opCount;
- recv->conn.head = &resources->devHostSendMem->head;
- recv->conn.opCountRem = &resources->devHostSendMem->opCount;
-
// Finish connection establishment from remote peer
NCCLCHECK(ncclNetAccept(resources->netListenComm, &resources->netRecvComm));
NCCLCHECK(ncclNetCloseListen(resources->netListenComm));
- NCCLCHECK(ncclNetRegMr(resources->netRecvComm, recvMem->buff, resources->buffSize,
- resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandle));
- NCCLCHECK(ncclNetRegMr(resources->netRecvComm, recvMem->llBuff, NCCL_LL_BUFF_SIZE,
- resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->llMhandle));
- NCCLCHECK(ncclNetRegMr(resources->netRecvComm, recvMem->ll128Buff, NCCL_LL128_BUFF_SIZE,
- resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->ll128Mhandle));
+ if (resources->shared) {
+ // Get shared buffers
+ int loc = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
+ NCCLCHECK(ncclProxySharedBuffersInit(recv->comm, resources->useGdr, resources->buffSizes+loc, resources->buffers+loc));
+ resources->mhandlesProto[NCCL_PROTO_SIMPLE] = resources->mhandles+loc;
+ }
+ if (resources->buffSizes[LOC_DEVMEM]) {
+ NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM]));
+ }
+ if (resources->buffSizes[LOC_HOSTMEM]) {
+ NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM]));
+ }
return ncclSuccess;
}
ncclResult_t netSendFree(void* transportResources) {
struct netSendResources* resources = (struct netSendResources*)transportResources;
- NCCLCHECK(ncclCudaHostFree(resources->hostSendMem));
- NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->mhandle));
- NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->llMhandle));
- NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->ll128Mhandle));
- NCCLCHECK(ncclCudaHostFree(resources->hostRecvMem));
- if (resources->useGdr)
- CUDACHECK(cudaFree(resources->devRecvMem));
+ NCCLCHECK(ncclCudaHostFree(resources->sendMem));
+ NCCLCHECK(ncclCudaHostFree(resources->recvMem));
+ for (int l=0; l<LOC_COUNT; l++) {
+ if (resources->buffers[l])
+ NCCLCHECK(ncclNetDeregMr(resources->netSendComm, resources->mhandles[l]));
+ }
+ if (resources->shared == 0) {
+ NCCLCHECK(ncclCudaHostFree(resources->buffers[LOC_HOSTMEM]));
+ CUDACHECK(cudaFree(resources->buffers[LOC_DEVMEM]));
+ }
NCCLCHECK(ncclNetCloseSend(resources->netSendComm));
free(resources);
return ncclSuccess;
@@ -216,127 +233,128 @@ ncclResult_t netSendFree(void* transportResources) {
ncclResult_t netRecvFree(void* transportResources) {
struct netRecvResources* resources = (struct netRecvResources*)transportResources;
- NCCLCHECK(ncclCudaHostFree(resources->hostSendMem));
- NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->mhandle));
- NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->llMhandle));
- NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->ll128Mhandle));
- NCCLCHECK(ncclCudaHostFree(resources->hostRecvMem));
- if (resources->useGdr)
- CUDACHECK(cudaFree(resources->devRecvMem));
+ NCCLCHECK(ncclCudaHostFree(resources->sendMem));
+ NCCLCHECK(ncclCudaHostFree(resources->recvMem));
+ for (int l=0; l<LOC_COUNT; l++) {
+ if (resources->buffers[l])
+ NCCLCHECK(ncclNetDeregMr(resources->netRecvComm, resources->mhandles[l]));
+ }
+ if (resources->shared == 0) {
+ NCCLCHECK(ncclCudaHostFree(resources->buffers[LOC_HOSTMEM]));
+ CUDACHECK(cudaFree(resources->buffers[LOC_DEVMEM]));
+ }
NCCLCHECK(ncclNetCloseRecv(resources->netRecvComm));
free(resources);
return ncclSuccess;
}
+static_assert(NCCL_STEPS <= NCCL_NET_MAX_REQUESTS, "Not enough net requests to cover for steps");
+
ncclResult_t netSendProxy(struct ncclProxyArgs* args) {
struct netSendResources* resources = (struct netSendResources*) (args->connector->transportResources);
if (args->state == ncclProxyOpReady) {
- // Update opCount
- resources->hostRecvMem->opCount = args->opCount;
-
// Round to next multiple of sliceSteps
resources->step = ROUNDUP(resources->step, args->chunkSteps);
- args->head = resources->step;
- args->tail = resources->step;
- args->end = args->head + args->nsteps;
+ args->posted = args->transmitted = args->done = resources->step;
+ args->end = resources->step + args->nsteps;
args->state = ncclProxyOpProgress;
}
+ args->idle = 1;
if (args->state == ncclProxyOpProgress) {
- args->idle = 1;
- if (args->head < args->end) {
- if (args->tail < args->end && args->tail < args->head + NCCL_STEPS) {
- volatile int* sizesFifo = resources->hostRecvMem->sizesFifo;
- volatile uint64_t* recvTail = &resources->hostRecvMem->tail;
+ int p = args->protocol;
+ int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS;
+ char* localBuff = args->connector->conn.buffs[p];
+ void* mhandle = *(resources->mhandlesProto[p]);
+ int buffSize = stepSize*args->sliceSteps;
+ if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR;
+ if (args->sendbytes < buffSize) buffSize = args->sendbytes;
+ // Post buffers to the GPU
+ if (args->posted < args->end && args->posted < args->done + NCCL_STEPS) {
+ if (resources->shared) {
+ char* ptr;
+ NCCLCHECK(ncclProxySharedBuffersAlloc(args->connector->comm, resources->useGdr, 0, args->channel->id, buffSize, &ptr));
+ if (ptr == NULL) return ncclInternalError;
+ resources->recvMem->ptrsFifo[args->posted%NCCL_STEPS] = ptr;
+ __sync_synchronize();
+ volatile uint64_t* sendHead = &resources->sendMem->head;
+ args->posted += args->sliceSteps;
+ *sendHead = args->posted - NCCL_STEPS;
+ } else args->posted += args->sliceSteps;
+ args->idle = 0;
+ return ncclSuccess;
+ }
+ // Check whether we received data from the GPU and send it to the network
+ int buffSlot = args->transmitted%NCCL_STEPS;
+ if (args->transmitted < args->posted && args->transmitted < args->done + NCCL_STEPS) {
+ volatile int* sizesFifo = resources->recvMem->sizesFifo;
+ volatile uint64_t* recvTail = &resources->recvMem->tail;
+ if (sizesFifo[buffSlot] != -1 && (*recvTail > args->transmitted || args->protocol == NCCL_PROTO_LL)) {
+ // We have something to receive, let's check if it's completely ready.
+ int size = sizesFifo[buffSlot];
+ char* buff = resources->shared ? (char*)resources->recvMem->ptrsFifo[buffSlot] : localBuff+buffSlot*stepSize;
+ int ready = 1;
if (args->protocol == NCCL_PROTO_LL128) {
- int stepSize = NCCL_LL128_BUFF_SIZE/NCCL_STEPS;
- if (args->tail < *recvTail) {
- int buffSlot = args->tail%NCCL_STEPS;
- if (sizesFifo[buffSlot] != -1) {
- struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem;
- char* localBuff = (char*)localMem->ll128Buff;
- int ready = resources->useGdr;
- if (!ready) {
- // When data is in sysmem, we need to wait until all flags are correct since the GPU only
- // called threadfence()
- uint64_t flag = args->tail + 1;
- int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS);
- volatile uint64_t* lines = (volatile uint64_t*)(localBuff+buffSlot*stepSize);
- ready = 1;
- for (int i=0; i<nFifoLines; i++) {
- if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; }
- }
- }
- if (ready) {
- // Send through network
- NCCLCHECK(ncclNetIsend(resources->netSendComm, localBuff+buffSlot*stepSize, sizesFifo[buffSlot], resources->ll128Mhandle, args->requests+buffSlot));
- if (args->requests[buffSlot] != NULL) {
- sizesFifo[buffSlot] = -1;
- // Make sure size is reset to zero before we update the head.
- __sync_synchronize();
- args->tail += args->sliceSteps;
- args->idle = 0;
- }
- }
+ int ready = resources->useGdr;
+ if (!ready) {
+ // When data is in sysmem, we need to wait until all flags are correct since the GPU only
+ // called threadfence()
+ uint64_t flag = args->transmitted + 1;
+ int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS);
+ volatile uint64_t* lines = (volatile uint64_t*)buff;
+ ready = 1;
+ for (int i=0; i<nFifoLines; i++) {
+ if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; }
}
}
} else if (args->protocol == NCCL_PROTO_LL) {
- int buffSlot = args->tail%NCCL_STEPS;
- int size = sizesFifo[buffSlot];
- if (size != -1) {
- uint32_t flag = NCCL_LL_FLAG(args->tail + 1);
- int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine));
- size = nFifoLines * sizeof(union ncclLLFifoLine);
- union ncclLLFifoLine* lines = resources->hostRecvMem->llBuff+buffSlot*NCCL_LL_SLICE_LINES;
- int ready = 1;
- for (int i=0; i<nFifoLines; i++) {
- volatile uint32_t *f1 = &lines[i].flag1;
- volatile uint32_t *f2 = &lines[i].flag2;
- if (f1[0] != flag || f2[0] != flag) { ready = 0; break; }
- }
- if (ready) {
- NCCLCHECK(ncclNetIsend(resources->netSendComm, lines, size, resources->llMhandle, args->requests+buffSlot));
- if (args->requests[buffSlot] != NULL) {
- sizesFifo[buffSlot] = -1;
- // Make sure size is reset to zero before we update the head.
- __sync_synchronize();
- args->tail += args->sliceSteps;
- args->idle = 0;
- }
- }
+ uint32_t flag = NCCL_LL_FLAG(args->transmitted + 1);
+ int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine));
+ union ncclLLFifoLine* lines = (union ncclLLFifoLine*)buff;
+ for (int i=0; i<nFifoLines; i++) {
+ volatile uint32_t *f1 = &lines[i].flag1;
+ volatile uint32_t *f2 = &lines[i].flag2;
+ if (f1[0] != flag || f2[0] != flag) { ready = 0; break; }
}
- } else if (args->tail < *recvTail) {
- int stepSize = args->channel->buffSize/NCCL_STEPS;
- struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem;
- // Send through network
- int buffSlot = args->tail%NCCL_STEPS;
- if (sizesFifo[buffSlot] != -1) {
- NCCLCHECK(ncclNetIsend(resources->netSendComm, localMem->buff+buffSlot*stepSize, sizesFifo[buffSlot], resources->mhandle, args->requests+buffSlot));
- if (args->requests[buffSlot] != NULL) {
- sizesFifo[buffSlot] = -1;
- // Make sure size is reset to zero before we update the head.
- __sync_synchronize();
- args->tail += args->sliceSteps;
- args->idle = 0;
- }
+ }
+ if (ready) {
+ // Data is ready, try to send.
+ NCCLCHECK(ncclNetIsend(resources->netSendComm, buff, size, mhandle, args->requests+buffSlot));
+ if (args->requests[buffSlot] != NULL) {
+ TRACE(NCCL_NET, "sendProxy [%d/%d] Isend (LL) posted, req %p", args->transmitted, buffSlot, args->requests[buffSlot]);
+ sizesFifo[buffSlot] = -1;
+ // Make sure size is reset to zero before we update the head.
+ __sync_synchronize();
+ args->transmitted += args->sliceSteps;
+ args->idle = 0;
+ return ncclSuccess;
}
}
}
- if (args->head < args->tail) {
- int done;
- int buffSlot = args->head%NCCL_STEPS;
- NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL));
- if (done) {
- args->head += args->sliceSteps;
- resources->hostSendMem->head = args->head;
- args->idle = 0;
+ }
+ // Check whether the network has completed some send operations.
+ if (args->done < args->transmitted) {
+ int done;
+ int buffSlot = args->done%NCCL_STEPS;
+ NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL));
+ if (done) {
+ TRACE(NCCL_NET, "sendProxy [%d/%d] request %p done, size %d", args->done, buffSlot, args->requests[buffSlot]);
+ if (resources->shared) {
+ char* ptr = (char*)resources->recvMem->ptrsFifo[args->done%NCCL_STEPS];
+ NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 0, args->channel->id, buffSize, ptr));
+ }
+ args->done += args->sliceSteps;
+
+ if (resources->shared == 0) {
+ resources->sendMem->head = args->done;
}
+ args->idle = 0;
+ if (args->done == args->end) {
+ resources->step = args->end;
+ args->state = ncclProxyOpNone;
+ }
+ return ncclSuccess;
}
}
- if (args->head == args->end) {
- resources->step = args->end;
- args->idle = 0;
- args->state = ncclProxyOpNone;
- }
}
return ncclSuccess;
}
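
When LL data sits in host memory, the send proxy must confirm that every 16-byte LL line carries the expected flag before handing the slot to the network, because the GPU only issued a threadfence rather than a full synchronization. A self-contained sketch of that per-line check; DemoLLLine models the layout of union ncclLLFifoLine and demoLLReady is an illustrative name:

#include <cstdint>

// Same layout as NCCL's union ncclLLFifoLine: two 4-byte data words, each paired with a 4-byte flag.
struct DemoLLLine { uint32_t data1, flag1, data2, flag2; };

// Returns 1 once every line covering `size` bytes carries `flag`,
// mirroring the flag1/flag2 polling loop for NCCL_PROTO_LL in netSendProxy.
static int demoLLReady(volatile DemoLLLine* lines, int size, uint32_t flag) {
  int nLines = (size + (int)sizeof(DemoLLLine) - 1) / (int)sizeof(DemoLLLine);  // DIVUP(size, sizeof(line))
  for (int i = 0; i < nLines; i++) {
    if (lines[i].flag1 != flag || lines[i].flag2 != flag) return 0;  // GPU has not finished this line yet
  }
  return 1;  // safe to Isend this slot
}
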
@@ -344,51 +362,90 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) {
ncclResult_t netRecvProxy(struct ncclProxyArgs* args) {
struct netRecvResources* resources = (struct netRecvResources*) (args->connector->transportResources);
if (args->state == ncclProxyOpReady) {
- // Update opCount
- resources->hostSendMem->opCount = args->opCount;
-
// Round to next multiple of sliceSteps
resources->step = ROUNDUP(resources->step, args->chunkSteps);
- args->head = resources->step;
- args->tail = resources->step;
- args->end = args->head + args->nsteps;
+ args->posted = args->received = args->transmitted = args->done = resources->step;
+ args->end = resources->step + args->nsteps;
args->state = ncclProxyOpProgress;
}
+ args->idle = 1;
if (args->state == ncclProxyOpProgress) {
- args->idle = 1;
- int stepSize = ( args->protocol == NCCL_PROTO_LL ? NCCL_LL_BUFF_SIZE : args->protocol == NCCL_PROTO_LL128 ? NCCL_LL128_BUFF_SIZE : args->channel->buffSize ) / NCCL_STEPS;
- if (args->head < args->end) {
- struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem;
- char* localBuff = args->protocol == NCCL_PROTO_LL ? (char*)localMem->llBuff : args->protocol == NCCL_PROTO_LL128 ? (char*)localMem->ll128Buff : localMem->buff;
- void* mhandle = args->protocol == NCCL_PROTO_LL ? resources->llMhandle : args->protocol == NCCL_PROTO_LL128 ? resources->ll128Mhandle : resources->mhandle;
- volatile uint64_t* sendHead = &resources->hostSendMem->head;
- if ((args->tail < args->head + NCCL_STEPS) && (args->tail < *sendHead + NCCL_STEPS) && (args->tail < args->end)) {
- int buffSlot = args->tail%NCCL_STEPS;
- int sliceSize = stepSize * args->sliceSteps;
- NCCLCHECK(ncclNetIrecv(resources->netRecvComm, localBuff+buffSlot*stepSize, sliceSize, mhandle, args->requests+buffSlot));
- if (args->requests[buffSlot] != NULL) {
- args->tail += args->sliceSteps;
- args->idle = 0;
- }
+ int p = args->protocol;
+ int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS;
+ char* localBuff = args->connector->conn.buffs[p];
+ void* mhandle = *(resources->mhandlesProto[p]);
+ int buffSize = stepSize*args->sliceSteps;
+ if (resources->shared) buffSize /= SENDRECV_SLICEFACTOR;
+ if (args->recvbytes < buffSize) buffSize = args->recvbytes;
+ if ((args->posted < args->done + NCCL_STEPS) && (args->posted < args->end)) {
+ int buffSlot = args->posted%NCCL_STEPS;
+ char* ptr;
+ if (resources->shared) {
+ NCCLCHECK(ncclProxySharedBuffersAlloc(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, &ptr));
+ if (ptr == NULL) return ncclInternalError;
+ volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo;
+ ptrsFifo[buffSlot] = ptr;
+ } else {
+ ptr = localBuff+buffSlot*stepSize;
}
- if (args->tail > args->head) {
- int buffSlot = args->head%NCCL_STEPS;
- int done, size;
- NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, &size));
- if (done) {
- args->head += args->sliceSteps;
- if (args->protocol == NCCL_PROTO_SIMPLE) {
- if (resources->useGdr) ncclNetFlush(resources->netRecvComm, localBuff+buffSlot*stepSize, size, mhandle);
- resources->hostRecvMem->tail = args->head;
- }
- args->idle = 0;
+ NCCLCHECK(ncclNetIrecv(resources->netRecvComm, ptr, buffSize, mhandle, args->requests+buffSlot));
+ if (args->requests[buffSlot] != NULL) {
+ TRACE(NCCL_NET, "recvProxy [%d/%d] posted recv request %p", args->posted, buffSlot, args->requests[buffSlot]);
+ args->posted += args->sliceSteps;
+ args->idle = 0;
+ return ncclSuccess;
+ } else if (resources->shared) {
+ NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, ptr));
+ }
+ }
+ if (args->posted > args->received) {
+ int buffSlot = args->received%NCCL_STEPS;
+ int done, size;
+ NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, &size));
+ if (done) {
+ args->received += args->sliceSteps;
+ if (size > 0 && args->protocol == NCCL_PROTO_SIMPLE && resources->useGdr) {
+ // Don't pass data to the GPU yet, flush first.
+ volatile void** ptrsFifo = (volatile void**)resources->recvMem->ptrsFifo;
+ char* ptr = resources->shared ? (char*)(ptrsFifo[buffSlot]) : localBuff+buffSlot*stepSize;
+ NCCLCHECK(ncclNetIflush(resources->netRecvComm, ptr, size, mhandle, args->requests+buffSlot));
+ } else {
+ args->requests[buffSlot] = NULL;
}
+ args->idle = 0;
+ return ncclSuccess;
}
}
- if (args->head == args->end) {
- resources->step = args->end;
- args->idle = 0;
- args->state = ncclProxyOpNone;
+ if (args->received > args->transmitted) {
+ // Progress flush operations
+ int buffSlot = args->transmitted%NCCL_STEPS;
+ int done = 1;
+ if (args->requests[buffSlot]) NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL));
+ if (done) {
+ args->transmitted += args->sliceSteps;
+ __sync_synchronize();
+ resources->recvMem->tail = args->transmitted;
+ args->idle = 0;
+ return ncclSuccess;
+ }
+ }
+ if (args->transmitted > args->done) {
+ volatile uint64_t* sendHead = &resources->sendMem->head;
+ uint64_t done = *sendHead;
+ while (done > args->done &&
+ // LL and LL128 can acknowledge 0-bytes send before they even happen. Don't go past what we transmitted.
+ args->transmitted > args->done) {
+ if (resources->shared) {
+ char* ptr = (char*)resources->recvMem->ptrsFifo[args->done%NCCL_STEPS];
+ NCCLCHECK(ncclProxySharedBuffersFree(args->connector->comm, resources->useGdr, 1, args->channel->id, buffSize, ptr));
+ }
+ args->done += args->sliceSteps;
+ args->idle = 0;
+ if (args->done == args->end) {
+ resources->step = args->end;
+ args->state = ncclProxyOpNone;
+ }
+ }
}
}
return ncclSuccess;
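
The rewritten proxies replace head/tail with monotonically increasing step counters bounded by a window of NCCL_STEPS outstanding steps; on the receive side the invariant is done <= transmitted <= received <= posted <= done + NCCL_STEPS, where posted counts Irecvs handed to the network, received counts completed receives, transmitted counts completed flushes (tail published to the GPU), and done counts steps the GPU has consumed, returning credit through sendMem->head. A toy, self-contained model of that progression with the real work replaced by stubs (DemoArgs and the constants are illustrative):

#include <cstdint>
#include <cstdio>

static const int STEPS = 8;  // stands in for NCCL_STEPS
static const int SLICE = 2;  // stands in for args->sliceSteps

struct DemoArgs { uint64_t posted, received, transmitted, done, end; };

// One progress call advances at most one stage, mirroring the early returns in netRecvProxy.
static void demoProgressOnce(DemoArgs* a) {
  if (a->posted < a->end && a->posted < a->done + STEPS) { a->posted += SLICE; return; }  // post Irecv
  if (a->received < a->posted) { a->received += SLICE; return; }                          // receive completed
  if (a->transmitted < a->received) { a->transmitted += SLICE; return; }                  // flush done, publish tail
  if (a->done < a->transmitted) { a->done += SLICE; }                                     // GPU consumed, return credit
}

int main() {
  DemoArgs a = {0, 0, 0, 0, 16};
  while (a.done < a.end) demoProgressOnce(&a);
  std::printf("all %llu steps completed\n", (unsigned long long)a.end);
  return 0;
}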