Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/NVIDIA/nccl.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--makefiles/version.mk2
-rw-r--r--src/Makefile2
-rw-r--r--src/enqueue.cc38
-rw-r--r--src/group.cc4
-rw-r--r--src/include/enqueue.h1
-rw-r--r--src/include/param.h82
-rw-r--r--src/include/socket.h2
-rw-r--r--src/init.cc17
-rw-r--r--src/misc/argcheck.cc7
-rw-r--r--src/misc/param.cc81
-rw-r--r--src/misc/socket.cc10
-rw-r--r--src/proxy.cc6
-rw-r--r--src/transport/coll_net.cc34
-rw-r--r--src/transport/net.cc2
-rw-r--r--src/transport/net_ib.cc23
-rw-r--r--src/transport/net_socket.cc6
16 files changed, 203 insertions, 114 deletions
diff --git a/makefiles/version.mk b/makefiles/version.mk
index e7fe35e..7c9bf0f 100644
--- a/makefiles/version.mk
+++ b/makefiles/version.mk
@@ -1,6 +1,6 @@
##### version
NCCL_MAJOR := 2
NCCL_MINOR := 12
-NCCL_PATCH := 7
+NCCL_PATCH := 10
NCCL_SUFFIX :=
PKG_REVISION := 1
diff --git a/src/Makefile b/src/Makefile
index 65c8b28..1dfb5ee 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -10,7 +10,7 @@ include ../makefiles/version.mk
##### src files
INCEXPORTS := nccl.h nccl_net.h
LIBSRCFILES := init.cc channel.cc bootstrap.cc transport.cc enqueue.cc group.cc debug.cc proxy.cc enhcompat.cc net.cc \
- misc/nvmlwrap.cc misc/ibvwrap.cc misc/gdrwrap.cc misc/utils.cc misc/argcheck.cc misc/socket.cc misc/shmutils.cc misc/profiler.cc \
+ misc/nvmlwrap.cc misc/ibvwrap.cc misc/gdrwrap.cc misc/utils.cc misc/argcheck.cc misc/socket.cc misc/shmutils.cc misc/profiler.cc misc/param.cc \
transport/p2p.cc transport/shm.cc transport/net.cc transport/net_socket.cc transport/net_ib.cc transport/coll_net.cc \
collectives/sendrecv.cc collectives/all_reduce.cc collectives/all_gather.cc collectives/broadcast.cc collectives/reduce.cc collectives/reduce_scatter.cc \
graph/topo.cc graph/paths.cc graph/search.cc graph/connect.cc graph/rings.cc graph/trees.cc graph/tuning.cc graph/xml.cc
diff --git a/src/enqueue.cc b/src/enqueue.cc
index d28191b..a15c370 100644
--- a/src/enqueue.cc
+++ b/src/enqueue.cc
@@ -125,6 +125,19 @@ error:
return (res != ncclSuccess) ? 0 : max;
}
+// Set shared memory carveout for the nccl kernels
+ncclResult_t ncclKernSetSharedMemoryCarveout(int carveOut) {
+ ncclResult_t res = ncclSuccess;
+ int numNcclKerns = sizeof(ncclKerns)/sizeof(ncclKerns[0]);
+ for (int i = 0; i < numNcclKerns; i++) {
+ CUDACHECKGOTO(cudaFuncSetAttribute(ncclKerns[i], cudaFuncAttributePreferredSharedMemoryCarveout, carveOut), res, error);
+ }
+
+error:
+ return res;
+}
+
+
/*****************************************************************************/
/* Launch system : synchronization and CUDA kernel launch */
/*****************************************************************************/
@@ -705,17 +718,6 @@ static ncclResult_t ncclSetupCollKernel(struct ncclInfo* info) {
params->blockDim.x = std::max<unsigned>(params->blockDim.x, info->nThreads);
comm->enqueueInfo->maxChannels = params->gridDim.x; // params may be varied by a second graph hence we need to capture it here
- // Register and exchange input and output buffers
- if (comm->usingCudaGraph && // only in CUDA graph mode
- comm->graphRegister == 1 && // when registration is enabled
- info->algorithm == NCCL_ALGO_COLLNET && // limited to CollNet for now
- comm->intraHighestTransportType == TRANSPORT_P2P && // only when all ranks can p2p each other
- comm->intraRanks == 1) { // only in multi-process mode
- NCCLCHECK(ncclRegBuffAndExchange(info, &eqElem->buffRegInfo));
- comm->enqueueInfo->nRegBuffs += eqElem->buffRegInfo.nBuffs;
- work->header.type = ncclWorkTypeRegColl;
- }
-
// Inline the first kernel
if (params->func == NULL) {
params->func = ncclKerns[work->header.funcIndex];
@@ -728,6 +730,20 @@ static ncclResult_t ncclSetupCollKernel(struct ncclInfo* info) {
}
}
+ // Register and exchange input and output buffers
+ if (comm->usingCudaGraph && // only in CUDA graph mode
+ comm->graphRegister == 1 && // when registration is enabled
+ info->algorithm == NCCL_ALGO_COLLNET && // limited to CollNet for now
+ comm->intraHighestTransportType == TRANSPORT_P2P && // only when all ranks can p2p each other
+ comm->intraRanks == 1) { // only in multi-process mode
+ NCCLCHECK(ncclRegBuffAndExchange(info, &eqElem->buffRegInfo));
+ comm->enqueueInfo->nRegBuffs += eqElem->buffRegInfo.nBuffs;
+ work->header.type = ncclWorkTypeRegColl;
+ // Disable inline argument because we need kernel to copy the entire ncclWork from workFifo
+ // because the registered addresses are in ncclWorkElemReg
+ comm->args.header.type = ncclWorkTypeUnused;
+ }
+
return ncclSuccess;
}
diff --git a/src/group.cc b/src/group.cc
index 0e8f19e..f2b9e37 100644
--- a/src/group.cc
+++ b/src/group.cc
@@ -274,8 +274,8 @@ sched_delta:
if (sendbytes > sendChunkSize) { sendbytes = sendChunkSize; } else { sendRemaining = 0; }
// 0-bytes send/recv are considered as syncs. Make sure we only add syncs when requested
// (total size == 0), otherwise set size to -1.
- if (sendbytes <= 0 && totSendBytes != 0) send = NULL;
- if (recvbytes <= 0 && totRecvBytes != 0) recv = NULL;
+ if (sendbytes < 0 || (sendbytes == 0 && totSendBytes != 0)) send = NULL;
+ if (recvbytes < 0 || (recvbytes == 0 && totRecvBytes != 0)) recv = NULL;
if (recv) {
NCCLCHECKGOTO(scheduleRecv(comm, recvPeer, channelId, recvbytes, ((char*)recvBuff)+recvOffset), ret, group_cleanup);
}
diff --git a/src/include/enqueue.h b/src/include/enqueue.h
index 02a9adb..282342b 100644
--- a/src/include/enqueue.h
+++ b/src/include/enqueue.h
@@ -15,6 +15,7 @@
#define NCCL_AGG_CHANNEL_SIZE (1LL << 21) /* 2 MiB, ideal per-channel size to fully utilize bandwidth */
size_t ncclKernMaxLocalSize();
+ncclResult_t ncclKernSetSharedMemoryCarveout(int carveOut);
ncclResult_t ncclEnqueueCheck(struct ncclInfo* info);
ncclResult_t ncclCpuBarrierIn(struct ncclComm* comm, int* isLast);
ncclResult_t ncclCpuBarrierLast(struct ncclComm* comm);
diff --git a/src/include/param.h b/src/include/param.h
index 7f749fb..c95b67c 100644
--- a/src/include/param.h
+++ b/src/include/param.h
@@ -7,77 +7,23 @@
#ifndef NCCL_PARAM_H_
#define NCCL_PARAM_H_
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <pwd.h>
+#include <stdint.h>
-static const char* userHomeDir() {
- struct passwd *pwUser = getpwuid(getuid());
- return pwUser == NULL ? NULL : pwUser->pw_dir;
-}
+const char* userHomeDir();
+void setEnvFile(const char* fileName);
+void initEnv();
-static void setEnvFile(const char* fileName) {
- FILE * file = fopen(fileName, "r");
- if (file == NULL) return;
+void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache);
- char *line = NULL;
- char envVar[1024];
- char envValue[1024];
- size_t n = 0;
- ssize_t read;
- while ((read = getline(&line, &n, file)) != -1) {
- if (line[read-1] == '\n') line[read-1] = '\0';
- int s=0; // Env Var Size
- while (line[s] != '\0' && line[s] != '=') s++;
- if (line[s] == '\0') continue;
- strncpy(envVar, line, std::min(1023,s));
- envVar[s] = '\0';
- s++;
- strncpy(envValue, line+s, 1023);
- envValue[1023]='\0';
- setenv(envVar, envValue, 0);
- //printf("%s : %s->%s\n", fileName, envVar, envValue);
- }
- if (line) free(line);
- fclose(file);
-}
-
-static void initEnv() {
- char confFilePath[1024];
- const char * userDir = userHomeDir();
- if (userDir) {
- sprintf(confFilePath, "%s/.nccl.conf", userDir);
- setEnvFile(confFilePath);
- }
- sprintf(confFilePath, "/etc/nccl.conf");
- setEnvFile(confFilePath);
-}
-
-
-#define NCCL_PARAM(name, env, default_value) \
-pthread_mutex_t ncclParamMutex##name = PTHREAD_MUTEX_INITIALIZER; \
-int64_t ncclParam##name() { \
- static_assert(default_value != -1LL, "default value cannot be -1"); \
- static int64_t value = -1LL; \
- pthread_mutex_lock(&ncclParamMutex##name); \
- if (value == -1LL) { \
- value = default_value; \
- char* str = getenv("NCCL_" env); \
- if (str && strlen(str) > 0) { \
- errno = 0; \
- int64_t v = strtoll(str, NULL, 0); \
- if (errno) { \
- INFO(NCCL_ALL,"Invalid value %s for %s, using default %lu.", str, "NCCL_" env, value); \
- } else { \
- value = v; \
- INFO(NCCL_ALL,"%s set by environment to %lu.", "NCCL_" env, value); \
- } \
+#define NCCL_PARAM(name, env, deftVal) \
+ int64_t ncclParam##name() { \
+ constexpr int64_t uninitialized = INT64_MIN; \
+ static_assert(deftVal != uninitialized, "default value cannot be the uninitialized value."); \
+ static int64_t cache = uninitialized; \
+ if (__builtin_expect(__atomic_load_n(&cache, __ATOMIC_RELAXED) == uninitialized, false)) { \
+ ncclLoadParam("NCCL_" env, deftVal, uninitialized, &cache); \
} \
- } \
- pthread_mutex_unlock(&ncclParamMutex##name); \
- return value; \
-}
+ return cache; \
+ }
#endif
diff --git a/src/include/socket.h b/src/include/socket.h
index 53fda4d..d72480b 100644
--- a/src/include/socket.h
+++ b/src/include/socket.h
@@ -44,7 +44,7 @@ struct ncclSocket {
enum ncclSocketState state;
};
-const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf);
+const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm = 1);
ncclResult_t ncclGetSocketAddrFromString(union ncclSocketAddress* ua, const char* ip_port_pair);
int ncclFindInterfaceMatchSubnet(char* ifNames, union ncclSocketAddress* localAddrs, union ncclSocketAddress* remoteAddr, int ifNameMaxSize, int maxIfs);
int ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs, int ifNameMaxSize, int maxIfs);
diff --git a/src/init.cc b/src/init.cc
index 4da8dfd..29bfa01 100644
--- a/src/init.cc
+++ b/src/init.cc
@@ -59,7 +59,8 @@ ncclResult_t initGdrCopy() {
return ncclSuccess;
}
-NCCL_PARAM(CollNetEnable, "COLLNET_ENABLE", 0);
+
+NCCL_PARAM(L1SharedMemoryCarveout, "L1_SHARED_MEMORY_CARVEOUT", 0);
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
static bool initialized = false;
@@ -71,6 +72,8 @@ static ncclResult_t ncclInit() {
initEnv();
initGdrCopy();
maxLocalSizeBytes = ncclKernMaxLocalSize();
+ int carveout = ncclParamL1SharedMemoryCarveout();
+ if (carveout) ncclKernSetSharedMemoryCarveout(carveout);
NCCLCHECK(ncclNetInit());
INFO(NCCL_INIT, "Using network %s", ncclNetName());
initialized = true;
@@ -529,7 +532,16 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
}
// Determine local CollNet support before all-gather
- if (ncclParamCollNetEnable() == 1 && collNetSupport() == 1 && collNetGraph.nChannels > 0) comm->collNetSupport = 1;
+ if (collNetSupport()) {
+ char *collNetEnable = getenv("NCCL_COLLNET_ENABLE");
+ if (collNetEnable != NULL) {
+ INFO(NCCL_ALL, "NCCL_COLLNET_ENABLE set by environment to %s.", collNetEnable);
+ if (strcmp(collNetEnable, "1") == 0) {
+ comm->collNetSupport = 1;
+ }
+ }
+ }
+ if (comm->collNetSupport == 1 && collNetGraph.nChannels <= 0) comm->collNetSupport = 0;
// AllGather3 - begin
struct ncclGraphInfo {
@@ -832,7 +844,6 @@ collnet_cleanup:
// Connect to local net proxy
struct ncclProxyConnector proxyConn;
- NCCLCHECK(ncclTopoGetLocalRank(comm->topo, comm->rank, &proxyConn.localRank));
NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_NET, 1, comm->rank, &proxyConn));
NCCLCHECK(ncclProxyCall(&proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0));
diff --git a/src/misc/argcheck.cc b/src/misc/argcheck.cc
index 1c5ba3c..5406bf0 100644
--- a/src/misc/argcheck.cc
+++ b/src/misc/argcheck.cc
@@ -11,7 +11,7 @@ static ncclResult_t CudaPtrCheck(const void* pointer, struct ncclComm* comm, con
cudaPointerAttributes attr;
cudaError_t err = cudaPointerGetAttributes(&attr, pointer);
if (err != cudaSuccess || attr.devicePointer == NULL) {
- WARN("%s : %s is not a valid pointer", opname, ptrname);
+ WARN("%s : %s %p is not a valid pointer", opname, ptrname, pointer);
return ncclInvalidArgument;
}
#if CUDART_VERSION >= 10000
@@ -63,8 +63,9 @@ ncclResult_t ArgsCheck(struct ncclInfo* info) {
}
if (info->comm->checkPointers) {
- if ((info->coll == ncclFuncSend || info->coll == ncclFuncRecv) && info->count > 0) {
- NCCLCHECK(CudaPtrCheck(info->recvbuff, info->comm, "buff", info->opName));
+ if ((info->coll == ncclFuncSend || info->coll == ncclFuncRecv)) {
+ if (info->count >0)
+ NCCLCHECK(CudaPtrCheck(info->recvbuff, info->comm, "buff", info->opName));
} else {
// Check CUDA device pointers
if (info->coll != ncclFuncBroadcast || info->comm->rank == info->root) {
diff --git a/src/misc/param.cc b/src/misc/param.cc
new file mode 100644
index 0000000..48099b3
--- /dev/null
+++ b/src/misc/param.cc
@@ -0,0 +1,81 @@
+/*************************************************************************
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved.
+ *
+ * See LICENSE.txt for license information
+ ************************************************************************/
+
+#include "param.h"
+#include "debug.h"
+
+#include <algorithm>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <pwd.h>
+
+const char* userHomeDir() {
+ struct passwd *pwUser = getpwuid(getuid());
+ return pwUser == NULL ? NULL : pwUser->pw_dir;
+}
+
+void setEnvFile(const char* fileName) {
+ FILE * file = fopen(fileName, "r");
+ if (file == NULL) return;
+
+ char *line = NULL;
+ char envVar[1024];
+ char envValue[1024];
+ size_t n = 0;
+ ssize_t read;
+ while ((read = getline(&line, &n, file)) != -1) {
+ if (line[read-1] == '\n') line[read-1] = '\0';
+ int s=0; // Env Var Size
+ while (line[s] != '\0' && line[s] != '=') s++;
+ if (line[s] == '\0') continue;
+ strncpy(envVar, line, std::min(1023,s));
+ envVar[s] = '\0';
+ s++;
+ strncpy(envValue, line+s, 1023);
+ envValue[1023]='\0';
+ setenv(envVar, envValue, 0);
+ //printf("%s : %s->%s\n", fileName, envVar, envValue);
+ }
+ if (line) free(line);
+ fclose(file);
+}
+
+void initEnv() {
+ char confFilePath[1024];
+ const char * userDir = userHomeDir();
+ if (userDir) {
+ sprintf(confFilePath, "%s/.nccl.conf", userDir);
+ setEnvFile(confFilePath);
+ }
+ sprintf(confFilePath, "/etc/nccl.conf");
+ setEnvFile(confFilePath);
+}
+
+void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache) {
+ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+ pthread_mutex_lock(&mutex);
+ if (__atomic_load_n(cache, __ATOMIC_RELAXED) == uninitialized) {
+ char* str = getenv(env);
+ int64_t value = deftVal;
+ if (str && strlen(str) > 0) {
+ errno = 0;
+ value = strtoll(str, nullptr, 0);
+ if (errno) {
+ value = deftVal;
+ INFO(NCCL_ALL,"Invalid value %s for %s, using default %lld.", str, env, (long long)deftVal);
+ } else {
+ INFO(NCCL_ALL,"%s set by environment to %lld.", env, (long long)value);
+ }
+ }
+ __atomic_store_n(cache, value, __ATOMIC_RELAXED);
+ }
+ pthread_mutex_unlock(&mutex);
+}
diff --git a/src/misc/socket.cc b/src/misc/socket.cc
index 4e3295f..ef2bea6 100644
--- a/src/misc/socket.cc
+++ b/src/misc/socket.cc
@@ -16,12 +16,16 @@
*
* Output: "IPv4/IPv6 address<port>"
*/
-const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf) {
+const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm /*= 1*/) {
if (buf == NULL || addr == NULL) return NULL;
struct sockaddr *saddr = &addr->sa;
if (saddr->sa_family != AF_INET && saddr->sa_family != AF_INET6) { buf[0]='\0'; return buf; }
char host[NI_MAXHOST], service[NI_MAXSERV];
- (void) getnameinfo(saddr, sizeof(union ncclSocketAddress), host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST|NI_NUMERICSERV);
+ /* NI_NUMERICHOST: If set, then the numeric form of the hostname is returned.
+ * (When not set, this will still happen in case the node's name cannot be determined.)
+ */
+ int flag = NI_NUMERICSERV | (numericHostForm ? NI_NUMERICHOST : 0);
+ (void) getnameinfo(saddr, sizeof(union ncclSocketAddress), host, NI_MAXHOST, service, NI_MAXSERV, flag);
sprintf(buf, "%s<%s>", host, service);
return buf;
}
@@ -516,7 +520,7 @@ ncclResult_t ncclSocketProgress(int op, struct ncclSocket* sock, void* ptr, int
NCCLCHECK(ncclSocketProgressOpt(op, sock, ptr, size, offset, 0, &closed));
if (closed) {
char line[SOCKET_NAME_MAXLEN+1];
- WARN("Net : Connection closed by remote peer %s", ncclSocketToString(&sock->addr, line));
+ WARN("Net : Connection closed by remote peer %s", ncclSocketToString(&sock->addr, line, 0));
return ncclSystemError;
}
return ncclSuccess;
diff --git a/src/proxy.cc b/src/proxy.cc
index d6fe309..e72a8eb 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -806,6 +806,7 @@ ncclResult_t ncclProxyCall(struct ncclProxyConnector* proxyConn, int type, void*
return ncclSuccess;
error:
WARN("Proxy Call to rank %d failed (%s)", proxyConn->comm->localRankToRank[proxyConn->localRank], ncclProxyMsgTypeStr[type]);
+ sock->fd = -1;
return ret;
}
@@ -870,8 +871,6 @@ ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm) {
static ncclResult_t proxyConnInit(struct ncclProxyLocalPeer* peer, struct ncclProxyConnectionPool* connectionPool, struct ncclComm* comm) {
struct ncclSocket* sock = &peer->sock;
- char buf[SOCKET_NAME_MAXLEN+1];
- buf[SOCKET_NAME_MAXLEN] = '\0';
int id;
struct ncclProxyConnection* connection;
NCCLCHECK(ncclProxyNewConnection(connectionPool, &id));
@@ -889,8 +888,7 @@ static ncclResult_t proxyConnInit(struct ncclProxyLocalPeer* peer, struct ncclPr
struct ncclProxyProgressState* state = &comm->proxyState.progressState;
NCCLCHECK(ncclSocketSend(sock, state->opsPoolShmSuffix, sizeof("XXXXXX")-1));
}
- buf[SOCKET_NAME_MAXLEN] = '\0';
- INFO(NCCL_NET, "New proxy %s connection %d from %s, transport %d", connection->send ? "send":"recv", id, ncclSocketToString(&sock->addr, buf), connection->transport);
+ INFO(NCCL_NET, "New proxy %s connection %d from local rank %d, transport %d", connection->send ? "send":"recv", id, connection->localRank, connection->transport);
return ncclSuccess;
}
diff --git a/src/transport/coll_net.cc b/src/transport/coll_net.cc
index 26f875f..771a18f 100644
--- a/src/transport/coll_net.cc
+++ b/src/transport/coll_net.cc
@@ -217,6 +217,9 @@ static ncclResult_t sendConnect(struct ncclComm* comm, struct ncclConnect* conne
struct connectMap* map;
NCCLCHECK(ncclProxyCall(&send->proxyConn, ncclProxyMsgConnect, &args, sizeof(struct collNetConnectArgs), &map, sizeof(struct connectMap*)));
+ // If collnet connect failed, propagate error to fallback on regular p2p
+ if (map == NULL) return ncclSystemError;
+
//NCCLCHECK(collNetDumpMap(map));
struct ncclSendMem *sendMem = (struct ncclSendMem*) NCCL_NET_MAP_GET_POINTER(map, gpu, sendMem);
@@ -240,6 +243,9 @@ static ncclResult_t recvConnect(struct ncclComm* comm, struct ncclConnect* conne
struct connectMap* map;
NCCLCHECK(ncclProxyCall(&recv->proxyConn, ncclProxyMsgConnect, &args, sizeof(struct collNetConnectArgs), &map, sizeof(struct connectMap*)));
+ // If collnet connect failed, propagate error to fallback on regular p2p
+ if (map == NULL) return ncclSystemError;
+
//NCCLCHECK(collNetDumpMap(map));
struct ncclSendMem *sendMem = (struct ncclSendMem*) NCCL_NET_MAP_GET_POINTER(map, gpu, sendMem);
@@ -309,12 +315,15 @@ static ncclResult_t sharedConnect(struct ncclComm* comm, int netDev, struct nccl
resources->collNetListenComms[netDev],
resources->collNetComms+netDev);
free(handlePtrs);
- NCCLCHECK(ret);
- // Close listen comm
- NCCLCHECK(collNetCloseListen(resources->collNetListenComms[netDev]));
+ if (ret == ncclSuccess) {
+ // Close listen comm
+ NCCLCHECK(collNetCloseListen(resources->collNetListenComms[netDev]));
+ } else {
+ resources->collNetListenComms[netDev] = NULL;
+ }
}
*collNetComm = resources->collNetComms[netDev];
- resources->commRefCount[netDev]++;
+ if (*collNetComm) resources->commRefCount[netDev]++;
return ncclSuccess;
}
@@ -400,6 +409,13 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
resources->recvMhandles[p] = info->mhandles[p];
NCCLCHECK(sharedConnect(comm, resources->netDev, args->connectInfos, args->nranks, args->rank, &resources->collNetComm));
+
+ // Collnet connect is allowed to fail. Gracefully handle that case by returning NULL to the caller.
+ if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; }
+ if (resources->collNetComm == NULL) {
+ *((struct connectMap**)respBuff) = NULL;
+ return ncclSuccess;
+ }
connection->proxyAppendPtr = comm->proxyState.progressState.collNet.proxyAppend+2*resources->netDev;
struct connectMap* map = &resources->map;
@@ -435,7 +451,6 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST,
&resources->sendMhandles[NCCL_PROTO_SIMPLE]));
- if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; }
*((struct connectMap**)respBuff) = &resources->map;
return ncclSuccess;
}
@@ -449,6 +464,13 @@ static ncclResult_t recvProxyConnect(struct ncclProxyConnection* connection, str
resources->collNetRank = args->rank;
NCCLCHECK(sharedConnect(comm, resources->netDev, args->connectInfos, args->nranks, args->rank, &resources->collNetComm));
+
+ // Collnet connect is allowed to fail. Gracefully handle that case by returning NULL to the caller.
+ if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; }
+ if (resources->collNetComm == NULL) {
+ *((struct connectMap**)respBuff) = NULL;
+ return ncclSuccess;
+ }
connection->proxyAppendPtr = comm->proxyState.progressState.collNet.proxyAppend+2*resources->netDev+1;
struct connectMap* map = &resources->map;
@@ -743,7 +765,7 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
int offset;
NCCLCHECK(sharedBuffersGet(comm, 1, sharedBuffSlot, startChannel, &offset));
volatile int* offsFifo = (volatile int*)resources->recvMem->offsFifo;
- offsFifo[buffSlot] = offset;
+ offsFifo[buffSlot] = offset + (s%COLLNET_GROUP_NSUBS)*args->chunkSize;
__sync_synchronize();
volatile uint64_t* recvTail = resources->gdcSync ? resources->gdcSync : &resources->recvMem->tail;
*recvTail = sub->base + sub->flushed;
diff --git a/src/transport/net.cc b/src/transport/net.cc
index 56f0315..e96f189 100644
--- a/src/transport/net.cc
+++ b/src/transport/net.cc
@@ -130,7 +130,7 @@ struct recvResources {
uint64_t llLastCleaning;
};
-NCCL_PARAM(NetDisableIntra, "NET_DISABLE_INTRA", -2);
+NCCL_PARAM(NetDisableIntra, "NET_DISABLE_INTRA", 0);
/* Determine if two peers can communicate with NET */
static ncclResult_t canConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) {
diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc
index 6e8f215..26b47be 100644
--- a/src/transport/net_ib.cc
+++ b/src/transport/net_ib.cc
@@ -43,7 +43,7 @@ struct ncclIbMrCache {
};
static int ncclNIbDevs = -1;
-struct ncclIbDev {
+struct alignas(64) ncclIbDev {
pthread_mutex_t lock;
int device;
uint64_t guid;
@@ -841,7 +841,7 @@ ncclResult_t ncclIbRegMr(void* comm, void* data, int size, int type, void** mhan
else {
NCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)addr, pages*pageSize, flags), res, returning);
}
- TRACE(NCCL_INIT,"regAddr %llx size %lld rkey %x", (unsigned long long)addr, (long long)pages*PageSize, mr->rkey);
+ TRACE(NCCL_INIT,"regAddr %llx size %lld rkey %x", (unsigned long long)addr, (long long)pages*pageSize, mr->rkey);
cache->population += 1;
cache->slots[slot].addr = addr;
cache->slots[slot].pages = pages;
@@ -940,9 +940,11 @@ ncclResult_t ncclIbMultiSend(struct ncclIbSendComm* comm, int slot) {
lastWr->next = NULL;
lastWr->send_flags = IBV_SEND_SIGNALED;
+ // Multi-QP: make sure IB writes are multiples of 128B so that LL and LL128 protocols still work
+ const int align = 128;
for (int q=0; q<comm->nqps; q++) {
for (int r=0; r<nreqs; r++) {
- int chunkSize = std::max(8, DIVUP(reqs[r]->send.size, comm->nqps));
+ int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, comm->nqps), align) * align;
int length = std::min(reqs[r]->send.size-reqs[r]->send.offset, chunkSize);
if (length <= 0) {
comm->wrs[r].sg_list = NULL;
@@ -957,7 +959,7 @@ ncclResult_t ncclIbMultiSend(struct ncclIbSendComm* comm, int slot) {
NCCLCHECK(wrap_ibv_post_send(comm->qps[q], comm->wrs, &bad_wr));
for (int r=0; r<nreqs; r++) {
- int chunkSize = std::max(8, DIVUP(reqs[r]->send.size, comm->nqps));
+ int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, comm->nqps), align) * align;
reqs[r]->send.offset += chunkSize;
comm->sges[r].addr += chunkSize;
comm->wrs[r].wr.rdma.remote_addr += chunkSize;
@@ -991,11 +993,16 @@ ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, int tag, void* mh
if (reqs[r] != NULL || slots[r].tag != tag) continue;
// Sanity checks to catch user collective call count/size mismatches
- // plus any potential programming errors
- if (size > slots[r].size || slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkey == 0) {
+ if (size > slots[r].size) {
char line[SOCKET_NAME_MAXLEN+1];
- WARN("NET/IB : req %d/%d tag %x peer %s collective mismatch error local size %d remote %d addr %lx rkey %x",
- r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), size, slots[r].size, slots[r].addr, slots[r].rkey);
+ WARN("NET/IB : req %d/%d tag %x peer %s collective mismatch error, local size %d remote size %d",
+ r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), size, slots[r].size);
+ return ncclInvalidUsage;
+ } // plus any potential programming errors
+ else if (slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkey == 0) {
+ char line[SOCKET_NAME_MAXLEN+1];
+ WARN("NET/IB : req %d/%d tag %x peer %s posted incorrect receive info: size %d addr %lx rkey %x",
+ r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), slots[r].size, slots[r].addr, slots[r].rkey);
return ncclInternalError;
}
struct ncclIbRequest* req;
diff --git a/src/transport/net_socket.cc b/src/transport/net_socket.cc
index d92c46f..9e14aa2 100644
--- a/src/transport/net_socket.cc
+++ b/src/transport/net_socket.cc
@@ -500,8 +500,10 @@ ncclResult_t ncclSocketTest(void* request, int* done, int* size) {
// Check size is less or equal to the size provided by the user
if (r->op == NCCL_SOCKET_RECV && data > r->size) {
char line[SOCKET_NAME_MAXLEN+1];
- WARN("NET/Socket : peer %s message truncated : receiving %d bytes instead of %d", ncclSocketToString(&r->ctrlSock->addr, line), data, r->size);
- return ncclInternalError;
+ WARN("NET/Socket : peer %s message truncated : receiving %d bytes instead of %d. If you believe your socket network is in healthy state, \
+ there may be a mismatch in collective sizes or environment settings (e.g. NCCL_PROTO, NCCL_ALGO) between ranks",
+ ncclSocketToString(&r->ctrlSock->addr, line), data, r->size);
+ return ncclInvalidUsage;
}
r->size = data;
r->offset = 0;