Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Langford <jl@hunch.net>2009-12-05 05:02:00 +0300
committerJohn Langford <jl@hunch.net>2009-12-05 05:02:00 +0300
commit7311737885b644c31384ad814968b5a8f728a7b2 (patch)
tree08b7b4cd0599bbbee4a4bbdb3d241c264b9f99c6 /multisource.cc
parent3bf2923ed8e786679f3c9ea22cf1df8c3f8e6ee5 (diff)
Optimized network speed via allowing Nagle and fixed bugs revealed.
Diffstat (limited to 'multisource.cc')
-rw-r--r--multisource.cc40
1 files changed, 33 insertions, 7 deletions
diff --git a/multisource.cc b/multisource.cc
index 2cfbc1dc..6db54767 100644
--- a/multisource.cc
+++ b/multisource.cc
@@ -6,22 +6,46 @@
#include <errno.h>
#include <stdio.h>
+int really_read(int sock, void* in, size_t count)
+{
+ char* buf = (char*)in;
+ size_t done = 0;
+ int r= 0;
+ while (done < count)
+ {
+ if ((r = read(sock,buf,count-done)) == 0)
+ return 0;
+ else
+ if (r < 0)
+ {
+ cerr << "argh! bad read! " << endl;
+ perror(NULL);
+ exit(0);
+ }
+ else
+ {
+ done += r;
+ buf += r;
+ }
+ }
+ return done;
+}
+
bool blocking_get_prediction(int sock, prediction &p)
{
- int count = read(sock, &p, sizeof(p));
+ int count = really_read(sock, &p, sizeof(p));
bool ret = (count == sizeof(p));
return ret;
}
-void send_prediction(int sock, prediction pred)
+void send_prediction(int sock, prediction p)
{
- if (write(sock,&pred,sizeof(prediction)) < (int)sizeof(prediction))
+ if (write(sock, &p, sizeof(p)) < (int)sizeof(p))
{
cerr << "argh! bad write! " << endl;
perror(NULL);
exit(0);
}
- fsync(sock);
}
void reset(partial_example &ex)
@@ -72,8 +96,9 @@ int receive_features(parser* p, void* ex)
else
{
if (pre.example_number != ++ (p->counts[index]))
- cout << "count is off! " << pre.example_number << " != " << p->counts[index] << endl;
- if (pre.example_number == p->finished_count + ring_size -1)
+ cout << "count is off! " << pre.example_number << " != " << p->counts[index] <<
+ " for source " << index << " prediction = " << pre.p << endl;
+ if (pre.example_number == p->finished_count + ring_size - 1)
FD_CLR(sock,&fds);//this ones to far ahead, let the buffer fill for awhile.
size_t ring_index = pre.example_number % p->pes.index();
if (p->pes[ring_index].features.index() == 0)
@@ -87,9 +112,10 @@ int receive_features(parser* p, void* ex)
label_data ld;
size_t len = sizeof(ld.label)+sizeof(ld.weight);
char c[len];
- read(sock,c,len);
+ really_read(sock,c,len);
bufread_simple_label(&(p->pes[ring_index].ld), c);
}
+
if( p->pes[ring_index].features.index() == p->input.count )
{
push( ae->indices, multindex );