Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/marian-nmt/nccl.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
Diffstat (limited to 'src/init.cc')
-rw-r--r--  src/init.cc  396
1 files changed, 256 insertions, 140 deletions
diff --git a/src/init.cc b/src/init.cc
index 627f6c7..0a02760 100644
--- a/src/init.cc
+++ b/src/init.cc
@@ -1,5 +1,5 @@
/*************************************************************************
- * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
@@ -11,11 +11,10 @@
#include "transport.h"
#include "group.h"
#include "net.h"
+#include "coll_net.h"
#include "enqueue.h"
#include "graph.h"
#include "argcheck.h"
-#include "cpuset.h"
-#include <sched.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
@@ -43,6 +42,7 @@ NCCL_PARAM(GroupCudaStream, "GROUP_CUDA_STREAM", NCCL_GROUP_CUDA_STREAM);
NCCL_PARAM(CheckPointers, "CHECK_POINTERS", 0);
ncclNet_t* ncclNet = NULL;
+ncclCollNet_t* ncclCollNet = NULL;
// Returns ncclInternalError if anything fails, causing that network to be ignored.
ncclResult_t initNet(ncclNet_t* net) {
@@ -53,7 +53,15 @@ ncclResult_t initNet(ncclNet_t* net) {
return ncclSuccess;
}
-ncclResult_t initNetPlugin(ncclNet_t** net) {
+ncclResult_t initCollNet(ncclCollNet_t* collnet) {
+ int ndev;
+ if (collnet->init(ncclDebugLog) != ncclSuccess) return ncclInternalError;
+ if (collnet->devices(&ndev) != ncclSuccess) return ncclInternalError;
+ if (ndev <= 0) return ncclSystemError;
+ return ncclSuccess;
+}
+
+ncclResult_t initNetPlugin(ncclNet_t** net, ncclCollNet_t** collnet) {
void* netPluginLib = dlopen("libnccl-net.so", RTLD_NOW | RTLD_LOCAL);
if (netPluginLib == NULL) {
// dlopen does not guarantee to set errno, but dlerror only gives us a
@@ -69,13 +77,17 @@ ncclResult_t initNetPlugin(ncclNet_t** net) {
ncclNet_t* extNet = (ncclNet_t*) dlsym(netPluginLib, STR(NCCL_PLUGIN_SYMBOL));
if (extNet == NULL) {
INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_PLUGIN_SYMBOL) " symbol.");
- goto cleanup;
- }
- if (initNet(extNet) == ncclSuccess) {
+ } else if (initNet(extNet) == ncclSuccess) {
*net = extNet;
+ // Check for CollNet
+ ncclCollNet_t* extCollNet = (ncclCollNet_t*) dlsym(netPluginLib, STR(NCCL_COLLNET_PLUGIN_SYMBOL));
+ if (extCollNet == NULL) {
+ INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_COLLNET_PLUGIN_SYMBOL) " symbol.");
+ } else if (initCollNet(extCollNet) == ncclSuccess) {
+ *collnet = extCollNet;
+ }
return ncclSuccess;
}
-cleanup:
if (netPluginLib != NULL) dlclose(netPluginLib);
return ncclSuccess;
}
@@ -84,7 +96,7 @@ ncclResult_t initNet() {
// Always initialize bootstrap network
NCCLCHECK(bootstrapNetInit());
- NCCLCHECK(initNetPlugin(&ncclNet));
+ NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
if (ncclNet != NULL) return ncclSuccess;
if (initNet(&ncclNetIb) == ncclSuccess) {
ncclNet = &ncclNetIb;
@@ -95,6 +107,8 @@ ncclResult_t initNet() {
return ncclSuccess;
}
+NCCL_PARAM(CollNetEnable, "COLLNET_ENABLE", 0);
+
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
static bool initialized = false;
static ncclResult_t ncclInit() {
@@ -103,6 +117,7 @@ static ncclResult_t ncclInit() {
if (!initialized) {
initEnv();
initNet();
+ INFO(NCCL_INIT, "Using network %s", ncclNetName());
initialized = true;
}
pthread_mutex_unlock(&initLock);
@@ -220,6 +235,7 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) {
*comm->abortFlag = 0;
comm->argsptr = &comm->args;
+ comm->collNetSupport = 0;
*comret = comm;
return ncclSuccess;
@@ -233,7 +249,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
// Copy userRanks and peers
for (int r=0; r<comm->nChannels; r++) {
NCCLCHECK(ncclCudaMemcpy(comm->channels[r].ring.devUserRanks, comm->channels[r].ring.userRanks, comm->nRanks));
- NCCLCHECK(ncclCudaMemcpy(comm->channels[r].devPeers, comm->channels[r].peers, comm->nRanks));
+ NCCLCHECK(ncclCudaMemcpy(comm->channels[r].devPeers, comm->channels[r].peers, comm->nRanks+1));
}
// Duplicate the dev comm on the device
@@ -269,14 +285,8 @@ static ncclResult_t fillInfo(struct ncclComm* comm, struct ncclPeerInfo* info, u
info->shmDev = statbuf.st_dev;
info->busId = comm->busId;
- int netDevs;
- NCCLCHECK(ncclNetDevices(&netDevs));
- for (int n=0; n<netDevs; n++) {
- int ptrSupport;
- NCCLCHECK(ncclNetPtrSupport(n, &ptrSupport));
- if (ptrSupport & NCCL_PTR_CUDA) info->gdrSupport |= (1 << n);
- }
+ NCCLCHECK(ncclGpuGdrSupport(&info->gdrSupport));
return ncclSuccess;
}
@@ -396,7 +406,7 @@ static ncclResult_t p2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph,
struct ncclConnector* conn;
for (int i=0; i<nrecv; i++) {
int peer = peerRecv[i];
- if (peer == -1) continue;
+ if (peer == -1 || peer >= comm->nRanks) continue;
conn = &channel->peers[peer].recv;
if (conn->connected) { ++nSkippedRecv; continue; }
memset(&connect, 0, sizeof(connect));
@@ -405,7 +415,7 @@ static ncclResult_t p2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph,
}
for (int i=0; i<nsend; i++) {
int peer = peerSend[i];
- if (peer == -1) continue;
+ if (peer == -1 || peer >= comm->nRanks) continue;
conn = &channel->peers[peer].send;
if (conn->connected) { ++nSkippedSend; continue; }
memset(&connect, 0, sizeof(connect));
@@ -414,29 +424,148 @@ static ncclResult_t p2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph,
}
for (int i=0; i<nsend; i++) {
int peer = peerSend[i];
- if (peer == -1) continue;
+ if (peer == -1 || peer >= comm->nRanks) continue;
conn = &channel->peers[peer].send;
if (conn->connected) {++nSkippedSend; continue; }
memset(&connect, 0, sizeof(connect));
NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
- NCCLCHECK(conn->transportComm->connect(&connect, conn));
+ NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
conn->connected = 1;
}
for (int i=0; i<nrecv; i++) {
int peer = peerRecv[i];
- if (peer == -1) continue;
+ if (peer == -1 || peer >= comm->nRanks) continue;
conn = &channel->peers[peer].recv;
if (conn->connected) {++nSkippedRecv; continue; }
memset(&connect, 0, sizeof(connect));
NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
- NCCLCHECK(conn->transportComm->connect(&connect, conn));
+ NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
conn->connected = 1;
}
TRACE(NCCL_INIT, "nsend %d nrecv %d nSkippedSend %u nSkippedRecv %u - DONE", nsend, nrecv, nSkippedSend, nSkippedRecv);
return ncclSuccess;
}
+extern struct ncclTransport collNetTransport;
+
+// All ranks must participate in collNetSetup call
+// type: 0 for send, 1 for recv
+// return: 0 - unsupported, 1 - supported
+static int collNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int collNetChannels, int rank, int nranks, int masterRank, int masterPeer, int nMasters, int type) {
+ int rankInCollNet = -1;
+ int supported = 0;
+ int isMaster = (rank == masterRank) ? 1 : 0;
+ struct {
+ int collNetRank;
+ ncclConnect connect;
+ } sendrecvExchange;
+
+ // check if we can connect to collnet, whose root is the nranks-th rank
+ struct ncclPeerInfo *myInfo = comm->peerInfo+rank, *peerInfo = comm->peerInfo+nranks;
+ peerInfo->rank = nranks;
+ int ret = 1;
+ if (isMaster) {
+ NCCLCHECK(collNetTransport.canConnect(&ret, comm->topo, collNetGraph, myInfo, peerInfo));
+ }
+
+ // send master receives connect info from peer recv master
+ if (isMaster && type == 0) {
+ NCCLCHECK(bootstrapRecv(comm->bootstrap, masterPeer, &sendrecvExchange, sizeof(sendrecvExchange)));
+ rankInCollNet = sendrecvExchange.collNetRank;
+ INFO(NCCL_INIT, "CollNet [send] : rank %d collNetRank %d collNetNranks %d received connect from rank %d", rank, rankInCollNet, nMasters, masterPeer);
+ }
+
+ // select
+ struct ncclPeer* root = channel->peers+nranks;
+ struct ncclConnector* conn = (type == 1) ? &root->recv : &root->send;
+ struct ncclTransportComm* transportComm = (type == 1) ? &(collNetTransport.recv) : &(collNetTransport.send);
+ conn->transportComm = transportComm;
+ // setup
+ struct ncclConnect myConnect;
+ if (isMaster && ret > 0) {
+ NCCLCHECK(transportComm->setup(comm->topo, collNetGraph, myInfo, peerInfo, &myConnect, conn, channel->buffSize, channel->id));
+ }
+ // prepare connect handles
+ ncclResult_t res;
+ struct {
+ int isMaster;
+ ncclConnect connect;
+ } *allConnects = NULL;
+ ncclConnect *masterConnects = NULL;
+ NCCLCHECK(ncclCalloc(&masterConnects, nMasters));
+ if (type == 1) { // recv side: AllGather
+ // all ranks must participate
+ NCCLCHECK(ncclCalloc(&allConnects, nranks));
+ allConnects[rank].isMaster = isMaster;
+ memcpy(&(allConnects[rank].connect), &myConnect, sizeof(struct ncclConnect));
+ NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allConnects, sizeof(*allConnects)), res, cleanup);
+ // consolidate
+ int c = 0;
+ for (int r = 0; r < nranks; r++) {
+ if (allConnects[r].isMaster) {
+ memcpy(masterConnects+c, &(allConnects[r].connect), sizeof(struct ncclConnect));
+ if (r == rank) rankInCollNet = c;
+ c++;
+ }
+ }
+ } else { // send side : copy in connect info received from peer recv master
+ if (isMaster) memcpy(masterConnects+rankInCollNet, &(sendrecvExchange.connect), sizeof(struct ncclConnect));
+ }
+ // connect
+ if (isMaster && ret > 0) {
+ NCCLCHECKGOTO(transportComm->connect(masterConnects, nMasters, rankInCollNet, conn), res, cleanup);
+ }
+ // recv side sends connect info to send side
+ if (isMaster && type == 1) {
+ sendrecvExchange.collNetRank = rankInCollNet;
+ memcpy(&sendrecvExchange.connect, masterConnects+rankInCollNet, sizeof(struct ncclConnect));
+ NCCLCHECK(bootstrapSend(comm->bootstrap, masterPeer, &sendrecvExchange, sizeof(sendrecvExchange)));
+ INFO(NCCL_INIT, "CollNet [recv] : rank %d collNetRank %d collNetNranks %d sent connect to rank %d", rank, rankInCollNet, nMasters, masterPeer);
+ }
+ if (ret > 0) {
+ supported = 1;
+ }
+cleanup:
+ if (allConnects != NULL) free(allConnects);
+ if (masterConnects != NULL) free(masterConnects);
+ return supported;
+}
+
+static ncclResult_t checkCollNetSetup(struct ncclComm* comm, int rank, int collNetSetupFail) {
+ int nranks = comm->nRanks;
+ // AllGather collNet setup results
+ int* allGatherFailures;
+ NCCLCHECK(ncclCalloc(&allGatherFailures, nranks));
+ allGatherFailures[rank] = collNetSetupFail;
+ NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGatherFailures, sizeof(int)));
+ for (int i=0; i<nranks; i++) {
+ if (allGatherFailures[i] != 0) {
+ collNetSetupFail = 1;
+ break;
+ }
+ }
+ free(allGatherFailures);
+ if (collNetSetupFail) {
+ if (rank == 0) WARN("Cannot initialize CollNet, using %s instead", ncclNetName());
+ // Free collNet resources
+ for (int r=0; r<comm->nChannels; r++) {
+ struct ncclChannel* channel = comm->channels+r;
+ struct ncclPeer* peer = channel->peers+nranks;
+ if (peer->send.transportResources && peer->send.transportComm) NCCLCHECK(peer->send.transportComm->free(peer->send.transportResources));
+ if (peer->recv.transportResources && peer->recv.transportComm) NCCLCHECK(peer->recv.transportComm->free(peer->recv.transportResources));
+ peer->send.transportResources = NULL; // avoid double free
+ peer->recv.transportResources = NULL; // avoid double free
+ }
+ // Set support to 0
+ comm->collNetSupport = 0;
+ } else {
+ comm->collNetSupport = 1;
+ }
+ return ncclSuccess;
+}
+
NCCL_PARAM(CrossNic, "CROSS_NIC", 2);
+NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0);
static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
// We use 3 AllGathers
@@ -462,7 +591,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
NCCLCHECK(fillInfo(comm, myInfo, commHash));
NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather1Data, sizeof(*allGather1Data)));
- NCCLCHECK(ncclCalloc(&comm->peerInfo, nranks));
+ NCCLCHECK(ncclCalloc(&comm->peerInfo, nranks+1)); // Extra rank to represent CollNet root
for (int i = 0; i < nranks; i++) {
memcpy(comm->peerInfo+i, &allGather1Data[i].peerInfo, sizeof(struct ncclPeerInfo));
if ((i != rank) && (comm->peerInfo[i].hostHash == myInfo->hostHash) && (comm->peerInfo[i].busId == myInfo->busId)) {
@@ -481,60 +610,82 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
NCCLCHECK(ncclTopoTrimSystem(comm->topo, comm));
// Recompute paths after trimming
NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo));
- // Compute max speed to accelerate search
- NCCLCHECK(ncclTopoGetMaxSpeed(comm->topo));
+ // Init search
+ NCCLCHECK(ncclTopoSearchInit(comm->topo));
// Print final topology
NCCLCHECK(ncclTopoPrint(comm->topo));
// Get rings and trees
- struct ncclTopoGraph treeGraph;
- treeGraph.pattern = NCCL_TOPO_PATTERN_SPLIT_TREE;
- treeGraph.crossNic = ncclParamCrossNic();
- // We communicate only half the data between node with trees on 2 nodes.
- NCCLCHECK(ncclTopoCompute(comm->topo, &treeGraph));
- NCCLCHECK(ncclTopoPrintGraph(comm->topo, &treeGraph));
struct ncclTopoGraph ringGraph;
+ ringGraph.id = 0;
ringGraph.pattern = NCCL_TOPO_PATTERN_RING;
ringGraph.crossNic = ncclParamCrossNic();
+ ringGraph.collNet = 0;
+ ringGraph.minChannels = 1;
+ ringGraph.maxChannels = MAXCHANNELS/2;
NCCLCHECK(ncclTopoCompute(comm->topo, &ringGraph));
NCCLCHECK(ncclTopoPrintGraph(comm->topo, &ringGraph));
+ struct ncclTopoGraph treeGraph;
+ treeGraph.id = 1;
+ treeGraph.pattern = NCCL_TOPO_PATTERN_SPLIT_TREE;
+ treeGraph.crossNic = ncclParamCrossNic();
+ treeGraph.collNet = 0;
+ treeGraph.minChannels = 1;
+ treeGraph.maxChannels = ringGraph.nChannels;
+ NCCLCHECK(ncclTopoCompute(comm->topo, &treeGraph));
+ NCCLCHECK(ncclTopoPrintGraph(comm->topo, &treeGraph));
+
+ struct ncclTopoGraph collNetGraph;
+ collNetGraph.id = 2;
+ collNetGraph.pattern = NCCL_TOPO_PATTERN_TREE;
+ collNetGraph.collNet = 1;
+ collNetGraph.crossNic = ncclParamCrossNic();
+ collNetGraph.minChannels = collNetGraph.maxChannels = ringGraph.nChannels;
+ NCCLCHECK(ncclTopoCompute(comm->topo, &collNetGraph));
+ NCCLCHECK(ncclTopoPrintGraph(comm->topo, &collNetGraph));
+
+ if (comm->rank == ncclParamGraphDumpFileRank()) {
+ struct ncclTopoGraph* graphs[3] = { &ringGraph, &treeGraph, &collNetGraph };
+ NCCLCHECK(ncclTopoDumpGraphs(comm->topo, 3, graphs));
+ }
+
// AllGather3 - begin
+ struct ncclGraphInfo {
+ int sameChannels;
+ float speedIntra;
+ float speedInter;
+ int typeIntra;
+ };
struct {
int cudaCompCap;
int fullCudaCompCap;
- int nvlink;
int nChannels;
- struct {
- int sameChannels;
- int speedIntra;
- int speedInter;
- int nvlink;
- } tree;
- struct {
- int sameChannels;
- int speedIntra;
- int speedInter;
- int nvlink;
- } ring;
+ struct ncclGraphInfo tree;
+ struct ncclGraphInfo ring;
+ struct ncclGraphInfo collNet;
struct ncclTopoRanks topoRanks;
} *allGather3Data;
NCCLCHECK(ncclCalloc(&allGather3Data, nranks));
allGather3Data[rank].cudaCompCap = ncclCudaCompCap();
- allGather3Data[rank].nvlink = treeGraph.nvlink;
- allGather3Data[rank].nChannels = comm->nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels);
+ allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels =
+ std::min(treeGraph.nChannels, ringGraph.nChannels);
allGather3Data[rank].tree.sameChannels = treeGraph.sameChannels;
allGather3Data[rank].tree.speedIntra = treeGraph.speedIntra;
allGather3Data[rank].tree.speedInter = treeGraph.speedInter;
- allGather3Data[rank].tree.nvlink = treeGraph.nvlink;
+ allGather3Data[rank].tree.typeIntra = treeGraph.typeIntra;
allGather3Data[rank].ring.sameChannels = ringGraph.sameChannels;
allGather3Data[rank].ring.speedIntra = ringGraph.speedIntra;
allGather3Data[rank].ring.speedInter = ringGraph.speedInter;
- allGather3Data[rank].ring.nvlink = ringGraph.nvlink;
+ allGather3Data[rank].ring.typeIntra = ringGraph.typeIntra;
+ allGather3Data[rank].collNet.sameChannels = collNetGraph.sameChannels;
+ allGather3Data[rank].collNet.speedIntra = collNetGraph.speedIntra;
+ allGather3Data[rank].collNet.speedInter = collNetGraph.speedInter;
+ allGather3Data[rank].collNet.typeIntra = collNetGraph.typeIntra;
- NCCLCHECK(ncclTopoPreset(comm, &treeGraph, &ringGraph, &allGather3Data[rank].topoRanks));
+ NCCLCHECK(ncclTopoPreset(comm, &treeGraph, &ringGraph, &collNetGraph, &allGather3Data[rank].topoRanks));
NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather3Data, sizeof(*allGather3Data)));
@@ -562,9 +713,6 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
maxCompCap = std::max(allGather3Data[i].cudaCompCap, maxCompCap);
}
- comm->nvlink = 1;
- for (int i = 0; i < nranks; i++) comm->nvlink &= allGather3Data[i].nvlink;
-
int nChannelsOrig = comm->nChannels;
struct ncclTopoRanks** allTopoRanks;
NCCLCHECK(ncclCalloc(&allTopoRanks, comm->nRanks));
@@ -575,11 +723,15 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
treeGraph.sameChannels = std::min(allGather3Data[i].tree.sameChannels, treeGraph.sameChannels);
treeGraph.speedIntra = std::min(allGather3Data[i].tree.speedIntra, treeGraph.speedIntra);
treeGraph.speedInter = std::min(allGather3Data[i].tree.speedInter, treeGraph.speedInter);
- treeGraph.nvlink = std::min(allGather3Data[i].tree.nvlink, treeGraph.nvlink);
+ treeGraph.typeIntra = std::min(allGather3Data[i].tree.typeIntra, treeGraph.typeIntra);
ringGraph.sameChannels = std::min(allGather3Data[i].ring.sameChannels, ringGraph.sameChannels);
ringGraph.speedIntra = std::min(allGather3Data[i].ring.speedIntra, ringGraph.speedIntra);
ringGraph.speedInter = std::min(allGather3Data[i].ring.speedInter, ringGraph.speedInter);
- ringGraph.nvlink = std::min(allGather3Data[i].ring.nvlink, ringGraph.nvlink);
+ ringGraph.typeIntra = std::min(allGather3Data[i].ring.typeIntra, ringGraph.typeIntra);
+ collNetGraph.sameChannels = std::min(allGather3Data[i].collNet.sameChannels, collNetGraph.sameChannels);
+ collNetGraph.speedIntra = std::min(allGather3Data[i].collNet.speedIntra, collNetGraph.speedIntra);
+ collNetGraph.speedInter = std::min(allGather3Data[i].collNet.speedInter, collNetGraph.speedInter);
+ collNetGraph.typeIntra = std::min(allGather3Data[i].collNet.typeIntra, collNetGraph.typeIntra);
}
if (comm->nChannels < nChannelsOrig) {
@@ -592,6 +744,11 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
NCCLCHECK(ncclCalloc(&rings, nranks*MAXCHANNELS));
NCCLCHECK(ncclTopoPostset(comm, nodesFirstRank, allTopoRanks, rings));
+ if (comm->nNodes > 1 &&
+ ncclParamCollNetEnable() == 1 &&
+ collNetSupport()) {
+ NCCLCHECK(ncclTopoConnectCollNet(comm, &collNetGraph, rank));
+ }
free(allTopoRanks);
free(nodesFirstRank);
@@ -601,7 +758,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels);
- NCCLCHECK(ncclSetThresholds(comm, minCompCap, maxCompCap, &treeGraph, &ringGraph));
+ NCCLCHECK(ncclTopoSetThresholds(comm, minCompCap, maxCompCap, &treeGraph, &ringGraph, &collNetGraph));
char line[1024];
line[0]='\0';
@@ -615,21 +772,58 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
line[1023] = '\0';
INFO(NCCL_INIT, "Trees%s", line);
+ // Set Affinity to a CPU local the our GPU, so that all memory we allocate
+ // on the host is local.
+ cpu_set_t affinitySave;
+ sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
+ NCCLCHECK(ncclTopoSetAffinity(comm->topo, comm->rank));
+ ncclResult_t ret;
+
// Connect with prev/next for each ring
struct ncclConnect *connect;
- NCCLCHECK(ncclCalloc(&connect, 2));
+ NCCLCHECKGOTO(ncclCalloc(&connect, 2), ret, affinity_restore);
for (int c=0; c<comm->nChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
- NCCLCHECK(setupChannel(comm, c, rank, nranks, rings+c*nranks));
+ NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, affinity_restore);
if (comm->nRanks == 1) continue;
- NCCLCHECK(p2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next));
- NCCLCHECK(p2pSetup(comm, &treeGraph, channel, NCCL_MAX_TREE_ARITY, channel->treeUp.down, 1, &channel->treeUp.up));
- NCCLCHECK(p2pSetup(comm, &treeGraph, channel, 1, &channel->treeDn.up, NCCL_MAX_TREE_ARITY, channel->treeDn.down));
+ NCCLCHECKGOTO(p2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next), ret, affinity_restore);
+ NCCLCHECKGOTO(p2pSetup(comm, &treeGraph, channel, NCCL_MAX_TREE_ARITY, channel->treeUp.down, 1, &channel->treeUp.up), ret, affinity_restore);
+ NCCLCHECKGOTO(p2pSetup(comm, &treeGraph, channel, 1, &channel->treeDn.up, NCCL_MAX_TREE_ARITY, channel->treeDn.down), ret, affinity_restore);
+ }
+
+ // Check if we can setup CollNet
+ if (comm->nNodes > 1 &&
+ ncclParamCollNetEnable() == 1 &&
+ collNetSupport()) {
+ int logicChannels = comm->nChannels/2;
+ int collNetSetupFail = 0;
+ const int recvIndex = 0; // recv GPU index is always 0
+ const int sendIndex = collNetGraph.pattern == NCCL_TOPO_PATTERN_TREE ? 0 : 1; // send GPU index depends on topo pattern
+ for (int c=0; c<logicChannels; c++) {
+ struct ncclChannel* channelRecv = comm->channels+logicChannels+c;
+ struct ncclChannel* channelSend = comm->channels+c;
+ NCCLCHECK(p2pSetup(comm, &collNetGraph, channelRecv, 1, &channelRecv->collTreeDn.up, 1, channelRecv->collTreeDn.down));
+ NCCLCHECK(p2pSetup(comm, &collNetGraph, channelSend, 1, channelSend->collTreeUp.down, 1, &channelSend->collTreeUp.up));
+ const int recvMaster = collNetGraph.intra[c*comm->localRanks+recvIndex];
+ const int sendMaster = collNetGraph.intra[c*comm->localRanks+sendIndex];
+ if (collNetSetup(comm, &collNetGraph, channelRecv, logicChannels, rank, nranks, recvMaster, sendMaster, comm->nNodes, 1) != 1)
+ collNetSetupFail = 1;
+ if (collNetSetup(comm, &collNetGraph, channelSend, logicChannels, rank, nranks, sendMaster, recvMaster, comm->nNodes, 0) != 1)
+ collNetSetupFail = 1;
+ }
+ // Verify CollNet setup across ranks
+ NCCLCHECK(checkCollNetSetup(comm, rank, collNetSetupFail));
}
TRACE(NCCL_INIT, "rank %d nranks %d - CONNECTED %d RINGS AND TREES", rank, nranks, comm->nChannels);
free(connect);
free(rings);
+ // We should have allocated all buffers, collective fifos, ... we can
+ // restore the affinity.
+affinity_restore:
+ sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave);
+ if (ret != ncclSuccess) return ret;
+
// Compute intra ranks (using AllGather1 data)
int intraRank0 = -1, intraRank = -1, intraRanks = 0;
for (int i = 0; i < nranks; i++) {
@@ -658,98 +852,20 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
return ncclSuccess;
}
-static ncclResult_t getCpuGpuAffinity(int cudaDev, cpu_set_t* mask) {
- CPU_ZERO_S(sizeof(cpu_set_t), mask);
- char* cudaPath;
- NCCLCHECK(ncclTopoCudaPath(cudaDev, &cudaPath));
- char path[PATH_MAX];
- strncpy(path, cudaPath, PATH_MAX-1);
- snprintf(path+strlen(path), PATH_MAX-1-strlen(path), "/local_cpus");
- path[PATH_MAX-1] = '\0';
- int fd;
- SYSCHECKVAL(open(path, O_RDONLY), "open", fd);
- char affinityStr[sizeof(cpu_set_t)*2 + 1];
- int r = read(fd, affinityStr, sizeof(cpu_set_t)*2);
- if (r > 0) {
- affinityStr[r] = '\0';
- NCCLCHECK(ncclStrToCpuset(affinityStr, mask));
- }
- close(fd);
- free(cudaPath);
- return ncclSuccess;
-}
-
-NCCL_PARAM(IgnoreCpuAffinity, "IGNORE_CPU_AFFINITY", 0);
-
-static ncclResult_t setCpuAffinity(int cudaDev) {
- // Query the CPU affinity set we were provided
- cpu_set_t mask;
- SYSCHECK(sched_getaffinity(0, sizeof(cpu_set_t), &mask), "sched_getaffinity");
-
-#ifdef ENABLE_TRACE
- {
- char affinityStr[sizeof(cpu_set_t)*2];
- NCCLCHECK(ncclCpusetToStr(&mask, affinityStr));
- TRACE(NCCL_INIT, "Current affinity for GPU %d is %s", cudaDev, affinityStr);
- }
-#endif
-
- // Find the CPUs that are local to the supplied GPU
- cpu_set_t gpuMask;
- NCCLCHECK(getCpuGpuAffinity(cudaDev, &gpuMask));
-
-#ifdef ENABLE_TRACE
- {
- char affinityStr[sizeof(cpu_set_t)*2];
- NCCLCHECK(ncclCpusetToStr(&gpuMask, affinityStr));
- TRACE(NCCL_INIT, "CPU GPU affinity for GPU %d is %s", cudaDev, affinityStr);
- }
-#endif
-
- cpu_set_t finalMask;
- if (ncclParamIgnoreCpuAffinity())
- // Ignore the CPU affinity set and use the GPU one instead
- finalMask = gpuMask;
- else
- // Use a subset of the GPU affinity set
- CPU_AND(&finalMask, &mask, &gpuMask);
-
- // If there is a non empty set, use it to set affinity
- if (CPU_COUNT(&finalMask)) {
- char affinityStr[sizeof(cpu_set_t)*2];
- NCCLCHECK(ncclCpusetToStr(&finalMask, affinityStr));
- INFO(NCCL_INIT, "Setting affinity for GPU %d to %s", cudaDev, affinityStr);
- SYSCHECK(sched_setaffinity(0, sizeof(cpu_set_t), &finalMask), "sched_setaffinity");
- }
- return ncclSuccess;
-}
-
ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
- cpu_set_t affinitySave;
- sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
-
- NCCLCHECK(wrapNvmlSymbols());
- NCCLCHECK(wrapNvmlInit());
-
- // Make sure all host memory allocation are close to the GPU
- CUDACHECK(cudaSetDevice(cudaDev));
- NCCLCHECK(setCpuAffinity(cudaDev));
ncclResult_t res;
+ CUDACHECK(cudaSetDevice(cudaDev));
NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup);
NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup);
NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup);
- sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave);
- NCCLCHECKGOTO(wrapNvmlShutdown(), res, cleanup);
-
INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %x - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId);
return ncclSuccess;
cleanup:
if ((*newcomm) && (*newcomm)->bootstrap) bootstrapAbort((*newcomm)->bootstrap);
*newcomm = NULL;
- sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave);
return res;
}