Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Langford <jl@hunch.net>2009-10-18 22:09:20 +0400
committerJohn Langford <jl@hunch.net>2009-10-18 22:09:20 +0400
commit427c583435bf2d7fbf47fa059fa36f6ac5cb5dee (patch)
tree013ad87cf752d82e0576fe424ec9e2e79e2b6cd4 /multisource.cc
parentb4b8454dbc787770dd28c6edea30c06648e12810 (diff)
Various bugfixes for cluster support
Diffstat (limited to 'multisource.cc')
-rw-r--r--multisource.cc25
1 files changed, 13 insertions, 12 deletions
diff --git a/multisource.cc b/multisource.cc
index fdd23cb4..83ccb3dd 100644
--- a/multisource.cc
+++ b/multisource.cc
@@ -5,12 +5,6 @@
#include <sys/socket.h>
#include <errno.h>
-bool get_prediction(int sock, prediction &p)
-{
- bool ret = (recv(sock, &p, sizeof(p), MSG_DONTWAIT) == sizeof(p));
- return ret;
-}
-
bool blocking_get_prediction(int sock, prediction &p)
{
int count = read(sock, &p, sizeof(p));
@@ -34,8 +28,6 @@ void reset(partial_example &ex)
ex.features.erase();
}
-int receive_count = 0;
-
int receive_features(parser* p, void* ex)
{
example* ae = (example*)ex;
@@ -64,20 +56,28 @@ int receive_features(parser* p, void* ex)
close(sock);
memmove(p->input.files.begin+index,
p->input.files.begin+index+1,
- p->input.files.index() - index-1);
+ (p->input.files.index() - index-1)*sizeof(int));
+ p->input.files.pop();
memmove(p->ids.begin+index,
p->ids.begin+index+1,
- p->ids.index() - index-1);
- p->input.files.pop();
+ (p->ids.index() - index-1)*sizeof(size_t));
p->ids.pop();
+ memmove(p->counts.begin+index,
+ p->counts.begin+index+1,
+ (p->counts.index() - index-1)*sizeof(size_t));
+ p->counts.pop();
index--;
}
else
{
+ if (pre.example_number != ++ (p->counts[index]))
+ cout << "count is off! " << pre.example_number << " != " << p->counts[index] << endl;
+ if (pre.example_number == p->finished_count + ring_size -1)
+ FD_CLR(sock,&fds);//this ones to far ahead, let the buffer fill for awhile.
size_t ring_index = pre.example_number % p->pes.index();
if (p->pes[ring_index].features.index() == 0)
p->pes[ring_index].example_number = pre.example_number;
- if (p->pes[ring_index].example_number != pre.example_number)
+ if (p->pes[ring_index].example_number != (int)pre.example_number)
cerr << "Error, example " << p->pes[ring_index].example_number << " != " << pre.example_number << endl;
feature f = {pre.p, p->ids[index]};
push(p->pes[ring_index].features, f);
@@ -96,6 +96,7 @@ int receive_features(parser* p, void* ex)
label_data* ld = (label_data*)ae->ld;
*ld = p->pes[ring_index].ld;
reset(p->pes[ring_index]);
+ p->finished_count++;
return ae->atomics[multindex].index();
}
}