Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Langford <jl@hunch.net>2009-09-17 00:49:31 +0400
committerJohn Langford <jl@hunch.net>2009-09-17 00:49:31 +0400
commit0bda878246f522418bdf71902fa45c8efab9347e (patch)
tree2c31f1b91825cb7e4d12d584ed799e8d10a41659 /multisource.cc
parent9ffe9acd7d46e12d83d231c9842a79a86d1a82ea (diff)
Still grinding.
Diffstat (limited to 'multisource.cc')
-rw-r--r--multisource.cc96
1 files changed, 55 insertions, 41 deletions
diff --git a/multisource.cc b/multisource.cc
index ff05e4d2..97ebfd8b 100644
--- a/multisource.cc
+++ b/multisource.cc
@@ -1,10 +1,20 @@
#include "multisource.h"
#include "simple_label.h"
-void get_prediction(int sock, prediction &p)
+#include <sys/types.h>
+#include <sys/socket.h>
+
+
+bool get_prediction(int sock, prediction &p)
{
- if (read(sock, &p, sizeof(prediction)) < (int)sizeof(prediction))
- cerr << "argh! bad read!" << endl;
+ return (recv(sock, &p, sizeof(p), MSG_DONTWAIT) == sizeof(p));
+}
+
+void send_prediction(int sock, prediction pred)
+{
+ if (write(sock,&pred,sizeof(prediction)) < (int)sizeof(prediction))
+ cerr << "argh! bad write!" << endl;
+ fsync(sock);
}
void reset(partial_example &ex)
@@ -13,53 +23,57 @@ void reset(partial_example &ex)
ex.features.erase();
}
-int recieve_features(parser* p, void* ex)
+int receive_features(parser* p, void* ex)
{
example* ae = (example*)ex;
+ fd_set fds;
+ FD_ZERO(&fds);
+ for (int* sock= p->input.files.begin; sock != p->input.files.end; sock++)
+ FD_SET(*sock,&fds);
+
while (true)
{
- fd_set fds;
- FD_ZERO(&fds);
- for (int* sock= p->input.files.begin; sock != p->input.files.end; sock++)
- FD_SET(*sock,&fds);
-
if (select(p->max_fd,&fds,NULL, NULL, NULL) == -1)
{
cerr << "Select failed!" << endl;
exit (1);
}
- for (int* sock = p->input.files.begin; sock != p->input.files.end; sock++)
- if (FD_ISSET(*sock, &fds))
- {//there is a feature or label to read
- prediction pre;
- get_prediction(*sock, pre);
- size_t index = pre.example_number % p->pes.index();
- if (p->pes[index].example_number != pre.example_number)
- if (p->pes[index].counter == 0)
- p->pes[index].example_number = pre.example_number;
- else
- cerr << "Error, two examples map to the same index" << endl;
- if (*sock != p->label_sock) // Not the label source
- {
- feature f = {pre.p, *sock };
- push(p->pes[index].features,f);
- }
- else // The label source
- {
- p->pes[index].ld.weight = pre.p;
- get_prediction(*sock, pre);
- p->pes[index].ld.label = pre.p;
- }
- if( p->pes[index].counter == p->input.files.index() )
- {
- push( ae->indices, multindex );
- push_many( ae->atomics[multindex], p->pes[index].features.begin, p->pes[index].features.index() );
- label_data* ld = (label_data*)ae->ld;
- *ld = p->pes[index].ld;
- reset(p->pes[index]);
- return ae->atomics[multindex].index();
- }
- }
+ for (size_t index = 0; index <= p->input.files.index(); index++)
+ {
+ int sock = p->input.files[index];
+ if (FD_ISSET(sock, &fds))
+ {//there is a feature or label to read
+ prediction pre;
+ get_prediction(sock, pre);
+ size_t index = pre.example_number % p->pes.index();
+ if (p->pes[index].example_number != pre.example_number)
+ if (p->pes[index].counter == 0)
+ p->pes[index].example_number = pre.example_number;
+ else
+ cerr << "Error, two examples map to the same index" << endl;
+ feature f = {pre.p, p->ids[index]};
+ push(p->pes[index].features,f);
+ if (sock == p->label_sock) // The label source
+ {
+ label_data ld;
+ size_t len = sizeof(ld.label)+sizeof(ld.weight);
+ char c[len];
+ read(sock,c,len);
+ bufread_simple_label(&(p->pes[index].ld), c);
+ }
+ if( p->pes[index].counter == p->input.files.index() )
+ {
+ push( ae->indices, multindex );
+ push_many( ae->atomics[multindex], p->pes[index].features.begin, p->pes[index].features.index() );
+ label_data* ld = (label_data*)ae->ld;
+ *ld = p->pes[index].ld;
+ reset(p->pes[index]);
+ return ae->atomics[multindex].index();
+ }
+ }
+ else
+ FD_SET(sock,&fds);
+ }
}
}