diff options
author | John Langford <jl@hunch.net> | 2011-11-22 04:59:40 +0400 |
---|---|---|
committer | John Langford <jl@hunch.net> | 2011-11-22 04:59:40 +0400 |
commit | 2b092676b4ad97b8504636c424da486d25ba24a4 (patch) | |
tree | 9d40b57d898e47f54e18e50b1ed148e7ed46ea84 | |
parent | 090fbe85e78017cfb4dfb942bd7b0ef1143cfd35 (diff) |
removed delay_ring
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | bfgs.cc | 3 | ||||
-rw-r--r-- | delay_ring.cc | 114 | ||||
-rw-r--r-- | delay_ring.h | 10 | ||||
-rw-r--r-- | gd.cc | 102 | ||||
-rw-r--r-- | gd_mf.cc | 29 | ||||
-rw-r--r-- | global_data.cc | 1 | ||||
-rw-r--r-- | lda_core.cc | 5 | ||||
-rw-r--r-- | message_relay.h | 8 | ||||
-rw-r--r-- | sender.cc | 3 | ||||
-rw-r--r-- | vw.cc | 4 |
11 files changed, 62 insertions, 219 deletions
@@ -59,7 +59,7 @@ export spanning_tree: cd cluster; $(MAKE); cd .. -vw: hash.o global_data.o delay_ring.o io.o parse_regressor.o parse_primitives.o unique_sort.o cache.o simple_label.o parse_example.o sparse_dense.o network.o parse_args.o allreduce.o accumulate.o gd.o lda_core.o gd_mf.o bfgs.o noop.o parser.o vw.o loss_functions.o sender.o main.o +vw: hash.o global_data.o io.o parse_regressor.o parse_primitives.o unique_sort.o cache.o simple_label.o parse_example.o sparse_dense.o network.o parse_args.o allreduce.o accumulate.o gd.o lda_core.o gd_mf.o bfgs.o noop.o parser.o vw.o loss_functions.o sender.o main.o $(COMPILER) $(FLAGS) -L$(BOOST_LIBRARY) -o $@ $+ $(LIBS) active_interactor: active_interactor.cc @@ -19,7 +19,6 @@ Implementation by Miro Dudik. #include "bfgs.h" #include "cache.h" #include "simple_label.h" -#include "delay_ring.h" #include "accumulate.h" using namespace std; @@ -747,7 +746,7 @@ void setup_bfgs(gd_thread_params& t) /********************************************************************/ /* PROCESS THE FINAL EXAMPLE ****************************************/ /********************************************************************/ - else if (thread_done()) + else if (parser_done()) { if (current_pass != 0) work_on_weights(gradient_pass, reg, *(t.final_regressor_name), diff --git a/delay_ring.cc b/delay_ring.cc deleted file mode 100644 index 2b0861c9..00000000 --- a/delay_ring.cc +++ /dev/null @@ -1,114 +0,0 @@ -#include <stdlib.h> -#include <pthread.h> -#include <iostream> -#include <float.h> -#include "v_array.h" -#include "example.h" -#include "global_data.h" -#include "parser.h" -#include "gd.h" - -v_array<size_t> delay_indices;//thread specific state. - -v_array<example*> delay_ring;//delay_ring state & mutexes -v_array<size_t> threads_to_use; -size_t local_index; -size_t global_index; -pthread_mutex_t delay = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t delay_empty = PTHREAD_COND_INITIALIZER; -pthread_cond_t delay_nonempty = PTHREAD_COND_INITIALIZER; -pthread_cond_t global_delay_nonempty = PTHREAD_COND_INITIALIZER; - -size_t mesg = 0; - -void initialize_delay_ring() -{ - size_t nt = 1+mesg; - reserve(delay_indices, nt); - for (size_t i = 0; i < nt; i++) - delay_indices[i] = 0; - reserve(delay_ring, global.ring_size); - for (size_t i = 0; i < global.ring_size; i++) - delay_ring[i] = NULL; - reserve(threads_to_use, global.ring_size); - local_index = 0; - global_index = 0; -} - -void destroy_delay_ring() -{ - free(delay_indices.begin); - delay_indices.begin = NULL; - free(delay_ring.begin); - delay_ring.begin = NULL; - free(threads_to_use.begin); - threads_to_use.begin = NULL; -} - -bool thread_done() -{ - bool ret; - if (!parser_done()) - return false; - pthread_mutex_lock(&delay); - ret = delay_indices[0] == local_index; - pthread_mutex_unlock(&delay); - return ret; -} - -example* return_example() -{ - size_t index = delay_indices[0] % global.ring_size; - example* ret = delay_ring[index]; - - pthread_mutex_lock(&delay); - delay_indices[0]++; - pthread_mutex_unlock(&delay); - - if (--threads_to_use[index] == 0) - { - pthread_mutex_lock(&delay); - delay_ring[index] = NULL; - pthread_cond_broadcast(&delay_empty); - pthread_mutex_unlock(&delay); - } - return ret; -} - -example* get_delay_example() -{//semiblocking - if (delay_indices[0] != local_index) - return return_example(); - else - return NULL; -} - -void delay_example(example* ex, size_t count) -{ - size_t delay_count = count+mesg; - - if (delay_count == 0) - { - output_and_account_example(ex); - free_example(ex); - } - else - { - size_t index = local_index % global.ring_size; - - pthread_mutex_lock(&delay); - while (delay_ring[index] != NULL) - pthread_cond_wait(&delay_empty, &delay); - - delay_ring[index] = ex; - threads_to_use[index] = delay_count; - - local_index++; - if (count == 0) - delay_indices[0]++; - - pthread_cond_broadcast(&delay_nonempty); - pthread_mutex_unlock(&delay); - } -} - diff --git a/delay_ring.h b/delay_ring.h deleted file mode 100644 index 440dba43..00000000 --- a/delay_ring.h +++ /dev/null @@ -1,10 +0,0 @@ -#ifndef DELAY_RING_H -#define DELAY_RING_H - -void initialize_delay_ring(); -void destroy_delay_ring(); -example* get_delay_example(); -void delay_example(example* ex, size_t count); -bool thread_done(); - -#endif @@ -21,7 +21,6 @@ embodied in the content of this file are licensed under the BSD #include "gd.h" #include "cache.h" #include "simple_label.h" -#include "delay_ring.h" #include "allreduce.h" #include "accumulate.h" @@ -40,43 +39,42 @@ void* gd_thread(void *in) while ( true ) {//this is a poor man's select operation. - if ((ec = get_delay_example()) != NULL)//nonblocking + if ((ec = get_example()) != NULL)//semiblocking operation. { + assert(ec->in_use); if (ec->pass != current_pass) { + if(global.span_server != "") { + if(global.adaptive) + accumulate_weighted_avg(global.span_server, params->reg); + else + accumulate_avg(global.span_server, params->reg, 0); + } + if (global.save_per_pass) sync_weights(®); global.eta *= global.eta_decay_rate; save_predictor(*(params->final_regressor_name), current_pass); current_pass = ec->pass; } - if (global.adaptive) - adaptive_inline_train(reg,ec,ec->eta_round); - else - inline_train(reg, ec, ec->eta_round); - finish_example(ec); - if (global.sd->contraction < 1e-10) // updating weights now to avoid numerical instability - sync_weights(®); - } - else if ((ec = get_example()) != NULL)//semiblocking operation. - { - assert(ec->in_use); - if (ec->pass != current_pass && global.span_server != "") + + if (!command_example(ec, params)) { - if(global.span_server != "") { - if(global.adaptive) - accumulate_weighted_avg(global.span_server, params->reg); - else - accumulate_avg(global.span_server, params->reg, 0); - } + predict(reg,ec,*(params->vars)); + if (ec->eta_round != 0.) + { + if (global.adaptive) + adaptive_inline_train(reg,ec,ec->eta_round); + else + inline_train(reg, ec, ec->eta_round); + if (global.sd->contraction < 1e-10) // updating weights now to avoid numerical instability + sync_weights(®); + + } } - - if (command_example(ec, params)) - delay_example(ec,0); - else - predict(reg,ec,*(params->vars)); + finish_example(ec); } - else if (thread_done()) + else if (parser_done()) { sync_weights(®); if(global.span_server != "") { @@ -615,31 +613,35 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg) t = global.sd->weighted_unlabeled_examples; else t = ec->example_t; - + + ec->eta_round = 0; if (ld->label != FLT_MAX) { ec->loss = reg.loss->getLoss(ec->final_prediction, ld->label) * ld->weight; - double eta_t; - float norm; - if (global.adaptive && global.exact_adaptive_norm) { - float magx = 0.; - norm = compute_xGx(reg, ec, magx); - eta_t = global.eta * norm / magx; - } else { - eta_t = global.eta / pow(t,vars.power_t) * ld->weight; - norm = global.nonormalize ? 1. : ec->total_sum_feat_sq; - } - - ec->eta_round = reg.loss->getUpdate(ec->final_prediction, ld->label, eta_t, norm) / global.sd->contraction; - - if (global.training && global.reg_mode && fabs(ec->eta_round) > 1e-8) { - double dev1 = reg.loss->first_derivative(ec->final_prediction, ld->label); - double eta_bar = (fabs(dev1) > 1e-8) ? (-ec->eta_round / dev1) : 0.0; - if (fabs(dev1) > 1e-8) - global.sd->contraction /= (1. + global.l2_lambda * eta_bar * norm); - global.sd->gravity += eta_bar * sqrt(norm) * global.l1_lambda; - } + if (global.training) + { + double eta_t; + float norm; + if (global.adaptive && global.exact_adaptive_norm) { + float magx = 0.; + norm = compute_xGx(reg, ec, magx); + eta_t = global.eta * norm / magx; + } else { + eta_t = global.eta / pow(t,vars.power_t) * ld->weight; + norm = global.nonormalize ? 1. : ec->total_sum_feat_sq; + } + + ec->eta_round = reg.loss->getUpdate(ec->final_prediction, ld->label, eta_t, norm) / global.sd->contraction; + + if (global.reg_mode && fabs(ec->eta_round) > 1e-8) { + double dev1 = reg.loss->first_derivative(ec->final_prediction, ld->label); + double eta_bar = (fabs(dev1) > 1e-8) ? (-ec->eta_round / dev1) : 0.0; + if (fabs(dev1) > 1e-8) + global.sd->contraction /= (1. + global.l2_lambda * eta_bar * norm); + global.sd->gravity += eta_bar * sqrt(norm) * global.l1_lambda; + } + } } else if(global.active) ec->revert_weight = reg.loss->getRevertingWeight(ec->final_prediction, global.eta/pow(t,vars.power_t)); @@ -676,12 +678,6 @@ void predict(regressor& r, example* ex, gd_vars& vars) local_predict(ex, vars,r); ex->done = true; - - if (global.training && ((label_data*)(ex->ld))->label != FLT_MAX) - delay_example(ex,1); - else - delay_example(ex,0); - } // trains regressor r on one example ex. @@ -14,7 +14,6 @@ embodied in the content of this file are licensed under the BSD #include "gd.h" #include "cache.h" #include "simple_label.h" -#include "delay_ring.h" using namespace std; @@ -30,29 +29,22 @@ void* gd_mf_thread(void *in) size_t current_pass = 0; while ( true ) - {//this is a poor man's select operation. - if ((ec = get_delay_example()) != NULL)//nonblocking + { + if ((ec = get_example()) != NULL)//blocking operation. { - if (ec->pass != current_pass) { global.eta *= global.eta_decay_rate; current_pass = ec->pass; } - - //cout << ec->eta_round << endl; - mf_inline_train(*(params->vars), reg, ec, ec->eta_round); - finish_example(ec); - } - else if ((ec = get_example()) != NULL)//blocking operation. - { - if (command_example(ec, params)) + if (!command_example(ec, params)) { - delay_example(ec,0); + mf_predict(reg,ec,*(params->vars)); + if (global.training && ((label_data*)(ec->ld))->label != FLT_MAX) + mf_inline_train(*(params->vars), reg, ec, ec->eta_round); } - else - mf_predict(reg,ec,*(params->vars)); + finish_example(ec); } - else if (thread_done()) + else if (parser_done()) { if (global.local_prediction > 0) shutdown(global.local_prediction, SHUT_WR); @@ -231,11 +223,6 @@ float mf_predict(regressor& r, example* ex, gd_vars& vars) ex->partial_prediction = prediction; mf_local_predict(ex, vars,r); - if (global.training && ((label_data*)(ex->ld))->label != FLT_MAX) - delay_example(ex,1); - else - delay_example(ex,0); - return ex->final_prediction; } diff --git a/global_data.cc b/global_data.cc index 1fa83da6..b977684f 100644 --- a/global_data.cc +++ b/global_data.cc @@ -3,7 +3,6 @@ #include <float.h> #include <iostream> #include "global_data.h" -#include "message_relay.h" using namespace std; diff --git a/lda_core.cc b/lda_core.cc index 35ec3ebb..3b7bff90 100644 --- a/lda_core.cc +++ b/lda_core.cc @@ -17,7 +17,6 @@ embodied in the content of this file are licensed under the BSD #include "lda_core.h"
#include "cache.h"
#include "simple_label.h"
-#include "delay_ring.h"
#define MINEIRO_SPECIAL
#ifdef MINEIRO_SPECIAL
@@ -538,7 +537,7 @@ void start_lda(gd_thread_params t) }
}
}
- else if (thread_done())
+ else if (parser_done())
batch_size = d;
else
d--;
@@ -619,7 +618,7 @@ void start_lda(gd_thread_params t) total_lambda[k] += total_new[k];
}
- if (thread_done())
+ if (parser_done())
{
for (size_t i = 0; i < global.length(); i++) {
weight* weights_for_w = & (weights[i*global.stride]);
diff --git a/message_relay.h b/message_relay.h deleted file mode 100644 index 656caccd..00000000 --- a/message_relay.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef MESSAGE_RELAY_H -#define MESSAGE_RELAY_H - -void setup_relay(void*); - -void destroy_relay(); - -#endif @@ -7,7 +7,6 @@ #include "vw.h" #include "simple_label.h" #include "network.h" -#include "delay_ring.h" using namespace std; io_buf* buf; @@ -60,7 +59,7 @@ void setup_send() simple_label.cache_label(ld, *buf);//send label information. cache_tag(*buf, ec->tag); send_features(buf,ec); - delay_example(ec,0); + finish_example(ec); } else if (!finished && parser_done()) { //close our outputs to signal finishing. @@ -25,8 +25,6 @@ embodied in the content of this file are licensed under the BSD #include "vw.h" #include "simple_label.h" #include "sender.h" -#include "delay_ring.h" -#include "message_relay.h" #include "allreduce.h" using namespace std; @@ -63,7 +61,6 @@ gd_vars* vw(int argc, char *argv[]) gd_thread_params t = {vars, regressor1, &final_regressor_name}; start_parser(p); - initialize_delay_ring(); if (vm.count("sendto")) { @@ -97,7 +94,6 @@ gd_vars* vw(int argc, char *argv[]) destroy_gd(); } - destroy_delay_ring(); end_parser(p); finalize_regressor(final_regressor_name,t.reg); |