
github.com/moses-smt/vowpal_wabbit.git
path: root/cg.cc
author    Alekh Agarwal <alekh@starbuilt.newyork.corp.yahoo.com>  2011-06-23 21:26:02 +0400
committer Alekh Agarwal <alekh@starbuilt.newyork.corp.yahoo.com>  2011-06-23 21:26:02 +0400
commit    540d2fe5d7626bdeddfe296fb90ccbcedd426765 (patch)
tree      fd96b574b1704d65010de8245cd2823e9026bbe4 /cg.cc
parent    bf9b9be470ad8bcb42ef22b89dc57f20b53c56d8 (diff)
minor changes and a README for cluster
Diffstat (limited to 'cg.cc')
-rw-r--r--  cg.cc  101
1 file changed, 88 insertions(+), 13 deletions(-)
diff --git a/cg.cc b/cg.cc
index 161937f5..63e23944 100644
--- a/cg.cc
+++ b/cg.cc
@@ -20,6 +20,11 @@ The algorithm here is generally based on Jonathan Shewchuk's tutorial.
#include "multisource.h"
#include "simple_label.h"
#include "delay_ring.h"
+#include "allreduce.h"
+#include <sys/timeb.h>
+
+struct timeb t_start, t_end;
+double net_comm_time = 0.0;
void quad_grad_update(weight* weights, feature& page_feature, v_array<feature> &offer_features, size_t mask, float g)
{
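
The two new globals follow the classic <sys/timeb.h> timing idiom: call ftime() before and after a measured section, then accumulate the elapsed milliseconds from the whole-seconds (time) and millisecond-remainder (millitm) fields. A minimal standalone sketch of that idiom; the elapsed_ms helper is illustrative and not part of this commit:

// Sketch of the ftime()-based timing pattern used throughout this diff.
// elapsed_ms() is a hypothetical helper, not part of cg.cc.
#include <sys/timeb.h>

static double elapsed_ms(const struct timeb &start, const struct timeb &end)
{
  // time holds whole seconds, millitm the millisecond remainder.
  return 1000.0 * (end.time - start.time) + (end.millitm - start.millitm);
}

// Usage:
//   struct timeb t0, t1;
//   ftime(&t0);
//   /* ... communication ... */
//   ftime(&t1);
//   net_comm_time += elapsed_ms(t0, t1);
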
@@ -257,6 +262,36 @@ void update_weight(regressor& reg, float step_size)
reg.weight_vectors[0][stride*i] += step_size * reg.weight_vectors[0][stride*i+2];
}
+void accumulate(node_socks socks, regressor& reg, size_t o) {
+ ftime(&t_start);
+ uint32_t length = 1 << global.num_bits; //This is size of gradient
+ size_t stride = global.stride;
+ float* local_grad = new float[length];
+ weight* weights = reg.weight_vectors[0];
+ for(uint32_t i = 0;i < length;i++)
+ {
+ local_grad[i] = weights[stride*i+o];
+ }
+
+ all_reduce((char*)local_grad, length*sizeof(float), socks);
+ for(uint32_t i = 0;i < length;i++)
+ {
+ weights[stride*i+o] = local_grad[i];
+ }
+ delete[] local_grad;
+ ftime(&t_end);
+ net_comm_time += (int) (1000.0 * (t_end.time - t_start.time) + (t_end.millitm - t_start.millitm));
+}
+
+float accumulate_scalar(node_socks socks, float local_sum) {
+ ftime(&t_start);
+ float temp = local_sum;
+ all_reduce((char*)&temp, sizeof(float), socks);
+ ftime(&t_end);
+ net_comm_time += (int) (1000.0 * (t_end.time - t_start.time) + (t_end.millitm - t_start.millitm));
+ return temp;
+}
+
void setup_cg(gd_thread_params t)
{
regressor reg = t.reg;
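
Both new helpers above rely on the same contract from the new allreduce.h: all_reduce() sums a buffer coordinate-wise across all participating nodes, in place, so that every node ends up holding the global total. A toy, single-process stand-in illustrating just that contract (mock_all_reduce is illustrative only, not the real socket-based implementation):

// Toy model of the all_reduce contract assumed by accumulate() and
// accumulate_scalar(): every node's buffer becomes the coordinate-wise
// sum over all nodes. Serial and illustrative only.
#include <vector>

void mock_all_reduce(std::vector<std::vector<float> > &node_bufs)
{
  size_t n = node_bufs[0].size();
  std::vector<float> total(n, 0.f);
  for (size_t k = 0; k < node_bufs.size(); k++)   // reduce: sum over nodes
    for (size_t i = 0; i < n; i++)
      total[i] += node_bufs[k][i];
  for (size_t k = 0; k < node_bufs.size(); k++)   // broadcast the sum back
    node_bufs[k] = total;
}
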
@@ -278,11 +313,21 @@ void setup_cg(gd_thread_params t)
float* old_first_derivative = (float*) malloc(sizeof(float)*global.length());
+ node_socks socks;
+ struct timeb t_start_global, t_end_global;
+ double net_time = 0.0;
+ net_comm_time = 0.0;
+ ftime(&t_start_global);
+
+ if(global.master_location != "")
+ all_reduce_init(global.master_location, &socks);
+
if (!global.quiet)
{
- const char * header_fmt = "%-10s\t%-10s\t%-10s\t%-10s\t%-10s\t%-10s\n";
+ const char * header_fmt = "%-10s\t%-10s\t%-10s\t%-10s\t%-10s\t%-10s\t%-10s\n";
fprintf(stderr, header_fmt,
- "avg. loss", "mix fraction", "curvature", "dir. magnitude", "step size", "d_mag*step/examples");
+ "avg. loss", "mix fraction", "der. mag", "curvature", "dir. magnitude", "step size", "newt. decr.");
+ fflush(stderr);
cerr.precision(5);
}
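
Pieced together with the teardown hunks later in this diff, the communication lifecycle the commit introduces is: connect once via all_reduce_init() when a master is configured, reduce once per quantity per pass, and tear down with all_reduce_close() on every exit path. A condensed sketch of that flow, assembled from the hunks in this diff rather than quoted verbatim:

// Condensed allreduce lifecycle in setup_cg(), per this commit.
node_socks socks;
if (global.master_location != "")
  all_reduce_init(global.master_location, &socks);   // connect to master

// per pass:
//   accumulate(socks, reg, 3);                       // preconditioner
//   loss_sum  = accumulate_scalar(socks, loss_sum);  // loss
//   accumulate(socks, reg, 1);                       // gradient
//   curvature = accumulate_scalar(socks, curvature); // curvature

if (global.master_location != "")
  all_reduce_close(socks);                            // tear down sockets
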
@@ -294,14 +339,22 @@ void setup_cg(gd_thread_params t)
if (ec->pass != current_pass)//we need to do work on all features.
{
if (current_pass == 0)
- finalize_preconditioner(reg,global.regularization*importance_weight_sum);
+ {
+ if(global.master_location != "")
+ accumulate(socks, reg, 3); //Accumulate preconditioner
+ finalize_preconditioner(reg,global.regularization);
+ }
if (gradient_pass) // We just finished computing all gradients
{
+ if(global.master_location != "") {
+ loss_sum = accumulate_scalar(socks, loss_sum); //Accumulate loss_sums
+ accumulate(socks, reg, 1); //Accumulate gradients from all nodes
+ }
if (global.regularization > 0.)
- loss_sum += add_regularization(reg,global.regularization*importance_weight_sum);
+ loss_sum += add_regularization(reg,global.regularization);
if (!global.quiet)
- fprintf(stderr, "%-f\t", loss_sum / importance_weight_sum);
-
+ fprintf(stderr, "%-f\t", loss_sum );
+
if (current_pass > 0 && loss_sum > previous_loss_sum)
{// we stepped too far last time, step back
if (ec->pass != 0)
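
The ordering in the hunk above matters: each node's loss_sum covers only that node's shard of the examples, so the scalar allreduce must run before add_regularization() contributes the data-independent penalty; otherwise the penalty would be summed once per node. In symbols, assuming n nodes with example shards S_1, ..., S_n:

% total objective accumulated across nodes (lambda = global.regularization)
% accumulate_scalar computes the double sum; the \lambda R(w) term is then
% added exactly once to each node's (now identical) copy of the sum.
L(w) = \sum_{k=1}^{n} \sum_{i \in S_k} \ell_i(w) + \lambda R(w)
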
@@ -325,9 +378,8 @@ void setup_cg(gd_thread_params t)
mix_frac = 0;
float new_d_mag = derivative_magnitude(reg, old_first_derivative);
previous_d_mag = new_d_mag;
-
if (!global.quiet)
- fprintf(stderr, "%f\t", mix_frac);
+ fprintf(stderr, "%f\t%f\t", mix_frac, new_d_mag);
update_direction(reg, mix_frac, old_first_derivative);
gradient_pass = false;//now start computing curvature
@@ -336,8 +388,10 @@ void setup_cg(gd_thread_params t)
else // just finished all second gradients
{
float d_mag = direction_magnitude(reg);
+ if(global.master_location != "")
+ curvature = accumulate_scalar(socks, curvature); //Accumulate curvatures
if (global.regularization > 0.)
- curvature += global.regularization*d_mag*importance_weight_sum;
+ curvature += global.regularization*d_mag;
float dd = derivative_in_direction(reg, old_first_derivative);
if (curvature == 0. && dd != 0.)
{
@@ -346,7 +400,7 @@ void setup_cg(gd_thread_params t)
}
step_size = - dd/curvature;
if (!global.quiet)
- fprintf(stderr, "%-e\t%-e\t%-e\t%-f\n", curvature / importance_weight_sum, d_mag, step_size,d_mag*step_size/importance_weight_sum);
+ fprintf(stderr, "%-e\t%-e\t%-e\t%-f\n", curvature, d_mag, step_size, 0.5*step_size*step_size*curvature);
predictions.erase();
update_weight(reg,step_size);
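
The new last column replaces d_mag*step/examples with the predicted decrease of the local quadratic model, i.e. the Newton decrement. With dd the directional derivative and curvature the second derivative along the search direction, the printed value follows directly from the step size the code just computed:

% quadratic model along direction d:  f(w + s d) \approx f(w) + s\,dd + \tfrac{s^2}{2}\,curvature
% minimizer:                          s^* = -dd / curvature   (the code's step_size)
% predicted decrease at s^*:          dd^2 / (2\,curvature) = \tfrac{1}{2}\,(s^*)^2\,curvature
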
@@ -401,8 +455,10 @@ void setup_cg(gd_thread_params t)
if (example_number == predictions.index())//do one last update
{
float d_mag = direction_magnitude(reg);
+ if(global.master_location != "")
+ curvature = accumulate_scalar(socks, curvature); //Accumulate curvatures
if (global.regularization > 0.)
- curvature += global.regularization*d_mag*importance_weight_sum;
+ curvature += global.regularization*d_mag;
float dd = derivative_in_direction(reg, old_first_derivative);
if (curvature == 0. && dd != 0.)
{
@@ -411,20 +467,39 @@ void setup_cg(gd_thread_params t)
}
float step_size = - dd/(max(curvature,1.));
if (!global.quiet)
- fprintf(stderr, "%-e\t%-e\t%-e\t%-f\n", curvature / importance_weight_sum, d_mag, step_size,d_mag*step_size/importance_weight_sum);
+ fprintf(stderr, "%-e\t%-e\t%-e\t%-f\n", curvature, d_mag, step_size, 0.5*step_size*step_size*curvature);
update_weight(reg,step_size);
}
+ ftime(&t_end_global);
+ net_time += (int) (1000.0 * (t_end_global.time - t_start_global.time) + (t_end_global.millitm - t_start_global.millitm));
+ cerr<<"Net time spent in communication = "<<(float)net_comm_time/(float)1000<<"seconds\n";
+ cerr<<"Net time spent = "<<(float)net_time/(float)1000<<"seconds\n";
if (global.local_prediction > 0)
shutdown(global.local_prediction, SHUT_WR);
+ if(global.master_location != "")
+ all_reduce_close(socks);
free(predictions.begin);
+ free(old_first_derivative);
+ free(ec);
return;
}
else
;//busywait when we have predicted on all examples but not yet trained on all.
}
-
+
+ cerr<<"Done CG\n";
+ fflush(stderr);
+ if(global.master_location != "")
+ all_reduce_close(socks);
free(predictions.begin);
free(old_first_derivative);
+ cerr<<"Really Done CG\n";
+ ftime(&t_end_global);
+ net_time += (int) (1000.0 * (t_end_global.time - t_start_global.time) + (t_end_global.millitm - t_start_global.millitm));
+ cerr<<"Net time spent in communication = "<<(float)net_comm_time/(float)1000<<"seconds\n";
+ cerr<<"Net time spent = "<<(float)net_time/(float)1000<<"seconds\n";
+ fflush(stderr);
+
return;
}