Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/marian-nmt/nccl.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport.cc')
-rw-r--r--src/transport.cc291
1 files changed, 57 insertions, 234 deletions
diff --git a/src/transport.cc b/src/transport.cc
index cc8d5d1..7219ea3 100644
--- a/src/transport.cc
+++ b/src/transport.cc
@@ -1,11 +1,12 @@
/*************************************************************************
- * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "comm.h"
#include "info.h"
+#include "bootstrap.h"
extern struct ncclTransport p2pTransport;
extern struct ncclTransport shmTransport;
@@ -17,248 +18,70 @@ struct ncclTransport ncclTransports[NTRANSPORTS] = {
netTransport,
};
-#define RECV 0
-#define SEND 1
-
-static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) {
- if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true;
-
- /* In chains, one rank does not need a proxy. Let's figure out which one it is */
- // Which index in the reorganized rings should we compare root against */
- const int myrank = 0, nextrank = 1, prevrank = nranks-1;
- int index = pattern == ncclPatternPipelineFrom ?
- /* no recv / no send if root = */
- /* bcast */ (type == RECV ? myrank : nextrank ):
- /* reduce */ (type == RECV ? prevrank : myrank );
- int rank = ring->userRanks[index];
- return (root != rank);
-}
-
-enum { proxyRecv=0, proxySend=1 };
-
-#define PROXYARGS_ALLOCATE_SIZE 32
-struct ncclProxyPool {
- struct ncclProxyPool *next;
- struct ncclProxyArgs elems[PROXYARGS_ALLOCATE_SIZE];
-};
-
-ncclResult_t transportAllocateProxyArgs(struct ncclComm* comm, struct ncclProxyArgs** argsptr) {
- struct ncclProxyState* state = &comm->proxyState;
- struct ncclProxyArgs* elem;
- pthread_mutex_lock(&state->mutex);
- if (state->pool == NULL) {
- // Allocate a new pool of elements
- struct ncclProxyPool* newPool;
- NCCLCHECK(ncclCalloc(&newPool, 1));
- struct ncclProxyArgs* newElems = newPool->elems;
- // Chain newly allocated elements
- for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) {
- if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1;
- }
- // Add them all to the pool list
- state->pool = newElems;
- // Save the pool memory block for later resource release
- newPool->next = state->pools;
- state->pools = newPool;
- }
- elem = state->pool;
- state->pool = state->pool->next;
- pthread_mutex_unlock(&state->mutex);
- elem->next = elem->nextPeer = NULL;
- *argsptr = elem;
- return ncclSuccess;
-}
-
-static void ProxyAppend(struct ncclConnector* connector, struct ncclProxyArgs* args) {
- struct ncclComm* comm = connector->comm;
- struct ncclProxyState* state = &comm->proxyState;
- pthread_mutex_lock(&state->mutex);
- if (connector->proxyAppend == NULL) {
- // Nothing running for that peer. Add to the circular list
- if (state->ops == NULL) {
- // Create the list
- args->next = args;
- state->ops = args;
- } else {
- // Insert element in the list
- args->next = state->ops->next;
- state->ops->next = args;
+template <int type>
+static ncclResult_t selectTransport(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) {
+ for (int t=0; t<NTRANSPORTS; t++) {
+ struct ncclTransport *transport = ncclTransports+t;
+ struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
+ int ret = 0;
+ NCCLCHECK(transport->canConnect(&ret, topo, graph, myInfo, peerInfo));
+ if (ret) {
+ connector->transportComm = transportComm;
+ NCCLCHECK(transportComm->setup(topo, graph, myInfo, peerInfo, connect, connector, channelId));
+ return ncclSuccess;
}
- connector->proxyAppend = args;
- } else {
- // There is an active operation already for that peer.
- // Add it to the per-peer list
- connector->proxyAppend->nextPeer = args;
- connector->proxyAppend = args;
}
- pthread_mutex_unlock(&state->mutex);
-}
-
-template <int type>
-static ncclResult_t SaveProxy(int peer, struct ncclProxyArgs* args) {
- if (peer < 0) return ncclSuccess;
-
- struct ncclPeer* peerComm = args->channel->peers+peer;
- struct ncclConnector* connector = type == proxyRecv ? &peerComm->recv : &peerComm->send;
- if (connector->transportComm == NULL) return ncclInternalError;
- if (connector->transportComm->proxy == NULL) return ncclSuccess;
-
- struct ncclProxyArgs* op;
- NCCLCHECK(transportAllocateProxyArgs(connector->comm, &op));
- memcpy(op, args, sizeof(struct ncclProxyArgs));
- op->connector = connector;
- op->progress = connector->transportComm->proxy;
- op->state = ncclProxyOpReady;
- ProxyAppend(connector, op);
- return ncclSuccess;
+ WARN("No transport found !");
+ return ncclInternalError;
}
-ncclResult_t transportSaveProxies(struct ncclProxyArgs* args, int pattern, int root, int nranks) {
- if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice || pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) {
- struct ncclRing* ring = &args->channel->ring;
- if (NeedProxy(RECV, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxyRecv>(ring->prev, args));
- if (NeedProxy(SEND, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxySend>(ring->next, args));
+ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
+ TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
+ uint32_t nSkippedSend = 0, nSkippedRecv = 0; /* for tracing */
+ struct ncclConnect connect;
+ struct ncclConnector* conn;
+ for (int i=0; i<nrecv; i++) {
+ int peer = peerRecv[i];
+ if (peer == -1 || peer >= comm->nRanks) continue;
+ conn = &channel->peers[peer].recv;
+ if (conn->connected) { ++nSkippedRecv; continue; }
+ memset(&connect, 0, sizeof(connect));
+ NCCLCHECK(selectTransport<0>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
+ NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
}
- if (pattern == ncclPatternTreeUp || pattern == ncclPatternTreeUpDown) {
- // Tree up
- struct ncclTree* tree = &args->channel->treeUp;
- for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxyRecv>(tree->down[i], args));
- NCCLCHECK(SaveProxy<proxySend>(tree->up, args));
+ for (int i=0; i<nsend; i++) {
+ int peer = peerSend[i];
+ if (peer == -1 || peer >= comm->nRanks) continue;
+ conn = &channel->peers[peer].send;
+ if (conn->connected) { ++nSkippedSend; continue; }
+ memset(&connect, 0, sizeof(connect));
+ NCCLCHECK(selectTransport<1>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
+ NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
}
- if (pattern == ncclPatternTreeDown || pattern == ncclPatternTreeUpDown) {
- // Tree down
- struct ncclTree* tree = &args->channel->treeDn;
- for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxySend>(tree->down[i], args));
- NCCLCHECK(SaveProxy<proxyRecv>(tree->up, args));
+ for (int i=0; i<nsend; i++) {
+ int peer = peerSend[i];
+ if (peer == -1 || peer >= comm->nRanks) continue;
+ conn = &channel->peers[peer].send;
+ if (conn->connected) {++nSkippedSend; continue; }
+ memset(&connect, 0, sizeof(connect));
+ NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
+ NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
+ conn->connected = 1;
+ CUDACHECK(cudaMemcpy(&channel->devPeers[peer].send, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice));
}
- if (pattern == ncclPatternCollTreeUp) {
- // CollTree up
- struct ncclTree* tree = &args->channel->collTreeUp;
- NCCLCHECK(SaveProxy<proxyRecv>(tree->down[0], args));
- NCCLCHECK(SaveProxy<proxySend>(tree->up, args));
- }
- if (pattern == ncclPatternCollTreeDown) {
- // CollTree down
- struct ncclTree* tree = &args->channel->collTreeDn;
- NCCLCHECK(SaveProxy<proxySend>(tree->down[0], args));
- NCCLCHECK(SaveProxy<proxyRecv>(tree->up, args));
+ for (int i=0; i<nrecv; i++) {
+ int peer = peerRecv[i];
+ if (peer == -1 || peer >= comm->nRanks) continue;
+ conn = &channel->peers[peer].recv;
+ if (conn->connected) {++nSkippedRecv; continue; }
+ memset(&connect, 0, sizeof(connect));
+ NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
+ NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
+ conn->connected = 1;
+ CUDACHECK(cudaMemcpy(&channel->devPeers[peer].recv, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice));
}
+ TRACE(NCCL_INIT, "nsend %d nrecv %d nSkippedSend %u nSkippedRecv %u - DONE", nsend, nrecv, nSkippedSend, nSkippedRecv);
return ncclSuccess;
}
-void* persistentThread(void *comm_) {
- struct ncclComm* comm = (struct ncclComm*)comm_;
- struct ncclProxyState* state = &comm->proxyState;
- struct ncclProxyArgs* op = NULL;
- ncclResult_t ret = ncclSuccess;
- int idle = 1;
- int idleSpin = 0;
- while (1) {
- do {
- if (*comm->abortFlag) return NULL;
- if (op == NULL) {
- pthread_mutex_lock(&state->mutex);
- op = state->ops;
- if (op == NULL) {
- if (state->stop) {
- // No more commands to process and proxy has been requested to stop
- pthread_mutex_unlock(&state->mutex);
- return NULL;
- }
- pthread_cond_wait(&state->cond, &state->mutex);
- }
- pthread_mutex_unlock(&state->mutex);
- }
- } while (op == NULL);
- op->idle = 0;
- // opCount >= lastOpCount are part of an ongoing GroupStart/GroupEnd that hasn't started
- // yet and might be cancelled before they even start. Hold on on those.
- if (op->state != ncclProxyOpNone && op->opCount < comm->lastOpCount) ret = op->progress(op);
- if (ret != ncclSuccess) {
- comm->fatalError = ret;
- INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret);
- return NULL;
- }
- idle &= op->idle;
- pthread_mutex_lock(&state->mutex);
- if (!idle) idleSpin = 0;
- struct ncclProxyArgs *next = op->next;
- if (next->state == ncclProxyOpNone) {
- struct ncclProxyArgs *freeOp = next;
- if (next->nextPeer) {
- // Replace next by its next per-peer element.
- next = next->nextPeer;
- if (op != freeOp) {
- next->next = freeOp->next;
- op->next = next;
- } else {
- next->next = next;
- }
- } else {
- // Remove next from circular list
- next->connector->proxyAppend = NULL;
- if (op != freeOp) {
- next = next->next;
- op->next = next;
- } else {
- next = NULL;
- }
- }
- if (freeOp == state->ops) state->ops = next;
- freeOp->next = state->pool;
- state->pool = freeOp;
- }
- op = next;
- if (op == state->ops) {
- if (idle == 1) {
- if (++idleSpin == 10) {
- sched_yield();
- idleSpin = 0;
- }
- }
- idle = 1;
- }
- pthread_mutex_unlock(&state->mutex);
- }
-}
-
-ncclResult_t transportStartProxy(struct ncclComm* comm) {
- pthread_mutex_lock(&comm->proxyState.mutex);
- if (comm->proxyState.ops != NULL)
- pthread_cond_signal(&comm->proxyState.cond);
- pthread_mutex_unlock(&comm->proxyState.mutex);
- return ncclSuccess;
-}
-ncclResult_t transportCreateProxy(struct ncclComm* comm) {
- if (!comm->proxyThread) {
- comm->proxyState.cond = PTHREAD_COND_INITIALIZER;
- comm->proxyState.mutex = PTHREAD_MUTEX_INITIALIZER;
- comm->proxyState.ops = NULL;
- pthread_create(&comm->proxyThread, NULL, persistentThread, comm);
- }
- return ncclSuccess;
-}
-
-ncclResult_t transportDestroyProxy(struct ncclComm* comm) {
- struct ncclProxyState* state = &comm->proxyState;
-
- // Request the proxy to stop and then wake it
- pthread_mutex_lock(&state->mutex);
- state->stop = true;
- pthread_cond_signal(&state->cond);
- pthread_mutex_unlock(&state->mutex);
- if (comm->proxyThread) pthread_join(comm->proxyThread, NULL);
-
- // Free off any memory allocated for the proxy arg pools
- pthread_mutex_lock(&state->mutex);
- struct ncclProxyState* proxyState = &comm->proxyState;
- while (proxyState->pools != NULL) {
- struct ncclProxyPool *next = proxyState->pools->next;
- free(proxyState->pools);
- proxyState->pools = next;
- }
- pthread_mutex_unlock(&state->mutex);
-
- return ncclSuccess;
-}