Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Langford <jl@hunch.net>2013-01-21 19:47:03 +0400
committerJohn Langford <jl@hunch.net>2013-01-21 19:47:03 +0400
commitf8d453e6eec1067e411271f2c208f001ec32237e (patch)
tree757ff6bfe1d1b928d3c035c6e794a6577d47b047 /vowpalwabbit/sender.cc
parent85a5725045ca861feae61ab65b7a4593f6f81615 (diff)
first compiling version
Diffstat (limited to 'vowpalwabbit/sender.cc')
-rw-r--r--vowpalwabbit/sender.cc61
1 files changed, 35 insertions, 26 deletions
diff --git a/vowpalwabbit/sender.cc b/vowpalwabbit/sender.cc
index ea7243d3..95d20ff9 100644
--- a/vowpalwabbit/sender.cc
+++ b/vowpalwabbit/sender.cc
@@ -24,25 +24,17 @@
using namespace std;
namespace SENDER {
-//nonreentrant
-io_buf* buf;
+ struct sender {
+ io_buf* buf;
+
+ int sd;
+ };
-int sd = -1;
-
-void open_sockets(string host)
-{
- sd = open_socket(host.c_str());
- buf = new io_buf();
- buf->files.push_back(sd);
-}
-
-void parse_send_args(po::variables_map& vm, vector<string> pairs)
+ void open_sockets(sender& s, string host)
{
- if (vm.count("sendto"))
- {
- vector<string> hosts = vm["sendto"].as< vector<string> >();
- open_sockets(hosts[0]);
- }
+ s.sd = open_socket(host.c_str());
+ s.buf = new io_buf();
+ s.buf->files.push_back(s.sd);
}
void send_features(io_buf *b, example* ec)
@@ -58,11 +50,12 @@ void send_features(io_buf *b, example* ec)
b->flush();
}
-void save_load(void* in, io_buf& model_file, bool read, bool text) {}
+ void save_load(void* in, void* d, io_buf& model_file, bool read, bool text) {}
-void drive_send(void* in)
+ void drive_send(void* in, void* d)
{
vw* all = (vw*)in;
+ sender* s = (sender*)d;
example* ec = NULL;
v_array<char> null_tag;
null_tag.erase();
@@ -77,7 +70,7 @@ void drive_send(void* in)
if (received_index + all->p->ring_size == sent_index || (parser_finished & (received_index != sent_index)))
{
float res, weight;
- get_prediction(sd,res,weight);
+ get_prediction(s->sd,res,weight);
ec=delay_ring[received_index++ % all->p->ring_size];
label_data* ld = (label_data*)ec->ld;
@@ -92,9 +85,9 @@ void drive_send(void* in)
{
label_data* ld = (label_data*)ec->ld;
all->set_minmax(all->sd, ld->label);
- simple_label.cache_label(ld, *buf);//send label information.
- cache_tag(*buf, ec->tag);
- send_features(buf,ec);
+ simple_label.cache_label(ld, *s->buf);//send label information.
+ cache_tag(*s->buf, ec->tag);
+ send_features(s->buf,ec);
delay_ring[sent_index++ % all->p->ring_size] = ec;
}
else if (parser_done(all->p))
@@ -102,9 +95,9 @@ void drive_send(void* in)
parser_finished = true;
if (received_index == sent_index)
{
- shutdown(buf->files[0],SHUT_WR);
- buf->files.delete_v();
- buf->space.delete_v();
+ shutdown(s->buf->files[0],SHUT_WR);
+ s->buf->files.delete_v();
+ s->buf->space.delete_v();
free(delay_ring);
return;
}
@@ -114,5 +107,21 @@ void drive_send(void* in)
}
return;
}
+ void learn(void*in, void* d, example*ec) { cout << "sender can't be used under reduction" << endl; }
+ void finish(void*in, void* d) { cout << "sender can't be used under reduction" << endl; }
+
+ void parse_send_args(vw& all, po::variables_map& vm, vector<string> pairs)
+{
+ sender* s = (sender*)calloc(1,sizeof(sender));
+ s->sd = -1;
+ if (vm.count("sendto"))
+ {
+ vector<string> hosts = vm["sendto"].as< vector<string> >();
+ open_sockets(*s, hosts[0]);
+ }
+
+ learner ret = {s,drive_send,learn,finish,save_load};
+ all.l = ret;
+}
}