diff options
-rw-r--r-- | accumulate.cc | 12 | ||||
-rw-r--r-- | allreduce.cc | 17 | ||||
-rw-r--r-- | allreduce.h | 2 | ||||
-rw-r--r-- | cluster/allreduce_master.cc | 26 | ||||
-rwxr-xr-x | cluster/mapscript.sh | 2 | ||||
-rwxr-xr-x | cluster/runvw.sh | 14 | ||||
-rw-r--r-- | parse_args.cc | 2 |
7 files changed, 57 insertions, 18 deletions
diff --git a/accumulate.cc b/accumulate.cc index 3784ff17..0235aa65 100644 --- a/accumulate.cc +++ b/accumulate.cc @@ -31,7 +31,7 @@ void accumulate(string master_location, regressor& reg, size_t o) { local_grad[i] = weights[stride*i+o];
}
- all_reduce((char*)local_grad, length*sizeof(float), master_location);
+ all_reduce((char*)local_grad, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
{
weights[stride*i+o] = local_grad[i];
@@ -44,7 +44,7 @@ void accumulate(string master_location, regressor& reg, size_t o) { float accumulate_scalar(string master_location, float local_sum) {
ftime(&t_start);
float temp = local_sum;
- all_reduce((char*)&temp, sizeof(float), master_location);
+ all_reduce((char*)&temp, sizeof(float), master_location, global.unique_id);
ftime(&t_end);
net_comm_time += (int) (1000.0 * (t_end.time - t_start.time) + (t_end.millitm - t_start.millitm));
return temp;
@@ -57,13 +57,13 @@ void accumulate_avg(string master_location, regressor& reg, size_t o) { weight* weights = reg.weight_vectors[0];
ftime(&t_start);
float numnodes = 1.;
- all_reduce((char*)&numnodes, sizeof(float), master_location);
+ all_reduce((char*)&numnodes, sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
{
local_grad[i] = weights[stride*i+o];
}
- all_reduce((char*)local_grad, length*sizeof(float), master_location);
+ all_reduce((char*)local_grad, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
{
weights[stride*i+o] = local_grad[i]/numnodes;
@@ -102,7 +102,7 @@ void accumulate_weighted_avg(string master_location, regressor& reg) { for(uint32_t i = 0;i < length;i++)
local_weights[i] = sqrt(weights[stride*i+1]*weights[stride*i+1]-1);
- all_reduce((char*)local_weights, length*sizeof(float), master_location);
+ all_reduce((char*)local_weights, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
if(local_weights[i] > 0)
@@ -110,7 +110,7 @@ void accumulate_weighted_avg(string master_location, regressor& reg) { else
local_param[i] = 0;
- all_reduce((char*)local_param, length*sizeof(float), master_location);
+ all_reduce((char*)local_param, length*sizeof(float), master_location, global.unique_id);
for(uint32_t i = 0;i < length;i++)
weights[stride*i] = local_param[i];
ftime(&t_end);
diff --git a/allreduce.cc b/allreduce.cc index 9d4a1d9f..2a1cb5c1 100644 --- a/allreduce.cc +++ b/allreduce.cc @@ -79,7 +79,7 @@ int sock_connect(uint32_t ip, int port) { } -int all_reduce_init(string master_location) +int all_reduce_init(string master_location, size_t unique_id) { struct hostent* master = gethostbyname(master_location.c_str()); @@ -93,6 +93,17 @@ int all_reduce_init(string master_location) int port = 26543; int master_sock = sock_connect(master_ip, htons(port)); + + if(write(master_sock, &unique_id, sizeof(unique_id)) < (int)sizeof(unique_id)) + cerr << "write failed!" << endl; + int ok = 0; + if (read(master_sock, &ok, sizeof(ok)) < (int)sizeof(ok)) + cerr << "read failed!" << endl; + if (!ok) { + cerr << "mapper already connected" << endl; + exit(1); + } + int client_port, kid_count, parent_port; uint32_t parent_ip; int numnodes; @@ -353,10 +364,10 @@ void broadcast(char* buffer, int n, int parent_sock, int* child_sockets) { } } -void all_reduce(char* buffer, int n, string master_location) +void all_reduce(char* buffer, int n, string master_location, size_t unique_id) { if(master_location != current_master) - all_reduce_init(master_location); + all_reduce_init(master_location, unique_id); reduce(buffer, n, socks.parent, socks.children); broadcast(buffer, n, socks.parent, socks.children); diff --git a/allreduce.h b/allreduce.h index 3e9c1100..1278c268 100644 --- a/allreduce.h +++ b/allreduce.h @@ -22,6 +22,6 @@ using namespace std; const int buf_size = 1<<18; -void all_reduce(char* buffer, int n, string master_location); +void all_reduce(char* buffer, int n, string master_location, size_t unique_id); #endif diff --git a/cluster/allreduce_master.cc b/cluster/allreduce_master.cc index 4af743dd..fe71440a 100644 --- a/cluster/allreduce_master.cc +++ b/cluster/allreduce_master.cc @@ -23,6 +23,7 @@ using namespace std; struct client { uint32_t client_ip; int socket; + size_t unique_id; }; static int socket_sort(const void* s1, const void* s2) { @@ -125,9
+126,28 @@ int main(int argc, char* argv[]) { cerr << "bad client socket!" << endl; exit (1); } - client_sockets[i].client_ip = client_address.sin_addr.s_addr; - client_sockets[i].socket = f; - } + size_t mapper_id = 0; + if (read(f, &mapper_id, sizeof(mapper_id)) != sizeof(mapper_id)) + { + cerr << "unique id read failed, exiting" << endl; + exit(1); + } + int ok = true; + for (int j = 0; j < i; j++) + if (client_sockets[j].unique_id == mapper_id && mapper_id != ((size_t)0)-1) + ok = false; + cerr << mapper_id << " " << ok << endl; + fail_write(f,&ok, sizeof(ok)); + + if (ok) + { + client_sockets[i].client_ip = client_address.sin_addr.s_addr; + client_sockets[i].socket = f; + client_sockets[i].unique_id = mapper_id; + } + else + i--; + } qsort(client_sockets, source_count, sizeof(client), socket_sort); int client_ports[source_count]; client_ports[0] = htons(port+1); diff --git a/cluster/mapscript.sh b/cluster/mapscript.sh index f8173a07..21de77d4 100755 --- a/cluster/mapscript.sh +++ b/cluster/mapscript.sh @@ -15,4 +15,4 @@ killall allreduce_master master=`hostname` mapcommand="runvw.sh $out_directory $master" echo $mapcommand -hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE
\ No newline at end of file +hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.job.queue.name=search -Dmapred.min.split.size=$mapsize -Dmapred.map.tasks.speculative.execution=true -Dmapred.reduce.tasks=0 -Dmapred.job.map.memory.mb=3000 -Dmapred.child.java.opts="-Xmx100m" -Dmapred.task.timeout=600000000 -input $in_directory -output $out_directory -file ../vw -file runvw.sh -mapper "$mapcommand" -reducer NONE diff --git a/cluster/runvw.sh b/cluster/runvw.sh index 21740d13..ea50ce57 100755 --- a/cluster/runvw.sh +++ b/cluster/runvw.sh @@ -8,11 +8,15 @@ echo $1 > /dev/stderr #./vw -b 24 --cache_file temp.cache --passes 20 --regularization=1 -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5 #./vw -b 24 --cache_file temp.cache --passes 10 --regularization=1 --loss_function=logistic -d /dev/stdin -f tempmodel --master_location $2 --bfgs --mem 5 #./vw -b 24 --cache_file temp.cache --passes 1 -d /dev/stdin -i tempmodel -t -gdcmd="./vw -b 24 --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic" -bfgscmd="./vw -b 24 --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic" +gdcmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --passes 1 --regularization=1 --adaptive --exact_adaptive_norm -d /dev/stdin -f tempmodel --master_location $master --loss_function=logistic" +bfgscmd="./vw -b 24 --unique_id $mapper --cache_file temp.cache --bfgs --mem 5 --passes 20 --regularization=1 --master_location $master -f model -i tempmodel --loss_function=logistic" if [ "$mapper" == '000000' ] then $gdcmd > mapperout 2>&1 + if [ $? -ne 0 ] + then + exit 1 + fi $bfgscmd >> mapperout 2>&1 outfile=$out_directory/model mapperfile=$out_directory/mapperout @@ -31,5 +35,9 @@ then hadoop fs -put mapperout $mapperfile else $gdcmd + if [ $? -ne 0 ] + then + exit 1 + fi $bfgscmd -fi diff --git a/parse_args.cc b/parse_args.cc index 847969ed..927172b0 100644 --- a/parse_args.cc +++ b/parse_args.cc @@ -108,7 +108,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt ("thread_bits", po::value<size_t>(&global.thread_bits)->default_value(0), "log_2 threads") ("loss_function", po::value<string>()->default_value("squared"), "Specify the loss function to be used, uses squared by default. Currently available ones are squared, classic, hinge, logistic and quantile.") ("quantile_tau", po::value<double>()->default_value(0.5), "Parameter \\tau associated with Quantile loss. Defaults to 0.5") - ("unique_id", po::value<size_t>(&global.unique_id)->default_value(0),"unique id used for cluster parallel") + ("unique_id", po::value<size_t>(&global.unique_id)->default_value(((size_t)0)-1),"unique id used for cluster parallel") ("sort_features", "turn this on to disregard order in which features have been defined. This will lead to smaller cache sizes") ("ngram", po::value<size_t>(), "Generate N grams") ("skips", po::value<size_t>(), "Generate skips in N grams. This in conjunction with the ngram tag can be used to generate generalized n-skip-k-gram."); |
\ No newline at end of file +fi diff --git a/parse_args.cc b/parse_args.cc index 847969ed..927172b0 100644 --- a/parse_args.cc +++ b/parse_args.cc @@ -108,7 +108,7 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt ("thread_bits", po::value<size_t>(&global.thread_bits)->default_value(0), "log_2 threads") ("loss_function", po::value<string>()->default_value("squared"), "Specify the loss function to be used, uses squared by default. Currently available ones are squared, classic, hinge, logistic and quantile.") ("quantile_tau", po::value<double>()->default_value(0.5), "Parameter \\tau associated with Quantile loss. Defaults to 0.5") - ("unique_id", po::value<size_t>(&global.unique_id)->default_value(0),"unique id used for cluster parallel") + ("unique_id", po::value<size_t>(&global.unique_id)->default_value(((size_t)0)-1),"unique id used for cluster parallel") ("sort_features", "turn this on to disregard order in which features have been defined. This will lead to smaller cache sizes") ("ngram", po::value<size_t>(), "Generate N grams") ("skips", po::value<size_t>(), "Generate skips in N grams. This in conjunction with the ngram tag can be used to generate generalized n-skip-k-gram."); |