diff options
-rw-r--r-- | makefiles/version.mk | 2 | ||||
-rw-r--r-- | src/Makefile | 2 | ||||
-rw-r--r-- | src/enqueue.cc | 38 | ||||
-rw-r--r-- | src/group.cc | 4 | ||||
-rw-r--r-- | src/include/enqueue.h | 1 | ||||
-rw-r--r-- | src/include/param.h | 82 | ||||
-rw-r--r-- | src/include/socket.h | 2 | ||||
-rw-r--r-- | src/init.cc | 17 | ||||
-rw-r--r-- | src/misc/argcheck.cc | 7 | ||||
-rw-r--r-- | src/misc/param.cc | 81 | ||||
-rw-r--r-- | src/misc/socket.cc | 10 | ||||
-rw-r--r-- | src/proxy.cc | 6 | ||||
-rw-r--r-- | src/transport/coll_net.cc | 34 | ||||
-rw-r--r-- | src/transport/net.cc | 2 | ||||
-rw-r--r-- | src/transport/net_ib.cc | 23 | ||||
-rw-r--r-- | src/transport/net_socket.cc | 6 |
16 files changed, 203 insertions, 114 deletions
diff --git a/makefiles/version.mk b/makefiles/version.mk index e7fe35e..7c9bf0f 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 12 -NCCL_PATCH := 7 +NCCL_PATCH := 10 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/Makefile b/src/Makefile index 65c8b28..1dfb5ee 100644 --- a/src/Makefile +++ b/src/Makefile @@ -10,7 +10,7 @@ include ../makefiles/version.mk ##### src files INCEXPORTS := nccl.h nccl_net.h LIBSRCFILES := init.cc channel.cc bootstrap.cc transport.cc enqueue.cc group.cc debug.cc proxy.cc enhcompat.cc net.cc \ - misc/nvmlwrap.cc misc/ibvwrap.cc misc/gdrwrap.cc misc/utils.cc misc/argcheck.cc misc/socket.cc misc/shmutils.cc misc/profiler.cc \ + misc/nvmlwrap.cc misc/ibvwrap.cc misc/gdrwrap.cc misc/utils.cc misc/argcheck.cc misc/socket.cc misc/shmutils.cc misc/profiler.cc misc/param.cc \ transport/p2p.cc transport/shm.cc transport/net.cc transport/net_socket.cc transport/net_ib.cc transport/coll_net.cc \ collectives/sendrecv.cc collectives/all_reduce.cc collectives/all_gather.cc collectives/broadcast.cc collectives/reduce.cc collectives/reduce_scatter.cc \ graph/topo.cc graph/paths.cc graph/search.cc graph/connect.cc graph/rings.cc graph/trees.cc graph/tuning.cc graph/xml.cc diff --git a/src/enqueue.cc b/src/enqueue.cc index d28191b..a15c370 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -125,6 +125,19 @@ error: return (res != ncclSuccess) ? 0 : max; } +// Set shared memory carveout for the nccl kernels +ncclResult_t ncclKernSetSharedMemoryCarveout(int carveOut) { + ncclResult_t res = ncclSuccess; + int numNcclKerns = sizeof(ncclKerns)/sizeof(ncclKerns[0]); + for (int i = 0; i < numNcclKerns; i++) { + CUDACHECKGOTO(cudaFuncSetAttribute(ncclKerns[i], cudaFuncAttributePreferredSharedMemoryCarveout, carveOut), res, error); + } + +error: + return res; +} + + /*****************************************************************************/ /* Launch system : synchronization and CUDA kernel launch */ /*****************************************************************************/ @@ -705,17 +718,6 @@ static ncclResult_t ncclSetupCollKernel(struct ncclInfo* info) { params->blockDim.x = std::max<unsigned>(params->blockDim.x, info->nThreads); comm->enqueueInfo->maxChannels = params->gridDim.x; // params may be varied by a second graph hence we need to capture it here - // Register and exchange input and output buffers - if (comm->usingCudaGraph && // only in CUDA graph mode - comm->graphRegister == 1 && // when registration is enabled - info->algorithm == NCCL_ALGO_COLLNET && // limited to CollNet for now - comm->intraHighestTransportType == TRANSPORT_P2P && // only when all ranks can p2p each other - comm->intraRanks == 1) { // only in multi-process mode - NCCLCHECK(ncclRegBuffAndExchange(info, &eqElem->buffRegInfo)); - comm->enqueueInfo->nRegBuffs += eqElem->buffRegInfo.nBuffs; - work->header.type = ncclWorkTypeRegColl; - } - // Inline the first kernel if (params->func == NULL) { params->func = ncclKerns[work->header.funcIndex]; @@ -728,6 +730,20 @@ static ncclResult_t ncclSetupCollKernel(struct ncclInfo* info) { } } + // Register and exchange input and output buffers + if (comm->usingCudaGraph && // only in CUDA graph mode + comm->graphRegister == 1 && // when registration is enabled + info->algorithm == NCCL_ALGO_COLLNET && // limited to CollNet for now + comm->intraHighestTransportType == TRANSPORT_P2P && // only when all ranks can p2p each other + comm->intraRanks == 1) { // only in multi-process mode + NCCLCHECK(ncclRegBuffAndExchange(info, &eqElem->buffRegInfo)); + comm->enqueueInfo->nRegBuffs += eqElem->buffRegInfo.nBuffs; + work->header.type = ncclWorkTypeRegColl; + // Disable inline argument because we need kernel to copy the entire ncclWork from workFifo + // because the registered addresses are in ncclWorkElemReg + comm->args.header.type = ncclWorkTypeUnused; + } + return ncclSuccess; } diff --git a/src/group.cc b/src/group.cc index 0e8f19e..f2b9e37 100644 --- a/src/group.cc +++ b/src/group.cc @@ -274,8 +274,8 @@ sched_delta: if (sendbytes > sendChunkSize) { sendbytes = sendChunkSize; } else { sendRemaining = 0; } // 0-bytes send/recv are considered as syncs. Make sure we only add syncs when requested // (total size == 0), otherwise set size to -1. - if (sendbytes <= 0 && totSendBytes != 0) send = NULL; - if (recvbytes <= 0 && totRecvBytes != 0) recv = NULL; + if (sendbytes < 0 || (sendbytes == 0 && totSendBytes != 0)) send = NULL; + if (recvbytes < 0 || (recvbytes == 0 && totRecvBytes != 0)) recv = NULL; if (recv) { NCCLCHECKGOTO(scheduleRecv(comm, recvPeer, channelId, recvbytes, ((char*)recvBuff)+recvOffset), ret, group_cleanup); } diff --git a/src/include/enqueue.h b/src/include/enqueue.h index 02a9adb..282342b 100644 --- a/src/include/enqueue.h +++ b/src/include/enqueue.h @@ -15,6 +15,7 @@ #define NCCL_AGG_CHANNEL_SIZE (1LL << 21) /* 2 MiB, ideal per-channel size to fully utilize bandwidth */ size_t ncclKernMaxLocalSize(); +ncclResult_t ncclKernSetSharedMemoryCarveout(int carveOut); ncclResult_t ncclEnqueueCheck(struct ncclInfo* info); ncclResult_t ncclCpuBarrierIn(struct ncclComm* comm, int* isLast); ncclResult_t ncclCpuBarrierLast(struct ncclComm* comm); diff --git a/src/include/param.h b/src/include/param.h index 7f749fb..c95b67c 100644 --- a/src/include/param.h +++ b/src/include/param.h @@ -7,77 +7,23 @@ #ifndef NCCL_PARAM_H_ #define NCCL_PARAM_H_ -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <sys/types.h> -#include <pwd.h> +#include <stdint.h> -static const char* userHomeDir() { - struct passwd *pwUser = getpwuid(getuid()); - return pwUser == NULL ? NULL : pwUser->pw_dir; -} +const char* userHomeDir(); +void setEnvFile(const char* fileName); +void initEnv(); -static void setEnvFile(const char* fileName) { - FILE * file = fopen(fileName, "r"); - if (file == NULL) return; +void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache); - char *line = NULL; - char envVar[1024]; - char envValue[1024]; - size_t n = 0; - ssize_t read; - while ((read = getline(&line, &n, file)) != -1) { - if (line[read-1] == '\n') line[read-1] = '\0'; - int s=0; // Env Var Size - while (line[s] != '\0' && line[s] != '=') s++; - if (line[s] == '\0') continue; - strncpy(envVar, line, std::min(1023,s)); - envVar[s] = '\0'; - s++; - strncpy(envValue, line+s, 1023); - envValue[1023]='\0'; - setenv(envVar, envValue, 0); - //printf("%s : %s->%s\n", fileName, envVar, envValue); - } - if (line) free(line); - fclose(file); -} - -static void initEnv() { - char confFilePath[1024]; - const char * userDir = userHomeDir(); - if (userDir) { - sprintf(confFilePath, "%s/.nccl.conf", userDir); - setEnvFile(confFilePath); - } - sprintf(confFilePath, "/etc/nccl.conf"); - setEnvFile(confFilePath); -} - - -#define NCCL_PARAM(name, env, default_value) \ -pthread_mutex_t ncclParamMutex##name = PTHREAD_MUTEX_INITIALIZER; \ -int64_t ncclParam##name() { \ - static_assert(default_value != -1LL, "default value cannot be -1"); \ - static int64_t value = -1LL; \ - pthread_mutex_lock(&ncclParamMutex##name); \ - if (value == -1LL) { \ - value = default_value; \ - char* str = getenv("NCCL_" env); \ - if (str && strlen(str) > 0) { \ - errno = 0; \ - int64_t v = strtoll(str, NULL, 0); \ - if (errno) { \ - INFO(NCCL_ALL,"Invalid value %s for %s, using default %lu.", str, "NCCL_" env, value); \ - } else { \ - value = v; \ - INFO(NCCL_ALL,"%s set by environment to %lu.", "NCCL_" env, value); \ - } \ +#define NCCL_PARAM(name, env, deftVal) \ + int64_t ncclParam##name() { \ + constexpr int64_t uninitialized = INT64_MIN; \ + static_assert(deftVal != uninitialized, "default value cannot be the uninitialized value."); \ + static int64_t cache = uninitialized; \ + if (__builtin_expect(__atomic_load_n(&cache, __ATOMIC_RELAXED) == uninitialized, false)) { \ + ncclLoadParam("NCCL_" env, deftVal, uninitialized, &cache); \ } \ - } \ - pthread_mutex_unlock(&ncclParamMutex##name); \ - return value; \ -} + return cache; \ + } #endif diff --git a/src/include/socket.h b/src/include/socket.h index 53fda4d..d72480b 100644 --- a/src/include/socket.h +++ b/src/include/socket.h @@ -44,7 +44,7 @@ struct ncclSocket { enum ncclSocketState state; }; -const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf); +const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm = 1); ncclResult_t ncclGetSocketAddrFromString(union ncclSocketAddress* ua, const char* ip_port_pair); int ncclFindInterfaceMatchSubnet(char* ifNames, union ncclSocketAddress* localAddrs, union ncclSocketAddress* remoteAddr, int ifNameMaxSize, int maxIfs); int ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs, int ifNameMaxSize, int maxIfs); diff --git a/src/init.cc b/src/init.cc index 4da8dfd..29bfa01 100644 --- a/src/init.cc +++ b/src/init.cc @@ -59,7 +59,8 @@ ncclResult_t initGdrCopy() { return ncclSuccess; } -NCCL_PARAM(CollNetEnable, "COLLNET_ENABLE", 0); + +NCCL_PARAM(L1SharedMemoryCarveout, "L1_SHARED_MEMORY_CARVEOUT", 0); pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; static bool initialized = false; @@ -71,6 +72,8 @@ static ncclResult_t ncclInit() { initEnv(); initGdrCopy(); maxLocalSizeBytes = ncclKernMaxLocalSize(); + int carveout = ncclParamL1SharedMemoryCarveout(); + if (carveout) ncclKernSetSharedMemoryCarveout(carveout); NCCLCHECK(ncclNetInit()); INFO(NCCL_INIT, "Using network %s", ncclNetName()); initialized = true; @@ -529,7 +532,16 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm } // Determine local CollNet support before all-gather - if (ncclParamCollNetEnable() == 1 && collNetSupport() == 1 && collNetGraph.nChannels > 0) comm->collNetSupport = 1; + if (collNetSupport()) { + char *collNetEnable = getenv("NCCL_COLLNET_ENABLE"); + if (collNetEnable != NULL) { + INFO(NCCL_ALL, "NCCL_COLLNET_ENABLE set by environment to %s.", collNetEnable); + if (strcmp(collNetEnable, "1") == 0) { + comm->collNetSupport = 1; + } + } + } + if (comm->collNetSupport == 1 && collNetGraph.nChannels <= 0) comm->collNetSupport = 0; // AllGather3 - begin struct ncclGraphInfo { @@ -832,7 +844,6 @@ collnet_cleanup: // Connect to local net proxy struct ncclProxyConnector proxyConn; - NCCLCHECK(ncclTopoGetLocalRank(comm->topo, comm->rank, &proxyConn.localRank)); NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_NET, 1, comm->rank, &proxyConn)); NCCLCHECK(ncclProxyCall(&proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0)); diff --git a/src/misc/argcheck.cc b/src/misc/argcheck.cc index 1c5ba3c..5406bf0 100644 --- a/src/misc/argcheck.cc +++ b/src/misc/argcheck.cc @@ -11,7 +11,7 @@ static ncclResult_t CudaPtrCheck(const void* pointer, struct ncclComm* comm, con cudaPointerAttributes attr; cudaError_t err = cudaPointerGetAttributes(&attr, pointer); if (err != cudaSuccess || attr.devicePointer == NULL) { - WARN("%s : %s is not a valid pointer", opname, ptrname); + WARN("%s : %s %p is not a valid pointer", opname, ptrname, pointer); return ncclInvalidArgument; } #if CUDART_VERSION >= 10000 @@ -63,8 +63,9 @@ ncclResult_t ArgsCheck(struct ncclInfo* info) { } if (info->comm->checkPointers) { - if ((info->coll == ncclFuncSend || info->coll == ncclFuncRecv) && info->count > 0) { - NCCLCHECK(CudaPtrCheck(info->recvbuff, info->comm, "buff", info->opName)); + if ((info->coll == ncclFuncSend || info->coll == ncclFuncRecv)) { + if (info->count >0) + NCCLCHECK(CudaPtrCheck(info->recvbuff, info->comm, "buff", info->opName)); } else { // Check CUDA device pointers if (info->coll != ncclFuncBroadcast || info->comm->rank == info->root) { diff --git a/src/misc/param.cc b/src/misc/param.cc new file mode 100644 index 0000000..48099b3 --- /dev/null +++ b/src/misc/param.cc @@ -0,0 +1,81 @@ +/************************************************************************* + * Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "param.h" +#include "debug.h" + +#include <algorithm> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <unistd.h> +#include <pthread.h> +#include <pwd.h> + +const char* userHomeDir() { + struct passwd *pwUser = getpwuid(getuid()); + return pwUser == NULL ? NULL : pwUser->pw_dir; +} + +void setEnvFile(const char* fileName) { + FILE * file = fopen(fileName, "r"); + if (file == NULL) return; + + char *line = NULL; + char envVar[1024]; + char envValue[1024]; + size_t n = 0; + ssize_t read; + while ((read = getline(&line, &n, file)) != -1) { + if (line[read-1] == '\n') line[read-1] = '\0'; + int s=0; // Env Var Size + while (line[s] != '\0' && line[s] != '=') s++; + if (line[s] == '\0') continue; + strncpy(envVar, line, std::min(1023,s)); + envVar[s] = '\0'; + s++; + strncpy(envValue, line+s, 1023); + envValue[1023]='\0'; + setenv(envVar, envValue, 0); + //printf("%s : %s->%s\n", fileName, envVar, envValue); + } + if (line) free(line); + fclose(file); +} + +void initEnv() { + char confFilePath[1024]; + const char * userDir = userHomeDir(); + if (userDir) { + sprintf(confFilePath, "%s/.nccl.conf", userDir); + setEnvFile(confFilePath); + } + sprintf(confFilePath, "/etc/nccl.conf"); + setEnvFile(confFilePath); +} + +void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache) { + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&mutex); + if (__atomic_load_n(cache, __ATOMIC_RELAXED) == uninitialized) { + char* str = getenv(env); + int64_t value = deftVal; + if (str && strlen(str) > 0) { + errno = 0; + value = strtoll(str, nullptr, 0); + if (errno) { + value = deftVal; + INFO(NCCL_ALL,"Invalid value %s for %s, using default %lld.", str, env, (long long)deftVal); + } else { + INFO(NCCL_ALL,"%s set by environment to %lld.", env, (long long)value); + } + } + __atomic_store_n(cache, value, __ATOMIC_RELAXED); + } + pthread_mutex_unlock(&mutex); +} diff --git a/src/misc/socket.cc b/src/misc/socket.cc index 4e3295f..ef2bea6 100644 --- a/src/misc/socket.cc +++ b/src/misc/socket.cc @@ -16,12 +16,16 @@ * * Output: "IPv4/IPv6 address<port>" */ -const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf) { +const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm /*= 1*/) { if (buf == NULL || addr == NULL) return NULL; struct sockaddr *saddr = &addr->sa; if (saddr->sa_family != AF_INET && saddr->sa_family != AF_INET6) { buf[0]='\0'; return buf; } char host[NI_MAXHOST], service[NI_MAXSERV]; - (void) getnameinfo(saddr, sizeof(union ncclSocketAddress), host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST|NI_NUMERICSERV); + /* NI_NUMERICHOST: If set, then the numeric form of the hostname is returned. + * (When not set, this will still happen in case the node's name cannot be determined.) + */ + int flag = NI_NUMERICSERV | (numericHostForm ? NI_NUMERICHOST : 0); + (void) getnameinfo(saddr, sizeof(union ncclSocketAddress), host, NI_MAXHOST, service, NI_MAXSERV, flag); sprintf(buf, "%s<%s>", host, service); return buf; } @@ -516,7 +520,7 @@ ncclResult_t ncclSocketProgress(int op, struct ncclSocket* sock, void* ptr, int NCCLCHECK(ncclSocketProgressOpt(op, sock, ptr, size, offset, 0, &closed)); if (closed) { char line[SOCKET_NAME_MAXLEN+1]; - WARN("Net : Connection closed by remote peer %s", ncclSocketToString(&sock->addr, line)); + WARN("Net : Connection closed by remote peer %s", ncclSocketToString(&sock->addr, line, 0)); return ncclSystemError; } return ncclSuccess; diff --git a/src/proxy.cc b/src/proxy.cc index d6fe309..e72a8eb 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -806,6 +806,7 @@ ncclResult_t ncclProxyCall(struct ncclProxyConnector* proxyConn, int type, void* return ncclSuccess; error: WARN("Proxy Call to rank %d failed (%s)", proxyConn->comm->localRankToRank[proxyConn->localRank], ncclProxyMsgTypeStr[type]); + sock->fd = -1; return ret; } @@ -870,8 +871,6 @@ ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm) { static ncclResult_t proxyConnInit(struct ncclProxyLocalPeer* peer, struct ncclProxyConnectionPool* connectionPool, struct ncclComm* comm) { struct ncclSocket* sock = &peer->sock; - char buf[SOCKET_NAME_MAXLEN+1]; - buf[SOCKET_NAME_MAXLEN] = '\0'; int id; struct ncclProxyConnection* connection; NCCLCHECK(ncclProxyNewConnection(connectionPool, &id)); @@ -889,8 +888,7 @@ static ncclResult_t proxyConnInit(struct ncclProxyLocalPeer* peer, struct ncclPr struct ncclProxyProgressState* state = &comm->proxyState.progressState; NCCLCHECK(ncclSocketSend(sock, state->opsPoolShmSuffix, sizeof("XXXXXX")-1)); } - buf[SOCKET_NAME_MAXLEN] = '\0'; - INFO(NCCL_NET, "New proxy %s connection %d from %s, transport %d", connection->send ? "send":"recv", id, ncclSocketToString(&sock->addr, buf), connection->transport); + INFO(NCCL_NET, "New proxy %s connection %d from local rank %d, transport %d", connection->send ? "send":"recv", id, connection->localRank, connection->transport); return ncclSuccess; } diff --git a/src/transport/coll_net.cc b/src/transport/coll_net.cc index 26f875f..771a18f 100644 --- a/src/transport/coll_net.cc +++ b/src/transport/coll_net.cc @@ -217,6 +217,9 @@ static ncclResult_t sendConnect(struct ncclComm* comm, struct ncclConnect* conne struct connectMap* map; NCCLCHECK(ncclProxyCall(&send->proxyConn, ncclProxyMsgConnect, &args, sizeof(struct collNetConnectArgs), &map, sizeof(struct connectMap*))); + // If collnet connect failed, propagate error to fallback on regular p2p + if (map == NULL) return ncclSystemError; + //NCCLCHECK(collNetDumpMap(map)); struct ncclSendMem *sendMem = (struct ncclSendMem*) NCCL_NET_MAP_GET_POINTER(map, gpu, sendMem); @@ -240,6 +243,9 @@ static ncclResult_t recvConnect(struct ncclComm* comm, struct ncclConnect* conne struct connectMap* map; NCCLCHECK(ncclProxyCall(&recv->proxyConn, ncclProxyMsgConnect, &args, sizeof(struct collNetConnectArgs), &map, sizeof(struct connectMap*))); + // If collnet connect failed, propagate error to fallback on regular p2p + if (map == NULL) return ncclSystemError; + //NCCLCHECK(collNetDumpMap(map)); struct ncclSendMem *sendMem = (struct ncclSendMem*) NCCL_NET_MAP_GET_POINTER(map, gpu, sendMem); @@ -309,12 +315,15 @@ static ncclResult_t sharedConnect(struct ncclComm* comm, int netDev, struct nccl resources->collNetListenComms[netDev], resources->collNetComms+netDev); free(handlePtrs); - NCCLCHECK(ret); - // Close listen comm - NCCLCHECK(collNetCloseListen(resources->collNetListenComms[netDev])); + if (ret == ncclSuccess) { + // Close listen comm + NCCLCHECK(collNetCloseListen(resources->collNetListenComms[netDev])); + } else { + resources->collNetListenComms[netDev] = NULL; + } } *collNetComm = resources->collNetComms[netDev]; - resources->commRefCount[netDev]++; + if (*collNetComm) resources->commRefCount[netDev]++; return ncclSuccess; } @@ -400,6 +409,13 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str resources->recvMhandles[p] = info->mhandles[p]; NCCLCHECK(sharedConnect(comm, resources->netDev, args->connectInfos, args->nranks, args->rank, &resources->collNetComm)); + + // Collnet connect is allowed to fail. Gracefully handle that case by returning NULL to the caller. + if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; } + if (resources->collNetComm == NULL) { + *((struct connectMap**)respBuff) = NULL; + return ncclSuccess; + } connection->proxyAppendPtr = comm->proxyState.progressState.collNet.proxyAppend+2*resources->netDev; struct connectMap* map = &resources->map; @@ -435,7 +451,6 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->sendMhandles[NCCL_PROTO_SIMPLE])); - if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; } *((struct connectMap**)respBuff) = &resources->map; return ncclSuccess; } @@ -449,6 +464,13 @@ static ncclResult_t recvProxyConnect(struct ncclProxyConnection* connection, str resources->collNetRank = args->rank; NCCLCHECK(sharedConnect(comm, resources->netDev, args->connectInfos, args->nranks, args->rank, &resources->collNetComm)); + + // Collnet connect is allowed to fail. Gracefully handle that case by returning NULL to the caller. + if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; } + if (resources->collNetComm == NULL) { + *((struct connectMap**)respBuff) = NULL; + return ncclSuccess; + } connection->proxyAppendPtr = comm->proxyState.progressState.collNet.proxyAppend+2*resources->netDev+1; struct connectMap* map = &resources->map; @@ -743,7 +765,7 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg int offset; NCCLCHECK(sharedBuffersGet(comm, 1, sharedBuffSlot, startChannel, &offset)); volatile int* offsFifo = (volatile int*)resources->recvMem->offsFifo; - offsFifo[buffSlot] = offset; + offsFifo[buffSlot] = offset + (s%COLLNET_GROUP_NSUBS)*args->chunkSize; __sync_synchronize(); volatile uint64_t* recvTail = resources->gdcSync ? resources->gdcSync : &resources->recvMem->tail; *recvTail = sub->base + sub->flushed; diff --git a/src/transport/net.cc b/src/transport/net.cc index 56f0315..e96f189 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -130,7 +130,7 @@ struct recvResources { uint64_t llLastCleaning; }; -NCCL_PARAM(NetDisableIntra, "NET_DISABLE_INTRA", -2); +NCCL_PARAM(NetDisableIntra, "NET_DISABLE_INTRA", 0); /* Determine if two peers can communicate with NET */ static ncclResult_t canConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) { diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index 6e8f215..26b47be 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -43,7 +43,7 @@ struct ncclIbMrCache { }; static int ncclNIbDevs = -1; -struct ncclIbDev { +struct alignas(64) ncclIbDev { pthread_mutex_t lock; int device; uint64_t guid; @@ -841,7 +841,7 @@ ncclResult_t ncclIbRegMr(void* comm, void* data, int size, int type, void** mhan else { NCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)addr, pages*pageSize, flags), res, returning); } - TRACE(NCCL_INIT,"regAddr %llx size %lld rkey %x", (unsigned long long)addr, (long long)pages*PageSize, mr->rkey); + TRACE(NCCL_INIT,"regAddr %llx size %lld rkey %x", (unsigned long long)addr, (long long)pages*pageSize, mr->rkey); cache->population += 1; cache->slots[slot].addr = addr; cache->slots[slot].pages = pages; @@ -940,9 +940,11 @@ ncclResult_t ncclIbMultiSend(struct ncclIbSendComm* comm, int slot) { lastWr->next = NULL; lastWr->send_flags = IBV_SEND_SIGNALED; + // Multi-QP: make sure IB writes are multiples of 128B so that LL and LL128 protocols still work + const int align = 128; for (int q=0; q<comm->nqps; q++) { for (int r=0; r<nreqs; r++) { - int chunkSize = std::max(8, DIVUP(reqs[r]->send.size, comm->nqps)); + int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, comm->nqps), align) * align; int length = std::min(reqs[r]->send.size-reqs[r]->send.offset, chunkSize); if (length <= 0) { comm->wrs[r].sg_list = NULL; @@ -957,7 +959,7 @@ ncclResult_t ncclIbMultiSend(struct ncclIbSendComm* comm, int slot) { NCCLCHECK(wrap_ibv_post_send(comm->qps[q], comm->wrs, &bad_wr)); for (int r=0; r<nreqs; r++) { - int chunkSize = std::max(8, DIVUP(reqs[r]->send.size, comm->nqps)); + int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, comm->nqps), align) * align; reqs[r]->send.offset += chunkSize; comm->sges[r].addr += chunkSize; comm->wrs[r].wr.rdma.remote_addr += chunkSize; @@ -991,11 +993,16 @@ ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, int tag, void* mh if (reqs[r] != NULL || slots[r].tag != tag) continue; // Sanity checks to catch user collective call count/size mismatches - // plus any potential programming errors - if (size > slots[r].size || slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkey == 0) { + if (size > slots[r].size) { char line[SOCKET_NAME_MAXLEN+1]; - WARN("NET/IB : req %d/%d tag %x peer %s collective mismatch error local size %d remote %d addr %lx rkey %x", - r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), size, slots[r].size, slots[r].addr, slots[r].rkey); + WARN("NET/IB : req %d/%d tag %x peer %s collective mismatch error, local size %d remote size %d", + r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), size, slots[r].size); + return ncclInvalidUsage; + } // plus any potential programming errors + else if (slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkey == 0) { + char line[SOCKET_NAME_MAXLEN+1]; + WARN("NET/IB : req %d/%d tag %x peer %s posted incorrect receive info: size %d addr %lx rkey %x", + r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), slots[r].size, slots[r].addr, slots[r].rkey); return ncclInternalError; } struct ncclIbRequest* req; diff --git a/src/transport/net_socket.cc b/src/transport/net_socket.cc index d92c46f..9e14aa2 100644 --- a/src/transport/net_socket.cc +++ b/src/transport/net_socket.cc @@ -500,8 +500,10 @@ ncclResult_t ncclSocketTest(void* request, int* done, int* size) { // Check size is less or equal to the size provided by the user if (r->op == NCCL_SOCKET_RECV && data > r->size) { char line[SOCKET_NAME_MAXLEN+1]; - WARN("NET/Socket : peer %s message truncated : receiving %d bytes instead of %d", ncclSocketToString(&r->ctrlSock->addr, line), data, r->size); - return ncclInternalError; + WARN("NET/Socket : peer %s message truncated : receiving %d bytes instead of %d. If you believe your socket network is in healthy state, \ + there may be a mismatch in collective sizes or environment settings (e.g. NCCL_PROTO, NCCL_ALGO) between ranks", + ncclSocketToString(&r->ctrlSock->addr, line), data, r->size); + return ncclInvalidUsage; } r->size = data; r->offset = 0; |