author    | Marco Scoffier <github@metm.org> | 2011-09-27 08:06:26 +0400
committer | Marco Scoffier <github@metm.org> | 2011-09-27 08:06:26 +0400
commit    | 103507186e85351becce963f32732dd9ffae0ed1 (patch)
tree      | 28e190c42ad32a547fb12e52a824456ccf9f4090
parent    | 4f1120222c3c6e92987c391321d03e83acd48281 (diff)
working genetic SGD
-rw-r--r-- | BatchOptimization.lua  | 178 |
-rw-r--r-- | GenSGDOptimization.lua |  45 |
2 files changed, 121 insertions, 102 deletions
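The substantive change is in GenSGD:map_hook/reduce_hook: instead of perturbing the learning rate with Gaussian noise (lab.randn), the commit fans the same mini-batch out to P workers with a fixed ladder of rates (lr, 10*lr, lr/10, lr/100), then keeps the parameters and learning rate of the worker whose loss is lowest. A minimal serial sketch of one such round, with a hypothetical evalBatch(lr) standing in for the worker processes:

```lua
-- Sketch only: evalBatch is a hypothetical closure that starts from the
-- current parameters, runs one SGD pass over the shared mini-batch at
-- learning rate lr, and returns the resulting loss and parameter vector.
local rates = { lr, lr * 10, lr / 10, lr / 100 }   -- fixed ladder from map_hook
local bestLoss, bestParams, bestRate = math.huge, nil, nil
for t = 1, #rates do                               -- map: one worker per rate
   local loss, params = evalBatch(rates[t])
   if loss < bestLoss then                         -- reduce: keep the winner
      bestLoss, bestParams, bestRate = loss, params, rates[t]
   end
end
parameters:copy(bestParams)                        -- adopt the winner's parameters
lr = bestRate                                      -- ...and its learning rate
```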
diff --git a/BatchOptimization.lua b/BatchOptimization.lua
index bb4c70c..c84763e 100644
--- a/BatchOptimization.lua
+++ b/BatchOptimization.lua
@@ -55,7 +55,7 @@ function Batch:forward_sequential(inputs, targets, options)
    end
    local _t_ = sys.clock()
    -- reset gradients
-   self.gradParameters:zero()
+   self.gradParameters:zero()
    -- f is the average of all criterions
    self.output = 0
    -- given all inputs, evaluate gradients
@@ -76,16 +76,16 @@ function Batch:forward_sequential(inputs, targets, options)
       if self.posthook then
          self.posthook(self, {inputs[i], targets[i], options[i]})
       end
-      -- update evaluation counter
-      self.evalCounter = self.evalCounter + 1
+      -- update evaluation counter
+      self.evalCounter = self.evalCounter + 1
    end

    -- update evaluation counter
    self.batchCounter = self.batchCounter + 1

    -- normalize gradients
-   self.gradParameters:div(#inputs)
-
+   self.gradParameters:div(#inputs)
+
    -- verbose
    if self.verbose >= 2 then
       print('<BatchOptimization> ' .. self.batchCounter .. 'th batch took ' .. (sys.clock() - _t_) .. ' sec')
@@ -125,12 +125,13 @@ function Batch:forward_mapreduce(inputs, targets, options)
    if self.copyBatch then
       -- (0) send same mini-batch to all workers
       for t = 1,P do
-         self.children[t]:join()
-         self.children[t]:send(inputs)
-         self.children[t]:send(targets)
-         self.children[t]:send(options)
+         self.children[t]:join()
+         self.children[t]:send(inputs)
+         self.children[t]:send(targets)
+         self.children[t]:send(options)
       end
-   else
+
+   else
       -- (0b) divide input/target batch into N batches, based on speed
       -- of each worker
       local inputss = {}
@@ -138,28 +139,28 @@ function Batch:forward_mapreduce(inputs, targets, options)
      local optionss = {}
      local speed = 0
      for t = 1,P do
-        speed = speed + self.children[t].speed
+        speed = speed + self.children[t].speed
      end
      local n = 1
      for t = 1,P do
-        inputss[t] = {}
-        targetss[t] = {}
-        optionss[t] = {}
-        for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
-           table.insert(inputss[t], inputs[n])
-           table.insert(targetss[t], targets[n])
-           if options then table.insert(optionss[t], options[n]) end
-           n = n + 1
-           if n > #inputs then break end
-        end
+        inputss[t] = {}
+        targetss[t] = {}
+        optionss[t] = {}
+        for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
+           table.insert(inputss[t], inputs[n])
+           table.insert(targetss[t], targets[n])
+           if options then table.insert(optionss[t], options[n]) end
+           n = n + 1
+           if n > #inputs then break end
+        end
      end
-
+
      -- (0c) send parts of mini-batch to each worker
      for t = 1,P do
-        self.children[t]:join()
-        self.children[t]:send(inputss[t])
-        self.children[t]:send(targetss[t])
-        self.children[t]:send(optionss[t])
+        self.children[t]:join()
+        self.children[t]:send(inputss[t])
+        self.children[t]:send(targetss[t])
+        self.children[t]:send(optionss[t])
      end
   end
@@ -191,40 +192,40 @@ function Batch:forward_mapreduce(inputs, targets, options)
    -- in separate threads
    self.evaluate_map = function()
-      if self.map_hook then
-         self:map_hook()
-      else
-         -- transmit new parameters to all workers
-         self.children:join()
-         self.children:send(self.parameters)
-         -- then wait for all workers to return their partial gradParameters + outputs
-         gradParametersPartial = self.children:receive()
-         outputsPartial = self.children:receive()
-         -- force cleanup
-         collectgarbage()
-      end
-   end
+      if self.map_hook then
+         self:map_hook()
+      else
+         -- transmit new parameters to all workers
+         self.children:join()
+         self.children:send(self.parameters)
+         -- then wait for all workers to return their partial gradParameters + outputs
+         gradParametersPartial = self.children:receive()
+         outputsPartial = self.children:receive()
+         -- force cleanup
+         collectgarbage()
+      end
+   end

    -- (1b) the reduce part of the evaluation: accumulate all
    -- partial estimates of the gradients
    self.evaluate_reduce = function()
       if self.reduce_hook then
-         self:reduce_hook()
-      else
-         -- standard reduce is to sum the gradients
-         -- accumulate partial gradients, and average
-         self.gradParameters:zero()
-         for t = 1,P do
-            self.gradParameters:add(gradParametersPartial[t])
-         end
-         self.gradParameters:div(#inputs)
-         -- return average f(X)
-         self.output = 0
-         for t = 1,P do
-            self.output = self.output + outputsPartial[t]
-         end
-         self.output = self.output/#inputs
-      end
+         self:reduce_hook()
+      else
+         -- standard reduce is to sum the gradients
+         -- accumulate partial gradients, and average
+         self.gradParameters:zero()
+         for t = 1,P do
+            self.gradParameters:add(gradParametersPartial[t])
+         end
+         self.gradParameters:div(#inputs)
+         -- return average f(X)
+         self.output = 0
+         for t = 1,P do
+            self.output = self.output + outputsPartial[t]
+         end
+         self.output = self.output/#inputs
+      end
    end

    if self.optimize then
@@ -253,30 +254,31 @@ function Batch:setup_mapreduce ()
         xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
                'nn.BatchOptimization')
   end
-
-  local worker_code =
+
+  -- (1) define code for workers
+  local worker_code =
      function()
        -- require packages
        require 'nnx'
-
+
        -- retrieve optional code to setup worker
        precode = parallel.parent:receive()
        if type(precode) == 'function' then precode() end
-
+
        -- retrieve module + criterion at startup
        parallel.yield()
        module = parallel.parent:receive()
        criterion = parallel.parent:receive()
-
+
        -- create fake optimizer, for hooks
        optimizer = {module=module, criterion=criterion}
-
+
        -- retrieve optional prehook/posthook
        prehook = parallel.parent:receive()
        posthook = parallel.parent:receive()
        if type(prehook) ~= 'function' then prehook = nil end
        if type(posthook) ~= 'function' then posthook = nil end
-
+
        -- get pointer to parameter and gradParameter vectors
        -- (this assumes that parameters+gradParameters are already flat parameters:
        --  it should be the case, as the parent process flattens them at __init)
@@ -294,25 +296,25 @@ function Batch:setup_mapreduce ()
        check(tableGradParameters)
        parameters = torch.Tensor():set(tableParameters[1]:storage())
        gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
-
+
        -- outer loop: mini-batches
        while true do
           -- sync
           if parallel.yield() == 'break' then break end
-
+
           -- receive new mini-batch
           inputs = parallel.parent:receive()
           targets = parallel.parent:receive()
           options = parallel.parent:receive()
-
+
           -- inner loop: evaluations
           while true do
              -- sync
              if parallel.yield() == 'break' then break end
-
+
              -- receive new set of parameters
              parameters:copy(parallel.parent:receive())
-
+
              -- reset gradients
              gradParameters:zero()
              -- f is the average of all criterions
@@ -333,11 +335,7 @@ function Batch:setup_mapreduce ()
                 module:accGradParameters(inputs[i], df_do)
                 -- user hook
                 if posthook then
-                   if #inputs == #options then
-                      posthook(optimizer, {inputs[i], targets[i], options[i]})
-                   else
-                      posthook(module,options)
-                   end
+                   posthook(optimizer, {inputs[i], targets[i], options[i]})
                 end
              end
              -- now send back gradParameters + partial output
@@ -350,24 +348,24 @@ function Batch:setup_mapreduce ()
      end

   -- (2) dispatch workers
   local setup = function()
-     -- (1) optional calibration
-     if parallel.remotes then
-        parallel.calibrate()
-     end
-
-     -- (2) startup all workers
-     self.children = parallel.sfork(self.parallelize)
-     self.children:exec(worker_code)
-
-     -- (3) send them optional config code
-     self.children:send(self.precode or '')
-
-     -- (4) and send them the module + criterion architecture
-     self.children:join()
-     self.children:send(self.module)
-     self.children:send(self.criterion)
-  end
-
+     -- (1) optional calibration
+     if parallel.remotes then
+        parallel.calibrate()
+     end
+
+     -- (2) startup all workers
+     self.children = parallel.sfork(self.parallelize)
+     self.children:exec(worker_code)
+
+     -- (3) send them optional config code
+     self.children:send(self.precode or '')
+
+     -- (4) and send them the module + criterion architecture
+     self.children:join()
+     self.children:send(self.module)
+     self.children:send(self.criterion)
+  end
+
   local ok,err = pcall(setup)
   if not ok then parallel.close() error(err) end
end
diff --git a/GenSGDOptimization.lua b/GenSGDOptimization.lua
index c6a5caa..41aab82 100644
--- a/GenSGDOptimization.lua
+++ b/GenSGDOptimization.lua
@@ -2,7 +2,7 @@ local GenSGD,parent = torch.class('nn.GenSGDOptimization',
                                   'nn.BatchOptimization')

 -- this module parallelizes SGD in a particular way. It sends out the
--- same batch to each of several worker each with a different learning
+-- same batch to each of several workers, each with a different learning
 -- rate. The workers run and the parameters from the best worker and
 -- it's learning rate are kept for the next batch.
@@ -36,7 +36,8 @@ function GenSGD:__init(...)
    self.baseParameters = { momentum = self.momentum,
                            weightDecay = self.weightDecay,
                            learningRate = self.learningRate,
-                           learningRateDecay = self.learningRateDecay
+                           learningRateDecay = self.learningRateDecay,
+                           sampleCounter = self.sampleCounter
                          }
 end
@@ -44,14 +45,25 @@ end
 -- change gradParametersPartial to ParametersPartial, as the logic is
 -- different for this kind of parallelization.
 function GenSGD:map_hook()
+   local P = self.parallelize
    -- transmit new parameters to all workers
    self.children:join()
    self.children:send(self.parameters)
+   print('randomizing for '..P..' lr: '..self.learningRate..' sigma: '..self.sigma)
    -- randomize learning rate (could randomize other bits)
-   local n = self.learningRate + (lab.randn(P) * self.sigma)
-   for i = 1,P do
-      self.baseParameters[learningRate] = n[i]
-      self.children[t]:join()
+   local n = torch.Tensor(P)
+
+   n[1] = self.learningRate
+   n[2] = self.learningRate * 10
+   n[3] = self.learningRate / 10
+   n[4] = self.learningRate / 100
+-- (lab.randn(P) * self.sigma):add(self.learningRate)
+   self.baseParameters.sampleCounter = self.sampleCounter
+
+   for t = 1,P do
+      self.baseParameters.learningRate = n[t]
+      print('lr: '..self.baseParameters.learningRate)
+      --self.children[t]:join()
       self.children[t]:send(self.baseParameters)
    end
    -- then wait for all workers to return their Parameters + outputs
@@ -63,6 +75,7 @@
 end

 function GenSGD:reduce_hook()
+   local P = self.parallelize
    local id = 0
    local mx = 1e9
    for t = 1,P do
@@ -75,11 +88,12 @@ function GenSGD:reduce_hook()
       xerror('diverging','nn.GenSGDOptimization')
    else
       self.baseParameters = outputsPartial[id]
-      self.output = self.currentParameters.f_x
+      self.output = self.baseParameters.f_x
       -- in this case we get the parameters back directly
       self.parameters:copy(gradParametersPartial[id])
       print('Winner: output = '..self.output..
            'learningRate = '..self.baseParameters['learningRate'])
+      self.learningRate = self.baseParameters.learningRate
    end
 end
@@ -88,8 +102,9 @@ function GenSGD:optimize()
 end

 -- optimization (could do others in this mode)
-function GenSGD:optimizer(module,params)
-   -- apply momentum (store in the module)
+GenSGD.optimizer =
+   function (module,params)
+      -- apply momentum (store in the module)
    if params.momentum ~= 0 then
       if not module.currentGradParameters then
          module.currentGradParameters =
@@ -132,6 +147,12 @@ function GenSGD:setup_mapreduce ()
        criterion = parallel.parent:receive()
        optimizer = parallel.parent:receive()

+       -- retrieve optional prehook/posthook
+       prehook = parallel.parent:receive()
+       posthook = parallel.parent:receive()
+       if type(prehook) ~= 'function' then prehook = nil end
+       if type(posthook) ~= 'function' then posthook = nil end
+
        -- I don't understand this [MS]
        -- get pointer to parameter and gradParameter vectors
        -- (this assumes that parameters+gradParameters are already flat parameters:
@@ -168,7 +189,6 @@ function GenSGD:setup_mapreduce ()
           -- receive new set of parameters
           parameters:copy(parallel.parent:receive())
-
           -- receive the learning rate etc. parameters which are
           -- tweaked for each thread optimization
           optimization_parameters = parallel.parent:receive()
@@ -177,6 +197,8 @@ function GenSGD:setup_mapreduce ()
           -- SGD on these inputs
           -- reset gradients
           gradParameters:zero()
+          module.parameters = parameters
+          module.gradParameters = gradParameters
           for i = 1,#inputs do
              -- estimate f
              local output = module:forward(inputs[i])
@@ -215,8 +237,7 @@ function GenSGD:setup_mapreduce ()
    if parallel.remotes then
       parallel.calibrate()
    end
-   print(self.P)
-   print(self.parallelize)
+
    -- (2) startup all workers
    self.children = parallel.sfork(self.parallelize)
    self.children:exec(worker_code)
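For orientation, the worker side of the exchange after this commit, condensed from the worker code in GenSGD:setup_mapreduce. This is a hypothetical rendering, not the literal loop: sync yields, hooks, and momentum handling are omitted, but the names and the send/receive order follow the diff (the parent reads back parameters as gradParametersPartial[t] and the per-worker table, carrying f_x, as outputsPartial[t]):

```lua
-- Hypothetical condensation of one inner iteration of the worker loop.
parameters:copy(parallel.parent:receive())   -- shared starting parameters
local opt = parallel.parent:receive()        -- per-worker baseParameters:
                                             -- learningRate, momentum, ...
local f = 0
for i = 1,#inputs do
   gradParameters:zero()
   local output = module:forward(inputs[i])
   f = f + criterion:forward(output, targets[i])
   module:backward(inputs[i], criterion:backward(output, targets[i]))
   optimizer(module, opt)                    -- SGD step at this worker's rate
end
opt.f_x = f / #inputs                        -- loss compared in reduce_hook
parallel.parent:send(parameters)             -- read as gradParametersPartial[t]
parallel.parent:send(opt)                    -- read as outputsPartial[t]
```

This also suggests why the commit turns GenSGD:optimizer into a plain function field (GenSGD.optimizer = function(module,params)): a function value with no implicit self is what the worker receives and calls as optimizer(module, opt).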