diff options
author | John <jl@hunch.net> | 2011-10-07 02:57:22 +0400 |
---|---|---|
committer | John <jl@hunch.net> | 2011-10-07 02:57:22 +0400 |
commit | 8ca4cd14942451c30383209b01355c14177cebe3 (patch) | |
tree | 8f192742adb6217e3c0a6819586d18d0c8241802 | |
parent | 403a823aea145eed714102574369af1021ca67f4 (diff) | |
parent | de9ed32cc243cfc16bb33caf1f9f6d5f23abc4f1 (diff) |
Merge pull request #23 from jhofman/master
shared learning state
-rw-r--r-- | gd.cc | 65 | ||||
-rw-r--r-- | global_data.cc | 4 | ||||
-rw-r--r-- | global_data.h | 41 | ||||
-rw-r--r-- | lda_core.cc | 4 | ||||
-rw-r--r-- | loss_functions.cc | 40 | ||||
-rw-r--r-- | main.cc | 42 | ||||
-rw-r--r-- | parse_args.cc | 47 | ||||
-rw-r--r-- | parse_regressor.cc | 10 | ||||
-rw-r--r-- | parser.cc | 38 | ||||
-rw-r--r-- | parser.h | 1 |
10 files changed, 156 insertions, 136 deletions
@@ -84,7 +84,7 @@ void* gd_thread(void *in) { uint32_t length = 1 << global.num_bits; size_t stride = global.stride; - float gravity = global.l_1_regularization * global.update_sum; + float gravity = global.l_1_regularization * global.sd->update_sum; for(uint32_t i = 0; i < length; i++) reg.weight_vectors[0][stride*i] = real_weight(reg.weight_vectors[0][stride*i],gravity); } @@ -133,10 +133,11 @@ float finalize_prediction(float ret) cout << "you have a NAN!!!!!" << endl; return 0.; } - if ( ret > global.max_label ) - return global.max_label; - if (ret < global.min_label) - return global.min_label; + if ( ret > global.sd->max_label ) + return global.sd->max_label; + if (ret < global.sd->min_label) + return global.sd->min_label; + return ret; } @@ -155,7 +156,7 @@ void finish_example(example* ec) void print_update(example *ec) { - if (global.weighted_examples > global.dump_interval && !global.quiet && !global.bfgs) + if (global.sd->weighted_examples > global.sd->dump_interval && !global.quiet && !global.bfgs) { label_data* ld = (label_data*) ec->ld; char label_buf[32]; @@ -165,17 +166,17 @@ void print_update(example *ec) sprintf(label_buf,"%8.4f",ld->label); fprintf(stderr, "%-10.6f %-10.6f %8ld %8.1f %s %8.4f %8lu\n", - global.sum_loss/global.weighted_examples, - global.sum_loss_since_last_dump / (global.weighted_examples - global.old_weighted_examples), - (long int)global.example_number, - global.weighted_examples, + global.sd->sum_loss/global.sd->weighted_examples, + global.sd->sum_loss_since_last_dump / (global.sd->weighted_examples - global.sd->old_weighted_examples), + (long int)global.sd->example_number, + global.sd->weighted_examples, label_buf, ec->final_prediction, (long unsigned int)ec->num_features); - global.sum_loss_since_last_dump = 0.0; - global.old_weighted_examples = global.weighted_examples; - global.dump_interval *= 2; + global.sd->sum_loss_since_last_dump = 0.0; + global.sd->old_weighted_examples = global.sd->weighted_examples; + global.sd->dump_interval *= 2; } } @@ -184,18 +185,18 @@ float query_decision(example*, float k); void output_and_account_example(example* ec) { label_data* ld = (label_data*)ec->ld; - global.weighted_examples += ld->weight; - global.weighted_labels += ld->label == FLT_MAX ? 0 : ld->label * ld->weight; - global.total_features += ec->num_features; - global.sum_loss += ec->loss; - global.sum_loss_since_last_dump += ec->loss; + global.sd->weighted_examples += ld->weight; + global.sd->weighted_labels += ld->label == FLT_MAX ? 0 : ld->label * ld->weight; + global.sd->total_features += ec->num_features; + global.sd->sum_loss += ec->loss; + global.sd->sum_loss_since_last_dump += ec->loss; global.print(global.raw_prediction, ec->partial_prediction, -1, ec->tag); float ai=-1; if(global.active && ld->label == FLT_MAX) - ai=query_decision(ec, global.weighted_unlabeled_examples); - global.weighted_unlabeled_examples += ld->label == FLT_MAX ? ld->weight : 0; + ai=query_decision(ec, global.sd->weighted_unlabeled_examples); + global.sd->weighted_unlabeled_examples += ld->label == FLT_MAX ? ld->weight : 0; for (size_t i = 0; i<global.final_prediction_sink.index(); i++) { @@ -209,10 +210,12 @@ void output_and_account_example(example* ec) } pthread_mutex_lock(&output_lock); - global.example_number++; + global.local_example_number++; pthread_cond_signal(&output_done); pthread_mutex_unlock(&output_lock); + global.sd->example_number++; + print_update(ec); } @@ -224,7 +227,7 @@ float inline_l1_predict(regressor ®, example* &ec, size_t thread_num) size_t thread_mask = global.thread_mask; for (size_t* i = ec->indices.begin; i != ec->indices.end; i++) { - prediction += sd_truncadd(weights,thread_mask,ec->subsets[*i][thread_num], ec->subsets[*i][thread_num+1], global.l_1_regularization * global.update_sum); + prediction += sd_truncadd(weights,thread_mask,ec->subsets[*i][thread_num], ec->subsets[*i][thread_num+1], global.l_1_regularization * global.sd->update_sum); } for (vector<string>::iterator i = global.pairs.begin(); i != global.pairs.end();i++) @@ -236,7 +239,7 @@ float inline_l1_predict(regressor ®, example* &ec, size_t thread_num) temp.end = ec->subsets[(int)(*i)[0]][thread_num+1]; for (; temp.begin != temp.end; temp.begin++) prediction += one_pf_quad_predict_trunc(weights,*temp.begin, - ec->atomics[(int)(*i)[1]],thread_mask, global.l_1_regularization * global.update_sum); + ec->atomics[(int)(*i)[1]],thread_mask, global.l_1_regularization * global.sd->update_sum); } } @@ -309,7 +312,7 @@ void print_audit_quad(weight* weights, audit_data& page_feature, v_array<audit_d { size_t halfhash = quadratic_constant * page_feature.weight_index; - float gravity = global.l_1_regularization * global.update_sum; + float gravity = global.l_1_regularization * global.sd->update_sum; for (audit_data* ele = offer_features.begin; ele != offer_features.end; ele++) { @@ -325,7 +328,7 @@ void print_audit_quad(weight* weights, audit_data& page_feature, v_array<audit_d void print_quad(weight* weights, feature& page_feature, v_array<feature> &offer_features, size_t mask, vector<string_value>& features) { - float gravity = global.l_1_regularization * global.update_sum; + float gravity = global.l_1_regularization * global.sd->update_sum; size_t halfhash = quadratic_constant * page_feature.weight_index; for (feature* ele = offer_features.begin; ele != offer_features.end; ele++) { @@ -362,7 +365,7 @@ void print_features(regressor ®, example* &ec) { vector<string_value> features; - float gravity = global.l_1_regularization * global.update_sum; + float gravity = global.l_1_regularization * global.sd->update_sum; for (size_t* i = ec->indices.begin; i != ec->indices.end; i++) if (ec->audit_features[*i].begin != ec->audit_features[*i].end) @@ -648,8 +651,8 @@ float query_decision(example* ec, float k) if (k<=1.) bias=1.; else{ - weighted_queries = global.initial_t + global.weighted_examples - global.weighted_unlabeled_examples; - avg_loss = global.sum_loss/k + sqrt((1.+0.5*log(k))/(weighted_queries+0.0001)); + weighted_queries = global.initial_t + global.sd->weighted_examples - global.sd->weighted_unlabeled_examples; + avg_loss = global.sd->sum_loss/k + sqrt((1.+0.5*log(k))/(weighted_queries+0.0001)); bias = get_active_coin_bias(k, avg_loss, ec->revert_weight/k, global.active_c0); } if(drand48()<bias) @@ -672,7 +675,7 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg, size_t thread_num ec->revert_weight = reg.loss->getRevertingWeight(ec->final_prediction, global.eta/pow(k,vars.power_t)); float importance = query_decision(ec, k); if(importance > 0){ - global.queries += 1; + global.sd->queries += 1; ld->weight *= importance; } else //do not query => do not train @@ -681,7 +684,7 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg, size_t thread_num float t; if(global.active) - t = global.weighted_unlabeled_examples; + t = global.sd->weighted_unlabeled_examples; else t = ec->example_t; @@ -703,7 +706,7 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg, size_t thread_num ec->eta_round = reg.loss->getUpdate(ec->final_prediction, ld->label, update, ec->total_sum_feat_sq); } - global.update_sum += update; + global.sd->update_sum += update; } else if(global.active) ec->revert_weight = reg.loss->getRevertingWeight(ec->final_prediction, global.eta/pow(t,vars.power_t)); diff --git a/global_data.cc b/global_data.cc index 8b4a01c7..9d799615 100644 --- a/global_data.cc +++ b/global_data.cc @@ -94,9 +94,9 @@ void print_lda_result(int f, float* res, float weight, v_array<char> tag) void set_mm(double label) { - global.min_label = min(global.min_label, label); + global.sd->min_label = min(global.sd->min_label, label); if (label != FLT_MAX) - global.max_label = max(global.max_label, label); + global.sd->max_label = max(global.sd->max_label, label); } void noop_mm(double label) diff --git a/global_data.h b/global_data.h index aa5b8c5e..d4f54800 100644 --- a/global_data.h +++ b/global_data.h @@ -18,7 +18,28 @@ struct int_pair { int id; }; +struct shared_data { + size_t queries; + + uint64_t example_number; + uint64_t total_features; + + double t; + double weighted_examples; + double weighted_unlabeled_examples; + double old_weighted_examples; + double weighted_labels; + double sum_loss; + double sum_loss_since_last_dump; + float dump_interval;// when should I update for the user. + double update_sum; + double min_label;//minimum label encountered + double max_label;//maximum label encountered +}; + struct global_data { + shared_data* sd; + size_t thread_bits; // log_2 of the number of threads. size_t partition_bits; // log_2 of the number of partitions of features. size_t num_bits; // log_2 of the number of features. @@ -49,11 +70,13 @@ struct global_data { std::string per_feature_regularizer_text; float l_1_regularization;//the level of l_1 regularization to impose. - float update_sum; size_t minibatch; size_t ring_size; + uint64_t parsed_examples; // The index of the parsed example. + uint64_t local_example_number; + size_t pass_length; size_t numpasses; size_t passes_complete; @@ -64,7 +87,6 @@ struct global_data { bool ignore[256];//a set of namespaces to ignore size_t ngram;//ngrams to generate. size_t skips;//skips in ngrams. - size_t queries; bool audit;//should I print lots of debugging information? bool quiet;//Should I suppress updates? bool training;//Should I train if label data is available? @@ -75,9 +97,6 @@ struct global_data { bool random_weights; bool add_constant; - double min_label;//minimum label encountered - double max_label;//maximum label encountered - size_t lda; float lda_alpha; float lda_rho; @@ -107,16 +126,7 @@ struct global_data { char* program_name; //runtime accounting variables. - uint64_t example_number; double initial_t; - double weighted_examples; - double weighted_unlabeled_examples; - double old_weighted_examples; - double weighted_labels; - uint64_t total_features; - double sum_loss; - double sum_loss_since_last_dump; - float dump_interval;// when should I update for the user. float eta;//learning rate control. float eta_decay_rate; @@ -133,4 +143,7 @@ void print_lda_result(int f, float* res, float weight, v_array<char> tag); extern pthread_mutex_t output_lock; extern pthread_cond_t output_done; +extern pthread_mutex_t output_lock; +extern pthread_cond_t output_done; + #endif diff --git a/lda_core.cc b/lda_core.cc index 8dcee8c7..e60e9651 100644 --- a/lda_core.cc +++ b/lda_core.cc @@ -586,8 +586,8 @@ void start_lda(gd_thread_params t) print_audit_features(reg, examples[d]);
// If the doc is empty, give it loss of 0.
if (doc_lengths[d] > 0) {
- global.sum_loss -= score;
- global.sum_loss_since_last_dump -= score;
+ global.sd->sum_loss -= score;
+ global.sd->sum_loss_since_last_dump -= score;
}
finish_example(examples[d]);
}
diff --git a/loss_functions.cc b/loss_functions.cc index 37e88542..62850ced 100644 --- a/loss_functions.cc +++ b/loss_functions.cc @@ -18,23 +18,23 @@ public: } float getLoss(float prediction, float label) { - if (prediction <= global.max_label && prediction >= global.min_label) + if (prediction <= global.sd->max_label && prediction >= global.sd->min_label) { float example_loss = (prediction - label) * (prediction - label); return example_loss; } - else if (prediction < global.min_label) - if (label == global.min_label) + else if (prediction < global.sd->min_label) + if (label == global.sd->min_label) return 0.; else - return (label - global.min_label) * (label - global.min_label) - + 2. * (label-global.min_label) * (global.min_label - prediction); + return (label - global.sd->min_label) * (label - global.sd->min_label) + + 2. * (label-global.sd->min_label) * (global.sd->min_label - prediction); else - if (label == global.max_label) + if (label == global.sd->max_label) return 0.; else - return (global.max_label - label) * (global.max_label - label) - + 2. * (global.max_label - label) * (prediction - global.max_label); + return (global.sd->max_label - label) * (global.sd->max_label - label) + + 2. * (global.sd->max_label - label) * (prediction - global.sd->max_label); } float getUpdate(float prediction, float label,float eta_t, float norm) { @@ -50,8 +50,8 @@ public: } float getRevertingWeight(float prediction, float eta_t){ - float t = 0.5*(global.min_label+global.max_label); - float alternative = (prediction > t) ? global.min_label : global.max_label; + float t = 0.5*(global.sd->min_label+global.sd->max_label); + float alternative = (prediction > t) ? global.sd->min_label : global.sd->max_label; return log((alternative-prediction)/(alternative-t))/eta_t; } @@ -60,15 +60,15 @@ public: } float first_derivative(float prediction, float label) { - if (prediction < global.min_label) - prediction = global.min_label; - else if (prediction > global.max_label) - prediction = global.max_label; + if (prediction < global.sd->min_label) + prediction = global.sd->min_label; + else if (prediction > global.sd->max_label) + prediction = global.sd->max_label; return 2. * (prediction-label); } float second_derivative(float prediction, float label) { - if (prediction <= global.max_label && prediction >= global.min_label) + if (prediction <= global.sd->max_label && prediction >= global.sd->min_label) return 2.; else return 0.; @@ -91,8 +91,8 @@ public: } float getRevertingWeight(float prediction, float eta_t){ - float t = 0.5*(global.min_label+global.max_label); - float alternative = (prediction > t) ? global.min_label : global.max_label; + float t = 0.5*(global.sd->min_label+global.sd->max_label); + float alternative = (prediction > t) ? global.sd->min_label : global.sd->max_label; return (t-prediction)/((alternative-prediction)*eta_t); } @@ -238,7 +238,7 @@ public: float getRevertingWeight(float prediction, float eta_t){ float v,t; - t = 0.5*(global.min_label+global.max_label); + t = 0.5*(global.sd->min_label+global.sd->max_label); if(prediction > t) v = -(1-tau); else @@ -276,8 +276,8 @@ loss_function* getLossFunction(string funcName, double function_parameter) { } else if(funcName.compare("logistic") == 0) { if (set_minmax != noop_mm) { - global.min_label = -100; - global.max_label = 100; + global.sd->min_label = -100; + global.sd->max_label = 100; } return new logloss(); } else if(funcName.compare("quantile") == 0 || funcName.compare("pinball") == 0 || funcName.compare("absolute") == 0) { @@ -14,22 +14,22 @@ int main(int argc, char *argv[]) { gd_vars *vars = vw(argc, argv); if(global.span_server != "") { - float loss = global.sum_loss; - global.sum_loss = (double)accumulate_scalar(global.span_server, loss); - float weighted_examples = global.weighted_examples; - global.weighted_examples = (double)accumulate_scalar(global.span_server, weighted_examples); - float weighted_labels = global.weighted_labels; - global.weighted_labels = (double)accumulate_scalar(global.span_server, weighted_labels); - float weighted_unlabeled_examples = global.weighted_unlabeled_examples; - global.weighted_unlabeled_examples = (double)accumulate_scalar(global.span_server, weighted_unlabeled_examples); - float example_number = global.example_number; - global.example_number = (uint64_t)accumulate_scalar(global.span_server, example_number); - float total_features = global.total_features; - global.total_features = (uint64_t)accumulate_scalar(global.span_server, total_features); + float loss = global.sd->sum_loss; + global.sd->sum_loss = (double)accumulate_scalar(global.span_server, loss); + float weighted_examples = global.sd->weighted_examples; + global.sd->weighted_examples = (double)accumulate_scalar(global.span_server, weighted_examples); + float weighted_labels = global.sd->weighted_labels; + global.sd->weighted_labels = (double)accumulate_scalar(global.span_server, weighted_labels); + float weighted_unlabeled_examples = global.sd->weighted_unlabeled_examples; + global.sd->weighted_unlabeled_examples = (double)accumulate_scalar(global.span_server, weighted_unlabeled_examples); + float example_number = global.sd->example_number; + global.sd->example_number = (uint64_t)accumulate_scalar(global.span_server, example_number); + float total_features = global.sd->total_features; + global.sd->total_features = (uint64_t)accumulate_scalar(global.span_server, total_features); } - float weighted_labeled_examples = global.weighted_examples - global.weighted_unlabeled_examples; - float best_constant = (global.weighted_labels - global.initial_t) / weighted_labeled_examples; + float weighted_labeled_examples = global.sd->weighted_examples - global.sd->weighted_unlabeled_examples; + float best_constant = (global.sd->weighted_labels - global.initial_t) / weighted_labeled_examples; float constant_loss = (best_constant*(1.0 - best_constant)*(1.0 - best_constant) + (1.0 - best_constant)*best_constant*best_constant); @@ -37,16 +37,16 @@ int main(int argc, char *argv[]) { { cerr.precision(4); cerr << endl << "finished run"; - cerr << endl << "number of examples = " << global.example_number; - cerr << endl << "weighted example sum = " << global.weighted_examples; - cerr << endl << "weighted label sum = " << global.weighted_labels; - cerr << endl << "average loss = " << global.sum_loss / global.weighted_examples; + cerr << endl << "number of examples = " << global.sd->example_number; + cerr << endl << "weighted example sum = " << global.sd->weighted_examples; + cerr << endl << "weighted label sum = " << global.sd->weighted_labels; + cerr << endl << "average loss = " << global.sd->sum_loss / global.sd->weighted_examples; cerr << endl << "best constant = " << best_constant; - if (global.min_label == 0. && global.max_label == 1. && best_constant < 1. && best_constant > 0.) + if (global.sd->min_label == 0. && global.sd->max_label == 1. && best_constant < 1. && best_constant > 0.) cerr << endl << "best constant's loss = " << constant_loss; - cerr << endl << "total feature number = " << global.total_features; + cerr << endl << "total feature number = " << global.sd->total_features; if (global.active_simulation) - cerr << endl << "total queries = " << global.queries << endl; + cerr << endl << "total queries = " << global.sd->queries << endl; cerr << endl; } diff --git a/parse_args.cc b/parse_args.cc index 235e3513..11d0072c 100644 --- a/parse_args.cc +++ b/parse_args.cc @@ -47,6 +47,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt { vars.init(); global.program_name = argv[0]; + global.sd = (shared_data *) malloc(sizeof(shared_data)); // Declare the supported options. desc.add_options() ("active_learning", "active learning mode") @@ -84,7 +85,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt ("initial_weight", po::value<float>(&global.initial_weight)->default_value(0.), "Set all weights to an initial value of 1.") ("initial_regressor,i", po::value< vector<string> >(), "Initial regressor(s)") ("initial_pass_length", po::value<size_t>(&global.pass_length)->default_value((size_t)-1), "initial number of examples per pass") - ("initial_t", po::value<float>(&(par->t))->default_value(1.), "initial t value") + ("initial_t", po::value<double>(&(global.sd->t))->default_value(1.), "initial t value") ("l1", po::value<float>(&global.l_1_regularization)->default_value(0.), "l_1 regularization level") ("lda", po::value<size_t>(&global.lda), "Run lda with <int> topics") ("lda_alpha", po::value<float>(&global.lda_alpha)->default_value(0.1), "Prior on sparsity of per-document topic weights") @@ -92,8 +93,8 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt ("lda_D", po::value<float>(&global.lda_D)->default_value(10000.), "Number of documents") ("minibatch", po::value<size_t>(&global.minibatch)->default_value(1), "Minibatch size, for LDA") ("span_server", po::value<string>(&global.span_server)->default_value(""), "Location of server for setting up spanning tree") - ("min_prediction", po::value<double>(&global.min_label), "Smallest prediction to output") - ("max_prediction", po::value<double>(&global.max_label), "Largest prediction to output") + ("min_prediction", po::value<double>(&global.sd->min_label), "Smallest prediction to output") + ("max_prediction", po::value<double>(&global.sd->max_label), "Largest prediction to output") ("mem", po::value<int>(&global.m)->default_value(15), "memory in bfgs") ("multisource", po::value<size_t>(), "multiple sources for daemon input") ("noconstant", "Don't add a constant feature") @@ -130,11 +131,19 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt ("ngram", po::value<size_t>(), "Generate N grams") ("skips", po::value<size_t>(), "Generate skips in N grams. This in conjunction with the ngram tag can be used to generate generalized n-skip-k-gram."); - - global.queries = 0; - global.example_number = 0; - global.weighted_examples = 0.; - global.old_weighted_examples = 0.; + global.sd->queries = 0; + global.sd->example_number = 0; + global.sd->weighted_examples = 0.; + global.sd->old_weighted_examples = 0.; + global.sd->weighted_labels = 0.; + global.sd->total_features = 0; + global.sd->sum_loss = 0.0; + global.sd->sum_loss_since_last_dump = 0.0; + global.sd->dump_interval = exp(1.); + global.sd->update_sum = 0.; + global.sd->min_label = 0.; + global.sd->max_label = 1.; + global.local_example_number = 0; global.backprop = false; global.bfgs = false; global.corrective = false; @@ -142,11 +151,6 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt global.bfgs = false; global.hessian_on = false; global.stride = 1; - global.weighted_labels = 0.; - global.total_features = 0; - global.sum_loss = 0.0; - global.sum_loss_since_last_dump = 0.0; - global.dump_interval = exp(1.); global.num_bits = 18; global.default_bits = true; global.daemon = false; @@ -154,9 +158,6 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt global.raw_prediction = -1; global.local_prediction = -1; global.print = print_result; - global.min_label = 0.; - global.max_label = 1.; - global.update_sum = 0.; global.lda = 0; global.random_weights = false; global.per_feature_regularizer_input = ""; @@ -185,8 +186,8 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt options(desc).positional(p).run(), vm); po::notify(vm); - global.weighted_unlabeled_examples = par->t; - global.initial_t = par->t; + global.sd->weighted_unlabeled_examples = global.sd->t; + global.initial_t = global.sd->t; global.partition_bits = global.thread_bits; if (vm.count("help") || argc == 1) { @@ -387,7 +388,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt global.eta = min(global.eta,1.f); } if (!vm.count("lda")) - global.eta *= pow(par->t, vars.power_t); + global.eta *= pow(global.sd->t, vars.power_t); if (vm.count("minibatch")) { size_t minibatch2 = next_pow2(global.minibatch); @@ -406,9 +407,9 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt global.save_per_pass = true; if (vm.count("min_prediction")) - global.min_label = vm["min_prediction"].as<double>(); + global.sd->min_label = vm["min_prediction"].as<double>(); if (vm.count("max_prediction")) - global.max_label = vm["max_prediction"].as<double>(); + global.sd->max_label = vm["max_prediction"].as<double>(); if (vm.count("min_prediction") || vm.count("max_prediction") || vm.count("testonly")) set_minmax = noop_mm; @@ -429,7 +430,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt r.loss = getLossFunction(loss_function, loss_parameter); global.loss = r.loss; -// global.eta *= pow(par->t, vars.power_t); +// global.eta *= pow(global.sd->t, vars.power_t); if (global.eta_decay_rate != default_decay && global.numpasses == 1) cerr << "Warning: decay_learning_rate has no effect when there is only one pass" << endl; @@ -445,7 +446,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt { cerr << "Num weight bits = " << global.num_bits << endl; cerr << "learning rate = " << global.eta << endl; - cerr << "initial_t = " << par->t << endl; + cerr << "initial_t = " << global.sd->t << endl; cerr << "power_t = " << vars.power_t << endl; if (global.numpasses > 1) cerr << "decay_learning_rate = " << global.eta_decay_rate << endl; diff --git a/parse_regressor.cc b/parse_regressor.cc index 19f7d1b7..aef293f7 100644 --- a/parse_regressor.cc +++ b/parse_regressor.cc @@ -98,8 +98,8 @@ void read_vector(const char* file, regressor& r, bool& initialized, bool reg_vec exit(1); } - source.read((char*)&global.min_label, sizeof(global.min_label)); - source.read((char*)&global.max_label, sizeof(global.max_label)); + source.read((char*)&global.sd->min_label, sizeof(global.sd->min_label)); + source.read((char*)&global.sd->max_label, sizeof(global.sd->max_label)); size_t local_num_bits; source.read((char *)&local_num_bits, sizeof(local_num_bits)); @@ -326,8 +326,8 @@ void dump_regressor(string reg_name, regressor &r, bool as_text, bool reg_vector io_temp.write_file(f,(char*)&v_length, sizeof(v_length)); io_temp.write_file(f,version.c_str(),v_length); - io_temp.write_file(f,(char*)&global.min_label, sizeof(global.min_label)); - io_temp.write_file(f,(char*)&global.max_label, sizeof(global.max_label)); + io_temp.write_file(f,(char*)&global.sd->min_label, sizeof(global.sd->min_label)); + io_temp.write_file(f,(char*)&global.sd->max_label, sizeof(global.sd->max_label)); io_temp.write_file(f,(char *)&global.num_bits, sizeof(global.num_bits)); io_temp.write_file(f,(char *)&global.thread_bits, sizeof(global.thread_bits)); @@ -347,7 +347,7 @@ void dump_regressor(string reg_name, regressor &r, bool as_text, bool reg_vector int len; len = sprintf(buff, "Version %s\n", version.c_str()); io_temp.write_file(f, buff, len); - len = sprintf(buff, "Min label:%f max label:%f\n", global.min_label, global.max_label); + len = sprintf(buff, "Min label:%f max label:%f\n", global.sd->min_label, global.sd->max_label); io_temp.write_file(f, buff, len); len = sprintf(buff, "bits:%d thread_bits:%d\n", (int)global.num_bits, (int)global.thread_bits); io_temp.write_file(f, buff, len); @@ -7,6 +7,7 @@ embodied in the content of this file are licensed under the BSD #include <sys/types.h> #include <sys/mman.h> #include <sys/wait.h> + #include <signal.h> #include <unistd.h> #include <fstream> @@ -34,7 +35,6 @@ example* examples;//A Ring of examples. pthread_mutex_t examples_lock = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t example_available = PTHREAD_COND_INITIALIZER; pthread_cond_t example_unused = PTHREAD_COND_INITIALIZER; -uint64_t parsed_index; // The index of the parsed example. uint64_t* used_index; // The index of the example currently used by thread i. bool done=false; v_array<size_t> random_nos; @@ -125,7 +125,7 @@ void reset_source(size_t numbits, parser* p) { // wait for all predictions to be sent back to client pthread_mutex_lock(&output_lock); - while (global.example_number != parsed_index) + while (global.local_example_number != global.parsed_examples) pthread_cond_wait(&output_done, &output_lock); pthread_mutex_unlock(&output_lock); @@ -326,7 +326,6 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa if (global.daemon) { // weights will be shared across processes, accessible to children - float* shared_weights = (float*)mmap(0,global.stride * global.length() * sizeof(float), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); @@ -340,6 +339,13 @@ void parse_source_args(po::variables_map& vm, parser* par, bool quiet, size_t pa global.reg->weight_vectors[i] = dest; } + // learning state to be shared across children + shared_data* sd = (shared_data *)mmap(0,sizeof(shared_data), + PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + memcpy(sd, global.sd, sizeof(shared_data)); + free(global.sd); + global.sd = sd; + // create children size_t num_children = global.num_children; v_array<int> children; @@ -497,7 +503,7 @@ bool parser_done() if (done) { for (size_t i = 0; i < global.num_threads(); i++) - if (used_index[i] != parsed_index) + if (used_index[i] != global.parsed_examples) return false; return true; } @@ -577,11 +583,11 @@ example* get_unused_example() while (true) { pthread_mutex_lock(&examples_lock); - if (examples[parsed_index % global.ring_size].in_use == false) + if (examples[global.parsed_examples % global.ring_size].in_use == false) { - examples[parsed_index % global.ring_size].in_use = true; + examples[global.parsed_examples % global.ring_size].in_use = true; pthread_mutex_unlock(&examples_lock); - return examples + (parsed_index % global.ring_size); + return examples + (global.parsed_examples % global.ring_size); } else { @@ -656,8 +662,6 @@ feature* search(feature* begin, size_t value, feature* end) } } -size_t example_count = 0; - void setup_example(parser* p, example* ae) { ae->pass = global.passes_complete; @@ -666,10 +670,10 @@ void setup_example(parser* p, example* ae) ae->total_sum_feat_sq = 1; ae->threads_to_finish = global.num_threads(); ae->done = false; - ae->example_counter = ++example_count; + ae->example_counter = global.parsed_examples + 1; ae->global_weight = p->lp->get_weight(ae->ld); - p->t += ae->global_weight; - ae->example_t = p->t; + global.sd->t += ae->global_weight; + ae->example_t = global.sd->t; if (global.ignore_some) { @@ -755,7 +759,7 @@ void *main_parse_loop(void *in) parser* p = (parser*) in; global.passes_complete = 0; - size_t example_number = 0; + size_t example_number = 0; // for variable-size batch learning algorithms while(!done) { example* ae=get_unused_example(); @@ -764,7 +768,7 @@ void *main_parse_loop(void *in) setup_example(p,ae); example_number++; pthread_mutex_lock(&examples_lock); - parsed_index++; + global.parsed_examples++; pthread_cond_broadcast(&example_available); pthread_mutex_unlock(&examples_lock); } @@ -816,10 +820,10 @@ example* get_example(size_t thread_num) { pthread_mutex_lock(&examples_lock); - if (parsed_index != used_index[thread_num]) { + if (global.parsed_examples != used_index[thread_num]) { size_t ring_index = used_index[thread_num]++ % global.ring_size; if (!(examples+ring_index)->in_use) - cout << used_index[thread_num] << " " << parsed_index << " " << thread_num << " " << ring_index << endl; + cout << used_index[thread_num] << " " << global.parsed_examples << " " << thread_num << " " << ring_index << endl; assert((examples+ring_index)->in_use); pthread_mutex_unlock(&examples_lock); @@ -838,7 +842,7 @@ pthread_t parse_thread; void start_parser(size_t num_threads, parser* pf) { used_index = (uint64_t*) calloc(num_threads, sizeof(uint64_t)); - parsed_index = 0; + global.parsed_examples = 0; done = false; if(global.ngram>1) @@ -32,7 +32,6 @@ struct parser { v_array<substring> name; const label_parser* lp; - float t; io_buf* input; //Input source(s) int (*reader)(parser* p, void* ae); |