Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrian Muller <brian.muller@livingsocial.com>2012-01-03 23:56:51 +0400
committerBrian Muller <brian.muller@livingsocial.com>2012-01-03 23:56:51 +0400
commit9f457bf70dcab12082ec07cc6018e3f1a75899a2 (patch)
treea30f2028788bf291a71200ff5644a3c72a842f27 /vowpalwabbit/sender.cc
parent5c267f9404ffbb733c88b759605386aad51e6baf (diff)
now creating a linkable library
Diffstat (limited to 'vowpalwabbit/sender.cc')
-rw-r--r--vowpalwabbit/sender.cc101
1 files changed, 101 insertions, 0 deletions
diff --git a/vowpalwabbit/sender.cc b/vowpalwabbit/sender.cc
new file mode 100644
index 00000000..f346f35b
--- /dev/null
+++ b/vowpalwabbit/sender.cc
@@ -0,0 +1,101 @@
+#include <pthread.h>
+#include <vector>
+#include <netdb.h>
+#include "io.h"
+#include "parse_args.h"
+#include "cache.h"
+#include "vw.h"
+#include "simple_label.h"
+#include "network.h"
+
+using namespace std;
+io_buf* buf;
+
+int sd = -1;
+
+void open_sockets(string host)
+{
+ sd = open_socket(host.c_str());
+ buf = new io_buf();
+ push(buf->files,sd);
+}
+
+void parse_send_args(po::variables_map& vm, vector<string> pairs)
+{
+ if (vm.count("sendto"))
+ {
+ vector<string> hosts = vm["sendto"].as< vector<string> >();
+ open_sockets(hosts[0]);
+ }
+}
+
+void send_features(io_buf *b, example* ec)
+{
+ // note: subtracting 1 b/c not sending constant
+ output_byte(*b,ec->indices.index()-1);
+
+ for (size_t* i = ec->indices.begin; i != ec->indices.end; i++) {
+ if (*i == constant_namespace)
+ continue;
+ output_features(*b, *i, ec->atomics[*i].begin, ec->atomics[*i].end);
+ }
+ b->flush();
+}
+
+void setup_send()
+{
+ example* ec = NULL;
+ v_array<char> null_tag;
+ null_tag.erase();
+
+ example** delay_ring = (example**) calloc(global.ring_size, sizeof(example*));
+ size_t sent_index =0;
+ size_t received_index=0;
+
+ bool parser_finished = false;
+ while ( true )
+ {//this is a poor man's select operation.
+ if (received_index + global.ring_size == sent_index || (parser_finished & (received_index != sent_index)))
+ {
+ float res, weight;
+ get_prediction(sd,res,weight);
+
+ ec=delay_ring[received_index++ % global.ring_size];
+ label_data* ld = (label_data*)ec->ld;
+
+ ec->final_prediction = res;
+
+ ec->loss = global.loss->getLoss(ec->final_prediction, ld->label) * ld->weight;
+
+ finish_example(ec);
+ }
+ else if ((ec = get_example()) != NULL)//semiblocking operation.
+ {
+ label_data* ld = (label_data*)ec->ld;
+ set_minmax(ld->label);
+ simple_label.cache_label(ld, *buf);//send label information.
+ cache_tag(*buf, ec->tag);
+ send_features(buf,ec);
+ delay_ring[sent_index++ % global.ring_size] = ec;
+ }
+ else if (parser_done())
+ { //close our outputs to signal finishing.
+ parser_finished = true;
+ if (received_index == sent_index)
+ {
+ shutdown(buf->files[0],SHUT_WR);
+ free(buf->files.begin);
+ free(buf->space.begin);
+ free(delay_ring);
+ return;
+ }
+ }
+ else
+ ;
+ }
+ return;
+}
+
+void destroy_send()
+{
+}