diff options
Diffstat (limited to 'src/init.cc')
-rw-r--r-- | src/init.cc | 396 |
1 files changed, 256 insertions, 140 deletions
diff --git a/src/init.cc b/src/init.cc index 627f6c7..0a02760 100644 --- a/src/init.cc +++ b/src/init.cc @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -11,11 +11,10 @@ #include "transport.h" #include "group.h" #include "net.h" +#include "coll_net.h" #include "enqueue.h" #include "graph.h" #include "argcheck.h" -#include "cpuset.h" -#include <sched.h> #include <fcntl.h> #include <string.h> #include <errno.h> @@ -43,6 +42,7 @@ NCCL_PARAM(GroupCudaStream, "GROUP_CUDA_STREAM", NCCL_GROUP_CUDA_STREAM); NCCL_PARAM(CheckPointers, "CHECK_POINTERS", 0); ncclNet_t* ncclNet = NULL; +ncclCollNet_t* ncclCollNet = NULL; // Returns ncclInternalError if anything fails, causing that network to be ignored. ncclResult_t initNet(ncclNet_t* net) { @@ -53,7 +53,15 @@ ncclResult_t initNet(ncclNet_t* net) { return ncclSuccess; } -ncclResult_t initNetPlugin(ncclNet_t** net) { +ncclResult_t initCollNet(ncclCollNet_t* collnet) { + int ndev; + if (collnet->init(ncclDebugLog) != ncclSuccess) return ncclInternalError; + if (collnet->devices(&ndev) != ncclSuccess) return ncclInternalError; + if (ndev <= 0) return ncclSystemError; + return ncclSuccess; +} + +ncclResult_t initNetPlugin(ncclNet_t** net, ncclCollNet_t** collnet) { void* netPluginLib = dlopen("libnccl-net.so", RTLD_NOW | RTLD_LOCAL); if (netPluginLib == NULL) { // dlopen does not guarantee to set errno, but dlerror only gives us a @@ -69,13 +77,17 @@ ncclResult_t initNetPlugin(ncclNet_t** net) { ncclNet_t* extNet = (ncclNet_t*) dlsym(netPluginLib, STR(NCCL_PLUGIN_SYMBOL)); if (extNet == NULL) { INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_PLUGIN_SYMBOL) " symbol."); - goto cleanup; - } - if (initNet(extNet) == ncclSuccess) { + } else if (initNet(extNet) == ncclSuccess) { *net = extNet; + // Check for CollNet + ncclCollNet_t* extCollNet = (ncclCollNet_t*) dlsym(netPluginLib, STR(NCCL_COLLNET_PLUGIN_SYMBOL)); + if (extCollNet == NULL) { + INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_COLLNET_PLUGIN_SYMBOL) " symbol."); + } else if (initCollNet(extCollNet) == ncclSuccess) { + *collnet = extCollNet; + } return ncclSuccess; } -cleanup: if (netPluginLib != NULL) dlclose(netPluginLib); return ncclSuccess; } @@ -84,7 +96,7 @@ ncclResult_t initNet() { // Always initialize bootstrap network NCCLCHECK(bootstrapNetInit()); - NCCLCHECK(initNetPlugin(&ncclNet)); + NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet)); if (ncclNet != NULL) return ncclSuccess; if (initNet(&ncclNetIb) == ncclSuccess) { ncclNet = &ncclNetIb; @@ -95,6 +107,8 @@ ncclResult_t initNet() { return ncclSuccess; } +NCCL_PARAM(CollNetEnable, "COLLNET_ENABLE", 0); + pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; static bool initialized = false; static ncclResult_t ncclInit() { @@ -103,6 +117,7 @@ static ncclResult_t ncclInit() { if (!initialized) { initEnv(); initNet(); + INFO(NCCL_INIT, "Using network %s", ncclNetName()); initialized = true; } pthread_mutex_unlock(&initLock); @@ -220,6 +235,7 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { *comm->abortFlag = 0; comm->argsptr = &comm->args; + comm->collNetSupport = 0; *comret = comm; return ncclSuccess; @@ -233,7 +249,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) { // Copy userRanks and peers for (int r=0; r<comm->nChannels; r++) { NCCLCHECK(ncclCudaMemcpy(comm->channels[r].ring.devUserRanks, comm->channels[r].ring.userRanks, comm->nRanks)); - NCCLCHECK(ncclCudaMemcpy(comm->channels[r].devPeers, comm->channels[r].peers, comm->nRanks)); + NCCLCHECK(ncclCudaMemcpy(comm->channels[r].devPeers, comm->channels[r].peers, comm->nRanks+1)); } // Duplicate the dev comm on the device @@ -269,14 +285,8 @@ static ncclResult_t fillInfo(struct ncclComm* comm, struct ncclPeerInfo* info, u info->shmDev = statbuf.st_dev; info->busId = comm->busId; - int netDevs; - NCCLCHECK(ncclNetDevices(&netDevs)); - for (int n=0; n<netDevs; n++) { - int ptrSupport; - NCCLCHECK(ncclNetPtrSupport(n, &ptrSupport)); - if (ptrSupport & NCCL_PTR_CUDA) info->gdrSupport |= (1 << n); - } + NCCLCHECK(ncclGpuGdrSupport(&info->gdrSupport)); return ncclSuccess; } @@ -396,7 +406,7 @@ static ncclResult_t p2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclConnector* conn; for (int i=0; i<nrecv; i++) { int peer = peerRecv[i]; - if (peer == -1) continue; + if (peer == -1 || peer >= comm->nRanks) continue; conn = &channel->peers[peer].recv; if (conn->connected) { ++nSkippedRecv; continue; } memset(&connect, 0, sizeof(connect)); @@ -405,7 +415,7 @@ static ncclResult_t p2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, } for (int i=0; i<nsend; i++) { int peer = peerSend[i]; - if (peer == -1) continue; + if (peer == -1 || peer >= comm->nRanks) continue; conn = &channel->peers[peer].send; if (conn->connected) { ++nSkippedSend; continue; } memset(&connect, 0, sizeof(connect)); @@ -414,29 +424,148 @@ static ncclResult_t p2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, } for (int i=0; i<nsend; i++) { int peer = peerSend[i]; - if (peer == -1) continue; + if (peer == -1 || peer >= comm->nRanks) continue; conn = &channel->peers[peer].send; if (conn->connected) {++nSkippedSend; continue; } memset(&connect, 0, sizeof(connect)); NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect))); - NCCLCHECK(conn->transportComm->connect(&connect, conn)); + NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn)); conn->connected = 1; } for (int i=0; i<nrecv; i++) { int peer = peerRecv[i]; - if (peer == -1) continue; + if (peer == -1 || peer >= comm->nRanks) continue; conn = &channel->peers[peer].recv; if (conn->connected) {++nSkippedRecv; continue; } memset(&connect, 0, sizeof(connect)); NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect))); - NCCLCHECK(conn->transportComm->connect(&connect, conn)); + NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn)); conn->connected = 1; } TRACE(NCCL_INIT, "nsend %d nrecv %d nSkippedSend %u nSkippedRecv %u - DONE", nsend, nrecv, nSkippedSend, nSkippedRecv); return ncclSuccess; } +extern struct ncclTransport collNetTransport; + +// All ranks must participate in collNetSetup call +// type: 0 for send, 1 for recv +// return: 0 - unsupported, 1 - supported +static int collNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int collNetChannels, int rank, int nranks, int masterRank, int masterPeer, int nMasters, int type) { + int rankInCollNet = -1; + int supported = 0; + int isMaster = (rank == masterRank) ? 1 : 0; + struct { + int collNetRank; + ncclConnect connect; + } sendrecvExchange; + + // check if we can connect to collnet, whose root is the nranks-th rank + struct ncclPeerInfo *myInfo = comm->peerInfo+rank, *peerInfo = comm->peerInfo+nranks; + peerInfo->rank = nranks; + int ret = 1; + if (isMaster) { + NCCLCHECK(collNetTransport.canConnect(&ret, comm->topo, collNetGraph, myInfo, peerInfo)); + } + + // send master receives connect info from peer recv master + if (isMaster && type == 0) { + NCCLCHECK(bootstrapRecv(comm->bootstrap, masterPeer, &sendrecvExchange, sizeof(sendrecvExchange))); + rankInCollNet = sendrecvExchange.collNetRank; + INFO(NCCL_INIT, "CollNet [send] : rank %d collNetRank %d collNetNranks %d received connect from rank %d", rank, rankInCollNet, nMasters, masterPeer); + } + + // select + struct ncclPeer* root = channel->peers+nranks; + struct ncclConnector* conn = (type == 1) ? &root->recv : &root->send; + struct ncclTransportComm* transportComm = (type == 1) ? &(collNetTransport.recv) : &(collNetTransport.send); + conn->transportComm = transportComm; + // setup + struct ncclConnect myConnect; + if (isMaster && ret > 0) { + NCCLCHECK(transportComm->setup(comm->topo, collNetGraph, myInfo, peerInfo, &myConnect, conn, channel->buffSize, channel->id)); + } + // prepare connect handles + ncclResult_t res; + struct { + int isMaster; + ncclConnect connect; + } *allConnects = NULL; + ncclConnect *masterConnects = NULL; + NCCLCHECK(ncclCalloc(&masterConnects, nMasters)); + if (type == 1) { // recv side: AllGather + // all ranks must participate + NCCLCHECK(ncclCalloc(&allConnects, nranks)); + allConnects[rank].isMaster = isMaster; + memcpy(&(allConnects[rank].connect), &myConnect, sizeof(struct ncclConnect)); + NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allConnects, sizeof(*allConnects)), res, cleanup); + // consolidate + int c = 0; + for (int r = 0; r < nranks; r++) { + if (allConnects[r].isMaster) { + memcpy(masterConnects+c, &(allConnects[r].connect), sizeof(struct ncclConnect)); + if (r == rank) rankInCollNet = c; + c++; + } + } + } else { // send side : copy in connect info received from peer recv master + if (isMaster) memcpy(masterConnects+rankInCollNet, &(sendrecvExchange.connect), sizeof(struct ncclConnect)); + } + // connect + if (isMaster && ret > 0) { + NCCLCHECKGOTO(transportComm->connect(masterConnects, nMasters, rankInCollNet, conn), res, cleanup); + } + // recv side sends connect info to send side + if (isMaster && type == 1) { + sendrecvExchange.collNetRank = rankInCollNet; + memcpy(&sendrecvExchange.connect, masterConnects+rankInCollNet, sizeof(struct ncclConnect)); + NCCLCHECK(bootstrapSend(comm->bootstrap, masterPeer, &sendrecvExchange, sizeof(sendrecvExchange))); + INFO(NCCL_INIT, "CollNet [recv] : rank %d collNetRank %d collNetNranks %d sent connect to rank %d", rank, rankInCollNet, nMasters, masterPeer); + } + if (ret > 0) { + supported = 1; + } +cleanup: + if (allConnects != NULL) free(allConnects); + if (masterConnects != NULL) free(masterConnects); + return supported; +} + +static ncclResult_t checkCollNetSetup(struct ncclComm* comm, int rank, int collNetSetupFail) { + int nranks = comm->nRanks; + // AllGather collNet setup results + int* allGatherFailures; + NCCLCHECK(ncclCalloc(&allGatherFailures, nranks)); + allGatherFailures[rank] = collNetSetupFail; + NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGatherFailures, sizeof(int))); + for (int i=0; i<nranks; i++) { + if (allGatherFailures[i] != 0) { + collNetSetupFail = 1; + break; + } + } + free(allGatherFailures); + if (collNetSetupFail) { + if (rank == 0) WARN("Cannot initialize CollNet, using %s instead", ncclNetName()); + // Free collNet resources + for (int r=0; r<comm->nChannels; r++) { + struct ncclChannel* channel = comm->channels+r; + struct ncclPeer* peer = channel->peers+nranks; + if (peer->send.transportResources && peer->send.transportComm) NCCLCHECK(peer->send.transportComm->free(peer->send.transportResources)); + if (peer->recv.transportResources && peer->recv.transportComm) NCCLCHECK(peer->recv.transportComm->free(peer->recv.transportResources)); + peer->send.transportResources = NULL; // avoid double free + peer->recv.transportResources = NULL; // avoid double free + } + // Set support to 0 + comm->collNetSupport = 0; + } else { + comm->collNetSupport = 1; + } + return ncclSuccess; +} + NCCL_PARAM(CrossNic, "CROSS_NIC", 2); +NCCL_PARAM(GraphDumpFileRank, "GRAPH_DUMP_FILE_RANK", 0); static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) { // We use 3 AllGathers @@ -462,7 +591,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(fillInfo(comm, myInfo, commHash)); NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather1Data, sizeof(*allGather1Data))); - NCCLCHECK(ncclCalloc(&comm->peerInfo, nranks)); + NCCLCHECK(ncclCalloc(&comm->peerInfo, nranks+1)); // Extra rank to represent CollNet root for (int i = 0; i < nranks; i++) { memcpy(comm->peerInfo+i, &allGather1Data[i].peerInfo, sizeof(struct ncclPeerInfo)); if ((i != rank) && (comm->peerInfo[i].hostHash == myInfo->hostHash) && (comm->peerInfo[i].busId == myInfo->busId)) { @@ -481,60 +610,82 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(ncclTopoTrimSystem(comm->topo, comm)); // Recompute paths after trimming NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo)); - // Compute max speed to accelerate search - NCCLCHECK(ncclTopoGetMaxSpeed(comm->topo)); + // Init search + NCCLCHECK(ncclTopoSearchInit(comm->topo)); // Print final topology NCCLCHECK(ncclTopoPrint(comm->topo)); // Get rings and trees - struct ncclTopoGraph treeGraph; - treeGraph.pattern = NCCL_TOPO_PATTERN_SPLIT_TREE; - treeGraph.crossNic = ncclParamCrossNic(); - // We communicate only half the data between node with trees on 2 nodes. - NCCLCHECK(ncclTopoCompute(comm->topo, &treeGraph)); - NCCLCHECK(ncclTopoPrintGraph(comm->topo, &treeGraph)); struct ncclTopoGraph ringGraph; + ringGraph.id = 0; ringGraph.pattern = NCCL_TOPO_PATTERN_RING; ringGraph.crossNic = ncclParamCrossNic(); + ringGraph.collNet = 0; + ringGraph.minChannels = 1; + ringGraph.maxChannels = MAXCHANNELS/2; NCCLCHECK(ncclTopoCompute(comm->topo, &ringGraph)); NCCLCHECK(ncclTopoPrintGraph(comm->topo, &ringGraph)); + struct ncclTopoGraph treeGraph; + treeGraph.id = 1; + treeGraph.pattern = NCCL_TOPO_PATTERN_SPLIT_TREE; + treeGraph.crossNic = ncclParamCrossNic(); + treeGraph.collNet = 0; + treeGraph.minChannels = 1; + treeGraph.maxChannels = ringGraph.nChannels; + NCCLCHECK(ncclTopoCompute(comm->topo, &treeGraph)); + NCCLCHECK(ncclTopoPrintGraph(comm->topo, &treeGraph)); + + struct ncclTopoGraph collNetGraph; + collNetGraph.id = 2; + collNetGraph.pattern = NCCL_TOPO_PATTERN_TREE; + collNetGraph.collNet = 1; + collNetGraph.crossNic = ncclParamCrossNic(); + collNetGraph.minChannels = collNetGraph.maxChannels = ringGraph.nChannels; + NCCLCHECK(ncclTopoCompute(comm->topo, &collNetGraph)); + NCCLCHECK(ncclTopoPrintGraph(comm->topo, &collNetGraph)); + + if (comm->rank == ncclParamGraphDumpFileRank()) { + struct ncclTopoGraph* graphs[3] = { &ringGraph, &treeGraph, &collNetGraph }; + NCCLCHECK(ncclTopoDumpGraphs(comm->topo, 3, graphs)); + } + // AllGather3 - begin + struct ncclGraphInfo { + int sameChannels; + float speedIntra; + float speedInter; + int typeIntra; + }; struct { int cudaCompCap; int fullCudaCompCap; - int nvlink; int nChannels; - struct { - int sameChannels; - int speedIntra; - int speedInter; - int nvlink; - } tree; - struct { - int sameChannels; - int speedIntra; - int speedInter; - int nvlink; - } ring; + struct ncclGraphInfo tree; + struct ncclGraphInfo ring; + struct ncclGraphInfo collNet; struct ncclTopoRanks topoRanks; } *allGather3Data; NCCLCHECK(ncclCalloc(&allGather3Data, nranks)); allGather3Data[rank].cudaCompCap = ncclCudaCompCap(); - allGather3Data[rank].nvlink = treeGraph.nvlink; - allGather3Data[rank].nChannels = comm->nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels); + allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels = + std::min(treeGraph.nChannels, ringGraph.nChannels); allGather3Data[rank].tree.sameChannels = treeGraph.sameChannels; allGather3Data[rank].tree.speedIntra = treeGraph.speedIntra; allGather3Data[rank].tree.speedInter = treeGraph.speedInter; - allGather3Data[rank].tree.nvlink = treeGraph.nvlink; + allGather3Data[rank].tree.typeIntra = treeGraph.typeIntra; allGather3Data[rank].ring.sameChannels = ringGraph.sameChannels; allGather3Data[rank].ring.speedIntra = ringGraph.speedIntra; allGather3Data[rank].ring.speedInter = ringGraph.speedInter; - allGather3Data[rank].ring.nvlink = ringGraph.nvlink; + allGather3Data[rank].ring.typeIntra = ringGraph.typeIntra; + allGather3Data[rank].collNet.sameChannels = collNetGraph.sameChannels; + allGather3Data[rank].collNet.speedIntra = collNetGraph.speedIntra; + allGather3Data[rank].collNet.speedInter = collNetGraph.speedInter; + allGather3Data[rank].collNet.typeIntra = collNetGraph.typeIntra; - NCCLCHECK(ncclTopoPreset(comm, &treeGraph, &ringGraph, &allGather3Data[rank].topoRanks)); + NCCLCHECK(ncclTopoPreset(comm, &treeGraph, &ringGraph, &collNetGraph, &allGather3Data[rank].topoRanks)); NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather3Data, sizeof(*allGather3Data))); @@ -562,9 +713,6 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm maxCompCap = std::max(allGather3Data[i].cudaCompCap, maxCompCap); } - comm->nvlink = 1; - for (int i = 0; i < nranks; i++) comm->nvlink &= allGather3Data[i].nvlink; - int nChannelsOrig = comm->nChannels; struct ncclTopoRanks** allTopoRanks; NCCLCHECK(ncclCalloc(&allTopoRanks, comm->nRanks)); @@ -575,11 +723,15 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm treeGraph.sameChannels = std::min(allGather3Data[i].tree.sameChannels, treeGraph.sameChannels); treeGraph.speedIntra = std::min(allGather3Data[i].tree.speedIntra, treeGraph.speedIntra); treeGraph.speedInter = std::min(allGather3Data[i].tree.speedInter, treeGraph.speedInter); - treeGraph.nvlink = std::min(allGather3Data[i].tree.nvlink, treeGraph.nvlink); + treeGraph.typeIntra = std::min(allGather3Data[i].tree.typeIntra, treeGraph.typeIntra); ringGraph.sameChannels = std::min(allGather3Data[i].ring.sameChannels, ringGraph.sameChannels); ringGraph.speedIntra = std::min(allGather3Data[i].ring.speedIntra, ringGraph.speedIntra); ringGraph.speedInter = std::min(allGather3Data[i].ring.speedInter, ringGraph.speedInter); - ringGraph.nvlink = std::min(allGather3Data[i].ring.nvlink, ringGraph.nvlink); + ringGraph.typeIntra = std::min(allGather3Data[i].ring.typeIntra, ringGraph.typeIntra); + collNetGraph.sameChannels = std::min(allGather3Data[i].collNet.sameChannels, collNetGraph.sameChannels); + collNetGraph.speedIntra = std::min(allGather3Data[i].collNet.speedIntra, collNetGraph.speedIntra); + collNetGraph.speedInter = std::min(allGather3Data[i].collNet.speedInter, collNetGraph.speedInter); + collNetGraph.typeIntra = std::min(allGather3Data[i].collNet.typeIntra, collNetGraph.typeIntra); } if (comm->nChannels < nChannelsOrig) { @@ -592,6 +744,11 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm NCCLCHECK(ncclCalloc(&rings, nranks*MAXCHANNELS)); NCCLCHECK(ncclTopoPostset(comm, nodesFirstRank, allTopoRanks, rings)); + if (comm->nNodes > 1 && + ncclParamCollNetEnable() == 1 && + collNetSupport()) { + NCCLCHECK(ncclTopoConnectCollNet(comm, &collNetGraph, rank)); + } free(allTopoRanks); free(nodesFirstRank); @@ -601,7 +758,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm TRACE(NCCL_INIT, "rank %d nranks %d - BUILT %d TREES/RINGS", rank, nranks, comm->nChannels); - NCCLCHECK(ncclSetThresholds(comm, minCompCap, maxCompCap, &treeGraph, &ringGraph)); + NCCLCHECK(ncclTopoSetThresholds(comm, minCompCap, maxCompCap, &treeGraph, &ringGraph, &collNetGraph)); char line[1024]; line[0]='\0'; @@ -615,21 +772,58 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm line[1023] = '\0'; INFO(NCCL_INIT, "Trees%s", line); + // Set Affinity to a CPU local the our GPU, so that all memory we allocate + // on the host is local. + cpu_set_t affinitySave; + sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave); + NCCLCHECK(ncclTopoSetAffinity(comm->topo, comm->rank)); + ncclResult_t ret; + // Connect with prev/next for each ring struct ncclConnect *connect; - NCCLCHECK(ncclCalloc(&connect, 2)); + NCCLCHECKGOTO(ncclCalloc(&connect, 2), ret, affinity_restore); for (int c=0; c<comm->nChannels; c++) { struct ncclChannel* channel = comm->channels+c; - NCCLCHECK(setupChannel(comm, c, rank, nranks, rings+c*nranks)); + NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, affinity_restore); if (comm->nRanks == 1) continue; - NCCLCHECK(p2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next)); - NCCLCHECK(p2pSetup(comm, &treeGraph, channel, NCCL_MAX_TREE_ARITY, channel->treeUp.down, 1, &channel->treeUp.up)); - NCCLCHECK(p2pSetup(comm, &treeGraph, channel, 1, &channel->treeDn.up, NCCL_MAX_TREE_ARITY, channel->treeDn.down)); + NCCLCHECKGOTO(p2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next), ret, affinity_restore); + NCCLCHECKGOTO(p2pSetup(comm, &treeGraph, channel, NCCL_MAX_TREE_ARITY, channel->treeUp.down, 1, &channel->treeUp.up), ret, affinity_restore); + NCCLCHECKGOTO(p2pSetup(comm, &treeGraph, channel, 1, &channel->treeDn.up, NCCL_MAX_TREE_ARITY, channel->treeDn.down), ret, affinity_restore); + } + + // Check if we can setup CollNet + if (comm->nNodes > 1 && + ncclParamCollNetEnable() == 1 && + collNetSupport()) { + int logicChannels = comm->nChannels/2; + int collNetSetupFail = 0; + const int recvIndex = 0; // recv GPU index is always 0 + const int sendIndex = collNetGraph.pattern == NCCL_TOPO_PATTERN_TREE ? 0 : 1; // send GPU index depends on topo pattern + for (int c=0; c<logicChannels; c++) { + struct ncclChannel* channelRecv = comm->channels+logicChannels+c; + struct ncclChannel* channelSend = comm->channels+c; + NCCLCHECK(p2pSetup(comm, &collNetGraph, channelRecv, 1, &channelRecv->collTreeDn.up, 1, channelRecv->collTreeDn.down)); + NCCLCHECK(p2pSetup(comm, &collNetGraph, channelSend, 1, channelSend->collTreeUp.down, 1, &channelSend->collTreeUp.up)); + const int recvMaster = collNetGraph.intra[c*comm->localRanks+recvIndex]; + const int sendMaster = collNetGraph.intra[c*comm->localRanks+sendIndex]; + if (collNetSetup(comm, &collNetGraph, channelRecv, logicChannels, rank, nranks, recvMaster, sendMaster, comm->nNodes, 1) != 1) + collNetSetupFail = 1; + if (collNetSetup(comm, &collNetGraph, channelSend, logicChannels, rank, nranks, sendMaster, recvMaster, comm->nNodes, 0) != 1) + collNetSetupFail = 1; + } + // Verify CollNet setup across ranks + NCCLCHECK(checkCollNetSetup(comm, rank, collNetSetupFail)); } TRACE(NCCL_INIT, "rank %d nranks %d - CONNECTED %d RINGS AND TREES", rank, nranks, comm->nChannels); free(connect); free(rings); + // We should have allocated all buffers, collective fifos, ... we can + // restore the affinity. +affinity_restore: + sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave); + if (ret != ncclSuccess) return ret; + // Compute intra ranks (using AllGather1 data) int intraRank0 = -1, intraRank = -1, intraRanks = 0; for (int i = 0; i < nranks; i++) { @@ -658,98 +852,20 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm return ncclSuccess; } -static ncclResult_t getCpuGpuAffinity(int cudaDev, cpu_set_t* mask) { - CPU_ZERO_S(sizeof(cpu_set_t), mask); - char* cudaPath; - NCCLCHECK(ncclTopoCudaPath(cudaDev, &cudaPath)); - char path[PATH_MAX]; - strncpy(path, cudaPath, PATH_MAX-1); - snprintf(path+strlen(path), PATH_MAX-1-strlen(path), "/local_cpus"); - path[PATH_MAX-1] = '\0'; - int fd; - SYSCHECKVAL(open(path, O_RDONLY), "open", fd); - char affinityStr[sizeof(cpu_set_t)*2 + 1]; - int r = read(fd, affinityStr, sizeof(cpu_set_t)*2); - if (r > 0) { - affinityStr[r] = '\0'; - NCCLCHECK(ncclStrToCpuset(affinityStr, mask)); - } - close(fd); - free(cudaPath); - return ncclSuccess; -} - -NCCL_PARAM(IgnoreCpuAffinity, "IGNORE_CPU_AFFINITY", 0); - -static ncclResult_t setCpuAffinity(int cudaDev) { - // Query the CPU affinity set we were provided - cpu_set_t mask; - SYSCHECK(sched_getaffinity(0, sizeof(cpu_set_t), &mask), "sched_getaffinity"); - -#ifdef ENABLE_TRACE - { - char affinityStr[sizeof(cpu_set_t)*2]; - NCCLCHECK(ncclCpusetToStr(&mask, affinityStr)); - TRACE(NCCL_INIT, "Current affinity for GPU %d is %s", cudaDev, affinityStr); - } -#endif - - // Find the CPUs that are local to the supplied GPU - cpu_set_t gpuMask; - NCCLCHECK(getCpuGpuAffinity(cudaDev, &gpuMask)); - -#ifdef ENABLE_TRACE - { - char affinityStr[sizeof(cpu_set_t)*2]; - NCCLCHECK(ncclCpusetToStr(&gpuMask, affinityStr)); - TRACE(NCCL_INIT, "CPU GPU affinity for GPU %d is %s", cudaDev, affinityStr); - } -#endif - - cpu_set_t finalMask; - if (ncclParamIgnoreCpuAffinity()) - // Ignore the CPU affinity set and use the GPU one instead - finalMask = gpuMask; - else - // Use a subset of the GPU affinity set - CPU_AND(&finalMask, &mask, &gpuMask); - - // If there is a non empty set, use it to set affinity - if (CPU_COUNT(&finalMask)) { - char affinityStr[sizeof(cpu_set_t)*2]; - NCCLCHECK(ncclCpusetToStr(&finalMask, affinityStr)); - INFO(NCCL_INIT, "Setting affinity for GPU %d to %s", cudaDev, affinityStr); - SYSCHECK(sched_setaffinity(0, sizeof(cpu_set_t), &finalMask), "sched_setaffinity"); - } - return ncclSuccess; -} - ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) { - cpu_set_t affinitySave; - sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave); - - NCCLCHECK(wrapNvmlSymbols()); - NCCLCHECK(wrapNvmlInit()); - - // Make sure all host memory allocation are close to the GPU - CUDACHECK(cudaSetDevice(cudaDev)); - NCCLCHECK(setCpuAffinity(cudaDev)); ncclResult_t res; + CUDACHECK(cudaSetDevice(cudaDev)); NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup); NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup); NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup); - sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave); - NCCLCHECKGOTO(wrapNvmlShutdown(), res, cleanup); - INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %x - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId); return ncclSuccess; cleanup: if ((*newcomm) && (*newcomm)->bootstrap) bootstrapAbort((*newcomm)->bootstrap); *newcomm = NULL; - sched_setaffinity(0, sizeof(cpu_set_t), &affinitySave); return res; } |