diff options
| author    | John Langford <jl@hunch.net> | 2011-08-25 03:20:34 +0400 |
|-----------|------------------------------|---------------------------|
| committer | John Langford <jl@hunch.net> | 2011-08-25 03:20:34 +0400 |
| commit | 395c317a6a4c8d0adb335b941b40aaf2a2072ea0 (patch) |
| tree   | 10a47a05a202fe6657be017f28fcdd60dbd83af4 /cluster |
| parent | 85f0a2b66b97573fc4faa3a8093bb7b813350cfc (diff)   |
speculative execution compatible allreduce with Olivier
Diffstat (limited to 'cluster')
| -rw-r--r-- | cluster/allreduce_master.cc | 26 |
| -rwxr-xr-x | cluster/mapscript.sh        |  2 |
| -rwxr-xr-x | cluster/runvw.sh            | 14 |
3 files changed, 35 insertions, 7 deletions
diff --git a/cluster/allreduce_master.cc b/cluster/allreduce_master.cc index 4af743dd..fe71440a 100644 --- a/cluster/allreduce_master.cc +++ b/cluster/allreduce_master.cc @@ -23,6 +23,7 @@ using namespace std; struct client { uint32_t client_ip; int socket; + size_t unique_id; }; static int socket_sort(const void* s1, const void* s2) { @@ -125,9 +126,28 @@ int main(int argc, char* argv[]) { cerr << "bad client socket!" << endl; exit (1); } - client_sockets[i].client_ip = client_address.sin_addr.s_addr; - client_sockets[i].socket = f; - } + size_t mapper_id = 0; + if (read(f, &mapper_id, sizeof(mapper_id)) != sizeof(mapper_id)) + { + cerr << "unique id read failed, exiting" << endl; + exit(1); + } + int ok = true; + for (int j = 0; j < i; j++) + if (client_sockets[j].unique_id == mapper_id && mapper_id != ((size_t)0)-1) + ok = false; + cerr << mapper_id << " " << ok << endl; + fail_write(f,&ok, sizeof(ok)); + + if (ok) + { + client_sockets[i].client_ip = client_address.sin_addr.s_addr; + client_sockets[i].socket = f; + client_sockets[i].unique_id = mapper_id; + } + else + i--; + } qsort(client_sockets, source_count, sizeof(client), socket_sort); int client_ports[source_count]; client_ports[0] = htons(port+1); diff --git a/cluster/mapscript.sh b/cluster/mapscript.sh index f8173a07..21de77d4 100755 --- a/cluster/mapscript.sh +++ b/cluster/mapscript.sh @@ -15,4 +15,4 @@ killall allreduce_master master=`hostname` mapcommand="runvw.sh $out_directory $master" echo $mapcommand -hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE
\ No newline at end of file +hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.map.tasks.speculative.execution=true -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE diff --git a/cluster/runvw.sh b/cluster/runvw.sh index 21740d13..ea50ce57 100755 --- a/cluster/runvw.sh +++ b/cluster/runvw.sh @@ -8,11 +8,15 @@ echo $1 > /dev/stderr #./vw -b 24 --cache_file temp.cache --passes 20 --regularization=1 -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5 #./vw -b 24 --cache_file temp.cache --passes 10 --regularization=1 --loss_function=logistic -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5 #./vw -b 24 --cache_file temp.cache --passes 1 -d /dev/stdin -i tempmodel -t -gdcmd="./vw -b 24 --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic" -bfgscmd="./vw -b 24 --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic" +gdcmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic" +bfgscmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic" if [ "$mapper" == '000000' ] then $gdcmd > mapperout 2>&1 + if [ $? -neq 0 ] + then + exit 1 + fi $bfgscmd >> mapperout 2>&1 outfile=$out_directory/model mapperfile=$out_directory/mapperout @@ -31,5 +35,9 @@ then hadoop fs -put mapperout $mapperfile else $gdcmd + if [ $? -neq 0 ] + then + exit 1 + fi $bfgscmd -fi
\ No newline at end of file +fi |