Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/marian-nmt/nccl.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSylvain Jeaugey <sjeaugey@nvidia.com>2018-11-20 04:43:50 +0300
committerSylvain Jeaugey <sjeaugey@nvidia.com>2018-11-27 03:24:31 +0300
commit98adf2fe11bb13f93e93c805ffcb28fd9f0578a1 (patch)
treec45afebf0c856bfecb27165d740902c1ce5eb0de /src/include
parent0d3a20f96d4887bee86a0fd7bf79feb14e5a01f5 (diff)
Make network isend/irecv non blocking
Diffstat (limited to 'src/include')
-rw-r--r--src/include/nccl_net.h2
-rw-r--r--src/include/socket.h53
2 files changed, 32 insertions, 23 deletions
diff --git a/src/include/nccl_net.h b/src/include/nccl_net.h
index 7dbbc37..5d3ec7c 100644
--- a/src/include/nccl_net.h
+++ b/src/include/nccl_net.h
@@ -41,8 +41,10 @@ typedef struct {
// Finalize connection establishment after remote peer has called connectHandle
ncclResult_t (*accept)(void* listenComm, void** recvComm);
// Asynchronous send to a peer. Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
+ // May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*isend)(void* sendComm, void* data, int size, int type, void** request);
// Asynchronous recv from a peer. Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
+ // May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*irecv)(void* recvComm, void* data, int size, int type, void** request);
// Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
// visible to the GPU
diff --git a/src/include/socket.h b/src/include/socket.h
index 9d2b2c8..81e1651 100644
--- a/src/include/socket.h
+++ b/src/include/socket.h
@@ -370,39 +370,46 @@ static ncclResult_t connectAddress(int* fd, union socketAddress* remoteAddr) {
return ncclSuccess;
}
-static ncclResult_t socketReceive(int fd, void* ptr, int size) {
+#define NCCL_SOCKET_SEND 0
+#define NCCL_SOCKET_RECV 1
+static ncclResult_t socketProgress(int op, int fd, void* ptr, int size, int* offset) {
+ int bytes = 0;
char* data = (char*)ptr;
- int offset = 0;
- while (offset < size) {
- int recvsize;
- SYSCHECKVAL(recv(fd, data, size-offset, 0), "recv", recvsize);
- if (recvsize == 0) {
+ do {
+ if (op == NCCL_SOCKET_RECV) bytes = recv(fd, data+(*offset), size-(*offset), MSG_DONTWAIT);
+ if (op == NCCL_SOCKET_SEND) bytes = send(fd, data+(*offset), size-(*offset), MSG_DONTWAIT);
+ if (op == NCCL_SOCKET_RECV && bytes == 0) {
WARN("Net : Connection closed by remote peer");
return ncclSystemError;
}
- if (recvsize == -1) {
- INFO(NCCL_NET,"Recv : got retcode %d, retrying", errno);
- continue;
+ if (bytes == -1) {
+ if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+ WARN("Call to recv failed : %s", strerror(errno));
+ return ncclSystemError;
+ } else {
+ bytes = 0;
+ }
}
- data += recvsize;
- offset += recvsize;
- }
+ (*offset) += bytes;
+ } while (bytes > 0 && (*offset) < size);
+ return ncclSuccess;
+}
+
+static ncclResult_t socketWait(int op, int fd, void* ptr, int size, int* offset) {
+ while (*offset < size)
+ NCCLCHECK(socketProgress(op, fd, ptr, size, offset));
return ncclSuccess;
}
static ncclResult_t socketSend(int fd, void* ptr, int size) {
- char* data = (char*)ptr;
int offset = 0;
- while (offset < size) {
- int sendsize;
- SYSCHECKVAL(write(fd, data, size-offset), "write", sendsize);
- if (sendsize == -1) {
- INFO(NCCL_NET,"Send : got retcode %d, retrying", errno);
- continue;
- }
- data += sendsize;
- offset += sendsize;
- }
+ NCCLCHECK(socketWait(NCCL_SOCKET_SEND, fd, ptr, size, &offset));
+ return ncclSuccess;
+}
+
+static ncclResult_t socketReceive(int fd, void* ptr, int size) {
+ int offset = 0;
+ NCCLCHECK(socketWait(NCCL_SOCKET_RECV, fd, ptr, size, &offset));
return ncclSuccess;
}