github.com/marian-nmt/marian.git
author    Marcin Junczys-Dowmunt <junczys@amu.edu.pl>  2018-07-27 20:14:21 +0300
committer Marcin Junczys-Dowmunt <junczys@amu.edu.pl>  2018-07-27 20:14:21 +0300
commit    dceb7185d86ed8fd1994e86dc3e3c0e03740ec4a (patch)
tree      3514f87aa2da28313043959ebd0381b3ba7de233 /src
parent    5cc8674d974bb5cae7bc8f25a51472166164a579 (diff)
parent    8b0e2f951b5ce09a622fa7239b2e1e5bd8344fe4 (diff)

    fix merge
Diffstat (limited to 'src'):

 src/CMakeLists.txt                                     |  21
 src/command/marian.cpp                                 |  37
 src/common/config_parser.cpp                           |  19
 src/data/batch_generator.h                             |   7
 src/data/corpus_base.h                                 |  57
 src/data/corpus_sqlite.cpp                             |   2
 src/functional/predicates.h                            |   2
 src/graph/expression_graph.h                           |   7
 src/graph/expression_operators.cpp                     |  13
 src/graph/expression_operators.h                       |   1
 src/graph/node_operators_binary.h                      |  62
 src/graph/node_operators_unary.h                       |  12
 src/layers/convolution.cpp                             |   4
 src/layers/convolution.h                               |   3
 src/layers/generic.h                                   |  84
 src/layers/loss.cpp                                    |  94
 src/layers/loss.h                                      |  70
 src/layers/weight.cpp                                  |  21
 src/layers/weight.h                                    |  33
 src/models/costs.h                                     | 101
 src/models/encoder_decoder.h                           |   8
 src/models/nematus.h                                   |   4
 src/python/CMakeLists.txt                              |  42
 src/python/mariannmt.cpp                               |  44
 src/tensors/allocator.h                                |  32
 src/tensors/cpu/element.h                              |   6
 src/tensors/cpu/prod.cpp                               |  24
 src/tensors/cpu/tensor_operators.cpp                   |  82
 src/tensors/gpu/add.cu                                 |   1
 src/tensors/gpu/element.inc                            |   1
 src/tensors/gpu/prod.cu                                |  95
 src/tensors/gpu/prod.h                                 |   5
 src/tensors/gpu/tensor_operators.cu                    | 284
 src/tensors/tensor_operators.h                         |   6
 src/training/communicator.cpp                          |  13
 src/training/communicator.cu                           | 239
 src/training/communicator.h                            | 178
 src/training/gradient_dropping/gpu/sparse_algorithm.cu | 178
 src/training/gradient_dropping/gpu/sparse_algorithm.h  |  29
 src/training/gradient_dropping/sparse_tensor.h         |  81
 src/training/graph_group.h                             |   2
 src/training/graph_group_async_drop.cpp                | 145
 src/training/graph_group_async_drop.h                  |  17
 src/training/graph_group_multinode.cpp                 |  50
 src/training/graph_group_multinode.h                   |  15
 src/training/graph_group_multinode_sync.cpp            | 280
 src/training/graph_group_multinode_sync.h              | 305
 src/training/graph_group_sync.cpp                      | 363
 src/training/graph_group_sync.h                        | 124
 src/training/scheduler.h                               |  15
 src/training/validator.h                               |   6
 src/translator/beam_search.h                           |  89
 src/translator/hypothesis.h                            |   4
 src/translator/output_printer.cpp                      |  64
 src/translator/output_printer.h                        |  86
 src/translator/printer.cpp                             |  35
 src/translator/printer.h                               |  56
 src/translator/scorers.h                               |  97
 src/translator/translator.h                            |   8
 59 files changed, 2686 insertions(+), 1077 deletions(-)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index fc8cc8b3..08ed5399 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -37,6 +37,8 @@ add_library(marian STATIC
graph/node_initializers.cpp
layers/convolution.cpp
+ layers/loss.cpp
+ layers/weight.cpp
rnn/cells.cpp
rnn/attention.cpp
@@ -49,6 +51,7 @@ add_library(marian STATIC
translator/history.cpp
translator/output_collector.cpp
+ translator/output_printer.cpp
translator/nth_element.cpp
translator/helpers.cpp
translator/scorers.cpp
@@ -58,10 +61,13 @@ add_library(marian STATIC
training/graph_group_sync.cpp
training/graph_group_singleton.cpp
training/graph_group_multinode.cpp
+ training/graph_group_multinode_sync.cpp
training/validator.cpp
+ training/communicator.cpp
$<TARGET_OBJECTS:libyaml-cpp>
- $<TARGET_OBJECTS:SQLiteCpp>)
+ $<TARGET_OBJECTS:SQLiteCpp>
+)
if(CUDA_FOUND)
cuda_add_library(marian_cuda
@@ -77,6 +83,7 @@ cuda_add_library(marian_cuda
translator/helpers.cu
training/gradient_dropping/gpu/dropper.cu
training/gradient_dropping/gpu/sparse_algorithm.cu
+ training/communicator.cu
STATIC)
endif(CUDA_FOUND)
@@ -98,14 +105,16 @@ set_target_properties(marian_vocab PROPERTIES OUTPUT_NAME marian-vocab)
set(EXECUTABLES ${EXECUTABLES} marian_train marian_decoder marian_scorer marian_vocab)
# marian.zip and marian.tgz
-# This combines marian, marian_decoder in a single ZIP or TAR file for execution in MSFT internal tools FLO and Philly.
-# For Philly submission, we need statically-linked versions to deal with library dependencies, so this target is only enabled for static builds.
+# This combines marian, marian_decoder in a single ZIP or TAR file for
+# execution in MSFT internal tools FLO and Philly.
+# For Philly submission, we need statically-linked versions to deal with
+# library dependencies, so this target is only enabled for static builds.
if(USE_STATIC_LIBS)
add_custom_command(
OUTPUT "${CMAKE_BINARY_DIR}/marian.zip"
COMMAND zip -v -0 -j "${CMAKE_BINARY_DIR}/marian.zip"
"${CMAKE_BINARY_DIR}/marian"
- "${CMAKE_BINARY_DIR}/marian-decoder"
+ "${CMAKE_BINARY_DIR}/marian-decoder"
"${CMAKE_BINARY_DIR}/marian-scorer"
"${CMAKE_BINARY_DIR}/marian-vocab"
DEPENDS marian_train marian_decoder marian_scorer marian_vocab)
@@ -146,10 +155,6 @@ endforeach(exec)
#set_target_properties(align2steps PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}")
-if(PYTHONLIBS_FOUND)
-# add_subdirectory(python)
-endif(PYTHONLIBS_FOUND)
-
if(COMPILE_TESTS)
set(CATCH_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/3rd_party)
add_library(Catch INTERFACE)
diff --git a/src/command/marian.cpp b/src/command/marian.cpp
index 92b7be7a..d2aff81c 100644
--- a/src/command/marian.cpp
+++ b/src/command/marian.cpp
@@ -1,16 +1,17 @@
#include "marian.h"
#include "training/graph_group_async.h"
-#include "training/graph_group_multinode.h"
+#include "training/graph_group_multinode_sync.h"
#include "training/graph_group_singleton.h"
#include "training/graph_group_sync.h"
#include "training/training.h"
#ifdef CUDA_FOUND
#include "training/graph_group_async_drop.h"
+#include "training/graph_group_multinode.h"
#endif
-bool configureMPI(int, char**);
+bool configureMPI(int, char**, bool);
int main(int argc, char** argv) {
using namespace marian;
@@ -19,38 +20,54 @@ int main(int argc, char** argv) {
auto devices = options->getDevices();
if(options->get<bool>("multi-node")) {
- ABORT_IF(!configureMPI(argc, argv), "MPI not found.");
-
+ ABORT_IF(!configureMPI(argc, argv, options->get<bool>("sync-sgd")),
+ "MPI not found.");
LOG(warn, "[experimental] Running multi-node training");
- New<Train<MultiNodeGraphGroup>>(options)->run();
+
+ if(options->get<bool>("sync-sgd")) {
+ New<Train<MultiNodeGraphGroupSync>>(options)->run();
+ }
+ else {
+#ifdef CUDA_FOUND
+ New<Train<MultiNodeGraphGroup>>(options)->run();
+#else
+ ABORT("Asynchronous multi-node training requires CUDA");
+#endif
+ }
} else {
if(devices.size() == 1) {
New<Train<SingletonGraph>>(options)->run();
} else {
- if(options->get<bool>("sync-sgd"))
+ if(options->get<bool>("sync-sgd")) {
New<Train<SyncGraphGroup>>(options)->run();
+ }
+ else if(options->get<float>("grad-dropping-rate") > 0.0) {
#ifdef CUDA_FOUND
- else if(options->get<float>("grad-dropping-rate") > 0.0)
New<Train<AsyncGraphGroupDrop>>(options)->run();
+#else
+ ABORT("Asynchronous training with gradient dropping requires CUDA");
#endif
- else
+ }
+ else {
New<Train<AsyncGraphGroup>>(options)->run();
+ }
}
}
return 0;
}
-bool configureMPI(int argc, char** argv) {
+bool configureMPI(int argc, char** argv, bool sync) {
bool enable = false;
#if MPI_FOUND
+ int required_mode = sync ? MPI_THREAD_SERIALIZED : MPI_THREAD_MULTIPLE;
int provided_thread_mode = 0;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided_thread_mode);
// Enable if occasional truncation errors
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
ABORT_IF(
- provided_thread_mode < MPI_THREAD_MULTIPLE,
+ provided_thread_mode < required_mode,
"Your version of MPI does not support multi-threaded communication.");
enable = true;
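
Note: synchronous SGD serializes its MPI calls, so MPI_THREAD_SERIALIZED is
sufficient there, while asynchronous multi-node training still needs
MPI_THREAD_MULTIPLE. A minimal standalone sketch of the same negotiation,
assuming only standard MPI (not part of this patch):

  #include <mpi.h>
  #include <cstdio>

  int main(int argc, char** argv) {
    int provided = 0;
    // Request the strongest mode; the runtime may grant a weaker one.
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    bool sync = true;  // would come from --sync-sgd
    int required = sync ? MPI_THREAD_SERIALIZED : MPI_THREAD_MULTIPLE;
    if(provided < required) {
      std::fprintf(stderr, "MPI thread support too weak (%d < %d)\n",
                   provided, required);
      MPI_Abort(MPI_COMM_WORLD, 1);
    }
    MPI_Finalize();
    return 0;
  }
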
diff --git a/src/common/config_parser.cpp b/src/common/config_parser.cpp
index 988e4746..e718217a 100644
--- a/src/common/config_parser.cpp
+++ b/src/common/config_parser.cpp
@@ -490,6 +490,10 @@ void ConfigParser::addOptionsTraining(po::options_description& desc) {
->multitoken()
->default_value(std::vector<std::string>({"0"}), "0"),
"GPU ID(s) to use for training")
+#ifdef USE_NCCL
+ ("no-nccl", po::value<bool>()->zero_tokens()->default_value(false),
+ "Disable inter-GPU communication via NCCL")
+#endif
#ifdef CUDA_FOUND
("cpu-threads", po::value<size_t>()->default_value(0)->implicit_value(1),
"Use CPU-based computation with this many independent threads, 0 means GPU-based computation")
@@ -607,10 +611,6 @@ void ConfigParser::addOptionsTraining(po::options_description& desc) {
("multi-node-overlap", po::value<bool>()
->default_value(true),
"Overlap model computations with MPI communication")
- ("multi-node-local-optimizers", po::value<bool>()
- ->zero_tokens()
- ->default_value(false),
- "Enable local optimizers with multi-node. Requires optimizer delay to be turned on.")
;
// clang-format on
desc.add(training);
@@ -722,9 +722,10 @@ void ConfigParser::addOptionsTranslate(po::options_description& desc) {
"Display n-best list")
("shortlist", po::value<std::vector<std::string>>()->multitoken(),
"Use softmax shortlist: path first best prune")
- ("weights", po::value<std::vector<float>>()
- ->multitoken(),
+ ("weights", po::value<std::vector<float>>()->multitoken(),
"Scorer weights")
+ ("alignment", po::value<float>()->default_value(0.f)->implicit_value(1.f),
+ "Return word alignments")
// TODO: the options should be available only in server
("port,p", po::value<size_t>()->default_value(8080),
"Port number for web socket server")
@@ -1006,7 +1007,10 @@ void ConfigParser::parseOptions(int argc, char** argv, bool doValidate) {
SET_OPTION("multi-node", bool);
SET_OPTION("multi-node-overlap", bool);
- SET_OPTION("multi-node-local-optimizers", bool);
+
+#ifdef USE_NCCL
+ SET_OPTION("no-nccl", bool);
+#endif
}
if(mode_ == ConfigMode::rescoring) {
@@ -1031,6 +1035,7 @@ void ConfigParser::parseOptions(int argc, char** argv, bool doValidate) {
SET_OPTION("mini-batch-words", int);
SET_OPTION_NONDEFAULT("weights", std::vector<float>);
SET_OPTION_NONDEFAULT("shortlist", std::vector<std::string>);
+ SET_OPTION("alignment", float);
SET_OPTION("port", size_t);
SET_OPTION("optimize", bool);
SET_OPTION("max-length-factor", float);
diff --git a/src/data/batch_generator.h b/src/data/batch_generator.h
index 38a8afe9..5cae929b 100644
--- a/src/data/batch_generator.h
+++ b/src/data/batch_generator.h
@@ -207,6 +207,13 @@ public:
return currentBatch_;
}
+ std::vector<BatchPtr> nextN(size_t num) {
+ std::vector<BatchPtr> batches;
+ for(int i = 0; i < num && *this; ++i)
+ batches.push_back(next());
+ return batches;
+ }
+
void prepare(bool shuffle = true) {
if(shuffle)
data_->shuffle();
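
Note: nextN() drains up to num batches in one call; the synchronous graph
groups can use this to hand one batch to each device per update. A hedged
usage sketch (bg, numDevices and process() are assumed names):

  bg.prepare(/*shuffle=*/true);
  while(bg) {
    auto batches = bg.nextN(numDevices);  // may return fewer at the tail
    for(auto& batch : batches)
      process(batch);
  }
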
diff --git a/src/data/corpus_base.h b/src/data/corpus_base.h
index 02b6ca36..c007b41e 100644
--- a/src/data/corpus_base.h
+++ b/src/data/corpus_base.h
@@ -172,33 +172,35 @@ public:
* @see marian::data::Batch::split(size_t n)
*/
std::vector<Ptr<SubBatch>> split(size_t n) {
- std::vector<Ptr<SubBatch>> splits;
+ ABORT_IF(size_ == 0, "Encountered sub-batch size of 0");
+ std::vector<Ptr<SubBatch>> splits;
size_t subSize = std::ceil(size_ / (float)n);
- size_t totSize = size_;
-
+
+ size_t restSize = size_;
int pos = 0;
for(int k = 0; k < n; ++k) {
- size_t __size__ = std::min(subSize, totSize);
-
- auto sb = New<SubBatch>(__size__, width_, vocab_);
-
- size_t __words__ = 0;
- for(int j = 0; j < width_; ++j) {
- for(int i = 0; i < __size__; ++i) {
- sb->data()[j * __size__ + i] = indices_[j * size_ + pos + i];
- sb->mask()[j * __size__ + i] = mask_[j * size_ + pos + i];
-
- if(mask_[j * size_ + pos + i] != 0)
- __words__++;
+ size_t __size__ = std::min(subSize, restSize);
+ if(__size__ > 0) {
+ auto sb = New<SubBatch>(__size__, width_, vocab_);
+
+ size_t __words__ = 0;
+ for(int j = 0; j < width_; ++j) {
+ for(int i = 0; i < __size__; ++i) {
+ sb->data()[j * __size__ + i] = indices_[j * size_ + pos + i];
+ sb->mask()[j * __size__ + i] = mask_[j * size_ + pos + i];
+
+ if(mask_[j * size_ + pos + i] != 0)
+ __words__++;
+ }
}
- }
- sb->setWords(__words__);
- splits.push_back(sb);
+ sb->setWords(__words__);
+ splits.push_back(sb);
- totSize -= __size__;
- pos += __size__;
+ restSize -= __size__;
+ pos += __size__;
+ }
}
return splits;
}
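
Note: the rewritten split() skips empty pieces instead of emitting zero-size
sub-batches. A worked example of the chunking arithmetic:

  // size_ = 5 sentences, n = 4 requested splits:
  //   subSize = ceil(5 / 4.0) = 2
  //   chunks  = min(2,5), min(2,3), min(2,1), min(2,0) -> 2, 2, 1, (0 skipped)
  // Callers therefore never see an empty sub-batch; the ABORT_IF above
  // catches the degenerate case where the whole batch is already empty.
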
@@ -260,7 +262,7 @@ public:
* @brief The number of sentences in the batch, target words.
*/
size_t sizeTrg() const { return subBatches_.back()->batchSize(); }
-
+
/**
* @brief The number of words for the longest sentence in the batch plus one.
*/
@@ -291,17 +293,19 @@ public:
Ptr<Options> options) {
std::vector<Ptr<SubBatch>> batches;
+ size_t idx = 0;
for(auto len : lengths) {
auto vocab = New<Vocab>();
vocab->createFake();
auto sb = New<SubBatch>(batchSize, len, vocab); // data: gets initialized to 0. No EOS symbol is distinguished.
+ std::fill(sb->data().begin(), sb->data().end(), idx++); // set word indices to different values to avoid same hashes
std::fill(sb->mask().begin(), sb->mask().end(), 1); // mask: no items are being masked out
batches.push_back(sb);
}
auto batch = New<CorpusBatch>(batches);
-
+
if(!options) return batch;
if(options->has("guided-alignment")) {
@@ -331,12 +335,17 @@ public:
* @see marian::data::SubBatch::split(size_t n)
*/
std::vector<Ptr<Batch>> split(size_t n) {
+ ABORT_IF(size() == 0, "Encountered batch size of 0");
+
+ std::vector<std::vector<Ptr<SubBatch>>> subs;
// split each subbatch separately
- std::vector<std::vector<Ptr<SubBatch>>> subs(n);
for(auto subBatch : subBatches_) {
size_t i = 0;
- for(auto splitSubBatch : subBatch->split(n))
+ for(auto splitSubBatch : subBatch->split(n)) {
+ if(subs.size() <= i)
+ subs.resize(i + 1);
subs[i++].push_back(splitSubBatch);
+ }
}
// create batches from split subbatches
diff --git a/src/data/corpus_sqlite.cpp b/src/data/corpus_sqlite.cpp
index 4c185b0c..7a58fa67 100644
--- a/src/data/corpus_sqlite.cpp
+++ b/src/data/corpus_sqlite.cpp
@@ -99,7 +99,7 @@ void CorpusSQLite::fillSQLite() {
}
}
db_->exec("commit;");
- LOG(info, "[sqlite] Inserted {} lines", lines);
+ LOG(info, "[sqlite] Inserted {} lines", lines - 1);
LOG(info, "[sqlite] Creating primary index");
db_->exec("create unique index idx_line on lines (_id);");
}
diff --git a/src/functional/predicates.h b/src/functional/predicates.h
index 51af38ad..45f82fd5 100644
--- a/src/functional/predicates.h
+++ b/src/functional/predicates.h
@@ -1,5 +1,7 @@
#pragma once
+#include <cmath>
+
#include "functional/defs.h"
#include "functional/operands.h"
diff --git a/src/graph/expression_graph.h b/src/graph/expression_graph.h
index d901000c..199994d0 100644
--- a/src/graph/expression_graph.h
+++ b/src/graph/expression_graph.h
@@ -244,12 +244,13 @@ public:
}
}
- void backward() {
+ void backward(bool zero = true) {
ABORT_IF(topNodes_.size() > 1,
"There are more than one top most node for backward step");
params_->allocateBackward();
- params_->set_zero_adjoint();
+ if(zero)
+ params_->set_zero_adjoint();
for(auto&& v : topNodes_)
v->init_dependent();
@@ -264,7 +265,7 @@ public:
nodesBackward_.pop_back();
for(auto&& child : v->children()) {
- if(child->trainable())
+ if(child->trainable() && child->type() != "param")
child->set_zero_adjoint();
}
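
Note: backward(zero = false) together with the "param" exclusion lets
parameter gradients accumulate across several forward/backward passes, e.g.
for delayed or multi-batch updates. A hedged sketch of such a loop (graph,
opt and delay are assumed names):

  graph->forward();
  graph->backward();                 // first sub-batch zeroes the adjoints
  for(int i = 1; i < delay; ++i) {
    graph->forward();
    graph->backward(/*zero=*/false); // later sub-batches add their gradients
  }
  opt->update(graph);
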
diff --git a/src/graph/expression_operators.cpp b/src/graph/expression_operators.cpp
index 1666357a..ea8077fa 100644
--- a/src/graph/expression_operators.cpp
+++ b/src/graph/expression_operators.cpp
@@ -313,7 +313,9 @@ Expr affine(Expr a, Expr b, Expr bias, bool transA, bool transB, float scale) {
if(bc != b)
bc = rec2(bc);
- std::vector<Expr> nodes = {ac, bc, bias};
+ int rows = ac->shape().elements() / ac->shape()[-1];
+ Expr ones = ac->graph()->ones({rows, 1});
+ std::vector<Expr> nodes = {ac, bc, bias, ones};
return rec2(Expression<AffineNodeOp>(nodes, transA, transB, scale),
true);
};
@@ -333,13 +335,16 @@ Expr affine(Expr a, Expr b, Expr bias, bool transA, bool transB, float scale) {
}
else {
// general version, MKL, CBlas or CUDA
+
// if clipValue > 0, the inputs will be clipped to range [-clipValue, clipValue]
// This is meant to keep values at the same range as used during training when
// optimizing for 8-bit integer products. Likely to be removed in the future
// when we explore better ways to handle this.
- std::vector<Expr> nodes = {clip(a, clipValue), clip(b, clipValue), bias};
- return Expression<AffineNodeOp>(nodes, transA, transB, scale);
+ int rows = a->shape().elements() / a->shape()[-1];
+ Expr ones = a->graph()->ones({rows, 1});
+ std::vector<Expr> nodes = {clip(a, clipValue), clip(b, clipValue), bias, ones};
+ return Expression<AffineNodeOp>(nodes, transA, transB, scale);
}
}
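
Note: appending a column-of-ones node turns the bias handling of affine into
plain GEMM work; shapes, as a sketch:

  //   A: [rows, k], B: [k, dim], bias: [1, dim], ones: [rows, 1]
  //   forward:  C  = A * B           (Prod)
  //             C += ones * bias     (rank-1 Prod, broadcasts bias over rows)
  //   backward: grad(bias) = ones^T * adj   (column-wise sum of the adjoint)
  // This is what replaces the former element-wise Add(...) in the
  // AffineNodeOp hunks of node_operators_binary.h below.
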
@@ -462,6 +467,7 @@ Expr shift(Expr a, Shape shift, float padValue) {
//}
#ifdef CUDA_FOUND
+#ifdef CUDNN
Expr avg_pooling(Expr x,
int height,
@@ -526,4 +532,5 @@ Expr pooling_with_masking(Expr x, Expr mask, int width, bool isEven) {
}
#endif
+#endif
}
diff --git a/src/graph/expression_operators.h b/src/graph/expression_operators.h
index cc07dafb..53cf5966 100644
--- a/src/graph/expression_operators.h
+++ b/src/graph/expression_operators.h
@@ -106,7 +106,6 @@ Expr flatten_2d(Expr a);
Expr rows(Expr a, const std::vector<size_t>& indices);
Expr cols(Expr a, const std::vector<size_t>& indices);
-
Expr select(Expr a, int axis, const std::vector<size_t>& indices);
/*********************************************************/
diff --git a/src/graph/node_operators_binary.h b/src/graph/node_operators_binary.h
index 5b1f9865..ea2a3dfe 100644
--- a/src/graph/node_operators_binary.h
+++ b/src/graph/node_operators_binary.h
@@ -4,9 +4,12 @@
#include "functional/functional.h"
#include "graph/node.h"
-#include "tensors/gpu/cudnn_wrappers.h"
#include "tensors/tensor_operators.h"
+#ifdef CUDNN
+#include "tensors/gpu/cudnn_wrappers.h"
+#endif
+
namespace marian {
class DotNodeOp : public NaryNodeOp {
@@ -167,15 +170,17 @@ public:
NodeOps forwardOps() {
using namespace functional;
+
return {
- NodeOp(ProdWithBias(val_,
- child(0)->val(),
- child(1)->val(),
- child(2)->val(),
- transA_,
- transB_,
- 0.f,
- scalar_))
+ NodeOp(Prod(val_,
+ child(0)->val(),
+ child(1)->val(),
+ transA_, transB_, 0.f, scalar_);
+ Prod(val_,
+ child(3)->val(),
+ child(2)->val(),
+ false, false, 1.f, 1.f)
+ )
};
}
@@ -202,7 +207,12 @@ public:
false,
1.0,
scalar_)),
- NodeOp(Add(_1, child(2)->grad(), adj_))};
+ NodeOp(Prod(child(2)->grad(),
+ child(3)->val(), adj_,
+ true, false,
+ 0.f, 1.f))
+ //NodeOp(Add(_1, child(2)->grad(), adj_))
+ };
if(transA_ && !transB_)
return {NodeOp(Prod(child(0)->grad(),
@@ -219,7 +229,12 @@ public:
false,
1.0,
scalar_)),
- NodeOp(Add(_1, child(2)->grad(), adj_))};
+ NodeOp(Prod(child(2)->grad(),
+ child(3)->val(), adj_,
+ true, false,
+ 0.f, 1.f))
+ //NodeOp(Add(_1, child(2)->grad(), adj_))
+ };
if(transA_ && transB_)
return {NodeOp(Prod(child(0)->grad(),
@@ -236,7 +251,12 @@ public:
true,
1.0,
scalar_)),
- NodeOp(Add(_1, child(2)->grad(), adj_))};
+ NodeOp(Prod(child(2)->grad(),
+ child(3)->val(), adj_,
+ true, false,
+ 0.f, 1.f))
+ //NodeOp(Add(_1, child(2)->grad(), adj_))
+ };
return {NodeOp(Prod(child(0)->grad(),
adj_,
@@ -252,7 +272,12 @@ public:
false,
1.0,
scalar_)),
- NodeOp(Add(_1, child(2)->grad(), adj_))};
+ NodeOp(Prod(child(2)->grad(),
+ child(3)->val(), adj_,
+ true, false,
+ 0.f, 1.f))
+ //NodeOp(Add(_1, child(2)->grad(), adj_))
+ };
}
const std::string type() { return "affine"; }
@@ -294,6 +319,7 @@ public:
NodeOps forwardOps() {
// C = alpha * dot(op(A), op(B))
return {NodeOp(ProdBatched(val_,
+ graph()->allocator(),
child(0)->val(),
child(1)->val(),
transA_,
@@ -311,6 +337,7 @@ public:
if(!transA_ && transB_)
return {NodeOp(ProdBatched(child(0)->grad(),
+ graph()->allocator(),
adj_,
child(1)->val(),
false,
@@ -318,6 +345,7 @@ public:
1.0,
scalar_)),
NodeOp(ProdBatched(child(1)->grad(),
+ graph()->allocator(),
adj_,
child(0)->val(),
true,
@@ -327,6 +355,7 @@ public:
if(transA_ && !transB_)
return {NodeOp(ProdBatched(child(0)->grad(),
+ graph()->allocator(),
child(1)->val(),
adj_,
false,
@@ -334,6 +363,7 @@ public:
1.0,
scalar_)),
NodeOp(ProdBatched(child(1)->grad(),
+ graph()->allocator(),
child(0)->val(),
adj_,
false,
@@ -343,6 +373,7 @@ public:
if(transA_ && transB_)
return {NodeOp(ProdBatched(child(0)->grad(),
+ graph()->allocator(),
child(1)->val(),
adj_,
true,
@@ -350,6 +381,7 @@ public:
1.0,
scalar_)),
NodeOp(ProdBatched(child(1)->grad(),
+ graph()->allocator(),
adj_,
child(0)->val(),
true,
@@ -358,6 +390,7 @@ public:
scalar_))};
return {NodeOp(ProdBatched(child(0)->grad(),
+ graph()->allocator(),
adj_,
child(1)->val(),
false,
@@ -365,6 +398,7 @@ public:
1.0,
scalar_)),
NodeOp(ProdBatched(child(1)->grad(),
+ graph()->allocator(),
child(0)->val(),
adj_,
true,
@@ -766,6 +800,7 @@ struct HighwayNodeOp : public NaryNodeOp {
const std::string type() { return "highway"; }
};
+#ifdef CUDNN
class ConvolutionOp : public NaryNodeOp {
public:
ConvolutionOp(const std::vector<Expr>& nodes,
@@ -802,4 +837,5 @@ public:
protected:
ConvolutionWrapper conv_;
};
+#endif
}
diff --git a/src/graph/node_operators_unary.h b/src/graph/node_operators_unary.h
index fa6d25c7..d7ef751d 100644
--- a/src/graph/node_operators_unary.h
+++ b/src/graph/node_operators_unary.h
@@ -7,7 +7,9 @@
#include "graph/node.h"
#include "tensors/tensor_operators.h"
-//#include "tensors/gpu/cudnn_wrappers.h"
+#ifdef CUDNN
+#include "tensors/gpu/cudnn_wrappers.h"
+#endif
namespace marian {
@@ -815,7 +817,7 @@ struct TransposeNodeOp : public UnaryNodeOp {
}
NodeOps backwardOps() {
- return {NodeOp(TransposeND(child(0)->grad(), adj_, axes_))};
+ return {NodeOp(TransposeNDGrad(child(0)->grad(), adj_, axes_))};
}
template <class... Args>
@@ -1009,7 +1011,9 @@ struct ShiftNodeOp : public UnaryNodeOp {
}
NodeOps backwardOps() {
- return {NodeOp(Shift(child(0)->grad(), adj_, shift_, /*padValue=*/0.f, /*invert=*/true))};
+ // last parameter beta=1 says to use += (out = in + beta * out)
+ // @TODO: check need for padValue_
+ return {NodeOp(ShiftGrad(child(0)->grad(), adj_, shift_, true))};
}
const std::string type() { return "shift"; }
@@ -1076,6 +1080,7 @@ struct ShiftNodeOp : public UnaryNodeOp {
// Ptr<sparse::CSR> lf_;
//};
+#ifdef CUDNN
class PoolingOp : public UnaryNodeOp {
public:
PoolingOp(Expr x,
@@ -1109,6 +1114,7 @@ public:
protected:
PoolingWrapper pooling_;
};
+#endif
class PoolingWithMaskingOp : public UnaryNodeOp {
public:
diff --git a/src/layers/convolution.cpp b/src/layers/convolution.cpp
index eb1b0554..d4298e0b 100644
--- a/src/layers/convolution.cpp
+++ b/src/layers/convolution.cpp
@@ -2,6 +2,8 @@
#include "graph/node_operators_binary.h"
namespace marian {
+
+#ifdef CUDNN
Convolution::Convolution(Ptr<ExpressionGraph> graph) : Factory(graph) {}
Expr Convolution::apply(Expr x) {
@@ -29,4 +31,6 @@ Expr Convolution::apply(const std::vector<Expr>&) {
ABORT("Can't apply convolution on many inputs at once");
return nullptr;
}
+#endif
+
}
diff --git a/src/layers/convolution.h b/src/layers/convolution.h
index ae2c5c74..24ee6dd3 100644
--- a/src/layers/convolution.h
+++ b/src/layers/convolution.h
@@ -7,6 +7,7 @@
namespace marian {
+#ifdef CUDNN
class Convolution : public Factory {
protected:
Ptr<Options> getOptions() { return options_; }
@@ -82,4 +83,6 @@ protected:
std::vector<int> kernelNums_;
int stride_;
};
+#endif
+
}
diff --git a/src/layers/generic.h b/src/layers/generic.h
index 8b19123b..ff404af7 100644
--- a/src/layers/generic.h
+++ b/src/layers/generic.h
@@ -2,11 +2,14 @@
#include "marian.h"
-#include "layers/factory.h"
#include "data/shortlist.h"
+#include "layers/factory.h"
namespace marian {
namespace mlp {
+/**
+ * @brief Activation functions
+ */
enum struct act : int { linear, tanh, sigmoid, ReLU, LeakyReLU, PReLU, swish };
}
}
@@ -64,12 +67,9 @@ public:
if(inputs.size() > 1)
num = std::to_string(i);
- Expr W = g->param(name + "_W" + num,
- {in->shape()[-1], dim},
- inits::glorot_uniform);
- Expr b = g->param(name + "_b" + num,
- {1, dim},
- inits::zeros);
+ Expr W = g->param(
+ name + "_W" + num, {in->shape()[-1], dim}, inits::glorot_uniform);
+ Expr b = g->param(name + "_b" + num, {1, dim}, inits::zeros);
if(useLayerNorm) {
if(useNematusNorm) {
@@ -82,9 +82,8 @@ public:
outputs.push_back(layerNorm(affine(in, W, b), ln_s, ln_b, NEMATUS_LN_EPS));
} else {
- auto gamma = g->param(name + "_gamma" + num,
- {1, dim},
- inits::from_value(1.0));
+ auto gamma = g->param(
+ name + "_gamma" + num, {1, dim}, inits::from_value(1.0));
outputs.push_back(layerNorm(dot(in, W), gamma, b));
}
@@ -107,9 +106,7 @@ public:
}
};
- Expr apply(Expr input) {
- return apply(std::vector<Expr>({input}));
- }
+ Expr apply(Expr input) { return apply(std::vector<Expr>({input})); }
};
class Output : public Layer {
@@ -129,9 +126,7 @@ public:
tiedParams_[param] = graph_->get(tied);
}
- void set_shortlist(Ptr<data::Shortlist> shortlist) {
- shortlist_ = shortlist;
- }
+ void set_shortlist(Ptr<data::Shortlist> shortlist) { shortlist_ = shortlist; }
Expr apply(Expr input) {
if(!W_) {
@@ -146,15 +141,13 @@ public:
W_ = rows(W_, shortlist_->indices());
} else {
W_ = graph_->param(name + "_" + nameW,
- {input->shape()[-1], dim},
- inits::glorot_uniform);
+ {input->shape()[-1], dim},
+ inits::glorot_uniform);
if(shortlist_)
W_ = cols(W_, shortlist_->indices());
}
- b_ = graph_->param(name + "_b",
- {1, dim},
- inits::zeros);
+ b_ = graph_->param(name + "_b", {1, dim}, inits::zeros);
if(shortlist_)
b_ = cols(b_, shortlist_->indices());
}
@@ -165,10 +158,8 @@ public:
virtual Expr apply(const std::vector<Expr>& inputs) {
ABORT("Not implemented");
};
-
};
-
} // namespace mlp
struct EmbeddingFactory : public Factory {
@@ -195,51 +186,4 @@ struct EmbeddingFactory : public Factory {
};
typedef Accumulator<EmbeddingFactory> embedding;
-
-static inline Expr Cost(Expr logits,
- Expr indices,
- Expr mask,
- std::string costType = "cross-entropy",
- float smoothing = 0,
- Expr weights = nullptr) {
- using namespace keywords;
-
- auto ce = cross_entropy(logits, indices);
-
- if(weights)
- ce = weights * ce;
-
- if(smoothing > 0) {
- // @TODO: add this to CE kernels instead
- auto ceq = mean(logsoftmax(logits), axis = -1);
- ce = (1 - smoothing) * ce - smoothing * ceq;
- }
-
- if(mask)
- ce = ce * mask;
-
- auto costSum = sum(ce, axis = -3);
-
- Expr cost;
- // axes:
- // - time axis (words): -3
- // - batch axis (sentences): -2
- if(costType == "ce-mean"
- || costType
- == "cross-entropy") { // sum over words; average over sentences
- cost = mean(costSum, axis = -2);
- } else if(costType == "ce-mean-words") { // average over target tokens
- cost = sum(costSum, axis = -2) / sum(sum(mask, axis = -3), axis = -2);
- } else if(costType == "ce-sum") { // sum over target tokens
- cost = sum(costSum, axis = -2);
- } else if(costType == "perplexity") { // ==exp('ce-mean-words')
- cost = exp(sum(costSum, axis = -2) / sum(sum(mask, axis = -3), axis = -2));
- } else if(costType == "ce-rescore") { // sum over words, keep batch axis
- cost = -costSum;
- } else { // same as ce-mean
- cost = mean(costSum, axis = -2);
- }
-
- return cost;
-}
}
diff --git a/src/layers/loss.cpp b/src/layers/loss.cpp
new file mode 100644
index 00000000..c84895f4
--- /dev/null
+++ b/src/layers/loss.cpp
@@ -0,0 +1,94 @@
+#include "layers/loss.h"
+
+namespace marian {
+
+Ptr<LossBase> LossFactory(Ptr<Options> options, bool inference) {
+ float smoothing = inference ? 0.f : options->get<float>("label-smoothing");
+ std::string costType = options->get<std::string>("cost-type", "ce-mean");
+ if(costType == "ce-mean" || costType == "cross-entropy") {
+ return New<CrossEntropyMeanLoss>(smoothing);
+ } else if(costType == "ce-mean-words") {
+ return New<CrossEntropyMeanWordsLoss>(smoothing);
+ } else if(costType == "ce-sum") {
+ return New<CrossEntropySumLoss>(smoothing);
+ } else if(costType == "perplexity") {
+ return New<PerplexityLoss>(smoothing);
+ } else if(costType == "ce-rescore") {
+ return New<CrossEntropyRescoreLoss>(smoothing);
+ } else { // same as ce-mean
+ return New<CrossEntropyMeanLoss>(smoothing);
+ }
+}
+
+Expr LossBase::getCrossEntropy(Expr logits,
+ Expr indices,
+ Expr mask,
+ Expr weights) {
+ using namespace keywords;
+
+ auto ce = cross_entropy(logits, indices);
+
+ if(smoothing_ > 0) {
+ // @TODO: add this to CE kernels instead
+ auto ceq = mean(logsoftmax(logits), axis = -1);
+ ce = (1 - smoothing_) * ce - smoothing_ * ceq;
+ }
+
+ if(mask)
+ ce = ce * mask;
+
+ if(weights)
+ ce = ce * weights;
+
+ return ce;
+}
+
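
Note: with smoothing factor eps, ceq = mean(logsoftmax(logits)) is the mean
log-probability over the vocabulary, so

    (1 - eps) * ce - eps * ceq
      = (1 - eps) * (-log p[target]) + eps * (-(1/|V|) * sum_v log p[v]),

i.e. the cross-entropy against a target distribution that keeps 1 - eps on
the reference word and spreads eps uniformly over the vocabulary.
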
+Expr CrossEntropyMeanLoss::getCost(Expr logits,
+ Expr indices,
+ Expr mask,
+ Expr weights) {
+ using namespace keywords;
+ auto ce = getCrossEntropy(logits, indices, mask, weights);
+ // Time axis (words): -3
+ // Batch axis (sentences): -2
+ return mean(sum(ce, axis = -3), axis = -2);
+}
+
+Expr CrossEntropyMeanWordsLoss::getCost(Expr logits,
+ Expr indices,
+ Expr mask,
+ Expr weights) {
+ using namespace keywords;
+ auto ce = getCrossEntropy(logits, indices, mask, weights);
+ return sum(sum(ce, axis = -3), axis = -2)
+ / sum(sum(mask, axis = -3), axis = -2);
+}
+
+Expr CrossEntropySumLoss::getCost(Expr logits,
+ Expr indices,
+ Expr mask,
+ Expr weights) {
+ using namespace keywords;
+ auto ce = getCrossEntropy(logits, indices, mask, weights);
+ return sum(sum(ce, axis = -3), axis = -2);
+}
+
+Expr PerplexityLoss::getCost(Expr logits,
+ Expr indices,
+ Expr mask,
+ Expr weights) {
+ using namespace keywords;
+ auto ce = getCrossEntropy(logits, indices, mask, weights);
+ return exp(sum(sum(ce, axis = -3), axis = -2)
+ / sum(sum(mask, axis = -3), axis = -2));
+}
+
+Expr CrossEntropyRescoreLoss::getCost(Expr logits,
+ Expr indices,
+ Expr mask,
+ Expr weights) {
+ using namespace keywords;
+ auto ce = getCrossEntropy(logits, indices, mask, weights);
+ return -sum(ce, axis = -3);
+}
+}
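
Note: a hedged usage sketch of the factory (options and state come from the
surrounding cost code, cf. models/costs.h below):

  auto loss = LossFactory(options, /*inference=*/false);  // honors cost-type
  Expr cost = loss->getCost(state->getProbs(),
                            state->getTargetIndices(),
                            state->getTargetMask());  // weights default to nullptr
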
diff --git a/src/layers/loss.h b/src/layers/loss.h
new file mode 100644
index 00000000..9f3bb345
--- /dev/null
+++ b/src/layers/loss.h
@@ -0,0 +1,70 @@
+#pragma once
+
+#include "marian.h"
+
+namespace marian {
+class LossBase {
+protected:
+ float smoothing_;
+
+public:
+ explicit LossBase(float smoothing = 0) : smoothing_(smoothing){};
+
+ Expr getCrossEntropy(Expr logits, Expr indices, Expr mask, Expr weights);
+ virtual Expr getCost(Expr logits,
+ Expr indices,
+ Expr mask,
+ Expr weights = nullptr)
+ = 0;
+};
+
+/*
+ * @brief The cross entropy loss function
+ *
+ * A sum over words and average over sentences
+ */
+class CrossEntropyMeanLoss : public LossBase {
+public:
+ explicit CrossEntropyMeanLoss(float smoothing = 0) : LossBase(smoothing){};
+ Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights);
+};
+
+/*
+ * @brief The cross entropy loss function as an average over target tokens
+ */
+class CrossEntropyMeanWordsLoss : public LossBase {
+public:
+ explicit CrossEntropyMeanWordsLoss(float smoothing = 0)
+ : LossBase(smoothing){};
+ Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights);
+};
+
+/*
+ * @brief The cross entropy loss function as a sum over target tokens
+ */
+class CrossEntropySumLoss : public LossBase {
+public:
+ explicit CrossEntropySumLoss(float smoothing = 0) : LossBase(smoothing){};
+ Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights);
+};
+
+/*
+ * @brief The perplexity loss function
+ */
+class PerplexityLoss : public LossBase {
+public:
+ explicit PerplexityLoss(float smoothing = 0) : LossBase(smoothing){};
+ Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights);
+};
+
+/*
+ * @brief The cross entropy loss function that keeps sentence-level costs
+ */
+class CrossEntropyRescoreLoss : public LossBase {
+public:
+ explicit CrossEntropyRescoreLoss(float smoothing = 0) : LossBase(smoothing){};
+ Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights);
+};
+
+Ptr<LossBase> LossFactory(Ptr<Options> options, bool inference);
+}
diff --git a/src/layers/weight.cpp b/src/layers/weight.cpp
new file mode 100644
index 00000000..01d7760e
--- /dev/null
+++ b/src/layers/weight.cpp
@@ -0,0 +1,21 @@
+#include "layers/weight.h"
+
+namespace marian {
+
+Ptr<WeightingBase> WeightingFactory(Ptr<Options> options) {
+ if(options->has("data-weighting"))
+ return New<DataWeighting>(options->get<std::string>("data-weighting-type"));
+  return nullptr;  // avoid falling off the end when no weighting is set
+}
+
+Expr DataWeighting::getWeights(Ptr<ExpressionGraph> graph,
+ Ptr<data::CorpusBatch> batch) {
+ ABORT_IF(batch->getDataWeights().empty(),
+ "Vector of weights is unexpectedly empty!");
+ bool sentenceWeighting = weightingType_ == "sentence";
+ int dimBatch = batch->size();
+ int dimWords = sentenceWeighting ? 1 : batch->back()->batchWidth();
+ auto weights = graph->constant({1, dimWords, dimBatch, 1},
+ inits::from_vector(batch->getDataWeights()));
+ return weights;
+}
+}
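
Note: the constant's shape {1, dimWords, dimBatch, 1} makes it broadcast
against the per-position cross-entropy:

  // data-weighting-type == "sentence": dimWords == 1, one weight per
  //   sentence, repeated over all time steps;
  // data-weighting-type == "word": dimWords == batchWidth(), one weight
  //   per target position.
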
diff --git a/src/layers/weight.h b/src/layers/weight.h
new file mode 100644
index 00000000..0b79ee13
--- /dev/null
+++ b/src/layers/weight.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "common/options.h"
+#include "data/corpus.h"
+#include "graph/expression_graph.h"
+#include "graph/expression_operators.h"
+#include "graph/node_initializers.h"
+
+namespace marian {
+
+class WeightingBase {
+public:
+ WeightingBase(){};
+ virtual Expr getWeights(Ptr<ExpressionGraph> graph,
+ Ptr<data::CorpusBatch> batch)
+ = 0;
+ virtual void debugWeighting(std::vector<float> weightedMask,
+ std::vector<float> freqMask,
+ Ptr<data::CorpusBatch> batch){};
+};
+
+class DataWeighting : public WeightingBase {
+protected:
+ std::string weightingType_;
+
+public:
+ DataWeighting(std::string weightingType)
+ : WeightingBase(), weightingType_(weightingType){};
+ Expr getWeights(Ptr<ExpressionGraph> graph, Ptr<data::CorpusBatch> batch);
+};
+
+Ptr<WeightingBase> WeightingFactory(Ptr<Options> options);
+}
diff --git a/src/models/costs.h b/src/models/costs.h
index 207be2e9..9649492e 100644
--- a/src/models/costs.h
+++ b/src/models/costs.h
@@ -1,8 +1,10 @@
#pragma once
-#include "models/encoder_decoder.h"
#include "layers/generic.h"
#include "layers/guided_alignment.h"
+#include "layers/loss.h"
+#include "layers/weight.h"
+#include "models/encoder_decoder.h"
namespace marian {
namespace models {
@@ -12,58 +14,57 @@ public:
virtual Expr apply(Ptr<ModelBase> model,
Ptr<ExpressionGraph> graph,
Ptr<data::Batch> batch,
- bool clearGraph = true) = 0;
+ bool clearGraph = true)
+ = 0;
};
-
class EncoderDecoderCE : public CostBase {
protected:
Ptr<Options> options_;
+ bool inference_{false};
+ bool toBeWeighted_{false};
+ Ptr<LossBase> loss_;
+ Ptr<WeightingBase> weighter_;
+
public:
EncoderDecoderCE(Ptr<Options> options)
- : options_(options) {}
+ : options_(options), inference_(options->get<bool>("inference", false)) {
+ loss_ = LossFactory(options_, inference_);
+
+ toBeWeighted_ = (options_->has("data-weighting") && !inference_)
+ || (options_->has("dynamic-weighting")
+ && options_->get<bool>("dynamic-weighting")
+ && !inference_);
+ if(toBeWeighted_)
+ weighter_ = WeightingFactory(options_);
+ }
Expr apply(Ptr<ModelBase> model,
Ptr<ExpressionGraph> graph,
Ptr<data::Batch> batch,
bool clearGraph = true) {
-
auto encdec = std::static_pointer_cast<EncoderDecoder>(model);
auto corpusBatch = std::static_pointer_cast<data::CorpusBatch>(batch);
auto state = encdec->stepAll(graph, corpusBatch, clearGraph);
- std::string costType = options_->get<std::string>("cost-type");
- bool inference = options_->get<bool>("inference", false);
-
- float ls = inference ? 0.f : options_->get<float>("label-smoothing");
+ float ls = inference_ ? 0.f : options_->get<float>("label-smoothing");
Expr weights;
+ Expr cost;
bool sentenceWeighting = false;
- if(options_->has("data-weighting") && !inference) {
- ABORT_IF(corpusBatch->getDataWeights().empty(),
- "Vector of weights is unexpectedly empty!");
-
- sentenceWeighting
- = options_->get<std::string>("data-weighting-type") == "sentence";
- int dimBatch = corpusBatch->size();
- int dimWords = sentenceWeighting ? 1 : corpusBatch->back()->batchWidth();
-
- weights = graph->constant({1, dimWords, dimBatch, 1},
- inits::from_vector(corpusBatch->getDataWeights()));
+ if(toBeWeighted_) {
+ weights = weighter_->getWeights(graph, corpusBatch);
}
- auto cost
- = Cost(state->getProbs(),
- state->getTargetIndices(),
- state->getTargetMask(),
- costType,
- ls,
- weights);
+ cost = loss_->getCost(state->getProbs(),
+ state->getTargetIndices(),
+ state->getTargetMask(),
+ weights);
- if(options_->has("guided-alignment") && !inference) {
+ if(options_->has("guided-alignment") && !inference_) {
auto alignments = encdec->getDecoders()[0]->getAlignments();
ABORT_IF(alignments.empty(), "Model does not seem to support alignments");
@@ -73,8 +74,6 @@ public:
} else {
return cost;
}
-
- return cost;
}
};
@@ -85,7 +84,7 @@ protected:
public:
Trainer(Ptr<ModelBase> model, Ptr<CostBase> cost)
- : model_(model), cost_(cost) {}
+ : model_(model), cost_(cost) {}
Ptr<ModelBase> getModel() { return model_; }
@@ -104,16 +103,10 @@ public:
virtual Expr build(Ptr<ExpressionGraph> graph,
Ptr<data::Batch> batch,
bool clearGraph = true) {
- return cost_->apply(model_,
- graph,
- batch,
- clearGraph);
- };
-
- virtual void clear(Ptr<ExpressionGraph> graph) {
- model_->clear(graph);
+ return cost_->apply(model_, graph, batch, clearGraph);
};
+ virtual void clear(Ptr<ExpressionGraph> graph) { model_->clear(graph); };
};
typedef Trainer Scorer;
@@ -138,11 +131,11 @@ protected:
public:
Stepwise(Ptr<EncoderDecoderBase> encdec, Ptr<CostStep> cost)
- : encdec_(encdec), cost_(cost) {}
+ : encdec_(encdec), cost_(cost) {}
virtual void load(Ptr<ExpressionGraph> graph,
const std::string& name,
- bool markedReloaded = true) {
+ bool markedReloaded = true) {
encdec_->load(graph, name, markedReloaded);
}
@@ -152,9 +145,7 @@ public:
encdec_->save(graph, name, saveTranslatorConfig);
}
- virtual void clear(Ptr<ExpressionGraph> graph) {
- encdec_->clear(graph);
- }
+ virtual void clear(Ptr<ExpressionGraph> graph) { encdec_->clear(graph); }
virtual Expr build(Ptr<ExpressionGraph> graph,
Ptr<data::Batch> batch,
@@ -174,7 +165,8 @@ public:
const std::vector<size_t>& embIndices,
int dimBatch,
int beamSize) {
- auto nextState = encdec_->step(graph, state, hypIndices, embIndices, dimBatch, beamSize);
+ auto nextState = encdec_->step(
+ graph, state, hypIndices, embIndices, dimBatch, beamSize);
return cost_->apply(nextState);
}
@@ -185,11 +177,10 @@ public:
return nullptr;
}
- virtual Ptr<Options> getOptions() {
- return encdec_->getOptions();
- };
+ virtual Ptr<Options> getOptions() { return encdec_->getOptions(); };
- virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) {
+ virtual void setShortlistGenerator(
+ Ptr<data::ShortlistGenerator> shortlistGenerator) {
encdec_->setShortlistGenerator(shortlistGenerator);
};
@@ -197,10 +188,14 @@ public:
return encdec_->getShortlist();
};
+ virtual std::vector<float> getAlignment() {
+ return encdec_->getAlignment();
+ }
};
-static Ptr<ModelBase> add_cost(Ptr<EncoderDecoder> encdec, Ptr<Options> options) {
- switch (options->get<usage>("usage", usage::raw)) {
+static Ptr<ModelBase> add_cost(Ptr<EncoderDecoder> encdec,
+ Ptr<Options> options) {
+ switch(options->get<usage>("usage", usage::raw)) {
case usage::training:
return New<Trainer>(encdec, New<EncoderDecoderCE>(options));
case usage::scoring:
@@ -208,10 +203,8 @@ static Ptr<ModelBase> add_cost(Ptr<EncoderDecoder> encdec, Ptr<Options> options)
case usage::translation:
return New<Stepwise>(encdec, New<LogsoftmaxStep>());
case usage::raw:
- default:
- return encdec;
+ default: return encdec;
}
}
-
}
}
diff --git a/src/models/encoder_decoder.h b/src/models/encoder_decoder.h
index e1a07d1f..4eee31c5 100644
--- a/src/models/encoder_decoder.h
+++ b/src/models/encoder_decoder.h
@@ -44,6 +44,8 @@ public:
virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) = 0;
virtual Ptr<data::Shortlist> getShortlist() = 0;
+
+ virtual std::vector<float> getAlignment() = 0;
};
class EncoderDecoder : public EncoderDecoderBase {
@@ -113,6 +115,12 @@ public:
return decoders_[0]->getShortlist();
};
+ virtual std::vector<float> getAlignment() {
+ std::vector<float> softAlign;
+ decoders_[0]->getAlignments()[0]->val()->get(softAlign);
+ return softAlign;
+ };
+
/*********************************************************************/
virtual Ptr<DecoderState> startState(Ptr<ExpressionGraph> graph,
diff --git a/src/models/nematus.h b/src/models/nematus.h
index 82b77c68..3d23f5fc 100644
--- a/src/models/nematus.h
+++ b/src/models/nematus.h
@@ -21,6 +21,10 @@ public:
ABORT_IF(options_->get<std::string>("dec-cell") != "gru-nematus",
"--type nematus does not currently support other rnn cells "
"than gru-nematus, use --type s2s");
+
+ ABORT_IF(options_->get<int>("dec-cell-high-depth") > 1,
+ "--type nematus does not currently support "
+ "--dec-cell-high-depth > 1, use --type s2s");
}
void load(Ptr<ExpressionGraph> graph,
diff --git a/src/python/CMakeLists.txt b/src/python/CMakeLists.txt
deleted file mode 100644
index 9d54c01a..00000000
--- a/src/python/CMakeLists.txt
+++ /dev/null
@@ -1,42 +0,0 @@
-cuda_add_library(pymarian SHARED
- mariannmt.cpp
- ../3rd_party/cnpy/cnpy.cpp
- ../3rd_party/exception.cpp
- ../3rd_party/svd/svd.cpp
- ../graph/expression_graph.cpp
- ../graph/expression_operators.cu
- ../graph/node.cu
- ../graph/node_operators.cu
- ../tensors/tensor.cu
- ../tensors/device.cpp
- ../kernels/tensor_operators.cu
- ../tensors/gpu/dropout.cu
- ../tensors/cpu/dropout.cpp
- ../kernels/sparse.cu
- #../layers/param_initializers.cu
- ../rnn/attention.cu
- ../rnn/cells.cu
- #../optimizers/clippers.cu
- #../optimizers/optimizers.cu
- ../common/utils.cpp
- ../common/logging.cpp
- ../common/config.cpp
- ../common/config_parser.cpp
- ../translator/history.cpp
- ../translator/output_collector.cpp
- ../translator/nth_element.cu
- ../translator/helpers.cu
- ../data/vocab.cpp
- ../data/corpus.cpp
- ../data/text_input.cpp
- #../rescorer/score_collector.cpp
- $<TARGET_OBJECTS:libyaml-cpp>
-)
-
-set_target_properties(pymarian PROPERTIES EXCLUDE_FROM_ALL 1)
-set_target_properties(pymarian PROPERTIES OUTPUT_NAME mariannmt)
-set_target_properties(pymarian PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}")
-set_target_properties(pymarian PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}")
-
-target_link_libraries(pymarian ${EXT_LIBS} marian)
-cuda_add_cublas_to_target(pymarian)
diff --git a/src/python/mariannmt.cpp b/src/python/mariannmt.cpp
deleted file mode 100644
index d5bdd161..00000000
--- a/src/python/mariannmt.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-#include <cstdlib>
-#include <iostream>
-#include <string>
-
-#include <boost/python.hpp>
-
-#include "common/utils.h"
-#include "common/version.h"
-#include "translator/beam_search.h"
-#include "translator/translator.h"
-
-using namespace marian;
-
-Ptr<TranslateServiceMultiGPU<BeamSearch>> task;
-
-void init(const std::string& argopts) {
- auto options = New<Config>(argopts, ConfigMode::translating);
- task = New<TranslateServiceMultiGPU<BeamSearch>>(options);
- LOG(info, "Translator initialized");
-}
-
-boost::python::list translate(boost::python::list& pyinput) {
- std::vector<std::string> input;
- for(int i = 0; i < boost::python::len(pyinput); ++i) {
- input.emplace_back(
- boost::python::extract<std::string>(boost::python::object(pyinput[i])));
- }
-
- auto output = task->run(input);
-
- boost::python::list pyoutput;
- pyoutput.append(Join(output, "\n"));
- return pyoutput;
-}
-
-std::string version() {
- return PROJECT_VERSION;
-}
-
-BOOST_PYTHON_MODULE(libmariannmt) {
- boost::python::def("init", init);
- boost::python::def("translate", translate);
- boost::python::def("version", version);
-}
diff --git a/src/tensors/allocator.h b/src/tensors/allocator.h
index 4285a91b..3ec8c85b 100644
--- a/src/tensors/allocator.h
+++ b/src/tensors/allocator.h
@@ -16,9 +16,27 @@
namespace marian {
class AllocationException : public std::exception {
+private:
+ char* message_;
+
public:
- virtual const char* what() const throw() {
- return "Memory re-allocation attempted";
+ AllocationException(size_t available, size_t asked) {
+ std::string mstr = "Attempted allocation of "
+ + std::to_string(asked)
+ + ", but only "
+ + std::to_string(available)
+ + " free";
+
+ message_ = new char[mstr.size() + 1];
+ std::copy(mstr.begin(), mstr.end(), message_);
+ message_[mstr.size()] = '\0'; // std::copy does not null-terminate
+ }
+
+ ~AllocationException() {
+ delete[] message_;
+ }
+
+ virtual const char* what() const noexcept {
+ return message_;
}
};
@@ -111,7 +129,7 @@ private:
auto it = std::lower_bound(gaps_.begin(), gaps_.end(), Gap(nullptr, size));
if(throw_ && it == gaps_.end()) {
- throw AllocationException();
+ throw AllocationException(available_, size);
}
while(it == gaps_.end()) {
@@ -119,8 +137,11 @@ private:
it = std::lower_bound(gaps_.begin(), gaps_.end(), Gap(nullptr, size));
}
- available_ -= it->size();
- return *it;
+ Gap gap = *it;
+ gaps_.erase(it);
+
+ available_ -= gap.size();
+ return gap;
}
void insertGap(Gap gap, bool consolidate = true) {
@@ -186,7 +207,6 @@ public:
bytes = align(bytes);
Gap gap = getGap(bytes);
- gaps_.erase(gap);
if(gap.size() > bytes) {
insertGap(gap.rest(bytes), false);
}
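
Note: getGap() now removes the chosen gap from the free list itself, so the
erase that used to live in allocate() is gone and a gap can no longer be
handed out twice. A simplified, self-contained sketch of the best-fit logic
(hypothetical Gap type, not the one from allocator.h):

  #include <new>
  #include <set>
  #include <cstddef>

  struct Gap {
    char* ptr;
    std::size_t size;
    bool operator<(const Gap& o) const { return size < o.size; }
    Gap rest(std::size_t bytes) const { return {ptr + bytes, size - bytes}; }
  };

  Gap takeGap(std::multiset<Gap>& gaps, std::size_t bytes) {
    auto it = gaps.lower_bound(Gap{nullptr, bytes});  // smallest fitting gap
    if(it == gaps.end())
      throw std::bad_alloc();        // the real code reserves more or throws
    Gap gap = *it;
    gaps.erase(it);                  // take it out before anyone else can
    if(gap.size > bytes)
      gaps.insert(gap.rest(bytes));  // return the unused tail
    return Gap{gap.ptr, bytes};
  }
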
diff --git a/src/tensors/cpu/element.h b/src/tensors/cpu/element.h
index 23750bcd..a383edb0 100644
--- a/src/tensors/cpu/element.h
+++ b/src/tensors/cpu/element.h
@@ -23,9 +23,9 @@ template <size_t I = 0> struct E {
functional::Array<functional::Tensor<float>, K>& tensors,
functional::Array<int, K> indices) {
- auto& shape = tensors[0].shape();
+ const auto& shape = tensors[0].shape();
- // loop for outer-most dimension
+ // loop over outer-most dimension
for(int i = 0; i < shape[I]; ++i) {
// call loop for next-inner dimension
@@ -66,7 +66,7 @@ void Element(const Functor& functor, marian::Tensor out, Tensors... tensors) {
// call elementwise operation going from outer-most dimension
// to inner-most element.
- E<>::element(functor, gTensors, indices);
+ E<0>::element(functor, gTensors, indices);
}
}
diff --git a/src/tensors/cpu/prod.cpp b/src/tensors/cpu/prod.cpp
index c5d86479..bfdb8573 100644
--- a/src/tensors/cpu/prod.cpp
+++ b/src/tensors/cpu/prod.cpp
@@ -95,8 +95,9 @@ void Prod(marian::Tensor C,
}
void ProdBatched(marian::Tensor C,
- const marian::Tensor& A,
- const marian::Tensor& B,
+ Ptr<Allocator> allocator,
+ const marian::Tensor A,
+ const marian::Tensor B,
bool transA,
bool transB,
float beta,
@@ -128,30 +129,21 @@ void ProdBatched(marian::Tensor C,
auto strideA = batchA == 1 ? 0 : m * k;
auto strideC = n * m;
- int steps = std::max(batchA, batchB);
-
- int offsetA = 0;
- int offsetB = 0;
- int offsetC = 0;
-
- for(int i = 0; i < steps; ++i) {
+ int batchC = std::max(batchA, batchB);
+ for(int i = 0; i < batchC; ++i) {
sgemm(transA,
transB,
m,
n,
k,
alpha,
- A->data() + offsetA,
+ A->data() + (i % batchA) * strideA,
lda,
- B->data() + offsetB,
+ B->data() + (i % batchB) * strideB,
ldb,
beta,
- C->data() + offsetC,
+ C->data() + i * strideC,
ldc);
-
- offsetA += strideA;
- offsetB += strideB;
- offsetC += strideC;
}
#else
ABORT("Not implemented!");
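
Note: the (i % batchA) / (i % batchB) offsets implement batch broadcasting:

  // Example: A is a single [m,k] matrix (batchA = 1), B holds 8 slices
  // (batchB = 8) -> batchC = 8 sgemm calls:
  //   call i: C[i] = alpha * op(A) * op(B[i]) + beta * C[i]
  // (i % batchA) * strideA is 0 for every i, so the same A is reused.
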
diff --git a/src/tensors/cpu/tensor_operators.cpp b/src/tensors/cpu/tensor_operators.cpp
index 7310102d..8406f9d0 100644
--- a/src/tensors/cpu/tensor_operators.cpp
+++ b/src/tensors/cpu/tensor_operators.cpp
@@ -50,12 +50,13 @@ inline void gInsertCols(float* out,
size_t cols_out,
size_t cols_in,
size_t offset_out,
- size_t offset_in) {
+ size_t offset_in,
+ float beta) {
for(int j = 0; j < rows; ++j) {
float* rowOut = out + j * cols_out + offset_out;
const float* rowIn = in + j * cols_in + offset_in;
for(int i = 0; i < cols; ++i) {
- rowOut[i] = rowIn[i];
+ rowOut[i] = rowIn[i] + beta * rowOut[i];
}
}
}
@@ -71,7 +72,7 @@ void Concatenate1(Tensor out, const std::vector<Tensor>& inputs) {
"First dimension must be equal");
int cols_in = in->shape().back();
cpu::gInsertCols(
- out->data(), in->data(), rows, cols_in, cols_out, cols_in, offset, 0);
+ out->data(), in->data(), rows, cols_in, cols_out, cols_in, offset, 0, 0);
offset += cols_in;
}
}
@@ -91,8 +92,11 @@ void Split1(std::vector<Tensor>& outputs, const Tensor in) {
ABORT_IF(rows != out->shape().elements() / out->shape().back(),
"First dimension must be equal");
int cols_out = out->shape().back();
+
+ // set last parameter to 1 to enable += instead of =
+ // @TODO: do this in a more principled way across all/most kernels
cpu::gInsertCols(
- out->data(), in->data(), rows, cols_out, cols_out, cols_in, 0, offset);
+ out->data(), in->data(), rows, cols_out, cols_out, cols_in, 0, offset, 1);
offset += cols_out;
}
}
@@ -108,9 +112,17 @@ void SplitCont(std::vector<Tensor>& outputs, const Tensor in, int axis) {
size_t size = out->shape().elements() / step;
size_t offset2 = i * size;
- std::copy(in->data() + offset1,
- in->data() + offset1 + size,
- out->data() + offset2);
+ // BUG: This overwrites gradients!
+ //std::copy(in->data() + offset1,
+ // in->data() + offset1 + size,
+ // out->data() + offset2);
+
+ // Fixes gradient problem, @TODO: check performance
+ std::transform(in->data() + offset1,
+ in->data() + offset1 + size,
+ out->data() + offset2,
+ out->data() + offset2,
+ [](float a, float b){ return a + b; });
offset1 += size;
}
@@ -124,6 +136,7 @@ void Deconcatenate(std::vector<Tensor>& outputs, const Tensor in, int ax) {
SplitCont(outputs, in, ax);
}
+template <bool add>
void Transpose0213(Tensor out, Tensor in) {
int cols = in->shape()[-1];
int rows = in->shape().elements() / in->shape()[-1];
@@ -141,7 +154,15 @@ void Transpose0213(Tensor out, Tensor in) {
const float* inRow = in->data() + src * cols ;
float* outRow = out->data() + dst * cols;
- std::copy(inRow, inRow + cols, outRow);
+ if(!add) {
+ // mostly for fast forward computation
+ std::copy(inRow, inRow + cols, outRow);
+ }
+ else {
+ for(int i = 0; i < cols; ++i) {
+ outRow[i] += inRow[i];
+ }
+ }
}
}
}
@@ -186,6 +207,7 @@ void Transpose10(Tensor out, const Tensor in) {
}
// @TODO: optimize this, currently it's quite horrible
+template <bool add>
void TransposeGeneric(Tensor out, Tensor in, const std::vector<int>& vAxis) {
functional::Array<int, functional::Shape::size()> permute;
int diff = functional::Shape::size() - vAxis.size();
@@ -207,19 +229,29 @@ void TransposeGeneric(Tensor out, Tensor in, const std::vector<int>& vAxis) {
gOut.shape().dims(index, oDims);
for(int i = 0; i < N; ++i)
pDims[permute[i]] = oDims[i];
- gOut[index] = gIn[pDims];
+ if(add)
+ gOut[index] += gIn[pDims];
+ else
+ gOut[index] = gIn[pDims];
}
}
void TransposeND(Tensor out, Tensor in, const std::vector<int>& vAxis) {
if(vAxis == std::vector<int>({0, 2, 1, 3}))
- Transpose0213(out, in);
- else if(vAxis == std::vector<int>({1, 0})
- && in->shape()[-1] % 16 == 0
+ Transpose0213<false>(out, in);
+ else if(vAxis == std::vector<int>({1, 0})
+ && in->shape()[-1] % 16 == 0
&& in->shape()[-2] % 16 == 0)
Transpose10(out, in);
else
- TransposeGeneric(out, in, vAxis);
+ TransposeGeneric<false>(out, in, vAxis);
+}
+
+void TransposeNDGrad(Tensor out, Tensor in, const std::vector<int>& vAxis) {
+ if(vAxis == std::vector<int>({0, 2, 1, 3}))
+ Transpose0213<true>(out, in);
+ else
+ TransposeGeneric<true>(out, in, vAxis);
}
void Softmax(Tensor out_, Tensor in_, Tensor mask_) {
@@ -412,9 +444,8 @@ void PasteCols(Tensor out_,
const float* rowIn = in + j * colsIn;
float* rowOut = out + j * colsOut;
- // @TODO: should this be a sum?
for(int i = 0; i < colsIn; ++i) {
- rowOut[indices[i]] = rowIn[i];
+ rowOut[indices[i]] += rowIn[i];
}
}
}
@@ -458,7 +489,6 @@ void GRUFastForward(Tensor out_, std::vector<Tensor> inputs, bool final) {
#pragma omp simd
for(int i = 0; i < cols; ++i) {
- // @TODO: stable sigmoid
float r = stableSigmoid(xWrow[i] + sUrow[i] + b[i]);
int k = i + cols;
@@ -901,6 +931,26 @@ void Shift(Tensor out_, Tensor in_, marian::Shape shift, float padValue, bool in
}
}
+void ShiftGrad(Tensor out_, Tensor in_, marian::Shape shift, bool invert) {
+ int offset = 0;
+ for(int i = 0; i < shift.size(); ++i)
+ offset += in_->shape().stride(i) * shift[i];
+
+ if(invert)
+ offset = -offset;
+
+ float* out = out_->data();
+ const float* in = in_->data();
+
+ int length = out_->shape().elements();
+#pragma omp parallel for
+ for(int i = 0; i < length; ++i) {
+ if(i - offset >= 0 && i - offset < length) {
+ out[i] += in[i - offset];
+ }
+ }
+}
+
void SetSparse(float* out,
const std::vector<size_t>& indices,
const std::vector<float>& values) {
diff --git a/src/tensors/gpu/add.cu b/src/tensors/gpu/add.cu
index 1acb5b54..9f4d0dba 100644
--- a/src/tensors/gpu/add.cu
+++ b/src/tensors/gpu/add.cu
@@ -160,7 +160,6 @@ void Add(Functor functor, float scale, marian::Tensor out, Tensors... tensors) {
bool broadcast = false;
for(int i = 0; i < K; ++i)
broadcast = broadcast || gOut.shape() != gIns[i].shape();
-
gAddEqual<<<blocks, threads>>>(functor, gOut, gIns, scale, broadcast);
} else {
int threads = std::min(MAX_THREADS, length);
diff --git a/src/tensors/gpu/element.inc b/src/tensors/gpu/element.inc
index 02d269f3..79498466 100644
--- a/src/tensors/gpu/element.inc
+++ b/src/tensors/gpu/element.inc
@@ -41,3 +41,4 @@ template void Element<Assign<Var<1>, BinaryFunctor<elem::Clip, Assignee<2>, Capt
template void Element<Assign<Var<1>, BinaryFunctor<elem::LogAddExp, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor>(Assign<Var<1>, BinaryFunctor<elem::LogAddExp, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor, marian::Tensor);
template void Element<Assign<Var<1>, BinaryFunctor<elem::Maximum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor>(Assign<Var<1>, BinaryFunctor<elem::Maximum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor, marian::Tensor);
template void Element<Assign<Var<1>, BinaryFunctor<elem::Minimum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor>(Assign<Var<1>, BinaryFunctor<elem::Minimum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor, marian::Tensor);
+template void Element<Assign<Var<1>, BinaryFunctor<elem::Div, Assignee<1>, Capture>>>(Assign<Var<1>, BinaryFunctor<elem::Div, Assignee<1>, Capture>>, marian::Tensor);
diff --git a/src/tensors/gpu/prod.cu b/src/tensors/gpu/prod.cu
index e102311d..242dcccc 100644
--- a/src/tensors/gpu/prod.cu
+++ b/src/tensors/gpu/prod.cu
@@ -67,6 +67,30 @@ void Prod(marian::Tensor C,
#endif
}
+__global__ void gAddBias(float* out, const float* bias, size_t length, size_t cols) {
+ for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) {
+ int index = bid + blockDim.x * blockIdx.x + threadIdx.x;
+ if(index < length) {
+ size_t index2 = index % cols;
+ out[index] += bias[index2];
+ }
+ }
+}
+
+void AddBias(marian::Tensor C, const marian::Tensor bias) {
+ cudaSetDevice(C->getDevice().no);
+
+ int length = C->shape().elements();
+ int cols = bias->shape().elements();
+
+ int threads = std::min(MAX_THREADS, length);
+ int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0));
+
+ gAddBias<<<blocks, threads>>>(C->data(), bias->data(), length, cols);
+
+ cudaStreamSynchronize(0);
+}
+
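
Note: gAddBias broadcasts the [1, cols] bias over every row of C via
index % cols, and the grid-stride loop makes the launch size a performance
knob only. For example, with C of shape [2, 3] stored row-major:

  // out[0..2] += bias[0..2];  out[3..5] += bias[0..2]
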
void ProdWithBias(marian::Tensor C,
const marian::Tensor& A,
const marian::Tensor& B,
@@ -76,13 +100,13 @@ void ProdWithBias(marian::Tensor C,
float beta,
float scalar) {
marian::gpu::Prod(C, A, B, transA, transB, beta, scalar);
- marian::gpu::Add(functional::_1, 1.f, C, bias);
+ marian::gpu::AddBias(C, bias);
}
-
void ProdBatched(marian::Tensor C,
- const marian::Tensor& A,
- const marian::Tensor& B,
+ Ptr<Allocator> allocator,
+ const marian::Tensor A,
+ const marian::Tensor B,
bool transA,
bool transB,
float beta,
@@ -116,30 +140,57 @@ void ProdBatched(marian::Tensor C,
auto cublasHandle = std::static_pointer_cast<gpu::Backend>(C->getBackend())
->getCublasHandle();
+
+ int strideA = batchA == 1 ? 0 : m * k;
+ int strideB = batchB == 1 ? 0 : n * k;
+ int strideC = n * m;
+ int batchC = std::max(batchA, batchB);
+
+ std::vector<const float*> aptr;
+ std::vector<const float*> bptr;
+ std::vector<float*> cptr;
+
+ for(int i = 0; i < batchC; i++) {
+ aptr.push_back(A->data() + (i % batchA) * strideA);
+ bptr.push_back(B->data() + (i % batchB) * strideB);
+ cptr.push_back(C->data() + i * strideC);
+ }
+
+ auto mp_aptr = allocator->alloc<const float*>(aptr.size());
+ CudaCopy(aptr.data(), aptr.data() + aptr.size(), mp_aptr->data<const float*>());
+
+ auto mp_bptr = allocator->alloc<const float*>(bptr.size());
+ CudaCopy(bptr.data(), bptr.data() + bptr.size(), mp_bptr->data<const float*>());
+
+ auto mp_cptr = allocator->alloc<float*>(cptr.size());
+ CudaCopy(cptr.data(), cptr.data() + cptr.size(), mp_cptr->data<float*>());
+
#if CUDA_VERSION >= 9000
cublasSetMathMode(cublasHandle, CUBLAS_TENSOR_OP_MATH);
#endif
- cublasSgemmStridedBatched(cublasHandle,
- opB,
- opA,
- n,
- m,
- k,
- &alpha,
- B->data(),
- ldb,
- batchB == 1 ? 0 : n * k,
- A->data(),
- lda,
- batchA == 1 ? 0 : m * k,
- &beta,
- C->data(),
- ldc,
- n * m,
- std::max(batchA, batchB));
+ cublasSgemmBatched(cublasHandle,
+ opB,
+ opA,
+ n,
+ m,
+ k,
+ &alpha,
+ mp_bptr->data<const float*>(),
+ ldb,
+ mp_aptr->data<const float*>(),
+ lda,
+ &beta,
+ mp_cptr->data<float*>(),
+ ldc,
+ batchC);
#if CUDA_VERSION >= 9000
cublasSetMathMode(cublasHandle, CUBLAS_DEFAULT_MATH);
#endif
+
+ allocator->free(mp_aptr);
+ allocator->free(mp_bptr);
+ allocator->free(mp_cptr);
}
+
}
}
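The batched product now builds explicit pointer tables instead of relying on fixed strides, which is what allows broadcasting a batch of size 1 against a larger batch. A self-contained sketch of just the pointer arithmetic (plain C++; makeBatchPtrs is an illustrative name, assuming the same m-by-k, k-by-n and m-by-n layout as above):

#include <algorithm>
#include <vector>

struct BatchPtrs {
  std::vector<const float*> a, b;
  std::vector<float*> c;
};

BatchPtrs makeBatchPtrs(const float* A, const float* B, float* C,
                        int batchA, int batchB, int m, int n, int k) {
  int strideA = batchA == 1 ? 0 : m * k;  // zero stride broadcasts A
  int strideB = batchB == 1 ? 0 : n * k;  // zero stride broadcasts B
  int strideC = n * m;
  int batchC = std::max(batchA, batchB);

  BatchPtrs p;
  for(int i = 0; i < batchC; ++i) {
    p.a.push_back(A + (i % batchA) * strideA);
    p.b.push_back(B + (i % batchB) * strideB);
    p.c.push_back(C + i * strideC);
  }
  return p;
}

With batchA == 1 the modulo and the zero stride both map every batch index to the same A matrix, so the pointer table degenerates to a broadcast.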
diff --git a/src/tensors/gpu/prod.h b/src/tensors/gpu/prod.h
index 5791fb1a..1710914e 100644
--- a/src/tensors/gpu/prod.h
+++ b/src/tensors/gpu/prod.h
@@ -26,8 +26,9 @@ void ProdWithBias(marian::Tensor C,
float scalar = 1);
void ProdBatched(marian::Tensor C,
- const marian::Tensor& A,
- const marian::Tensor& B,
+ Ptr<Allocator> allocator,
+ const marian::Tensor A,
+ const marian::Tensor B,
bool transA,
bool transB,
float beta = 0,
diff --git a/src/tensors/gpu/tensor_operators.cu b/src/tensors/gpu/tensor_operators.cu
index 87861a1c..2c7ab510 100644
--- a/src/tensors/gpu/tensor_operators.cu
+++ b/src/tensors/gpu/tensor_operators.cu
@@ -38,6 +38,8 @@ bool IsNan(Tensor in) {
}
void ConcatCont(Tensor out, const std::vector<Tensor>& inputs, int axis) {
+
+
cudaSetDevice(out->getDevice().no);
int step = 1;
for(int i = 0; i < axis; ++i)
@@ -49,7 +51,7 @@ void ConcatCont(Tensor out, const std::vector<Tensor>& inputs, int axis) {
size_t size = in->shape().elements() / step;
size_t offset2 = i * size;
- cudaMemcpyAsync(out->data() + offset1,
+ cudaMemcpy(out->data() + offset1,
in->data() + offset2,
size * sizeof(float),
cudaMemcpyDeviceToDevice);
@@ -60,14 +62,15 @@ void ConcatCont(Tensor out, const std::vector<Tensor>& inputs, int axis) {
cudaStreamSynchronize(0);
}
+template <bool add>
__global__ void gInsertCols(float* out,
- const float* in,
- size_t rows,
- size_t cols,
- size_t cols_out,
- size_t cols_in,
- size_t offset_out,
- size_t offset_in) {
+ const float* in,
+ size_t rows,
+ size_t cols,
+ size_t cols_out,
+ size_t cols_in,
+ size_t offset_out,
+ size_t offset_in) {
for(int bid = 0; bid < rows; bid += gridDim.x) {
int j = bid + blockIdx.x;
if(j < rows) {
@@ -77,7 +80,10 @@ __global__ void gInsertCols(float* out,
for(int tid = 0; tid < cols; tid += blockDim.x) {
int i = tid + threadIdx.x;
if(i < cols)
- rowOut[i] = rowIn[i];
+ if(add)
+ rowOut[i] += rowIn[i];
+ else
+ rowOut[i] = rowIn[i];
}
}
}
@@ -99,16 +105,81 @@ void Concatenate1(Tensor out, const std::vector<Tensor>& inputs) {
int blocks = std::min(MAX_BLOCKS, rows);
int threads = std::min(MAX_THREADS, cols_in);
- gInsertCols<<<blocks, threads>>>(
+ gInsertCols<false><<<blocks, threads>>>(
out->data(), in->data(), rows, cols_in, cols_out, cols_in, offset, 0);
offset += cols_in;
}
cudaStreamSynchronize(0);
}
+
+__global__ void gJoin2(float* out, size_t rowBatch, size_t cols,
+ const float* in1, size_t inStride1,
+ const float* in2, size_t inStride2) {
+
+ int outStride = inStride1 + inStride2;
+ int rows = rowBatch * outStride;
+
+ for(int bid = 0; bid < rows; bid += gridDim.x) {
+ int j = bid + blockIdx.x;
+ if(j < rows) {
+
+ float* rowOut = out + j * cols;
+
+ int curBatch = j / outStride;
+ int curPos = j % outStride;
+
+ int jIn1 = (curBatch * inStride1) + curPos;
+ int jIn2 = (curBatch * inStride2) + curPos - inStride1;
+
+ const float* rowIn1 = in1 + jIn1 * cols;
+ const float* rowIn2 = in2 + jIn2 * cols;
+
+ for(int tid = 0; tid < cols; tid += blockDim.x) {
+ int i = tid + threadIdx.x;
+ if(i < cols) {
+ if(curPos < inStride1)
+ rowOut[i] = rowIn1[i];
+ else
+ rowOut[i] = rowIn2[i];
+ }
+ }
+
+ }
+ }
+}
+
+
+void Concatenate2(Tensor out, Tensor in1, Tensor in2) {
+ cudaSetDevice(out->getDevice().no);
+
+ size_t rows = out->shape().elements() / out->shape().back();
+ size_t cols = out->shape().back();
+
+ size_t rowStride1 = in1->shape()[-2];
+ size_t rowStride2 = in2->shape()[-2];
+
+ size_t rowBatch = rows / out->shape()[-2];
+
+ int blocks = std::min(MAX_BLOCKS, (int)rows);
+ int threads = std::min(MAX_THREADS, (int)cols);
+
+ gJoin2<<<blocks, threads>>>(out->data(),
+ rowBatch,
+ cols,
+ in1->data(),
+ rowStride1,
+ in2->data(),
+ rowStride2);
+
+ cudaStreamSynchronize(0);
+}
+
void Concatenate(Tensor out, const std::vector<Tensor>& inputs, int ax) {
if(ax == out->shape().size() - 1)
Concatenate1(out, inputs);
+ else if(ax == out->shape().size() - 2 && inputs.size() == 2)
+ Concatenate2(out, inputs[0], inputs[1]);
else
ConcatCont(out, inputs, ax);
}
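gJoin2 concatenates two tensors along the second-to-last axis by interleaving row blocks per batch entry. A host-side reference of the row mapping, mirroring the kernel's index math (join2Rows is a hypothetical name):

#include <algorithm>
#include <vector>

void join2Rows(std::vector<float>& out,
               const std::vector<float>& in1, int s1,  // rows per batch, in1
               const std::vector<float>& in2, int s2,  // rows per batch, in2
               int rowBatch, int cols) {
  int outStride = s1 + s2;
  for(int j = 0; j < rowBatch * outStride; ++j) {
    int curBatch = j / outStride;
    int curPos = j % outStride;
    const float* src = curPos < s1
        ? &in1[(curBatch * s1 + curPos) * cols]        // jIn1 * cols
        : &in2[(curBatch * s2 + curPos - s1) * cols];  // jIn2 * cols
    std::copy(src, src + cols, &out[j * cols]);
  }
}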
@@ -127,13 +198,24 @@ void Split1(std::vector<Tensor>& outputs, const Tensor in) {
int blocks = std::min(MAX_BLOCKS, rows);
int threads = std::min(MAX_THREADS, cols_out);
- gInsertCols<<<blocks, threads>>>(
+ gInsertCols<true><<<blocks, threads>>>(
out->data(), in->data(), rows, cols_out, cols_out, cols_in, 0, offset);
offset += cols_out;
}
cudaStreamSynchronize(0);
}
+// @TODO: this function is just a temporary fix until I come up with
+// something better for the situation below.
+__global__ void gAddRow(float* out, const float* in, int length) {
+ for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) {
+ int index = bid + blockDim.x * blockIdx.x + threadIdx.x;
+ if(index < length) {
+ out[index] = in[index] + out[index];
+ }
+ }
+}
+
void SplitCont(std::vector<Tensor>& outputs, const Tensor in, int axis) {
cudaSetDevice(in->getDevice().no);
@@ -141,17 +223,25 @@ void SplitCont(std::vector<Tensor>& outputs, const Tensor in, int axis) {
for(int i = 0; i < axis; ++i)
step *= in->shape()[i];
- size_t offset1 = 0;
+ int offset1 = 0;
for(int i = 0; i < step; ++i) {
for(auto out : outputs) {
- size_t size = out->shape().elements() / step;
- size_t offset2 = i * size;
-
- cudaMemcpyAsync(out->data() + offset2,
- in->data() + offset1,
- size * sizeof(float),
- cudaMemcpyDeviceToDevice);
-
+ int size = out->shape().elements() / step;
+ int offset2 = i * size;
+
+ // BUG: this does not add gradients
+ //cudaMemcpyAsync(out->data() + offset2,
+ // in->data() + offset1,
+ // size * sizeof(float),
+ // cudaMemcpyDeviceToDevice);
+
+ // @TODO: this is a quick but bad fix for the above bug
+ int threads = std::min(MAX_THREADS, size);
+ int blocks = std::min(MAX_BLOCKS, size / threads + (size % threads != 0));
+
+ gAddRow<<<blocks, threads>>>(out->data() + offset2,
+ in->data() + offset1,
+ size);
offset1 += size;
}
}
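The backward split has to accumulate because the destination buffer may already hold gradient contributions; a plain memcpy would silently overwrite them. A minimal host model of the fixed SplitCont loop (names illustrative):

#include <cstddef>
#include <vector>

void splitAccumulate(std::vector<std::vector<float>>& outputs,
                     const std::vector<float>& in, int step) {
  std::size_t offset1 = 0;
  for(int i = 0; i < step; ++i)
    for(auto& out : outputs) {
      std::size_t size = out.size() / step;
      std::size_t offset2 = i * size;
      for(std::size_t k = 0; k < size; ++k)
        out[offset2 + k] += in[offset1 + k];  // += is the point of the fix
      offset1 += size;
    }
}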
@@ -165,6 +255,7 @@ void Deconcatenate(std::vector<Tensor>& outputs, const Tensor in, int ax) {
SplitCont(outputs, in, ax);
}
+template <bool add>
__global__ void gTransposeND(
functional::Tensor<float> out,
const functional::Tensor<float> in,
@@ -180,27 +271,114 @@ __global__ void gTransposeND(
out.shape().dims(index, oDims);
for(int i = 0; i < N; ++i)
pDims[permute[i]] = oDims[i];
- out[index] = in[pDims];
+ if(add)
+ out[index] += in[pDims];
+ else
+ out[index] = in[pDims];
+ }
+ }
+}
+
+template <bool add>
+__global__ void gTranspose0213(float* out, const float* in,
+ int rows,
+ int cols,
+ int stride1,
+ int stride2) {
+
+ int stride = stride1 * stride2;
+ for(int bid = 0; bid < rows; bid += gridDim.x) {
+ int j = bid + blockIdx.x;
+ if(j < rows) {
+ float* rowOut = out + j * cols;
+
+ int z = j / stride;
+ int y = (j % stride) / stride1;
+ int x = (j % stride) % stride1;
+ int j2 = z * stride + x * stride2 + y;
+
+ const float* rowIn = in + j2 * cols;
+
+ for(int tid = 0; tid < cols; tid += blockDim.x) {
+ int i = tid + threadIdx.x;
+ if(i < cols) {
+ if(add)
+ rowOut[i] += rowIn[i];
+ else
+ rowOut[i] = rowIn[i];
+ }
+ }
}
}
+
}
void TransposeND(Tensor out, Tensor in, const std::vector<int>& vAxis) {
cudaSetDevice(out->getDevice().no);
+ if(vAxis == std::vector<int>({0, 2, 1, 3})) {
- functional::Array<int, functional::Shape::size()> axes;
- int diff = functional::Shape::size() - vAxis.size();
- for(int i = 0; i < axes.size(); ++i)
- if(i < diff)
- axes[i] = i;
- else
- axes[i] = vAxis[i - diff] + diff;
+ int rows = out->shape().elements() / out->shape().back();
+ int cols = out->shape().back();
- int length = out->shape().elements();
- int threads = std::min(MAX_THREADS, length);
- int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0));
+ int blocks = std::min(MAX_BLOCKS, rows);
+ int threads = std::min(MAX_THREADS, cols);
+
+ int stride1 = out->shape()[-2];
+ int stride2 = out->shape()[-3];
+
+ gTranspose0213<false><<<blocks, threads>>>(out->data(), in->data(),
+ rows, cols, stride1, stride2);
+ }
+ else {
+
+ functional::Array<int, functional::Shape::size()> axes;
+ int diff = functional::Shape::size() - vAxis.size();
+ for(int i = 0; i < axes.size(); ++i)
+ if(i < diff)
+ axes[i] = i;
+ else
+ axes[i] = vAxis[i - diff] + diff;
+
+ int length = out->shape().elements();
+ int threads = std::min(MAX_THREADS, length);
+ int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0));
+
+ gTransposeND<false><<<blocks, threads>>>(out, in, axes);
+ }
+}
+
+void TransposeNDGrad(Tensor out, Tensor in, const std::vector<int>& vAxis) {
+ cudaSetDevice(out->getDevice().no);
+ if(vAxis == std::vector<int>({0, 2, 1, 3})) {
+
+ int rows = out->shape().elements() / out->shape().back();
+ int cols = out->shape().back();
+
+ int blocks = std::min(MAX_BLOCKS, rows);
+ int threads = std::min(MAX_THREADS, cols);
+
+ int stride1 = out->shape()[-2];
+ int stride2 = out->shape()[-3];
+
+ gTranspose0213<true><<<blocks, threads>>>(out->data(), in->data(),
+ rows, cols, stride1, stride2);
+ }
+ else {
- gTransposeND<<<blocks, threads>>>(out, in, axes);
+ functional::Array<int, functional::Shape::size()> axes;
+ int diff = functional::Shape::size() - vAxis.size();
+ for(int i = 0; i < axes.size(); ++i)
+ if(i < diff)
+ axes[i] = i;
+ else
+ axes[i] = vAxis[i - diff] + diff;
+
+ int length = out->shape().elements();
+ int threads = std::min(MAX_THREADS, length);
+ int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0));
+
+ gTransposeND<true><<<blocks, threads>>>(out, in, axes);
+ }
}
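The specialized {0, 2, 1, 3} path avoids the generic N-dimensional index computation by decomposing each output row into (z, y, x) coordinates and swapping the two middle axes on the input side. A plain C++ reference of the mapping (transpose0213 here is an illustrative stand-in for the kernel):

#include <vector>

void transpose0213(std::vector<float>& out, const std::vector<float>& in,
                   int rows, int cols, int stride1, int stride2) {
  int stride = stride1 * stride2;
  for(int j = 0; j < rows; ++j) {
    int z = j / stride;
    int y = (j % stride) / stride1;
    int x = (j % stride) % stride1;
    int j2 = z * stride + x * stride2 + y;  // input row with y and x swapped
    for(int i = 0; i < cols; ++i)
      out[j * cols + i] = in[j2 * cols + i];
  }
}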
__global__ void gSoftmax(float* out,
@@ -697,7 +875,7 @@ __global__ void gPasteCols(float* out,
for(int tid = 0; tid < colsIn; tid += blockDim.x) {
int i = tid + threadIdx.x;
if(i < colsIn)
- rowOut[targetColIdx[i]] = rowIn[i];
+ rowOut[targetColIdx[i]] += rowIn[i];
}
}
}
@@ -764,7 +942,7 @@ __global__ void gInsert(float* out,
inShape.dims(index, dims);
dims[axis] = d_indices[dims[index]];
int outIndex = outShape.index(dims);
- out[outIndex] = in[index];
+ out[outIndex] += in[index];
}
}
}
@@ -1558,14 +1736,21 @@ void LayerNormalizationGrad(Tensor gradX,
eps);
}
+template <bool add>
__global__ void gShift(float* out, const float* in, int length, int offset, float padValue) {
for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) {
int index = bid + blockDim.x * blockIdx.x + threadIdx.x;
if(index < length) {
- if(index - offset < 0 || index - offset >= length)
- out[index] = padValue;
- else
- out[index] = in[index - offset];
+ if(add) {
+ if(index - offset >= 0 && index - offset < length)
+ out[index] += in[index - offset];
+ }
+ else {
+ if(index - offset < 0 || index - offset >= length)
+ out[index] = padValue;
+ else
+ out[index] = in[index - offset];
+ }
}
}
}
@@ -1588,7 +1773,28 @@ void Shift(Tensor out, Tensor in, marian::Shape shift, float padValue, bool inve
int threads = std::min(MAX_THREADS, length);
int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0));
- gShift<<<blocks, threads>>>(out->data(), in->data(), length, offset, padValue);
+ gShift<false><<<blocks, threads>>>(out->data(), in->data(), length, offset, padValue);
+}
+
+void ShiftGrad(Tensor out, Tensor in, marian::Shape shift, bool invert) {
+ ABORT_IF(in->shape().size() != shift.size(), "bad dimensions");
+
+ // BUGBUG: This can only shift along the first axis. Shifting, e.g., along the last axis cannot be implemented this way.
+ int offset = 0;
+ for(int i = 0; i < shift.size(); ++i)
+ offset += in->shape().stride(i) * shift[i];
+
+ if(invert)
+ offset = -offset;
+
+ cudaSetDevice(out->getDevice().no);
+
+ int length = out->shape().elements();
+
+ int threads = std::min(MAX_THREADS, length);
+ int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0));
+
+ gShift<true><<<blocks, threads>>>(out->data(), in->data(), length, offset, 0.f);
}
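Both Shift and ShiftGrad fold the per-axis shift vector into one flat element offset using the tensor's strides; the gradient version then runs the same kernel with add=true and no padding. A sketch of just the offset computation (flatShiftOffset is a made-up helper):

#include <cstddef>
#include <vector>

int flatShiftOffset(const std::vector<int>& strides,
                    const std::vector<int>& shift, bool invert) {
  int offset = 0;
  for(std::size_t i = 0; i < shift.size(); ++i)
    offset += strides[i] * shift[i];
  return invert ? -offset : offset;  // inverse shift flips the direction
}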
__global__ void gSetSparse(float* out,
diff --git a/src/tensors/tensor_operators.h b/src/tensors/tensor_operators.h
index 9aa9c58b..b605e347 100644
--- a/src/tensors/tensor_operators.h
+++ b/src/tensors/tensor_operators.h
@@ -64,7 +64,8 @@ void Reduce(Functor functor, marian::Tensor out, Tensors... tensors) {
// clang-format off
DISPATCH7(Prod, marian::Tensor, const marian::Tensor&, const marian::Tensor&, bool, bool, float, float)
DISPATCH8(ProdWithBias, marian::Tensor, const marian::Tensor&, const marian::Tensor&, const marian::Tensor&, bool, bool, float, float)
- DISPATCH7(ProdBatched, marian::Tensor, const marian::Tensor&, const marian::Tensor&, bool, bool, float, float)
+
+ DISPATCH8(ProdBatched, marian::Tensor, Ptr<Allocator>, const marian::Tensor, const marian::Tensor, bool, bool, float, float)
DISPATCH2(Dropout, marian::Tensor, float)
@@ -78,7 +79,10 @@ void Reduce(Functor functor, marian::Tensor out, Tensors... tensors) {
DISPATCH4(CrossEntropyPickBackward, marian::Tensor, marian::Tensor, marian::Tensor, marian::Tensor)
DISPATCH3(TransposeND, marian::Tensor, marian::Tensor, const std::vector<int>&)
+ DISPATCH3(TransposeNDGrad, marian::Tensor, marian::Tensor, const std::vector<int>&)
+
DISPATCH5(Shift, marian::Tensor, marian::Tensor, marian::Shape, float, bool)
+ DISPATCH4(ShiftGrad, marian::Tensor, marian::Tensor, marian::Shape, bool)
DISPATCH3(Concatenate, marian::Tensor, const std::vector<marian::Tensor>&, int)
// clang-format on
diff --git a/src/training/communicator.cpp b/src/training/communicator.cpp
new file mode 100644
index 00000000..22ccc8e8
--- /dev/null
+++ b/src/training/communicator.cpp
@@ -0,0 +1,13 @@
+#include "training/communicator.h"
+
+namespace marian {
+
+// Compiled when CUDA support is not built in.
+// Version with CUDA and/or NCCL is compiled in communicator.cu
+#ifndef CUDA_FOUND
+Ptr<Communicator> createCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, bool noNccl) {
+ return New<DefaultCommunicator>(graphs);
+}
+#endif
+
+}
diff --git a/src/training/communicator.cu b/src/training/communicator.cu
new file mode 100644
index 00000000..c721e0ed
--- /dev/null
+++ b/src/training/communicator.cu
@@ -0,0 +1,239 @@
+#include "training/communicator.h"
+#include "functional/functional.h"
+#include "tensors/tensor_operators.h"
+
+#ifdef USE_NCCL
+#include "cuda_runtime.h"
+#include "nccl.h"
+#endif
+
+namespace marian {
+
+#ifdef USE_NCCL
+class NCCLCommunicator : public Communicator {
+private:
+ std::vector<ncclComm_t> comms_;
+ std::vector<cudaStream_t> streams_;
+ std::vector<int> devices_;
+
+ void synchronizeAll() {
+ for(int i = 0; i < graphs_.size(); ++i) {
+ cudaSetDevice(devices_[i]);
+ cudaStreamSynchronize(streams_[i]);
+ }
+ }
+
+public:
+ NCCLCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs)
+ : Communicator(graphs),
+ comms_(graphs.size()),
+ streams_(graphs.size()),
+ devices_(graphs.size())
+ {
+ LOG(info, "[comm] Using NCCL library for GPU communication");
+
+ for(int i = 0; i < graphs_.size(); ++i) {
+ auto device = graphs_[i]->getBackend()->getDevice();
+
+ ABORT_IF(device.type != DeviceType::gpu,
+ "NCCL communicator can only be used with GPUs");
+
+ devices_[i] = device.no;
+ cudaSetDevice(devices_[i]);
+ cudaStreamCreate(&streams_[i]);
+ }
+
+ ncclCommInitAll(comms_.data(), devices_.size(), devices_.data());
+ }
+
+ ~NCCLCommunicator() override {
+ for(int i = 0; i < devices_.size(); ++i) {
+ cudaSetDevice(devices_[i]);
+ cudaStreamDestroy(streams_[i]);
+ ncclCommDestroy(comms_[i]);
+ }
+ }
+
+ void scatterReduce() override {
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int shardSize = ceil(totalSize / (float)graphs_.size());
+
+ int pos = 0;
+
+ ncclGroupStart();
+ for(int i = 0; i < graphs_.size(); ++i) {
+ int size = std::min(shardSize, totalSize);
+
+ const void* sendbuff = (const void*)graphs_[i]->params()->grads()->data();
+ auto subgrad = graphs_[i]->params()->grads()->subtensor(pos, size);
+ void* recvbuff = subgrad->data();
+
+ ncclReduceScatter(sendbuff,
+ recvbuff,
+ shardSize,
+ ncclFloat,
+ ncclSum,
+ comms_[i],
+ streams_[i]);
+
+ pos += size;
+ totalSize -= size;
+ }
+ ncclGroupEnd();
+
+ synchronizeAll();
+ }
+
+ void allGather() override {
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int shardSize = ceil(totalSize / (float)graphs_.size());
+
+ int pos = 0;
+
+ ncclGroupStart();
+ for(int i = 0; i < graphs_.size(); ++i) {
+ int size = std::min(shardSize, totalSize);
+
+ auto subparam = graphs_[i]->params()->vals()->subtensor(pos, size);
+ const void* sendbuff = (const void*)subparam->data();
+ void* recvbuff = (void*)graphs_[i]->params()->vals()->data();
+
+ ncclAllGather(sendbuff,
+ recvbuff,
+ shardSize,
+ ncclFloat,
+ comms_[i],
+ streams_[i]);
+
+ pos += size;
+ totalSize -= size;
+ }
+ ncclGroupEnd();
+
+ synchronizeAll();
+ }
+
+ void swapParams(const std::vector<Tensor>& params) override {
+ // Update all graphs with parameter shard
+ ABORT_IF(graphs_.size() < 2, "Swap requires at least two graphs");
+
+ auto gather = [this, params](size_t idx, int pos) {
+ // copy parameter shard to each graph, apart from last graph
+ for(int i = 0; i < graphs_.size() - 1; ++i) {
+ auto subParam = graphs_[i]->params()->vals()->subtensor(pos, params[idx]->size());
+ subParam->copyFrom(params[idx]);
+ }
+
+ // back-up shard from last graph
+ auto subParamLast = graphs_.back()->params()->vals()->subtensor(pos, params[idx]->size());
+ params[idx]->copyFrom(subParamLast);
+
+ auto subParamFirst = graphs_[0]->params()->vals()->subtensor(pos, params[idx]->size());
+ subParamLast->copyFrom(subParamFirst);
+ };
+
+ // execute for each shard
+ this->foreach(gather);
+ }
+
+ void pushParams(std::vector<Tensor>& params) override {
+ // Copy parameter shard from i-th graph to shard params[i].
+ // Graphs and shards with the same index live on the same device.
+
+ auto copy = [this, params](size_t idx, int pos) {
+ // copy parameter shard to each graph
+ auto subParam = graphs_[idx]->params()->vals()->subtensor(pos, params[idx]->size());
+ params[idx]->copyFrom(subParam);
+ };
+
+ this->foreach(copy);
+ }
+
+ void pullParams(const std::vector<Tensor>& params) override {
+ // Update all graphs with parameter shard
+
+ auto gather = [this, params](size_t idx, int pos) {
+ // copy parameter shard to each graph
+ for(auto graph : graphs_) {
+ auto subParam = graph->params()->vals()->subtensor(pos, params[idx]->size());
+ subParam->copyFrom(params[idx]);
+ }
+ };
+ this->foreach(gather);
+ }
+
+ // Doesn't work yet with NCCL
+ // void pushParams(std::vector<Tensor>& params) {
+ // // Copy parameter shard from i-th graph to shard params[i].
+ // // Graphs and shards with the same index live on the same device.
+
+ // int pos = 0;
+ // for(int i = 0; i < graphs_.size(); ++i) {
+ // auto subParam = graphs_[i]->params()->vals()->subtensor(pos, params[i]->size());
+ // ncclGroupStart();
+ // ncclBroadcast((const void*)subParam->data(),
+ // (void*)params[i]->data(),
+ // params[i]->size(),
+ // ncclFloat,
+ // 0,
+ // comms_[i],
+ // streams_[i]);
+ // ncclGroupEnd();
+ // pos += params[i]->size();
+ // }
+ // synchronizeAll();
+ // }
+
+ // void pullParams(const std::vector<Tensor>& params) {
+ // // Update all graphs with parameter shard
+
+ // int totalSize = graphs_[0]->params()->vals()->size();
+ // int shardSize = ceil(totalSize / (float)graphs_.size());
+
+ // ncclGroupStart();
+ // for(int i = 0; i < graphs_.size(); ++i) {
+
+ // const void* sendbuff = (const void*)params[i]->data();
+ // void* recvbuff = (void*)graphs_[i]->params()->vals()->data();
+
+ // ncclAllGather(sendbuff,
+ // recvbuff,
+ // shardSize,
+ // ncclFloat,
+ // comms_[i],
+ // streams_[i]);
+ // }
+ // ncclGroupEnd();
+
+ // synchronizeAll();
+ // }
+};
+#endif
+
+Ptr<Communicator> createCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, bool noNccl) {
+#ifdef USE_NCCL
+ if(noNccl) {
+ LOG(warn, "[comm] NCCL communicator overridden");
+ return New<DefaultCommunicator>(graphs);
+ }
+
+ // if at least one of the devices is not a GPU, fall back to the default
+ for(auto& graph : graphs) {
+ if(graph->getBackend()->getDevice().type == DeviceType::cpu) {
+ return New<DefaultCommunicator>(graphs);
+ }
+ }
+
+ size_t d = graphs.size();
+ if((d & (d - 1)) != 0) {
+ LOG(warn, "[comm] Number of devices {} is not a power of 2 and communication might be slow with NCCL", d);
+ LOG(warn, "[comm] You can switch off NCCL with --no-nccl option", d);
+ }
+
+ return New<NCCLCommunicator>(graphs);
+#else
+ return New<DefaultCommunicator>(graphs);
+#endif
+}
+
+}
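scatterReduce() followed by allGather() amounts to an all-reduce over the gradients: after the reduce-scatter each device owns the sum for its shard, and the all-gather redistributes the summed shards. A self-contained numeric model (two devices, two-element shards; even shard sizes assumed for brevity):

#include <cstdio>
#include <vector>

int main() {
  std::vector<std::vector<float>> grads = {{1, 2, 3, 4}, {10, 20, 30, 40}};
  int nDev = 2, shard = 2;

  // reduce-scatter: device i sums shard i across all devices
  std::vector<std::vector<float>> owned(nDev, std::vector<float>(shard, 0.f));
  for(int i = 0; i < nDev; ++i)
    for(int k = 0; k < shard; ++k)
      for(int d = 0; d < nDev; ++d)
        owned[i][k] += grads[d][i * shard + k];

  // all-gather: every device receives all summed shards
  for(int d = 0; d < nDev; ++d)
    for(int i = 0; i < nDev; ++i)
      for(int k = 0; k < shard; ++k)
        grads[d][i * shard + k] = owned[i][k];

  std::printf("%g %g %g %g\n",  // prints: 11 22 33 44
              grads[0][0], grads[0][1], grads[0][2], grads[0][3]);
  return 0;
}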
diff --git a/src/training/communicator.h b/src/training/communicator.h
new file mode 100644
index 00000000..b43de60d
--- /dev/null
+++ b/src/training/communicator.h
@@ -0,0 +1,178 @@
+#include "graph/expression_graph.h"
+#include "functional/functional.h"
+#include "tensors/tensor_operators.h"
+
+namespace marian {
+
+class Communicator {
+protected:
+ const std::vector<Ptr<ExpressionGraph>> graphs_;
+
+public:
+ Communicator(const std::vector<Ptr<ExpressionGraph>>& graphs)
+ : graphs_(graphs) {}
+
+ virtual ~Communicator() {}
+
+ virtual void foreach(const std::function<void(size_t, int)>& func) {
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int shardSize = ceil(totalSize / (float)graphs_.size());
+
+ int pos = 0;
+ std::vector<std::thread> group;
+ // iterate over all shards
+ for(int idx = 0; idx < graphs_.size(); ++idx) {
+ int size = std::min(shardSize, totalSize);
+
+ group.emplace_back(func, idx, pos);
+
+ pos += size;
+ totalSize -= size;
+ }
+ for(auto& t : group)
+ t.join();
+ }
+
+ virtual void scatterReduce() = 0;
+ virtual void allGather() = 0;
+
+ virtual void pushParams(std::vector<Tensor>& params) = 0;
+ virtual void pullParams(const std::vector<Tensor>& params) = 0;
+ virtual void swapParams(const std::vector<Tensor>& params) = 0;
+};
+
+class DefaultCommunicator : public Communicator {
+private:
+ std::vector<Ptr<TensorAllocator>> paramsAllocs_;
+ std::vector<Tensor> tmpTensors_;
+
+ void init() {
+ if(tmpTensors_.size() == 0) {
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int shardSize = ceil(totalSize / (float)graphs_.size());
+
+ int pos = 0;
+ for(auto graph : graphs_) {
+ int size = std::min(shardSize, totalSize);
+
+ auto paramsAlloc = New<TensorAllocator>(graph->getBackend());
+ paramsAllocs_.push_back(paramsAlloc);
+
+ paramsAlloc->reserveExact(size * sizeof(float));
+
+ Tensor tmp;
+
+ paramsAlloc->allocate(tmp, {1, size});
+ tmpTensors_.push_back(tmp);
+
+ // move to next shard
+ pos += size;
+ totalSize -= size;
+ }
+ }
+ }
+
+public:
+ DefaultCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs)
+ : Communicator(graphs) {}
+
+ ~DefaultCommunicator() override {}
+
+ void scatterReduce() override {
+ init();
+
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int shardSize = ceil(totalSize / (float)graphs_.size());
+
+ // Gather gradients from different devices into current gradient shards
+ auto scatter = [this, shardSize](size_t idx, int pos) {
+ auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
+
+ // collect and sum gradients
+ // to be replaced with ncclReduceScatter
+ for(auto graph : graphs_) {
+ if(graph != graphs_[idx]) {
+ auto subGrad = graph->params()->grads()->subtensor(pos, shardSize);
+ tmpTensors_[idx]->copyFrom(subGrad);
+
+ using namespace functional;
+ Element(_1 = _1 + _2, curGrad, tmpTensors_[idx]);
+ }
+ }
+ };
+
+ this->foreach(scatter);
+ }
+
+ void allGather() override {
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int shardSize = ceil(totalSize / (float)graphs_.size());
+
+ // Update all graphs with parameter shard
+ auto gather = [this, shardSize](size_t idx, int pos) {
+ auto curParam = graphs_[idx]->params()->vals()->subtensor(pos, shardSize);
+
+ // copy parameter shard to each graph
+ for(auto graph : graphs_) {
+ if(graph != graphs_[idx]) {
+ auto subParam = graph->params()->vals()->subtensor(pos, shardSize);
+ subParam->copyFrom(curParam);
+ }
+ }
+ };
+
+ this->foreach(gather);
+ }
+
+ void pushParams(std::vector<Tensor>& params) override {
+ // Copy parameter shard from i-th graph to shard params[i].
+ // Graphs and shards with the same index live on the same device.
+
+ auto copy = [this, params](size_t idx, int pos) {
+ // copy parameter shard to each graph
+ auto subParam = graphs_[idx]->params()->vals()->subtensor(pos, params[idx]->size());
+ params[idx]->copyFrom(subParam);
+ };
+
+ this->foreach(copy);
+ }
+
+ void pullParams(const std::vector<Tensor>& params) override {
+ // Update all graphs with parameter shard
+
+ auto gather = [this, params](size_t idx, int pos) {
+ // copy parameter shard to each graph
+ for(auto graph : graphs_) {
+ auto subParam = graph->params()->vals()->subtensor(pos, params[idx]->size());
+ subParam->copyFrom(params[idx]);
+ }
+ };
+ this->foreach(gather);
+ }
+
+ void swapParams(const std::vector<Tensor>& params) override {
+ // Update all graphs with parameter shard
+ ABORT_IF(graphs_.size() < 2, "Swap requires at least two graphs");
+
+ auto gather = [this, params](size_t idx, int pos) {
+ // copy parameter shard to each graph, apart from last graph
+ for(int i = 0; i < graphs_.size() - 1; ++i) {
+ auto subParam = graphs_[i]->params()->vals()->subtensor(pos, params[idx]->size());
+ subParam->copyFrom(params[idx]);
+ }
+
+ // back-up shard from last graph
+ auto subParamLast = graphs_.back()->params()->vals()->subtensor(pos, params[idx]->size());
+ params[idx]->copyFrom(subParamLast);
+
+ auto subParamFirst = graphs_[0]->params()->vals()->subtensor(pos, params[idx]->size());
+ subParamLast->copyFrom(subParamFirst);
+ };
+ // execute for each shard
+ this->foreach(gather);
+ }
+};
+
+Ptr<Communicator> createCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, bool noNccl = false);
+
+}
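foreach() is the workhorse behind all the shard operations above: it computes the shard boundaries once and hands each (index, offset) pair to a worker thread. A self-contained model of that loop (foreachShard is an illustrative stand-in; the shard arithmetic matches the header):

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

void foreachShard(int totalSize, int nShards,
                  const std::function<void(std::size_t, int)>& func) {
  int shardSize = (int)std::ceil(totalSize / (float)nShards);
  int pos = 0;
  std::vector<std::thread> group;
  for(int idx = 0; idx < nShards; ++idx) {
    int size = std::min(shardSize, totalSize);  // last shard may be smaller
    group.emplace_back(func, idx, pos);
    pos += size;
    totalSize -= size;
  }
  for(auto& t : group)
    t.join();
}

int main() {
  foreachShard(10, 4, [](std::size_t idx, int pos) {
    std::printf("shard %zu starts at offset %d\n", idx, pos);
  });
  return 0;
}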
diff --git a/src/training/gradient_dropping/gpu/sparse_algorithm.cu b/src/training/gradient_dropping/gpu/sparse_algorithm.cu
index 61f1d563..90bd3661 100644
--- a/src/training/gradient_dropping/gpu/sparse_algorithm.cu
+++ b/src/training/gradient_dropping/gpu/sparse_algorithm.cu
@@ -1,99 +1,139 @@
#include "training/gradient_dropping/gpu/sparse_algorithm.h"
-#include "tensors/gpu/algorithm.h"
-#include "tensors/gpu/cuda_helpers.h"
#include <curand.h>
#include <curand_kernel.h>
+#include <thrust/binary_search.h>
+#include <thrust/copy.h>
#include <thrust/device_ptr.h>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
-#include <thrust/copy.h>
-#include <thrust/binary_search.h>
+#include "tensors/gpu/algorithm.h"
+#include "tensors/gpu/cuda_helpers.h"
namespace marian {
- namespace gpu {
- struct non_zero
- {
- __host__ __device__
- bool operator()(const float x)
- {
- return x != 0;
- }
- };
-
- __global__ void copy_id(float* data,
- int* indices,
- float* out,
- int size) {
- int idx = blockDim.x * blockIdx.x + threadIdx.x;
- if(idx >= size)
- return;
- out[idx] = data[indices[idx]];
- }
-
- __global__ void gScatterAdd(float* denseData,
+namespace gpu {
+struct non_zero {
+ __host__ __device__ bool operator()(const float x) { return x != 0; }
+};
+
+__global__ void copy_id(float* data, int* indices, float* out, int size) {
+ int idx = blockDim.x * blockIdx.x + threadIdx.x;
+ if(idx >= size)
+ return;
+ out[idx] = data[indices[idx]];
+}
+
+__global__ void gScatterAdd(float* denseData,
float* sparseData,
int* sparseIndices,
int denseSize,
int sparseSize,
int offset) {
- int idx = blockDim.x * blockIdx.x + threadIdx.x;
- if(idx >= sparseSize)
- return;
- if(sparseIndices[idx] >= -offset
- && sparseIndices[idx] + offset < denseSize)
- denseData[sparseIndices[idx] + offset] += sparseData[idx];
- }
+ int idx = blockDim.x * blockIdx.x + threadIdx.x;
+ if(idx >= sparseSize)
+ return;
+ if(sparseIndices[idx] >= -offset && sparseIndices[idx] + offset < denseSize)
+ denseData[sparseIndices[idx] + offset] += sparseData[idx];
+}
+__global__ void gScatterUpdate(float* denseData,
+ float* sparseData,
+ int* sparseIndices,
+ int denseSize,
+ int sparseSize,
+ int offset) {
+ int idx = blockDim.x * blockIdx.x + threadIdx.x;
+ if(idx >= sparseSize)
+ return;
+ if(sparseIndices[idx] >= -offset && sparseIndices[idx] + offset < denseSize)
+ denseData[sparseIndices[idx] + offset] = sparseData[idx];
+}
+
+__global__ void gGather(float* denseData,
+ float* sparseData,
+ int* sparseIndices,
+ int denseSize,
+ int sparseSize,
+ int offset) {
+ int idx = blockDim.x * blockIdx.x + threadIdx.x;
+ if(idx >= sparseSize)
+ return;
+ if(sparseIndices[idx] >= -offset && sparseIndices[idx] + offset < denseSize)
+ sparseData[idx] = denseData[sparseIndices[idx] + offset];
+}
- std::vector<int> lower_bounds(int* data, std::vector<int> values, int size, DeviceId device) {
- cudaSetDevice(device.no);
+std::vector<int> lower_bounds(int* data,
+ std::vector<int> values,
+ int size,
+ DeviceId device) {
+ cudaSetDevice(device.no);
- thrust::device_ptr<int> data_ptr(data);
- thrust::device_vector<int> d_values(values);
- thrust::device_vector<int> d_output(values.size());
+ thrust::device_ptr<int> data_ptr(data);
+ thrust::device_vector<int> d_values(values);
+ thrust::device_vector<int> d_output(values.size());
- thrust::lower_bound(data_ptr, data_ptr + size,
- d_values.begin(), d_values.end(),
- d_output.begin());
+ thrust::lower_bound(data_ptr,
+ data_ptr + size,
+ d_values.begin(),
+ d_values.end(),
+ d_output.begin());
- std::vector<int> output(values.size());
- thrust::copy(d_output.begin(), d_output.end(), output.begin());
+ std::vector<int> output(values.size());
+ thrust::copy(d_output.begin(), d_output.end(), output.begin());
- return output;
- }
+ return output;
+}
+int buildSparse(Tensor t, float* data, int* indices) {
+ cudaSetDevice(t->getDevice().no);
+ using namespace thrust;
- int buildSparse(Tensor t, float* data, int* indices) {
- cudaSetDevice(t->getDevice().no);
- using namespace thrust;
+ device_ptr<float> grad_ptr(t->data());
+ device_ptr<float> sparse_grad_ptr(data);
+ device_ptr<int> indices_ptr(indices);
- device_ptr<float> grad_ptr(t->data());
- device_ptr<float> sparse_grad_ptr(data);
- device_ptr<int> indices_ptr(indices);
+ int sparse_size = copy_if(make_counting_iterator<int>(0),
+ make_counting_iterator<int>(t->size()),
+ grad_ptr,
+ indices_ptr,
+ non_zero())
+ - indices_ptr;
- int sparse_size = copy_if(make_counting_iterator<int>(0),
- make_counting_iterator<int>(t->size()),
- grad_ptr,
- indices_ptr,
- non_zero()) - indices_ptr;
+ int threads = 512;
+ int blocks = 1 + t->size() / threads;
+ copy_id<<<blocks, threads>>>(t->data(), indices, data, sparse_size);
- int threads = 512;
- int blocks = 1 + t->size() / threads;
- copy_id<<<blocks, threads>>>(t->data(), indices, data, sparse_size);
+ return sparse_size;
+}
- return sparse_size;
- }
+void scatterAdd(Tensor t, float* data, int* indices, int size, int offset) {
+ cudaSetDevice(t->getDevice().no);
+ int threads = 512;
+ int blocks = 1 + size / threads;
+ gScatterAdd<<<blocks, threads>>>(
+ t->data(), data, indices, t->size(), size, offset);
+ cudaStreamSynchronize(0);
+}
- void scatterAdd(Tensor t, float* data, int *indices, int size, int offset) {
- cudaSetDevice(t->getDevice().no);
+void scatterUpdate(Tensor t, float* data, int* indices, int size, int offset) {
+ cudaSetDevice(t->getDevice().no);
- int threads = 512;
- int blocks = 1 + size / threads;
- gScatterAdd<<<blocks, threads>>>(
- t->data(), data, indices, t->size(), size, offset);
- cudaStreamSynchronize(0);
- }
- }
+ int threads = 512;
+ int blocks = 1 + size / threads;
+ gScatterUpdate<<<blocks, threads>>>(
+ t->data(), data, indices, t->size(), size, offset);
+ cudaStreamSynchronize(0);
+}
+
+void gather(Tensor t, float* data, int* indices, int size, int offset) {
+ cudaSetDevice(t->getDevice().no);
+
+ int threads = 512;
+ int blocks = 1 + size / threads;
+ gGather<<<blocks, threads>>>(
+ t->data(), data, indices, t->size(), size, offset);
+ cudaStreamSynchronize(0);
+}
+}
}
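buildSparse is a stream compaction: thrust::copy_if over a counting iterator writes out the positions of all non-zero entries, and copy_id then gathers their values. The same computation in portable C++ (buildSparseRef is a hypothetical reference, not the repo API):

#include <cstdio>
#include <vector>

int buildSparseRef(const std::vector<float>& dense,
                   std::vector<float>& data, std::vector<int>& indices) {
  int n = 0;
  for(int i = 0; i < (int)dense.size(); ++i)
    if(dense[i] != 0.f) {   // the non_zero predicate
      indices[n] = i;       // copy_if over the counting iterator
      data[n] = dense[i];   // the copy_id gather kernel
      ++n;
    }
  return n;                 // sparse_size
}

int main() {
  std::vector<float> g = {0, 3, 0, 0, 5};
  std::vector<float> data(g.size());
  std::vector<int> idx(g.size());
  int n = buildSparseRef(g, data, idx);
  std::printf("%d nonzeros, first at index %d\n", n, idx[0]);  // 2, 1
  return 0;
}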
diff --git a/src/training/gradient_dropping/gpu/sparse_algorithm.h b/src/training/gradient_dropping/gpu/sparse_algorithm.h
index e6bc13d3..8499e78d 100644
--- a/src/training/gradient_dropping/gpu/sparse_algorithm.h
+++ b/src/training/gradient_dropping/gpu/sparse_algorithm.h
@@ -4,17 +4,24 @@
#include "tensors/backend.h"
#include "tensors/tensor.h"
-
namespace marian {
- namespace gpu {
- // output is a vector of size values.size. Output[i] is lower_bound of values[i] in data
- std::vector<int> lower_bounds(int* data,
- std::vector<int> values,
- int size,
- DeviceId device);
+namespace gpu {
+/**
+ * @brief Output[i] is lower_bound of values[i] in data.
+ *
+ * @return A vector of size values.size
+ */
+std::vector<int> lower_bounds(int* data,
+ std::vector<int> values,
+ int size,
+ DeviceId device);
+
+int buildSparse(Tensor t, float* data, int* indices);
+
+void scatterAdd(Tensor t, float* data, int* indices, int size, int offset);
- int buildSparse(Tensor t, float* data, int* indices);
+void scatterUpdate(Tensor t, float* data, int* indices, int size, int offset);
- void scatterAdd(Tensor t, float* data, int *indices, int size, int offset);
- }
-} \ No newline at end of file
+void gather(Tensor t, float* data, int* indices, int size, int offset);
+}
+}
diff --git a/src/training/gradient_dropping/sparse_tensor.h b/src/training/gradient_dropping/sparse_tensor.h
index 516b47a0..e3b7eaf1 100644
--- a/src/training/gradient_dropping/sparse_tensor.h
+++ b/src/training/gradient_dropping/sparse_tensor.h
@@ -1,14 +1,15 @@
#pragma once
+#include <algorithm>
#include <memory>
#include "common/definitions.h"
#include "tensors/backend.h"
-
-#include "tensors/tensor_operators.h"
#include "tensors/device.h"
+#include "tensors/tensor_operators.h"
#ifdef CUDA_FOUND
+#include "tensors/gpu/algorithm.h"
#include "training/gradient_dropping/gpu/sparse_algorithm.h"
#endif
@@ -20,26 +21,26 @@ class SparseTensorBase : public std::enable_shared_from_this<SparseTensorBase> {
int size_;
int capacity_;
Ptr<Backend> backend_;
-
+
std::vector<Ptr<Device>> devices;
- template<typename T>
+ template <typename T>
T* newData(int size, Ptr<Backend> backend) {
Ptr<Device> device = DispatchDevice(backend->getDevice());
device->reserve(size * sizeof(T));
devices.push_back(device);
return (T*)device->data();
}
-
+
public:
SparseTensorBase(int capacity, Ptr<Backend> backend)
- : backend_(backend), capacity_(capacity) {
+ : backend_(backend), capacity_(capacity) {
data_ = newData<float>(capacity, backend);
indices_ = newData<int>(capacity, backend);
}
- SparseTensorBase(float* data, int* indices, int size, Ptr<Backend> backend)
- : backend_(backend) {
+ SparseTensorBase(float* data, int* indices, int size, Ptr<Backend> backend)
+ : backend_(backend) {
data_ = data;
indices_ = indices;
size_ = size;
@@ -60,6 +61,37 @@ public:
int* indices() { return indices_; }
+ // copy to cpu vector
+ void get(std::vector<float>& g, std::vector<int>& i) {
+ int s = std::min((int)g.size(), size());
+ if(backend_->getDevice().type == DeviceType::cpu) {
+ std::copy(data(), data() + s, g.data());
+ std::copy(indices(), indices() + s, i.data());
+ }
+#ifdef CUDA_FOUND
+ else {
+ gpu::copy(backend_, data(), data() + s, g.data());
+ gpu::copy(backend_, indices(), indices() + s, i.data());
+ }
+#endif
+ }
+
+ // copy from cpu vector
+ void set(const std::vector<float>& g, const std::vector<int>& i) {
+ int s = std::min((int)g.size(), capacity());
+ size_ = s;
+ if(backend_->getDevice().type == DeviceType::cpu) {
+ std::copy(g.data(), g.data() + s, data());
+ std::copy(i.data(), i.data() + s, indices());
+ }
+#ifdef CUDA_FOUND
+ else {
+ gpu::copy(backend_, g.data(), g.data() + s, data());
+ gpu::copy(backend_, i.data(), i.data() + s, indices());
+ }
+#endif
+ }
+
void copyFrom(float* ndata, int* nindices, int nsize) {
size_ = nsize;
if(backend_->getDevice().type == DeviceType::cpu) {
@@ -77,11 +109,13 @@ public:
copyFrom(t->data(), t->indices(), t->size());
}
- void toDense(Tensor t, int offset) {
+ // Convert sparseTensor into a Tensor
+ void toDense(Tensor t, int offset = 0) {
t->set(0);
scatterAdd(t, offset);
}
+ // Convert a tensor into a sparse tensor format
void fromDense(Tensor t) {
if(backend_->getDevice().type == DeviceType::cpu) {
ABORT("Gradient Dropping for CPU is not yet supported");
@@ -94,6 +128,7 @@ public:
#endif
}
+ // t[indices[i]] += data[i]
void scatterAdd(Tensor t, int offset = 0) {
if(backend_->getDevice().type == DeviceType::cpu) {
ABORT("Gradient Dropping for CPU is not yet supported");
@@ -105,6 +140,30 @@ public:
#endif
}
+ // Assign t[indices[i]] = data[i]
+ void scatterUpdate(Tensor t, int offset = 0) {
+ if(backend_->getDevice().type == DeviceType::cpu) {
+ ABORT("Gradient Dropping for CPU is not yet supported");
+ }
+#ifdef CUDA_FOUND
+ else {
+ gpu::scatterUpdate(t, data(), indices(), size(), offset);
+ }
+#endif
+ }
+
+ // data[i] = t[indices[i]]
+ void gather(Tensor t, int offset = 0) {
+ if(backend_->getDevice().type == DeviceType::cpu) {
+ ABORT("Gradient Dropping for CPU is not yet supported");
+ }
+#ifdef CUDA_FOUND
+ else {
+ gpu::gather(t, data(), indices(), size(), offset);
+ }
+#endif
+ }
+
std::shared_ptr<SparseTensorBase> subtensor(int pos, int subsize) {
int startOffset = 0;
int endOffset = 0;
@@ -118,8 +177,8 @@ public:
}
#ifdef CUDA_FOUND
else {
- std::vector<int> outputs = gpu::lower_bounds(
- indices(), values, size(), backend_->getDevice());
+ std::vector<int> outputs
+ = gpu::lower_bounds(indices(), values, size(), backend_->getDevice());
startOffset = outputs[0];
endOffset = outputs[1];
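The three sparse primitives the class now exposes differ only in the direction and combination of the copy. Their reference semantics in plain C++ (the *Ref names are illustrative):

#include <cstddef>
#include <vector>

void gatherRef(const std::vector<float>& t, std::vector<float>& data,
               const std::vector<int>& idx) {
  for(std::size_t i = 0; i < idx.size(); ++i)
    data[i] = t[idx[i]];    // data[i] = t[indices[i]]
}

void scatterAddRef(std::vector<float>& t, const std::vector<float>& data,
                   const std::vector<int>& idx) {
  for(std::size_t i = 0; i < idx.size(); ++i)
    t[idx[i]] += data[i];   // t[indices[i]] += data[i]
}

void scatterUpdateRef(std::vector<float>& t, const std::vector<float>& data,
                      const std::vector<int>& idx) {
  for(std::size_t i = 0; i < idx.size(); ++i)
    t[idx[i]] = data[i];    // t[indices[i]] = data[i]
}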
diff --git a/src/training/graph_group.h b/src/training/graph_group.h
index 96642d8b..74acf747 100644
--- a/src/training/graph_group.h
+++ b/src/training/graph_group.h
@@ -32,7 +32,7 @@ public:
virtual ~GraphGroup() {}
- virtual void update(Ptr<data::Batch>) = 0;
+ virtual void update(Ptr<data::Batch> batch) = 0;
virtual void load() = 0;
diff --git a/src/training/graph_group_async_drop.cpp b/src/training/graph_group_async_drop.cpp
index c0704c33..ae535b6b 100644
--- a/src/training/graph_group_async_drop.cpp
+++ b/src/training/graph_group_async_drop.cpp
@@ -8,113 +8,74 @@
namespace marian {
-Tensor AsyncGraphGroupDrop::newTensor(int size, Ptr<Backend> backend) {
- Tensor t;
- Ptr<TensorAllocator> allocator_ = New<TensorAllocator>(backend);
- allocator_->reserveExact(size * sizeof(float));
- allocator_->allocate(t, {1, size});
- allocators.push_back(allocator_);
-
- return t;
-}
-
void AsyncGraphGroupDrop::fetchParams(Tensor oldParams,
const std::vector<Tensor>& params,
int device_id) {
- using namespace functional;
- // @TODO read guard on parameters
- int pos = 0;
+ // Full fetch when fetching moving average OR still in warm-up period.
+ if(&params == &paramsAvg_ || fetchStep_[device_id]++ <= dropping_warmup) {
+ AsyncGraphGroup::fetchParams(oldParams, params, device_id);
+ return;
+ }
std::vector<std::thread> threads;
- for(int i = 0; i < devices_.size(); i++) {
+ int pos = 0;
+ for(int idx = 0; idx < devices_.size(); idx++) {
threads.emplace_back(std::thread(
- [&](int idx, int pos) {
+ [=](int idx, int pos) {
+ auto sparseGrad = sparseGrads_[device_id][idx];
+ auto sparseShard = sparseShards_[device_id][idx];
+
// individual mutex per-shard
std::lock_guard<std::mutex> guard(shardSync_[idx]);
- // normal fetch
- if(fetchStep_[device_id] <= dropping_warmup
- || &params == &paramsAvg_) { // Do not use sparse fetch when
- // fetching from paramsAvg
- oldParams->subtensor(pos, params[idx]->size())
- ->copyFrom(params[idx]);
- paramsLocal_[device_id][idx]->copyFrom(params[idx]);
- return;
- }
-
- // sparse fetch
- // get delta : params latest version - current param (locally)
- Element(_1 = _2 - _3,
- paramsDelta_[idx],
- params[idx],
- paramsLocal_[device_id][idx]);
-
- // update current local param
- paramsLocal_[device_id][idx]->copyFrom(params[idx]);
-
- // get sparse delta
- fetchDropper[device_id][idx]->dropGraph(paramsDelta_[idx],
- fetchSparseGradient_[idx],
- droping_rate,
- dropping_momentum);
-
- // move sparse delta
- fetchShardedSparseGradient_[device_id][idx]->copyFrom(
- fetchSparseGradient_[idx]);
-
- fetchShardedSparseGradient_[device_id][idx]->scatterAdd(
+ sparseShard->gather(params[idx]);
+ sparseGrad->copyFrom(sparseShard);
+ sparseGrad->scatterUpdate(
oldParams->subtensor(pos, params[idx]->size()));
},
- i,
+ idx,
pos));
pos += shardSize_;
}
-#if 0
- for(auto&& t : threads)
- t.join();
- // BUGBUG [compiler]: This fails to compile on VS 2015, for the comparison of the iterator with end()
-#else
- for (size_t i = 0; i < threads.size(); i++)
- threads[i].join();
-#endif
- fetchStep_[device_id]++;
+ for(size_t i = 0; i < threads.size(); i++)
+ threads[i].join();
}
void AsyncGraphGroupDrop::pushGradients(Tensor newGrads,
size_t batch_words,
int device_id) {
- if(pushStep_[device_id]++ <= dropping_warmup) {
+ if(pushStep_[device_id]++ < dropping_warmup) {
AsyncGraphGroup::pushGradients(newGrads, batch_words, device_id);
return;
}
- // get the sparse gradient
- pushDropper_[device_id]->dropGraph(newGrads,
- pushSparseGradient_[device_id],
- droping_rate,
- dropping_momentum);
-
- SparseTensor newSparseGrads = pushSparseGradient_[device_id];
// add instead of copy?
std::vector<std::thread> threads;
int pos = 0;
for(int idx = 0; idx < devices_.size(); idx++) {
threads.emplace_back(std::thread(
[=](int idx, int pos) {
+ auto dropper = droppers_[device_id][idx];
+ auto sparseGrad = sparseGrads_[device_id][idx];
+ auto sparseShard = sparseShards_[device_id][idx];
+ auto tensor = newGrads->subtensor(pos, grads_[idx]->size());
// individual mutex per-shard
std::lock_guard<std::mutex> guard(shardSync_[idx]);
- // split to shard
- SparseTensor subGrad
- = newSparseGrads->subtensor(pos, grads_[idx]->size());
+ // drop the gradients
+ dropper->dropGraph(
+ tensor, sparseGrad, droping_rate, dropping_momentum);
// send the sharded sparse tensor
- pushShardedSparseGradient_[idx]->copyFrom(subGrad);
+ sparseShard->copyFrom(sparseGrad);
// convert back to dense, store it in grads_[idx]
- pushShardedSparseGradient_[idx]->toDense(grads_[idx], -pos);
+ // sparseShard's indices equal those of the sparse gradient;
+ // they will be reused for sparse fetching
+ sparseShard->toDense(grads_[idx]);
+ // optimize
if(scaleLearningRate_) {
shardOpt_[idx]->update(
params_[idx], grads_[idx], batch_words / avgBatchWords_);
@@ -125,7 +86,6 @@ void AsyncGraphGroupDrop::pushGradients(Tensor newGrads,
if(movingAvg_)
updateMovingAverage(
paramsAvg_[idx], params_[idx], scheduler_->numberOfBatches());
-
},
idx,
pos));
@@ -140,51 +100,34 @@ void AsyncGraphGroupDrop::init(Ptr<data::Batch> batch) {
AsyncGraphGroup::init(batch);
// extra inits for gradient dropping
if(drop_first) {
- int totalSize = graphs_[0]->params()->vals()->size();
- int sparseCap = totalSize * 1.5 * (1.0 - droping_rate);
- int shardSize = ceil(totalSize / devices_.size());
-
- for(int i = 0; i < devices_.size(); i++)
- paramsLocal_.push_back(std::vector<Tensor>());
-
for(int i = 0; i < devices_.size(); i++) {
// warm-up counter
fetchStep_.push_back(0);
pushStep_.push_back(0);
+ fetch_ready.push_back(false);
- // temporary tensor to compute parameter delta before fetching
- paramsDelta_.push_back(newTensor(shardSize, graphs_[i]->getBackend()));
-
- // tensors to store local params history
- for(int h_id = 0; h_id < devices_.size(); h_id++) {
- Tensor tmp = newTensor(params_[i]->size(), graphs_[i]->getBackend());
- tmp->copyFrom(params_[i]);
- paramsLocal_[h_id].push_back(tmp);
- }
+ // Size of the sparse tensor
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int sparseCap = totalSize * 1.2 * (1.0 - droping_rate);
- // individual Gradient dropper per-device
- pushDropper_.push_back(PrepareGradientDrop(graphs_[i]->getDevice()));
-
- // N-dropper for fetch
+ // prepare droppers
std::vector<GradientDrop> tmpDropper;
for(auto device : devices_)
tmpDropper.push_back(PrepareGradientDrop(graphs_[i]->getDevice()));
- fetchDropper.push_back(tmpDropper);
-
- // sparsetensor to store sparsified gradients per-device
- pushSparseGradient_.push_back(SparseTensor(
- new SparseTensorBase(sparseCap, graphs_[i]->getBackend())));
-
- pushShardedSparseGradient_.push_back(SparseTensor(
- new SparseTensorBase(sparseCap, graphs_[i]->getBackend())));
- fetchSparseGradient_.push_back(SparseTensor(new SparseTensorBase(
- sparseCap / devices_.size(), graphs_[i]->getBackend())));
+ droppers_.push_back(tmpDropper);
+ // sparsetensor to store sparsified gradients per-device per-shard
std::vector<SparseTensor> tmp;
for(int j = 0; j < devices_.size(); j++)
tmp.push_back(SparseTensor(new SparseTensorBase(
sparseCap / devices_.size(), graphs_[i]->getBackend())));
- fetchShardedSparseGradient_.push_back(tmp);
+ sparseGrads_.push_back(tmp);
+
+ std::vector<SparseTensor> tmp2;
+ for(int j = 0; j < devices_.size(); j++)
+ tmp2.push_back(SparseTensor(new SparseTensorBase(
+ sparseCap / devices_.size(), graphs_[j]->getBackend())));
+ sparseShards_.push_back(tmp2);
}
drop_first = false;
}
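The restructured code drops gradients per shard on the push side and then reuses the surviving indices on the fetch side, so a full parameter copy is no longer needed once warm-up ends. A toy model of that index reuse (the magnitude threshold stands in for the real dropping algorithm, which also keeps a momentum-corrected residual):

#include <cmath>
#include <cstddef>
#include <vector>

struct SparseRef {
  std::vector<int> indices;
  std::vector<float> data;
};

// push side: keep only entries that survive dropping (toy threshold rule)
void dropToSparse(const std::vector<float>& grad, float threshold,
                  SparseRef& s) {
  s.indices.clear();
  s.data.clear();
  for(int i = 0; i < (int)grad.size(); ++i)
    if(std::abs(grad[i]) > threshold) {
      s.indices.push_back(i);
      s.data.push_back(grad[i]);
    }
}

// fetch side: reuse the very same indices to pull fresh parameter values
void sparseFetch(const std::vector<float>& params, const SparseRef& s,
                 std::vector<float>& localParams) {
  for(std::size_t i = 0; i < s.indices.size(); ++i)
    localParams[s.indices[i]] = params[s.indices[i]];  // scatterUpdate
}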
diff --git a/src/training/graph_group_async_drop.h b/src/training/graph_group_async_drop.h
index bd9b6626..3c1be04d 100644
--- a/src/training/graph_group_async_drop.h
+++ b/src/training/graph_group_async_drop.h
@@ -10,6 +10,7 @@ namespace marian {
class AsyncGraphGroupDrop : public AsyncGraphGroup {
std::vector<int> fetchStep_;
std::vector<int> pushStep_;
+ std::vector<bool> fetch_ready;
bool drop_first = 1;
@@ -17,21 +18,9 @@ class AsyncGraphGroupDrop : public AsyncGraphGroup {
float droping_rate;
float dropping_momentum;
- std::vector<GradientDrop> pushDropper_;
- std::vector<std::vector<GradientDrop>> fetchDropper;
+ std::vector<std::vector<GradientDrop>> droppers_;
- std::vector<SparseTensor> pushSparseGradient_;
- std::vector<SparseTensor> pushShardedSparseGradient_;
-
- std::vector<SparseTensor> fetchSparseGradient_;
- std::vector<std::vector<SparseTensor>> fetchShardedSparseGradient_;
-
- std::vector<Tensor> paramsDelta_;
- std::vector<std::vector<Tensor>> paramsLocal_;
-
- std::vector<Ptr<TensorAllocator>> allocators;
-
- Tensor newTensor(int size, Ptr<Backend> backend);
+ std::vector<std::vector<SparseTensor>> sparseGrads_, sparseShards_;
protected:
void init(Ptr<data::Batch> batch);
diff --git a/src/training/graph_group_multinode.cpp b/src/training/graph_group_multinode.cpp
index 16a128c4..a2879d6c 100644
--- a/src/training/graph_group_multinode.cpp
+++ b/src/training/graph_group_multinode.cpp
@@ -52,19 +52,19 @@ void MultiNodeGraphGroup::init(Ptr<data::Batch> batch) {
}
// setup delayed gradient storage
- if (tau_ > 1) {
+ if(tau_ > 1) {
delay_count = std::vector<size_t>(mpi_comm_world_size_);
totalBatchWords = std::vector<int>(mpi_comm_world_size_);
optDelayMutex_ = std::vector<std::mutex>(mpi_comm_world_size_);
-
- for (int i = 0;i < mpi_comm_world_size_; i++) {
+
+ for(int i = 0; i < mpi_comm_world_size_; i++) {
// Shard buffers across GPUs
auto backend = clientGraphs_[i % devices_.size()]->getBackend();
Tensor accGrad = newTensor(nodeSizes_[i], backend);
Tensor accGradBuff = newTensor(nodeSizes_[i], backend);
accGradients.push_back(accGrad);
accGradientBuffer.push_back(accGradBuff);
- }
+ }
}
}
@@ -221,7 +221,7 @@ void MultiNodeGraphGroup::calculateShardSizes() {
*/
void MultiNodeGraphGroup::initShardGpuTensors() {
size_t offset = 0;
- for (int i = 0; i < mpi_my_rank_; i++) {
+ for(int i = 0; i < mpi_my_rank_; i++) {
offset += nodeSizes_[i];
}
for(int shard = 0; shard < devices_.size(); shard++) {
@@ -242,7 +242,8 @@ void MultiNodeGraphGroup::initShardGpuTensors() {
* updated parameters.
*/
void MultiNodeGraphGroup::launchServerThread() {
-#if MPI_FOUND
+// @TODO: move CUDA stuff into separate .cu files and remove '&& CUDA_FOUND'
+#if MPI_FOUND && CUDA_FOUND
serverShardThread_ = new std::thread([this] {
// keep track of number of nodes still communicating with this shard
int nCommunicatingNodes = mpi_comm_world_size_;
@@ -400,7 +401,8 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads,
Tensor oldParams,
int gpu,
size_t batchWords) {
-#if MPI_FOUND
+// @TODO: move CUDA stuff into separate .cu files and remove '&& CUDA_FOUND'
+#if MPI_FOUND && CUDA_FOUND
size_t offset = 0;
for(int node = 0; node < mpi_comm_world_size_; node++) {
size_t nodeSize = nodeSizes_[node];
@@ -410,9 +412,10 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads,
Tensor gradient;
// Delayed Gradient Update
- if (tau_ > 1) {
+ if(tau_ > 1) {
std::lock_guard<std::mutex> guard(optDelayMutex_[node]);
- accGradientBuffer[node]->copyFrom(newGrads->subtensor(offset, nodeSize));
+ accGradientBuffer[node]->copyFrom(
+ newGrads->subtensor(offset, nodeSize));
// Accumulate the gradient
using namespace functional;
Element(_1 += _2, accGradients[node], accGradientBuffer[node]);
@@ -420,14 +423,14 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads,
totalBatchWords[node] += batchWords;
delay_count[node]++;
- if (delay_count[node] < tau_)
+ if(delay_count[node] < tau_)
continue;
delay_count[node] = 0;
gradient = accGradients[node];
batchWords = totalBatchWords[node];
- } else {
+ } else {
gradient = newGrads->subtensor(offset, nodeSize);
- }
+ }
// Copy grads from GPU to CPU (for MPI sending)
cudaMemcpy(clientCommBuffersCPU_[gpu].data(),
@@ -455,7 +458,7 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads,
MPI_TAG_GRAD_PUSH_,
MPI_COMM_WORLD);
// Reset total gradient and batch words
- if (tau_ > 1) {
+ if(tau_ > 1) {
std::lock_guard<std::mutex> guard(optDelayMutex_[node]);
accGradients[node]->set(0);
totalBatchWords[node] = 0;
@@ -554,9 +557,9 @@ void MultiNodeGraphGroup::execute(Ptr<data::Batch> batch) {
auto costNode = builder->build(graph, batch);
#if MPI_FOUND
- if (t == 0) {
+ if(t == 0) {
MPI_Barrier(MPI_COMM_WORLD);
- if (my_id != 0)
+ if(my_id != 0)
graph->params()->vals()->copyFrom(clientGraphs_[0]->params()->vals());
MPI_Barrier(MPI_COMM_WORLD);
}
@@ -628,20 +631,19 @@ void MultiNodeGraphGroup::execute(Ptr<data::Batch> batch) {
// Wait until the thread that wants to do validation is finished.
clientThreadPool_->wait_for_one(lock);
- if (options_->get<std::string>("cost-type") != "ce-sum")
+ if(options_->get<std::string>("cost-type") != "ce-sum")
cost /= tau_;
- if (tau_ > 1) {
+ if(tau_ > 1) {
std::vector<size_t> fakeLength = {1, 1};
- auto fb = data::CorpusBatch::fakeBatch(fakeLength,
- num_seen_sentences,
- NULL);
+ auto fb = data::CorpusBatch::fakeBatch(
+ fakeLength, num_seen_sentences, NULL);
fb->front()->setWords(num_seen_words);
scheduler_->update(cost, fb);
} else {
scheduler_->update(cost, batch);
}
-
+
num_seen_words = 0;
num_seen_sentences = 0;
cost = 0;
@@ -653,11 +655,11 @@ void MultiNodeGraphGroup::execute(Ptr<data::Batch> batch) {
// a safe state.
clientThreadPool_->wait_for_others(lock);
#if MPI_FOUND
- //wait until other nodes are ready
+ // wait until other nodes are ready
MPI_Barrier(MPI_COMM_WORLD);
-
+
// TODO: Saving is broken
- //if(mpi_my_rank_ == 0 && scheduler_->saving())
+ // if(mpi_my_rank_ == 0 && scheduler_->saving())
// this->save(graph);
if(mpi_my_rank_ == 0 && scheduler_->validating())
diff --git a/src/training/graph_group_multinode.h b/src/training/graph_group_multinode.h
index b106bb1a..e34ad959 100644
--- a/src/training/graph_group_multinode.h
+++ b/src/training/graph_group_multinode.h
@@ -2,6 +2,9 @@
#if MPI_FOUND
#include "mpi.h"
+#endif
+
+#ifdef CUDA_FOUND
#include "cuda_runtime.h"
#endif
@@ -124,7 +127,7 @@ protected:
int mpi_comm_world_size_{1};
/**
- * Flag to indicate that an MPI message contains message info
+ * Flag to indicate that an MPI message contains message info
* before sending the gradient (client -> server).
*/
static const int MPI_TAG_GRAD_PUSH_MSG_{0};
@@ -233,7 +236,7 @@ protected:
/**
* LocalOptimizers related variables
*/
- bool useLocalOpt_;
+ // bool useLocalOpt_;
/**
* Allocate new tensor on given GPU and store allocator.
@@ -405,10 +408,10 @@ public:
MultiNodeGraphGroup(Ptr<Config> options)
: GraphGroup(options),
tau_{options_->get<size_t>("optimizer-delay")},
- useLocalOpt_{options_->get<bool>("multi-node-local-optimizers")},
+ // useLocalOpt_{options_->get<bool>("multi-node-local-optimizers")},
clientCommOverlap{options_->get<bool>("multi-node-overlap")} {
// Set up devices for this node
- setupMPI(); //Setup MPI before creating device vectors
+ setupMPI(); // Setup MPI before creating device vectors
std::vector<size_t> devices;
for(auto& d : options_->getDevices())
devices.push_back(d.no);
@@ -526,8 +529,6 @@ public:
return GraphGroup::collectStats(clientGraphs_[0], clientBuilders_[0]);
}
- virtual void finalize() {
- finalized_ = true;
- }
+ virtual void finalize() { finalized_ = true; }
};
}
diff --git a/src/training/graph_group_multinode_sync.cpp b/src/training/graph_group_multinode_sync.cpp
new file mode 100644
index 00000000..dc3f4718
--- /dev/null
+++ b/src/training/graph_group_multinode_sync.cpp
@@ -0,0 +1,280 @@
+#include "training/graph_group_multinode_sync.h"
+#include "functional/functional.h"
+#include "tensors/tensor_operators.h"
+
+namespace marian {
+
+void MultiNodeGraphGroupSync::updateMovingAverage(Tensor paramsAvg,
+ Tensor params,
+ size_t batches) {
+ using namespace functional;
+ float decay
+ = std::max(mvDecay_, 1.f - (float)(batches + 1) / (float)(batches + 10));
+ Element(_1 = ((1.f - decay) * _1) + (decay * _2), paramsAvg, params);
+}
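The decay term interpolates between fast tracking early in training and a slow exponential average later: the batch-dependent part starts near 0.9 and falls off, until mvDecay_ takes over as the floor. A quick numeric check (mvDecay value illustrative):

#include <algorithm>
#include <cstdio>

int main() {
  float mvDecay = 1e-4f;  // assumed floor, as in typical configs
  for(long batches : {0L, 10L, 1000L, 100000L}) {
    float decay = std::max(
        mvDecay, 1.f - (float)(batches + 1) / (float)(batches + 10));
    std::printf("batches=%ld decay=%g\n", batches, decay);
  }
  return 0;  // decay: 0.9, 0.45, ~0.0089, then the 1e-4 floor
}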
+
+/**
+ * Set given scheduler to register training observers on the shard optimizers.
+ */
+void MultiNodeGraphGroupSync::setScheduler(Ptr<Scheduler> scheduler) {
+ scheduler_ = scheduler;
+ // optimizer has to be registered last to see a change of learning rate
+ scheduler_->registerTrainingObserver(scheduler_);
+
+ scheduler_->registerTrainingObserver(syncOptimizer_);
+}
+
+/**
+ * Allocate new tensor on given GPU and store allocator.
+ */
+Tensor MultiNodeGraphGroupSync::newTensor(int size, Ptr<Backend> backend) {
+ Tensor t;
+ Ptr<TensorAllocator> allocator = New<TensorAllocator>(backend);
+ allocator->reserveExact(size * sizeof(float));
+ allocator->allocate(t, {1, size});
+ allocators_.push_back(allocator);
+ return t;
+}
+
+/**
+ * Setup the training environment: clients, gradient accumulators and
+ * (if enabled) the moving average of the parameters.
+ */
+void MultiNodeGraphGroupSync::init(Ptr<data::Batch> batch) {
+ // Setup clients and shards
+ setupClients(batch);
+ int network_size = clientGraphs_[0]->params()->vals()->size();
+ LOG(info, "model size = {} float params" , network_size);
+ if (movingAvg_)
+ paramsAvg_ = newTensor(network_size, clientGraphs_.back()->getBackend());
+
+ // Setup sync SGD storage; we keep the summed gradient on node 0
+ sumGradientBuffer = newTensor(network_size, clientGraphs_[0]->getBackend());
+ accGradientsSync = newTensor(network_size, clientGraphs_[0]->getBackend());
+}
+
+
+/**
+ * Initialize the CPU staging buffers used for MPI communication.
+ * Requires the graphs to be initialized first so their size is known.
+ */
+void MultiNodeGraphGroupSync::initCPUArrays() {
+ accGradientsSync_cpu = std::vector<float>(clientGraphs_[0]->params()->vals()->size());
+ receiveBuffer_cpu = std::vector<float>(clientGraphs_[0]->params()->vals()->size());
+}
+
+/**
+ * Setup MPI world size and rank of this node.
+ */
+void MultiNodeGraphGroupSync::setupMPI() {
+#if MPI_FOUND
+ MPI_Comm_size(MPI_COMM_WORLD, &mpi_comm_world_size_);
+ MPI_Comm_rank(MPI_COMM_WORLD, &mpi_my_rank_);
+#endif
+}
+
+/**
+ * Setup clients that will compute gradients and communicate them with the
+ * server shards.
+ * There is one client per GPU.
+ */
+void MultiNodeGraphGroupSync::setupClients(Ptr<data::Batch> batch) {
+ runBatchThroughClientGraphs(batch);
+ initCPUArrays();
+}
+
+/**
+ * Initialize the graphs (models) of all clients on this node with the given
+ * batch.
+ */
+void MultiNodeGraphGroupSync::runBatchThroughClientGraphs(Ptr<data::Batch> batch) {
+ for(int i = 0; i < devices_.size(); i++) {
+ THREAD_GUARD(clientBuilders_[i]->build(clientGraphs_[i], batch);
+ clientGraphs_[i]->forward();
+ clientGraphs_[i]->getBackend()->synchronize(););
+ }
+}
+
+/**
+ * Add a client's gradients to this node's gradient accumulator.
+ * Guarded by a mutex, as all local clients call this concurrently.
+ */
+void MultiNodeGraphGroupSync::sumGRAD(Tensor gradient) {
+ std::lock_guard<std::mutex> guard(sumGradientMutex_);
+ sumGradientBuffer->copyFrom(gradient);
+ using namespace functional; // @TODO: it may make more sense to do this on the CPU
+ Element(_1 += _2, accGradientsSync, sumGradientBuffer);
+}
+
+/**
+ * All-reduce the accumulated gradients across nodes, run the optimizer
+ * update, and redistribute the updated parameters to all local devices.
+ * Make sure this is only called from one thread (device 0).
+ */
+void MultiNodeGraphGroupSync::sendReceiveUpdateSync() {
+ #if MPI_FOUND
+ int network_size = accGradientsSync_cpu.size();
+
+ // Copy the data to the CPU
+ accGradientsSync->get(accGradientsSync_cpu);
+
+ // Wait until all nodes are ready
+ MPI_Barrier(MPI_COMM_WORLD);
+
+  // All-reduce the CPU buffers; every node ends up with the global sum
+  MPI_Allreduce(accGradientsSync_cpu.data(),
+                receiveBuffer_cpu.data(),
+                network_size,
+                MPI_FLOAT,
+                MPI_SUM,
+                MPI_COMM_WORLD);
+
+ // Copy the data back to the GPU and do optimizer update
+ // Do update with last GPU to distribute the memory
+ clientGraphs_.back()->params()->grads()->set(receiveBuffer_cpu);
+
+ // Perform optimizer step
+ syncOptimizer_->update(clientGraphs_.back());
+
+ if(movingAvg_)
+ updateMovingAverage(
+ paramsAvg_, clientGraphs_.back()->params()->vals(),
+ scheduler_->numberOfBatches());
+
+  // Distribute the updated parameters to the rest of the devices
+ std::vector<std::thread> threads;
+ for(int idx = 0; idx < devices_.size() - 1; idx++) {
+ threads.emplace_back(std::thread(
+ [=](int idx) {
+ clientGraphs_[idx]->params()->vals()->copyFrom(
+ clientGraphs_.back()->params()->vals());
+ },
+ idx));
+ }
+ for(auto&& t : threads) {
+ t.join();
+ }
+
+  // Reset the accumulation buffers to zero
+ accGradientsSync->set(0);
+ std::fill(accGradientsSync_cpu.begin(), accGradientsSync_cpu.end(), 0);
+ std::fill(receiveBuffer_cpu.begin(), receiveBuffer_cpu.end(), 0);
+ #endif
+}
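The function above is, at its core, a CPU-side MPI_Allreduce followed by a single optimizer step. A minimal MPI-only sketch of that pattern, with illustrative buffer sizes and values and without any Marian types (compile with mpic++, run under mpirun):

#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);

  int rank = 0, worldSize = 1;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &worldSize);

  // Per-node gradient buffer (values are made up for illustration)
  std::vector<float> localGrads(4, (float)(rank + 1));
  std::vector<float> summedGrads(localGrads.size(), 0.f);

  // Every node contributes its gradients and receives the global sum,
  // mirroring the accGradientsSync_cpu -> receiveBuffer_cpu all-reduce above
  MPI_Allreduce(localGrads.data(), summedGrads.data(),
                (int)localGrads.size(), MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);

  if(rank == 0)
    std::printf("summed[0] = %g over %d nodes\n", summedGrads[0], worldSize);

  MPI_Finalize();
  return 0;
}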
+
+
+/**
+ * Execute given batch on this node: split it across the local devices, run
+ * forward/backward passes, sum the gradients locally, and every tau_ batches
+ * perform the synchronous cross-node update.
+ *
+ * @param fullBatch Batch on which to perform forward and backward passes.
+ */
+void MultiNodeGraphGroupSync::execute(Ptr<data::Batch> fullBatch) {
+ if(!initialized_) {
+ init(fullBatch);
+ initialized_ = true;
+ }
+
+ std::vector<Ptr<data::Batch>> batches = fullBatch->split(devices_.size());
+
+ static int t = 0;
+
+ static float cost = 0;
+ static size_t num_seen_words = 0;
+ static size_t num_seen_sentences = 0;
+
+ {
+ auto task = [this, batches](int my_id) {
+ auto batch = batches[my_id];
+ auto graph = clientGraphs_[my_id];
+ auto builder = clientBuilders_[my_id];
+
+ auto costNode = builder->build(graph, batch);
+
+ if (t == 0) {
+ if (my_id != 0)
+ graph->params()->vals()->copyFrom(clientGraphs_[0]->params()->vals());
+ }
+
+ graph->forward();
+ {
+ std::lock_guard<std::mutex> guard(sumCostMutex_);
+ cost += costNode->scalar();
+ num_seen_words += batch->words();
+ num_seen_sentences += batch->size();
+ }
+ graph->backward();
+
+ graph->getBackend()->synchronize(); //@Alham do you know why we need this here?
+
+ sumGRAD(graph->params()->grads());
+ };
+
+ ThreadPool pool(devices_.size(), devices_.size());
+ for(int idx = 0; idx < devices_.size(); ++idx)
+ pool.enqueue(task, idx);
+ }
+
+ if (t % tau_ == 0)
+ sendReceiveUpdateSync();
+
+ t++;
+
+ // Run scheduler (if enabled)
+ if(t % tau_ == 0 && scheduler_) {
+ if (options_->get<std::string>("cost-type") != "ce-sum")
+ cost /= (tau_ * devices_.size());
+
+ if (tau_ > 1) {
+ std::vector<size_t> fakeLength = {1, 1};
+ auto fb = data::CorpusBatch::fakeBatch(fakeLength,
+ num_seen_sentences,
+ NULL);
+ fb->front()->setWords(num_seen_words);
+ scheduler_->update(cost, fb);
+ } else {
+ scheduler_->update(cost, fullBatch);
+ }
+
+ num_seen_words = 0;
+ num_seen_sentences = 0;
+ cost = 0;
+
+ if((scheduler_->saving() || scheduler_->validating())) {
+ #if MPI_FOUND
+ //wait until other nodes are ready
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ // TODO: Saving is broken
+ //if(mpi_my_rank_ == 0 && scheduler_->saving())
+ // this->save(graph);
+
+ if(mpi_my_rank_ == 0 && scheduler_->validating()) {
+          // temporarily stash current params in accGradientsSync (reused as a buffer)
+ if(movingAvg_)
+ accGradientsSync->copyFrom(clientGraphs_[0]->params()->vals());
+
+ if(movingAvg_)
+ for(auto graph : clientGraphs_)
+ graph->params()->vals()->copyFrom(paramsAvg_);
+
+ scheduler_->validate(clientGraphs_);
+
+ if(movingAvg_)
+ for(auto graph : clientGraphs_)
+ graph->params()->vals()->copyFrom(accGradientsSync);
+ }
+
+ // inform other nodes to continue
+ MPI_Barrier(MPI_COMM_WORLD);
+ #endif
+ }
+ }
+
+}
+}
diff --git a/src/training/graph_group_multinode_sync.h b/src/training/graph_group_multinode_sync.h
new file mode 100644
index 00000000..f13e8890
--- /dev/null
+++ b/src/training/graph_group_multinode_sync.h
@@ -0,0 +1,305 @@
+#pragma once
+
+#if MPI_FOUND
+#include "mpi.h"
+#endif
+
+#ifdef CUDA_FOUND
+#include "cuda_runtime.h"
+#endif
+
+#include <condition_variable>
+#include <future>
+#include <thread>
+
+#include <boost/filesystem.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
+#include "3rd_party/threadpool.h"
+#include "training/graph_group.h"
+
+namespace marian {
+
+/**
+ * Multi-node graph group for synchronous training over multiple
+ * machines, each with one or multiple GPUs.
+ */
+class MultiNodeGraphGroupSync : public GraphGroup {
+public:
+ virtual void setScheduler(Ptr<Scheduler> scheduler);
+
+protected:
+ ////////////////////////////////////////////////////////////////////////////
+ // General variables.
+
+ /** Number of clients on nodes in MPI world (cluster). */
+  std::vector<int> numberClientsOfNodes_; // @TODO: not used for now, but
+                                          // might be useful later
+
+ /** Whether graph group has been properly initialized with a first batch. */
+ bool initialized_{false};
+
+ /** Memory allocators for tensors (GPUs). */
+ std::vector<Ptr<TensorAllocator>> allocators_;
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Client variables.
+
+ /** Graph builders for clients (which run forward and backward passes). */
+ std::vector<Ptr<models::ModelBase>> clientBuilders_;
+
+ /** Graphs of clients. */
+ std::vector<Ptr<ExpressionGraph>> clientGraphs_;
+
+ /** Devices (GPUs) on this node. */
+ std::vector<size_t> devices_;
+
+ /** Mutex to ensure clients are uniquely assigned to graphs and builders. */
+ std::mutex mutexClientInit_;
+
+ /** Mutex to avoid race conditions in scheduler. */
+ std::mutex schedulerMutex_;
+
+ /**
+ * Batch number counter used for evenly distributing mini-batches across
+ * nodes.
+ */
+ size_t batchIter_ = 0;
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Communication variables.
+
+ /** MPI rank of this node. */
+ int mpi_my_rank_{0};
+
+ /** Number of nodes in MPI world (cluster). */
+ int mpi_comm_world_size_{1};
+
+ /**
+ * Variables for optimizer delay and synchronous SGD
+ */
+ size_t tau_{1};
+ std::mutex sumGradientMutex_;
+ std::mutex updateParamsMutex_;
+ std::mutex sumCostMutex_;
+ Tensor accGradientsSync;
+ Tensor sumGradientBuffer;
+ Tensor paramsAvg_;
+ std::vector<float> accGradientsSync_cpu;
+ std::vector<float> receiveBuffer_cpu;
+ bool synchronization_happened{false};
+
+ Ptr<OptimizerBase> syncOptimizer_;
+
+ std::vector<std::mutex> optDelayMutex_;
+ std::vector<size_t> delay_count;
+ std::vector<int> totalBatchWords;
+ std::vector<Tensor> accGradients, accGradientBuffer;
+
+ bool movingAvg_{false};
+  float mvDecay_{1e-4f};
+
+ /**
+ * Allocate new tensor on given GPU and store allocator.
+ */
+ Tensor newTensor(int size, Ptr<Backend> backend);
+
+  /**
+   * Apply exponential smoothing to the parameter average.
+   */
+ void updateMovingAverage(Tensor paramsAvg, Tensor params, size_t batches);
+
+ /**
+   * Setup the training environment: initialize the clients on this node and
+   * allocate the gradient accumulation and communication buffers.
+ */
+ virtual void init(Ptr<data::Batch> batch);
+
+ /**
+ * Setup MPI world size and rank of this node.
+ */
+ void setupMPI();
+
+ /**
+   * Setup clients that will compute gradients and take part in the
+   * synchronous cross-node update. There is one client per GPU.
+ */
+ void setupClients(Ptr<data::Batch> batch);
+
+ /**
+ * Initialize the graphs (models) of all clients on this node with the given
+ * batch.
+ */
+ void runBatchThroughClientGraphs(Ptr<data::Batch> batch);
+
+ /**
+   * Initialize the CPU arrays used for gradient exchange (plain std::vector
+   * buffers; pinned memory would make the cudaMemcpy faster).
+ */
+ void initCPUArrays();
+
+  /**
+   * Sums the gradients from a node, taking care of locking.
+   * @param gradient - the gradient
+   */
+  void sumGRAD(Tensor gradient);
+
+  /**
+   * Does the MPI communication, the parameter update, and copies the
+   * updated parameters back. @TODO ALHAM: god function, too godly?
+   */
+ void sendReceiveUpdateSync();
+
+ void execute(Ptr<data::Batch> batch);
+
+ /**
+ * Load the GPU configuration of this node (i.e. which GPUs to use) and the
+ * number of GPUs on the other nodes.
+ */
+ void loadDeviceConfig(std::vector<size_t> deviceConfig) {
+ size_t index = 0, node = 0, nClientsSeen = 0;
+ numberClientsOfNodes_ = std::vector<int>(mpi_comm_world_size_, 0);
+ while(index < deviceConfig.size()) {
+ if(numberClientsOfNodes_[node] == 0) {
+ numberClientsOfNodes_[node] = deviceConfig[index];
+ nClientsSeen = 0;
+ } else if(nClientsSeen < numberClientsOfNodes_[node]) {
+ if(node == mpi_my_rank_) {
+ devices_.push_back(deviceConfig[index]);
+ }
+ nClientsSeen++;
+ } else {
+ node++;
+ index--;
+ }
+ index++;
+ }
+ }
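If the loop above is read closely, the flat deviceConfig vector encodes, per node, a device count followed by that many device ids: [n0, d0_1, ..., d0_n0, n1, d1_1, ...]. A standalone re-implementation with a made-up two-node config, to make the parsing concrete:

#include <cstdio>
#include <vector>

int main() {
  // node 0 uses GPUs 0 and 1; node 1 uses GPU 0 (illustrative config)
  std::vector<size_t> deviceConfig = {2, 0, 1, 1, 0};
  const int myRank = 0, worldSize = 2;

  std::vector<int> clientsPerNode(worldSize, 0);
  std::vector<size_t> myDevices;

  size_t index = 0, node = 0, clientsSeen = 0;
  while(index < deviceConfig.size()) {
    if(clientsPerNode[node] == 0) {
      clientsPerNode[node] = (int)deviceConfig[index]; // read device count
      clientsSeen = 0;
    } else if((int)clientsSeen < clientsPerNode[node]) {
      if((int)node == myRank)
        myDevices.push_back(deviceConfig[index]); // device id belongs to us
      clientsSeen++;
    } else {
      node++;  // advance to the next node...
      index--; // ...and re-read the current entry as its device count
    }
    index++;
  }

  for(auto d : myDevices)
    std::printf("rank %d uses GPU %zu\n", myRank, d);
  return 0;
}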
+
+public:
+ /**
+ * (Constructor) Call super class and initialize client graphs and builders.
+ */
+ MultiNodeGraphGroupSync(Ptr<Config> options)
+ : GraphGroup(options),
+ tau_{options_->get<size_t>("optimizer-delay")},
+ movingAvg_{options_->get<float>("exponential-smoothing") > 0},
+ mvDecay_{options_->get<float>("exponential-smoothing")},
+ syncOptimizer_{Optimizer(options_)} {
+ // Set up devices for this node
+ setupMPI(); // Setup MPI before creating device vectors
+ std::vector<size_t> devices;
+ for(auto& d : options_->getDevices())
+ devices.push_back(d.no);
+ loadDeviceConfig(devices);
+
+ // Create builders and graphs for clients.
+ for(size_t i = 0; i < devices_.size(); i++) {
+ clientGraphs_.push_back(New<ExpressionGraph>());
+ clientGraphs_[i]->setDevice({devices_[i], DeviceType::gpu});
+ clientGraphs_[i]->reserveWorkspaceMB(options_->get<size_t>("workspace"));
+ clientBuilders_.push_back(
+ models::from_config(options_, models::usage::training));
+ }
+ }
+
+ /**
+ * Update any client model with given batch if batch is assigned to this node.
+ */
+ void update(Ptr<data::Batch> batch) {
+ ABORT_IF(finalized_, "Training has already finished.");
+ if(batchIter_ % mpi_comm_world_size_
+ == mpi_my_rank_) { // Only take batch assigned to this node
+ execute(batch);
+ }
+ batchIter_++;
+ }
+
+ /**
+ * Load models from disk if file exists and setting is not disabled
+ */
+ void load() {
+ if(!options_->get<bool>("no-reload")) {
+ std::string name = options_->get<std::string>("model");
+
+ if(boost::filesystem::exists(name)) {
+ if(scheduler_)
+ scheduler_->load(name);
+ size_t i = 0;
+ for(auto graph : clientGraphs_)
+ clientBuilders_[i++]->load(graph, name);
+ } else if(options_->has("pretrained-model")) {
+ std::string init = options_->get<std::string>("pretrained-model");
+ LOG(info,
+ "Initialize model weights with the pre-trained model {}",
+ init);
+ size_t i = 0;
+ for(auto graph : clientGraphs_)
+ clientBuilders_[i++]->load(graph, init, false);
+ }
+ }
+ }
+
+ /**
+ * Save model of first client's graph to disk
+ */
+ void save(bool final = false) { save(clientGraphs_[0], final); }
+
+ /**
+ * Save model of given graph to disk.
+ */
+ void save(Ptr<ExpressionGraph> graph, bool final = false) {
+ int idx = 0;
+ for(int i = 0; i < clientGraphs_.size(); ++i) {
+ if(graph == clientGraphs_[i]) {
+ idx = i;
+ break;
+ }
+ }
+
+ if(options_->get<bool>("overwrite")) {
+ std::string name = options_->get<std::string>("model");
+
+ clientBuilders_[idx]->save(clientGraphs_[idx], name, true);
+ if(scheduler_)
+ scheduler_->save(name);
+ } else {
+ std::string name = options_->get<std::string>("model");
+
+ if(!final) {
+ std::string numberOfBatches
+ = scheduler_ ? std::to_string(scheduler_->numberOfBatches())
+ : "unknown";
+ std::string nameOverwrite = name;
+ nameOverwrite.replace(
+ name.size() - 4, 4, ".iter" + numberOfBatches + ".npz");
+ clientBuilders_[idx]->save(clientGraphs_[idx], nameOverwrite);
+ }
+
+ clientBuilders_[idx]->save(clientGraphs_[idx], name, true);
+ if(scheduler_)
+ scheduler_->save(name);
+ }
+ }
+
+ /**
+ * Collect statistics from first client's graph.
+ */
+ Ptr<data::BatchStats> collectStats() {
+ return GraphGroup::collectStats(
+ clientGraphs_[0], clientBuilders_[0], devices_.size());
+ }
+
+ virtual void finalize() {
+ finalized_ = true;
+#if MPI_FOUND
+ MPI_Finalize();
+#endif
+ }
+};
+}
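The update() method above assigns batches round-robin: node r processes batch i iff i % worldSize == r, so all nodes consume the same batch stream but each executes only its own share. A tiny sketch of that assignment with made-up sizes:

#include <cstdio>

int main() {
  const int worldSize = 3; // number of MPI nodes (illustrative)
  for(int batchIter = 0; batchIter < 7; ++batchIter)
    std::printf("batch %d -> node %d\n", batchIter, batchIter % worldSize);
  return 0;
}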
diff --git a/src/training/graph_group_sync.cpp b/src/training/graph_group_sync.cpp
index 1f4d4caf..7fe88f7a 100644
--- a/src/training/graph_group_sync.cpp
+++ b/src/training/graph_group_sync.cpp
@@ -4,6 +4,27 @@
namespace marian {
+SyncGraphGroup::SyncGraphGroup(Ptr<Config> config)
+ : GraphGroup(config),
+ devices_{options_->getDevices()},
+ movingAvg_{options_->get<float>("exponential-smoothing") > 0},
+ mvDecay_{options_->get<float>("exponential-smoothing")},
+ delay_{options_->get<size_t>("optimizer-delay")} {
+
+ for(auto device : devices_) {
+ auto graph = New<ExpressionGraph>();
+ graph->setDevice(device);
+ graph->reserveWorkspaceMB(options_->get<size_t>("workspace"));
+ graph->getBackend()->setClip(options_->get<float>("clip-gemm"));
+
+ graphs_.push_back(graph);
+ shardOpt_.push_back(Optimizer(options_));
+ builders_.push_back(models::from_config(options_, models::usage::training));
+ }
+
+ comm_ = createCommunicator(graphs_, options_->get<bool>("no-nccl", false));
+}
+
void SyncGraphGroup::setScheduler(Ptr<Scheduler> scheduler) {
scheduler_ = scheduler;
// optimizer has to be registered last to see changes of learning rate
@@ -22,169 +43,133 @@ void SyncGraphGroup::updateMovingAverage(Tensor paramsAvg,
Element(_1 = ((1.f - decay) * _1) + (decay * _2), paramsAvg, params);
}
-void SyncGraphGroup::fetchParams(Tensor oldParams,
- const std::vector<Tensor>& params) {
- // @TODO read guard on parameters
- int pos = 0;
- std::vector<std::thread> threads;
- for(int idx = 0; idx < devices_.size(); idx++) {
- threads.emplace_back(std::thread(
- [=](int idx, int pos) {
- oldParams->subtensor(pos, params[idx]->size())->copyFrom(params[idx]);
- },
- idx,
- pos));
- pos += shardSize_;
- }
- for(auto&& t : threads) {
- t.join();
+void SyncGraphGroup::initialize(const std::vector<Ptr<data::Batch>>& batches) {
+ // Initialize 0th graph with random weights in one forward step
+ {
+ THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]);
+ graphs_[0]->forward(););
+
+ // Copy weights from 0th graph to all other graphs
+ // to have equal weights across devices
+ ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1);
+ for(size_t i = 1; i < graphs_.size(); ++i) {
+ auto init = [&](size_t i) {
+ // initialize i-th graph and weights
+ builders_[i]->build(graphs_[i], batches[0]);
+ graphs_[i]->forward();
+ // overwrite weights of i-th graph with weights from 0th graph
+ graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
+ };
+ pool.enqueue(init, i);
+ }
}
-}
-
-void SyncGraphGroup::execute(Ptr<data::Batch> fullBatch) {
- std::vector<Ptr<data::Batch>> delayedBatches =
- delay_ > 1 ?
- fullBatch->split(delay_) :
- std::vector<Ptr<data::Batch>>({ fullBatch });
-
- std::vector<float> costs(devices_.size(), 0.f);
-
- size_t t = 1;
- for(auto batch : delayedBatches) {
- std::vector<Ptr<data::Batch>> batches = batch->split(devices_.size());
-
- if(first_) {
- {
- THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]);
- graphs_[0]->forward(););
-
- ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1);
- for(size_t i = 1; i < graphs_.size(); ++i) {
- auto init = [&](size_t i) {
- builders_[i]->build(graphs_[i], batches[0]);
- graphs_[i]->forward();
- graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
- };
- pool.enqueue(init, i);
- }
- }
- if(params_.size() == 0) {
- int totalSize = graphs_[0]->params()->vals()->size();
- shardSize_ = ceil(totalSize / (float)devices_.size());
+ if(movingAvg_ && paramsAvg_.size() == 0) {
+ int totalSize = graphs_[0]->params()->vals()->size();
+ shardSize_ = ceil(totalSize / (float)devices_.size());
- int pos = 0;
- for(auto graph : graphs_) {
- int __size__ = std::min(shardSize_, totalSize);
+ int pos = 0;
+ for(auto graph : graphs_) {
+ int __size__ = std::min(shardSize_, totalSize);
- auto paramsAlloc = New<TensorAllocator>(graph->getBackend());
- paramsAllocs_.push_back(paramsAlloc);
+ auto paramsAlloc = New<TensorAllocator>(graph->getBackend());
+ paramsAllocs_.push_back(paramsAlloc);
- paramsAlloc->reserveExact(3 * __size__ * sizeof(float));
+ paramsAlloc->reserveExact(__size__ * sizeof(float));
- Tensor param, grad, tmp;
- paramsAlloc->allocate(param, {1, __size__});
- paramsAlloc->allocate(grad, {1, __size__});
- paramsAlloc->allocate(tmp, {1, __size__});
- params_.push_back(param);
+ Tensor paramAvg;
+ paramsAlloc->allocate(paramAvg, {1, __size__});
+ paramsAvg_.push_back(paramAvg);
- grad->set(0.f);
- grads_.push_back(grad);
+ paramAvg->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__));
- tmpTensors_.push_back(tmp);
-
- param->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__));
- pos += __size__;
- totalSize -= __size__;
- }
- }
-
- if(movingAvg_ && paramsAvg_.size() == 0) {
- int totalSize = graphs_[0]->params()->vals()->size();
-
- int i = 0;
- for(auto graph : graphs_) {
- int __size__ = std::min(shardSize_, totalSize);
- totalSize -= __size__;
- Tensor paramAvg;
- auto allocator = New<TensorAllocator>(graph->getBackend());
-
- allocator->reserveExact(__size__ * sizeof(float));
- allocator->allocate(paramAvg, {1, __size__});
-
- paramAvg->copyFrom(params_[i++]);
+ // move to next shard
+ pos += __size__;
+ totalSize -= __size__;
+ }
+ }
+}
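The sharding arithmetic in initialize() gives each device one contiguous slice of the parameter vector of size ceil(total/devices), with the last slice possibly shorter. A standalone sketch of the same offset computation (sizes are illustrative):

#include <algorithm>
#include <cmath>
#include <cstdio>

int main() {
  const int totalSize = 10; // floats in the parameter vector (made up)
  const int devices = 3;
  const int shardSize = (int)std::ceil(totalSize / (float)devices); // = 4

  int pos = 0, remaining = totalSize;
  for(int d = 0; d < devices; ++d) {
    int size = std::min(shardSize, remaining); // last shard may be smaller
    std::printf("device %d owns shard [%d, %d)\n", d, pos, pos + size);
    pos += size;
    remaining -= size;
  }
  return 0;
}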
- paramsAllocAvg_.push_back(allocator);
- paramsAvg_.push_back(paramAvg);
- }
+void SyncGraphGroup::execute(Ptr<data::Batch> batch) {
+ size_t devs = devices_.size();
+ auto batches = batch->split(delay_ * devs);
+
+ float div = batches.size(); // no. of batches
+ // do not average gradients if cost type is sum.
+ if(options_->get<std::string>("cost-type") == "ce-sum")
+ div = 1;
+
+ std::vector<std::vector<Ptr<data::Batch>>> delayedBatches;
+
+ for(int i = 0; i < delay_; ++i) {
+ if(i * devs < batches.size()) {
+ delayedBatches.emplace_back();
+ for(int j = 0; j < devs; ++j) {
+ size_t index = i * devs + j;
+ if(index < batches.size())
+ delayedBatches.back().push_back(batches[i * devs + j]);
+ else
+ delayedBatches.back().push_back(nullptr);
}
-
- first_ = false;
}
+ }
- {
- auto task = [this, &costs, batches](size_t idx) {
- auto graph = graphs_[idx];
- auto batch = batches[idx];
-
- if(batch->size() > 0) {
- auto costNode = builders_[idx]->build(graph, batch);
- graph->forward();
- costs[idx] += costNode->scalar();
- graph->backward();
- }
- };
+ std::vector<float> costs(devices_.size(), 0.f);
+ size_t t = 1;
- ThreadPool pool(devices_.size(), devices_.size());
- for(int idx = 0; idx < batches.size(); ++idx)
- pool.enqueue(task, idx);
+ for(const auto& curBatches : delayedBatches) {
+ if(first_) {
+ initialize(curBatches);
+ first_ = false;
}
- {
- auto task = [this, batches](size_t idx, int pos, bool update) {
- int size = params_[idx]->size();
- int i = 0;
-
- float div = devices_.size(); // no. of GPUs
+ // Execute single forward/backward step
+ auto forwardBackward = [this, &costs, curBatches, t](size_t idx, int pos) {
+ auto graph = graphs_[idx];
+ auto batch = curBatches[idx];
- // do not average gradients if cost type is sum.
- if(options_->get<std::string>("cost-type") == "ce-sum") {
- div = 1;
- }
+ if(batch) {
+ auto costNode = builders_[idx]->build(graph, batch);
+ graph->forward();
+ costs[idx] += costNode->scalar();
- for(auto graph : graphs_) {
- if(batches[i]->size() > 0) {
- auto subGrad = graph->params()->grads()->subtensor(pos, size);
- tmpTensors_[idx]->copyFrom(subGrad);
+ // only reset gradients to 0 if t == 1
+ graph->backward(t == 1);
+ }
+ else {
+ // handle case of empty batch, execute do-nothing fw-bw step for
+ // proper inits and resets.
+ graph->forward();
+ graph->backward(t == 1);
+ }
+ };
- using namespace functional;
- Element(_1 = _1 + (_2 / div), grads_[idx], tmpTensors_[idx]);
- }
- i++;
- }
+ // Update parameter shard with gradient shard
+ auto update = [this, div](size_t idx, int pos) {
+ int totalSize = graphs_[0]->params()->vals()->size();
+ int shardSize = ceil(totalSize / (float)devices_.size());
- if(update) {
- shardOpt_[idx]->update(params_[idx], grads_[idx]);
- grads_[idx]->set(0.f);
+ int size = std::min(totalSize - pos, shardSize);
- if(movingAvg_)
- updateMovingAverage(
- paramsAvg_[idx], params_[idx], scheduler_->numberOfBatches());
+ auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, size);
+ auto curParam = graphs_[idx]->params()->vals()->subtensor(pos, size);
- for(auto graph : graphs_) {
- auto subParam = graph->params()->vals()->subtensor(pos, size);
- subParam->copyFrom(params_[idx]);
- }
- }
+ if(div != 1) {
+ using namespace functional;
+ Element(_1 = _1 / div, curGrad);
+ }
- };
+ shardOpt_[idx]->update(curParam, curGrad);
- ThreadPool pool(devices_.size(), devices_.size());
- int pos = 0;
- for(int idx = 0; idx < devices_.size(); ++idx) {
- pool.enqueue(task, idx, pos, t == delay_);
- pos += params_[idx]->size();
- }
+ if(movingAvg_)
+ updateMovingAverage(paramsAvg_[idx], curParam, scheduler_->numberOfBatches());
+ };
+
+ comm_->foreach(forwardBackward);
+ if(t == delayedBatches.size()) {
+ comm_->scatterReduce();
+ comm_->foreach(update);
+ comm_->allGather();
}
t++;
@@ -202,24 +187,108 @@ void SyncGraphGroup::execute(Ptr<data::Batch> fullBatch) {
}
if(scheduler_) {
- scheduler_->update(cost, fullBatch);
+ scheduler_->update(cost, batches);
if(scheduler_->saving()) {
this->save();
}
if(scheduler_->validating()) {
- if(movingAvg_)
- for(auto graph : graphs_)
- fetchParams(graph->params()->vals(), paramsAvg_);
+ if(movingAvg_) {
+ comm_->swapParams(paramsAvg_);
+ }
// safe, because all graphs are idle during validation with sync sgd
scheduler_->validate(graphs_);
- if(movingAvg_)
- for(auto graph : graphs_)
- fetchParams(graph->params()->vals(), params_);
+ if(movingAvg_) {
+ comm_->swapParams(paramsAvg_);
+ }
+ }
+ }
+}
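The scatterReduce/allGather pair above is the heart of the synchronous update: after scatterReduce, device d holds the cross-device sum of shard d only; each device then updates its own shard, and allGather broadcasts every owner's shard back to all devices. A standalone illustration on plain vectors (two "devices" with made-up gradients; a sketch of the data flow, not the actual Communicator API):

#include <cstdio>
#include <vector>

int main() {
  const int devices = 2, total = 4, shard = total / devices;
  // grads[d] is device d's full local gradient copy (illustrative values)
  std::vector<std::vector<float>> grads = {{1, 2, 3, 4}, {10, 20, 30, 40}};

  // scatterReduce: device d accumulates the sum of its own shard d
  for(int d = 0; d < devices; ++d)
    for(int i = 0; i < shard; ++i) {
      float sum = 0.f;
      for(int src = 0; src < devices; ++src)
        sum += grads[src][d * shard + i];
      grads[d][d * shard + i] = sum;
    }

  // (each device would now run the optimizer on its own shard)

  // allGather: copy every owner's shard back to all devices
  for(int d = 0; d < devices; ++d)
    for(int owner = 0; owner < devices; ++owner)
      for(int i = 0; i < shard; ++i)
        grads[d][owner * shard + i] = grads[owner][owner * shard + i];

  std::printf("device 0 now holds: %g %g %g %g\n",
              grads[0][0], grads[0][1], grads[0][2], grads[0][3]);
  return 0;
}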
+
+void SyncGraphGroup::load() {
+ if(!options_->get<bool>("no-reload")) {
+ std::string name = options_->get<std::string>("model");
+
+ if(boost::filesystem::exists(name)) {
+ size_t i = 0;
+ if(scheduler_)
+ scheduler_->load(name);
+ for(auto graph : graphs_)
+ builders_[i++]->load(graph, name);
+
+ // @TODO: probably we want to have the list of DeviceIds as an attribute
+ std::vector<Ptr<Backend>> backends;
+ for(auto graph : graphs_)
+ backends.push_back(graph->getBackend());
+ shardOpt_[0]->load(name + ".optimizer.npz", shardOpt_, backends);
+
+ } else if(options_->has("pretrained-model")) {
+ std::string init = options_->get<std::string>("pretrained-model");
+ LOG(info,
+ "Initialize model weights with the pre-trained model {}",
+ init);
+ size_t i = 0;
+ for(auto graph : graphs_)
+ builders_[i++]->load(graph, init, false);
}
}
}
+
+void SyncGraphGroup::save(bool final) {
+ if(final && scheduler_) {
+ if(movingAvg_ && paramsAvg_.size() > 0)
+ comm_->swapParams(paramsAvg_);
+
+ scheduler_->validate(graphs_, true);
+
+ if(movingAvg_ && paramsAvg_.size() > 0)
+ comm_->swapParams(paramsAvg_);
+ }
+ save(graphs_[0], final);
+}
+
+void SyncGraphGroup::save(Ptr<ExpressionGraph> graph, bool final) {
+  int idx = 0;
+  for(int i = 0; i < graphs_.size(); ++i) {
+    if(graph == graphs_[i]) {
+      idx = i;
+      break;
+    }
+  }
+
+  if(movingAvg_ && paramsAvg_.size() > 0)
+    comm_->swapParams(paramsAvg_);
+
+  std::string name = options_->get<std::string>("model");
+
+  if(options_->get<bool>("overwrite")) {
+    builders_[idx]->save(graphs_[idx], name, true);
+    if(scheduler_)
+      scheduler_->save(name);
+  } else {
+    if(!final) {
+      std::string numberOfBatches
+          = scheduler_ ? std::to_string(scheduler_->numberOfBatches())
+                       : "unknown";
+      std::string nameOverwrite = name;
+      nameOverwrite.replace(
+          name.size() - 4, 4, ".iter" + numberOfBatches + ".npz");
+      builders_[idx]->save(graphs_[idx], nameOverwrite);
+    }
+
+    builders_[idx]->save(graphs_[idx], name, true);
+    if(scheduler_)
+      scheduler_->save(name);
+  }
+
+  if(movingAvg_ && paramsAvg_.size() > 0)
+    comm_->swapParams(paramsAvg_);
+
+  size_t totalSize = graphs_[idx]->params()->vals()->size();
+  shardOpt_[idx]->save(name + ".optimizer.npz", shardOpt_, totalSize);
+}
+
}
diff --git a/src/training/graph_group_sync.h b/src/training/graph_group_sync.h
index b0e2b428..8a7c913f 100644
--- a/src/training/graph_group_sync.h
+++ b/src/training/graph_group_sync.h
@@ -4,6 +4,7 @@
#include "3rd_party/threadpool.h"
#include "training/graph_group.h"
+#include "training/communicator.h"
namespace marian {
@@ -12,26 +13,27 @@ public:
virtual void setScheduler(Ptr<Scheduler> scheduler);
private:
+
+ Ptr<Communicator> comm_;
+
std::vector<Ptr<models::ModelBase>> builders_;
std::vector<Ptr<ExpressionGraph>> graphs_;
std::vector<DeviceId> devices_;
- std::vector<Tensor> params_;
- std::vector<Tensor> grads_;
- std::vector<Tensor> tmpTensors_;
- std::vector<Ptr<TensorAllocator>> paramsAllocs_;
-
std::vector<Ptr<OptimizerBase>> shardOpt_;
int shardSize_;
bool first_{true};
std::vector<Tensor> paramsAvg_;
- std::vector<Ptr<TensorAllocator>> paramsAllocAvg_;
+ std::vector<Ptr<TensorAllocator>> paramsAllocs_;
+
bool movingAvg_{false};
float mvDecay_{1e-4f};
size_t delay_{1};
+ void initialize(const std::vector<Ptr<data::Batch>>& batches);
+
void updateMovingAverage(Tensor paramsAvg, Tensor params, size_t batches);
void fetchParams(Tensor oldParams, const std::vector<Tensor>& params);
@@ -39,117 +41,23 @@ private:
void execute(Ptr<data::Batch> batch);
public:
- SyncGraphGroup(Ptr<Config> config)
- : GraphGroup(config),
- devices_{options_->getDevices()},
- movingAvg_{options_->get<float>("exponential-smoothing") > 0},
- mvDecay_{options_->get<float>("exponential-smoothing")},
- delay_{options_->get<size_t>("optimizer-delay")} {
- for(auto device : devices_) {
- auto graph = New<ExpressionGraph>();
- graph->setDevice(device);
- graph->reserveWorkspaceMB(options_->get<size_t>("workspace"));
- graph->getBackend()->setClip(options_->get<float>("clip-gemm"));
-
- graphs_.push_back(graph);
- shardOpt_.push_back(Optimizer(options_));
- builders_.push_back(models::from_config(options_, models::usage::training));
- }
- }
+ SyncGraphGroup(Ptr<Config> config);
void update(Ptr<data::Batch> batch) {
ABORT_IF(finalized_, "Training has already finished.");
execute(batch);
}
- void load() {
- if(!options_->get<bool>("no-reload")) {
- std::string name = options_->get<std::string>("model");
-
- if(boost::filesystem::exists(name)) {
- size_t i = 0;
- if(scheduler_)
- scheduler_->load(name);
- for(auto graph : graphs_)
- builders_[i++]->load(graph, name);
-
- // @TODO: probably we want to have the list of DeviceIds as an attribute
- std::vector<Ptr<Backend>> backends;
- for(auto graph : graphs_)
- backends.push_back(graph->getBackend());
- shardOpt_[0]->load(name + ".optimizer.npz", shardOpt_, backends);
-
- } else if(options_->has("pretrained-model")) {
- std::string init = options_->get<std::string>("pretrained-model");
- LOG(info,
- "Initialize model weights with the pre-trained model {}",
- init);
- size_t i = 0;
- for(auto graph : graphs_)
- builders_[i++]->load(graph, init, false);
- }
- }
- }
-
- void save(bool final = false) {
- if(final && scheduler_) {
- if(movingAvg_ && paramsAvg_.size() > 0)
- for(auto graph : graphs_)
- fetchParams(graph->params()->vals(), paramsAvg_);
-
- scheduler_->validate(graphs_, true);
-
- if(movingAvg_ && paramsAvg_.size() > 0)
- for(auto graph : graphs_)
- fetchParams(graph->params()->vals(), params_);
- }
+ void load();
+ void save(bool final = false);
+ void save(Ptr<ExpressionGraph> graph, bool final = false);
- save(graphs_[0], final);
- }
-
- void save(Ptr<ExpressionGraph> graph, bool final = false) {
- int idx = 0;
- for(int i = 0; i < graphs_.size(); ++i) {
- if(graph == graphs_[i]) {
- idx = i;
- break;
- }
- }
-
- if(movingAvg_ && paramsAvg_.size() > 0)
- fetchParams(graphs_[idx]->params()->vals(), paramsAvg_);
-
- std::string name = options_->get<std::string>("model");
-
- if(options_->get<bool>("overwrite")) {
- builders_[idx]->save(graphs_[idx], name, true);
- if(scheduler_)
- scheduler_->save(name);
- } else {
- if(!final) {
- std::string numberOfBatches
- = scheduler_ ? std::to_string(scheduler_->numberOfBatches())
- : "unknown";
- std::string nameOverwrite = name;
- nameOverwrite.replace(
- name.size() - 4, 4, ".iter" + numberOfBatches + ".npz");
- builders_[idx]->save(graphs_[idx], nameOverwrite);
- }
-
- builders_[idx]->save(graphs_[idx], name, true);
- if(scheduler_)
- scheduler_->save(name);
- }
-
- if(movingAvg_ && paramsAvg_.size() > 0)
- fetchParams(graphs_[idx]->params()->vals(), params_);
-
- size_t totalSize = graphs_[idx]->params()->vals()->size();
- shardOpt_[idx]->save(name + ".optimizer.npz", shardOpt_, totalSize);
+ Ptr<data::BatchStats> collectStats() {
+ return GraphGroup::collectStats(graphs_[0], builders_[0], numBatches());
}
- Ptr<data::BatchStats> collectStats() {
- return GraphGroup::collectStats(graphs_[0], builders_[0], devices_.size() * delay_);
+ size_t numBatches() {
+ return devices_.size() * delay_;
}
virtual void finalize() {
diff --git a/src/training/scheduler.h b/src/training/scheduler.h
index 6dc1f0a9..24dce827 100644
--- a/src/training/scheduler.h
+++ b/src/training/scheduler.h
@@ -154,10 +154,20 @@ public:
}
void update(float cost, Ptr<data::Batch> batch) {
+ update(cost, std::vector<Ptr<data::Batch>>({batch}));
+ }
+
+ void update(float cost, const std::vector<Ptr<data::Batch>>& batches) {
state_->validated = false;
- auto batchSize = batch->size(); // number of sentences in batch
- auto batchLabels = batch->words(-1); // number of target words in batch
+    size_t batchSize = 0;   // number of sentences in the batch
+    size_t batchLabels = 0; // number of target words in the batch
+
+ for(const auto& batch : batches) {
+ batchSize += batch->size();
+ batchLabels += batch->words(-1);
+ }
+
// reconstruct sum cost, for displaying epoch-level averages instead of minibatch-level
auto costType = options_->get<std::string>("cost-type");
auto dispLabelCounts = options_->get<bool>("disp-label-counts"); // if true then show as "cost per label * number of labels"
@@ -178,6 +188,7 @@ public:
state_->wordsDisp += batchLabels; // target words processed since last display, for speed display
state_->samplesEpoch += batchSize; // sentences processed in this epoch
state_->labelsTotal += batchLabels; // total labels processed
+
state_->newBatch();
if(state_->batches % options_->get<size_t>("disp-freq") == 0) {
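The new overload simply sums sentence and word counts over all sub-batches before touching the display state, so statistics stay correct when optimizer delay splits one logical update into several batches. A minimal sketch of that aggregation with stand-in counts:

#include <cstdio>
#include <vector>

struct FakeBatch {    // stand-in for data::Batch
  size_t sentences;   // what batch->size() reports
  size_t targetWords; // what batch->words(-1) reports
};

int main() {
  std::vector<FakeBatch> batches = {{64, 1500}, {64, 1480}, {32, 700}};
  size_t batchSize = 0, batchLabels = 0;
  for(const auto& b : batches) {
    batchSize += b.sentences;
    batchLabels += b.targetWords;
  }
  std::printf("sentences=%zu, target words=%zu\n", batchSize, batchLabels);
  return 0;
}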
diff --git a/src/training/validator.h b/src/training/validator.h
index b25f462a..5d5bdbca 100644
--- a/src/training/validator.h
+++ b/src/training/validator.h
@@ -14,7 +14,7 @@
#include "translator/beam_search.h"
#include "translator/history.h"
#include "translator/output_collector.h"
-#include "translator/printer.h"
+#include "translator/output_printer.h"
#include "translator/scorers.h"
namespace marian {
@@ -294,9 +294,11 @@ public:
boost::timer::cpu_timer timer;
{
+ auto printer = New<OutputPrinter>(options_, vocabs_.back());
auto collector = options_->has("valid-translation-output")
? New<OutputCollector>(fileName)
: New<OutputCollector>(*tempFile);
+
if(quiet_)
collector->setPrintingStrategy(New<QuietPrinting>());
else
@@ -329,7 +331,7 @@ public:
for(auto history : histories) {
std::stringstream best1;
std::stringstream bestn;
- Printer(options_, vocabs_.back(), history, best1, bestn);
+ printer->print(history, best1, bestn);
collector->Write(history->GetLineNum(),
best1.str(),
bestn.str(),
diff --git a/src/translator/beam_search.h b/src/translator/beam_search.h
index 1fbca214..0ff65917 100644
--- a/src/translator/beam_search.h
+++ b/src/translator/beam_search.h
@@ -36,16 +36,22 @@ public:
const Beams& beams,
std::vector<Ptr<ScorerState>>& states,
size_t beamSize,
- bool first) {
+ bool first,
+ Ptr<data::CorpusBatch> batch) {
Beams newBeams(beams.size());
- for(int i = 0; i < keys.size(); ++i) {
- // keys is contains indices to vocab items in the entire beam.
- // values can be between 0 and beamSize * vocabSize.
+ std::vector<float> alignments;
+ if(options_->get<float>("alignment", 0.f))
+      // Use alignments from the first scorer, even with ensembles
+ alignments = scorers_[0]->getAlignment();
+
+ for(int i = 0; i < keys.size(); ++i) {
+ // Keys contains indices to vocab items in the entire beam.
+ // Values can be between 0 and beamSize * vocabSize.
int embIdx = keys[i] % vocabSize;
int beamIdx = i / beamSize;
- // retrieve short list for final softmax (based on words aligned
+ // Retrieve short list for final softmax (based on words aligned
// to source sentences). If short list has been set, map the indices
// in the sub-selected vocabulary matrix back to their original positions.
auto shortlist = scorers_[0]->getShortlist();
@@ -72,6 +78,8 @@ public:
beamHypIdx = 0;
auto hyp = New<Hypothesis>(beam[beamHypIdx], embIdx, hypIdxTrans, cost);
+
+ // Set cost breakdown for n-best lists
if(options_->get<bool>("n-best")) {
std::vector<float> breakDown(states.size(), 0);
beam[beamHypIdx]->GetCostBreakdown().resize(states.size(), 0);
@@ -82,12 +90,57 @@ public:
}
hyp->GetCostBreakdown() = breakDown;
}
+
+ // Set alignments
+ if(!alignments.empty()) {
+ auto align = getHardAlignmentsForHypothesis(
+ alignments, batch, beamSize, beamHypIdx, beamIdx);
+ hyp->SetAlignment(align);
+ }
+
newBeam.push_back(hyp);
}
}
return newBeams;
}
+ std::vector<float> getHardAlignmentsForHypothesis(
+ const std::vector<float> alignments,
+ Ptr<data::CorpusBatch> batch,
+ int beamSize,
+ int beamHypIdx,
+ int beamIdx) {
+    // Let B be the beam size, N the number of batched sentences,
+    // and L the number of words in the longest sentence in the batch.
+ // The alignment vector:
+ //
+ // if(first)
+ // * has length of N x L if it's the first beam
+ // * stores elements in the following order:
+ // beam1 = [word1-batch1, word1-batch2, ..., word2-batch1, ...]
+ // else
+ // * has length of N x L x B
+ // * stores elements in the following order:
+ // beams = [beam1, beam2, ..., beam_n]
+ //
+ // The mask vector is always of length N x L and has 1/0s stored like
+ // in a single beam, i.e.:
+ // * [word1-batch1, word1-batch2, ..., word2-batch1, ...]
+ //
+ size_t batchSize = batch->size();
+ size_t batchWidth = batch->width() * batchSize;
+ std::vector<float> align;
+
+ for(size_t w = 0; w < batchWidth / batchSize; ++w) {
+ size_t a = ((batchWidth * beamHypIdx) + beamIdx) + (batchSize * w);
+ size_t m = a % batchWidth;
+ if(batch->front()->mask()[m] != 0)
+ align.emplace_back(alignments[a]);
+ }
+
+ return align;
+ }
+
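To make the index arithmetic above concrete: for a non-first step the flat alignment vector is beam-major, so the entry for hypothesis (beamHypIdx, beamIdx) at target word w lives at a = batchWidth*beamHypIdx + beamIdx + batchSize*w. A worked example with made-up dimensions:

#include <cstdio>

int main() {
  const int batchSize = 2, length = 3;       // N and L (illustrative)
  const int batchWidth = batchSize * length; // N x L
  const int beamHypIdx = 1, beamIdx = 0;     // 2nd hypothesis, 1st sentence

  for(int w = 0; w < length; ++w) {
    int a = (batchWidth * beamHypIdx) + beamIdx + (batchSize * w);
    std::printf("target word %d reads flat index %d\n", w, a); // 6, 8, 10
  }
  return 0;
}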
Beams pruneBeam(const Beams& beams) {
Beams newBeams;
for(auto beam : beams) {
@@ -108,7 +161,9 @@ public:
Histories histories;
for(int i = 0; i < dimBatch; ++i) {
size_t sentId = batch->getSentenceIds()[i];
- auto history = New<History>(sentId, options_->get<float>("normalize"), options_->get<float>("word-penalty"));
+ auto history = New<History>(sentId,
+ options_->get<float>("normalize"),
+ options_->get<float>("word-penalty"));
histories.push_back(history);
}
@@ -183,8 +238,12 @@ public:
// BUGBUG: it's not cost but score (higher=better)
for(int i = 0; i < scorers_.size(); ++i) {
- states[i] = scorers_[i]->step(
- graph, states[i], hypIndices, embIndices, dimBatch, localBeamSize);
+ states[i] = scorers_[i]->step(graph,
+ states[i],
+ hypIndices,
+ embIndices,
+ dimBatch,
+ localBeamSize);
if(scorers_[i]->getWeight() != 1.f)
totalCosts
@@ -219,14 +278,22 @@ public:
nth->getNBestList(beamSizes, totalCosts->val(), outCosts, outKeys, first);
int dimTrgVoc = totalCosts->shape()[-1];
- beams = toHyps(
- outKeys, outCosts, dimTrgVoc, beams, states, localBeamSize, first);
+ beams = toHyps(outKeys,
+ outCosts,
+ dimTrgVoc,
+ beams,
+ states,
+ localBeamSize,
+ first,
+ batch);
auto prunedBeams = pruneBeam(beams);
for(int i = 0; i < dimBatch; ++i) {
if(!beams[i].empty()) {
final = final
- || histories[i]->size() >= options_->get<float>("max-length-factor") * batch->front()->batchWidth();
+ || histories[i]->size()
+ >= options_->get<float>("max-length-factor")
+ * batch->front()->batchWidth();
histories[i]->Add(beams[i], trgEosId_, prunedBeams[i].empty() || final);
}
}
diff --git a/src/translator/hypothesis.h b/src/translator/hypothesis.h
index 1bc4fe47..ac9d6699 100644
--- a/src/translator/hypothesis.h
+++ b/src/translator/hypothesis.h
@@ -24,6 +24,9 @@ public:
float GetCost() const { return cost_; }
std::vector<float>& GetCostBreakdown() { return costBreakdown_; }
+ std::vector<float>& GetAlignment() { return alignment_; }
+
+  void SetAlignment(const std::vector<float>& align) { alignment_ = align; }
private:
const Ptr<Hypothesis> prevHyp_;
@@ -32,6 +35,7 @@ private:
const float cost_;
std::vector<float> costBreakdown_;
+ std::vector<float> alignment_;
};
typedef std::vector<Ptr<Hypothesis>> Beam;
diff --git a/src/translator/output_printer.cpp b/src/translator/output_printer.cpp
new file mode 100644
index 00000000..2230755d
--- /dev/null
+++ b/src/translator/output_printer.cpp
@@ -0,0 +1,64 @@
+#include "output_printer.h"
+
+namespace marian {
+
+std::vector<HardAlignment> OutputPrinter::getAlignment(
+ const Ptr<Hypothesis>& hyp,
+ float threshold) {
+ std::vector<SoftAlignment> alignSoft;
+ // Skip EOS
+ auto last = hyp->GetPrevHyp();
+ // Get soft alignments for each target word
+ while(last->GetPrevHyp().get() != nullptr) {
+ alignSoft.push_back(last->GetAlignment());
+ last = last->GetPrevHyp();
+ }
+
+ std::vector<HardAlignment> align;
+ // Alignments by maximum value
+ if(threshold == 1.f) {
+ for(size_t t = 0; t < alignSoft.size(); ++t) {
+ // Retrieved alignments are in reversed order
+ size_t rev = alignSoft.size() - t - 1;
+ size_t maxArg = 0;
+ for(size_t s = 0; s < alignSoft[0].size(); ++s) {
+ if(alignSoft[rev][maxArg] < alignSoft[rev][s]) {
+ maxArg = s;
+ }
+ }
+ align.push_back(std::make_pair(maxArg, t));
+ }
+ } else {
+    // Alignments by greater-than-threshold
+ for(size_t t = 0; t < alignSoft.size(); ++t) {
+ // Retrieved alignments are in reversed order
+ size_t rev = alignSoft.size() - t - 1;
+ for(size_t s = 0; s < alignSoft[0].size(); ++s) {
+ if(alignSoft[rev][s] > threshold) {
+ align.push_back(std::make_pair(s, t));
+ }
+ }
+ }
+ }
+
+ // Sort alignment pairs in ascending order
+ std::sort(align.begin(),
+ align.end(),
+ [](const HardAlignment& a, const HardAlignment& b) {
+ return (a.first == b.first) ? a.second < b.second
+ : a.first < b.first;
+ });
+
+ return align;
+}
+
+std::string OutputPrinter::getAlignmentString(
+ const std::vector<HardAlignment>& align) {
+ std::stringstream alignStr;
+ alignStr << " |||";
+ for(auto p = align.begin(); p != align.end(); ++p) {
+ alignStr << " " << p->first << "-" << p->second;
+ }
+ return alignStr.str();
+}
+}
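The two branches of getAlignment() above are argmax extraction (threshold == 1) and thresholding (otherwise). A standalone mirror of the threshold branch on a toy 2-target-by-3-source soft-alignment matrix, already in target order rather than the reversed hypothesis-chain order (all values made up):

#include <cstdio>
#include <utility>
#include <vector>

int main() {
  // soft[t][s]: attention weight of target word t on source word s
  std::vector<std::vector<float>> soft = {{0.7f, 0.2f, 0.1f},
                                          {0.1f, 0.3f, 0.6f}};
  const float threshold = 0.25f;
  std::vector<std::pair<size_t, size_t>> align; // (source, target) pairs

  for(size_t t = 0; t < soft.size(); ++t)
    for(size_t s = 0; s < soft[t].size(); ++s)
      if(soft[t][s] > threshold)
        align.emplace_back(s, t);

  for(const auto& p : align)
    std::printf("%zu-%zu ", p.first, p.second); // prints: 0-0 1-1 2-1
  std::printf("\n");
  return 0;
}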
diff --git a/src/translator/output_printer.h b/src/translator/output_printer.h
new file mode 100644
index 00000000..e309484e
--- /dev/null
+++ b/src/translator/output_printer.h
@@ -0,0 +1,86 @@
+#pragma once
+
+#include <vector>
+
+#include "common/config.h"
+#include "common/utils.h"
+#include "data/vocab.h"
+#include "translator/history.h"
+#include "translator/hypothesis.h"
+
+namespace marian {
+
+typedef std::vector<float> SoftAlignment;
+typedef std::pair<size_t, size_t> HardAlignment;
+
+class OutputPrinter {
+public:
+ OutputPrinter(Ptr<Config> options, Ptr<Vocab> vocab)
+ : vocab_(vocab),
+ reverse_(options->get<bool>("right-left")),
+ nbest_(options->get<bool>("n-best", false)
+ ? options->get<size_t>("beam-size")
+ : 0),
+ alignment_(options->get<float>("alignment", 0.f)) {}
+
+ template <class OStream>
+ void print(Ptr<History> history, OStream& best1, OStream& bestn) {
+ const auto& nbl = history->NBest(nbest_);
+
+ for(size_t i = 0; i < nbl.size(); ++i) {
+ const auto& result = nbl[i];
+ const auto& words = std::get<0>(result);
+ const auto& hypo = std::get<1>(result);
+
+ std::string translation = Join((*vocab_)(words), " ", reverse_);
+ bestn << history->GetLineNum() << " ||| " << translation;
+
+ if(alignment_ > 0.f) {
+ auto align = getAlignment(hypo, alignment_);
+ bestn << getAlignmentString(align);
+ }
+
+ bestn << " |||";
+
+ if(hypo->GetCostBreakdown().empty()) {
+ bestn << " F0=" << hypo->GetCost();
+ } else {
+ for(size_t j = 0; j < hypo->GetCostBreakdown().size(); ++j) {
+          bestn << " F" << j << "=" << hypo->GetCostBreakdown()[j];
+ }
+ }
+
+ float realCost = std::get<2>(result);
+ bestn << " ||| " << realCost;
+
+ if(i < nbl.size() - 1)
+ bestn << std::endl;
+ else
+ bestn << std::flush;
+ }
+
+ auto result = history->Top();
+ const auto& words = std::get<0>(result);
+
+ std::string translation = Join((*vocab_)(words), " ", reverse_);
+
+ best1 << translation;
+ if(alignment_ > 0.f) {
+ const auto& hypo = std::get<1>(result);
+ auto align = getAlignment(hypo, alignment_);
+ best1 << getAlignmentString(align);
+ }
+ best1 << std::flush;
+ }
+
+private:
+ Ptr<Vocab> vocab_;
+ bool reverse_{false};
+ size_t nbest_{0};
+ float alignment_{0.f};
+
+ std::vector<HardAlignment> getAlignment(const Ptr<Hypothesis>& hyp,
+ float threshold);
+ std::string getAlignmentString(const std::vector<HardAlignment>& align);
+};
+}
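For orientation, one n-best line written by print() above has the shape "<line> ||| <translation>[ ||| <alignment>] ||| F0=<score> ... ||| <cost>". A small sketch assembling such a line from invented values:

#include <iostream>
#include <sstream>
#include <string>

int main() {
  std::ostringstream bestn;
  const size_t lineNum = 0;
  const std::string translation = "ein kleines Haus"; // invented output
  const std::string alignment = " ||| 0-0 1-1 2-2";   // only if --alignment > 0
  const float cost = -3.14f;

  bestn << lineNum << " ||| " << translation << alignment
        << " |||" << " F0=" << cost << " ||| " << cost;
  std::cout << bestn.str() << std::endl;
  return 0;
}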
diff --git a/src/translator/printer.cpp b/src/translator/printer.cpp
deleted file mode 100644
index 50f83d20..00000000
--- a/src/translator/printer.cpp
+++ /dev/null
@@ -1,35 +0,0 @@
-#include "printer.h"
-
-namespace marian {
-
-std::vector<size_t> GetAlignment(const HypothesisPtr& hypothesis) {
- std::vector<SoftAlignment> aligns;
- HypothesisPtr last = hypothesis->GetPrevHyp();
- while(last->GetPrevHyp().get() != nullptr) {
- aligns.push_back(*(last->GetAlignment(0)));
- last = last->GetPrevHyp();
- }
-
- std::vector<size_t> alignment;
- for(auto it = aligns.rbegin(); it != aligns.rend(); ++it) {
- size_t maxArg = 0;
- for(size_t i = 0; i < it->size(); ++i) {
- if((*it)[maxArg] < (*it)[i]) {
- maxArg = i;
- }
- }
- alignment.push_back(maxArg);
- }
-
- return alignment;
-}
-
-std::string GetAlignmentString(const std::vector<size_t>& alignment) {
- std::stringstream alignString;
- alignString << " |||";
- for(size_t wordIdx = 0; wordIdx < alignment.size(); ++wordIdx) {
- alignString << " " << wordIdx << "-" << alignment[wordIdx];
- }
- return alignString.str();
-}
-}
diff --git a/src/translator/printer.h b/src/translator/printer.h
deleted file mode 100644
index a14d5d96..00000000
--- a/src/translator/printer.h
+++ /dev/null
@@ -1,56 +0,0 @@
-#pragma once
-
-#include <vector>
-
-#include "common/utils.h"
-#include "data/vocab.h"
-#include "translator/history.h"
-
-namespace marian {
-
-template <class OStream>
-void Printer(Ptr<Config> options,
- Ptr<Vocab> vocab,
- Ptr<History> history,
- OStream& best1,
- OStream& bestn) {
- bool reverse = options->get<bool>("right-left");
-
- if(options->has("n-best") && options->get<bool>("n-best")) {
- const auto& nbl = history->NBest(options->get<size_t>("beam-size"));
-
- for(size_t i = 0; i < nbl.size(); ++i) {
- const auto& result = nbl[i];
- const auto& words = std::get<0>(result);
- const auto& hypo = std::get<1>(result);
-
- float realCost = std::get<2>(result);
-
- std::string translation = Join((*vocab)(words), " ", reverse);
-
- bestn << history->GetLineNum() << " ||| " << translation << " |||";
-
- if(hypo->GetCostBreakdown().empty()) {
- bestn << " F0=" << hypo->GetCost();
- } else {
- for(size_t j = 0; j < hypo->GetCostBreakdown().size(); ++j) {
- bestn << " F" << j << "= " << hypo->GetCostBreakdown()[j];
- }
- }
-
- bestn << " ||| " << realCost;
-
- if(i < nbl.size() - 1)
- bestn << std::endl;
- else
- bestn << std::flush;
- }
- }
-
- auto bestTranslation = history->Top();
-
- std::string translation
- = Join((*vocab)(std::get<0>(bestTranslation)), " ", reverse);
- best1 << translation << std::flush;
-}
-}
diff --git a/src/translator/scorers.h b/src/translator/scorers.h
index 30fd9d1d..ae5df297 100644
--- a/src/translator/scorers.h
+++ b/src/translator/scorers.h
@@ -1,8 +1,9 @@
#pragma once
#include "marian.h"
-#include "models/model_factory.h"
+
#include "data/shortlist.h"
+#include "models/model_factory.h"
namespace marian {
@@ -41,8 +42,10 @@ public:
virtual void init(Ptr<ExpressionGraph> graph) {}
- virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) {};
+ virtual void setShortlistGenerator(
+ Ptr<data::ShortlistGenerator> shortlistGenerator){};
virtual Ptr<data::Shortlist> getShortlist() { return nullptr; };
+ virtual std::vector<float> getAlignment() { return {}; };
};
class ScorerWrapperState : public ScorerState {
@@ -104,97 +107,19 @@ public:
graph, wrappedState, hypIndices, embIndices, dimBatch, beamSize));
}
- virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) {
+ virtual void setShortlistGenerator(
+ Ptr<data::ShortlistGenerator> shortlistGenerator) {
encdec_->setShortlistGenerator(shortlistGenerator);
};
virtual Ptr<data::Shortlist> getShortlist() {
return encdec_->getShortlist();
};
-};
-//class WordPenaltyState : public ScorerState {
-//private:
-// int dimVocab_;
-// Expr penalties_;
-//
-//public:
-// WordPenaltyState(int dimVocab, Expr penalties)
-// : dimVocab_(dimVocab), penalties_(penalties) {}
-//
-// virtual Expr getProbs() { return penalties_; };
-//
-// virtual float breakDown(size_t i) {
-// return getProbs()->val()->get(i % dimVocab_);
-// }
-//};
-//
-//class WordPenalty : public Scorer {
-//private:
-// int dimVocab_;
-// Expr penalties_;
-//
-//public:
-// WordPenalty(const std::string& name, float weight, int dimVocab)
-// : Scorer(name, weight), dimVocab_(dimVocab) {}
-//
-// virtual void clear(Ptr<ExpressionGraph> graph) {}
-//
-// virtual Ptr<ScorerState> startState(Ptr<ExpressionGraph> graph,
-// Ptr<data::CorpusBatch> batch) {
-// std::vector<float> p(dimVocab_, 1);
-// p[0] = 0;
-// p[2] = 0;
-//
-// penalties_ = graph->constant({1, dimVocab_}, inits::from_vector(p));
-// return New<WordPenaltyState>(dimVocab_, penalties_);
-// }
-//
-// virtual Ptr<ScorerState> step(Ptr<ExpressionGraph> graph,
-// Ptr<ScorerState> state,
-// const std::vector<size_t>& hypIndices,
-// const std::vector<size_t>& embIndices,
-// int dimBatch,
-// int beamSize) {
-// return state;
-// }
-//};
-//
-//class UnseenWordPenalty : public Scorer {
-//private:
-// int batchIndex_;
-// int dimVocab_;
-// Expr penalties_;
-//
-//public:
-// UnseenWordPenalty(const std::string& name,
-// float weight,
-// int dimVocab,
-// int batchIndex)
-// : Scorer(name, weight), dimVocab_(dimVocab), batchIndex_(batchIndex) {}
-//
-// virtual void clear(Ptr<ExpressionGraph> graph) {}
-//
-// virtual Ptr<ScorerState> startState(Ptr<ExpressionGraph> graph,
-// Ptr<data::CorpusBatch> batch) {
-// std::vector<float> p(dimVocab_, -1);
-// for(auto i : (*batch)[batchIndex_]->data())
-// p[i] = 0;
-// p[2] = 0;
-//
-// penalties_ = graph->constant({1, dimVocab_}, inits::from_vector(p));
-// return New<WordPenaltyState>(dimVocab_, penalties_);
-// }
-//
-// virtual Ptr<ScorerState> step(Ptr<ExpressionGraph> graph,
-// Ptr<ScorerState> state,
-// const std::vector<size_t>& hypIndices,
-// const std::vector<size_t>& embIndices,
-// int dimBatch,
-// int beamSize) {
-// return state;
-// }
-//};
+ virtual std::vector<float> getAlignment() {
+ return encdec_->getAlignment();
+ }
+};
Ptr<Scorer> scorerByType(std::string fname,
float weight,
diff --git a/src/translator/translator.h b/src/translator/translator.h
index 84a85d06..759669ee 100644
--- a/src/translator/translator.h
+++ b/src/translator/translator.h
@@ -8,7 +8,7 @@
#include "3rd_party/threadpool.h"
#include "translator/history.h"
#include "translator/output_collector.h"
-#include "translator/printer.h"
+#include "translator/output_printer.h"
#include "models/model_task.h"
#include "translator/scorers.h"
@@ -83,6 +83,7 @@ public:
size_t batchId = 0;
auto collector = New<OutputCollector>();
+ auto printer = New<OutputPrinter>(options_, trgVocab_);
if(options_->get<bool>("quiet-translation"))
collector->setPrintingStrategy(New<QuietPrinting>());
@@ -111,7 +112,7 @@ public:
for(auto history : histories) {
std::stringstream best1;
std::stringstream bestn;
- Printer(options_, trgVocab_, history, best1, bestn);
+ printer->print(history, best1, bestn);
collector->Write(history->GetLineNum(),
best1.str(),
bestn.str(),
@@ -176,6 +177,7 @@ public:
data::BatchGenerator<data::TextInput> bg(corpus_, options_);
auto collector = New<StringCollector>();
+ auto printer = New<OutputPrinter>(options_, trgVocab_);
size_t batchId = 0;
// @TODO: unify this and get rid of Config object.
@@ -205,7 +207,7 @@ public:
for(auto history : histories) {
std::stringstream best1;
std::stringstream bestn;
- Printer(options_, trgVocab_, history, best1, bestn);
+ printer->print(history, best1, bestn);
collector->add(history->GetLineNum(), best1.str(), bestn.str());
}
};