github.com/clementfarabet/lua---nnx.git

author     Marco Scoffier <github@metm.org>   2011-09-27 02:02:54 +0400
committer  Marco Scoffier <github@metm.org>   2011-09-27 02:02:54 +0400
commit     4f1120222c3c6e92987c391321d03e83acd48281 (patch)
tree       08e0e178e7a2eebf49d54be54a6efb46135147ff
parent     ff8a4830a8012d272ec423899efa5c4e632350c0 (diff)
almost working
-rw-r--r--  BatchOptimization.lua   | 251
-rw-r--r--  GenSGDOptimization.lua  | 204
-rw-r--r--  init.lua                |   2
3 files changed, 236 insertions, 221 deletions
diff --git a/BatchOptimization.lua b/BatchOptimization.lua
index aa210d7..bb4c70c 100644
--- a/BatchOptimization.lua
+++ b/BatchOptimization.lua
@@ -243,144 +243,131 @@ function Batch:forward_mapreduce(inputs, targets, options)
return self.output
end
+-- [MS] this default worker code is too detailed; it needs to be a
+-- skeleton that is easier to adapt... for now I am overriding this
+-- whole function with the 2 closures in GenSGD
+
function Batch:setup_mapreduce ()
-- (0) startup parallel package
if not xrequire 'parallel' then
xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
'nn.BatchOptimization')
end
-
- local worker_code = function () end
-
- -- (1) define code for workers
-
- -- [MS] this default worker code is too detailed needs to be a
- -- skeleton which is easier to adapt... for now I am allowing the
- -- worker and setup functions to be overridden
-
- if self.worker_code then
- worker_code = self.worker_code
- else
- worker_code =
- function()
- -- require packages
- require 'nnx'
-
- -- retrieve optional code to setup worker
- precode = parallel.parent:receive()
- if type(precode) == 'function' then precode() end
-
- -- retrieve module + criterion at startup
- parallel.yield()
- module = parallel.parent:receive()
- criterion = parallel.parent:receive()
-
- -- create fake optimizer, for hooks
- optimizer = {module=module, criterion=criterion}
-
- -- retrieve optional prehook/posthook
- prehook = parallel.parent:receive()
- posthook = parallel.parent:receive()
- if type(prehook) ~= 'function' then prehook = nil end
- if type(posthook) ~= 'function' then posthook = nil end
-
- -- get pointer to parameter and gradParameter vectors
- -- (this assumes that parameters+gradParameters are already flat parameters:
- -- it should be the case, as the parent process flattens them at __init)
- function check(tocheck)
- for i = 2,#tocheck do
- if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
- print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
- return
- end
- end
- end
- tableParameters = nnx.getParameters(module)
- tableGradParameters = nnx.getGradParameters(module)
- check(tableParameters)
- check(tableGradParameters)
- parameters = torch.Tensor():set(tableParameters[1]:storage())
- gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
-
- -- outer loop: mini-batches
- while true do
- -- sync
- if parallel.yield() == 'break' then break end
-
- -- receive new mini-batch
- inputs = parallel.parent:receive()
- targets = parallel.parent:receive()
- options = parallel.parent:receive()
-
- -- inner loop: evaluations
- while true do
- -- sync
- if parallel.yield() == 'break' then break end
-
- -- receive new set of parameters
- parameters:copy(parallel.parent:receive())
-
- -- reset gradients
- gradParameters:zero()
- -- f is the average of all criterions
- local f_x = 0
- -- evaluate gradients on inputs for this thread
- for i = 1,#inputs do
- -- user hook
- if prehook then
- prehook(optimizer, {inputs[i], targets[i], options[i]})
- end
- -- estimate f
- local output = module:forward(inputs[i])
- local err = criterion:forward(output, targets[i])
- f_x = f_x + err
- -- estimate df/dW
- local df_do = criterion:backward(output, targets[i])
- module:backward(inputs[i], df_do)
- module:accGradParameters(inputs[i], df_do)
- -- user hook
- if posthook then
- if #inputs == #options then
- posthook(optimizer, {inputs[i], targets[i], options[i]})
- else
- posthook(module,options)
- end
- end
- end
- -- now send back gradParameters + partial output
- parallel.parent:send(gradParameters)
- parallel.parent:send(f_x)
- -- force cleanup
- collectgarbage()
- end
- end
- end
- end
+
+ local worker_code =
+ function()
+ -- require packages
+ require 'nnx'
+
+ -- retrieve optional code to setup worker
+ precode = parallel.parent:receive()
+ if type(precode) == 'function' then precode() end
+
+ -- retrieve module + criterion at startup
+ parallel.yield()
+ module = parallel.parent:receive()
+ criterion = parallel.parent:receive()
+
+ -- create fake optimizer, for hooks
+ optimizer = {module=module, criterion=criterion}
+
+ -- retrieve optional prehook/posthook
+ prehook = parallel.parent:receive()
+ posthook = parallel.parent:receive()
+ if type(prehook) ~= 'function' then prehook = nil end
+ if type(posthook) ~= 'function' then posthook = nil end
+
+ -- get pointer to parameter and gradParameter vectors
+ -- (this assumes that parameters+gradParameters are already flat parameters:
+ -- it should be the case, as the parent process flattens them at __init)
+ function check(tocheck)
+ for i = 2,#tocheck do
+ if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
+ print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
+ return
+ end
+ end
+ end
+ tableParameters = nnx.getParameters(module)
+ tableGradParameters = nnx.getGradParameters(module)
+ check(tableParameters)
+ check(tableGradParameters)
+ parameters = torch.Tensor():set(tableParameters[1]:storage())
+ gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
+
+ -- outer loop: mini-batches
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new mini-batch
+ inputs = parallel.parent:receive()
+ targets = parallel.parent:receive()
+ options = parallel.parent:receive()
+
+ -- inner loop: evaluations
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new set of parameters
+ parameters:copy(parallel.parent:receive())
+
+ -- reset gradients
+ gradParameters:zero()
+ -- f is the average of all criterions
+ local f_x = 0
+ -- evaluate gradients on inputs for this thread
+ for i = 1,#inputs do
+ -- user hook
+ if prehook then
+ prehook(optimizer, {inputs[i], targets[i], options[i]})
+ end
+ -- estimate f
+ local output = module:forward(inputs[i])
+ local err = criterion:forward(output, targets[i])
+ f_x = f_x + err
+ -- estimate df/dW
+ local df_do = criterion:backward(output, targets[i])
+ module:backward(inputs[i], df_do)
+ module:accGradParameters(inputs[i], df_do)
+ -- user hook
+ if posthook then
+ if #inputs == #options then
+ posthook(optimizer, {inputs[i], targets[i], options[i]})
+ else
+ posthook(module,options)
+ end
+ end
+ end
+ -- now send back gradParameters + partial output
+ parallel.parent:send(gradParameters)
+ parallel.parent:send(f_x)
+ -- force cleanup
+ collectgarbage()
+ end
+ end
+ end
-- (2) dispatch workers
- local setup = function() end
-
- if self.setup then
- setup = self.setup
- else
- setup = function()
- -- (1) optional calibration
- if parallel.remotes then
- parallel.calibrate()
- end
-
- -- (2) startup all workers
- self.children = parallel.sfork(self.parallelize)
- self.children:exec(worker_code)
-
- -- (3) send them optional config code
- self.children:send(self.precode or '')
-
- -- (4) and send them the module + criterion architecture
- self.children:join()
- self.children:send(self.module)
- self.children:send(self.criterion)
- end
- end
- local ok,err = pcall(setup(self))
+ local setup = function()
+ -- (1) optional calibration
+ if parallel.remotes then
+ parallel.calibrate()
+ end
+
+ -- (2) startup all workers
+ self.children = parallel.sfork(self.parallelize)
+ self.children:exec(worker_code)
+
+ -- (3) send them optional config code
+ self.children:send(self.precode or '')
+
+ -- (4) and send them the module + criterion architecture
+ self.children:join()
+ self.children:send(self.module)
+ self.children:send(self.criterion)
+ end
+
+ local ok,err = pcall(setup)
if not ok then parallel.close() error(err) end
end
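The worker closure above is one half of a simple message protocol; the parent side of forward_mapreduce (not part of this hunk) has to drive it in exactly this order: join() to release the outer yield, send the batch, then for each evaluation join() again, broadcast the parameters, read back every child's partial gradient and partial loss, and finish the batch with join('break'). The following is only a minimal parent-side sketch, assuming each forked child exposes the same send/receive methods as parallel.parent; dispatch_one_batch, inputs_t, targets_t, options_t, grad_t and f_t are hypothetical names.

-- Sketch of the per-mini-batch message sequence the worker loop expects.
local function dispatch_one_batch(self, inputs_t, targets_t, options_t, parameters)
   -- resume the workers' outer parallel.yield(); anything but 'break' continues
   self.children:join()
   -- hand every worker its slice of the mini-batch
   for t = 1,self.parallelize do
      self.children[t]:send(inputs_t[t])
      self.children[t]:send(targets_t[t])
      self.children[t]:send(options_t[t])
   end
   -- one evaluation of f and df/dW at the current parameters
   self.children:join()                  -- resume the inner parallel.yield()
   self.children:send(parameters)        -- workers copy this into their flat vector
   local f_x = 0
   for t = 1,self.parallelize do
      local grad_t = self.children[t]:receive()   -- partial gradParameters
      local f_t    = self.children[t]:receive()   -- partial loss
      -- the partial gradients would be summed into the parent's flat
      -- gradParameters here; averaging/scaling is left out of this sketch
      f_x = f_x + f_t
   end
   -- close the inner loop for this mini-batch; workers return to the outer yield
   self.children:join('break')
   return f_x
end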
diff --git a/GenSGDOptimization.lua b/GenSGDOptimization.lua
index a5c8efe..c6a5caa 100644
--- a/GenSGDOptimization.lua
+++ b/GenSGDOptimization.lua
@@ -27,7 +27,8 @@ function GenSGD:__init(...)
)
require 'lab'
if self.parallelize < 2 then
- print('ERROR: GenSGD needs to work on several processors')
+ xerror('GenSGD needs to work on several processors: set parallelize',
+ 'nn.GenSGDOptimization')
end
-- change the mapper to send the same batch to each worker
self.copyBatch = true
@@ -37,7 +38,6 @@ function GenSGD:__init(...)
learningRate = self.learningRate,
learningRateDecay = self.learningRateDecay
}
- self.workerParameters = torch.Tensor(self.P)
end
-- we are changing the way we map and reduce. It would be nice to
@@ -72,7 +72,7 @@ function GenSGD:reduce_hook()
end
end
if id == 0 then
- print('ERROR: diverging')
+ xerror('diverging','nn.GenSGDOptimization')
else
self.baseParameters = outputsPartial[id]
self.output = self.currentParameters.f_x
@@ -114,92 +114,120 @@ function GenSGD:optimizer(module,params)
params.learningRate = learningRate
end
-function GenSGD:worker_code()
- -- require packages
- require 'nnx'
-
- -- retrieve module + criterion at startup
- parallel.yield()
-
- module = parallel.parent:receive()
- criterion = parallel.parent:receive()
- optimizer = parallel.parent:receive()
-
- module.parameters = nnx.flattenParameters(nnx.getParameters(module))
- module.gradParameters = nnx.flattenParameters(nnx.getGradParameters(module))
+function GenSGD:setup_mapreduce ()
+ -- (0) startup parallel package
+ if not xrequire 'parallel' then
+ xerror('install parallel for Lua to enable parallel computing (luarocks install parallel)',
+ 'nn.GenSGDOptimization')
+ end
+ local worker_code =
+ function()
+ -- require packages
+ require 'nnx'
+
+ -- retrieve module + criterion at startup
+ parallel.yield()
+
+ module = parallel.parent:receive()
+ criterion = parallel.parent:receive()
+ optimizer = parallel.parent:receive()
+
+ -- I don't understand this [MS]
+ -- get pointer to parameter and gradParameter vectors
+ -- (this assumes that parameters+gradParameters are already flat parameters:
+ -- it should be the case, as the parent process flattens them at __init)
+ function check(tocheck)
+ for i = 2,#tocheck do
+ if tocheck[i]:storage() ~= tocheck[i-1]:storage() then
+ print('<BatchOptimization> error: inconsistent parameter vector (not flat)')
+ return
+ end
+ end
+ end
+ tableParameters = nnx.getParameters(module)
+ tableGradParameters = nnx.getGradParameters(module)
+ check(tableParameters)
+ check(tableGradParameters)
+ parameters = torch.Tensor():set(tableParameters[1]:storage())
+ gradParameters = torch.Tensor():set(tableGradParameters[1]:storage())
- -- outer loop: mini-batches
- while true do
- -- sync
- if parallel.yield() == 'break' then break end
-
- -- receive new mini-batch
- inputs = parallel.parent:receive()
- targets = parallel.parent:receive()
- options = parallel.parent:receive()
-
- -- inner loop: evaluations
- while true do
- -- sync
- if parallel.yield() == 'break' then break end
-
- -- receive new set of parameters
- parameters:copy(parallel.parent:receive())
-
- -- receive the learning rate etc. parameters which are
- -- tweaked for each thread
- optimization_parameters = parallel.parent:receive()
-
- -- evaluate gradients on inputs for this thread and perform
- -- SGD on these inputs
- -- reset gradients
- gradParameters:zero()
- for i = 1,#inputs do
- -- estimate f
- local output = module:forward(inputs[i])
- local err = criterion:forward(output, targets[i])
- -- estimate df/dW
- local df_do = criterion:backward(output, targets[i])
- module:backward(inputs[i], df_do)
- module:accGradParameters(inputs[i], df_do)
- optimizer(module,optimization_parameters)
- end
- -- we need the result averaged over all the samples _after_
- -- the gradient steps so do one more loop to fprop through
- -- the samples and collect the error _after_ the optimization
- local f_x = 0
- for i = 1,#inputs do
- -- estimate f
- local output = module:forward(inputs[i])
- local err = criterion:forward(output, targets[i])
- f_x = f_x + err
+ -- outer loop: mini-batches
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new mini-batch
+ inputs = parallel.parent:receive()
+ targets = parallel.parent:receive()
+ options = parallel.parent:receive()
+
+ -- inner loop: evaluations
+ while true do
+ -- sync
+ if parallel.yield() == 'break' then break end
+
+ -- receive new set of parameters
+ parameters:copy(parallel.parent:receive())
+
+ -- receive the learning rate etc. parameters which are
+ -- tweaked for each thread
+ optimization_parameters = parallel.parent:receive()
+
+ -- evaluate gradients on inputs for this thread and perform
+ -- SGD on these inputs
+ -- reset gradients
+ gradParameters:zero()
+ for i = 1,#inputs do
+ -- estimate f
+ local output = module:forward(inputs[i])
+ local err = criterion:forward(output, targets[i])
+ -- estimate df/dW
+ local df_do = criterion:backward(output, targets[i])
+ module:backward(inputs[i], df_do)
+ module:accGradParameters(inputs[i], df_do)
+ optimizer(module,optimization_parameters)
+ end
+ -- we need the result averaged over all the samples _after_
+ -- the gradient steps so do one more loop to fprop through
+ -- the samples and collect the error _after_ the optimization
+ local f_x = 0
+ for i = 1,#inputs do
+ -- estimate f
+ local output = module:forward(inputs[i])
+ local err = criterion:forward(output, targets[i])
+ f_x = f_x + err
+ end
+ -- in this case send back parameters themselves b/c they are
+ -- already optimized
+ parallel.parent:send(parameters)
+ -- need to make sure we keep track of what was used to
+ -- compute these params along with the outputs
+ optimization_parameters['f_x'] = f_x/#inputs
+ parallel.parent:send(optimization_parameters)
+ -- force cleanup
+ collectgarbage()
+ end
end
- -- in this case send back parameters themselves b/c they are
- -- already optimized
- parallel.parent:send(parameters)
- -- need to make sure we keep track of what was used to
- -- compute these params along with the outputs
- optimization_parameters['f_x'] = f_x/#inputs
- parallel.parent:send(optimization_parameters)
- -- force cleanup
- collectgarbage()
end
- end
-end
-function GenSGD:setup()
- -- (1) optional calibration
- if parallel.remotes then
- parallel.calibrate()
- end
-
- -- (2) startup all workers
- self.children = parallel.sfork(self.parallelize)
- self.children:exec(worker_code)
-
- -- (4) and send them the module + criterion architecture
- self.children:join()
- self.children:send(self.module)
- self.children:send(self.criterion)
- self.children:send(self.optimizer)
- end
+ local setup = function()
+ -- (1) optional calibration
+ if parallel.remotes then
+ parallel.calibrate()
+ end
+ print(self.P)
+ print(self.parallelize)
+ -- (2) startup all workers
+ self.children = parallel.sfork(self.parallelize)
+ self.children:exec(worker_code)
+
+ -- (4) and send them the module + criterion architecture
+ self.children:join()
+ self.children:send(self.module)
+ self.children:send(self.criterion)
+ self.children:send(self.optimizer)
+ end
+
+ local ok,err = pcall(setup)
+ if not ok then parallel.close() error(err) end
+end
\ No newline at end of file
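In the GenSGD variant each worker runs SGD locally with its own learning-rate settings, then sends back two messages: the already-optimized flat parameter vector and its options table with the average post-step loss stored under 'f_x'. The parent's reduce_hook (see the hunk above) keeps the candidate with the lowest loss and raises 'diverging' if none qualifies. Below is only a minimal sketch of that collect-and-select step, assuming per-child receive and hypothetical names for the collection tables; the real code stores the winner in self.baseParameters and reads the loss from self.currentParameters.f_x.

-- Sketch of collecting per-worker results and keeping the best candidate,
-- mirroring the selection logic in GenSGD:reduce_hook().
local function collect_and_pick(self)
   local paramsPartial, optsPartial = {}, {}      -- hypothetical names
   for t = 1,self.parallelize do
      paramsPartial[t] = self.children[t]:receive()  -- optimized parameters
      optsPartial[t]   = self.children[t]:receive()  -- options table, carries f_x
   end
   -- keep the worker whose post-SGD average loss is smallest
   local id, best = 0, math.huge
   for t = 1,self.parallelize do
      local f = optsPartial[t].f_x
      if f == f and f < best then     -- f == f filters out NaN (divergence)
         id, best = t, f
      end
   end
   if id == 0 then
      xerror('diverging','nn.GenSGDOptimization')
   end
   self.baseParameters = paramsPartial[id]   -- adopt the winning parameters
   self.output = best                        -- and report its loss
end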
diff --git a/init.lua b/init.lua
index 3c9e9b6..a0cab01 100644
--- a/init.lua
+++ b/init.lua
@@ -208,7 +208,7 @@ function nnx.flattenParameters(parameters)
if param:storage() == parameters[i]:storage() then
offsets[k] = offsets[i]
if storageOffsets[k] ~= storageOffsets[i] or elements[k] ~= elements[i] then
- error('<nnx.flattenParameters> canot flatten shared weights with different structures')
+ error('<nnx.flattenParameters> cannot flatten shared weights with different structures')
end
isView = true
break
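The check() helpers in both worker closures only verify the invariant that flattening establishes: every parameter tensor of the module must be a view into one shared torch.Storage. A minimal standalone sketch of that invariant follows, assuming a small hypothetical nn model; nnx.flattenParameters and nnx.getParameters are the calls already used in this repository.

require 'nnx'   -- pulls in nn as well

-- hypothetical two-layer model
local mlp = nn.Sequential()
mlp:add(nn.Linear(10, 5))
mlp:add(nn.Tanh())
mlp:add(nn.Linear(5, 2))

-- flatten in the parent, as BatchOptimization does at __init
nnx.flattenParameters(nnx.getParameters(mlp))

-- the flatness invariant the workers' check() relies on:
local params = nnx.getParameters(mlp)
for i = 2, #params do
   assert(params[i]:storage() == params[i-1]:storage(),
          '<BatchOptimization> inconsistent parameter vector (not flat)')
end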