Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Langford <jl@hunch.net>2011-11-22 04:59:40 +0400
committerJohn Langford <jl@hunch.net>2011-11-22 04:59:40 +0400
commit2b092676b4ad97b8504636c424da486d25ba24a4 (patch)
tree9d40b57d898e47f54e18e50b1ed148e7ed46ea84
parent090fbe85e78017cfb4dfb942bd7b0ef1143cfd35 (diff)
removed delay_ring
-rw-r--r--Makefile2
-rw-r--r--bfgs.cc3
-rw-r--r--delay_ring.cc114
-rw-r--r--delay_ring.h10
-rw-r--r--gd.cc102
-rw-r--r--gd_mf.cc29
-rw-r--r--global_data.cc1
-rw-r--r--lda_core.cc5
-rw-r--r--message_relay.h8
-rw-r--r--sender.cc3
-rw-r--r--vw.cc4
11 files changed, 62 insertions, 219 deletions
diff --git a/Makefile b/Makefile
index 7071eb05..a0fd33e0 100644
--- a/Makefile
+++ b/Makefile
@@ -59,7 +59,7 @@ export
spanning_tree:
cd cluster; $(MAKE); cd ..
-vw: hash.o global_data.o delay_ring.o io.o parse_regressor.o parse_primitives.o unique_sort.o cache.o simple_label.o parse_example.o sparse_dense.o network.o parse_args.o allreduce.o accumulate.o gd.o lda_core.o gd_mf.o bfgs.o noop.o parser.o vw.o loss_functions.o sender.o main.o
+vw: hash.o global_data.o io.o parse_regressor.o parse_primitives.o unique_sort.o cache.o simple_label.o parse_example.o sparse_dense.o network.o parse_args.o allreduce.o accumulate.o gd.o lda_core.o gd_mf.o bfgs.o noop.o parser.o vw.o loss_functions.o sender.o main.o
$(COMPILER) $(FLAGS) -L$(BOOST_LIBRARY) -o $@ $+ $(LIBS)
active_interactor: active_interactor.cc
diff --git a/bfgs.cc b/bfgs.cc
index 0dabdb9a..150434b0 100644
--- a/bfgs.cc
+++ b/bfgs.cc
@@ -19,7 +19,6 @@ Implementation by Miro Dudik.
#include "bfgs.h"
#include "cache.h"
#include "simple_label.h"
-#include "delay_ring.h"
#include "accumulate.h"
using namespace std;
@@ -747,7 +746,7 @@ void setup_bfgs(gd_thread_params& t)
/********************************************************************/
/* PROCESS THE FINAL EXAMPLE ****************************************/
/********************************************************************/
- else if (thread_done())
+ else if (parser_done())
{
if (current_pass != 0)
work_on_weights(gradient_pass, reg, *(t.final_regressor_name),
diff --git a/delay_ring.cc b/delay_ring.cc
deleted file mode 100644
index 2b0861c9..00000000
--- a/delay_ring.cc
+++ /dev/null
@@ -1,114 +0,0 @@
-#include <stdlib.h>
-#include <pthread.h>
-#include <iostream>
-#include <float.h>
-#include "v_array.h"
-#include "example.h"
-#include "global_data.h"
-#include "parser.h"
-#include "gd.h"
-
-v_array<size_t> delay_indices;//thread specific state.
-
-v_array<example*> delay_ring;//delay_ring state & mutexes
-v_array<size_t> threads_to_use;
-size_t local_index;
-size_t global_index;
-pthread_mutex_t delay = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t delay_empty = PTHREAD_COND_INITIALIZER;
-pthread_cond_t delay_nonempty = PTHREAD_COND_INITIALIZER;
-pthread_cond_t global_delay_nonempty = PTHREAD_COND_INITIALIZER;
-
-size_t mesg = 0;
-
-void initialize_delay_ring()
-{
- size_t nt = 1+mesg;
- reserve(delay_indices, nt);
- for (size_t i = 0; i < nt; i++)
- delay_indices[i] = 0;
- reserve(delay_ring, global.ring_size);
- for (size_t i = 0; i < global.ring_size; i++)
- delay_ring[i] = NULL;
- reserve(threads_to_use, global.ring_size);
- local_index = 0;
- global_index = 0;
-}
-
-void destroy_delay_ring()
-{
- free(delay_indices.begin);
- delay_indices.begin = NULL;
- free(delay_ring.begin);
- delay_ring.begin = NULL;
- free(threads_to_use.begin);
- threads_to_use.begin = NULL;
-}
-
-bool thread_done()
-{
- bool ret;
- if (!parser_done())
- return false;
- pthread_mutex_lock(&delay);
- ret = delay_indices[0] == local_index;
- pthread_mutex_unlock(&delay);
- return ret;
-}
-
-example* return_example()
-{
- size_t index = delay_indices[0] % global.ring_size;
- example* ret = delay_ring[index];
-
- pthread_mutex_lock(&delay);
- delay_indices[0]++;
- pthread_mutex_unlock(&delay);
-
- if (--threads_to_use[index] == 0)
- {
- pthread_mutex_lock(&delay);
- delay_ring[index] = NULL;
- pthread_cond_broadcast(&delay_empty);
- pthread_mutex_unlock(&delay);
- }
- return ret;
-}
-
-example* get_delay_example()
-{//semiblocking
- if (delay_indices[0] != local_index)
- return return_example();
- else
- return NULL;
-}
-
-void delay_example(example* ex, size_t count)
-{
- size_t delay_count = count+mesg;
-
- if (delay_count == 0)
- {
- output_and_account_example(ex);
- free_example(ex);
- }
- else
- {
- size_t index = local_index % global.ring_size;
-
- pthread_mutex_lock(&delay);
- while (delay_ring[index] != NULL)
- pthread_cond_wait(&delay_empty, &delay);
-
- delay_ring[index] = ex;
- threads_to_use[index] = delay_count;
-
- local_index++;
- if (count == 0)
- delay_indices[0]++;
-
- pthread_cond_broadcast(&delay_nonempty);
- pthread_mutex_unlock(&delay);
- }
-}
-
diff --git a/delay_ring.h b/delay_ring.h
deleted file mode 100644
index 440dba43..00000000
--- a/delay_ring.h
+++ /dev/null
@@ -1,10 +0,0 @@
-#ifndef DELAY_RING_H
-#define DELAY_RING_H
-
-void initialize_delay_ring();
-void destroy_delay_ring();
-example* get_delay_example();
-void delay_example(example* ex, size_t count);
-bool thread_done();
-
-#endif
diff --git a/gd.cc b/gd.cc
index 2304a561..c245ff04 100644
--- a/gd.cc
+++ b/gd.cc
@@ -21,7 +21,6 @@ embodied in the content of this file are licensed under the BSD
#include "gd.h"
#include "cache.h"
#include "simple_label.h"
-#include "delay_ring.h"
#include "allreduce.h"
#include "accumulate.h"
@@ -40,43 +39,42 @@ void* gd_thread(void *in)
while ( true )
{//this is a poor man's select operation.
- if ((ec = get_delay_example()) != NULL)//nonblocking
+ if ((ec = get_example()) != NULL)//semiblocking operation.
{
+ assert(ec->in_use);
if (ec->pass != current_pass)
{
+ if(global.span_server != "") {
+ if(global.adaptive)
+ accumulate_weighted_avg(global.span_server, params->reg);
+ else
+ accumulate_avg(global.span_server, params->reg, 0);
+ }
+
if (global.save_per_pass)
sync_weights(&reg);
global.eta *= global.eta_decay_rate;
save_predictor(*(params->final_regressor_name), current_pass);
current_pass = ec->pass;
}
- if (global.adaptive)
- adaptive_inline_train(reg,ec,ec->eta_round);
- else
- inline_train(reg, ec, ec->eta_round);
- finish_example(ec);
- if (global.sd->contraction < 1e-10) // updating weights now to avoid numerical instability
- sync_weights(&reg);
- }
- else if ((ec = get_example()) != NULL)//semiblocking operation.
- {
- assert(ec->in_use);
- if (ec->pass != current_pass && global.span_server != "")
+
+ if (!command_example(ec, params))
{
- if(global.span_server != "") {
- if(global.adaptive)
- accumulate_weighted_avg(global.span_server, params->reg);
- else
- accumulate_avg(global.span_server, params->reg, 0);
- }
+ predict(reg,ec,*(params->vars));
+ if (ec->eta_round != 0.)
+ {
+ if (global.adaptive)
+ adaptive_inline_train(reg,ec,ec->eta_round);
+ else
+ inline_train(reg, ec, ec->eta_round);
+ if (global.sd->contraction < 1e-10) // updating weights now to avoid numerical instability
+ sync_weights(&reg);
+
+ }
}
-
- if (command_example(ec, params))
- delay_example(ec,0);
- else
- predict(reg,ec,*(params->vars));
+ finish_example(ec);
}
- else if (thread_done())
+ else if (parser_done())
{
sync_weights(&reg);
if(global.span_server != "") {
@@ -615,31 +613,35 @@ void local_predict(example* ec, gd_vars& vars, regressor& reg)
t = global.sd->weighted_unlabeled_examples;
else
t = ec->example_t;
-
+
+ ec->eta_round = 0;
if (ld->label != FLT_MAX)
{
ec->loss = reg.loss->getLoss(ec->final_prediction, ld->label) * ld->weight;
- double eta_t;
- float norm;
- if (global.adaptive && global.exact_adaptive_norm) {
- float magx = 0.;
- norm = compute_xGx(reg, ec, magx);
- eta_t = global.eta * norm / magx;
- } else {
- eta_t = global.eta / pow(t,vars.power_t) * ld->weight;
- norm = global.nonormalize ? 1. : ec->total_sum_feat_sq;
- }
-
- ec->eta_round = reg.loss->getUpdate(ec->final_prediction, ld->label, eta_t, norm) / global.sd->contraction;
-
- if (global.training && global.reg_mode && fabs(ec->eta_round) > 1e-8) {
- double dev1 = reg.loss->first_derivative(ec->final_prediction, ld->label);
- double eta_bar = (fabs(dev1) > 1e-8) ? (-ec->eta_round / dev1) : 0.0;
- if (fabs(dev1) > 1e-8)
- global.sd->contraction /= (1. + global.l2_lambda * eta_bar * norm);
- global.sd->gravity += eta_bar * sqrt(norm) * global.l1_lambda;
- }
+ if (global.training)
+ {
+ double eta_t;
+ float norm;
+ if (global.adaptive && global.exact_adaptive_norm) {
+ float magx = 0.;
+ norm = compute_xGx(reg, ec, magx);
+ eta_t = global.eta * norm / magx;
+ } else {
+ eta_t = global.eta / pow(t,vars.power_t) * ld->weight;
+ norm = global.nonormalize ? 1. : ec->total_sum_feat_sq;
+ }
+
+ ec->eta_round = reg.loss->getUpdate(ec->final_prediction, ld->label, eta_t, norm) / global.sd->contraction;
+
+ if (global.reg_mode && fabs(ec->eta_round) > 1e-8) {
+ double dev1 = reg.loss->first_derivative(ec->final_prediction, ld->label);
+ double eta_bar = (fabs(dev1) > 1e-8) ? (-ec->eta_round / dev1) : 0.0;
+ if (fabs(dev1) > 1e-8)
+ global.sd->contraction /= (1. + global.l2_lambda * eta_bar * norm);
+ global.sd->gravity += eta_bar * sqrt(norm) * global.l1_lambda;
+ }
+ }
}
else if(global.active)
ec->revert_weight = reg.loss->getRevertingWeight(ec->final_prediction, global.eta/pow(t,vars.power_t));
@@ -676,12 +678,6 @@ void predict(regressor& r, example* ex, gd_vars& vars)
local_predict(ex, vars,r);
ex->done = true;
-
- if (global.training && ((label_data*)(ex->ld))->label != FLT_MAX)
- delay_example(ex,1);
- else
- delay_example(ex,0);
-
}
// trains regressor r on one example ex.
diff --git a/gd_mf.cc b/gd_mf.cc
index c8a7d926..42c26fcb 100644
--- a/gd_mf.cc
+++ b/gd_mf.cc
@@ -14,7 +14,6 @@ embodied in the content of this file are licensed under the BSD
#include "gd.h"
#include "cache.h"
#include "simple_label.h"
-#include "delay_ring.h"
using namespace std;
@@ -30,29 +29,22 @@ void* gd_mf_thread(void *in)
size_t current_pass = 0;
while ( true )
- {//this is a poor man's select operation.
- if ((ec = get_delay_example()) != NULL)//nonblocking
+ {
+ if ((ec = get_example()) != NULL)//blocking operation.
{
-
if (ec->pass != current_pass) {
global.eta *= global.eta_decay_rate;
current_pass = ec->pass;
}
-
- //cout << ec->eta_round << endl;
- mf_inline_train(*(params->vars), reg, ec, ec->eta_round);
- finish_example(ec);
- }
- else if ((ec = get_example()) != NULL)//blocking operation.
- {
- if (command_example(ec, params))
+ if (!command_example(ec, params))
{
- delay_example(ec,0);
+ mf_predict(reg,ec,*(params->vars));
+ if (global.training && ((label_data*)(ec->ld))->label != FLT_MAX)
+ mf_inline_train(*(params->vars), reg, ec, ec->eta_round);
}
- else
- mf_predict(reg,ec,*(params->vars));
+ finish_example(ec);
}
- else if (thread_done())
+ else if (parser_done())
{
if (global.local_prediction > 0)
shutdown(global.local_prediction, SHUT_WR);
@@ -231,11 +223,6 @@ float mf_predict(regressor& r, example* ex, gd_vars& vars)
ex->partial_prediction = prediction;
mf_local_predict(ex, vars,r);
- if (global.training && ((label_data*)(ex->ld))->label != FLT_MAX)
- delay_example(ex,1);
- else
- delay_example(ex,0);
-
return ex->final_prediction;
}
diff --git a/global_data.cc b/global_data.cc
index 1fa83da6..b977684f 100644
--- a/global_data.cc
+++ b/global_data.cc
@@ -3,7 +3,6 @@
#include <float.h>
#include <iostream>
#include "global_data.h"
-#include "message_relay.h"
using namespace std;
diff --git a/lda_core.cc b/lda_core.cc
index 35ec3ebb..3b7bff90 100644
--- a/lda_core.cc
+++ b/lda_core.cc
@@ -17,7 +17,6 @@ embodied in the content of this file are licensed under the BSD
#include "lda_core.h"
#include "cache.h"
#include "simple_label.h"
-#include "delay_ring.h"
#define MINEIRO_SPECIAL
#ifdef MINEIRO_SPECIAL
@@ -538,7 +537,7 @@ void start_lda(gd_thread_params t)
}
}
}
- else if (thread_done())
+ else if (parser_done())
batch_size = d;
else
d--;
@@ -619,7 +618,7 @@ void start_lda(gd_thread_params t)
total_lambda[k] += total_new[k];
}
- if (thread_done())
+ if (parser_done())
{
for (size_t i = 0; i < global.length(); i++) {
weight* weights_for_w = & (weights[i*global.stride]);
diff --git a/message_relay.h b/message_relay.h
deleted file mode 100644
index 656caccd..00000000
--- a/message_relay.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#ifndef MESSAGE_RELAY_H
-#define MESSAGE_RELAY_H
-
-void setup_relay(void*);
-
-void destroy_relay();
-
-#endif
diff --git a/sender.cc b/sender.cc
index 8a13b046..49b63379 100644
--- a/sender.cc
+++ b/sender.cc
@@ -7,7 +7,6 @@
#include "vw.h"
#include "simple_label.h"
#include "network.h"
-#include "delay_ring.h"
using namespace std;
io_buf* buf;
@@ -60,7 +59,7 @@ void setup_send()
simple_label.cache_label(ld, *buf);//send label information.
cache_tag(*buf, ec->tag);
send_features(buf,ec);
- delay_example(ec,0);
+ finish_example(ec);
}
else if (!finished && parser_done())
{ //close our outputs to signal finishing.
diff --git a/vw.cc b/vw.cc
index 16502cca..e526785d 100644
--- a/vw.cc
+++ b/vw.cc
@@ -25,8 +25,6 @@ embodied in the content of this file are licensed under the BSD
#include "vw.h"
#include "simple_label.h"
#include "sender.h"
-#include "delay_ring.h"
-#include "message_relay.h"
#include "allreduce.h"
using namespace std;
@@ -63,7 +61,6 @@ gd_vars* vw(int argc, char *argv[])
gd_thread_params t = {vars, regressor1, &final_regressor_name};
start_parser(p);
- initialize_delay_ring();
if (vm.count("sendto"))
{
@@ -97,7 +94,6 @@ gd_vars* vw(int argc, char *argv[])
destroy_gd();
}
- destroy_delay_ring();
end_parser(p);
finalize_regressor(final_regressor_name,t.reg);