Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlekh Agarwal <alekh@alekha-z420.corp.microsoft.com>2014-04-03 22:01:05 +0400
committerAlekh Agarwal <alekh@alekha-z420.corp.microsoft.com>2014-04-03 22:01:05 +0400
commit2759cbbb4c831d6d52b6a425901c76c8a14c103b (patch)
tree13bc2f5f3368ed9187f418ca9ffd28349ad5a08d /vowpalwabbit/allreduce.h
parente4795b6a661606109fcbc7daae157cadc295248b (diff)
.
Diffstat (limited to 'vowpalwabbit/allreduce.h')
-rw-r--r--vowpalwabbit/allreduce.h21
1 files changed, 0 insertions, 21 deletions
diff --git a/vowpalwabbit/allreduce.h b/vowpalwabbit/allreduce.h
index 2fd2aef9..413bceee 100644
--- a/vowpalwabbit/allreduce.h
+++ b/vowpalwabbit/allreduce.h
@@ -55,12 +55,6 @@ struct node_socks {
template <class T> void addbufs(T* buf1, const T* buf2, const int n) {
for(int i = 0;i < n;i++)
-// {
-// uint32_t first = *((uint32_t*)(buf1+i));
-// uint32_t second = *((uint32_t*)(buf2+i));
-// uint32_t xkindaor = first^second;
-// buf1[i] = *(float*)(&xkindaor);
-// }
buf1[i] += buf2[i];
}
@@ -69,8 +63,6 @@ void all_reduce_init(const string master_location, const size_t unique_id, const
template <class T> void pass_up(char* buffer, int left_read_pos, int right_read_pos, int& parent_sent_pos, socket_t parent_sock, int n) {
int my_bufsize = min(ar_buf_size, ((int)(floor(left_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos));
my_bufsize = min(my_bufsize, ((int)(floor(right_read_pos/((float)sizeof(T)))*sizeof(T)) - parent_sent_pos));
- // if(n - parent_sent_pos == n % 4)
- // my_bufsize = n - parent_sent_pos;
if(my_bufsize > 0) {
//going to pass up this chunk of data to the parent
@@ -78,7 +70,6 @@ template <class T> void pass_up(char* buffer, int left_read_pos, int right_read_
if(write_size < my_bufsize)
cerr<<"Write to parent failed "<<my_bufsize<<" "<<write_size<<" "<<parent_sent_pos<<" "<<left_read_pos<<" "<<right_read_pos<<endl ;
parent_sent_pos += my_bufsize;
- //cerr<<"buf size = "<<my_bufsize<<" "<<parent_sent_pos<<" "<<n%4<<" "<<(n - parent_sent_pos)<<endl;
}
}
@@ -114,8 +105,6 @@ template <class T>void reduce(char* buffer, const int n, const socket_t parent_s
if(parent_sent_pos >= n && child_read_pos[0] >= n && child_read_pos[1] >= n) break;
- // cout<<"Before select parent_sent_pos = "<<parent_sent_pos<<" child_read_pos[0] = "<<child_read_pos[0]<<" max fd = "<<max_fd<<endl;
-
if(child_read_pos[0] < n || child_read_pos[1] < n) {
if (max_fd > 0 && select((int)max_fd,&fds,NULL, NULL, NULL) == -1)
{
@@ -129,12 +118,10 @@ template <class T>void reduce(char* buffer, const int n, const socket_t parent_s
//there is data to be left from left child
if(child_read_pos[i] == n) {
cerr<<"I think child has no data to send but he thinks he has "<<FD_ISSET(child_sockets[0],&fds)<<" "<<FD_ISSET(child_sockets[1],&fds)<<endl;
- //fflush(stderr);
throw exception();
}
- //float read_buf[ar_buf_size];
size_t count = min(ar_buf_size,n - child_read_pos[i]);
int read_size = recv(child_sockets[i], child_read_buf[i] + child_unprocessed[i], (int)count, 0);
if(read_size == -1) {
@@ -148,18 +135,12 @@ template <class T>void reduce(char* buffer, const int n, const socket_t parent_s
child_read_pos[i] += read_size;
int old_unprocessed = child_unprocessed[i];
child_unprocessed[i] = child_read_pos[i] % (int)sizeof(T);
- //cout<<"Unprocessed "<<child_unprocessed[i]<<" "<<(old_unprocessed + read_size)%(int)sizeof(float)<<" ";
for(int j = 0;j < child_unprocessed[i];j++) {
- // cout<<(child_read_pos[i]/(int)sizeof(float))*(int)sizeof(float)+j<<" ";
child_read_buf[i][j] = child_read_buf[i][((old_unprocessed + read_size)/(int)sizeof(T))*sizeof(T)+j];
}
- //cout<<endl;
if(child_read_pos[i] == n) //Done reading parent
FD_CLR(child_sockets[i],&fds);
- //cerr<<"Clearing socket "<<i<<" "<<FD_ISSET(child_sockets[i],&fds)<<endl;
- //cerr<<"Child_unprocessed should be 0"<<child_unprocessed[i]<<endl;
- //fflush(stderr);
}
else if(child_sockets[i] != -1 && child_read_pos[i] != n)
FD_SET(child_sockets[i],&fds);
@@ -177,10 +158,8 @@ void broadcast(char* buffer, const int n, const socket_t parent_sock, const sock
template <class T> void all_reduce(T* buffer, const int n, const std::string master_location, const size_t unique_id, const size_t total, const size_t node, node_socks& socks)
{
- //cerr<<"Allreduce\n";
if(master_location != socks.current_master)
all_reduce_init(master_location, unique_id, total, node, socks);
- //cerr<<"In AR "<<socks.current_master<<" "<<socks.parent<<" "<<socks.children[0]<<" "<<socks.children[1]<<endl;
reduce<T>((char*)buffer, n*sizeof(T), socks.parent, socks.children);
broadcast((char*)buffer, n*sizeof(T), socks.parent, socks.children);
}