diff options
author | Marco Scoffier <github@metm.org> | 2011-09-24 04:23:43 +0400 |
---|---|---|
committer | Marco Scoffier <github@metm.org> | 2011-09-24 04:23:43 +0400 |
commit | 6a54a1d023cb7f5a421d8186f396f1d17fe53b66 (patch) | |
tree | d09942bd998bdec180529e5bda961abea7c2dd5a | |
parent | ed8881eadcd626307adbc7d2a8fa98fd8d7fd98f (diff) |
new branch for a different kind of parallel optimization
-rw-r--r-- | BatchOptimization.lua | 356 | ||||
-rw-r--r-- | GenSGDOptimization.lua | 140 |
2 files changed, 337 insertions, 159 deletions
diff --git a/BatchOptimization.lua b/BatchOptimization.lua index 4f087b3..85785c2 100644 --- a/BatchOptimization.lua +++ b/BatchOptimization.lua @@ -3,7 +3,7 @@ local Batch,parent = torch.class('nn.BatchOptimization', 'nn.Optimization') -- this is a generic class for any batch optimization modeled after -- the LBFGS optimization. It simply provides a batch.evaluate() method -- which creates a self.parameters and self.gradParameters from your --- self.model +-- self.module function Batch:__init(...) parent.__init(self) @@ -22,9 +22,10 @@ function Batch:__init(...) self.parameters = nnx.flattenParameters(nnx.getParameters(self.module)) self.gradParameters = nnx.flattenParameters(nnx.getGradParameters(self.module)) - self.evalCounter = 0 - self.batchCounter = 0 + self.evalCounter = 0 + self.batchCounter = 0 self.sampleCounter = 0 + if self.parallelize > 1 then self:setup_mapreduce() end @@ -121,34 +122,44 @@ function Batch:forward_mapreduce(inputs, targets, options) local outputsPartial = {} local gradParametersPartial = {} - -- (0b) divide input/target batch into N batches, based on speed of each worker - local inputss = {} - local targetss = {} - local optionss = {} - local speed = 0 - for t = 1,P do - speed = speed + self.children[t].speed - end - local n = 1 - for t = 1,P do - inputss[t] = {} - targetss[t] = {} - optionss[t] = {} - for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do - table.insert(inputss[t], inputs[n]) - table.insert(targetss[t], targets[n]) - if options then table.insert(optionss[t], options[n]) end - n = n + 1 - if n > #inputs then break end + if self.copyBatch then + -- (0) send same mini-batch to all workers + for t = 1,P do + self.children[t]:join() + self.children[t]:send(inputs) + self.children[t]:send(targets) + self.children[t]:send(options) + end + else + -- (0b) divide input/target batch into N batches, based on speed of each worker + local inputss = {} + local targetss = {} + local optionss = {} + local speed = 0 + for t = 
1,P do + speed = speed + self.children[t].speed + end + local n = 1 + for t = 1,P do + inputss[t] = {} + targetss[t] = {} + optionss[t] = {} + for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do + table.insert(inputss[t], inputs[n]) + table.insert(targetss[t], targets[n]) + if options then table.insert(optionss[t], options[n]) end + n = n + 1 + if n > #inputs then break end + end + end + + -- (0c) send parts of mini-batch to each worker + for t = 1,P do + self.children[t]:join() + self.children[t]:send(inputss[t]) + self.children[t]:send(targetss[t]) + self.children[t]:send(optionss[t]) end - end - - -- (0c) send mini-batch to all workers - for t = 1,P do - self.children[t]:join() - self.children[t]:send(inputss[t]) - self.children[t]:send(targetss[t]) - self.children[t]:send(optionss[t]) end -- (1) construct a closure that compute f(inputs) + df/dW @@ -179,32 +190,40 @@ function Batch:forward_mapreduce(inputs, targets, options) -- in separate threads self.evaluate_map = function() - -- transmit new parameters to all workers - self.children:join() - self.children:send(self.parameters) - -- then wait for all workers to return their partial gradParameters + outputs - gradParametersPartial = self.children:receive() - outputsPartial = self.children:receive() - -- force cleanup - collectgarbage() - end - + if self.map_hook then + self:map_hook() + else + -- transmit new parameters to all workers + self.children:join() + self.children:send(self.parameters) + -- then wait for all workers to return their partial gradParameters + outputs + gradParametersPartial = self.children:receive() + outputsPartial = self.children:receive() + -- force cleanup + collectgarbage() + end + end -- (1b) the reduce part of the evaluation: accumulate all -- partial estimates of the gradients self.evaluate_reduce = function() - -- accumulate partial gradients, and average - self.gradParameters:zero() - for t = 1,P do - self.gradParameters:add(gradParametersPartial[t]) - end - 
self.gradParameters:div(#inputs) - -- return average f(X) - self.output = 0 - for t = 1,P do - self.output = self.output + outputsPartial[t] - end - self.output = self.output/#inputs + if self.reduce_hook then + self:reduce_hook() + else + -- standard reduce is to sum the gradients + -- accumulate partial gradients, and average + self.gradParameters:zero() + for t = 1,P do + self.gradParameters:add(gradParametersPartial[t]) + end + self.gradParameters:div(#inputs) + -- return average f(X) + self.output = 0 + for t = 1,P do + self.output = self.output + outputsPartial[t] + end + self.output = self.output/#inputs + end end if self.optimize then @@ -230,117 +249,136 @@ function Batch:setup_mapreduce () 'nn.BatchOptimization') end - -- (1) define code for workers - local worker_code = - function() - -- require packages - require 'nnx' - - -- retrieve optional code to setup worker - precode = parallel.parent:receive() - if type(precode) == 'function' then precode() end + local worker_code = function () end - -- retrieve module + criterion at startup - parallel.yield() - module = parallel.parent:receive() - criterion = parallel.parent:receive() + -- (1) define code for workers - -- create fake optimizer, for hooks - optimizer = {module=module, criterion=criterion} - - -- retrieve optional prehook/posthook - prehook = parallel.parent:receive() - posthook = parallel.parent:receive() - if type(prehook) ~= 'function' then prehook = nil end - if type(posthook) ~= 'function' then posthook = nil end - - -- get pointer to parameter and gradParameter vectors - -- (this assumes that parameters+gradParameters are already flat parameters: - -- it should be the case, as the parent process flattens them at __init) - function check(tocheck) - for i = 2,#tocheck do - if tocheck[i]:storage() ~= tocheck[i-1]:storage() then - print('<BatchOptimization> error: inconsistent parameter vector (not flat)') - return - end - end - end - tableParameters = nnx.getParameters(module) - 
tableGradParameters = nnx.getGradParameters(module) - check(tableParameters) - check(tableGradParameters) - parameters = torch.Tensor():set(tableParameters[1]:storage()) - gradParameters = torch.Tensor():set(tableGradParameters[1]:storage()) - - -- outter loop: mini-batches - while true do - -- sync - if parallel.yield() == 'break' then break end - - -- receive new mini-batch - inputs = parallel.parent:receive() - targets = parallel.parent:receive() - options = parallel.parent:receive() - - -- inner loop: evaluations - while true do - -- sync - if parallel.yield() == 'break' then break end - - -- receive new set of parameters - parameters:copy(parallel.parent:receive()) - - -- reset gradients - gradParameters:zero() - -- f is the average of all criterions - local f_x = 0 - -- evaluate gradients on inputs for this thread - for i = 1,#inputs do - -- user hook - if prehook then - prehook(optimizer, {inputs[i], targets[i], options[i]}) - end - -- estimate f - local output = module:forward(inputs[i]) - local err = criterion:forward(output, targets[i]) - f_x = f_x + err - -- estimate df/dW - local df_do = criterion:backward(output, targets[i]) - module:backward(inputs[i], df_do) - module:accGradParameters(inputs[i], df_do) - -- user hook - if posthook then - posthook(optimizer, {inputs[i], targets[i], options[i]}) - end - end - -- now send back gradParameters + partial output - parallel.parent:send(gradParameters) - parallel.parent:send(f_x) - -- force cleanup - collectgarbage() - end - end - end + -- [MS] this default worker code is too detailed needs to be a + -- skeleton which is easier to adapt... 
for now I am allowing the + -- worker and setup functions to be overridden + if self.worker_code then + worker_code = self.worker_code + else + worker_code = + function() + -- require packages + require 'nnx' + + -- retrieve optional code to setup worker + precode = parallel.parent:receive() + if type(precode) == 'function' then precode() end + + -- retrieve module + criterion at startup + parallel.yield() + module = parallel.parent:receive() + criterion = parallel.parent:receive() + + -- create fake optimizer, for hooks + optimizer = {module=module, criterion=criterion} + + -- retrieve optional prehook/posthook + prehook = parallel.parent:receive() + posthook = parallel.parent:receive() + if type(prehook) ~= 'function' then prehook = nil end + if type(posthook) ~= 'function' then posthook = nil end + + -- get pointer to parameter and gradParameter vectors + -- (this assumes that parameters+gradParameters are already flat parameters: + -- it should be the case, as the parent process flattens them at __init) + function check(tocheck) + for i = 2,#tocheck do + if tocheck[i]:storage() ~= tocheck[i-1]:storage() then + print('<BatchOptimization> error: inconsistent parameter vector (not flat)') + return + end + end + end + tableParameters = nnx.getParameters(module) + tableGradParameters = nnx.getGradParameters(module) + check(tableParameters) + check(tableGradParameters) + parameters = torch.Tensor():set(tableParameters[1]:storage()) + gradParameters = torch.Tensor():set(tableGradParameters[1]:storage()) + + -- outer loop: mini-batches + while true do + -- sync + if parallel.yield() == 'break' then break end + + -- receive new mini-batch + inputs = parallel.parent:receive() + targets = parallel.parent:receive() + options = parallel.parent:receive() + + -- inner loop: evaluations + while true do + -- sync + if parallel.yield() == 'break' then break end + + -- receive new set of parameters + parameters:copy(parallel.parent:receive()) + + -- reset gradients + 
gradParameters:zero() + -- f is the average of all criterions + local f_x = 0 + -- evaluate gradients on inputs for this thread + for i = 1,#inputs do + -- user hook + if prehook then + prehook(optimizer, {inputs[i], targets[i], options[i]}) + end + -- estimate f + local output = module:forward(inputs[i]) + local err = criterion:forward(output, targets[i]) + f_x = f_x + err + -- estimate df/dW + local df_do = criterion:backward(output, targets[i]) + module:backward(inputs[i], df_do) + module:accGradParameters(inputs[i], df_do) + -- user hook + if posthook then + if #inputs == #options then + posthook(optimizer, {inputs[i], targets[i], options[i]}) + else + posthook(module,options) + end + end + end + -- now send back gradParameters + partial output + parallel.parent:send(gradParameters) + parallel.parent:send(f_x) + -- force cleanup + collectgarbage() + end + end + end + end -- (2) dispatch workers - local setup = function() - -- (1) optional calibration - if parallel.remotes then - parallel.calibrate() - end - - -- (2) startup all workers - self.children = parallel.sfork(self.parallelize) - self.children:exec(worker_code) - - -- (3) send them optional config code - self.children:send(self.precode or '') - - -- (4) and send them the module + criterion architecture - self.children:join() - self.children:send(self.module) - self.children:send(self.criterion) - end + local setup = function() end + if self.setup then + setup = self.setup + else + setup = function() + -- (1) optional calibration + if parallel.remotes then + parallel.calibrate() + end + + -- (2) startup all workers + self.children = parallel.sfork(self.parallelize) + self.children:exec(worker_code) + + -- (3) send them optional config code + self.children:send(self.precode or '') + + -- (4) and send them the module + criterion architecture + self.children:join() + self.children:send(self.module) + self.children:send(self.criterion) + end + end local ok,err = pcall(setup) if not ok then parallel.close() 
error(err) end end diff --git a/GenSGDOptimization.lua b/GenSGDOptimization.lua new file mode 100644 index 0000000..61f7476 --- /dev/null +++ b/GenSGDOptimization.lua @@ -0,0 +1,140 @@ +local GenSGD,parent = torch.class('nn.GenSGDOptimization', +'nn.BatchOptimization') + +-- this module parallelizes SGD in a particular way. It sends out the +-- same batch to each of several worker each with a different learning +-- rate. The workers run and the parameters from the best worker and +-- it's learning rate are kept for the next batch. + +function GenSGD:__init(...) + parent.__init(self,...) + xlua.unpack_class(self, {...}, + 'GenSGDOptimization', nil, + {arg='maxIterations', type='number', + help='maximum nb of iterations per pass', default=1}, + {arg='learningRate', type='number', + help='learning rate (W = W - rate*dE/dW)', default=1e-2}, + {arg='learningRateDecay', type='number', + help='learning rate decay (lr_t = lr_0 / (1 + samplesSeen*lrDecay))', default=0}, + {arg='weightDecay', type='number', + help='amount of weight decay (W = W - decay*W)', default=0}, + {arg='momentum', type='number', + help='amount of momentum on weights (dE/W = dE/dW*(1-momentum) + prev(dE/dW)*momentum)', default=0} + ) + if self.parallelize < 2 then + print('ERROR: GenSGD needs to work on several processors') + end + -- change the mapper to send the same batch to each worker + self.copyBatch = true + self.currentLearningRate = learningRate + self.workerRates = torch.Tensor(self.P) +end + +function GenSGD:map_hook() +end + +function GenSGD:reduce_hook() +end + +function GenSGD:optimize() + self.evaluate() +end + + +function GenSGD:worker_code() + -- require packages + require 'nnx' + + -- retrieve module + criterion at startup + parallel.yield() + module = parallel.parent:receive() + criterion = parallel.parent:receive() + optimizer = parallel.parent:receive() + + parameters = nnx.flattenParameters(nnx.getParameters(self.module)) + gradParameters = 
nnx.flattenParameters(nnx.getGradParameters(self.module)) + + -- outer loop: mini-batches + while true do + -- sync + if parallel.yield() == 'break' then break end + + -- receive new mini-batch + inputs = parallel.parent:receive() + targets = parallel.parent:receive() + options = parallel.parent:receive() + + -- inner loop: evaluations + while true do + -- sync + if parallel.yield() == 'break' then break end + + -- receive new set of parameters + parameters:copy(parallel.parent:receive()) + + -- f is the average of all criterions + local f_x = 0 + -- evaluate gradients on inputs for this thread + for i = 1,#inputs do + -- reset gradients + gradParameters:zero() + -- estimate f + local output = module:forward(inputs[i]) + local err = criterion:forward(output, targets[i]) + f_x = f_x + err + -- estimate df/dW + local df_do = criterion:backward(output, targets[i]) + module:backward(inputs[i], df_do) + module:accGradParameters(inputs[i], df_do) + optimizer + + end + -- now send back parameters b/c they are already optimized + parallel.parent:send(parameters) + parallel.parent:send(f_x) + -- force cleanup + collectgarbage() + end + end +end + +function GenSGD:setup() + -- (1) optional calibration + if parallel.remotes then + parallel.calibrate() + end + + -- (2) startup all workers + self.children = parallel.sfork(self.parallelize) + self.children:exec(worker_code) + + -- (4) and send them the module + criterion architecture + self.children:join() + self.children:send(self.module) + self.children:send(self.criterion) + self.children:send(self.optimizer) +end + +function GenSGD:post_hook(module,options) + -- we do the SGD on the worker + -- apply momentum + if options.momentum ~= 0 then + if not module.currentGradParameters then + module.currentGradParameters = torch.Tensor():resizeAs(gradParameters):copy(gradParameters) + else + options.currentGradParameters:mul(options.momentum):add(1-options.momentum, gradParameters) + end + else + options.currentGradParameters = 
gradParameters + end + + -- weight decay + if options.weightDecay ~= 0 then + options.parameters:add(-options.weightDecay, options.parameters) + end + + -- update parameters + local learningRate = self.learningRate / + (1 + self.sampleCounter*self.learningRateDecay) + self.parameters:add(-learningRate, self.currentGradParameters) +end |