Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlekh Agarwal <alekh@alekha-z420.corp.microsoft.com>2014-05-19 19:21:18 +0400
committerAlekh Agarwal <alekh@alekha-z420.corp.microsoft.com>2014-05-19 19:21:18 +0400
commit4ab1a2d39434ce8090d0ecb534287c06ee751ba9 (patch)
tree5d33d0068dcf1efa3a1528a09cc89df3c2e62bed /vowpalwabbit/allreduce.h
parent9cf70f0cce1b355c867d51138cf739c092c4d6b8 (diff)
fixed parallel
Diffstat (limited to 'vowpalwabbit/allreduce.h')
-rw-r--r--vowpalwabbit/allreduce.h30
1 files changed, 15 insertions, 15 deletions
diff --git a/vowpalwabbit/allreduce.h b/vowpalwabbit/allreduce.h
index 35185456..67b9b0ac 100644
--- a/vowpalwabbit/allreduce.h
+++ b/vowpalwabbit/allreduce.h
@@ -29,7 +29,7 @@ typedef int socket_t;
using namespace std;
-const size_t ar_buf_size = 1<<16;
+const int ar_buf_size = 1<<16;
struct node_socks {
std::string current_master;
@@ -53,28 +53,28 @@ struct node_socks {
};
-template <class T> void addbufs(T* buf1, const T* buf2, const size_t n) {
- for(size_t i = 0;i < n;i++)
- buf1[i] += buf2[i];
+template <class T, void (*f)(T&, const T&)> void addbufs(T* buf1, const T* buf2, const int n) {
+ for(int i = 0;i < n;i++)
+ f(buf1[1], buf2[i]);
}
void all_reduce_init(const string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks);
-template <class T> void pass_up(char* buffer, size_t left_read_pos, size_t right_read_pos, size_t& parent_sent_pos, socket_t parent_sock, size_t n) {
- size_t my_bufsize = min(ar_buf_size, ((int)(floor(left_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos));
+template <class T> void pass_up(char* buffer, int left_read_pos, int right_read_pos, int& parent_sent_pos, socket_t parent_sock, int n) {
+ int my_bufsize = min(ar_buf_size, ((int)(floor(left_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos));
my_bufsize = min(my_bufsize, ((int)(floor(right_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos));
if(my_bufsize > 0) {
//going to pass up this chunk of data to the parent
- int write_size = send(parent_sock, buffer+parent_sent_pos, (int)my_bufsize, 0);
- if(write_size < (int)my_bufsize)
+ int write_size = send(parent_sock, buffer+parent_sent_pos, my_bufsize, 0);
+ if(write_size < my_bufsize)
cerr<<"Write to parent failed "<<my_bufsize<<" "<<write_size<<" "<<parent_sent_pos<<" "<<left_read_pos<<" "<<right_read_pos<<endl ;
parent_sent_pos += my_bufsize;
}
}
-template <class T>void reduce(char* buffer, const size_t n, const socket_t parent_sock, const socket_t* child_sockets) {
+template <class T, void (*f)(T&, const T&)>void reduce(char* buffer, const int n, const socket_t parent_sock, const socket_t* child_sockets) {
fd_set fds;
FD_ZERO(&fds);
@@ -84,10 +84,10 @@ template <class T>void reduce(char* buffer, const size_t n, const socket_t paren
FD_SET(child_sockets[1],&fds);
socket_t max_fd = max(child_sockets[0],child_sockets[1])+1;
- size_t child_read_pos[2] = {0,0}; //First unread float from left and right children
+ int child_read_pos[2] = {0,0}; //First unread float from left and right children
int child_unprocessed[2] = {0,0}; //The number of bytes sent by the child but not yet added to the buffer
char child_read_buf[2][ar_buf_size+sizeof(T)-1];
- size_t parent_sent_pos = 0; //First unsent float to parent
+ int parent_sent_pos = 0; //First unsent float to parent
//parent_sent_pos <= left_read_pos
//parent_sent_pos <= right_read_pos
@@ -130,7 +130,7 @@ template <class T>void reduce(char* buffer, const size_t n, const socket_t paren
throw exception();
}
- addbufs((T*)buffer + child_read_pos[i]/sizeof(T), (T*)child_read_buf[i], (child_read_pos[i] + read_size)/sizeof(T) - child_read_pos[i]/sizeof(T));
+ addbufs<T, f>((T*)buffer + child_read_pos[i]/sizeof(T), (T*)child_read_buf[i], (child_read_pos[i] + read_size)/sizeof(T) - child_read_pos[i]/sizeof(T));
child_read_pos[i] += read_size;
int old_unprocessed = child_unprocessed[i];
@@ -153,14 +153,14 @@ template <class T>void reduce(char* buffer, const size_t n, const socket_t paren
}
-void broadcast(char* buffer, const size_t n, const socket_t parent_sock, const socket_t * child_sockets);
+void broadcast(char* buffer, const int n, const socket_t parent_sock, const socket_t * child_sockets);
-template <class T> void all_reduce(T* buffer, const size_t n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks)
+template <class T, void (*f)(T&, const T&)> void all_reduce(T* buffer, const int n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks)
{
if(master_location != socks.current_master)
all_reduce_init(master_location, unique_id, total, node, socks);
- reduce<T>((char*)buffer, n*sizeof(T), socks.parent, socks.children);
+ reduce<T, f>((char*)buffer, n*sizeof(T), socks.parent, socks.children);
broadcast((char*)buffer, n*sizeof(T), socks.parent, socks.children);
}