| author | Marco Scoffier <github@metm.org> | 2011-09-27 02:04:24 +0400 |
|---|---|---|
| committer | Marco Scoffier <github@metm.org> | 2011-09-27 02:04:24 +0400 |
| commit | 135ac77eb914cbf218b5a67360bcd94fa3d438da (patch) | |
| tree | 851139581c3459ecf22289de9fffd5ba6a2ea003 | |
| parent | 8e6cd11f8efdcba57955031f6e76ad57b9bf5e6f (diff) | |
new hooks for map and reduce in BatchOptimization
| -rw-r--r-- | BatchOptimization.lua | 199 |

1 file changed, 113 insertions, 86 deletions
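The new `map_hook` / `reduce_hook` fields let a subclass replace the default map step of `forward_mapreduce` (broadcast parameters, collect partial gradients and outputs) and the default reduce step (average them) without rewriting the whole method. Below is a minimal sketch of wiring them up, assuming a `module` and `criterion` you have already built and an nnx `BatchOptimization` subclass such as `nn.SGDOptimization`; the constructor arguments are illustrative, not from this commit. Note that the default map and reduce communicate through locals (`gradParametersPartial`, `outputsPartial`) that a hook cannot reach, which is presumably why GenSGD, per the [MS] comment in the diff, overrides both:

```lua
require 'nnx'

-- hypothetical wiring; only the field names map_hook/reduce_hook and
-- parallelize come from the diff below
local optimizer = nn.SGDOptimization{
   module = module,         -- assumed: your nn module
   criterion = criterion,   -- assumed: your criterion
   parallelize = 4
}

-- invoked as self:map_hook(), so the optimizer arrives as `self`
function optimizer:map_hook()
   -- broadcast current parameters; custom worker code decides what
   -- (and how) to send back
   self.children:join()
   self.children:send(self.parameters)
end

-- invoked as self:reduce_hook(); must leave the combined gradient in
-- self.gradParameters and the combined loss in self.output, since the
-- optimizer step that follows reads both
function optimizer:reduce_hook()
   -- e.g. keep the best worker's result instead of averaging
end
```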
diff --git a/BatchOptimization.lua b/BatchOptimization.lua
index 4f087b3..bb4c70c 100644
--- a/BatchOptimization.lua
+++ b/BatchOptimization.lua
@@ -3,7 +3,7 @@ local Batch,parent = torch.class('nn.BatchOptimization', 'nn.Optimization')
 -- this is a generic class for any batch optimization modeled after
 -- the LBFGS optimization. It simply provides a batch.evaluate() method
 -- which creates a self.parameters and self.gradParameters from your
--- self.model
+-- self.module
 
 function Batch:__init(...)
    parent.__init(self)
@@ -22,9 +22,10 @@ function Batch:__init(...)
    self.parameters = nnx.flattenParameters(nnx.getParameters(self.module))
    self.gradParameters = nnx.flattenParameters(nnx.getGradParameters(self.module))
-   self.evalCounter = 0
-   self.batchCounter = 0
+   self.evalCounter   = 0
+   self.batchCounter  = 0
    self.sampleCounter = 0
+
    if self.parallelize > 1 then
       self:setup_mapreduce()
    end
@@ -121,34 +122,45 @@ function Batch:forward_mapreduce(inputs, targets, options)
    local outputsPartial = {}
    local gradParametersPartial = {}
 
-   -- (0b) divide input/target batch into N batches, based on speed of each worker
-   local inputss = {}
-   local targetss = {}
-   local optionss = {}
-   local speed = 0
-   for t = 1,P do
-      speed = speed + self.children[t].speed
-   end
-   local n = 1
-   for t = 1,P do
-      inputss[t] = {}
-      targetss[t] = {}
-      optionss[t] = {}
-      for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
-         table.insert(inputss[t], inputs[n])
-         table.insert(targetss[t], targets[n])
-         if options then table.insert(optionss[t], options[n]) end
-         n = n + 1
-         if n > #inputs then break end
+   if self.copyBatch then
+      -- (0) send same mini-batch to all workers
+      for t = 1,P do
+         self.children[t]:join()
+         self.children[t]:send(inputs)
+         self.children[t]:send(targets)
+         self.children[t]:send(options)
+      end
+   else
+      -- (0b) divide input/target batch into N batches, based on speed
+      -- of each worker
+      local inputss = {}
+      local targetss = {}
+      local optionss = {}
+      local speed = 0
+      for t = 1,P do
+         speed = speed + self.children[t].speed
+      end
+      local n = 1
+      for t = 1,P do
+         inputss[t] = {}
+         targetss[t] = {}
+         optionss[t] = {}
+         for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
+            table.insert(inputss[t], inputs[n])
+            table.insert(targetss[t], targets[n])
+            if options then table.insert(optionss[t], options[n]) end
+            n = n + 1
+            if n > #inputs then break end
+         end
+      end
+
+      -- (0c) send parts of mini-batch to each worker
+      for t = 1,P do
+         self.children[t]:join()
+         self.children[t]:send(inputss[t])
+         self.children[t]:send(targetss[t])
+         self.children[t]:send(optionss[t])
       end
-   end
-
-   -- (0c) send mini-batch to all workers
-   for t = 1,P do
-      self.children[t]:join()
-      self.children[t]:send(inputss[t])
-      self.children[t]:send(targetss[t])
-      self.children[t]:send(optionss[t])
    end
 
    -- (1) construct a closure that compute f(inputs) + df/dW
@@ -179,32 +191,40 @@ function Batch:forward_mapreduce(inputs, targets, options)
    -- in separate threads
    self.evaluate_map = function()
-      -- transmit new parameters to all workers
-      self.children:join()
-      self.children:send(self.parameters)
-      -- then wait for all workers to return their partial gradParameters + outputs
-      gradParametersPartial = self.children:receive()
-      outputsPartial = self.children:receive()
-      -- force cleanup
-      collectgarbage()
-   end
-
+      if self.map_hook then
+         self:map_hook()
+      else
+         -- transmit new parameters to all workers
+         self.children:join()
+         self.children:send(self.parameters)
+         -- then wait for all workers to return their partial
+         -- gradParameters + outputs
+         gradParametersPartial = self.children:receive()
+         outputsPartial = self.children:receive()
+         -- force cleanup
+         collectgarbage()
+      end
+   end
   -- (1b) the reduce part of the evaluation: accumulate all
   -- partial estimates of the gradients
   self.evaluate_reduce = function()
-      -- accumulate partial gradients, and average
-      self.gradParameters:zero()
-      for t = 1,P do
-         self.gradParameters:add(gradParametersPartial[t])
-      end
-      self.gradParameters:div(#inputs)
-      -- return average f(X)
-      self.output = 0
-      for t = 1,P do
-         self.output = self.output + outputsPartial[t]
-      end
-      self.output = self.output/#inputs
+      if self.reduce_hook then
+         self:reduce_hook()
+      else
+         -- standard reduce is to sum the gradients
+         -- accumulate partial gradients, and average
+         self.gradParameters:zero()
+         for t = 1,P do
+            self.gradParameters:add(gradParametersPartial[t])
+         end
+         self.gradParameters:div(#inputs)
+         -- return average f(X)
+         self.output = 0
+         for t = 1,P do
+            self.output = self.output + outputsPartial[t]
+         end
+         self.output = self.output/#inputs
+      end
   end
 
   if self.optimize then
@@ -223,37 +243,40 @@ function Batch:forward_mapreduce(inputs, targets, options)
    return self.output
 end
 
+-- [MS] this default worker code is too detailed needs to be a
+-- skeleton which is easier to adapt... for now I am overriding this
+-- whole function with the 2 closures in GenSGD
+
 function Batch:setup_mapreduce ()
    -- (0) startup parallel package
    if not xrequire 'parallel' then
       xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
             'nn.BatchOptimization')
    end
-
-   -- (1) define code for workers
-   local worker_code =
+
+   local worker_code =
      function()
        -- require packages
        require 'nnx'
-
+
        -- retrieve optional code to setup worker
        precode = parallel.parent:receive()
        if type(precode) == 'function' then precode() end
-
+
        -- retrieve module + criterion at startup
        parallel.yield()
        module = parallel.parent:receive()
        criterion = parallel.parent:receive()
-
+
        -- create fake optimizer, for hooks
       optimizer = {module=module, criterion=criterion}
-
+
       -- retrieve optional prehook/posthook
       prehook = parallel.parent:receive()
       posthook = parallel.parent:receive()
       if type(prehook) ~= 'function' then prehook = nil end
       if type(posthook) ~= 'function' then posthook = nil end
-
+
       -- get pointer to parameter and gradParameter vectors
       -- (this assumes that parameters+gradParameters are already flat parameters:
       --  it should be the case, as the parent process flattens them at __init)
@@ -271,25 +294,25 @@ function Batch:setup_mapreduce ()
       check(tableGradParameters)
       parameters = torch.Tensor():set(tableParameters[1]:storage())
       gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
-
-      -- outter loop: mini-batches
+
+      -- outer loop: mini-batches
      while true do
         -- sync
         if parallel.yield() == 'break' then break end
-
+
         -- receive new mini-batch
-        inputs = parallel.parent:receive()
+        inputs  = parallel.parent:receive()
         targets = parallel.parent:receive()
         options = parallel.parent:receive()
-
+
         -- inner loop: evaluations
         while true do
           -- sync
           if parallel.yield() == 'break' then break end
-
+
           -- receive new set of parameters
           parameters:copy(parallel.parent:receive())
-
+
           -- reset gradients
           gradParameters:zero()
           -- f is the average of all criterions
@@ -310,7 +333,11 @@ function Batch:setup_mapreduce ()
             module:accGradParameters(inputs[i], df_do)
             -- user hook
             if posthook then
-               posthook(optimizer, {inputs[i], targets[i], options[i]})
+               if #inputs == #options then
+                  posthook(optimizer, {inputs[i], targets[i], options[i]})
+               else
+                  posthook(module,options)
+               end
            end
         end
         -- now send back gradParameters + partial output
@@ -321,26 +348,26 @@ function Batch:setup_mapreduce ()
        end
      end
   end
-
  -- (2) dispatch workers
  local setup = function()
-    -- (1) optional calibration
-    if parallel.remotes then
-       parallel.calibrate()
-    end
-
-    -- (2) startup all workers
-    self.children = parallel.sfork(self.parallelize)
-    self.children:exec(worker_code)
-
-    -- (3) send them optional config code
-    self.children:send(self.precode or '')
-
-    -- (4) and send them the module + criterion architecture
-    self.children:join()
-    self.children:send(self.module)
-    self.children:send(self.criterion)
- end
+     -- (1) optional calibration
+     if parallel.remotes then
+        parallel.calibrate()
+     end
+
+     -- (2) startup all workers
+     self.children = parallel.sfork(self.parallelize)
+     self.children:exec(worker_code)
+
+     -- (3) send them optional config code
+     self.children:send(self.precode or '')
+
+     -- (4) and send them the module + criterion architecture
+     self.children:join()
+     self.children:send(self.module)
+     self.children:send(self.criterion)
+  end
+
  local ok,err = pcall(setup)
  if not ok then parallel.close() error(err) end
end
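Two more behaviors in this commit are worth a note. With `copyBatch` set, the parent sends the identical mini-batch to every worker instead of splitting it by worker speed, and the worker's user hook now dispatches on the shape of `options`: the per-sample form `posthook(optimizer, {inputs[i], targets[i], options[i]})` survives when `#inputs == #options`, otherwise the worker calls `posthook(module, options)`. A hedged sketch of using both together; the class name `nn.GenSGDOptimization` is inferred from the [MS] comment above, and the constructor arguments and hook body are assumptions, not from the commit:

```lua
-- illustrative only: same batch on every worker, one options table per
-- batch (so #inputs ~= #options and the batch-level posthook form runs)
local optimizer = nn.GenSGDOptimization{
   module = module,          -- assumed: your nn module
   criterion = criterion,    -- assumed: your criterion
   parallelize = 4,
   copyBatch = true          -- every worker receives the full mini-batch
}

optimizer.posthook = function(module, options)
   -- called inside the worker's per-sample loop with the raw module and
   -- the whole options table, e.g. to apply per-worker learning rates
end
```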