Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/vowpal_wabbit.git - Unnamed repository; edit this file 'description' to name the repository.
summary refs log tree commit diff
diff options
context:
space:
mode:
author John Langford <jl@hunch.net> 2011-08-25 03:20:34 +0400
committer John Langford <jl@hunch.net> 2011-08-25 03:20:34 +0400
commit 395c317a6a4c8d0adb335b941b40aaf2a2072ea0 (patch)
tree 10a47a05a202fe6657be017f28fcdd60dbd83af4 /cluster
parent 85f0a2b66b97573fc4faa3a8093bb7b813350cfc (diff)
speculative execution compatible allreduce with Olivier
Diffstat (limited to 'cluster')
-rw-r--r-- cluster/allreduce_master.cc 26
-rwxr-xr-x cluster/mapscript.sh 2
-rwxr-xr-x cluster/runvw.sh 14
3 files changed, 35 insertions, 7 deletions
diff --git a/cluster/allreduce_master.cc b/cluster/allreduce_master.cc
index 4af743dd..fe71440a 100644
--- a/cluster/allreduce_master.cc
+++ b/cluster/allreduce_master.cc
@@ -23,6 +23,7 @@ using namespace std;
struct client {
uint32_t client_ip;
int socket;
+ size_t unique_id; // id the client sends right after connecting; a connection whose id is already registered is rejected, except when the id is (size_t)-1
};
static int socket_sort(const void* s1, const void* s2) {
@@ -125,9 +126,28 @@ int main(int argc, char* argv[]) {
cerr << "bad client socket!" << endl;
exit (1);
}
- client_sockets[i].client_ip = client_address.sin_addr.s_addr;
- client_sockets[i].socket = f;
- }
+ size_t mapper_id = 0;
+ if (read(f, &mapper_id, sizeof(mapper_id)) != sizeof(mapper_id))
+ {
+ cerr << "unique id read failed, exiting" << endl;
+ exit(1);
+ }
+ int ok = true;
+ for (int j = 0; j < i; j++)
+ if (client_sockets[j].unique_id == mapper_id && mapper_id != ((size_t)0)-1)
+ ok = false;
+ cerr << mapper_id << " " << ok << endl;
+ fail_write(f,&ok, sizeof(ok));
+
+ if (ok)
+ {
+ client_sockets[i].client_ip = client_address.sin_addr.s_addr;
+ client_sockets[i].socket = f;
+ client_sockets[i].unique_id = mapper_id;
+ }
+ else
+ i--;
+ }
qsort(client_sockets, source_count, sizeof(client), socket_sort);
int client_ports[source_count];
client_ports[0] = htons(port+1);
diff --git a/cluster/mapscript.sh b/cluster/mapscript.sh
index f8173a07..21de77d4 100755
--- a/cluster/mapscript.sh
+++ b/cluster/mapscript.sh
@@ -15,4 +15,4 @@ killall allreduce_master
master=`hostname`
mapcommand="runvw.sh $out_directory $master"
echo $mapcommand
-hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE \ No newline at end of file
+hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.map.tasks.speculative.execution=true -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE
diff --git a/cluster/runvw.sh b/cluster/runvw.sh
index 21740d13..ea50ce57 100755
--- a/cluster/runvw.sh
+++ b/cluster/runvw.sh
@@ -8,11 +8,15 @@ echo $1 > /dev/stderr
#./vw -b 24 --cache_file temp.cache --passes 20 --regularization=1 -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5
#./vw -b 24 --cache_file temp.cache --passes 10 --regularization=1 --loss_function=logistic -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5
#./vw -b 24 --cache_file temp.cache --passes 1 -d /dev/stdin -i tempmodel -t
-gdcmd="./vw -b 24 --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic"
-bfgscmd="./vw -b 24 --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic"
+gdcmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic"
+bfgscmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic"
if [ "$mapper" == '000000' ]
then
$gdcmd > mapperout 2>&1
+ if [ $? -ne 0 ] # stop if the $gdcmd pass failed; "-ne" is the integer not-equal test ("-neq" is not a valid operator, so the check never fired)
+ then
+ exit 1
+ fi
$bfgscmd >> mapperout 2>&1
outfile=$out_directory/model
mapperfile=$out_directory/mapperout
@@ -31,5 +35,9 @@ then
hadoop fs -put mapperout $mapperfile
else
$gdcmd
+ if [ $? -ne 0 ] # stop if the $gdcmd pass failed; "-ne" is the integer not-equal test ("-neq" is not a valid operator, so the check never fired)
+ then
+ exit 1
+ fi
$bfgscmd
-fi \ No newline at end of file
+fi