Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Alekh Agarwal <alekh@alekha-z420.corp.microsoft.com> 2014-04-03 02:12:26 +0400
committer: Alekh Agarwal <alekh@alekha-z420.corp.microsoft.com> 2014-04-03 02:12:26 +0400
commit: e4795b6a661606109fcbc7daae157cadc295248b (patch)
tree: bf128a108be6ca879abb8dd32fbb191c501277b4 /vowpalwabbit/allreduce.h
parent: 5e4ef545ff98f1f97bd4c4ac97217e62380d4a1a (diff)
updating allreduce
Diffstat (limited to 'vowpalwabbit/allreduce.h')
-rw-r--r-- vowpalwabbit/allreduce.h 143
1 file changed, 142 insertions, 1 deletion
diff --git a/vowpalwabbit/allreduce.h b/vowpalwabbit/allreduce.h
index 3c06ba7c..2fd2aef9 100644
--- a/vowpalwabbit/allreduce.h
+++ b/vowpalwabbit/allreduce.h
@@ -22,9 +22,15 @@ typedef SOCKET socket_t;
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
+#include <stdlib.h>
+#include <stdio.h>
typedef int socket_t;
#endif
+using namespace std;
+
+const int ar_buf_size = 1<<16; // Chunk size in bytes (65536): caps each send() in pass_up and each recv() in reduce, and sizes reduce's per-child staging buffers.
+
struct node_socks {
std::string current_master;
socket_t parent;
@@ -46,6 +52,141 @@ struct node_socks {
}
};
-void all_reduce(float* buffer, int n, std::string master_location, size_t unique_id, size_t total, size_t node, node_socks& socks);
+
+// Element-wise accumulation: adds the first n entries of buf2 onto the
+// matching entries of buf1 (buf1[k] += buf2[k]). buf2 is only read.
+// A non-positive n performs no work.
+template <class T> void addbufs(T* buf1, const T* buf2, const int n) {
+  T* dst = buf1;
+  const T* src = buf2;
+  for(int remaining = n; remaining > 0; --remaining)
+    *dst++ += *src++;
+}
+
+void all_reduce_init(const string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks); // Declaration only (no definition is visible in this diff); all_reduce calls it when master_location differs from socks.current_master.
+
+// Sends to the parent the next chunk of bytes that has already been read
+// from BOTH children.
+//
+//   buffer           byte view of the data being reduced
+//   left_read_pos    number of bytes read so far from the left child
+//   right_read_pos   number of bytes read so far from the right child
+//   parent_sent_pos  in/out: number of bytes already sent to the parent;
+//                    advanced by the number of bytes send() actually accepted
+//   parent_sock      socket the chunk is written to
+//   n                total byte count; not used here, kept for the callers
+//
+// Only bytes belonging to complete elements of T are eligible, and at most
+// ar_buf_size bytes are sent per call. A short send is reported on cerr and
+// the remainder is left for the next call; a failed send (negative return)
+// is reported and raises exception().
+template <class T> void pass_up(char* buffer, int left_read_pos, int right_read_pos, int& parent_sent_pos, socket_t parent_sock, int n) {
+  // Round each read position down to a whole number of elements. Integer
+  // arithmetic is exact; routing this through float loses precision for
+  // positions above 2^24 and can round UP past the bytes actually received.
+  const int elem_size = (int)sizeof(T);
+  const int left_ready = (left_read_pos / elem_size) * elem_size;
+  const int right_ready = (right_read_pos / elem_size) * elem_size;
+  const int my_bufsize = min(ar_buf_size, min(left_ready, right_ready) - parent_sent_pos);
+
+  if(my_bufsize > 0) {
+    //going to pass up this chunk of data to the parent
+    int write_size = send(parent_sock, buffer+parent_sent_pos, my_bufsize, 0);
+    if(write_size < my_bufsize)
+      cerr<<"Write to parent failed "<<my_bufsize<<" "<<write_size<<" "<<parent_sent_pos<<" "<<left_read_pos<<" "<<right_read_pos<<endl ;
+    if(write_size < 0) {
+      // Nothing was sent: pretending otherwise would leave the parent
+      // waiting for bytes that never arrive.
+      perror(NULL);
+      throw exception();
+    }
+    // Advance only by what was really written so unsent bytes are retried.
+    parent_sent_pos += write_size;
+  }
+
+}
+
+// Reduction step: accumulates the data arriving from up to two child sockets
+// into buffer (element-wise += via addbufs) and, when parent_sock is not -1,
+// streams the already-accumulated prefix to the parent through pass_up.
+//
+//   buffer         byte view of an array of T; updated in place
+//   n              size of buffer in bytes
+//   parent_sock    socket of the parent, or -1 when there is none
+//   child_sockets  two child sockets; an entry of -1 means "no such child"
+//
+// Raises exception() when select() or recv() fails, when a child becomes
+// readable after all n bytes were already read from it, or when a child
+// closes its connection before delivering n bytes.
+template <class T>void reduce(char* buffer, const int n, const socket_t parent_sock, const socket_t* child_sockets) {
+
+  fd_set fds;
+  FD_ZERO(&fds);
+  if(child_sockets[0] != -1)
+    FD_SET(child_sockets[0],&fds);
+  if(child_sockets[1] != -1)
+    FD_SET(child_sockets[1],&fds);
+
+  socket_t max_fd = max(child_sockets[0],child_sockets[1])+1;
+  int child_read_pos[2] = {0,0}; //Bytes read so far from the left and right children
+  int child_unprocessed[2] = {0,0}; //The number of bytes sent by the child but not yet added to the buffer
+  // Staging area per child: room for ar_buf_size fresh bytes plus up to
+  // sizeof(T)-1 leftover bytes of a partially received element. The storage
+  // is declared as T so that handing it to addbufs as T* is correctly
+  // aligned; recv() and the leftover copy address it through a char view.
+  T child_read_storage[2][(ar_buf_size+sizeof(T)-1)/sizeof(T)+1];
+  char* child_read_buf[2] = {(char*)child_read_storage[0], (char*)child_read_storage[1]};
+  int parent_sent_pos = 0; //First unsent byte to parent
+  //parent_sent_pos <= left_read_pos
+  //parent_sent_pos <= right_read_pos
+
+  // A missing child counts as already fully read.
+  if(child_sockets[0] == -1) {
+    child_read_pos[0] = n;
+  }
+  if(child_sockets[1] == -1) {
+    child_read_pos[1] = n;
+  }
+
+  while (parent_sent_pos < n || child_read_pos[0] < n || child_read_pos[1] < n)
+  {
+    if(parent_sock != -1)
+      pass_up<T>(buffer, child_read_pos[0], child_read_pos[1], parent_sent_pos, parent_sock, n);
+
+    if(parent_sent_pos >= n && child_read_pos[0] >= n && child_read_pos[1] >= n) break;
+
+    if(child_read_pos[0] < n || child_read_pos[1] < n) {
+      if (max_fd > 0 && select((int)max_fd,&fds,NULL, NULL, NULL) == -1)
+      {
+        cerr << "Select failed!" << endl;
+        perror(NULL);
+        throw exception();
+      }
+
+      for(int i = 0;i < 2;i++) {
+        if(child_sockets[i] != -1 && FD_ISSET(child_sockets[i],&fds)) {
+          //there is data to be read from this child
+          if(child_read_pos[i] == n) {
+            cerr<<"I think child has no data to send but he thinks he has "<<FD_ISSET(child_sockets[0],&fds)<<" "<<FD_ISSET(child_sockets[1],&fds)<<endl;
+            throw exception();
+          }
+
+          // New bytes land right after any leftover bytes of a partial element.
+          size_t count = min(ar_buf_size,n - child_read_pos[i]);
+          int read_size = recv(child_sockets[i], child_read_buf[i] + child_unprocessed[i], (int)count, 0);
+          if(read_size == -1) {
+            cerr <<" Read from child failed\n";
+            perror(NULL);
+            throw exception();
+          }
+          if(read_size == 0) {
+            // The child closed its end before delivering all n bytes. The
+            // read position would never advance and select() would keep
+            // reporting the socket readable, so bail out instead of spinning.
+            cerr <<" Child closed connection before sending all data\n";
+            throw exception();
+          }
+
+          // Accumulate every element that is now complete (leftover + new bytes).
+          addbufs((T*)buffer + child_read_pos[i]/sizeof(T), child_read_storage[i], (child_read_pos[i] + read_size)/sizeof(T) - child_read_pos[i]/sizeof(T));
+
+          child_read_pos[i] += read_size;
+          int old_unprocessed = child_unprocessed[i];
+          child_unprocessed[i] = child_read_pos[i] % (int)sizeof(T);
+          // Move the bytes of the trailing partial element to the front of
+          // the staging area so the next recv() can complete it.
+          for(int j = 0;j < child_unprocessed[i];j++) {
+            child_read_buf[i][j] = child_read_buf[i][((old_unprocessed + read_size)/(int)sizeof(T))*sizeof(T)+j];
+          }
+
+          if(child_read_pos[i] == n) //Done reading this child
+            FD_CLR(child_sockets[i],&fds);
+        }
+        else if(child_sockets[i] != -1 && child_read_pos[i] != n)
+          // select() removed this socket from the set because it was not
+          // ready; put it back so the next select() watches it again.
+          FD_SET(child_sockets[i],&fds);
+      }
+    }
+    // Without a parent nothing is ever sent, so mark sending as finished once
+    // both children are fully read to let the loop terminate.
+    if(parent_sock == -1 && child_read_pos[0] == n && child_read_pos[1] == n)
+      parent_sent_pos = n;
+
+  }
+
+}
+
+void broadcast(char* buffer, const int n, const socket_t parent_sock, const socket_t * child_sockets); // Declaration only (no definition is visible in this diff); all_reduce calls it right after reduce with the same byte buffer, byte count and sockets.
+
+
+// Entry point for an all-reduce over n elements of type T held in buffer.
+// If master_location differs from socks.current_master, all_reduce_init is
+// called first with the given ids and socks. The buffer is then handed, as
+// raw bytes of length n*sizeof(T), to reduce<T> and afterwards to broadcast,
+// both using socks.parent and socks.children.
+template <class T> void all_reduce(T* buffer, const int n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks)
+{
+  const bool needs_init = (master_location != socks.current_master);
+  if(needs_init)
+    all_reduce_init(master_location, unique_id, total, node, socks);
+
+  char* const raw_bytes = (char*)buffer;
+  reduce<T>(raw_bytes, n*sizeof(T), socks.parent, socks.children);
+  broadcast(raw_bytes, n*sizeof(T), socks.parent, socks.children);
+}
+
+
+
+
#endif