github.com/clementfarabet/lua---nnx.git
author     Marco Scoffier <github@metm.org>   2011-09-27 02:04:24 +0400
committer  Marco Scoffier <github@metm.org>   2011-09-27 02:04:24 +0400
commit     135ac77eb914cbf218b5a67360bcd94fa3d438da (patch)
tree       851139581c3459ecf22289de9fffd5ba6a2ea003
parent     8e6cd11f8efdcba57955031f6e76ad57b9bf5e6f (diff)

new hooks for map and reduce in BatchOptimization
-rw-r--r--   BatchOptimization.lua   199
1 file changed, 113 insertions(+), 86 deletions(-)
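
The patch below introduces two override points, map_hook and reduce_hook, in Batch:forward_mapreduce: when defined on the optimizer they replace the default parameter broadcast / partial-result collection and the default gradient averaging (GenSGD is noted in the patch as overriding the worker setup with two such closures). A minimal sketch of attaching custom hooks follows; the constructor call, the _gradPartial/_outPartial fields, and the batchSize field are illustrative assumptions, not part of this commit.

-- Hedged sketch: wiring custom map/reduce closures onto an
-- nn.BatchOptimization instance. The field names map_hook and reduce_hook
-- come from the patch; everything else here is illustrative.
require 'nn'
require 'nnx'

local module    = nn.Sequential():add(nn.Linear(10,2)):add(nn.LogSoftMax())
local criterion = nn.ClassNLLCriterion()

local optimizer = nn.BatchOptimization{     -- assumed constructor form
   module      = module,
   criterion   = criterion,
   parallelize = 4,
}

-- map: push the current parameters to all workers and collect partial
-- results. The default path keeps the partials in closure upvalues; a
-- custom hook has to keep its own copies, here stashed on self
-- (an assumption, not the repository's convention).
optimizer.map_hook = function(self)
   self.children:join()
   self.children:send(self.parameters)
   self._gradPartial = self.children:receive()
   self._outPartial  = self.children:receive()
   collectgarbage()
end

-- reduce: fold the partial gradients and outputs back into
-- self.gradParameters and self.output, mirroring the default averaging.
optimizer.reduce_hook = function(self)
   self.gradParameters:zero()
   self.output = 0
   for t = 1,#self._gradPartial do
      self.gradParameters:add(self._gradPartial[t])
      self.output = self.output + self._outPartial[t]
   end
   -- the default path divides by the mini-batch size (#inputs);
   -- self.batchSize is an assumed field standing in for that count
   self.gradParameters:div(self.batchSize or 1)
   self.output = self.output / (self.batchSize or 1)
end
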
diff --git a/BatchOptimization.lua b/BatchOptimization.lua
index 4f087b3..bb4c70c 100644
--- a/BatchOptimization.lua
+++ b/BatchOptimization.lua
@@ -3,7 +3,7 @@ local Batch,parent = torch.class('nn.BatchOptimization', 'nn.Optimization')
-- this is a generic class for any batch optimization modeled after
-- the LBFGS optimization. It simply provides a batch.evaluate() method
-- which creates a self.parameters and self.gradParameters from your
--- self.model
+-- self.module
function Batch:__init(...)
parent.__init(self)
@@ -22,9 +22,10 @@ function Batch:__init(...)
self.parameters = nnx.flattenParameters(nnx.getParameters(self.module))
self.gradParameters = nnx.flattenParameters(nnx.getGradParameters(self.module))
- self.evalCounter = 0
- self.batchCounter = 0
+ self.evalCounter = 0
+ self.batchCounter = 0
self.sampleCounter = 0
+
if self.parallelize > 1 then
self:setup_mapreduce()
end
@@ -121,34 +122,45 @@ function Batch:forward_mapreduce(inputs, targets, options)
local outputsPartial = {}
local gradParametersPartial = {}
- -- (0b) divide input/target batch into N batches, based on speed of each worker
- local inputss = {}
- local targetss = {}
- local optionss = {}
- local speed = 0
- for t = 1,P do
- speed = speed + self.children[t].speed
- end
- local n = 1
- for t = 1,P do
- inputss[t] = {}
- targetss[t] = {}
- optionss[t] = {}
- for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
- table.insert(inputss[t], inputs[n])
- table.insert(targetss[t], targets[n])
- if options then table.insert(optionss[t], options[n]) end
- n = n + 1
- if n > #inputs then break end
+ if self.copyBatch then
+ -- (0) send same mini-batch to all workers
+ for t = 1,P do
+ self.children[t]:join()
+ self.children[t]:send(inputs)
+ self.children[t]:send(targets)
+ self.children[t]:send(options)
+ end
+ else
+ -- (0b) divide input/target batch into N batches, based on speed
+ -- of each worker
+ local inputss = {}
+ local targetss = {}
+ local optionss = {}
+ local speed = 0
+ for t = 1,P do
+ speed = speed + self.children[t].speed
+ end
+ local n = 1
+ for t = 1,P do
+ inputss[t] = {}
+ targetss[t] = {}
+ optionss[t] = {}
+ for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
+ table.insert(inputss[t], inputs[n])
+ table.insert(targetss[t], targets[n])
+ if options then table.insert(optionss[t], options[n]) end
+ n = n + 1
+ if n > #inputs then break end
+ end
+ end
+
+ -- (0c) send parts of mini-batch to each worker
+ for t = 1,P do
+ self.children[t]:join()
+ self.children[t]:send(inputss[t])
+ self.children[t]:send(targetss[t])
+ self.children[t]:send(optionss[t])
end
- end
-
- -- (0c) send mini-batch to all workers
- for t = 1,P do
- self.children[t]:join()
- self.children[t]:send(inputss[t])
- self.children[t]:send(targetss[t])
- self.children[t]:send(optionss[t])
end
-- (1) construct a closure that compute f(inputs) + df/dW
@@ -179,32 +191,40 @@ function Batch:forward_mapreduce(inputs, targets, options)
-- in separate threads
self.evaluate_map
= function()
- -- transmit new parameters to all workers
- self.children:join()
- self.children:send(self.parameters)
- -- then wait for all workers to return their partial gradParameters + outputs
- gradParametersPartial = self.children:receive()
- outputsPartial = self.children:receive()
- -- force cleanup
- collectgarbage()
- end
-
+ if self.map_hook then
+ self:map_hook()
+ else
+ -- transmit new parameters to all workers
+ self.children:join()
+ self.children:send(self.parameters)
+ -- then wait for all workers to return their partial gradParameters + outputs
+ gradParametersPartial = self.children:receive()
+ outputsPartial = self.children:receive()
+ -- force cleanup
+ collectgarbage()
+ end
+ end
-- (1b) the reduce part of the evaluation: accumulate all
-- partial estimates of the gradients
self.evaluate_reduce
= function()
- -- accumulate partial gradients, and average
- self.gradParameters:zero()
- for t = 1,P do
- self.gradParameters:add(gradParametersPartial[t])
- end
- self.gradParameters:div(#inputs)
- -- return average f(X)
- self.output = 0
- for t = 1,P do
- self.output = self.output + outputsPartial[t]
- end
- self.output = self.output/#inputs
+ if self.reduce_hook then
+ self:reduce_hook()
+ else
+ -- standard reduce is to sum the gradients
+ -- accumulate partial gradients, and average
+ self.gradParameters:zero()
+ for t = 1,P do
+ self.gradParameters:add(gradParametersPartial[t])
+ end
+ self.gradParameters:div(#inputs)
+ -- return average f(X)
+ self.output = 0
+ for t = 1,P do
+ self.output = self.output + outputsPartial[t]
+ end
+ self.output = self.output/#inputs
+ end
end
if self.optimize then
@@ -223,37 +243,40 @@ function Batch:forward_mapreduce(inputs, targets, options)
return self.output
end
+-- [MS] this default worker code is too detailed needs to be a
+-- skeleton which is easier to adapt... for now I am overriding this
+-- whole function with the 2 closures in GenSGD
+
function Batch:setup_mapreduce ()
-- (0) startup parallel package
if not xrequire 'parallel' then
xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
'nn.BatchOptimization')
end
-
- -- (1) define code for workers
- local worker_code =
+
+ local worker_code =
function()
-- require packages
require 'nnx'
-
+
-- retrieve optional code to setup worker
precode = parallel.parent:receive()
if type(precode) == 'function' then precode() end
-
+
-- retrieve module + criterion at startup
parallel.yield()
module = parallel.parent:receive()
criterion = parallel.parent:receive()
-
+
-- create fake optimizer, for hooks
optimizer = {module=module, criterion=criterion}
-
+
-- retrieve optional prehook/posthook
prehook = parallel.parent:receive()
posthook = parallel.parent:receive()
if type(prehook) ~= 'function' then prehook = nil end
if type(posthook) ~= 'function' then posthook = nil end
-
+
-- get pointer to parameter and gradParameter vectors
-- (this assumes that parameters+gradParameters are already flat parameters:
-- it should be the case, as the parent process flattens them at __init)
@@ -271,25 +294,25 @@ function Batch:setup_mapreduce ()
check(tableGradParameters)
parameters = torch.Tensor():set(tableParameters[1]:storage())
gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
-
- -- outter loop: mini-batches
+
+ -- outer loop: mini-batches
while true do
-- sync
if parallel.yield() == 'break' then break end
-
+
-- receive new mini-batch
- inputs = parallel.parent:receive()
+ inputs = parallel.parent:receive()
targets = parallel.parent:receive()
options = parallel.parent:receive()
-
+
-- inner loop: evaluations
while true do
-- sync
if parallel.yield() == 'break' then break end
-
+
-- receive new set of parameters
parameters:copy(parallel.parent:receive())
-
+
-- reset gradients
gradParameters:zero()
-- f is the average of all criterions
@@ -310,7 +333,11 @@ function Batch:setup_mapreduce ()
module:accGradParameters(inputs[i], df_do)
-- user hook
if posthook then
- posthook(optimizer, {inputs[i], targets[i], options[i]})
+ if #inputs == #options then
+ posthook(optimizer, {inputs[i], targets[i], options[i]})
+ else
+ posthook(module,options)
+ end
end
end
-- now send back gradParameters + partial output
@@ -321,26 +348,26 @@ function Batch:setup_mapreduce ()
end
end
end
-
-- (2) dispatch workers
local setup = function()
- -- (1) optional calibration
- if parallel.remotes then
- parallel.calibrate()
- end
-
- -- (2) startup all workers
- self.children = parallel.sfork(self.parallelize)
- self.children:exec(worker_code)
-
- -- (3) send them optional config code
- self.children:send(self.precode or '')
-
- -- (4) and send them the module + criterion architecture
- self.children:join()
- self.children:send(self.module)
- self.children:send(self.criterion)
- end
+ -- (1) optional calibration
+ if parallel.remotes then
+ parallel.calibrate()
+ end
+
+ -- (2) startup all workers
+ self.children = parallel.sfork(self.parallelize)
+ self.children:exec(worker_code)
+
+ -- (3) send them optional config code
+ self.children:send(self.precode or '')
+
+ -- (4) and send them the module + criterion architecture
+ self.children:join()
+ self.children:send(self.module)
+ self.children:send(self.criterion)
+ end
+
local ok,err = pcall(setup)
if not ok then parallel.close() error(err) end
end
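
Step (0b) above splits the mini-batch across workers in proportion to each worker's measured speed. The same partitioning, restated as a standalone sketch for clarity (splitBySpeed is a hypothetical helper name; the repository keeps this logic inline in forward_mapreduce):

-- Hedged sketch: speed-proportional partitioning of a mini-batch,
-- as done in step (0b) of Batch:forward_mapreduce.
local function splitBySpeed(inputs, targets, speeds)
   local totalSpeed = 0
   for t = 1,#speeds do
      totalSpeed = totalSpeed + speeds[t]
   end
   local inputss, targetss = {}, {}
   local n = 1
   for t = 1,#speeds do
      inputss[t], targetss[t] = {}, {}
      -- each worker gets a share of samples proportional to its speed
      for i = 1,math.ceil(speeds[t] * #inputs / totalSpeed) do
         if n > #inputs then break end   -- no samples left
         table.insert(inputss[t], inputs[n])
         table.insert(targetss[t], targets[n])
         n = n + 1
      end
   end
   return inputss, targetss
end

-- example: 3 workers, the second twice as fast as the others
local ins  = {1,2,3,4,5,6,7,8}
local tgts = {1,1,2,2,1,1,2,2}
local byWorker = splitBySpeed(ins, tgts, {1, 2, 1})
for t,part in ipairs(byWorker) do
   print(string.format('worker %d gets %d samples', t, #part))
end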