Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
Diffstat (limited to 'vowpalwabbit/allreduce.h')
-rw-r--r-- vowpalwabbit/allreduce.h 47
1 files changed, 24 insertions, 23 deletions
diff --git a/vowpalwabbit/allreduce.h b/vowpalwabbit/allreduce.h
index 35185456..7c2bdfd1 100644
--- a/vowpalwabbit/allreduce.h
+++ b/vowpalwabbit/allreduce.h
@@ -3,7 +3,7 @@ Copyright (c) by respective owners including Yahoo!, Microsoft, and
individual contributors. All rights reserved. Released under a BSD
license as described in the file LICENSE.
*/
-// This implements the allreduce function of MPI.
+// This implements the allreduce function of MPI.
#ifndef ALLREDUCE_H
#define ALLREDUCE_H
@@ -15,7 +15,7 @@ typedef unsigned int uint32_t;
typedef unsigned short uint16_t;
typedef int socklen_t;
typedef SOCKET socket_t;
-#define SHUT_RDWR SD_BOTH
+#define CLOSESOCK closesocket
#else
#include <sys/socket.h>
#include <sys/socket.h>
@@ -25,6 +25,7 @@ typedef SOCKET socket_t;
#include <stdlib.h>
#include <stdio.h>
typedef int socket_t;
+#define CLOSESOCK close
#endif
using namespace std;
@@ -39,11 +40,11 @@ struct node_socks {
{
if(current_master != "") {
if(parent != -1)
- shutdown(this->parent, SHUT_RDWR);
- if(children[0] != -1)
- shutdown(this->children[0], SHUT_RDWR);
+ CLOSESOCK(this->parent);
+ if(children[0] != -1)
+ CLOSESOCK(this->children[0]);
if(children[1] != -1)
- shutdown(this->children[1], SHUT_RDWR);
+ CLOSESOCK(this->children[1]);
}
}
node_socks ()
@@ -54,7 +55,7 @@ struct node_socks {
template <class T> void addbufs(T* buf1, const T* buf2, const size_t n) {
- for(size_t i = 0;i < n;i++)
+ for(size_t i = 0;i < n;i++)
buf1[i] += buf2[i];
}
@@ -63,11 +64,11 @@ void all_reduce_init(const string master_location, const size_t unique_id, const
template <class T> void pass_up(char* buffer, size_t left_read_pos, size_t right_read_pos, size_t& parent_sent_pos, socket_t parent_sock, size_t n) {
size_t my_bufsize = min(ar_buf_size, ((int)(floor(left_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos));
my_bufsize = min(my_bufsize, ((int)(floor(right_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos));
-
+
if(my_bufsize > 0) {
//going to pass up this chunk of data to the parent
int write_size = send(parent_sock, buffer+parent_sent_pos, (int)my_bufsize, 0);
- if(write_size < (int)my_bufsize)
+ if(write_size < (int)my_bufsize)
cerr<<"Write to parent failed "<<my_bufsize<<" "<<write_size<<" "<<parent_sent_pos<<" "<<left_read_pos<<" "<<right_read_pos<<endl ;
parent_sent_pos += my_bufsize;
}
@@ -90,12 +91,12 @@ template <class T>void reduce(char* buffer, const size_t n, const socket_t paren
size_t parent_sent_pos = 0; //First unsent float to parent
//parent_sent_pos <= left_read_pos
//parent_sent_pos <= right_read_pos
-
+
if(child_sockets[0] == -1) {
child_read_pos[0] = n;
}
if(child_sockets[1] == -1) {
- child_read_pos[1] = n;
+ child_read_pos[1] = n;
}
while (parent_sent_pos < n || child_read_pos[0] < n || child_read_pos[1] < n)
@@ -112,7 +113,7 @@ template <class T>void reduce(char* buffer, const size_t n, const socket_t paren
perror(NULL);
throw exception();
}
-
+
for(int i = 0;i < 2;i++) {
if(child_sockets[i] != -1 && FD_ISSET(child_sockets[i],&fds)) {
//there is data to be left from left child
@@ -120,8 +121,8 @@ template <class T>void reduce(char* buffer, const size_t n, const socket_t paren
cerr<<"I think child has no data to send but he thinks he has "<<FD_ISSET(child_sockets[0],&fds)<<" "<<FD_ISSET(child_sockets[1],&fds)<<endl;
throw exception();
}
-
-
+
+
size_t count = min(ar_buf_size,n - child_read_pos[i]);
int read_size = recv(child_sockets[i], child_read_buf[i] + child_unprocessed[i], (int)count, 0);
if(read_size == -1) {
@@ -129,36 +130,36 @@ template <class T>void reduce(char* buffer, const size_t n, const socket_t paren
perror(NULL);
throw exception();
}
-
+
addbufs((T*)buffer + child_read_pos[i]/sizeof(T), (T*)child_read_buf[i], (child_read_pos[i] + read_size)/sizeof(T) - child_read_pos[i]/sizeof(T));
-
+
child_read_pos[i] += read_size;
int old_unprocessed = child_unprocessed[i];
child_unprocessed[i] = child_read_pos[i] % (int)sizeof(T);
for(int j = 0;j < child_unprocessed[i];j++) {
child_read_buf[i][j] = child_read_buf[i][((old_unprocessed + read_size)/(int)sizeof(T))*sizeof(T)+j];
}
-
+
if(child_read_pos[i] == n) //Done reading parent
FD_CLR(child_sockets[i],&fds);
}
else if(child_sockets[i] != -1 && child_read_pos[i] != n)
- FD_SET(child_sockets[i],&fds);
+ FD_SET(child_sockets[i],&fds);
}
}
- if(parent_sock == -1 && child_read_pos[0] == n && child_read_pos[1] == n)
+ if(parent_sock == -1 && child_read_pos[0] == n && child_read_pos[1] == n)
parent_sent_pos = n;
- }
-
+ }
+
}
void broadcast(char* buffer, const size_t n, const socket_t parent_sock, const socket_t * child_sockets);
-template <class T> void all_reduce(T* buffer, const size_t n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks)
+template <class T> void all_reduce(T* buffer, const size_t n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks)
{
- if(master_location != socks.current_master)
+ if(master_location != socks.current_master)
all_reduce_init(master_location, unique_id, total, node, socks);
reduce<T>((char*)buffer, n*sizeof(T), socks.parent, socks.children);
broadcast((char*)buffer, n*sizeof(T), socks.parent, socks.children);