| author | Marco Scoffier <github@metm.org> | 2011-09-27 02:02:54 +0400 |
|---|---|---|
| committer | Marco Scoffier <github@metm.org> | 2011-09-27 02:02:54 +0400 |
| commit | 4f1120222c3c6e92987c391321d03e83acd48281 (patch) | |
| tree | 08e0e178e7a2eebf49d54be54a6efb46135147ff | |
| parent | ff8a4830a8012d272ec423899efa5c4e632350c0 (diff) | |
almost working
-rw-r--r-- | BatchOptimization.lua | 251
-rw-r--r-- | GenSGDOptimization.lua | 204
-rw-r--r-- | init.lua | 2
3 files changed, 236 insertions, 221 deletions
diff --git a/BatchOptimization.lua b/BatchOptimization.lua
index aa210d7..bb4c70c 100644
--- a/BatchOptimization.lua
+++ b/BatchOptimization.lua
@@ -243,144 +243,131 @@ function Batch:forward_mapreduce(inputs, targets, options)
    return self.output
 end
 
+-- [MS] this default worker code is too detailed needs to be a
+-- skeleton which is easier to adapt... for now I am overriding this
+-- whole function with the 2 closures in GenSGD
+
 function Batch:setup_mapreduce ()
    -- (0) startup parallel package
    if not xrequire 'parallel' then
       xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
              'nn.BatchOptimization')
    end
-
-   local worker_code = function () end
-
-   -- (1) define code for workers
-
-   -- [MS] this default worker code is too detailed needs to be a
-   -- skeleton which is easier to adapt... for now I am allowing the
-   -- worker and setup functions to be overridden
-
-   if self.worker_code then
-      worker_code = self.worker_code
-   else
-      worker_code =
-         function()
-            -- require packages
-            require 'nnx'
-
-            -- retrieve optional code to setup worker
-            precode = parallel.parent:receive()
-            if type(precode) == 'function' then precode() end
-
-            -- retrieve module + criterion at startup
-            parallel.yield()
-            module = parallel.parent:receive()
-            criterion = parallel.parent:receive()
-
-            -- create fake optimizer, for hooks
-            optimizer = {module=module, criterion=criterion}
-
-            -- retrieve optional prehook/posthook
-            prehook = parallel.parent:receive()
-            posthook = parallel.parent:receive()
-            if type(prehook) ~= 'function' then prehook = nil end
-            if type(posthook) ~= 'function' then posthook = nil end
-
-            -- get pointer to parameter and gradParameter vectors
-            -- (this assumes that parameters+gradParameters are already flat parameters:
-            -- it should be the case, as the parent process flattens them at __init)
-            function check(tocheck)
-               for i = 2,#tocheck do
-                  if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
-                     print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
-                     return
-                  end
-               end
-            end
-            tableParameters = nnx.getParameters(module)
-            tableGradParameters = nnx.getGradParameters(module)
-            check(tableParameters)
-            check(tableGradParameters)
-            parameters = torch.Tensor():set(tableParameters[1]:storage())
-            gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
-
-            -- outer loop: mini-batches
-            while true do
-               -- sync
-               if parallel.yield() == 'break' then break end
-
-               -- receive new mini-batch
-               inputs = parallel.parent:receive()
-               targets = parallel.parent:receive()
-               options = parallel.parent:receive()
-
-               -- inner loop: evaluations
-               while true do
-                  -- sync
-                  if parallel.yield() == 'break' then break end
-
-                  -- receive new set of parameters
-                  parameters:copy(parallel.parent:receive())
-
-                  -- reset gradients
-                  gradParameters:zero()
-                  -- f is the average of all criterions
-                  local f_x = 0
-                  -- evaluate gradients on inputs for this thread
-                  for i = 1,#inputs do
-                     -- user hook
-                     if prehook then
-                        prehook(optimizer, {inputs[i], targets[i], options[i]})
-                     end
-                     -- estimate f
-                     local output = module:forward(inputs[i])
-                     local err = criterion:forward(output, targets[i])
-                     f_x = f_x + err
-                     -- estimate df/dW
-                     local df_do = criterion:backward(output, targets[i])
-                     module:backward(inputs[i], df_do)
-                     module:accGradParameters(inputs[i], df_do)
-                     -- user hook
-                     if posthook then
-                        if #inputs == #options then
-                           posthook(optimizer, {inputs[i], targets[i], options[i]})
-                        else
-                           posthook(module,options)
-                        end
-                     end
-                  end
-                  -- now send back gradParameters + partial output
-                  parallel.parent:send(gradParameters)
-                  parallel.parent:send(f_x)
-                  -- force cleanup
-                  collectgarbage()
-               end
-            end
-         end
-      end
+
+   local worker_code =
+      function()
+         -- require packages
+         require 'nnx'
+
+         -- retrieve optional code to setup worker
+         precode = parallel.parent:receive()
+         if type(precode) == 'function' then precode() end
+
+         -- retrieve module + criterion at startup
+         parallel.yield()
+         module = parallel.parent:receive()
+         criterion = parallel.parent:receive()
+
+         -- create fake optimizer, for hooks
+         optimizer = {module=module, criterion=criterion}
+
+         -- retrieve optional prehook/posthook
+         prehook = parallel.parent:receive()
+         posthook = parallel.parent:receive()
+         if type(prehook) ~= 'function' then prehook = nil end
+         if type(posthook) ~= 'function' then posthook = nil end
+
+         -- get pointer to parameter and gradParameter vectors
+         -- (this assumes that parameters+gradParameters are already flat parameters:
+         -- it should be the case, as the parent process flattens them at __init)
+         function check(tocheck)
+            for i = 2,#tocheck do
+               if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
+                  print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
+                  return
+               end
+            end
+         end
+         tableParameters = nnx.getParameters(module)
+         tableGradParameters = nnx.getGradParameters(module)
+         check(tableParameters)
+         check(tableGradParameters)
+         parameters = torch.Tensor():set(tableParameters[1]:storage())
+         gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
+
+         -- outer loop: mini-batches
+         while true do
+            -- sync
+            if parallel.yield() == 'break' then break end
+
+            -- receive new mini-batch
+            inputs = parallel.parent:receive()
+            targets = parallel.parent:receive()
+            options = parallel.parent:receive()
+
+            -- inner loop: evaluations
+            while true do
+               -- sync
+               if parallel.yield() == 'break' then break end
+
+               -- receive new set of parameters
+               parameters:copy(parallel.parent:receive())
+
+               -- reset gradients
+               gradParameters:zero()
+               -- f is the average of all criterions
+               local f_x = 0
+               -- evaluate gradients on inputs for this thread
+               for i = 1,#inputs do
+                  -- user hook
+                  if prehook then
+                     prehook(optimizer, {inputs[i], targets[i], options[i]})
+                  end
+                  -- estimate f
+                  local output = module:forward(inputs[i])
+                  local err = criterion:forward(output, targets[i])
+                  f_x = f_x + err
+                  -- estimate df/dW
+                  local df_do = criterion:backward(output, targets[i])
+                  module:backward(inputs[i], df_do)
+                  module:accGradParameters(inputs[i], df_do)
+                  -- user hook
+                  if posthook then
+                     if #inputs == #options then
+                        posthook(optimizer, {inputs[i], targets[i], options[i]})
+                     else
+                        posthook(module,options)
+                     end
+                  end
+               end
+               -- now send back gradParameters + partial output
+               parallel.parent:send(gradParameters)
+               parallel.parent:send(f_x)
+               -- force cleanup
+               collectgarbage()
+            end
+         end
+      end
 
    -- (2) dispatch workers
-   local setup = function() end
-
-   if self.setup then
-      setup = self.setup
-   else
-      setup = function()
-         -- (1) optional calibration
-         if parallel.remotes then
-            parallel.calibrate()
-         end
-
-         -- (2) startup all workers
-         self.children = parallel.sfork(self.parallelize)
-         self.children:exec(worker_code)
-
-         -- (3) send them optional config code
-         self.children:send(self.precode or '')
-
-         -- (4) and send them the module + criterion architecture
-         self.children:join()
-         self.children:send(self.module)
-         self.children:send(self.criterion)
-      end
-   end
-   local ok,err = pcall(setup(self))
+   local setup = function()
+      -- (1) optional calibration
+      if parallel.remotes then
+         parallel.calibrate()
+      end
+
+      -- (2) startup all workers
+      self.children = parallel.sfork(self.parallelize)
+      self.children:exec(worker_code)
+
+      -- (3) send them optional config code
+      self.children:send(self.precode or '')
+
+      -- (4) and send them the module + criterion architecture
+      self.children:join()
+      self.children:send(self.module)
+      self.children:send(self.criterion)
+   end
+
+   local ok,err = pcall(setup)
    if not ok then parallel.close() error(err) end
 end
diff --git a/GenSGDOptimization.lua b/GenSGDOptimization.lua
index a5c8efe..c6a5caa 100644
--- a/GenSGDOptimization.lua
+++ b/GenSGDOptimization.lua
@@ -27,7 +27,8 @@ function GenSGD:__init(...)
    )
    require 'lab'
    if self.parallelize < 2 then
-      print('ERROR: GenSGD needs to work on several processors')
+      xerror('GenSGD needs to work on several processors: set parallelize',
+             'nn.GenSGDOptimization')
    end
    -- change the mapper to send the same batch to each worker
    self.copyBatch = true
@@ -37,7 +38,6 @@ function GenSGD:__init(...)
       learningRate = self.learningRate,
       learningRateDecay = self.learningRateDecay
    }
-   self.workerParameters = torch.Tensor(self.P)
 end
 
 -- we are changing the way we map and reduce. It would be nice to
@@ -72,7 +72,7 @@ function GenSGD:reduce_hook()
       end
    end
    if id == 0 then
-      print('ERROR: diverging')
+      xerror('diverging','nn.GenSGDOptimization')
    else
       self.baseParameters = outputsPartial[id]
      self.output = self.currentParameters.f_x
@@ -114,92 +114,120 @@ function GenSGD:optimizer(module,params)
    params.learningRate = learningRate
 end
-function GenSGD:worker_code()
-   -- require packages
-   require 'nnx'
-
-   -- retrieve module + criterion at startup
-   parallel.yield()
-
-   module = parallel.parent:receive()
-   criterion = parallel.parent:receive()
-   optimizer = parallel.parent:receive()
-
-   module.parameters = nnx.flattenParameters(nnx.getParameters(module))
-   module.gradParameters = nnx.flattenParameters(nnx.getGradParameters(module))
+function GenSGD:setup_mapreduce ()
+   -- (0) startup parallel package
+   if not xrequire 'parallel' then
+      xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
+             'nn.GenSGDOptimization')
+   end
+   local worker_code =
+      function()
+         -- require packages
+         require 'nnx'
+
+         -- retrieve module + criterion at startup
+         parallel.yield()
+
+         module = parallel.parent:receive()
+         criterion = parallel.parent:receive()
+         optimizer = parallel.parent:receive()
+
+         -- I don't understand this [MS]
+         -- get pointer to parameter and gradParameter vectors
+         -- (this assumes that parameters+gradParameters are already flat parameters:
+         -- it should be the case, as the parent process flattens them at __init)
+         function check(tocheck)
+            for i = 2,#tocheck do
+               if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
+                  print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
+                  return
+               end
+            end
+         end
+         tableParameters = nnx.getParameters(module)
+         tableGradParameters = nnx.getGradParameters(module)
+         check(tableParameters)
+         check(tableGradParameters)
+         parameters = torch.Tensor():set(tableParameters[1]:storage())
+         gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
 
-   -- outer loop: mini-batches
-   while true do
-      -- sync
-      if parallel.yield() == 'break' then break end
-
-      -- receive new mini-batch
-      inputs = parallel.parent:receive()
-      targets = parallel.parent:receive()
-      options = parallel.parent:receive()
-
-      -- inner loop: evaluations
-      while true do
-         -- sync
-         if parallel.yield() == 'break' then break end
-
-         -- receive new set of parameters
-         parameters:copy(parallel.parent:receive())
-
-         -- receive the learning rate etc. parameters which are
-         -- tweaked for each thread
-         optimization_parameters = parallel.parent:receive()
-
-         -- evaluate gradients on inputs for this thread and perform
-         -- SGD on these inputs
-         -- reset gradients
-         gradParameters:zero()
-         for i = 1,#inputs do
-            -- estimate f
-            local output = module:forward(inputs[i])
-            local err = criterion:forward(output, targets[i])
-            -- estimate df/dW
-            local df_do = criterion:backward(output, targets[i])
-            module:backward(inputs[i], df_do)
-            module:accGradParameters(inputs[i], df_do)
-            optimizer(module,optimization_parameters)
-         end
-         -- we need the result averaged over all the samples _after_
-         -- the gradient steps so do one more loop to fprop through
-         -- the samples and collect the error _after_ the optimization
-         local f_x = 0
-         for i = 1,#inputs do
-            -- estimate f
-            local output = module:forward(inputs[i])
-            local err = criterion:forward(output, targets[i])
-            f_x = f_x + err
+         -- outer loop: mini-batches
+         while true do
+            -- sync
+            if parallel.yield() == 'break' then break end
+
+            -- receive new mini-batch
+            inputs = parallel.parent:receive()
+            targets = parallel.parent:receive()
+            options = parallel.parent:receive()
+
+            -- inner loop: evaluations
+            while true do
+               -- sync
+               if parallel.yield() == 'break' then break end
+
+               -- receive new set of parameters
+               parameters:copy(parallel.parent:receive())
+
+               -- receive the learning rate etc. parameters which are
+               -- tweaked for each thread
+               optimization_parameters = parallel.parent:receive()
+
+               -- evaluate gradients on inputs for this thread and perform
+               -- SGD on these inputs
+               -- reset gradients
+               gradParameters:zero()
+               for i = 1,#inputs do
+                  -- estimate f
+                  local output = module:forward(inputs[i])
+                  local err = criterion:forward(output, targets[i])
+                  -- estimate df/dW
+                  local df_do = criterion:backward(output, targets[i])
+                  module:backward(inputs[i], df_do)
+                  module:accGradParameters(inputs[i], df_do)
+                  optimizer(module,optimization_parameters)
+               end
+               -- we need the result averaged over all the samples _after_
+               -- the gradient steps so do one more loop to fprop through
+               -- the samples and collect the error _after_ the optimization
+               local f_x = 0
+               for i = 1,#inputs do
+                  -- estimate f
+                  local output = module:forward(inputs[i])
+                  local err = criterion:forward(output, targets[i])
+                  f_x = f_x + err
+               end
+               -- in this case send back parameters themselves b/c they are
+               -- already optimized
+               parallel.parent:send(parameters)
+               -- need to make sure we keep track of what was used to
+               -- compute these params along with the outputs
+               optimization_parameters['f_x'] = f_x/#inputs
+               parallel.parent:send(optimization_parameters)
+               -- force cleanup
+               collectgarbage()
+            end
          end
-         -- in this case send back parameters themselves b/c they are
-         -- already optimized
-         parallel.parent:send(parameters)
-         -- need to make sure we keep track of what was used to
-         -- compute these params along with the outputs
-         optimization_parameters['f_x'] = f_x/#inputs
-         parallel.parent:send(optimization_parameters)
-         -- force cleanup
-         collectgarbage()
-      end
-   end
-end
-function GenSGD:setup()
-   -- (1) optional calibration
-   if parallel.remotes then
-      parallel.calibrate()
-   end
-
-   -- (2) startup all workers
-   self.children = parallel.sfork(self.parallelize)
-   self.children:exec(worker_code)
-
-   -- (4) and send them the module + criterion architecture
-   self.children:join()
-   self.children:send(self.module)
-   self.children:send(self.criterion)
-   self.children:send(self.optimizer)
-   end
+   local setup = function()
+      -- (1) optional calibration
+      if parallel.remotes then
+         parallel.calibrate()
+      end
+      print(self.P)
+      print(self.parallelize)
+      -- (2) startup all workers
+      self.children = parallel.sfork(self.parallelize)
+      self.children:exec(worker_code)
+
+      -- (4) and send them the module + criterion architecture
+      self.children:join()
+      self.children:send(self.module)
+      self.children:send(self.criterion)
+      self.children:send(self.optimizer)
+   end
+
+   local ok,err = pcall(setup)
+   if not ok then parallel.close() error(err) end
+end
\ No newline at end of file
diff --git a/init.lua b/init.lua
@@ -208,7 +208,7 @@ function nnx.flattenParameters(parameters)
          if param:storage() == parameters[i]:storage() then
            offsets[k] = offsets[i]
            if storageOffsets[k] ~= storageOffsets[i] or elements[k] ~= elements[i] then
-              error('<nnx.flattenParameters> canot flatten shared weights with different structures')
+              error('<nnx.flattenParameters> cannot flatten shared weights with different structures')
            end
            isView = true
            break
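
For context, a minimal, hypothetical sketch of how the reworked GenSGD optimizer might be driven from a training script. This is not part of the commit: the constructor field names (module, criterion, parallelize, learningRate) are assumed from the fields referenced in the diff above, and the training call is left commented out rather than asserted as the exact API.

```lua
-- Hypothetical usage sketch (assumptions: nn.GenSGDOptimization accepts the
-- named fields below; inputs/targets are Lua tables of tensors).
require 'nnx'

-- a small model and criterion to optimize
local module = nn.Sequential()
module:add(nn.Linear(100, 10))
module:add(nn.Tanh())
local criterion = nn.MSECriterion()

local optimizer = nn.GenSGDOptimization{
   module       = module,
   criterion    = criterion,
   parallelize  = 4,     -- __init now raises xerror unless this is >= 2
   learningRate = 1e-2,
}

-- one optimization step over a mini-batch would then look roughly like:
-- optimizer:forward(inputs, targets)
```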