From 99f9974e4002e0aaef22741f656e5bc9baf9dc40 Mon Sep 17 00:00:00 2001 From: Hal Daume III Date: Sat, 24 May 2014 16:56:42 -0400 Subject: merged john's changes --- vowpalwabbit/allreduce.h | 47 ++++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 23 deletions(-) (limited to 'vowpalwabbit/allreduce.h') diff --git a/vowpalwabbit/allreduce.h b/vowpalwabbit/allreduce.h index 35185456..7c2bdfd1 100644 --- a/vowpalwabbit/allreduce.h +++ b/vowpalwabbit/allreduce.h @@ -3,7 +3,7 @@ Copyright (c) by respective owners including Yahoo!, Microsoft, and individual contributors. All rights reserved. Released under a BSD license as described in the file LICENSE. */ -// This implements the allreduce function of MPI. +// This implements the allreduce function of MPI. #ifndef ALLREDUCE_H #define ALLREDUCE_H @@ -15,7 +15,7 @@ typedef unsigned int uint32_t; typedef unsigned short uint16_t; typedef int socklen_t; typedef SOCKET socket_t; -#define SHUT_RDWR SD_BOTH +#define CLOSESOCK closesocket #else #include #include @@ -25,6 +25,7 @@ typedef SOCKET socket_t; #include #include typedef int socket_t; +#define CLOSESOCK close #endif using namespace std; @@ -39,11 +40,11 @@ struct node_socks { { if(current_master != "") { if(parent != -1) - shutdown(this->parent, SHUT_RDWR); - if(children[0] != -1) - shutdown(this->children[0], SHUT_RDWR); + CLOSESOCK(this->parent); + if(children[0] != -1) + CLOSESOCK(this->children[0]); if(children[1] != -1) - shutdown(this->children[1], SHUT_RDWR); + CLOSESOCK(this->children[1]); } } node_socks () @@ -54,7 +55,7 @@ struct node_socks { template <class T> void addbufs(T* buf1, const T* buf2, const size_t n) { - for(size_t i = 0;i < n;i++) + for(size_t i = 0;i < n;i++) buf1[i] += buf2[i]; } @@ -63,11 +64,11 @@ void all_reduce_init(const string master_location, const size_t unique_id, const template <class T> void pass_up(char* buffer, size_t left_read_pos, size_t right_read_pos, size_t& parent_sent_pos, socket_t parent_sock, size_t n) { size_t 
my_bufsize = min(ar_buf_size, ((int)(floor(left_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos)); my_bufsize = min(my_bufsize, ((int)(floor(right_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos)); - + if(my_bufsize > 0) { //going to pass up this chunk of data to the parent int write_size = send(parent_sock, buffer+parent_sent_pos, (int)my_bufsize, 0); - if(write_size < (int)my_bufsize) + if(write_size < (int)my_bufsize) cerr<<"Write to parent failed "<void reduce(char* buffer, const size_t n, const socket_t paren size_t parent_sent_pos = 0; //First unsent float to parent //parent_sent_pos <= left_read_pos //parent_sent_pos <= right_read_pos - + if(child_sockets[0] == -1) { child_read_pos[0] = n; } if(child_sockets[1] == -1) { - child_read_pos[1] = n; + child_read_pos[1] = n; } while (parent_sent_pos < n || child_read_pos[0] < n || child_read_pos[1] < n) @@ -112,7 +113,7 @@ template void reduce(char* buffer, const size_t n, const socket_t paren perror(NULL); throw exception(); } - + for(int i = 0;i < 2;i++) { if(child_sockets[i] != -1 && FD_ISSET(child_sockets[i],&fds)) { //there is data to be left from left child @@ -120,8 +121,8 @@ template void reduce(char* buffer, const size_t n, const socket_t paren cerr<<"I think child has no data to send but he thinks he has "<void reduce(char* buffer, const size_t n, const socket_t paren perror(NULL); throw exception(); } - + addbufs((T*)buffer + child_read_pos[i]/sizeof(T), (T*)child_read_buf[i], (child_read_pos[i] + read_size)/sizeof(T) - child_read_pos[i]/sizeof(T)); - + child_read_pos[i] += read_size; int old_unprocessed = child_unprocessed[i]; child_unprocessed[i] = child_read_pos[i] % (int)sizeof(T); for(int j = 0;j < child_unprocessed[i];j++) { child_read_buf[i][j] = child_read_buf[i][((old_unprocessed + read_size)/(int)sizeof(T))*sizeof(T)+j]; } - + if(child_read_pos[i] == n) //Done reading parent FD_CLR(child_sockets[i],&fds); } else if(child_sockets[i] != -1 && child_read_pos[i] != n) - 
FD_SET(child_sockets[i],&fds); + FD_SET(child_sockets[i],&fds); } } - if(parent_sock == -1 && child_read_pos[0] == n && child_read_pos[1] == n) + if(parent_sock == -1 && child_read_pos[0] == n && child_read_pos[1] == n) parent_sent_pos = n; - } - + } + } void broadcast(char* buffer, const size_t n, const socket_t parent_sock, const socket_t * child_sockets); -template <class T> void all_reduce(T* buffer, const size_t n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks) +template <class T> void all_reduce(T* buffer, const size_t n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks) { - if(master_location != socks.current_master) + if(master_location != socks.current_master) all_reduce_init(master_location, unique_id, total, node, socks); reduce<T>((char*)buffer, n*sizeof(T), socks.parent, socks.children); broadcast((char*)buffer, n*sizeof(T), socks.parent, socks.children); -- cgit v1.2.3