
github.com/clementfarabet/lua---nnx.git
author     Marco Scoffier <github@metm.org>  2011-09-27 08:06:26 +0400
committer  Marco Scoffier <github@metm.org>  2011-09-27 08:06:26 +0400
commit     103507186e85351becce963f32732dd9ffae0ed1 (patch)
tree       28e190c42ad32a547fb12e52a824456ccf9f4090
parent     4f1120222c3c6e92987c391321d03e83acd48281 (diff)

    working genetic SGD

-rw-r--r--  BatchOptimization.lua  | 178
-rw-r--r--  GenSGDOptimization.lua |  45
2 files changed, 121 insertions, 102 deletions
diff --git a/BatchOptimization.lua b/BatchOptimization.lua
index bb4c70c..c84763e 100644
--- a/BatchOptimization.lua
+++ b/BatchOptimization.lua
@@ -55,7 +55,7 @@ function Batch:forward_sequential(inputs, targets, options)
end
local _t_ = sys.clock()
-- reset gradients
- self.gradParameters:zero()
+ self.gradParameters:zero()
-- f is the average of all criterions
self.output = 0
-- given all inputs, evaluate gradients
@@ -76,16 +76,16 @@ function Batch:forward_sequential(inputs, targets, options)
if self.posthook then
self.posthook(self, {inputs[i], targets[i], options[i]})
end
- -- update evaluation counter
- self.evalCounter = self.evalCounter + 1
+ -- update evaluation counter
+ self.evalCounter = self.evalCounter + 1
end
-- update evaluation counter
self.batchCounter = self.batchCounter + 1
-- normalize gradients
- self.gradParameters:div(#inputs)
-
+ self.gradParameters:div(#inputs)
+
-- verbose
if self.verbose >= 2 then
print('<BatchOptimization> ' .. self.batchCounter .. 'th batch took ' .. (sys.clock() - _t_) .. ' sec')
@@ -125,12 +125,13 @@ function Batch:forward_mapreduce(inputs, targets, options)
if self.copyBatch then
-- (0) send same mini-batch to all workers
for t = 1,P do
- self.children[t]:join()
- self.children[t]:send(inputs)
- self.children[t]:send(targets)
- self.children[t]:send(options)
+ self.children[t]:join()
+ self.children[t]:send(inputs)
+ self.children[t]:send(targets)
+ self.children[t]:send(options)
end
- else
+
+ else
-- (0b) divide input/target batch into N batches, based on speed
-- of each worker
local inputss = {}
@@ -138,28 +139,28 @@ function Batch:forward_mapreduce(inputs, targets, options)
local optionss = {}
local speed = 0
for t = 1,P do
- speed = speed + self.children[t].speed
+ speed = speed + self.children[t].speed
end
local n = 1
for t = 1,P do
- inputss[t] = {}
- targetss[t] = {}
- optionss[t] = {}
- for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
- table.insert(inputss[t], inputs[n])
- table.insert(targetss[t], targets[n])
- if options then table.insert(optionss[t], options[n]) end
- n = n + 1
- if n > #inputs then break end
- end
+ inputss[t] = {}
+ targetss[t] = {}
+ optionss[t] = {}
+ for i = 1,math.ceil(self.children[t].speed*(#inputs)/speed) do
+ table.insert(inputss[t], inputs[n])
+ table.insert(targetss[t], targets[n])
+ if options then table.insert(optionss[t], options[n]) end
+ n = n + 1
+ if n > #inputs then break end
+ end
end
-
+
-- (0c) send parts of mini-batch to each worker
for t = 1,P do
- self.children[t]:join()
- self.children[t]:send(inputss[t])
- self.children[t]:send(targetss[t])
- self.children[t]:send(optionss[t])
+ self.children[t]:join()
+ self.children[t]:send(inputss[t])
+ self.children[t]:send(targetss[t])
+ self.children[t]:send(optionss[t])
end
end
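
The hunk above is mostly re-indentation; the underlying logic splits the mini-batch across the P workers in proportion to each worker's measured speed. A standalone sketch of that split, assuming `speeds` is a plain Lua table of per-worker speed estimates:

```lua
-- Give worker t roughly speeds[t]/total of the samples, walking through
-- the batch in order until it is exhausted (mirrors the loop above).
local function splitBySpeed(inputs, speeds)
   local total = 0
   for _, s in ipairs(speeds) do total = total + s end
   local parts, n = {}, 1
   for t = 1, #speeds do
      parts[t] = {}
      local share = math.ceil(speeds[t] * #inputs / total)
      for i = 1, share do
         if n > #inputs then break end
         table.insert(parts[t], inputs[n])
         n = n + 1
      end
   end
   return parts
end
```
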
@@ -191,40 +192,40 @@ function Batch:forward_mapreduce(inputs, targets, options)
-- in separate threads
self.evaluate_map
= function()
- if self.map_hook then
- self:map_hook()
- else
- -- transmit new parameters to all workers
- self.children:join()
- self.children:send(self.parameters)
- -- then wait for all workers to return their partial gradParameters + outputs
- gradParametersPartial = self.children:receive()
- outputsPartial = self.children:receive()
- -- force cleanup
- collectgarbage()
- end
- end
+ if self.map_hook then
+ self:map_hook()
+ else
+ -- transmit new parameters to all workers
+ self.children:join()
+ self.children:send(self.parameters)
+ -- then wait for all workers to return their partial gradParameters + outputs
+ gradParametersPartial = self.children:receive()
+ outputsPartial = self.children:receive()
+ -- force cleanup
+ collectgarbage()
+ end
+ end
-- (1b) the reduce part of the evaluation: accumulate all
-- partial estimates of the gradients
self.evaluate_reduce
= function()
if self.reduce_hook then
- self:reduce_hook()
- else
- -- standard reduce is to sum the gradients
- -- accumulate partial gradients, and average
- self.gradParameters:zero()
- for t = 1,P do
- self.gradParameters:add(gradParametersPartial[t])
- end
- self.gradParameters:div(#inputs)
- -- return average f(X)
- self.output = 0
- for t = 1,P do
- self.output = self.output + outputsPartial[t]
- end
- self.output = self.output/#inputs
- end
+ self:reduce_hook()
+ else
+ -- standard reduce is to sum the gradients
+ -- accumulate partial gradients, and average
+ self.gradParameters:zero()
+ for t = 1,P do
+ self.gradParameters:add(gradParametersPartial[t])
+ end
+ self.gradParameters:div(#inputs)
+ -- return average f(X)
+ self.output = 0
+ for t = 1,P do
+ self.output = self.output + outputsPartial[t]
+ end
+ self.output = self.output/#inputs
+ end
end
if self.optimize then
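
The reduce step above is plain gradient averaging: the per-worker partial gradients (each already summed over that worker's samples) are added together and divided by the full mini-batch size. A minimal sketch of the same accumulation, assuming each entry of `gradPartials` is a flat torch tensor the same size as `gradParameters`:

```lua
-- Sum the partial gradients from all workers, then average over the
-- total number of samples (not the number of workers), matching the
-- evaluate_reduce closure above.
local function reduceGradients(gradParameters, gradPartials, batchSize)
   gradParameters:zero()
   for t = 1, #gradPartials do
      gradParameters:add(gradPartials[t])
   end
   gradParameters:div(batchSize)
   return gradParameters
end
```

Dividing by the batch size rather than by the number of workers keeps the result identical to the sequential average computed in forward_sequential.
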
@@ -253,30 +254,31 @@ function Batch:setup_mapreduce ()
xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
'nn.BatchOptimization')
end
-
- local worker_code =
+
+ -- (1) define code for workers
+ local worker_code =
function()
-- require packages
require 'nnx'
-
+
-- retrieve optional code to setup worker
precode = parallel.parent:receive()
if type(precode) == 'function' then precode() end
-
+
-- retrieve module + criterion at startup
parallel.yield()
module = parallel.parent:receive()
criterion = parallel.parent:receive()
-
+
-- create fake optimizer, for hooks
optimizer = {module=module, criterion=criterion}
-
+
-- retrieve optional prehook/posthook
prehook = parallel.parent:receive()
posthook = parallel.parent:receive()
if type(prehook) ~= 'function' then prehook = nil end
if type(posthook) ~= 'function' then posthook = nil end
-
+
-- get pointer to parameter and gradParameter vectors
-- (this assumes that parameters+gradParameters are already flat parameters:
-- it should be the case, as the parent process flattens them at __init)
@@ -294,25 +296,25 @@ function Batch:setup_mapreduce ()
check(tableGradParameters)
parameters = torch.Tensor():set(tableParameters[1]:storage())
gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
-
+
-- outer loop: mini-batches
while true do
-- sync
if parallel.yield() == 'break' then break end
-
+
-- receive new mini-batch
inputs = parallel.parent:receive()
targets = parallel.parent:receive()
options = parallel.parent:receive()
-
+
-- inner loop: evaluations
while true do
-- sync
if parallel.yield() == 'break' then break end
-
+
-- receive new set of parameters
parameters:copy(parallel.parent:receive())
-
+
-- reset gradients
gradParameters:zero()
-- f is the average of all criterions
@@ -333,11 +335,7 @@ function Batch:setup_mapreduce ()
module:accGradParameters(inputs[i], df_do)
-- user hook
if posthook then
- if #inputs == #options then
- posthook(optimizer, {inputs[i], targets[i], options[i]})
- else
- posthook(module,options)
- end
+ posthook(optimizer, {inputs[i], targets[i], options[i]})
end
end
-- now send back gradParameters + partial output
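
The fragment above sits inside the worker's standard per-sample evaluation loop. A self-contained sketch of that loop, assuming the usual nn module/criterion API (forward, backward, accGradParameters) and flattened gradParameters:

```lua
-- Forward each sample through the module and criterion, backpropagate,
-- and accumulate gradients; the caller then averages f and gradParameters
-- over the batch and ships them back to the parent.
local function evaluateBatch(module, criterion, inputs, targets)
   local f = 0
   for i = 1, #inputs do
      local output = module:forward(inputs[i])
      f = f + criterion:forward(output, targets[i])
      local df_do = criterion:backward(output, targets[i])
      module:updateGradInput(inputs[i], df_do)
      module:accGradParameters(inputs[i], df_do)
   end
   return f
end
```
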
@@ -350,24 +348,24 @@ function Batch:setup_mapreduce ()
end
-- (2) dispatch workers
local setup = function()
- -- (1) optional calibration
- if parallel.remotes then
- parallel.calibrate()
- end
-
- -- (2) startup all workers
- self.children = parallel.sfork(self.parallelize)
- self.children:exec(worker_code)
-
- -- (3) send them optional config code
- self.children:send(self.precode or '')
-
- -- (4) and send them the module + criterion architecture
- self.children:join()
- self.children:send(self.module)
- self.children:send(self.criterion)
- end
-
+ -- (1) optional calibration
+ if parallel.remotes then
+ parallel.calibrate()
+ end
+
+ -- (2) startup all workers
+ self.children = parallel.sfork(self.parallelize)
+ self.children:exec(worker_code)
+
+ -- (3) send them optional config code
+ self.children:send(self.precode or '')
+
+ -- (4) and send them the module + criterion architecture
+ self.children:join()
+ self.children:send(self.module)
+ self.children:send(self.criterion)
+ end
+
local ok,err = pcall(setup)
if not ok then parallel.close() error(err) end
end
diff --git a/GenSGDOptimization.lua b/GenSGDOptimization.lua
index c6a5caa..41aab82 100644
--- a/GenSGDOptimization.lua
+++ b/GenSGDOptimization.lua
@@ -2,7 +2,7 @@ local GenSGD,parent = torch.class('nn.GenSGDOptimization',
'nn.BatchOptimization')
-- this module parallelizes SGD in a particular way. It sends out the
--- same batch to each of several worker each with a different learning
+-- same batch to each of several workers, each with a different learning
-- rate. The workers run and the parameters from the best worker and
-- it's learning rate are kept for the next batch.
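
For reference, a minimal serial sketch of the selection scheme this comment describes, using the fixed learning-rate spread introduced in map_hook below; `evalBatch` is a hypothetical helper that runs SGD over one mini-batch at a given learning rate and returns the resulting loss and parameter vector:

```lua
-- Try the same mini-batch at several learning rates and keep the
-- parameters (and rate) of whichever run achieved the lowest loss.
local function geneticStep(params, batch, baseLR, evalBatch)
   local rates = { baseLR, baseLR * 10, baseLR / 10, baseLR / 100 }
   local bestLoss, bestParams, bestLR = math.huge, nil, nil
   for _, lr in ipairs(rates) do
      local loss, newParams = evalBatch(params, batch, lr)
      if loss < bestLoss then
         bestLoss, bestParams, bestLR = loss, newParams, lr
      end
   end
   return bestParams, bestLR, bestLoss
end
```
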
@@ -36,7 +36,8 @@ function GenSGD:__init(...)
self.baseParameters = { momentum = self.momentum,
weightDecay = self.weightDecay,
learningRate = self.learningRate,
- learningRateDecay = self.learningRateDecay
+ learningRateDecay = self.learningRateDecay,
+ sampleCounter = self.sampleCounter
}
end
@@ -44,14 +45,25 @@ end
-- change gradParametersPartial to ParametersPartial, as the logic is
-- different for this kind of parallelization.
function GenSGD:map_hook()
+ local P = self.parallelize
-- transmit new parameters to all workers
self.children:join()
self.children:send(self.parameters)
+ print('randomizing for '..P..' lr: '..self.learningRate..' sigma: '..self.sigma)
-- randomize learning rate (could randomize other bits)
- local n = self.learningRate + (lab.randn(P) * self.sigma)
- for i = 1,P do
- self.baseParameters[learningRate] = n[i]
- self.children[t]:join()
+ local n = torch.Tensor(P)
+
+ n[1] = self.learningRate
+ n[2] = self.learningRate * 10
+ n[3] = self.learningRate / 10
+ n[4] = self.learningRate / 100
+-- (lab.randn(P) * self.sigma):add(self.learningRate)
+ self.baseParameters.sampleCounter = self.sampleCounter
+
+ for t = 1,P do
+ self.baseParameters.learningRate = n[t]
+ print('lr: '..self.baseParameters.learningRate)
+ --self.children[t]:join()
self.children[t]:send(self.baseParameters)
end
-- then wait for all workers to return their Parameters + outputs
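
The commented-out line above preserves the original randomized spread. As an assumption about that older `lab` API (the call appears only in the removed code), a sketch of the randomized alternative would look like this:

```lua
-- Draw P learning rates from a Gaussian centred on the current rate,
-- mirroring the removed "(lab.randn(P) * self.sigma):add(self.learningRate)".
local function randomRates(P, baseLR, sigma)
   local n = lab.randn(P)    -- assumed: P samples from N(0, 1)
   n:mul(sigma):add(baseLR)  -- rescale to mean baseLR, std sigma
   return n
end
```
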
@@ -63,6 +75,7 @@ function GenSGD:map_hook()
end
function GenSGD:reduce_hook()
+ local P = self.parallelize
local id = 0
local mx = 1e9
for t = 1,P do
@@ -75,11 +88,12 @@ function GenSGD:reduce_hook()
xerror('diverging','nn.GenSGDOptimization')
else
self.baseParameters = outputsPartial[id]
- self.output = self.currentParameters.f_x
+ self.output = self.baseParameters.f_x
-- in this case we get the parameters back directly
self.parameters:copy(gradParametersPartial[id])
print('Winner: output = '..self.output..
'learningRate = '..self.baseParameters['learningRate'])
+ self.learningRate = self.baseParameters.learningRate
end
end
@@ -88,8 +102,9 @@ function GenSGD:optimize()
end
-- optimization (could do others in this mode)
-function GenSGD:optimizer(module,params)
- -- apply momentum (store in the module)
+GenSGD.optimizer =
+ function (module,params)
+ -- apply momentum (store in the module)
if params.momentum ~= 0 then
if not module.currentGradParameters then
module.currentGradParameters =
@@ -132,6 +147,12 @@ function GenSGD:setup_mapreduce ()
criterion = parallel.parent:receive()
optimizer = parallel.parent:receive()
+ -- retrieve optional prehook/posthook
+ prehook = parallel.parent:receive()
+ posthook = parallel.parent:receive()
+ if type(prehook) ~= 'function' then prehook = nil end
+ if type(posthook) ~= 'function' then posthook = nil end
+
-- I don't understand this [MS]
-- get pointer to parameter and gradParameter vectors
-- (this assumes that parameters+gradParameters are already flat parameters:
@@ -168,7 +189,6 @@ function GenSGD:setup_mapreduce ()
-- receive new set of parameters
parameters:copy(parallel.parent:receive())
-
-- receive the learning rate etc. parameters which are
-- tweaked for each thread
optimization_parameters = parallel.parent:receive()
@@ -177,6 +197,8 @@ function GenSGD:setup_mapreduce ()
-- SGD on these inputs
-- reset gradients
gradParameters:zero()
+ module.parameters = parameters
+ module.gradParameters = gradParameters
for i = 1,#inputs do
-- estimate f
local output = module:forward(inputs[i])
@@ -215,8 +237,7 @@ function GenSGD:setup_mapreduce ()
if parallel.remotes then
parallel.calibrate()
end
- print(self.P)
- print(self.parallelize)
+
-- (2) startup all workers
self.children = parallel.sfork(self.parallelize)
self.children:exec(worker_code)