diff options
author | Clement Farabet <clement.farabet@gmail.com> | 2011-09-04 07:51:06 +0400 |
---|---|---|
committer | Clement Farabet <clement.farabet@gmail.com> | 2011-09-04 07:51:06 +0400 |
commit | 1e19ab7a53cd1c956f41752e615916b601214360 (patch) | |
tree | 8bdd150dc482437475d10fcc3c55e32ebee396d3 | |
parent | cc256ca1c3302aebedf219c36560a524bc955df4 (diff) |
Re-thought map-reduce.
-rw-r--r-- | BatchOptimization.lua | 24 |
1 files changed, 16 insertions, 8 deletions
diff --git a/BatchOptimization.lua b/BatchOptimization.lua index e6c94a3..8bf3e9a 100644 --- a/BatchOptimization.lua +++ b/BatchOptimization.lua @@ -25,12 +25,13 @@ function Batch:__init(...) self.sampleCounter = 0 if self.parallelize > 1 or self.servers then self:setup_mapreduce() + self.P = self.parallelize end end function Batch:forward(inputs, targets, options) options = options or {} - if self.parallelize > 1 then + if self.P > 1 then return self:forward_mapreduce(inputs, targets, options) else return self:forward_sequential(inputs, targets, options) @@ -99,7 +100,7 @@ end function Batch:forward_mapreduce(inputs, targets, options) -- parameters - local P = self.parallelize + local P = self.P -- transmit user hooks, if defined if not self.hooksets then @@ -149,6 +150,7 @@ function Batch:forward_mapreduce(inputs, targets, options) -- (0c) send mini-batch to all workers for t = 1,P do + parallel.children[t]:join() parallel.children[t]:send(inputss[t]) parallel.children[t]:send(targetss[t]) parallel.children[t]:send(optionss[t]) @@ -183,6 +185,7 @@ function Batch:forward_mapreduce(inputs, targets, options) self.evaluate_map = function() -- transmit new parameters to all workers + parallel.children:join() parallel.children:send(self.parameters) -- then wait for all workers to return their partial gradParameters + outputs gradParametersPartial = parallel.children:receive() @@ -215,7 +218,7 @@ function Batch:forward_mapreduce(inputs, targets, options) -- (3) reset workers so they're ready for next mini-batch -- only do this when we have an optimization hook - parallel.children:send('break') + parallel.children:join('break') end -- (4) update sample counter @@ -238,6 +241,7 @@ function Batch:setup_mapreduce () require 'nnx' -- retrieve module + criterion at startup + parallel.yield() module = parallel.parent:receive() criterion = parallel.parent:receive() @@ -256,18 +260,21 @@ function Batch:setup_mapreduce () -- outter loop: mini-batches while true do + -- sync + if parallel.yield() == 'break' then break end + -- receive new mini-batch inputs = parallel.parent:receive() - if type(inputs) == 'string' and inputs == 'break' then break end targets = parallel.parent:receive() options = parallel.parent:receive() + -- inner loop: evaluations while true do - -- receive new set of parameters - newParameters = parallel.parent:receive() + -- sync + if parallel.yield() == 'break' then break end - if type(newParameters) == 'string' and newParameters == 'break' then break end - parameters:copy(newParameters) + -- receive new set of parameters + parameters:copy(parallel.parent:receive()) -- reset gradients gradParameters:zero() @@ -313,6 +320,7 @@ function Batch:setup_mapreduce () parallel.children:exec(worker_code) -- (3) and send them the module + criterion architecture + parallel.children:join() parallel.children:send(self.module) parallel.children:send(self.criterion) end |