github.com/moses-smt/vowpal_wabbit.git
-rw-r--r--  accumulate.cc                | 12
-rw-r--r--  allreduce.cc                 | 17
-rw-r--r--  allreduce.h                  |  2
-rw-r--r--  cluster/allreduce_master.cc  | 26
-rwxr-xr-x  cluster/mapscript.sh         |  2
-rwxr-xr-x  cluster/runvw.sh             | 14
-rw-r--r--  parse_args.cc                |  2
7 files changed, 57 insertions(+), 18 deletions(-)
diff --git a/accumulate.cc b/accumulate.cc
index 3784ff17..0235aa65 100644
--- a/accumulate.cc
+++ b/accumulate.cc
@@ -31,7 +31,7 @@ void accumulate(string master_location, regressor& reg, size_t o) {
local_grad[i] = weights[stride*i+o];
}
- all_reduce((char*)local_grad, length*sizeof(float), master_location);
+ all_reduce((char*)local_grad, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
{
weights[stride*i+o] = local_grad[i];
@@ -44,7 +44,7 @@ void accumulate(string master_location, regressor& reg, size_t o) {
float accumulate_scalar(string master_location, float local_sum) {
ftime(&t_start);
float temp = local_sum;
- all_reduce((char*)&temp, sizeof(float), master_location);
+ all_reduce((char*)&temp, sizeof(float), master_location, global.unique_id);
ftime(&t_end);
net_comm_time += (int) (1000.0 * (t_end.time - t_start.time) + (t_end.millitm - t_start.millitm));
return temp;
@@ -57,13 +57,13 @@ void accumulate_avg(string master_location, regressor& reg, size_t o) {
weight* weights = reg.weight_vectors[0];
ftime(&t_start);
float numnodes = 1.;
- all_reduce((char*)&numnodes, sizeof(float), master_location);
+ all_reduce((char*)&numnodes, sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
{
local_grad[i] = weights[stride*i+o];
}
- all_reduce((char*)local_grad, length*sizeof(float), master_location);
+ all_reduce((char*)local_grad, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
{
weights[stride*i+o] = local_grad[i]/numnodes;
@@ -102,7 +102,7 @@ void accumulate_weighted_avg(string master_location, regressor& reg) {
for(uint32_t i = 0;i < length;i++)
local_weights[i] = sqrt(weights[stride*i+1]*weights[stride*i+1]-1);
- all_reduce((char*)local_weights, length*sizeof(float), master_location);
+ all_reduce((char*)local_weights, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
if(local_weights[i] > 0)
@@ -110,7 +110,7 @@ void accumulate_weighted_avg(string master_location, regressor& reg) {
else
local_param[i] = 0;
- all_reduce((char*)local_param, length*sizeof(float), master_location);
+ all_reduce((char*)local_param, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
weights[stride*i] = local_param[i];
ftime(&t_end);
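Everything above is one pattern repeated: each all_reduce call now carries global.unique_id so the master can tell genuine workers from duplicates. The accumulate_avg hunk also shows a trick worth calling out: the node count is obtained by all-reducing the constant 1.0. A minimal annotated sketch of that pattern, assuming the four-argument all_reduce declared in allreduce.h:

    // Every node contributes 1.0, so after the reduction numnodes holds the
    // number of participating nodes; no separate counting round-trip needed.
    float numnodes = 1.;
    all_reduce((char*)&numnodes, sizeof(float), master_location, global.unique_id);
    for (uint32_t i = 0; i < length; i++)            // gather this node's slice
      local_grad[i] = weights[stride*i + o];
    all_reduce((char*)local_grad, length * sizeof(float),
               master_location, global.unique_id);   // element-wise sum over nodes
    for (uint32_t i = 0; i < length; i++)
      weights[stride*i + o] = local_grad[i] / numnodes;   // sum -> average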
diff --git a/allreduce.cc b/allreduce.cc
index 9d4a1d9f..2a1cb5c1 100644
--- a/allreduce.cc
+++ b/allreduce.cc
@@ -79,7 +79,7 @@ int sock_connect(uint32_t ip, int port) {
}
-int all_reduce_init(string master_location)
+int all_reduce_init(string master_location, size_t unique_id)
{
struct hostent* master = gethostbyname(master_location.c_str());
@@ -93,6 +93,17 @@ int all_reduce_init(string master_location)
int port = 26543;
int master_sock = sock_connect(master_ip, htons(port));
+
+ if(write(master_sock, &unique_id, sizeof(unique_id)) < (int)sizeof(unique_id))
+ cerr << "write failed!" << endl;
+ int ok;
+ if (read(master_sock, &ok, sizeof(ok)) < (int)sizeof(ok))
+ cerr << "read failed!" << endl;
+ if (!ok) {
+ cerr << "mapper already connected" << endl;
+ exit(1);
+ }
+
int client_port, kid_count, parent_port;
uint32_t parent_ip;
int numnodes;
@@ -353,10 +364,10 @@ void broadcast(char* buffer, int n, int parent_sock, int* child_sockets) {
}
}
-void all_reduce(char* buffer, int n, string master_location)
+void all_reduce(char* buffer, int n, string master_location, size_t unique_id)
{
if(master_location != current_master)
- all_reduce_init(master_location);
+ all_reduce_init(master_location, unique_id);
reduce(buffer, n, socks.parent, socks.children);
broadcast(buffer, n, socks.parent, socks.children);
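The new code in all_reduce_init is a two-step handshake that runs before the usual topology setup: the client writes its size_t id, then reads an int verdict from the master (nonzero = accepted, zero = a mapper with the same id is already connected, so this node is a speculative duplicate and exits). A self-contained client-side sketch of the same protocol; register_with_master is a hypothetical helper, not a function in the tree:

    #include <cstddef>     // size_t
    #include <iostream>
    #include <unistd.h>    // read, write, ssize_t

    // Returns true if the master accepted this node's unique id.
    bool register_with_master(int master_sock, size_t unique_id) {
      // 1. Announce our id; the master compares it against already-connected
      //    mappers to filter out Hadoop's speculative re-executions.
      if (write(master_sock, &unique_id, sizeof(unique_id)) != (ssize_t)sizeof(unique_id)) {
        std::cerr << "write failed!" << std::endl;
        return false;
      }
      // 2. Read the verdict: nonzero means we are in, zero means duplicate.
      int ok = 0;
      if (read(master_sock, &ok, sizeof(ok)) != (ssize_t)sizeof(ok)) {
        std::cerr << "read failed!" << std::endl;
        return false;
      }
      return ok != 0;
    }

Unlike the committed code, the sketch reports a short read/write to the caller; the diff's version only warns on stderr and still proceeds to check ok.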
diff --git a/allreduce.h b/allreduce.h
index 3e9c1100..1278c268 100644
--- a/allreduce.h
+++ b/allreduce.h
@@ -22,6 +22,6 @@ using namespace std;
const int buf_size = 1<<18;
-void all_reduce(char* buffer, int n, string master_location);
+void all_reduce(char* buffer, int n, string master_location, size_t unique_id);
#endif
diff --git a/cluster/allreduce_master.cc b/cluster/allreduce_master.cc
index 4af743dd..fe71440a 100644
--- a/cluster/allreduce_master.cc
+++ b/cluster/allreduce_master.cc
@@ -23,6 +23,7 @@ using namespace std;
struct client {
uint32_t client_ip;
int socket;
+ size_t unique_id;
};
static int socket_sort(const void* s1, const void* s2) {
@@ -125,9 +126,28 @@ int main(int argc, char* argv[]) {
cerr << "bad client socket!" << endl;
exit (1);
}
- client_sockets[i].client_ip = client_address.sin_addr.s_addr;
- client_sockets[i].socket = f;
- }
+ size_t mapper_id = 0;
+ if (read(f, &mapper_id, sizeof(mapper_id)) != sizeof(mapper_id))
+ {
+ cerr << "unique id read failed, exiting" << endl;
+ exit(1);
+ }
+ int ok = true;
+ for (int j = 0; j < i; j++)
+ if (client_sockets[j].unique_id == mapper_id && mapper_id != ((size_t)0)-1)
+ ok = false;
+ cerr << mapper_id << " " << ok << endl;
+ fail_write(f,&ok, sizeof(ok));
+
+ if (ok)
+ {
+ client_sockets[i].client_ip = client_address.sin_addr.s_addr;
+ client_sockets[i].socket = f;
+ client_sockets[i].unique_id = mapper_id;
+ }
+ else
+ i--;
+ }
qsort(client_sockets, source_count, sizeof(client), socket_sort);
int client_ports[source_count];
client_ports[0] = htons(port+1);
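On the master side the duplicate check is a linear scan over the clients accepted so far, with one carve-out: the sentinel id ((size_t)0)-1, the default when --unique_id is not given, is never treated as a duplicate. Pulled out of main into a standalone sketch; is_duplicate is a hypothetical helper, the tree inlines this loop:

    #include <cstddef>
    #include <cstdint>

    struct client {          // as declared in cluster/allreduce_master.cc
      uint32_t client_ip;
      int socket;
      size_t unique_id;
    };

    const size_t NO_ID = ((size_t)0) - 1;  // "--unique_id not set" sentinel

    bool is_duplicate(const client* clients, int accepted, size_t mapper_id) {
      if (mapper_id == NO_ID)   // unset ids never collide with each other,
        return false;           // so nodes launched without --unique_id always join
      for (int j = 0; j < accepted; j++)
        if (clients[j].unique_id == mapper_id)
          return true;
      return false;
    }

Note the i-- on rejection: the accept loop reuses the array slot and keeps waiting, so the master still collects source_count distinct clients; a rejected duplicate's socket simply never enters the topology.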
diff --git a/cluster/mapscript.sh b/cluster/mapscript.sh
index f8173a07..21de77d4 100755
--- a/cluster/mapscript.sh
+++ b/cluster/mapscript.sh
@@ -15,4 +15,4 @@ killall allreduce_master
master=`hostname`
mapcommand="runvw.sh $out_directory $master"
echo $mapcommand
-hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE
\ No newline at end of file
+hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.map.tasks.speculative.execution=true -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE
diff --git a/cluster/runvw.sh b/cluster/runvw.sh
index 21740d13..ea50ce57 100755
--- a/cluster/runvw.sh
+++ b/cluster/runvw.sh
@@ -8,11 +8,15 @@ echo $1 > /dev/stderr
#./vw -b 24 --cache_file temp.cache --passes 20 --regularization=1 -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5
#./vw -b 24 --cache_file temp.cache --passes 10 --regularization=1 --loss_function=logistic -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5
#./vw -b 24 --cache_file temp.cache --passes 1 -d /dev/stdin -i tempmodel -t
-gdcmd="./vw -b 24 --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic"
-bfgscmd="./vw -b 24 --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic"
+gdcmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic"
+bfgscmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic"
if [ "$mapper" == '000000' ]
then
$gdcmd > mapperout 2>&1
+ if [ $? -ne 0 ]
+ then
+ exit 1
+ fi
$bfgscmd >> mapperout 2>&1
outfile=$out_directory/model
mapperfile=$out_directory/mapperout
@@ -31,5 +35,9 @@ then
hadoop fs -put mapperout $mapperfile
else
$gdcmd
+ if [ $? -ne 0 ]
+ then
+ exit 1
+ fi
$bfgscmd
-fi
\ No newline at end of file
+fi
diff --git a/parse_args.cc b/parse_args.cc
index 847969ed..927172b0 100644
--- a/parse_args.cc
+++ b/parse_args.cc
@@ -108,7 +108,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
("thread_bits", po::value<size_t>(&global.thread_bits)->default_value(0), "log_2 threads")
("loss_function", po::value<string>()->default_value("squared"), "Specify the loss function to be used, uses squared by default. Currently available ones are squared, classic, hinge, logistic and quantile.")
("quantile_tau", po::value<double>()->default_value(0.5), "Parameter \\tau associated with Quantile loss. Defaults to 0.5")
- ("unique_id", po::value<size_t>(&global.unique_id)->default_value(0),"unique id used for cluster parallel")
+ ("unique_id", po::value<size_t>(&global.unique_id)->default_value(((size_t)0)-1),"unique id used for cluster parallel")
("sort_features", "turn this on to disregard order in which features have been defined. This will lead to smaller cache sizes")
("ngram", po::value<size_t>(), "Generate N grams")
("skips", po::value<size_t>(), "Generate skips in N grams. This in conjunction with the ngram tag can be used to generate generalized n-skip-k-gram.");