diff options
author | Sylvain Jeaugey <sjeaugey@nvidia.com> | 2018-11-20 04:43:50 +0300 |
---|---|---|
committer | Sylvain Jeaugey <sjeaugey@nvidia.com> | 2018-11-27 03:24:31 +0300 |
commit | 98adf2fe11bb13f93e93c805ffcb28fd9f0578a1 (patch) | |
tree | c45afebf0c856bfecb27165d740902c1ce5eb0de /src/include | |
parent | 0d3a20f96d4887bee86a0fd7bf79feb14e5a01f5 (diff) |
Make network isend/irecv non-blocking
Diffstat (limited to 'src/include')
-rw-r--r-- | src/include/nccl_net.h | 2 | ||||
-rw-r--r-- | src/include/socket.h | 53 |
2 files changed, 32 insertions, 23 deletions
```diff
diff --git a/src/include/nccl_net.h b/src/include/nccl_net.h
index 7dbbc37..5d3ec7c 100644
--- a/src/include/nccl_net.h
+++ b/src/include/nccl_net.h
@@ -41,8 +41,10 @@ typedef struct {
   // Finalize connection establishment after remote peer has called connectHandle
   ncclResult_t (*accept)(void* listenComm, void** recvComm);
   // Asynchronous send to a peer. Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
+  // May return request == NULL if the call cannot be performed (or would block)
   ncclResult_t (*isend)(void* sendComm, void* data, int size, int type, void** request);
   // Asynchronous recv from a peer. Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
+  // May return request == NULL if the call cannot be performed (or would block)
   ncclResult_t (*irecv)(void* recvComm, void* data, int size, int type, void** request);
   // Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
   // visible to the GPU
diff --git a/src/include/socket.h b/src/include/socket.h
index 9d2b2c8..81e1651 100644
--- a/src/include/socket.h
+++ b/src/include/socket.h
@@ -370,39 +370,46 @@ static ncclResult_t connectAddress(int* fd, union socketAddress* remoteAddr) {
   return ncclSuccess;
 }

-static ncclResult_t socketReceive(int fd, void* ptr, int size) {
+#define NCCL_SOCKET_SEND 0
+#define NCCL_SOCKET_RECV 1
+static ncclResult_t socketProgress(int op, int fd, void* ptr, int size, int* offset) {
+  int bytes = 0;
   char* data = (char*)ptr;
-  int offset = 0;
-  while (offset < size) {
-    int recvsize;
-    SYSCHECKVAL(recv(fd, data, size-offset, 0), "recv", recvsize);
-    if (recvsize == 0) {
+  do {
+    if (op == NCCL_SOCKET_RECV) bytes = recv(fd, data+(*offset), size-(*offset), MSG_DONTWAIT);
+    if (op == NCCL_SOCKET_SEND) bytes = send(fd, data+(*offset), size-(*offset), MSG_DONTWAIT);
+    if (op == NCCL_SOCKET_RECV && bytes == 0) {
       WARN("Net : Connection closed by remote peer");
       return ncclSystemError;
     }
-    if (recvsize == -1) {
-      INFO(NCCL_NET,"Recv : got retcode %d, retrying", errno);
-      continue;
+    if (bytes == -1) {
+      if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+        WARN("Call to recv failed : %s", strerror(errno));
+        return ncclSystemError;
+      } else {
+        bytes = 0;
+      }
     }
-    data += recvsize;
-    offset += recvsize;
-  }
+    (*offset) += bytes;
+  } while (bytes > 0 && (*offset) < size);
+  return ncclSuccess;
+}
+
+static ncclResult_t socketWait(int op, int fd, void* ptr, int size, int* offset) {
+  while (*offset < size)
+    NCCLCHECK(socketProgress(op, fd, ptr, size, offset));
   return ncclSuccess;
 }

 static ncclResult_t socketSend(int fd, void* ptr, int size) {
-  char* data = (char*)ptr;
   int offset = 0;
-  while (offset < size) {
-    int sendsize;
-    SYSCHECKVAL(write(fd, data, size-offset), "write", sendsize);
-    if (sendsize == -1) {
-      INFO(NCCL_NET,"Send : got retcode %d, retrying", errno);
-      continue;
-    }
-    data += sendsize;
-    offset += sendsize;
-  }
+  NCCLCHECK(socketWait(NCCL_SOCKET_SEND, fd, ptr, size, &offset));
+  return ncclSuccess;
+}
+
+static ncclResult_t socketReceive(int fd, void* ptr, int size) {
+  int offset = 0;
+  NCCLCHECK(socketWait(NCCL_SOCKET_RECV, fd, ptr, size, &offset));
   return ncclSuccess;
 }

```