Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Hsu <danielhsu@gmail.com>2010-08-18 18:37:18 +0400
committerJohnLangford <jl@hunch.net>2010-08-24 18:34:05 +0400
commite9c233e00ffdd6e11ff0b6298d5efaf6e71e0810 (patch)
treef648182b5e2a81df22b9d75b42acbc0f8003b96d /sender.cc
parentdcbdf822ed9ce8af1f3acadbe0cf208c064cc5b0 (diff)
some bug fixes
- 'bufs' memory errors - find_split()
Diffstat (limited to 'sender.cc')
-rw-r--r--sender.cc123
1 files changed, 59 insertions, 64 deletions
diff --git a/sender.cc b/sender.cc
index 3cc67829..a4540f0a 100644
--- a/sender.cc
+++ b/sender.cc
@@ -13,7 +13,7 @@
pthread_t* thread;
size_t d_1;
size_t d_2;
-v_array<v_array<io_buf> > bufs;
+v_array<v_array<io_buf *> > bufs;
bool second_of_pair[256];
bool pairs_exist=false;
@@ -31,27 +31,22 @@ size_t log_int(size_t v)
}
size_t find_split(size_t number)
-{/* Breaks <number> things into a factor of 2 by a factor of 2, with the first dimension longest */
+{// approximately factor number into d_1 and d_2, each a power of 2 as large as possible so that (1) d_1 >= d_2, and (2) d_1 * d_2 <= number
d_1 = 1;
d_2 = 1;
if (number > 1)
{
size_t log_2 = log_int(number);
if (!pairs_exist)
- d_1 = 1 << log_2;
+ d_1 = 1 << log_2;
else
- {
- d_1 = 1 << (log_2 -1);
- if (d_1 * d_1 > number)
- d_2 = d_1 / 2;
- else {
- d_2 = d_1;
- if (d_1 * 2 * d_2 <= number)
- d_1 *=2;
- }
- }
+ {
+ size_t a = log_2 / 2;
+ d_1 = 1 << (log_2 - a);
+ d_2 = 1 << a;
+ }
if (d_1 * d_2 < number)
- cerr << "warning: number of remote hosts is not a factor of 2, so some are wasted" << endl;
+ cerr << "warning: number of remote hosts is not a factor of 2, so some are wasted" << endl;
return log_2;
}
return 0;
@@ -62,19 +57,19 @@ void open_sockets(vector<string>& hosts)
size_t new_id = global.unique_id;
for (size_t i = 0; i < d_1; i++)
{
- v_array<io_buf> t;
+ v_array<io_buf *> t;
push(bufs,t);
for (size_t j = 0; j< d_2; j++)
- {
- size_t number = j + d_2*i;
- int sd = open_socket(hosts[number].c_str(), new_id);
- if (new_id == 0)
- global.local_prediction = sd;
- new_id++;
- io_buf b;
- push(b.files, sd);
- push(bufs[i], b);
- }
+ {
+ size_t number = j + d_2*i;
+ int sd = open_socket(hosts[number].c_str(), new_id);
+ if (new_id == 0)
+ global.local_prediction = sd;
+ new_id++;
+ io_buf *b = new io_buf();
+ push(b->files, sd);
+ push(bufs[i], b);
+ }
}
}
@@ -83,13 +78,13 @@ void parse_send_args(po::variables_map& vm, vector<string> pairs, size_t& thread
if (vm.count("sendto"))
{
if (pairs.size() > 0)
- {
- pairs_exist=true;
- for (int i = 0; i<256;i++)
- second_of_pair[i]=false;
- for (vector<string>::iterator i = pairs.begin(); i != pairs.end();i++)
- second_of_pair[(int)(*i)[1]] = true;
- }
+ {
+ pairs_exist=true;
+ for (int i = 0; i<256;i++)
+ second_of_pair[i]=false;
+ for (vector<string>::iterator i = pairs.begin(); i != pairs.end();i++)
+ second_of_pair[(int)(*i)[1]] = true;
+ }
vector<string> hosts = vm["sendto"].as< vector<string> >();
thread_bits = max(thread_bits,find_split(hosts.size()));
@@ -97,16 +92,16 @@ void parse_send_args(po::variables_map& vm, vector<string> pairs, size_t& thread
}
}
-void send_features(int i, int j, io_buf& b, example* ec)
+void send_features(int i, int j, io_buf *b, example* ec)
{
- output_int(b,ec->indices.index());
+ output_int(*b,ec->indices.index());
for (size_t* index = ec->indices.begin; index != ec->indices.end; index++)
if (second_of_pair[*index])
- output_features(b, *index, ec->subsets[*index][j*d_1], ec->subsets[*index][(j+1)*d_1]);
+ output_features(*b, *index, ec->subsets[*index][j*d_1], ec->subsets[*index][(j+1)*d_1]);
else
- output_features(b, *index, ec->subsets[*index][i*d_2], ec->subsets[*index][(i+1)*d_2]);
- b.flush();
+ output_features(*b, *index, ec->subsets[*index][i*d_2], ec->subsets[*index][(i+1)*d_2]);
+ b->flush();
}
void* send_thread(void*)
@@ -118,34 +113,34 @@ void* send_thread(void*)
while ( true )
{//this is a poor man's select operation.
if ((ec = get_example(0)) != NULL)//blocking operation.
- {
- label_data* ld = (label_data*)ec->ld;
-
- for (size_t i = 0; i < d_1; i++)
- for (size_t j = 0; j < d_2; j++)
- {
- simple_label.cache_label(ld, bufs[i][j]);//send label information.
- cache_tag(bufs[i][j], null_tag);
- send_features(i,j,bufs[i][j],ec);
- }
- delay_example(ec,0);
- }
+ {
+ label_data* ld = (label_data*)ec->ld;
+
+ for (size_t i = 0; i < d_1; i++)
+ for (size_t j = 0; j < d_2; j++)
+ {
+ simple_label.cache_label(ld, *bufs[i][j]);//send label information.
+ cache_tag(*bufs[i][j], null_tag);
+ send_features(i,j,bufs[i][j],ec);
+ }
+ delay_example(ec,0);
+ }
else
- { //close our outputs to signal finishing.
- for (size_t i = 0; i < d_1; i++)
- {
- for (size_t j = 0; j < d_2; j++)
- {
- bufs[i][j].flush();
- shutdown(bufs[i][j].files[0],SHUT_WR);
- free(bufs[i][j].files.begin);
- free(bufs[i][j].space.begin);
- }
- free(bufs[i].begin);
- }
- free(bufs.begin);
- return NULL;
- }
+ { //close our outputs to signal finishing.
+ for (size_t i = 0; i < d_1; i++)
+ {
+ for (size_t j = 0; j < d_2; j++)
+ {
+ bufs[i][j]->flush();
+ shutdown(bufs[i][j]->files[0],SHUT_WR);
+ free(bufs[i][j]->files.begin);
+ free(bufs[i][j]->space.begin);
+ }
+ free(bufs[i].begin);
+ }
+ free(bufs.begin);
+ return NULL;
+ }
}
return NULL;