Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/clementfarabet/lua---nnx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarco Scoffier <github@metm.org>2011-09-24 04:23:43 +0400
committerMarco Scoffier <github@metm.org>2011-09-24 04:23:43 +0400
commit6a54a1d023cb7f5a421d8186f396f1d17fe53b66 (patch)
treed09942bd998bdec180529e5bda961abea7c2dd5a
parented8881eadcd626307adbc7d2a8fa98fd8d7fd98f (diff)
new branch for different kind of parallel Optimization
-rw-r--r--BatchOptimization.lua356
-rw-r--r--GenSGDOptimization.lua140
2 files changed, 337 insertions, 159 deletions
diff --git a/BatchOptimization.lua b/BatchOptimization.lua
index 4f087b3..85785c2 100644
--- a/BatchOptimization.lua
+++ b/BatchOptimization.lua
@@ -3,7 +3,7 @@ local Batch,parent = torch.class('nn.BatchOptimization', 'nn.Optimization')
-- this is a generic class for any batch optimization modeled after
-- the LBFGS optimization. It simply provides a batch.evaluate() method
-- which creates a self.parameters and self.gradParameters from your
--- self.model
+-- self.module
function Batch:__init(...)
parent.__init(self)
@@ -22,9 +22,10 @@ function Batch:__init(...)
self.parameters = nnx.flattenParameters(nnx.getParameters(self.module))
self.gradParameters = nnx.flattenParameters(nnx.getGradParameters(self.module))
- self.evalCounter = 0
- self.batchCounter = 0
+ self.evalCounter = 0
+ self.batchCounter = 0
self.sampleCounter = 0
+
if self.parallelize > 1 then
self:setup_mapreduce()
end
@@ -121,34 +122,44 @@ function Batch:forward_mapreduce(inputs, targets, options)
local outputsPartial = {}
local gradParametersPartial = {}
- -- (0b) divide input/target batch into N batches, based on speed of each worker
- local inputss = {}
- local targetss = {}
- local optionss = {}
- local speed = 0
- for t = 1,P do
- speed = speed + self.children[t].speed
- end
- local n = 1
- for t = 1,P do
- inputss[t] = {}
- targetss[t] = {}
- optionss[t] = {}
- for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
- table.insert(inputss[t], inputs[n])
- table.insert(targetss[t], targets[n])
- if options then table.insert(optionss[t], options[n]) end
- n = n + 1
- if n > #inputs then break end
+ if self.copyBatch then
+ -- (0) send same mini-batch to all workers
+ for t = 1,P do
+ self.children[t]:join()
+ self.children[t]:send(inputs)
+ self.children[t]:send(targets)
+ self.children[t]:send(options)
+ end
+ else
+ -- (0b) divide input/target batch into N batches, based on speed of each worker
+ local inputss = {}
+ local targetss = {}
+ local optionss = {}
+ local speed = 0
+ for t = 1,P do
+ speed = speed + self.children[t].speed
+ end
+ local n = 1
+ for t = 1,P do
+ inputss[t] = {}
+ targetss[t] = {}
+ optionss[t] = {}
+ for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
+ table.insert(inputss[t], inputs[n])
+ table.insert(targetss[t], targets[n])
+ if options then table.insert(optionss[t], options[n]) end
+ n = n + 1
+ if n > #inputs then break end
+ end
+ end
+
+ -- (0c) send parts of mini-batch to each worker
+ for t = 1,P do
+ self.children[t]:join()
+ self.children[t]:send(inputss[t])
+ self.children[t]:send(targetss[t])
+ self.children[t]:send(optionss[t])
end
- end
-
- -- (0c) send mini-batch to all workers
- for t = 1,P do
- self.children[t]:join()
- self.children[t]:send(inputss[t])
- self.children[t]:send(targetss[t])
- self.children[t]:send(optionss[t])
end
-- (1) construct a closure that compute f(inputs) + df/dW
@@ -179,32 +190,40 @@ function Batch:forward_mapreduce(inputs, targets, options)
-- in separate threads
self.evaluate_map
= function()
- -- transmit new parameters to all workers
- self.children:join()
- self.children:send(self.parameters)
- -- then wait for all workers to return their partial gradParameters + outputs
- gradParametersPartial = self.children:receive()
- outputsPartial = self.children:receive()
- -- force cleanup
- collectgarbage()
- end
-
+ if self.map_hook then
+ self:map_hook()
+ else
+ -- transmit new parameters to all workers
+ self.children:join()
+ self.children:send(self.parameters)
+ -- then wait for all workers to return their partial gradParameters + outputs
+ gradParametersPartial = self.children:receive()
+ outputsPartial = self.children:receive()
+ -- force cleanup
+ collectgarbage()
+ end
+ end
-- (1b) the reduce part of the evaluation: accumulate all
-- partial estimates of the gradients
self.evaluate_reduce
= function()
- -- accumulate partial gradients, and average
- self.gradParameters:zero()
- for t = 1,P do
- self.gradParameters:add(gradParametersPartial[t])
- end
- self.gradParameters:div(#inputs)
- -- return average f(X)
- self.output = 0
- for t = 1,P do
- self.output = self.output + outputsPartial[t]
- end
- self.output = self.output/#inputs
+ if self.reduce_hook then
+ self:reduce_hook()
+ else
+ -- standard reduce is to sum the gradients
+ -- accumulate partial gradients, and average
+ self.gradParameters:zero()
+ for t = 1,P do
+ self.gradParameters:add(gradParametersPartial[t])
+ end
+ self.gradParameters:div(#inputs)
+ -- return average f(X)
+ self.output = 0
+ for t = 1,P do
+ self.output = self.output + outputsPartial[t]
+ end
+ self.output = self.output/#inputs
+ end
end
if self.optimize then
@@ -230,117 +249,136 @@ function Batch:setup_mapreduce ()
'nn.BatchOptimization')
end
- -- (1) define code for workers
- local worker_code =
- function()
- -- require packages
- require 'nnx'
-
- -- retrieve optional code to setup worker
- precode = parallel.parent:receive()
- if type(precode) == 'function' then precode() end
+ local worker_code = function () end
- -- retrieve module + criterion at startup
- parallel.yield()
- module = parallel.parent:receive()
- criterion = parallel.parent:receive()
+ -- (1) define code for workers
- -- create fake optimizer, for hooks
- optimizer = {module=module, criterion=criterion}
-
- -- retrieve optional prehook/posthook
- prehook = parallel.parent:receive()
- posthook = parallel.parent:receive()
- if type(prehook) ~= 'function' then prehook = nil end
- if type(posthook) ~= 'function' then posthook = nil end
-
- -- get pointer to parameter and gradParameter vectors
- -- (this assumes that parameters+gradParameters are already flat parameters:
- -- it should be the case, as the parent process flattens them at __init)
- function check(tocheck)
- for i = 2,#tocheck do
- if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
- print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
- return
- end
- end
- end
- tableParameters = nnx.getParameters(module)
- tableGradParameters = nnx.getGradParameters(module)
- check(tableParameters)
- check(tableGradParameters)
- parameters = torch.Tensor():set(tableParameters[1]:storage())
- gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
-
- -- outter loop: mini-batches
- while true do
- -- sync
- if parallel.yield() == 'break' then break end
-
- -- receive new mini-batch
- inputs = parallel.parent:receive()
- targets = parallel.parent:receive()
- options = parallel.parent:receive()
-
- -- inner loop: evaluations
- while true do
- -- sync
- if parallel.yield() == 'break' then break end
-
- -- receive new set of parameters
- parameters:copy(parallel.parent:receive())
-
- -- reset gradients
- gradParameters:zero()
- -- f is the average of all criterions
- local f_x = 0
- -- evaluate gradients on inputs for this thread
- for i = 1,#inputs do
- -- user hook
- if prehook then
- prehook(optimizer, {inputs[i], targets[i], options[i]})
- end
- -- estimate f
- local output = module:forward(inputs[i])
- local err = criterion:forward(output, targets[i])
- f_x = f_x + err
- -- estimate df/dW
- local df_do = criterion:backward(output, targets[i])
- module:backward(inputs[i], df_do)
- module:accGradParameters(inputs[i], df_do)
- -- user hook
- if posthook then
- posthook(optimizer, {inputs[i], targets[i], options[i]})
- end
- end
- -- now send back gradParameters + partial output
- parallel.parent:send(gradParameters)
- parallel.parent:send(f_x)
- -- force cleanup
- collectgarbage()
- end
- end
- end
+ -- [MS] this default worker code is too detailed; it needs to be a
+ -- skeleton which is easier to adapt... for now I am allowing the
+ -- worker and setup functions to be overridden
+ if self.worker_code then
+ worker_code = self.worker_code
+ else
+ worker_code =
+ function()
+ -- require packages
+ require 'nnx'
+
+ -- retrieve optional code to setup worker
+ precode = parallel.parent:receive()
+ if type(precode) == 'function' then precode() end
+
+ -- retrieve module + criterion at startup
+ parallel.yield()
+ module = parallel.parent:receive()
+ criterion = parallel.parent:receive()
+
+ -- create fake optimizer, for hooks
+ optimizer = {module=module, criterion=criterion}
+
+ -- retrieve optional prehook/posthook
+ prehook = parallel.parent:receive()
+ posthook = parallel.parent:receive()
+ if type(prehook) ~= 'function' then prehook = nil end
+ if type(posthook) ~= 'function' then posthook = nil end
+
+ -- get pointer to parameter and gradParameter vectors
+ -- (this assumes that parameters+gradParameters are already flat parameters:
+ -- it should be the case, as the parent process flattens them at __init)
+ function check(tocheck)
+ for i = 2,#tocheck do
+ if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
+ print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
+ return
+ end
+ end
+ end
+ tableParameters = nnx.getParameters(module)
+ tableGradParameters = nnx.getGradParameters(module)
+ check(tableParameters)
+ check(tableGradParameters)
+ parameters = torch.Tensor():set(tableParameters[1]:storage())
+ gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
+
+ -- outer loop: mini-batches
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new mini-batch
+ inputs = parallel.parent:receive()
+ targets = parallel.parent:receive()
+ options = parallel.parent:receive()
+
+ -- inner loop: evaluations
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new set of parameters
+ parameters:copy(parallel.parent:receive())
+
+ -- reset gradients
+ gradParameters:zero()
+ -- f is the average of all criterions
+ local f_x = 0
+ -- evaluate gradients on inputs for this thread
+ for i = 1,#inputs do
+ -- user hook
+ if prehook then
+ prehook(optimizer, {inputs[i], targets[i], options[i]})
+ end
+ -- estimate f
+ local output = module:forward(inputs[i])
+ local err = criterion:forward(output, targets[i])
+ f_x = f_x + err
+ -- estimate df/dW
+ local df_do = criterion:backward(output, targets[i])
+ module:backward(inputs[i], df_do)
+ module:accGradParameters(inputs[i], df_do)
+ -- user hook
+ if posthook then
+ if #inputs == #options then
+ posthook(optimizer, {inputs[i], targets[i], options[i]})
+ else
+ posthook(module,options)
+ end
+ end
+ end
+ -- now send back gradParameters + partial output
+ parallel.parent:send(gradParameters)
+ parallel.parent:send(f_x)
+ -- force cleanup
+ collectgarbage()
+ end
+ end
+ end
+ end
-- (2) dispatch workers
- local setup = function()
- -- (1) optional calibration
- if parallel.remotes then
- parallel.calibrate()
- end
-
- -- (2) startup all workers
- self.children = parallel.sfork(self.parallelize)
- self.children:exec(worker_code)
-
- -- (3) send them optional config code
- self.children:send(self.precode or '')
-
- -- (4) and send them the module + criterion architecture
- self.children:join()
- self.children:send(self.module)
- self.children:send(self.criterion)
- end
+ local setup = function() end
+ if self.setup then
+ setup = self.setup
+ else
+ setup = function()
+ -- (1) optional calibration
+ if parallel.remotes then
+ parallel.calibrate()
+ end
+
+ -- (2) startup all workers
+ self.children = parallel.sfork(self.parallelize)
+ self.children:exec(worker_code)
+
+ -- (3) send them optional config code
+ self.children:send(self.precode or '')
+
+ -- (4) and send them the module + criterion architecture
+ self.children:join()
+ self.children:send(self.module)
+ self.children:send(self.criterion)
+ end
+ end
local ok,err = pcall(setup)
if not ok then parallel.close() error(err) end
end
diff --git a/GenSGDOptimization.lua b/GenSGDOptimization.lua
new file mode 100644
index 0000000..61f7476
--- /dev/null
+++ b/GenSGDOptimization.lua
@@ -0,0 +1,140 @@
+local GenSGD,parent = torch.class('nn.GenSGDOptimization',
+'nn.BatchOptimization')
+
+-- this module parallelizes SGD in a particular way. It sends out the
+-- same batch to each of several workers, each with a different learning
+-- rate. The workers run and the parameters from the best worker and
+-- its learning rate are kept for the next batch.
+
+function GenSGD:__init(...)
+ parent.__init(self,...)
+ xlua.unpack_class(self, {...},
+ 'GenSGDOptimization', nil,
+ {arg='maxIterations', type='number',
+ help='maximum nb of iterations per pass', default=1},
+ {arg='learningRate', type='number',
+ help='learning rate (W = W - rate*dE/dW)', default=1e-2},
+ {arg='learningRateDecay', type='number',
+ help='learning rate decay (lr_t = lr_0 / (1 + samplesSeen*lrDecay))', default=0},
+ {arg='weightDecay', type='number',
+ help='amount of weight decay (W = W - decay*W)', default=0},
+ {arg='momentum', type='number',
+ help='amount of momentum on weights (dE/W = dE/dW*(1-momentum) + prev(dE/dW)*momentum)', default=0}
+ )
+ if self.parallelize < 2 then
+ print('ERROR: GenSGD needs to work on several processors')
+ end
+ -- change the mapper to send the same batch to each worker
+ self.copyBatch = true
+ self.currentLearningRate = learningRate
+ self.workerRates = torch.Tensor(self.P)
+end
+
+function GenSGD:map_hook()
+end
+
+function GenSGD:reduce_hook()
+end
+
+function GenSGD:optimize()
+ self.evaluate()
+end
+
+
+function GenSGD:worker_code()
+ -- require packages
+ require 'nnx'
+
+ -- retrieve module + criterion at startup
+ parallel.yield()
+ module = parallel.parent:receive()
+ criterion = parallel.parent:receive()
+ optimizer = parallel.parent:receive()
+
+ parameters = nnx.flattenParameters(nnx.getParameters(self.module))
+ gradParameters = nnx.flattenParameters(nnx.getGradParameters(self.module))
+
+ -- outer loop: mini-batches
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new mini-batch
+ inputs = parallel.parent:receive()
+ targets = parallel.parent:receive()
+ options = parallel.parent:receive()
+
+ -- inner loop: evaluations
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new set of parameters
+ parameters:copy(parallel.parent:receive())
+
+ -- f is the average of all criterions
+ local f_x = 0
+ -- evaluate gradients on inputs for this thread
+ for i = 1,#inputs do
+ -- reset gradients
+ gradParameters:zero()
+ -- estimate f
+ local output = module:forward(inputs[i])
+ local err = criterion:forward(output, targets[i])
+ f_x = f_x + err
+ -- estimate df/dW
+ local df_do = criterion:backward(output, targets[i])
+ module:backward(inputs[i], df_do)
+ module:accGradParameters(inputs[i], df_do)
+ optimizer
+
+ end
+ -- now send back parameters b/c they are already optimized
+ parallel.parent:send(parameters)
+ parallel.parent:send(f_x)
+ -- force cleanup
+ collectgarbage()
+ end
+ end
+end
+
+function GenSGD:setup()
+ -- (1) optional calibration
+ if parallel.remotes then
+ parallel.calibrate()
+ end
+
+ -- (2) startup all workers
+ self.children = parallel.sfork(self.parallelize)
+ self.children:exec(worker_code)
+
+ -- (4) and send them the module + criterion architecture
+ self.children:join()
+ self.children:send(self.module)
+ self.children:send(self.criterion)
+ self.children:send(self.optimizer)
+end
+
+function GenSGD:post_hook(module,options)
+ -- we do the SGD on the worker
+ -- apply momentum
+ if options.momentum ~= 0 then
+ if not module.currentGradParameters then
+ module.currentGradParameters = torch.Tensor():resizeAs(gradParameters):copy(gradParameters)
+ else
+ options.currentGradParameters:mul(options.momentum):add(1-options.momentum, gradParameters)
+ end
+ else
+ options.currentGradParameters = gradParameters
+ end
+
+ -- weight decay
+ if options.weightDecay ~= 0 then
+ options.parameters:add(-options.weightDecay, options.parameters)
+ end
+
+ -- update parameters
+ local learningRate = self.learningRate /
+ (1 + self.sampleCounter*self.learningRateDecay)
+ self.parameters:add(-learningRate, self.currentGradParameters)
+end