diff options
author | John Langford <jl@hunch.net> | 2013-01-21 19:47:03 +0400 |
---|---|---|
committer | John Langford <jl@hunch.net> | 2013-01-21 19:47:03 +0400 |
commit | f8d453e6eec1067e411271f2c208f001ec32237e (patch) | |
tree | 757ff6bfe1d1b928d3c035c6e794a6577d47b047 /vowpalwabbit/sender.cc | |
parent | 85a5725045ca861feae61ab65b7a4593f6f81615 (diff) |
first compiling version
Diffstat (limited to 'vowpalwabbit/sender.cc')
-rw-r--r-- | vowpalwabbit/sender.cc | 61 |
1 files changed, 35 insertions, 26 deletions
diff --git a/vowpalwabbit/sender.cc b/vowpalwabbit/sender.cc index ea7243d3..95d20ff9 100644 --- a/vowpalwabbit/sender.cc +++ b/vowpalwabbit/sender.cc @@ -24,25 +24,17 @@ using namespace std; namespace SENDER { -//nonreentrant -io_buf* buf; + struct sender { + io_buf* buf; + + int sd; + }; -int sd = -1; - -void open_sockets(string host) -{ - sd = open_socket(host.c_str()); - buf = new io_buf(); - buf->files.push_back(sd); -} - -void parse_send_args(po::variables_map& vm, vector<string> pairs) + void open_sockets(sender& s, string host) { - if (vm.count("sendto")) - { - vector<string> hosts = vm["sendto"].as< vector<string> >(); - open_sockets(hosts[0]); - } + s.sd = open_socket(host.c_str()); + s.buf = new io_buf(); + s.buf->files.push_back(s.sd); } void send_features(io_buf *b, example* ec) @@ -58,11 +50,12 @@ void send_features(io_buf *b, example* ec) b->flush(); } -void save_load(void* in, io_buf& model_file, bool read, bool text) {} + void save_load(void* in, void* d, io_buf& model_file, bool read, bool text) {} -void drive_send(void* in) + void drive_send(void* in, void* d) { vw* all = (vw*)in; + sender* s = (sender*)d; example* ec = NULL; v_array<char> null_tag; null_tag.erase(); @@ -77,7 +70,7 @@ void drive_send(void* in) if (received_index + all->p->ring_size == sent_index || (parser_finished & (received_index != sent_index))) { float res, weight; - get_prediction(sd,res,weight); + get_prediction(s->sd,res,weight); ec=delay_ring[received_index++ % all->p->ring_size]; label_data* ld = (label_data*)ec->ld; @@ -92,9 +85,9 @@ void drive_send(void* in) { label_data* ld = (label_data*)ec->ld; all->set_minmax(all->sd, ld->label); - simple_label.cache_label(ld, *buf);//send label information. - cache_tag(*buf, ec->tag); - send_features(buf,ec); + simple_label.cache_label(ld, *s->buf);//send label information. + cache_tag(*s->buf, ec->tag); + send_features(s->buf,ec); delay_ring[sent_index++ % all->p->ring_size] = ec; } else if (parser_done(all->p)) @@ -102,9 +95,9 @@ void drive_send(void* in) parser_finished = true; if (received_index == sent_index) { - shutdown(buf->files[0],SHUT_WR); - buf->files.delete_v(); - buf->space.delete_v(); + shutdown(s->buf->files[0],SHUT_WR); + s->buf->files.delete_v(); + s->buf->space.delete_v(); free(delay_ring); return; } @@ -114,5 +107,21 @@ void drive_send(void* in) } return; } + void learn(void*in, void* d, example*ec) { cout << "sender can't be used under reduction" << endl; } + void finish(void*in, void* d) { cout << "sender can't be used under reduction" << endl; } + + void parse_send_args(vw& all, po::variables_map& vm, vector<string> pairs) +{ + sender* s = (sender*)calloc(1,sizeof(sender)); + s->sd = -1; + if (vm.count("sendto")) + { + vector<string> hosts = vm["sendto"].as< vector<string> >(); + open_sockets(*s, hosts[0]); + } + + learner ret = {s,drive_send,learn,finish,save_load}; + all.l = ret; +} } |