Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Langford <jl@hunch.net>2009-09-20 03:17:28 +0400
committerJohn Langford <jl@hunch.net>2009-09-20 03:17:28 +0400
commitf5e75687c8d6debdbdc38ba5cc023ecb209bd9af (patch)
tree84962846432895610bd575360634aa3663b46d37 /sender.cc
parent0bda878246f522418bdf71902fa45c8efab9347e (diff)
first compiling (but not debugged) cluster version.
Diffstat (limited to 'sender.cc')
-rw-r--r--sender.cc80
1 files changed, 46 insertions, 34 deletions
diff --git a/sender.cc b/sender.cc
index 925b7865..22273b1d 100644
--- a/sender.cc
+++ b/sender.cc
@@ -6,6 +6,9 @@
#include "cache.h"
#include "vw.h"
#include "simple_label.h"
+#include "network.h"
+#include "multisource.h"
+#include "train_ring.h"
pthread_t* thread;
size_t d_1;
@@ -60,31 +63,14 @@ void open_sockets(vector<string>& hosts)
{
v_array<io_buf> t;
push(bufs,t);
+ int new_id = global.unique_id;
for (size_t j = 0; j< d_2; j++)
{
size_t number = j + d_2*i;
- hostent* he = gethostbyname(hosts[number].c_str());
- if (he == NULL)
- {
- cerr << "can't resolve hostname: " << hosts[number] << endl;
- exit(1);
- }
- int sd = socket(PF_INET, SOCK_STREAM, 0);
- if (sd == -1)
- {
- cerr << "can't get socket " << endl;
- exit(1);
- }
- sockaddr_in far_end;
- far_end.sin_family = AF_INET;
- far_end.sin_port = htons(39523);
- far_end.sin_addr = *(in_addr*)(he->h_addr);
- memset(&far_end.sin_zero, '\0',8);
- if (connect(sd,(sockaddr*)&far_end, sizeof(far_end)) == -1)
- {
- cerr << "can't connect to: " << hosts[number] << endl;
- exit(1);
- }
+ int sd = open_socket(hosts[number], new_id);
+ if (new_id == 0)
+ global.local_prediction = sd;
+ new_id++;
io_buf b;
push(b.files, sd);
push(bufs[i], b);
@@ -123,22 +109,48 @@ void send_features(int i, int j, io_buf& b, example* ec)
b.flush();
}
+bool check_mesg(int sock)
+{
+ while (true)
+ {
+ prediction ps;
+ if (get_prediction(sock,ps))
+ {
+ example* ec = get_train_example(0);
+ ec->final_prediction = ps.p;
+ finish_example(ec);
+ }
+ else
+ return false;
+ }
+}
+
void* send_thread(void*)
{
example* ec = NULL;
- while ( (ec = get_example(ec,0)) )
- {
- label_data* ld = (label_data*)ec->ld;
-
- for (size_t i = 0; i < d_1; i++)
- for (size_t j = 0; j < d_2; j++)
- {
- simple_label.cache_label(ld,bufs[i][j]);//send label information.
- send_features(i,j,bufs[i][j],ec);
- }
- ec->threads_to_finish = 1;
- ec->done = true;
+ while ( true )
+ {//this is a poor man's select operation.
+ if (check_mesg(global.local_prediction))//nonblocking
+ ;
+ else if ((ec = get_example(0)) != NULL)//blocking operation.
+ {
+ label_data* ld = (label_data*)ec->ld;
+
+ for (size_t i = 0; i < d_1; i++)
+ for (size_t j = 0; j < d_2; j++)
+ {
+ simple_label.cache_label(ld,bufs[i][j]);//send label information.
+ send_features(i,j,bufs[i][j],ec);
+ }
+ insert_example(ec);
+ ec->threads_to_finish = 1;
+ ec->done = true;
+ }
+ else if (thread_done(0))
+ return NULL;
+ else
+ ;//busywait when we have sent out all examples but not yet received predictions for all.
}
return NULL;