Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/marian-nmt/nccl.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport.cu')
-rw-r--r--src/transport.cu187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/transport.cu b/src/transport.cu
new file mode 100644
index 0000000..f5f9d75
--- /dev/null
+++ b/src/transport.cu
@@ -0,0 +1,187 @@
+/*************************************************************************
+ * Copyright (c) 2016-2018, NVIDIA CORPORATION. All rights reserved.
+ *
+ * See LICENSE.txt for license information
+ ************************************************************************/
+
+#include "core.h"
+#include "common_coll.h"
+
+extern struct ncclTransport p2pTransport;
+extern struct ncclTransport shmTransport;
+extern struct ncclTransport netTransport;
+
+struct ncclTransport ncclTransports[NTRANSPORTS] = {
+ p2pTransport,
+ shmTransport,
+ netTransport,
+};
+
+static void FifoPullArgs(struct transportProxyInfo* info, struct ncclProxyArgs *args) {
+ struct ncclProxyArgs *fifoArgs = info->argsFifo + (info->argsFifoHead % TRANSPORT_PROXY_FIFO_SIZE);
+ pthread_mutex_lock(&info->mutex);
+ while (fifoArgs->active == 0)
+ pthread_cond_wait(&info->cond, &info->mutex);
+ __sync_synchronize();
+ memcpy(args, fifoArgs, sizeof(struct ncclProxyArgs));
+ __sync_synchronize();
+ fifoArgs->active = 0;
+ pthread_cond_signal(&info->cond);
+ pthread_mutex_unlock(&info->mutex);
+ info->argsFifoHead++;
+}
+
+static struct ncclProxyArgs* FifoGetNextArgs(struct transportProxyInfo* info) {
+ if (info == NULL) return NULL;
+ struct ncclProxyArgs* fifoArgs = info->argsFifo + (info->argsFifoTail % TRANSPORT_PROXY_FIFO_SIZE);
+ pthread_mutex_lock(&info->mutex);
+ while (fifoArgs->active == 1)
+ pthread_cond_wait(&info->cond, &info->mutex);
+ pthread_mutex_unlock(&info->mutex);
+ info->argsFifoTail++;
+ return fifoArgs;
+}
+
+static void FifoPushArgs(struct transportProxyInfo* info) {
+ if (info == NULL) return;
+
+ struct ncclProxyArgs* fifoArgs = info->argsFifo + ((info->argsFifoTail-1) % TRANSPORT_PROXY_FIFO_SIZE);
+ if (fifoArgs->active == 0) return;
+
+ pthread_mutex_lock(&info->mutex);
+ pthread_cond_signal(&info->cond);
+ pthread_mutex_unlock(&info->mutex);
+}
+
+static void WaitProxyReady(struct transportProxyInfo* info) {
+ pthread_mutex_lock(&info->mutex);
+ while (info->proxyReady == 0)
+ pthread_cond_wait(&info->cond, &info->mutex);
+ pthread_mutex_unlock(&info->mutex);
+}
+
+static void SetProxyReady(struct transportProxyInfo* info) {
+ pthread_mutex_lock(&info->mutex);
+ info->proxyReady = 1;
+ pthread_cond_signal(&info->cond);
+ pthread_mutex_unlock(&info->mutex);
+}
+
+static void StopProxy(struct transportProxyInfo* info) {
+ struct ncclProxyArgs* fifoArgs = FifoGetNextArgs(info);
+ fifoArgs->active = -1;
+ FifoPushArgs(info);
+}
+
+#define RECV 0
+#define SEND 1
+
+static bool NeedProxy(int type, int pattern, struct ncclRing* ring, int nranks) {
+ enum proxyMode mode = proxyPatternMode(pattern);
+ if (mode == proxyRing) return true;
+
+ /* In chains, one rank does not need a proxy. Let's figure out which one it is */
+ int root = proxyPatternRoot(pattern);
+ // Which index in the reorganized rings should we compare root against */
+ const int myrank = 0, nextrank = 1, prevrank = nranks-1;
+ int index = mode == proxyFrom ?
+ /* no recv / no send if root = */
+ /* bcast */ (type == RECV ? myrank : nextrank ):
+ /* reduce */ (type == RECV ? prevrank : myrank );
+ int rank = ring->userRanks[index];
+ return (root != rank);
+}
+
+static void SaveProxy(struct ncclConnector* connector, struct ncclProxyArgs* args, int needProxy) {
+ struct transportProxyInfo* info = connector->proxyInfo;
+ if (info == NULL) return;
+ struct ncclProxyArgs* fifoArgs = FifoGetNextArgs(info);
+ args->needProxy = needProxy;
+ __sync_synchronize();
+ memcpy(fifoArgs, args, sizeof(struct ncclProxyArgs));
+ __sync_synchronize();
+ fifoArgs->active = 1;
+}
+
+ncclResult_t transportSaveProxies(int substeps, int subchunks, int nstepsPerRound, int nblocksPerRound, size_t nbytes, int pattern, struct ncclComm* comm) {
+ int llMode, nrings, nthreads;
+ ncclGetCollResource(comm, nbytes, &nrings, &nthreads, &llMode);
+ nbytes = llMode ? nbytes * 2 : nbytes;
+ substeps = llMode ? 1 : substeps;
+ subchunks = llMode ? NCCL_LL_CHUNKS : subchunks;
+ int buffSize = llMode ? NCCL_LL_BUFF_SIZE : comm->rings[0].buffSize;
+
+ int nrounds = (int)(DIVUP(nbytes, ((size_t)nrings * nblocksPerRound * (buffSize/subchunks)))); // Fixed 32-bit overflow
+ int nsteps = nstepsPerRound * nrounds * substeps;
+ TRACE(NET,"opCount %lx substeps %d subchunks %d nrounds %d nsteps %d comm %p", comm->opCount, subchunks, subchunks, nrounds, nsteps, comm);
+ TRACE(NET,"opCount %lx nbytes %zi nrings %d buffSize %d pattern %d comm %p", comm->opCount, nbytes, nrings, buffSize, pattern, comm);
+ for (int r=0; r<nrings; r++) {
+ struct ncclRing* ring = comm->rings+((comm->myParams->gridDim.x+r)%comm->nRings);
+ struct ncclProxyArgs args = { ring, substeps*subchunks, nsteps, comm->opCount, llMode, 0 };
+ SaveProxy(&ring->recv, &args, NeedProxy(RECV, pattern, ring, comm->nRanks));
+ SaveProxy(&ring->send, &args, NeedProxy(SEND, pattern, ring, comm->nRanks));
+ }
+ return ncclSuccess;
+}
+
+ncclResult_t transportStartProxies(ncclComm* comm) {
+ for (int r=0; r<comm->nRings; r++) {
+ FifoPushArgs(comm->rings[r].send.proxyInfo);
+ FifoPushArgs(comm->rings[r].recv.proxyInfo);
+ }
+ pthread_yield(); // Let other threads run
+ return ncclSuccess;
+}
+
+void* persistentThread(void *opaqueInfo) {
+ struct transportProxyInfo* info = (struct transportProxyInfo*)opaqueInfo;
+ // We need to initialize the context before launching any NCCL cuda kernel,
+ // otherwise we would create it during the first cudaMemcpyAsync inside the
+ // proxy function and that would cause a deadlock
+ cudaSetDevice(info->comm->cudaDev);
+ // Signal the main thread the context is created and it can proceed.
+ SetProxyReady(info);
+ while (1) {
+ struct ncclProxyArgs args;
+ FifoPullArgs(info, &args);
+ if (args.active == -1) {
+ // Main thread asked to stop
+ return NULL;
+ }
+ ncclResult_t res = info->func(&args);
+ if (res != ncclSuccess) {
+ WARN("%s:%d -> %d [Proxy thread error]", __FILE__, __LINE__, res);
+ }
+ }
+}
+
+ncclResult_t transportCreateProxy(int type, struct ncclRing* ring, struct ncclComm* comm) {
+ struct ncclConnector* connector = (type == RECV) ? &ring->recv : &ring->send;
+ threadFunc_t proxyfunc = (threadFunc_t) ((type == RECV) ? connector->transport->recv.proxy : connector->transport->send.proxy);
+ if (proxyfunc) {
+ TRACE(NET,"type %d ring %p proxyfunc %p comm %p", type, ring, proxyfunc, comm);
+ struct transportProxyInfo* info;
+ NCCLCHECK(ncclCalloc(&info, 1));
+ connector->proxyInfo = info;
+ info->comm = comm;
+ info->cond = PTHREAD_COND_INITIALIZER;
+ info->mutex = PTHREAD_MUTEX_INITIALIZER;
+ info->func = proxyfunc;
+ info->argsFifoHead = info->argsFifoTail = 0;
+ info->proxyReady = 0;
+ pthread_create(&connector->proxyInfo->thread, NULL, persistentThread, info);
+ // Wait for thread to initialize its CUDA context.
+ WaitProxyReady(info);
+ }
+ return ncclSuccess;
+}
+
+ncclResult_t transportDestroyProxy(struct ncclConnector* connector) {
+ if (connector->proxyInfo) {
+ StopProxy(connector->proxyInfo);
+ pthread_join(connector->proxyInfo->thread, NULL);
+ free(connector->proxyInfo);
+ connector->proxyInfo = NULL;
+ }
+ return ncclSuccess;
+}