
github.com/clementfarabet/lua---nnx.git
author     Clement Farabet <clement.farabet@gmail.com>   2011-08-28 07:36:12 +0400
committer  Clement Farabet <clement.farabet@gmail.com>   2011-08-28 07:36:12 +0400
commit     e0f477a75504b5ce09c511bc6f6e9e035e89d435 (patch)
tree       f95b9b4917c444d22cd615cd02bb1a851a54c683
parent     5f632bb32240f2d65ea39ee90d12977ff29e5113 (diff)

First end-to-end working version of map-reduce for l-BFGS. (branch: parallel)

 LBFGSOptimization.lua | 153
 1 file changed, 97 insertions(+), 56 deletions(-)
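
This patch turns the parallel L-BFGS path into a persistent map-reduce: each worker receives the module and criterion once at startup (setup_mapreduce), receives its share of the mini-batch once per batch, and then, for every L-BFGS evaluation, only the current parameters are broadcast and the partial gradients and partial outputs are collected back for the reduce step. A hypothetical usage sketch follows; only module, criterion, and parallelize are taken from this diff, while the remaining call pattern (table constructor, optimizer:forward(inputs, targets)) is an assumption about the surrounding nnx API.

-- hypothetical usage sketch (only module/criterion/parallelize appear in this patch)
require 'nnx'

-- a small model + criterion to optimize
local mlp = nn.Sequential()
mlp:add(nn.Linear(100, 10))
mlp:add(nn.LogSoftMax())
local criterion = nn.ClassNLLCriterion()

-- a toy mini-batch: plain Lua tables of samples and class targets
local inputs, targets = {}, {}
for i = 1,8 do
   inputs[i]  = torch.randn(100)
   targets[i] = (i % 10) + 1
end

-- parallelize > 1 triggers setup_mapreduce(): P persistent workers are forked
-- and the module + criterion are shipped to them once
local optimizer = nn.LBFGSOptimization{module = mlp,
                                        criterion = criterion,
                                        parallelize = 4}

-- each call splits the batch across the workers and runs forward_mapreduce
local avgError = optimizer:forward(inputs, targets)
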
diff --git a/LBFGSOptimization.lua b/LBFGSOptimization.lua
index e85bc97..d8b42e6 100644
--- a/LBFGSOptimization.lua
+++ b/LBFGSOptimization.lua
@@ -17,11 +17,7 @@ function LBFGS:__init(...)
self.gradParametersT = nnx.getGradParameters(self.module)
lbfgs.verbose = self.verbose
if self.parallelize > 1 then
- if not xrequire 'parallel' then
- xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
- 'nn.LBFGSOptimization')
- end
- parallel.setSharedSize(4*1024*1024)
+ self:setup_mapreduce()
end
end
@@ -110,6 +106,12 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
end
end
+ -- (0c) send mini-batch to all workers
+ for t = 1,P do
+ parallel.children[t]:send(inputss[t])
+ parallel.children[t]:send(targetss[t])
+ end
+
-- (1) construct a closure that computes f(inputs) + df/dW
-- after each call to that function:
-- + self.parameters contains the current X vector
@@ -117,65 +119,27 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
-- + self.output contains the estimated (average) F(X)
lbfgs.evaluate
= function()
- -- reset parallel state
- parallel.reset()
- -- dispatch N parallel jobs
- for t = 1,P do
- parallel.run(lbfgs.evaluate_map)
- end
+ lbfgs.evaluate_map()
+ return lbfgs.evaluate_reduce()
+ end
+
+ -- (1a) the map part of the evaluation: compute partial gradients
+ -- in separate threads
+ lbfgs.evaluate_map
+ = function()
-- load parameters into current model
self:unflatten(self.parametersT, self.gradParametersT)
- -- transmit data to all jobs
+ -- transmit new parameters to workers
for t = 1,P do
- -- transmit all necessary data
- parallel.children[t]:send(self.module)
- parallel.children[t]:send(self.criterion)
- parallel.children[t]:send(inputss[t])
- parallel.children[t]:send(targetss[t])
+ parallel.children[t]:send(self.parametersT)
end
- -- then wait for all workers to return their trained modules
+ -- then wait for all workers to return their partial gradParameters + outputs
for t = 1,P do
- gradParameters = parallel.children[t]:receive()
+ gradParameters[t] = parallel.children[t]:receive()
outputs[t] = parallel.children[t]:receive()
end
- -- and join
- parallel.children:join()
- -- reduce
- return lbfgs.evaluate_reduce()
end
- -- (1a) the map part of the evaluation: compute partial gradients
- -- in separate threads
- lbfgs.evaluate_map = [[
- -- require packages
- require 'nnx'
-
- -- retrieve module + criterion + mini-batch
- module = parallel.parent:receive()
- criterion = parallel.parent:receive()
- inputs = parallel.parent:receive()
- targets = parallel.parent:receive()
-
- -- reset gradients
- module:zeroGradParameters()
- -- f is the average of all criterions
- local output = 0
- -- evaluate gradients on inputs for this thread
- for i = 1,#inputs do
- -- estimate f
- local output = module:forward(inputs[i])
- local err = criterion:forward(output, targets[i])
- output = output + err
- -- estimate df/dW
- local df_do = criterion:backward(output, targets[i])
- module:backward(inputs[i], df_do)
- end
-
- -- return partial gradParameters + output
- parallel.parent:send( nnx.getGradParameters(module) )
- parallel.parent:send(output)
- ]]
-
-- (1b) the reduce part of the evaluation: accumulate all
-- partial estimates of the gradients
lbfgs.evaluate_reduce
@@ -185,7 +149,7 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
self.gradParametersAcc:resizeAs(self.gradParameters):zero()
-- update state from computed parameters
for t = 1,P do
- self:flatten(self.parametersT, gradParameters)
+ self:flatten(self.parametersT, gradParameters[t])
self.gradParametersAcc:add(self.gradParameters)
end
self.gradParameters:copy(self.gradParametersAcc)
@@ -212,6 +176,83 @@ function LBFGS:forward_mapreduce(inputs, targets, options)
-- (4) last: read parameters back into the main (not parallel) model
self:unflatten(self.parametersT, self.gradParametersT)
+ -- (6) reset workers so they're ready for next mini-batch
+ for t = 1,P do
+ parallel.children[t]:send('break')
+ end
+
-- (5) return current output after optimization
return self.output
end
+
+function LBFGS:setup_mapreduce ()
+ -- (0) startup parallel package
+ if not xrequire 'parallel' then
+ xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
+ 'nn.LBFGSOptimization')
+ end
+ parallel.setSharedSize(4*1024*1024)
+ local P = self.parallelize
+
+ -- (1) define code for workers
+ local worker_code = [[
+ -- require packages
+ require 'nnx'
+
+ -- retrieve module + criterion at startup
+ module = parallel.parent:receive()
+ criterion = parallel.parent:receive()
+
+ -- get pointer to parameter and gradParameter vectors
+ parameters = nnx.getParameters(module)
+ gradParameters = nnx.getGradParameters(module)
+
+ -- outer loop: mini-batches
+ while true do
+ -- receive new mini-batch
+ inputs = parallel.parent:receive()
+ if type(inputs) == 'string' and inputs == 'break' then break end
+ targets = parallel.parent:receive()
+
+ -- inner loop: evaluations
+ while true do
+ -- receive new set of parameters
+ newParameters = parallel.parent:receive()
+ if type(newParameters) == 'string' and newParameters == 'break' then break end
+ for i = 1,#newParameters do
+ parameters[i]:copy(newParameters[i])
+ end
+
+ -- reset gradients
+ module:zeroGradParameters()
+ -- f is the average of all criterions
+ local f_x = 0
+ -- evaluate gradients on inputs for this thread
+ for i = 1,#inputs do
+ -- estimate f
+ local output = module:forward(inputs[i])
+ local err = criterion:forward(output, targets[i])
+ f_x = f_x + err
+ -- estimate df/dW
+ local df_do = criterion:backward(output, targets[i])
+ module:backward(inputs[i], df_do)
+ end
+
+ -- now send back gradParameters + partial output
+ parallel.parent:send(gradParameters)
+ parallel.parent:send(f_x)
+ end
+ end
+ ]]
+
+ -- (2) startup all workers
+ for t = 1,P do
+ parallel.run(worker_code)
+ end
+
+ -- (3) and send them the module + criterion architecture
+ for t = 1,P do
+ parallel.children[t]:send(self.module)
+ parallel.children[t]:send(self.criterion)
+ end
+end
\ No newline at end of file
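
The new worker code is a small two-level protocol: the outer loop blocks on a mini-batch (or the 'break' sentinel sent at step (6)), the inner loop blocks on a parameter vector (or 'break'), and each inner iteration replies with partial gradParameters and a partial output. The sketch below reduces this to the bare message flow, using only calls that appear in the patch (parallel.run, parallel.children[t]:send/receive, parallel.parent:send/receive); the payloads are toy numbers standing in for the real modules, tensors, and tables, and, as in the patch's worker code, the parallel table is assumed to be available inside the forked worker without an explicit require.

-- minimal sketch of the parent/worker message flow used above (toy payloads)
require 'parallel'

local worker = [[
   while true do
      -- outer loop: one message per mini-batch (or 'break' to shut down)
      batch = parallel.parent:receive()
      if type(batch) == 'string' and batch == 'break' then break end
      while true do
         -- inner loop: one message per L-BFGS evaluation (or 'break' to end the batch)
         params = parallel.parent:receive()
         if type(params) == 'string' and params == 'break' then break end
         parallel.parent:send(params + batch)   -- stands in for partial gradParameters
         parallel.parent:send(1)                -- stands in for the partial output f(x)
      end
   end
]]

local P = 2
for t = 1,P do parallel.run(worker) end               -- (2) start P persistent workers

for t = 1,P do parallel.children[t]:send(10*t) end    -- (0c) ship each worker its mini-batch
for t = 1,P do parallel.children[t]:send(0.5) end     -- (1a) map: broadcast current parameters
for t = 1,P do                                        -- (1b) reduce: collect partial results
   local grad = parallel.children[t]:receive()
   local out  = parallel.children[t]:receive()
   print('worker ' .. t, grad, out)
end
for t = 1,P do parallel.children[t]:send('break') end -- (6) end the inner loop (ready for next batch)
for t = 1,P do parallel.children[t]:send('break') end -- end the outer loop, shutting the workers down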