| author | Clement Farabet <clement.farabet@gmail.com> | 2011-08-28 07:36:12 +0400 |
|---|---|---|
| committer | Clement Farabet <clement.farabet@gmail.com> | 2011-08-28 07:36:12 +0400 |
| commit | e0f477a75504b5ce09c511bc6f6e9e035e89d435 (patch) | |
| tree | f95b9b4917c444d22cd615cd02bb1a851a54c683 | |
| parent | 5f632bb32240f2d65ea39ee90d12977ff29e5113 (diff) | |
First end-to-end working version of map-reduce for L-BFGS. (branch: parallel)
| -rw-r--r-- | LBFGSOptimization.lua | 153 |

1 file changed, 97 insertions(+), 56 deletions(-)
```diff
diff --git a/LBFGSOptimization.lua b/LBFGSOptimization.lua
index e85bc97..d8b42e6 100644
--- a/LBFGSOptimization.lua
+++ b/LBFGSOptimization.lua
@@ -17,11 +17,7 @@ function LBFGS:__init(...)
    self.gradParametersT = nnx.getGradParameters(self.module)
    lbfgs.verbose = self.verbose
    if self.parallelize > 1 then
-      if not xrequire 'parallel' then
-         xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
-                'nn.LBFGSOptimization')
-      end
-      parallel.setSharedSize(4*1024*1024)
+      self:setup_mapreduce()
    end
 end
 
@@ -110,6 +106,12 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
       end
    end
 
+   -- (0c) send mini-batch to all workers
+   for t = 1,P do
+      parallel.children[t]:send(inputss[t])
+      parallel.children[t]:send(targetss[t])
+   end
+
    -- (1) construct a closure that compute f(inputs) + df/dW
    -- after each call to that function:
    --  + self.parameters contains the current X vector
@@ -117,65 +119,27 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
    --  + self.output contains the estimated (average) F(X)
    lbfgs.evaluate
       = function()
-           -- reset parallel state
-           parallel.reset()
-           -- dispatch N parallel jobs
-           for t = 1,P do
-              parallel.run(lbfgs.evaluate_map)
-           end
+           lbfgs.evaluate_map()
+           return lbfgs.evaluate_reduce()
+        end
+
+   -- (1a) the map part of the evaluation: compute partial gradients
+   -- in separate threads
+   lbfgs.evaluate_map
+      = function()
            -- load parameters into current model
            self:unflatten(self.parametersT, self.gradParametersT)
-           -- transmit data to all jobs
+           -- transmit new parameters to workers
            for t = 1,P do
-              -- transmit all necessary data
-              parallel.children[t]:send(self.module)
-              parallel.children[t]:send(self.criterion)
-              parallel.children[t]:send(inputss[t])
-              parallel.children[t]:send(targetss[t])
+              parallel.children[t]:send(self.parametersT)
            end
-           -- then wait for all workers to return their trained modules
+           -- then wait for all workers to return their partial gradParameters + outputs
           for t = 1,P do
-              gradParameters = parallel.children[t]:receive()
+              gradParameters[t] = parallel.children[t]:receive()
               outputs[t] = parallel.children[t]:receive()
            end
-           -- and join
-           parallel.children:join()
-           -- reduce
-           return lbfgs.evaluate_reduce()
         end
 
-   -- (1a) the map part of the evaluation: compute partial gradients
-   -- in separate threads
-   lbfgs.evaluate_map = [[
-      -- require packages
-      require 'nnx'
-
-      -- retrieve module + criterion + mini-batch
-      module = parallel.parent:receive()
-      criterion = parallel.parent:receive()
-      inputs = parallel.parent:receive()
-      targets = parallel.parent:receive()
-
-      -- reset gradients
-      module:zeroGradParameters()
-      -- f is the average of all criterions
-      local output = 0
-      -- evaluate gradients on inputs for this thread
-      for i = 1,#inputs do
-         -- estimate f
-         local output = module:forward(inputs[i])
-         local err = criterion:forward(output, targets[i])
-         output = output + err
-         -- estimate df/dW
-         local df_do = criterion:backward(output, targets[i])
-         module:backward(inputs[i], df_do)
-      end
-
-      -- return partial gradParameters + output
-      parallel.parent:send( nnx.getGradParameters(module) )
-      parallel.parent:send(output)
-   ]]
-
    -- (1b) the reduce part of the evaluation: accumulate all
    -- partial estimates of the gradients
    lbfgs.evaluate_reduce
@@ -185,7 +149,7 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
            self.gradParametersAcc:resizeAs(self.gradParameters):zero()
            -- update state from computed parameters
            for t = 1,P do
-              self:flatten(self.parametersT, gradParameters)
+              self:flatten(self.parametersT, gradParameters[t])
               self.gradParametersAcc:add(self.gradParameters)
            end
            self.gradParameters:copy(self.gradParametersAcc)
@@ -212,6 +176,83 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
    -- (4) last: read parameters back into the main (not parrallel) model
    self:unflatten(self.parametersT, self.gradParametersT)
 
+   -- (6) reset workers so they're ready for next mini-batch
+   for t = 1,P do
+      parallel.children[t]:send('break')
+   end
+
    -- (5) return current output after optimization
    return self.output
 end
+
+function LBFGS:setup_mapreduce ()
+   -- (0) startup parallel package
+   if not xrequire 'parallel' then
+      xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
+             'nn.LBFGSOptimization')
+   end
+   parallel.setSharedSize(4*1024*1024)
+   local P = self.parallelize
+
+   -- (1) define code for workers
+   local worker_code = [[
+         -- require packages
+         require 'nnx'
+
+         -- retrieve module + criterion at startup
+         module = parallel.parent:receive()
+         criterion = parallel.parent:receive()
+
+         -- get pointer to parameter and gradParameter vectors
+         parameters = nnx.getParameters(module)
+         gradParameters = nnx.getGradParameters(module)
+
+         -- outter loop: mini-batches
+         while true do
+            -- receive new mini-batch
+            inputs = parallel.parent:receive()
+            if type(inputs) == 'string' and inputs == 'break' then break end
+            targets = parallel.parent:receive()
+
+            -- inner loop: evaluations
+            while true do
+               -- receive new set of parameters
+               newParameters = parallel.parent:receive()
+               if type(newParameters) == 'string' and newParameters == 'break' then break end
+               for i = 1,#newParameters do
+                  parameters[i]:copy(newParameters[i])
+               end
+
+               -- reset gradients
+               module:zeroGradParameters()
+               -- f is the average of all criterions
+               local f_x = 0
+               -- evaluate gradients on inputs for this thread
+               for i = 1,#inputs do
+                  -- estimate f
+                  local output = module:forward(inputs[i])
+                  local err = criterion:forward(output, targets[i])
+                  f_x = f_x + err
+                  -- estimate df/dW
+                  local df_do = criterion:backward(output, targets[i])
+                  module:backward(inputs[i], df_do)
+               end
+
+               -- now send back gradParameters + partial output
+               parallel.parent:send(gradParameters)
+               parallel.parent:send(f_x)
+            end
+         end
+   ]]
+
+   -- (2) startup all workers
+   for t = 1,P do
+      parallel.run(worker_code)
+   end
+
+   -- (3) and send them the module + criterion architecture
+   for t = 1,P do
+      parallel.children[t]:send(self.module)
+      parallel.children[t]:send(self.criterion)
+   end
+end
\ No newline at end of file
```
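
The heart of this commit is the two-level message protocol between the parent and its workers: the module + criterion are shipped once at startup, each mini-batch is shipped once per call to `forward_mapreduce`, and a fresh parameter vector is shipped for every L-BFGS evaluation, with the string `'break'` popping one loop level. Below is a minimal sketch of that handshake with all tensor and model details stripped out; the `parallel` calls (`parallel.run`, `:send`, `:receive`) are the ones used in the diff, but the payloads are placeholders. Note that the commit itself only ever sends one `'break'` per mini-batch (ending the inner loop); the second one shown at the end here is what it would take to terminate a worker outright.

```lua
require 'parallel'

-- worker: mirrors the nested receive loops installed by setup_mapreduce()
local worker_code = [[
   while true do
      -- outer loop: one iteration per mini-batch
      local inputs = parallel.parent:receive()
      if type(inputs) == 'string' and inputs == 'break' then break end
      local targets = parallel.parent:receive()
      while true do
         -- inner loop: one iteration per L-BFGS evaluation
         local params = parallel.parent:receive()
         if type(params) == 'string' and params == 'break' then break end
         -- a real worker copies `params` into its model and runs
         -- forward/backward over its shard; we send back placeholders
         parallel.parent:send('partial gradients over ' .. #inputs .. ' samples')
         parallel.parent:send(0) -- partial f(X)
      end
   end
]]

parallel.run(worker_code)
parallel.children[1]:send({1, 2, 3})   -- this worker's input shard
parallel.children[1]:send({0, 1, 0})   -- matching target shard
parallel.children[1]:send({})          -- one parameter update = one evaluation
local grads = parallel.children[1]:receive()
local f_x   = parallel.children[1]:receive()
print('worker 1 returned:', grads, f_x)
parallel.children[1]:send('break')     -- end of mini-batch (as in the commit)
parallel.children[1]:send('break')     -- would shut the worker down for good
```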
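The new step (0c) assumes the mini-batch has already been split into per-worker shards `inputss[t]` / `targetss[t]` by steps (0a)/(0b), which lie outside this diff. The sketch below is a hypothetical reconstruction of that partitioning; only the `inputss`/`targetss` names come from the diff, and the contiguous-chunk rule is an assumption, not the repository's actual code.

```lua
-- Hypothetical sketch of the (0a)/(0b) partitioning this diff relies on:
-- deal a mini-batch out to P workers in contiguous shards.
local P = 4
local inputs  = {11, 12, 13, 14, 15, 16, 17}  -- stand-ins for sample tensors
local targets = { 1,  2,  1,  2,  1,  2,  1}

local inputss, targetss = {}, {}
for t = 1,P do inputss[t], targetss[t] = {}, {} end

local shard = math.ceil(#inputs / P)
for i = 1,#inputs do
   local t = math.min(math.floor((i-1)/shard) + 1, P)
   table.insert(inputss[t], inputs[i])
   table.insert(targetss[t], targets[i])
end
-- inputss[1] = {11,12}, inputss[2] = {13,14}, ..., with matching targetss shards
```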