diff options
Diffstat (limited to 'src/transport.cc')
-rw-r--r-- | src/transport.cc | 292 |
1 files changed, 81 insertions, 211 deletions
diff --git a/src/transport.cc b/src/transport.cc index 4059849..a5af541 100644 --- a/src/transport.cc +++ b/src/transport.cc @@ -1,11 +1,12 @@ /************************************************************************* - * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ #include "comm.h" #include "info.h" +#include "bootstrap.h" extern struct ncclTransport p2pTransport; extern struct ncclTransport shmTransport; @@ -17,235 +18,104 @@ struct ncclTransport ncclTransports[NTRANSPORTS] = { netTransport, }; -#define RECV 0 -#define SEND 1 - -static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) { - if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true; - - /* In chains, one rank does not need a proxy. Let's figure out which one it is */ - // Which index in the reorganized rings should we compare root against */ - const int myrank = 0, nextrank = 1, prevrank = nranks-1; - int index = pattern == ncclPatternPipelineFrom ? - /* no recv / no send if root = */ - /* bcast */ (type == RECV ? myrank : nextrank ): - /* reduce */ (type == RECV ? prevrank : myrank ); - int rank = ring->userRanks[index]; - return (root != rank); -} - -enum { proxyRecv=0, proxySend=1 }; - -#define PROXYARGS_ALLOCATE_SIZE 32 -struct ncclProxyPool { - struct ncclProxyPool *next; - struct ncclProxyArgs elems[PROXYARGS_ALLOCATE_SIZE]; -}; - -ncclResult_t transportAllocateProxyArgs(struct ncclComm* comm, struct ncclProxyArgs** argsptr) { - struct ncclProxyState* state = &comm->proxyState; - struct ncclProxyArgs* elem; - pthread_mutex_lock(&state->mutex); - if (state->pool == NULL) { - // Allocate a new pool of elements - struct ncclProxyPool* newPool; - NCCLCHECK(ncclCalloc(&newPool, 1)); - struct ncclProxyArgs* newElems = newPool->elems; - // Chain newly allocated elements - for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) { - if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1; +template <int type> +static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) { + for (int t=0; t<NTRANSPORTS; t++) { + struct ncclTransport *transport = ncclTransports+t; + struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv; + int ret = 0; + NCCLCHECK(transport->canConnect(&ret, comm->topo, graph, myInfo, peerInfo)); + if (ret) { + connector->transportComm = transportComm; + NCCLCHECK(transportComm->setup(comm, graph, myInfo, peerInfo, connect, connector, channelId)); + return ncclSuccess; } - // Add them all to the pool list - state->pool = newElems; - // Save the pool memory block for later resource release - newPool->next = state->pools; - state->pools = newPool; } - elem = state->pool; - state->pool = state->pool->next; - pthread_mutex_unlock(&state->mutex); - elem->next = elem->nextPeer = NULL; - *argsptr = elem; - return ncclSuccess; + WARN("No transport found !"); + return ncclInternalError; } -static void ProxyAppend(struct ncclConnector* connector, struct ncclProxyArgs* args) { - struct ncclComm* comm = connector->comm; - struct ncclProxyState* state = &comm->proxyState; - pthread_mutex_lock(&state->mutex); - if (connector->proxyAppend == NULL) { - // Nothing running for that peer. Add to the circular list - if (state->ops == NULL) { - // Create the list - args->next = args; - state->ops = args; - } else { - // Insert element in the list - args->next = state->ops->next; - state->ops->next = args; - } - connector->proxyAppend = args; - } else { - // There is an active operation already for that peer. - // Add it to the per-peer list - connector->proxyAppend->nextPeer = args; - connector->proxyAppend = args; +ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) { + TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv); + uint32_t mask = 1 << channel->id; + for (int i=0; i<nrecv; i++) { + int peer = peerRecv[i]; + if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].recv.connected) continue; + comm->connectRecv[peer] |= mask; + } + for (int i=0; i<nsend; i++) { + int peer = peerSend[i]; + if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].send.connected) continue; + comm->connectSend[peer] |= mask; } - pthread_mutex_unlock(&state->mutex); -} - -template <int type> -static ncclResult_t SaveProxy(int peer, struct ncclProxyArgs* args) { - if (peer < 0) return ncclSuccess; - - struct ncclPeer* peerComm = args->channel->peers+peer; - struct ncclConnector* connector = type == proxyRecv ? &peerComm->recv : &peerComm->send; - if (connector->transportComm->proxy == NULL) return ncclSuccess; - - struct ncclProxyArgs* op; - NCCLCHECK(transportAllocateProxyArgs(connector->comm, &op)); - memcpy(op, args, sizeof(struct ncclProxyArgs)); - op->connector = connector; - op->progress = connector->transportComm->proxy; - op->state = ncclProxyOpReady; - ProxyAppend(connector, op); return ncclSuccess; } -ncclResult_t transportSaveProxies(struct ncclProxyArgs* args, int pattern, int root, int nranks) { - if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice || pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) { - struct ncclRing* ring = &args->channel->ring; - if (NeedProxy(RECV, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxyRecv>(ring->prev, args)); - if (NeedProxy(SEND, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxySend>(ring->next, args)); - } - if (pattern == ncclPatternTreeUp || pattern == ncclPatternTreeUpDown) { - // Tree up - struct ncclTree* tree = &args->channel->treeUp; - for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxyRecv>(tree->down[i], args)); - NCCLCHECK(SaveProxy<proxySend>(tree->up, args)); - } - if (pattern == ncclPatternTreeDown || pattern == ncclPatternTreeUpDown) { - // Tree down - struct ncclTree* tree = &args->channel->treeDn; - for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxySend>(tree->down[i], args)); - NCCLCHECK(SaveProxy<proxyRecv>(tree->up, args)); +void dumpData(struct ncclConnect* data, int ndata) { + for (int n=0; n<ndata; n++) { + printf("[%d] ", n); + uint8_t* d = (uint8_t*)data; + for (int i=0; i<sizeof(struct ncclConnect); i++) printf("%02x", d[i]); + printf("\n"); } - return ncclSuccess; } -void* persistentThread(void *comm_) { - struct ncclComm* comm = (struct ncclComm*)comm_; - struct ncclProxyState* state = &comm->proxyState; - struct ncclProxyArgs* op = NULL; - ncclResult_t ret = ncclSuccess; - int idle = 1; - int idleSpin = 0; - while (1) { - do { - if (*comm->abortFlag) return NULL; - if (op == NULL) { - pthread_mutex_lock(&state->mutex); - op = state->ops; - if (op == NULL) { - if (state->stop) { - // No more commands to process and proxy has been requested to stop - pthread_mutex_unlock(&state->mutex); - return NULL; - } - pthread_cond_wait(&state->cond, &state->mutex); - } - pthread_mutex_unlock(&state->mutex); +ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph) { + struct ncclConnect data[2*MAXCHANNELS]; + for (int i=1; i<comm->nRanks; i++) { + int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks; + int sendPeer = (comm->rank + i) % comm->nRanks; + uint32_t recvMask = comm->connectRecv[recvPeer]; + uint32_t sendMask = comm->connectSend[sendPeer]; + + struct ncclConnect* recvData = data; + int sendChannels = 0, recvChannels = 0; + for (int c=0; c<MAXCHANNELS; c++) { + if (recvMask & (1<<c)) { + struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv; + NCCLCHECK(selectTransport<0>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+recvPeer, recvData+recvChannels++, conn, c)); } - } while (op == NULL); - op->idle = 0; - // opCount >= lastOpCount are part of an ongoing GroupStart/GroupEnd that hasn't started - // yet and might be cancelled before they even start. Hold on on those. - if (op->state != ncclProxyOpNone && op->opCount < comm->lastOpCount) ret = op->progress(op); - if (ret != ncclSuccess) { - comm->fatalError = ret; - INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); - return NULL; } - idle &= op->idle; - pthread_mutex_lock(&state->mutex); - if (!idle) idleSpin = 0; - struct ncclProxyArgs *next = op->next; - if (next->state == ncclProxyOpNone) { - struct ncclProxyArgs *freeOp = next; - if (next->nextPeer) { - // Replace next by its next per-peer element. - next = next->nextPeer; - if (op != freeOp) { - next->next = freeOp->next; - op->next = next; - } else { - next->next = next; - } - } else { - // Remove next from circular list - next->connector->proxyAppend = NULL; - if (op != freeOp) { - next = next->next; - op->next = next; - } else { - next = NULL; - } + struct ncclConnect* sendData = recvData+recvChannels; + for (int c=0; c<MAXCHANNELS; c++) { + if (sendMask & (1<<c)) { + struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send; + NCCLCHECK(selectTransport<1>(comm, graph, comm->peerInfo+comm->rank, comm->peerInfo+sendPeer, sendData+sendChannels++, conn, c)); } - if (freeOp == state->ops) state->ops = next; - freeOp->next = state->pool; - state->pool = freeOp; } - op = next; - if (op == state->ops) { - if (idle == 1) { - if (++idleSpin == 10) { - sched_yield(); - idleSpin = 0; - } + + if (sendPeer == recvPeer) { + if (recvChannels+sendChannels) { + NCCLCHECK(bootstrapSend(comm->bootstrap, recvPeer, data, sizeof(struct ncclConnect)*(recvChannels+sendChannels))); + NCCLCHECK(bootstrapRecv(comm->bootstrap, recvPeer, data, sizeof(struct ncclConnect)*(recvChannels+sendChannels))); + sendData = data; + recvData = data+sendChannels; } - idle = 1; + } else { + if (recvChannels) NCCLCHECK(bootstrapSend(comm->bootstrap, recvPeer, recvData, sizeof(struct ncclConnect)*recvChannels)); + if (sendChannels) NCCLCHECK(bootstrapSend(comm->bootstrap, sendPeer, sendData, sizeof(struct ncclConnect)*sendChannels)); + if (sendChannels) NCCLCHECK(bootstrapRecv(comm->bootstrap, sendPeer, sendData, sizeof(struct ncclConnect)*sendChannels)); + if (recvChannels) NCCLCHECK(bootstrapRecv(comm->bootstrap, recvPeer, recvData, sizeof(struct ncclConnect)*recvChannels)); } - pthread_mutex_unlock(&state->mutex); - } -} -ncclResult_t transportStartProxy(struct ncclComm* comm) { - pthread_mutex_lock(&comm->proxyState.mutex); - if (comm->proxyState.ops != NULL) - pthread_cond_signal(&comm->proxyState.cond); - pthread_mutex_unlock(&comm->proxyState.mutex); - return ncclSuccess; -} - -ncclResult_t transportCreateProxy(struct ncclComm* comm) { - if (!comm->proxyThread) { - comm->proxyState.cond = PTHREAD_COND_INITIALIZER; - comm->proxyState.mutex = PTHREAD_MUTEX_INITIALIZER; - comm->proxyState.ops = NULL; - pthread_create(&comm->proxyThread, NULL, persistentThread, comm); + for (int c=0; c<MAXCHANNELS; c++) { + if (sendMask & (1<<c)) { + struct ncclConnector* conn = &comm->channels[c].peers[sendPeer].send; + NCCLCHECK(conn->transportComm->connect(comm, sendData++, 1, comm->rank, conn)); + conn->connected = 1; + CUDACHECK(cudaMemcpy(&comm->channels[c].devPeers[sendPeer].send, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice)); + } + } + for (int c=0; c<MAXCHANNELS; c++) { + if (recvMask & (1<<c)) { + struct ncclConnector* conn = &comm->channels[c].peers[recvPeer].recv; + NCCLCHECK(conn->transportComm->connect(comm, recvData++, 1, comm->rank, conn)); + conn->connected = 1; + CUDACHECK(cudaMemcpy(&comm->channels[c].devPeers[recvPeer].recv, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice)); + } + } + comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0; } return ncclSuccess; } -ncclResult_t transportDestroyProxy(struct ncclComm* comm) { - struct ncclProxyState* state = &comm->proxyState; - - // Request the proxy to stop and then wake it - pthread_mutex_lock(&state->mutex); - state->stop = true; - pthread_cond_signal(&state->cond); - pthread_mutex_unlock(&state->mutex); - if (comm->proxyThread) pthread_join(comm->proxyThread, NULL); - - // Free off any memory allocated for the proxy arg pools - pthread_mutex_lock(&state->mutex); - struct ncclProxyState* proxyState = &comm->proxyState; - while (proxyState->pools != NULL) { - struct ncclProxyPool *next = proxyState->pools->next; - free(proxyState->pools); - proxyState->pools = next; - } - pthread_mutex_unlock(&state->mutex); - - return ncclSuccess; -} |