Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJake Hofman <jhofman+github@gmail.com>2011-08-03 23:39:34 +0400
committerJohn <jl@hunch.net>2011-08-04 04:08:12 +0400
commitfc38a0610a06838555059fc68198eb794ebc55c6 (patch)
tree28730d926a80131c6937615d5e2b2e15b6cd649e /parser.cc
parentf7c683ee47ed080521fd0a4c90169debb3df76cd (diff)
persistent daemon should be working ...
Diffstat (limited to 'parser.cc')
-rw-r--r--parser.cc155
1 files changed, 122 insertions, 33 deletions
diff --git a/parser.cc b/parser.cc
index 0ee53e3f..b10f3bb1 100644
--- a/parser.cc
+++ b/parser.cc
@@ -4,6 +4,10 @@ embodied in the content of this file are licensed under the BSD
(revised) open source license
*/
+#include <sys/types.h>
+#include <sys/mman.h>
+#include <sys/wait.h>
+
#include <netdb.h>
#include <boost/program_options.hpp>
#include <netinet/tcp.h>
@@ -21,6 +25,16 @@ namespace po = boost::program_options;
#include "unique_sort.h"
#include "constant.h"
+example* examples;//A Ring of examples.
+pthread_mutex_t examples_lock = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t example_available = PTHREAD_COND_INITIALIZER;
+pthread_cond_t example_unused = PTHREAD_COND_INITIALIZER;
+unsigned long long parsed_index; // The index of the parsed example.
+unsigned long long* used_index; // The index of the example currently used by thread i.
+bool done=false;
+v_array<size_t> random_nos;
+v_array<size_t> gram_mask;
+
parser* new_parser(const label_parser* lp)
{
parser* ret = (parser*) calloc(1,sizeof(parser));
@@ -88,16 +102,50 @@ void reset_source(size_t numbits, parser* p)
input->open_file(p->output->finalname.begin,io_buf::READ); //pushing is merged into open_file
p->reader = read_cached_features;
}
- if ( p->resettable == true ){
- for (size_t i = 0; i < input->files.index();i++)
+ if ( p->resettable == true )
+ if (global.persistent)
{
- input->reset_file(input->files[i]);
- if (cache_numbits(input, input->files[i]) < numbits) {
- cerr << "argh, a bug in caching of some sort! Exiting\n" ;
- exit(1);
- }
+ // wait for all predictions to be sent back to client
+ pthread_mutex_lock(&output_lock);
+ while (global.example_number != parsed_index)
+ pthread_cond_wait(&output_done, &output_lock);
+ pthread_mutex_unlock(&output_lock);
+
+ // close socket, erase final prediction sink and socket
+ close(p->input->files[0]);
+ global.final_prediction_sink.erase();
+ p->input->files.erase();
+
+ sockaddr_in client_address;
+ socklen_t size = sizeof(client_address);
+ int f = accept(p->bound_sock,(sockaddr*)&client_address,&size);
+ if (f < 0)
+ {
+ cerr << "bad client socket!" << endl;
+ exit (1);
+ }
+
+ size_t id;
+ really_read(f, &id, sizeof(id));
+ if (id != 0) {
+ cerr << "id must be 0 (multisource not supported)" << endl;
+ exit(1);
+ }
+
+ int_pair pf = {f,id};
+ push(global.final_prediction_sink,pf);
+ push(p->input->files,f);
}
- }
+ else {
+ for (size_t i = 0; i < input->files.index();i++)
+ {
+ input->reset_file(input->files[i]);
+ if (cache_numbits(input, input->files[i]) < numbits) {
+ cerr << "argh, a bug in caching of some sort! Exiting\n" ;
+ exit(1);
+ }
+ }
+ }
}
void finalize_source(parser* p)
@@ -202,43 +250,91 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa
if(vm.count("hash"))
hash_function = vm["hash"].as<string>();
- if (vm.count("daemon") || vm.count("multisource"))
+ if (vm.count("daemon") || vm.count("multisource") || global.persistent)
{
- int sock = socket(PF_INET, SOCK_STREAM, 0);
- if (sock < 0) {
+ par->bound_sock = socket(PF_INET, SOCK_STREAM, 0);
+ if (par->bound_sock < 0) {
cerr << "can't open socket!" << endl;
exit(1);
}
int on = 1;
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)) < 0)
+ if (setsockopt(par->bound_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)) < 0)
perror("setsockopt SO_REUSEADDR");
sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
- short unsigned int port = 39524;
+ short unsigned int port = 26542;
if (vm.count("port"))
port = vm["port"].as<size_t>();
address.sin_port = htons(port);
-
- if (bind(sock,(sockaddr*)&address, sizeof(address)) < 0)
+
+ // attempt to bind to socket
+ if (bind(par->bound_sock,(sockaddr*)&address, sizeof(address)) < 0)
{
cerr << "failure to bind!" << endl;
exit(1);
}
+ int source_count = 1;
+
+ if (vm.count("multisource"))
+ source_count = vm["multisource"].as<size_t>();
+
+ // listen on socket
+ listen(par->bound_sock, source_count);
+
+ // background process
if (daemon(1,1))
{
cerr << "failure to background!" << endl;
exit(1);
}
- int source_count = 1;
-
- if (vm.count("multisource"))
- source_count = vm["multisource"].as<size_t>();
- listen(sock, source_count);
+ if (global.persistent)
+ {
+ // weights will be shared across processes, accessible to children
+ float* shared_weights =
+ (float*)mmap(0,global.stride * global.length() * sizeof(float),
+ PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
+
+ size_t floats_per_thread = global.stride * global.length() / global.num_threads();
+ for (size_t i = 0; i < global.num_threads(); i++)
+ {
+ weight* dest = shared_weights + i*floats_per_thread;
+ memcpy(dest, global.reg->weight_vectors[i], floats_per_thread*sizeof(float));
+ free(global.reg->weight_vectors[i]);
+ global.reg->weight_vectors[i] = dest;
+ }
+
+ // create children
+ size_t num_children = 10;
+ int children[num_children];
+ for (size_t i = 0; i < num_children; i++)
+ {
+ // fork() returns pid if parent, 0 if child
+ // store fork value and run child process if child
+ if ((children[i] = fork()) == 0)
+ goto child;
+ }
+
+ while (true)
+ {
+ // wait for child to change state; if finished, then respawn
+ siginfo_t sig;
+ waitid(P_ALL,0,&sig,WEXITED);
+ for (size_t i = 0; i < num_children; i++)
+ if (sig.si_pid == children[i])
+ {
+ if ((children[i]=fork()) == 0)
+ goto child;
+ break;
+ }
+ }
+
+ }
+ child:
sockaddr_in client_address;
socklen_t size = sizeof(client_address);
par->max_fd = 0;
@@ -247,7 +343,7 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa
{
if (!global.quiet)
cerr << "calling accept" << endl;
- int f = accept(sock,(sockaddr*)&client_address,&size);
+ int f = accept(par->bound_sock,(sockaddr*)&client_address,&size);
if (f < 0)
{
cerr << "bad client socket!" << endl;
@@ -350,6 +446,10 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa
par->resettable = par->write_cache;
}
}
+
+ // allow reset source if we have a cache or if in persistent mode
+ par->resettable = par->resettable || global.persistent;
+
if (passes > 1 && !par->resettable)
{
cerr << global.program_name << ": need a cache file for multiple passes: try using --cache_file" << endl;
@@ -360,16 +460,6 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa
cerr << "num sources = " << par->input->files.index() << endl;
}
-example* examples;//A Ring of examples.
-pthread_mutex_t examples_lock = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t example_available = PTHREAD_COND_INITIALIZER;
-pthread_cond_t example_unused = PTHREAD_COND_INITIALIZER;
-size_t parsed_index; // The index of the parsed example.
-size_t* used_index; // The index of the example currently used by thread i.
-bool done=false;
-v_array<size_t> random_nos;
-v_array<size_t> gram_mask;
-
bool parser_done()
{
if (done)
@@ -562,7 +652,6 @@ void setup_example(parser* p, example* ae)
}
//add constant feature
- size_t constant_namespace = 128;
push(ae->indices,constant_namespace);
feature temp = {1,constant & global.mask};
push(ae->atomics[constant_namespace], temp);
@@ -713,7 +802,7 @@ pthread_t parse_thread;
void start_parser(size_t num_threads, parser* pf)
{
- used_index = (size_t*) calloc(num_threads, sizeof(size_t));
+ used_index = (unsigned long long*) calloc(num_threads, sizeof(unsigned long long));
parsed_index = 0;
done = false;