github.com/moses-smt/vowpal_wabbit.git
author     John <jl@hunch.net>    2011-10-07 02:57:22 +0400
committer  John <jl@hunch.net>    2011-10-07 02:57:22 +0400
commit     8ca4cd14942451c30383209b01355c14177cebe3
tree       8f192742adb6217e3c0a6819586d18d0c8241802
parent     403a823aea145eed714102574369af1021ca67f4
parent     de9ed32cc243cfc16bb33caf1f9f6d5f23abc4f1

Merge pull request #23 from jhofman/master

shared learning state
-rw-r--r--  gd.cc               65
-rw-r--r--  global_data.cc       4
-rw-r--r--  global_data.h       41
-rw-r--r--  lda_core.cc          4
-rw-r--r--  loss_functions.cc   40
-rw-r--r--  main.cc             42
-rw-r--r--  parse_args.cc       47
-rw-r--r--  parse_regressor.cc  10
-rw-r--r--  parser.cc           38
-rw-r--r--  parser.h             1

10 files changed, 156 insertions, 136 deletions
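
In short: this merge moves the run-time accounting state (example and feature counts, weighted sums, loss totals, label range, update_sum, query count) out of global_data and into a new shared_data struct reached through global.sd. The struct is malloc'd at startup in parse_args.cc, and in daemon mode parser.cc copies it into an anonymous shared mmap so that forked children all update one consistent view of the learning state.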
diff --git a/gd.cc b/gd.cc
index 5032286a..d08813d3 100644
--- a/gd.cc
+++ b/gd.cc
@@ -84,7 +84,7 @@ void* gd_thread(void *in)
{
uint32_t length = 1 << global.num_bits;
size_t stride = global.stride;
- float gravity = global.l_1_regularization * global.update_sum;
+ float gravity = global.l_1_regularization * global.sd->update_sum;
for(uint32_t i = 0; i < length; i++)
reg.weight_vectors[0][stride*i] = real_weight(reg.weight_vectors[0][stride*i],gravity);
}
@@ -133,10 +133,11 @@ float finalize_prediction(float ret)
cout << "you have a NAN!!!!!" << endl;
return 0.;
}
- if ( ret > global.max_label )
- return global.max_label;
- if (ret < global.min_label)
- return global.min_label;
+ if ( ret > global.sd->max_label )
+ return global.sd->max_label;
+ if (ret < global.sd->min_label)
+ return global.sd->min_label;
+
return ret;
}
@@ -155,7 +156,7 @@ void finish_example(example* ec)
void print_update(example *ec)
{
- if (global.weighted_examples > global.dump_interval && !global.quiet && !global.bfgs)
+ if (global.sd->weighted_examples > global.sd->dump_interval && !global.quiet && !global.bfgs)
{
label_data* ld = (label_data*) ec->ld;
char label_buf[32];
@@ -165,17 +166,17 @@ void print_update(example *ec)
sprintf(label_buf,"%8.4f",ld->label);
fprintf(stderr, "%-10.6f %-10.6f %8ld %8.1f %s %8.4f %8lu\n",
- global.sum_loss/global.weighted_examples,
- global.sum_loss_since_last_dump / (global.weighted_examples - global.old_weighted_examples),
- (long int)global.example_number,
- global.weighted_examples,
+ global.sd->sum_loss/global.sd->weighted_examples,
+ global.sd->sum_loss_since_last_dump / (global.sd->weighted_examples - global.sd->old_weighted_examples),
+ (long int)global.sd->example_number,
+ global.sd->weighted_examples,
label_buf,
ec->final_prediction,
(long unsigned int)ec->num_features);
- global.sum_loss_since_last_dump = 0.0;
- global.old_weighted_examples = global.weighted_examples;
- global.dump_interval *= 2;
+ global.sd->sum_loss_since_last_dump = 0.0;
+ global.sd->old_weighted_examples = global.sd->weighted_examples;
+ global.sd->dump_interval *= 2;
}
}
@@ -184,18 +185,18 @@ float query_decision(example*, float k);
void output_and_account_example(example* ec)
{
label_data* ld = (label_data*)ec->ld;
- global.weighted_examples += ld->weight;
- global.weighted_labels += ld->label == FLT_MAX ? 0 : ld->label * ld->weight;
- global.total_features += ec->num_features;
- global.sum_loss += ec->loss;
- global.sum_loss_since_last_dump += ec->loss;
+ global.sd->weighted_examples += ld->weight;
+ global.sd->weighted_labels += ld->label == FLT_MAX ? 0 : ld->label * ld->weight;
+ global.sd->total_features += ec->num_features;
+ global.sd->sum_loss += ec->loss;
+ global.sd->sum_loss_since_last_dump += ec->loss;
global.print(global.raw_prediction, ec->partial_prediction, -1, ec->tag);
float ai=-1;
if(global.active && ld->label == FLT_MAX)
- ai=query_decision(ec, global.weighted_unlabeled_examples);
- global.weighted_unlabeled_examples += ld->label == FLT_MAX ? ld->weight : 0;
+ ai=query_decision(ec, global.sd->weighted_unlabeled_examples);
+ global.sd->weighted_unlabeled_examples += ld->label == FLT_MAX ? ld->weight : 0;
for (size_t i = 0; i<global.final_prediction_sink.index(); i++)
{
@@ -209,10 +210,12 @@ void output_and_account_example(example* ec)
}
pthread_mutex_lock(&output_lock);
- global.example_number++;
+ global.local_example_number++;
pthread_cond_signal(&output_done);
pthread_mutex_unlock(&output_lock);
+ global.sd->example_number++;
+
print_update(ec);
}
@@ -224,7 +227,7 @@ float inline_l1_predict(regressor &reg, example* &ec, size_t thread_num)
size_t thread_mask = global.thread_mask;
for (size_t* i = ec->indices.begin; i != ec->indices.end; i++)
{
- prediction += sd_truncadd(weights,thread_mask,ec->subsets[*i][thread_num], ec->subsets[*i][thread_num+1], global.l_1_regularization * global.update_sum);
+ prediction += sd_truncadd(weights,thread_mask,ec->subsets[*i][thread_num], ec->subsets[*i][thread_num+1], global.l_1_regularization * global.sd->update_sum);
}
for (vector<string>::iterator i = global.pairs.begin(); i != global.pairs.end();i++)
@@ -236,7 +239,7 @@ float inline_l1_predict(regressor &reg, example* &ec, size_t thread_num)
temp.end = ec->subsets[(int)(*i)[0]][thread_num+1];
for (; temp.begin != temp.end; temp.begin++)
prediction += one_pf_quad_predict_trunc(weights,*temp.begin,
- ec->atomics[(int)(*i)[1]],thread_mask, global.l_1_regularization * global.update_sum);
+ ec->atomics[(int)(*i)[1]],thread_mask, global.l_1_regularization * global.sd->update_sum);
}
}
@@ -309,7 +312,7 @@ void print_audit_quad(weight* weights, audit_data& page_feature, v_array<audit_d
{
size_t halfhash = quadratic_constant * page_feature.weight_index;
- float gravity = global.l_1_regularization * global.update_sum;
+ float gravity = global.l_1_regularization * global.sd->update_sum;
for (audit_data* ele = offer_features.begin; ele != offer_features.end; ele++)
{
@@ -325,7 +328,7 @@ void print_audit_quad(weight* weights, audit_data& page_feature, v_array<audit_d
void print_quad(weight* weights, feature& page_feature, v_array<feature> &offer_features, size_t mask, vector<string_value>& features)
{
- float gravity = global.l_1_regularization * global.update_sum;
+ float gravity = global.l_1_regularization * global.sd->update_sum;
size_t halfhash = quadratic_constant * page_feature.weight_index;
for (feature* ele = offer_features.begin; ele != offer_features.end; ele++)
{
@@ -362,7 +365,7 @@ void print_features(regressor &reg, example* &ec)
{
vector<string_value> features;
- float gravity = global.l_1_regularization * global.update_sum;
+ float gravity = global.l_1_regularization * global.sd->update_sum;
for (size_t* i = ec->indices.begin; i != ec->indices.end; i++)
if (ec->audit_features[*i].begin != ec->audit_features[*i].end)
@@ -648,8 +651,8 @@ float query_decision(example* ec, float k)
if (k<=1.)
bias=1.;
else{
- weighted_queries = global.initial_t + global.weighted_examples - global.weighted_unlabeled_examples;
- avg_loss = global.sum_loss/k + sqrt((1.+0.5*log(k))/(weighted_queries+0.0001));
+ weighted_queries = global.initial_t + global.sd->weighted_examples - global.sd->weighted_unlabeled_examples;
+ avg_loss = global.sd->sum_loss/k + sqrt((1.+0.5*log(k))/(weighted_queries+0.0001));
bias = get_active_coin_bias(k, avg_loss, ec->revert_weight/k, global.active_c0);
}
if(drand48()<bias)
@@ -672,7 +675,7 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg, size_t thread_num
ec->revert_weight = reg.loss->getRevertingWeight(ec->final_prediction, global.eta/pow(k,vars.power_t));
float importance = query_decision(ec, k);
if(importance > 0){
- global.queries += 1;
+ global.sd->queries += 1;
ld->weight *= importance;
}
else //do not query => do not train
@@ -681,7 +684,7 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg, size_t thread_num
float t;
if(global.active)
- t = global.weighted_unlabeled_examples;
+ t = global.sd->weighted_unlabeled_examples;
else
t = ec->example_t;
@@ -703,7 +706,7 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg, size_t thread_num
ec->eta_round = reg.loss->getUpdate(ec->final_prediction, ld->label, update, ec->total_sum_feat_sq);
}
- global.update_sum += update;
+ global.sd->update_sum += update;
}
else if(global.active)
ec->revert_weight = reg.loss->getRevertingWeight(ec->final_prediction, global.eta/pow(t,vars.power_t));
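
All of the gd.cc hunks above are mechanical renames from global.X to global.sd->X; the recurring "gravity" value they touch is the accumulated L1 penalty (l_1_regularization * sd->update_sum) applied to each weight at prediction time. As a hedged sketch of what such truncation does (hypothetical helper, not VW's actual real_weight):

    #include <cmath>

    // Soft-threshold a weight by the accumulated L1 penalty ("gravity").
    // Weights whose magnitude is below the penalty collapse to exactly
    // zero; larger ones shrink toward zero by the same amount.
    inline float truncate_weight(float w, float gravity) {
      if (std::fabs(w) <= gravity) return 0.f;
      return (w > 0.f) ? w - gravity : w + gravity;
    }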
diff --git a/global_data.cc b/global_data.cc
index 8b4a01c7..9d799615 100644
--- a/global_data.cc
+++ b/global_data.cc
@@ -94,9 +94,9 @@ void print_lda_result(int f, float* res, float weight, v_array<char> tag)
void set_mm(double label)
{
- global.min_label = min(global.min_label, label);
+ global.sd->min_label = min(global.sd->min_label, label);
if (label != FLT_MAX)
- global.max_label = max(global.max_label, label);
+ global.sd->max_label = max(global.sd->max_label, label);
}
void noop_mm(double label)
diff --git a/global_data.h b/global_data.h
index aa5b8c5e..d4f54800 100644
--- a/global_data.h
+++ b/global_data.h
@@ -18,7 +18,28 @@ struct int_pair {
int id;
};
+struct shared_data {
+ size_t queries;
+
+ uint64_t example_number;
+ uint64_t total_features;
+
+ double t;
+ double weighted_examples;
+ double weighted_unlabeled_examples;
+ double old_weighted_examples;
+ double weighted_labels;
+ double sum_loss;
+ double sum_loss_since_last_dump;
+ float dump_interval;// when should I update for the user.
+ double update_sum;
+ double min_label;//minimum label encountered
+ double max_label;//maximum label encountered
+};
+
struct global_data {
+ shared_data* sd;
+
size_t thread_bits; // log_2 of the number of threads.
size_t partition_bits; // log_2 of the number of partitions of features.
size_t num_bits; // log_2 of the number of features.
@@ -49,11 +70,13 @@ struct global_data {
std::string per_feature_regularizer_text;
float l_1_regularization;//the level of l_1 regularization to impose.
- float update_sum;
size_t minibatch;
size_t ring_size;
+ uint64_t parsed_examples; // The index of the parsed example.
+ uint64_t local_example_number;
+
size_t pass_length;
size_t numpasses;
size_t passes_complete;
@@ -64,7 +87,6 @@ struct global_data {
bool ignore[256];//a set of namespaces to ignore
size_t ngram;//ngrams to generate.
size_t skips;//skips in ngrams.
- size_t queries;
bool audit;//should I print lots of debugging information?
bool quiet;//Should I suppress updates?
bool training;//Should I train if label data is available?
@@ -75,9 +97,6 @@ struct global_data {
bool random_weights;
bool add_constant;
- double min_label;//minimum label encountered
- double max_label;//maximum label encountered
-
size_t lda;
float lda_alpha;
float lda_rho;
@@ -107,16 +126,7 @@ struct global_data {
char* program_name;
//runtime accounting variables.
- uint64_t example_number;
double initial_t;
- double weighted_examples;
- double weighted_unlabeled_examples;
- double old_weighted_examples;
- double weighted_labels;
- uint64_t total_features;
- double sum_loss;
- double sum_loss_since_last_dump;
- float dump_interval;// when should I update for the user.
float eta;//learning rate control.
float eta_decay_rate;
@@ -133,4 +143,7 @@ void print_lda_result(int f, float* res, float weight, v_array<char> tag);
extern pthread_mutex_t output_lock;
extern pthread_cond_t output_done;
+extern pthread_mutex_t output_lock;
+extern pthread_cond_t output_done;
+
#endif
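
The global_data.h hunk is the heart of the change: every counter that all processes must agree on moves behind a single shared_data* sd pointer, while per-process settings stay by value in global_data. That indirection is what lets parser.cc later swap the heap-allocated struct for a shared mapping without touching any call site. A minimal sketch of the pattern (hypothetical, abbreviated names):

    #include <cstdint>

    struct shared_state {            // counters every process must see
      uint64_t example_number;
      double   sum_loss;
    };

    struct globals {
      shared_state* sd;              // heap at startup; remapped for daemons
      bool quiet;                    // per-process options stay plain members
    };

(The duplicated extern declarations at the end of the hunk are harmless in C++: redeclaring an extern variable is legal.)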
diff --git a/lda_core.cc b/lda_core.cc
index 8dcee8c7..e60e9651 100644
--- a/lda_core.cc
+++ b/lda_core.cc
@@ -586,8 +586,8 @@ void start_lda(gd_thread_params t)
print_audit_features(reg, examples[d]);
// If the doc is empty, give it loss of 0.
if (doc_lengths[d] > 0) {
- global.sum_loss -= score;
- global.sum_loss_since_last_dump -= score;
+ global.sd->sum_loss -= score;
+ global.sd->sum_loss_since_last_dump -= score;
}
finish_example(examples[d]);
}
diff --git a/loss_functions.cc b/loss_functions.cc
index 37e88542..62850ced 100644
--- a/loss_functions.cc
+++ b/loss_functions.cc
@@ -18,23 +18,23 @@ public:
}
float getLoss(float prediction, float label) {
- if (prediction <= global.max_label && prediction >= global.min_label)
+ if (prediction <= global.sd->max_label && prediction >= global.sd->min_label)
{
float example_loss = (prediction - label) * (prediction - label);
return example_loss;
}
- else if (prediction < global.min_label)
- if (label == global.min_label)
+ else if (prediction < global.sd->min_label)
+ if (label == global.sd->min_label)
return 0.;
else
- return (label - global.min_label) * (label - global.min_label)
- + 2. * (label-global.min_label) * (global.min_label - prediction);
+ return (label - global.sd->min_label) * (label - global.sd->min_label)
+ + 2. * (label-global.sd->min_label) * (global.sd->min_label - prediction);
else
- if (label == global.max_label)
+ if (label == global.sd->max_label)
return 0.;
else
- return (global.max_label - label) * (global.max_label - label)
- + 2. * (global.max_label - label) * (prediction - global.max_label);
+ return (global.sd->max_label - label) * (global.sd->max_label - label)
+ + 2. * (global.sd->max_label - label) * (prediction - global.sd->max_label);
}
float getUpdate(float prediction, float label,float eta_t, float norm) {
@@ -50,8 +50,8 @@ public:
}
float getRevertingWeight(float prediction, float eta_t){
- float t = 0.5*(global.min_label+global.max_label);
- float alternative = (prediction > t) ? global.min_label : global.max_label;
+ float t = 0.5*(global.sd->min_label+global.sd->max_label);
+ float alternative = (prediction > t) ? global.sd->min_label : global.sd->max_label;
return log((alternative-prediction)/(alternative-t))/eta_t;
}
@@ -60,15 +60,15 @@ public:
}
float first_derivative(float prediction, float label)
{
- if (prediction < global.min_label)
- prediction = global.min_label;
- else if (prediction > global.max_label)
- prediction = global.max_label;
+ if (prediction < global.sd->min_label)
+ prediction = global.sd->min_label;
+ else if (prediction > global.sd->max_label)
+ prediction = global.sd->max_label;
return 2. * (prediction-label);
}
float second_derivative(float prediction, float label)
{
- if (prediction <= global.max_label && prediction >= global.min_label)
+ if (prediction <= global.sd->max_label && prediction >= global.sd->min_label)
return 2.;
else
return 0.;
@@ -91,8 +91,8 @@ public:
}
float getRevertingWeight(float prediction, float eta_t){
- float t = 0.5*(global.min_label+global.max_label);
- float alternative = (prediction > t) ? global.min_label : global.max_label;
+ float t = 0.5*(global.sd->min_label+global.sd->max_label);
+ float alternative = (prediction > t) ? global.sd->min_label : global.sd->max_label;
return (t-prediction)/((alternative-prediction)*eta_t);
}
@@ -238,7 +238,7 @@ public:
float getRevertingWeight(float prediction, float eta_t){
float v,t;
- t = 0.5*(global.min_label+global.max_label);
+ t = 0.5*(global.sd->min_label+global.sd->max_label);
if(prediction > t)
v = -(1-tau);
else
@@ -276,8 +276,8 @@ loss_function* getLossFunction(string funcName, double function_parameter) {
} else if(funcName.compare("logistic") == 0) {
if (set_minmax != noop_mm)
{
- global.min_label = -100;
- global.max_label = 100;
+ global.sd->min_label = -100;
+ global.sd->max_label = 100;
}
return new logloss();
} else if(funcName.compare("quantile") == 0 || funcName.compare("pinball") == 0 || funcName.compare("absolute") == 0) {
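
The clipped squared loss above is easiest to read as a tangent extension: inside [min_label, max_label] it is the usual parabola, and outside it continues linearly with the slope it had at the boundary, which keeps the loss convex and the gradient bounded. For a prediction p < min with f(x) = (label - x)^2,

    f(min) + f'(min) * (p - min)
      = (label - min)^2 + 2 * (label - min) * (min - p)

which is exactly the expression getLoss returns; first_derivative clamps p to the boundary before differentiating, and second_derivative returns 0 outside the range because the extension is linear.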
diff --git a/main.cc b/main.cc
index dc9fa9aa..ad521545 100644
--- a/main.cc
+++ b/main.cc
@@ -14,22 +14,22 @@ int main(int argc, char *argv[]) {
gd_vars *vars = vw(argc, argv);
if(global.span_server != "") {
- float loss = global.sum_loss;
- global.sum_loss = (double)accumulate_scalar(global.span_server, loss);
- float weighted_examples = global.weighted_examples;
- global.weighted_examples = (double)accumulate_scalar(global.span_server, weighted_examples);
- float weighted_labels = global.weighted_labels;
- global.weighted_labels = (double)accumulate_scalar(global.span_server, weighted_labels);
- float weighted_unlabeled_examples = global.weighted_unlabeled_examples;
- global.weighted_unlabeled_examples = (double)accumulate_scalar(global.span_server, weighted_unlabeled_examples);
- float example_number = global.example_number;
- global.example_number = (uint64_t)accumulate_scalar(global.span_server, example_number);
- float total_features = global.total_features;
- global.total_features = (uint64_t)accumulate_scalar(global.span_server, total_features);
+ float loss = global.sd->sum_loss;
+ global.sd->sum_loss = (double)accumulate_scalar(global.span_server, loss);
+ float weighted_examples = global.sd->weighted_examples;
+ global.sd->weighted_examples = (double)accumulate_scalar(global.span_server, weighted_examples);
+ float weighted_labels = global.sd->weighted_labels;
+ global.sd->weighted_labels = (double)accumulate_scalar(global.span_server, weighted_labels);
+ float weighted_unlabeled_examples = global.sd->weighted_unlabeled_examples;
+ global.sd->weighted_unlabeled_examples = (double)accumulate_scalar(global.span_server, weighted_unlabeled_examples);
+ float example_number = global.sd->example_number;
+ global.sd->example_number = (uint64_t)accumulate_scalar(global.span_server, example_number);
+ float total_features = global.sd->total_features;
+ global.sd->total_features = (uint64_t)accumulate_scalar(global.span_server, total_features);
}
- float weighted_labeled_examples = global.weighted_examples - global.weighted_unlabeled_examples;
- float best_constant = (global.weighted_labels - global.initial_t) / weighted_labeled_examples;
+ float weighted_labeled_examples = global.sd->weighted_examples - global.sd->weighted_unlabeled_examples;
+ float best_constant = (global.sd->weighted_labels - global.initial_t) / weighted_labeled_examples;
float constant_loss = (best_constant*(1.0 - best_constant)*(1.0 - best_constant)
+ (1.0 - best_constant)*best_constant*best_constant);
@@ -37,16 +37,16 @@ int main(int argc, char *argv[]) {
{
cerr.precision(4);
cerr << endl << "finished run";
- cerr << endl << "number of examples = " << global.example_number;
- cerr << endl << "weighted example sum = " << global.weighted_examples;
- cerr << endl << "weighted label sum = " << global.weighted_labels;
- cerr << endl << "average loss = " << global.sum_loss / global.weighted_examples;
+ cerr << endl << "number of examples = " << global.sd->example_number;
+ cerr << endl << "weighted example sum = " << global.sd->weighted_examples;
+ cerr << endl << "weighted label sum = " << global.sd->weighted_labels;
+ cerr << endl << "average loss = " << global.sd->sum_loss / global.sd->weighted_examples;
cerr << endl << "best constant = " << best_constant;
- if (global.min_label == 0. && global.max_label == 1. && best_constant < 1. && best_constant > 0.)
+ if (global.sd->min_label == 0. && global.sd->max_label == 1. && best_constant < 1. && best_constant > 0.)
cerr << endl << "best constant's loss = " << constant_loss;
- cerr << endl << "total feature number = " << global.total_features;
+ cerr << endl << "total feature number = " << global.sd->total_features;
if (global.active_simulation)
- cerr << endl << "total queries = " << global.queries << endl;
+ cerr << endl << "total queries = " << global.sd->queries << endl;
cerr << endl;
}
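
The best-constant report in main.cc is a sanity baseline rather than part of learning: best_constant approximates the weighted label mean c, and for {0,1} labels a constant prediction c incurs squared loss c*(1-c)^2 + (1-c)*c^2, which factors to c*(1-c). That is why the constant_loss line is only printed when min_label == 0, max_label == 1, and 0 < best_constant < 1. Note also that the cluster-aggregation block above round-trips the counters through float temporaries before accumulate_scalar, so very large counts can lose precision there.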
diff --git a/parse_args.cc b/parse_args.cc
index 235e3513..11d0072c 100644
--- a/parse_args.cc
+++ b/parse_args.cc
@@ -47,6 +47,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
{
vars.init();
global.program_name = argv[0];
+ global.sd = (shared_data *) malloc(sizeof(shared_data));
// Declare the supported options.
desc.add_options()
("active_learning", "active learning mode")
@@ -84,7 +85,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
("initial_weight", po::value<float>(&global.initial_weight)->default_value(0.), "Set all weights to an initial value of 1.")
("initial_regressor,i", po::value< vector<string> >(), "Initial regressor(s)")
("initial_pass_length", po::value<size_t>(&global.pass_length)->default_value((size_t)-1), "initial number of examples per pass")
- ("initial_t", po::value<float>(&(par->t))->default_value(1.), "initial t value")
+ ("initial_t", po::value<double>(&(global.sd->t))->default_value(1.), "initial t value")
("l1", po::value<float>(&global.l_1_regularization)->default_value(0.), "l_1 regularization level")
("lda", po::value<size_t>(&global.lda), "Run lda with <int> topics")
("lda_alpha", po::value<float>(&global.lda_alpha)->default_value(0.1), "Prior on sparsity of per-document topic weights")
@@ -92,8 +93,8 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
("lda_D", po::value<float>(&global.lda_D)->default_value(10000.), "Number of documents")
("minibatch", po::value<size_t>(&global.minibatch)->default_value(1), "Minibatch size, for LDA")
("span_server", po::value<string>(&global.span_server)->default_value(""), "Location of server for setting up spanning tree")
- ("min_prediction", po::value<double>(&global.min_label), "Smallest prediction to output")
- ("max_prediction", po::value<double>(&global.max_label), "Largest prediction to output")
+ ("min_prediction", po::value<double>(&global.sd->min_label), "Smallest prediction to output")
+ ("max_prediction", po::value<double>(&global.sd->max_label), "Largest prediction to output")
("mem", po::value<int>(&global.m)->default_value(15), "memory in bfgs")
("multisource", po::value<size_t>(), "multiple sources for daemon input")
("noconstant", "Don't add a constant feature")
@@ -130,11 +131,19 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
("ngram", po::value<size_t>(), "Generate N grams")
("skips", po::value<size_t>(), "Generate skips in N grams. This in conjunction with the ngram tag can be used to generate generalized n-skip-k-gram.");
-
- global.queries = 0;
- global.example_number = 0;
- global.weighted_examples = 0.;
- global.old_weighted_examples = 0.;
+ global.sd->queries = 0;
+ global.sd->example_number = 0;
+ global.sd->weighted_examples = 0.;
+ global.sd->old_weighted_examples = 0.;
+ global.sd->weighted_labels = 0.;
+ global.sd->total_features = 0;
+ global.sd->sum_loss = 0.0;
+ global.sd->sum_loss_since_last_dump = 0.0;
+ global.sd->dump_interval = exp(1.);
+ global.sd->update_sum = 0.;
+ global.sd->min_label = 0.;
+ global.sd->max_label = 1.;
+ global.local_example_number = 0;
global.backprop = false;
global.bfgs = false;
global.corrective = false;
@@ -142,11 +151,6 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
global.bfgs = false;
global.hessian_on = false;
global.stride = 1;
- global.weighted_labels = 0.;
- global.total_features = 0;
- global.sum_loss = 0.0;
- global.sum_loss_since_last_dump = 0.0;
- global.dump_interval = exp(1.);
global.num_bits = 18;
global.default_bits = true;
global.daemon = false;
@@ -154,9 +158,6 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
global.raw_prediction = -1;
global.local_prediction = -1;
global.print = print_result;
- global.min_label = 0.;
- global.max_label = 1.;
- global.update_sum = 0.;
global.lda = 0;
global.random_weights = false;
global.per_feature_regularizer_input = "";
@@ -185,8 +186,8 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
options(desc).positional(p).run(), vm);
po::notify(vm);
- global.weighted_unlabeled_examples = par->t;
- global.initial_t = par->t;
+ global.sd->weighted_unlabeled_examples = global.sd->t;
+ global.initial_t = global.sd->t;
global.partition_bits = global.thread_bits;
if (vm.count("help") || argc == 1) {
@@ -387,7 +388,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
global.eta = min(global.eta,1.f);
}
if (!vm.count("lda"))
- global.eta *= pow(par->t, vars.power_t);
+ global.eta *= pow(global.sd->t, vars.power_t);
if (vm.count("minibatch")) {
size_t minibatch2 = next_pow2(global.minibatch);
@@ -406,9 +407,9 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
global.save_per_pass = true;
if (vm.count("min_prediction"))
- global.min_label = vm["min_prediction"].as<double>();
+ global.sd->min_label = vm["min_prediction"].as<double>();
if (vm.count("max_prediction"))
- global.max_label = vm["max_prediction"].as<double>();
+ global.sd->max_label = vm["max_prediction"].as<double>();
if (vm.count("min_prediction") || vm.count("max_prediction") || vm.count("testonly"))
set_minmax = noop_mm;
@@ -429,7 +430,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
r.loss = getLossFunction(loss_function, loss_parameter);
global.loss = r.loss;
-// global.eta *= pow(par->t, vars.power_t);
+// global.eta *= pow(global.sd->t, vars.power_t);
if (global.eta_decay_rate != default_decay && global.numpasses == 1)
cerr << "Warning: decay_learning_rate has no effect when there is only one pass" << endl;
@@ -445,7 +446,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
{
cerr << "Num weight bits = " << global.num_bits << endl;
cerr << "learning rate = " << global.eta << endl;
- cerr << "initial_t = " << par->t << endl;
+ cerr << "initial_t = " << global.sd->t << endl;
cerr << "power_t = " << vars.power_t << endl;
if (global.numpasses > 1)
cerr << "decay_learning_rate = " << global.eta_decay_rate << endl;
diff --git a/parse_regressor.cc b/parse_regressor.cc
index 19f7d1b7..aef293f7 100644
--- a/parse_regressor.cc
+++ b/parse_regressor.cc
@@ -98,8 +98,8 @@ void read_vector(const char* file, regressor& r, bool& initialized, bool reg_vec
exit(1);
}
- source.read((char*)&global.min_label, sizeof(global.min_label));
- source.read((char*)&global.max_label, sizeof(global.max_label));
+ source.read((char*)&global.sd->min_label, sizeof(global.sd->min_label));
+ source.read((char*)&global.sd->max_label, sizeof(global.sd->max_label));
size_t local_num_bits;
source.read((char *)&local_num_bits, sizeof(local_num_bits));
@@ -326,8 +326,8 @@ void dump_regressor(string reg_name, regressor &r, bool as_text, bool reg_vector
io_temp.write_file(f,(char*)&v_length, sizeof(v_length));
io_temp.write_file(f,version.c_str(),v_length);
- io_temp.write_file(f,(char*)&global.min_label, sizeof(global.min_label));
- io_temp.write_file(f,(char*)&global.max_label, sizeof(global.max_label));
+ io_temp.write_file(f,(char*)&global.sd->min_label, sizeof(global.sd->min_label));
+ io_temp.write_file(f,(char*)&global.sd->max_label, sizeof(global.sd->max_label));
io_temp.write_file(f,(char *)&global.num_bits, sizeof(global.num_bits));
io_temp.write_file(f,(char *)&global.thread_bits, sizeof(global.thread_bits));
@@ -347,7 +347,7 @@ void dump_regressor(string reg_name, regressor &r, bool as_text, bool reg_vector
int len;
len = sprintf(buff, "Version %s\n", version.c_str());
io_temp.write_file(f, buff, len);
- len = sprintf(buff, "Min label:%f max label:%f\n", global.min_label, global.max_label);
+ len = sprintf(buff, "Min label:%f max label:%f\n", global.sd->min_label, global.sd->max_label);
io_temp.write_file(f, buff, len);
len = sprintf(buff, "bits:%d thread_bits:%d\n", (int)global.num_bits, (int)global.thread_bits);
io_temp.write_file(f, buff, len);
diff --git a/parser.cc b/parser.cc
index 9d4439ab..2f04cff1 100644
--- a/parser.cc
+++ b/parser.cc
@@ -7,6 +7,7 @@ embodied in the content of this file are licensed under the BSD
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/wait.h>
+
#include <signal.h>
#include <unistd.h>
#include <fstream>
@@ -34,7 +35,6 @@ example* examples;//A Ring of examples.
pthread_mutex_t examples_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t example_available = PTHREAD_COND_INITIALIZER;
pthread_cond_t example_unused = PTHREAD_COND_INITIALIZER;
-uint64_t parsed_index; // The index of the parsed example.
uint64_t* used_index; // The index of the example currently used by thread i.
bool done=false;
v_array<size_t> random_nos;
@@ -125,7 +125,7 @@ void reset_source(size_t numbits, parser* p)
{
// wait for all predictions to be sent back to client
pthread_mutex_lock(&output_lock);
- while (global.example_number != parsed_index)
+ while (global.local_example_number != global.parsed_examples)
pthread_cond_wait(&output_done, &output_lock);
pthread_mutex_unlock(&output_lock);
@@ -326,7 +326,6 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa
if (global.daemon)
{
// weights will be shared across processes, accessible to children
-
float* shared_weights =
(float*)mmap(0,global.stride * global.length() * sizeof(float),
PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
@@ -340,6 +339,13 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa
global.reg->weight_vectors[i] = dest;
}
+ // learning state to be shared across children
+ shared_data* sd = (shared_data *)mmap(0,sizeof(shared_data),
+ PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
+ memcpy(sd, global.sd, sizeof(shared_data));
+ free(global.sd);
+ global.sd = sd;
+
// create children
size_t num_children = global.num_children;
v_array<int> children;
@@ -497,7 +503,7 @@ bool parser_done()
if (done)
{
for (size_t i = 0; i < global.num_threads(); i++)
- if (used_index[i] != parsed_index)
+ if (used_index[i] != global.parsed_examples)
return false;
return true;
}
@@ -577,11 +583,11 @@ example* get_unused_example()
while (true)
{
pthread_mutex_lock(&examples_lock);
- if (examples[parsed_index % global.ring_size].in_use == false)
+ if (examples[global.parsed_examples % global.ring_size].in_use == false)
{
- examples[parsed_index % global.ring_size].in_use = true;
+ examples[global.parsed_examples % global.ring_size].in_use = true;
pthread_mutex_unlock(&examples_lock);
- return examples + (parsed_index % global.ring_size);
+ return examples + (global.parsed_examples % global.ring_size);
}
else
{
@@ -656,8 +662,6 @@ feature* search(feature* begin, size_t value, feature* end)
}
}
-size_t example_count = 0;
-
void setup_example(parser* p, example* ae)
{
ae->pass = global.passes_complete;
@@ -666,10 +670,10 @@ void setup_example(parser* p, example* ae)
ae->total_sum_feat_sq = 1;
ae->threads_to_finish = global.num_threads();
ae->done = false;
- ae->example_counter = ++example_count;
+ ae->example_counter = global.parsed_examples + 1;
ae->global_weight = p->lp->get_weight(ae->ld);
- p->t += ae->global_weight;
- ae->example_t = p->t;
+ global.sd->t += ae->global_weight;
+ ae->example_t = global.sd->t;
if (global.ignore_some)
{
@@ -755,7 +759,7 @@ void *main_parse_loop(void *in)
parser* p = (parser*) in;
global.passes_complete = 0;
- size_t example_number = 0;
+ size_t example_number = 0; // for variable-size batch learning algorithms
while(!done)
{
example* ae=get_unused_example();
@@ -764,7 +768,7 @@ void *main_parse_loop(void *in)
setup_example(p,ae);
example_number++;
pthread_mutex_lock(&examples_lock);
- parsed_index++;
+ global.parsed_examples++;
pthread_cond_broadcast(&example_available);
pthread_mutex_unlock(&examples_lock);
}
@@ -816,10 +820,10 @@ example* get_example(size_t thread_num)
{
pthread_mutex_lock(&examples_lock);
- if (parsed_index != used_index[thread_num]) {
+ if (global.parsed_examples != used_index[thread_num]) {
size_t ring_index = used_index[thread_num]++ % global.ring_size;
if (!(examples+ring_index)->in_use)
- cout << used_index[thread_num] << " " << parsed_index << " " << thread_num << " " << ring_index << endl;
+ cout << used_index[thread_num] << " " << global.parsed_examples << " " << thread_num << " " << ring_index << endl;
assert((examples+ring_index)->in_use);
pthread_mutex_unlock(&examples_lock);
@@ -838,7 +842,7 @@ pthread_t parse_thread;
void start_parser(size_t num_threads, parser* pf)
{
used_index = (uint64_t*) calloc(num_threads, sizeof(uint64_t));
- parsed_index = 0;
+ global.parsed_examples = 0;
done = false;
if(global.ngram>1)
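
The parser.cc hunk inside parse_source_args is where the sharing actually happens: the freshly initialized shared_data is copied into an anonymous MAP_SHARED mapping before the children are forked, so every child inherits the same pages and its counter updates are visible to all siblings. A self-contained sketch of the mechanism, assuming hypothetical field names:

    #include <cstdint>
    #include <cstdio>
    #include <sys/mman.h>
    #include <sys/wait.h>
    #include <unistd.h>

    struct shared_state { uint64_t example_number; double sum_loss; };

    int main() {
      // Anonymous shared mapping: pages survive fork() and writes by any
      // process are visible to all of them.
      shared_state* sd = (shared_state*)mmap(0, sizeof(shared_state),
          PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
      sd->example_number = 0;
      sd->sum_loss = 0.;
      if (fork() == 0) {            // child: account for one example
        sd->example_number += 1;
        sd->sum_loss += 0.25;
        _exit(0);
      }
      wait(0);                      // parent now sees the child's updates
      printf("examples=%llu loss=%.2f\n",
             (unsigned long long)sd->example_number, sd->sum_loss);
      munmap(sd, sizeof(shared_state));
    }

With several children writing concurrently, plain increments like these can race; the sketch sidesteps that by having a single writer.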
diff --git a/parser.h b/parser.h
index 4b0ce5a6..d58f2a6c 100644
--- a/parser.h
+++ b/parser.h
@@ -32,7 +32,6 @@ struct parser {
v_array<substring> name;
const label_parser* lp;
- float t;
io_buf* input; //Input source(s)
int (*reader)(parser* p, void* ae);