diff options
author | Marcin Junczys-Dowmunt <junczys@amu.edu.pl> | 2018-07-27 20:14:21 +0300 |
---|---|---|
committer | Marcin Junczys-Dowmunt <junczys@amu.edu.pl> | 2018-07-27 20:14:21 +0300 |
commit | dceb7185d86ed8fd1994e86dc3e3c0e03740ec4a (patch) | |
tree | 3514f87aa2da28313043959ebd0381b3ba7de233 | |
parent | 5cc8674d974bb5cae7bc8f25a51472166164a579 (diff) | |
parent | 8b0e2f951b5ce09a622fa7239b2e1e5bd8344fe4 (diff) |
fix merge
66 files changed, 2766 insertions, 1149 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b7a297d..667eea42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,41 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added + +- Faster training (20-30%) by optimizing gradient propagation of biases +- Returning Moses-style hard alignments during decoding single models, ensembles and n-best + lists +- Hard alignment extraction strategy taking source words that have the + attention value greater than the threshold +- Refactored sync sgd for easier communication and integration with NCCL +- Smaller memory-overhead for sync-sgd +- NCCL integration (version 2.2.13) + +### Fixed + +- A couple of bugs in "selection" (transpose, shift, cols, rows) operators during + back-prop for a very specific case: one of the operators is the first operator after + a branch, in that case gradient propagation might be interrupted. This did not affect + any of the existing models as such a case was not present, but might have caused + future models to not train properly. +- Bug in mini-batch-fit, tied embeddings would result in identical embeddings in fake + source and target batch. Caused under-estimation of memory usage and re-allocation. 
+ +## [1.5.0] - 2018-06-17 + +### Added + +- Average Attention Networks for Transformer model +- 16-bit matrix multiplication on CPU +- Memoization for constant nodes for decoding +- Autotuning for decoding + +### Fixed + +- GPU decoding optimizations, about 2x faster decoding of transformer models +- Multi-node MPI-based training on GPUs + ## [1.4.0] - 2018-03-13 ### Added diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c5f0a5f..cee114aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ option(COMPILE_CPU "Compile CPU version" ON) option(COMPILE_CUDA "Compile GPU version" ON) option(USE_STATIC_LIBS "Compile GPU version" OFF) option(USE_CUDNN "Use CUDNN library" OFF) +option(USE_NCCL "Use NCCL library" ON) option(USE_MPI "Use MPI library" OFF) # Project versioning @@ -49,6 +50,17 @@ if(CUDA_FOUND) LIST(APPEND CUDA_NVCC_FLAGS -DCUDNN; ) endif(CUDNN_FOUND) endif(USE_CUDNN) + + if(USE_NCCL) + find_package(NCCL) + if(NCCL_FOUND) + include_directories(${NCCL_INCLUDE_DIR}) + set(EXT_LIBS ${EXT_LIBS} ${NCCL_LIBRARIES}) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_NCCL") + LIST(APPEND CUDA_NVCC_FLAGS -DUSE_NCCL; ) + endif(NCCL_FOUND) + endif(USE_NCCL) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DCUDA_FOUND") list(APPEND CUDA_NVCC_FLAGS -DCUDA_FOUND; ) else(CUDA_FOUND) @@ -3,6 +3,7 @@ Marian [![Join the chat at https://gitter.im/marian-nmt](https://badges.gitter.im/amunmt/marian.svg)](https://gitter.im/marian-nmt?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Build Status](http://vali.inf.ed.ac.uk/jenkins/buildStatus/icon?job=marian-dev)](http://vali.inf.ed.ac.uk/jenkins/job/marian-dev/) +[![CPU Build Status](http://vali.inf.ed.ac.uk/jenkins/buildStatus/icon?job=marian-dev)](http://vali.inf.ed.ac.uk/jenkins/job/marian-dev-cpu/) [![Tests Status](http://vali.inf.ed.ac.uk/jenkins/buildStatus/icon?job=marian-regression-tests)](http://vali.inf.ed.ac.uk/jenkins/job/marian-regression-tests/) 
[![Twitter](https://img.shields.io/twitter/follow/marian_nmt.svg?style=social&label=Follow)](https://twitter.com/intent/follow?screen_name=marian_nmt) @@ -22,7 +23,7 @@ Named in honour of Marian Rejewski, a Polish mathematician and cryptologist. cd marian-dev mkdir -p build cd build -cmake .. -DCMAKE_BUILD_TYPE=relwithdebinfo +cmake .. -DCMAKE_BUILD_TYPE=Release make -j ``` @@ -1 +1 @@ -v1.4.0 +v1.5.0 diff --git a/cmake/FindNCCL.cmake b/cmake/FindNCCL.cmake new file mode 100644 index 00000000..ab3c55a8 --- /dev/null +++ b/cmake/FindNCCL.cmake @@ -0,0 +1,30 @@ +set(NCCL_INC_PATHS + /usr/include + /usr/local/include + /usr/local/cuda/include + $ENV{NCCL_DIR}/include + $ENV{CUDA_TOOLKIT_ROOT_DIRCUDA_ROOT}/include +) + +set(NCCL_LIB_PATHS + /lib + /lib64 + /usr/lib + /usr/lib64 + /usr/local/lib + /usr/local/lib64 + /usr/local/cuda/lib64 + $ENV{NCCL_DIR}/lib64 + $ENV{CUDA_TOOLKIT_ROOT_DIR}/lib64 +) + +find_path(NCCL_INCLUDE_DIR NAMES nccl.h PATHS ${NCCL_INC_PATHS}) +find_library(NCCL_LIBRARIES NAMES nccl PATHS ${NCCL_LIB_PATHS}) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(NCCL DEFAULT_MSG NCCL_INCLUDE_DIR NCCL_LIBRARIES) + +if (NCCL_FOUND) + message(STATUS "Found NCCL (include: ${NCCL_INCLUDE_DIR}, library: ${NCCL_LIBRARIES})") + mark_as_advanced(NCCL_INCLUDE_DIR NCCL_LIBRARIES) +endif () diff --git a/scripts/python/example.py b/scripts/python/example.py deleted file mode 100644 index c0a70720..00000000 --- a/scripts/python/example.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python - -import os -import sys - -sys.path.insert(0, os.path.abspath(os.path.dirname(__file__) + "../../build")) -import libmariannmt as nmt - -print >>sys.stderr, "marian-nmt version: ", nmt.version() - -if len(sys.argv) == 1: - print >>sys.stderr, "Specify s2s arguments" - exit(1) - -nmt.init(' '.join(sys.argv)) -for line in sys.stdin: - print nmt.translate([line.rstrip()]) diff --git a/scripts/python/mariannmt_server.py b/scripts/python/mariannmt_server.py 
deleted file mode 100644 index 9f9d2a53..00000000 --- a/scripts/python/mariannmt_server.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import sys -import os -import argparse - -sys.path.insert(0, os.path.abspath(os.path.dirname(__file__) + "./../build")) -import libmariannmt as nmt - -from bottle import request, Bottle, abort - -app = Bottle() - - -@app.route('/translate') -def handle_websocket(): - wsock = request.environ.get('wsgi.websocket') - if not wsock: - abort(400, 'Expected WebSocket request.') - - while True: - try: - message = wsock.receive() - if message is not None: - # force potential unicode to str() for boost conversion - listSentences = str(message).split('\n') - numEle = len(listSentences) - if numEle > 0 and listSentences[numEle - 1] == "": - del listSentences[numEle - 1] - trans = nmt.translate(listSentences) - wsock.send('\n'.join(trans)) - except WebSocketError: - break - - -def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument("-c", dest="config") - parser.add_argument('-p', dest="port", default=8080, type=int) - return parser.parse_args() - - -if __name__ == "__main__": - args = parse_args() - nmt.init("-c {}".format(args.config)) - - from gevent.pywsgi import WSGIServer - from geventwebsocket import WebSocketError - from geventwebsocket.handler import WebSocketHandler - server = WSGIServer( - ("0.0.0.0", args.port), app, handler_class=WebSocketHandler) - server.serve_forever() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fc8cc8b3..08ed5399 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -37,6 +37,8 @@ add_library(marian STATIC graph/node_initializers.cpp layers/convolution.cpp + layers/loss.cpp + layers/weight.cpp rnn/cells.cpp rnn/attention.cpp @@ -49,6 +51,7 @@ add_library(marian STATIC translator/history.cpp translator/output_collector.cpp + translator/output_printer.cpp translator/nth_element.cpp translator/helpers.cpp translator/scorers.cpp @@ -58,10 
+61,13 @@ add_library(marian STATIC training/graph_group_sync.cpp training/graph_group_singleton.cpp training/graph_group_multinode.cpp + training/graph_group_multinode_sync.cpp training/validator.cpp + training/communicator.cpp $<TARGET_OBJECTS:libyaml-cpp> - $<TARGET_OBJECTS:SQLiteCpp>) + $<TARGET_OBJECTS:SQLiteCpp> +) if(CUDA_FOUND) cuda_add_library(marian_cuda @@ -77,6 +83,7 @@ cuda_add_library(marian_cuda translator/helpers.cu training/gradient_dropping/gpu/dropper.cu training/gradient_dropping/gpu/sparse_algorithm.cu + training/communicator.cu STATIC) endif(CUDA_FOUND) @@ -98,14 +105,16 @@ set_target_properties(marian_vocab PROPERTIES OUTPUT_NAME marian-vocab) set(EXECUTABLES ${EXECUTABLES} marian_train marian_decoder marian_scorer marian_vocab) # marian.zip and marian.tgz -# This combines marian, marian_decoder in a single ZIP or TAR file for execution in MSFT internal tools FLO and Philly. -# For Philly submission, we need statically-linked versions to deal with library dependencies, so this target is only enabled for static builds. +# This combines marian, marian_decoder in a single ZIP or TAR file for +# execution in MSFT internal tools FLO and Philly. +# For Philly submission, we need statically-linked versions to deal with +# library dependencies, so this target is only enabled for static builds. 
if(USE_STATIC_LIBS) add_custom_command( OUTPUT "${CMAKE_BINARY_DIR}/marian.zip" COMMAND zip -v -0 -j "${CMAKE_BINARY_DIR}/marian.zip" "${CMAKE_BINARY_DIR}/marian" - "${CMAKE_BINARY_DIR}/marian-decoder" + "${CMAKE_BINARY_DIR}/marian-decoder" "${CMAKE_BINARY_DIR}/marian-scorer" "${CMAKE_BINARY_DIR}/marian-vocab" DEPENDS marian_train marian_decoder marian_scorer marian_vocab) @@ -146,10 +155,6 @@ endforeach(exec) #set_target_properties(align2steps PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}") -if(PYTHONLIBS_FOUND) -# add_subdirectory(python) -endif(PYTHONLIBS_FOUND) - if(COMPILE_TESTS) set(CATCH_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/3rd_party) add_library(Catch INTERFACE) diff --git a/src/command/marian.cpp b/src/command/marian.cpp index 92b7be7a..d2aff81c 100644 --- a/src/command/marian.cpp +++ b/src/command/marian.cpp @@ -1,16 +1,17 @@ #include "marian.h" #include "training/graph_group_async.h" -#include "training/graph_group_multinode.h" +#include "training/graph_group_multinode_sync.h" #include "training/graph_group_singleton.h" #include "training/graph_group_sync.h" #include "training/training.h" #ifdef CUDA_FOUND #include "training/graph_group_async_drop.h" +#include "training/graph_group_multinode.h" #endif -bool configureMPI(int, char**); +bool configureMPI(int, char**, bool); int main(int argc, char** argv) { using namespace marian; @@ -19,38 +20,54 @@ int main(int argc, char** argv) { auto devices = options->getDevices(); if(options->get<bool>("multi-node")) { - ABORT_IF(!configureMPI(argc, argv), "MPI not found."); - + ABORT_IF(!configureMPI(argc, argv, options->get<bool>("sync-sgd")), + "MPI not found."); LOG(warn, "[experimental] Running multi-node training"); - New<Train<MultiNodeGraphGroup>>(options)->run(); + + if(options->get<bool>("sync-sgd")) { + New<Train<MultiNodeGraphGroupSync>>(options)->run(); + } + else { +#ifdef CUDA_FOUND + New<Train<MultiNodeGraphGroup>>(options)->run(); +#else + ABORT("Asynchronous multi-node training 
requires CUDA"); +#endif + } } else { if(devices.size() == 1) { New<Train<SingletonGraph>>(options)->run(); } else { - if(options->get<bool>("sync-sgd")) + if(options->get<bool>("sync-sgd")) { New<Train<SyncGraphGroup>>(options)->run(); + } + else if(options->get<float>("grad-dropping-rate") > 0.0) { #ifdef CUDA_FOUND - else if(options->get<float>("grad-dropping-rate") > 0.0) New<Train<AsyncGraphGroupDrop>>(options)->run(); +#else + ABORT("Asynchronous training with gradient dropping requires CUDA"); #endif - else + } + else { New<Train<AsyncGraphGroup>>(options)->run(); + } } } return 0; } -bool configureMPI(int argc, char** argv) { +bool configureMPI(int argc, char** argv, bool sync) { bool enable = false; #if MPI_FOUND + int required_mode = sync ? MPI_THREAD_SERIALIZED : MPI_THREAD_MULTIPLE; int provided_thread_mode = 0; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided_thread_mode); // Enable if occasional truncation errors MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); ABORT_IF( - provided_thread_mode < MPI_THREAD_MULTIPLE, + provided_thread_mode < required_mode, "Your version of MPI does not support multi-threaded communication."); enable = true; diff --git a/src/common/config_parser.cpp b/src/common/config_parser.cpp index 988e4746..e718217a 100644 --- a/src/common/config_parser.cpp +++ b/src/common/config_parser.cpp @@ -490,6 +490,10 @@ void ConfigParser::addOptionsTraining(po::options_description& desc) { ->multitoken() ->default_value(std::vector<std::string>({"0"}), "0"), "GPU ID(s) to use for training") +#ifdef USE_NCCL + ("no-nccl", po::value<bool>()->zero_tokens()->default_value(false), + "Disable inter-GPU communication via NCCL") +#endif #ifdef CUDA_FOUND ("cpu-threads", po::value<size_t>()->default_value(0)->implicit_value(1), "Use CPU-based computation with this many independent threads, 0 means GPU-based computation") @@ -607,10 +611,6 @@ void ConfigParser::addOptionsTraining(po::options_description& desc) { 
("multi-node-overlap", po::value<bool>() ->default_value(true), "Overlap model computations with MPI communication") - ("multi-node-local-optimizers", po::value<bool>() - ->zero_tokens() - ->default_value(false), - "Enable local optimizers with multi-node. Requires optimizer delay to be turned on.") ; // clang-format on desc.add(training); @@ -722,9 +722,10 @@ void ConfigParser::addOptionsTranslate(po::options_description& desc) { "Display n-best list") ("shortlist", po::value<std::vector<std::string>>()->multitoken(), "Use softmax shortlist: path first best prune") - ("weights", po::value<std::vector<float>>() - ->multitoken(), + ("weights", po::value<std::vector<float>>()->multitoken(), "Scorer weights") + ("alignment", po::value<float>()->default_value(0.f)->implicit_value(1.f), + "Return word alignments") // TODO: the options should be available only in server ("port,p", po::value<size_t>()->default_value(8080), "Port number for web socket server") @@ -1006,7 +1007,10 @@ void ConfigParser::parseOptions(int argc, char** argv, bool doValidate) { SET_OPTION("multi-node", bool); SET_OPTION("multi-node-overlap", bool); - SET_OPTION("multi-node-local-optimizers", bool); + +#ifdef USE_NCCL + SET_OPTION("no-nccl", bool); +#endif } if(mode_ == ConfigMode::rescoring) { @@ -1031,6 +1035,7 @@ void ConfigParser::parseOptions(int argc, char** argv, bool doValidate) { SET_OPTION("mini-batch-words", int); SET_OPTION_NONDEFAULT("weights", std::vector<float>); SET_OPTION_NONDEFAULT("shortlist", std::vector<std::string>); + SET_OPTION("alignment", float); SET_OPTION("port", size_t); SET_OPTION("optimize", bool); SET_OPTION("max-length-factor", float); diff --git a/src/data/batch_generator.h b/src/data/batch_generator.h index 38a8afe9..5cae929b 100644 --- a/src/data/batch_generator.h +++ b/src/data/batch_generator.h @@ -207,6 +207,13 @@ public: return currentBatch_; } + std::vector<BatchPtr> nextN(size_t num) { + std::vector<BatchPtr> batches; + for(int i = 0; i < num && *this; 
++i) + batches.push_back(next()); + return batches; + } + void prepare(bool shuffle = true) { if(shuffle) data_->shuffle(); diff --git a/src/data/corpus_base.h b/src/data/corpus_base.h index 02b6ca36..c007b41e 100644 --- a/src/data/corpus_base.h +++ b/src/data/corpus_base.h @@ -172,33 +172,35 @@ public: * @see marian::data::Batch::split(size_t n) */ std::vector<Ptr<SubBatch>> split(size_t n) { - std::vector<Ptr<SubBatch>> splits; + ABORT_IF(size_ == 0, "Encoutered sub-batch size of 0"); + std::vector<Ptr<SubBatch>> splits; size_t subSize = std::ceil(size_ / (float)n); - size_t totSize = size_; - + + size_t restSize = size_; int pos = 0; for(int k = 0; k < n; ++k) { - size_t __size__ = std::min(subSize, totSize); - - auto sb = New<SubBatch>(__size__, width_, vocab_); - - size_t __words__ = 0; - for(int j = 0; j < width_; ++j) { - for(int i = 0; i < __size__; ++i) { - sb->data()[j * __size__ + i] = indices_[j * size_ + pos + i]; - sb->mask()[j * __size__ + i] = mask_[j * size_ + pos + i]; - - if(mask_[j * size_ + pos + i] != 0) - __words__++; + size_t __size__ = std::min(subSize, restSize); + if(__size__ > 0) { + auto sb = New<SubBatch>(__size__, width_, vocab_); + + size_t __words__ = 0; + for(int j = 0; j < width_; ++j) { + for(int i = 0; i < __size__; ++i) { + sb->data()[j * __size__ + i] = indices_[j * size_ + pos + i]; + sb->mask()[j * __size__ + i] = mask_[j * size_ + pos + i]; + + if(mask_[j * size_ + pos + i] != 0) + __words__++; + } } - } - sb->setWords(__words__); - splits.push_back(sb); + sb->setWords(__words__); + splits.push_back(sb); - totSize -= __size__; - pos += __size__; + restSize -= __size__; + pos += __size__; + } } return splits; } @@ -260,7 +262,7 @@ public: * @brief The number of sentences in the batch, target words. */ size_t sizeTrg() const { return subBatches_.back()->batchSize(); } - + /** * @brief The number of words for the longest sentence in the batch plus one. 
*/ @@ -291,17 +293,19 @@ public: Ptr<Options> options) { std::vector<Ptr<SubBatch>> batches; + size_t idx = 0; for(auto len : lengths) { auto vocab = New<Vocab>(); vocab->createFake(); auto sb = New<SubBatch>(batchSize, len, vocab); // data: gets initialized to 0. No EOS symbol is distinguished. + std::fill(sb->data().begin(), sb->data().end(), idx++); // set word indices to different values to avoid same hashes std::fill(sb->mask().begin(), sb->mask().end(), 1); // mask: no items ask being masked out batches.push_back(sb); } auto batch = New<CorpusBatch>(batches); - + if(!options) return batch; if(options->has("guided-alignment")) { @@ -331,12 +335,17 @@ public: * @see marian::data::SubBatch::split(size_t n) */ std::vector<Ptr<Batch>> split(size_t n) { + ABORT_IF(size() == 0, "Encoutered batch size of 0"); + + std::vector<std::vector<Ptr<SubBatch>>> subs; // split each subbatch separately - std::vector<std::vector<Ptr<SubBatch>>> subs(n); for(auto subBatch : subBatches_) { size_t i = 0; - for(auto splitSubBatch : subBatch->split(n)) + for(auto splitSubBatch : subBatch->split(n)) { + if(subs.size() <= i) + subs.resize(i + 1); subs[i++].push_back(splitSubBatch); + } } // create batches from split subbatches diff --git a/src/data/corpus_sqlite.cpp b/src/data/corpus_sqlite.cpp index 4c185b0c..7a58fa67 100644 --- a/src/data/corpus_sqlite.cpp +++ b/src/data/corpus_sqlite.cpp @@ -99,7 +99,7 @@ void CorpusSQLite::fillSQLite() { } } db_->exec("commit;"); - LOG(info, "[sqlite] Inserted {} lines", lines); + LOG(info, "[sqlite] Inserted {} lines", lines - 1); LOG(info, "[sqlite] Creating primary index"); db_->exec("create unique index idx_line on lines (_id);"); } diff --git a/src/functional/predicates.h b/src/functional/predicates.h index 51af38ad..45f82fd5 100644 --- a/src/functional/predicates.h +++ b/src/functional/predicates.h @@ -1,5 +1,7 @@ #pragma once +#include <cmath> + #include "functional/defs.h" #include "functional/operands.h" diff --git 
a/src/graph/expression_graph.h b/src/graph/expression_graph.h index d901000c..199994d0 100644 --- a/src/graph/expression_graph.h +++ b/src/graph/expression_graph.h @@ -244,12 +244,13 @@ public: } } - void backward() { + void backward(bool zero = true) { ABORT_IF(topNodes_.size() > 1, "There are more than one top most node for backward step"); params_->allocateBackward(); - params_->set_zero_adjoint(); + if(zero) + params_->set_zero_adjoint(); for(auto&& v : topNodes_) v->init_dependent(); @@ -264,7 +265,7 @@ public: nodesBackward_.pop_back(); for(auto&& child : v->children()) { - if(child->trainable()) + if(child->trainable() && child->type() != "param") child->set_zero_adjoint(); } diff --git a/src/graph/expression_operators.cpp b/src/graph/expression_operators.cpp index 1666357a..ea8077fa 100644 --- a/src/graph/expression_operators.cpp +++ b/src/graph/expression_operators.cpp @@ -313,7 +313,9 @@ Expr affine(Expr a, Expr b, Expr bias, bool transA, bool transB, float scale) { if(bc != b) bc = rec2(bc); - std::vector<Expr> nodes = {ac, bc, bias}; + int rows = ac->shape().elements() / ac->shape()[-1]; + Expr ones = ac->graph()->ones({rows, 1}); + std::vector<Expr> nodes = {ac, bc, bias, ones}; return rec2(Expression<AffineNodeOp>(nodes, transA, transB, scale), true); }; @@ -333,13 +335,16 @@ Expr affine(Expr a, Expr b, Expr bias, bool transA, bool transB, float scale) { } else { // general version, MKL, CBlas or CUDA + // if clipValue > 0, the inputs will be clipped to range [-clipValue, clipValue] // This is meant to keep values at the same range as used during training when // optimizing for 8-bit integer products. Likely to be removed in the future // when we explore better ways to handle this. 
- std::vector<Expr> nodes = {clip(a, clipValue), clip(b, clipValue), bias}; - return Expression<AffineNodeOp>(nodes, transA, transB, scale); + int rows = a->shape().elements() / a->shape()[-1]; + Expr ones = a->graph()->ones({rows, 1}); + std::vector<Expr> nodes = {clip(a, clipValue), clip(b, clipValue), bias, ones}; + return Expression<AffineNodeOp>(nodes, transA, transB, scale); } } @@ -462,6 +467,7 @@ Expr shift(Expr a, Shape shift, float padValue) { //} #ifdef CUDA_FOUND +#ifdef CUDNN Expr avg_pooling(Expr x, int height, @@ -526,4 +532,5 @@ Expr pooling_with_masking(Expr x, Expr mask, int width, bool isEven) { } #endif +#endif } diff --git a/src/graph/expression_operators.h b/src/graph/expression_operators.h index cc07dafb..53cf5966 100644 --- a/src/graph/expression_operators.h +++ b/src/graph/expression_operators.h @@ -106,7 +106,6 @@ Expr flatten_2d(Expr a); Expr rows(Expr a, const std::vector<size_t>& indices); Expr cols(Expr a, const std::vector<size_t>& indices); - Expr select(Expr a, int axis, const std::vector<size_t>& indices); /*********************************************************/ diff --git a/src/graph/node_operators_binary.h b/src/graph/node_operators_binary.h index 5b1f9865..ea2a3dfe 100644 --- a/src/graph/node_operators_binary.h +++ b/src/graph/node_operators_binary.h @@ -4,9 +4,12 @@ #include "functional/functional.h" #include "graph/node.h" -#include "tensors/gpu/cudnn_wrappers.h" #include "tensors/tensor_operators.h" +#ifdef CUDNN +#include "tensors/gpu/cudnn_wrappers.h" +#endif + namespace marian { class DotNodeOp : public NaryNodeOp { @@ -167,15 +170,17 @@ public: NodeOps forwardOps() { using namespace functional; + return { - NodeOp(ProdWithBias(val_, - child(0)->val(), - child(1)->val(), - child(2)->val(), - transA_, - transB_, - 0.f, - scalar_)) + NodeOp(Prod(val_, + child(0)->val(), + child(1)->val(), + transA_, transB_, 0.f, scalar_); + Prod(val_, + child(3)->val(), + child(2)->val(), + false, false, 1.f, 1.f) + ) }; } @@ -202,7 
+207,12 @@ public: false, 1.0, scalar_)), - NodeOp(Add(_1, child(2)->grad(), adj_))}; + NodeOp(Prod(child(2)->grad(), + child(3)->val(), adj_, + true, false, + 0.f, 1.f)) + //NodeOp(Add(_1, child(2)->grad(), adj_)) + }; if(transA_ && !transB_) return {NodeOp(Prod(child(0)->grad(), @@ -219,7 +229,12 @@ public: false, 1.0, scalar_)), - NodeOp(Add(_1, child(2)->grad(), adj_))}; + NodeOp(Prod(child(2)->grad(), + child(3)->val(), adj_, + true, false, + 0.f, 1.f)) + //NodeOp(Add(_1, child(2)->grad(), adj_)) + }; if(transA_ && transB_) return {NodeOp(Prod(child(0)->grad(), @@ -236,7 +251,12 @@ public: true, 1.0, scalar_)), - NodeOp(Add(_1, child(2)->grad(), adj_))}; + NodeOp(Prod(child(2)->grad(), + child(3)->val(), adj_, + true, false, + 0.f, 1.f)) + //NodeOp(Add(_1, child(2)->grad(), adj_)) + }; return {NodeOp(Prod(child(0)->grad(), adj_, @@ -252,7 +272,12 @@ public: false, 1.0, scalar_)), - NodeOp(Add(_1, child(2)->grad(), adj_))}; + NodeOp(Prod(child(2)->grad(), + child(3)->val(), adj_, + true, false, + 0.f, 1.f)) + //NodeOp(Add(_1, child(2)->grad(), adj_)) + }; } const std::string type() { return "affine"; } @@ -294,6 +319,7 @@ public: NodeOps forwardOps() { // C = alpha * dot(op(A), op(B)) return {NodeOp(ProdBatched(val_, + graph()->allocator(), child(0)->val(), child(1)->val(), transA_, @@ -311,6 +337,7 @@ public: if(!transA_ && transB_) return {NodeOp(ProdBatched(child(0)->grad(), + graph()->allocator(), adj_, child(1)->val(), false, @@ -318,6 +345,7 @@ public: 1.0, scalar_)), NodeOp(ProdBatched(child(1)->grad(), + graph()->allocator(), adj_, child(0)->val(), true, @@ -327,6 +355,7 @@ public: if(transA_ && !transB_) return {NodeOp(ProdBatched(child(0)->grad(), + graph()->allocator(), child(1)->val(), adj_, false, @@ -334,6 +363,7 @@ public: 1.0, scalar_)), NodeOp(ProdBatched(child(1)->grad(), + graph()->allocator(), child(0)->val(), adj_, false, @@ -343,6 +373,7 @@ public: if(transA_ && transB_) return {NodeOp(ProdBatched(child(0)->grad(), + graph()->allocator(), 
child(1)->val(), adj_, true, @@ -350,6 +381,7 @@ public: 1.0, scalar_)), NodeOp(ProdBatched(child(1)->grad(), + graph()->allocator(), adj_, child(0)->val(), true, @@ -358,6 +390,7 @@ public: scalar_))}; return {NodeOp(ProdBatched(child(0)->grad(), + graph()->allocator(), adj_, child(1)->val(), false, @@ -365,6 +398,7 @@ public: 1.0, scalar_)), NodeOp(ProdBatched(child(1)->grad(), + graph()->allocator(), child(0)->val(), adj_, true, @@ -766,6 +800,7 @@ struct HighwayNodeOp : public NaryNodeOp { const std::string type() { return "highway"; } }; +#ifdef CUDNN class ConvolutionOp : public NaryNodeOp { public: ConvolutionOp(const std::vector<Expr>& nodes, @@ -802,4 +837,5 @@ public: protected: ConvolutionWrapper conv_; }; +#endif } diff --git a/src/graph/node_operators_unary.h b/src/graph/node_operators_unary.h index fa6d25c7..d7ef751d 100644 --- a/src/graph/node_operators_unary.h +++ b/src/graph/node_operators_unary.h @@ -7,7 +7,9 @@ #include "graph/node.h" #include "tensors/tensor_operators.h" -//#include "tensors/gpu/cudnn_wrappers.h" +#ifdef CUDNN +#include "tensors/gpu/cudnn_wrappers.h" +#endif namespace marian { @@ -815,7 +817,7 @@ struct TransposeNodeOp : public UnaryNodeOp { } NodeOps backwardOps() { - return {NodeOp(TransposeND(child(0)->grad(), adj_, axes_))}; + return {NodeOp(TransposeNDGrad(child(0)->grad(), adj_, axes_))}; } template <class... 
Args> @@ -1009,7 +1011,9 @@ struct ShiftNodeOp : public UnaryNodeOp { } NodeOps backwardOps() { - return {NodeOp(Shift(child(0)->grad(), adj_, shift_, /*padValue=*/0.f, /*invert=*/true))}; + // last parameter beta=1 says to use += (out = in + beta * out) + // @TODO: check need for padValue_ + return {NodeOp(ShiftGrad(child(0)->grad(), adj_, shift_, true))}; } const std::string type() { return "shift"; } @@ -1076,6 +1080,7 @@ struct ShiftNodeOp : public UnaryNodeOp { // Ptr<sparse::CSR> lf_; //}; +#ifdef CUDNN class PoolingOp : public UnaryNodeOp { public: PoolingOp(Expr x, @@ -1109,6 +1114,7 @@ public: protected: PoolingWrapper pooling_; }; +#endif class PoolingWithMaskingOp : public UnaryNodeOp { public: diff --git a/src/layers/convolution.cpp b/src/layers/convolution.cpp index eb1b0554..d4298e0b 100644 --- a/src/layers/convolution.cpp +++ b/src/layers/convolution.cpp @@ -2,6 +2,8 @@ #include "graph/node_operators_binary.h" namespace marian { + +#ifdef CUDNN Convolution::Convolution(Ptr<ExpressionGraph> graph) : Factory(graph) {} Expr Convolution::apply(Expr x) { @@ -29,4 +31,6 @@ Expr Convolution::apply(const std::vector<Expr>&) { ABORT("Can't apply convolution on many inputs at once"); return nullptr; } +#endif + } diff --git a/src/layers/convolution.h b/src/layers/convolution.h index ae2c5c74..24ee6dd3 100644 --- a/src/layers/convolution.h +++ b/src/layers/convolution.h @@ -7,6 +7,7 @@ namespace marian { +#ifdef CUDNN class Convolution : public Factory { protected: Ptr<Options> getOptions() { return options_; } @@ -82,4 +83,6 @@ protected: std::vector<int> kernelNums_; int stride_; }; +#endif + } diff --git a/src/layers/generic.h b/src/layers/generic.h index 8b19123b..ff404af7 100644 --- a/src/layers/generic.h +++ b/src/layers/generic.h @@ -2,11 +2,14 @@ #include "marian.h" -#include "layers/factory.h" #include "data/shortlist.h" +#include "layers/factory.h" namespace marian { namespace mlp { +/** + * @brief Activation functions + */ enum struct act : int { 
linear, tanh, sigmoid, ReLU, LeakyReLU, PReLU, swish }; } } @@ -64,12 +67,9 @@ public: if(inputs.size() > 1) num = std::to_string(i); - Expr W = g->param(name + "_W" + num, - {in->shape()[-1], dim}, - inits::glorot_uniform); - Expr b = g->param(name + "_b" + num, - {1, dim}, - inits::zeros); + Expr W = g->param( + name + "_W" + num, {in->shape()[-1], dim}, inits::glorot_uniform); + Expr b = g->param(name + "_b" + num, {1, dim}, inits::zeros); if(useLayerNorm) { if(useNematusNorm) { @@ -82,9 +82,8 @@ public: outputs.push_back(layerNorm(affine(in, W, b), ln_s, ln_b, NEMATUS_LN_EPS)); } else { - auto gamma = g->param(name + "_gamma" + num, - {1, dim}, - inits::from_value(1.0)); + auto gamma = g->param( + name + "_gamma" + num, {1, dim}, inits::from_value(1.0)); outputs.push_back(layerNorm(dot(in, W), gamma, b)); } @@ -107,9 +106,7 @@ public: } }; - Expr apply(Expr input) { - return apply(std::vector<Expr>({input})); - } + Expr apply(Expr input) { return apply(std::vector<Expr>({input})); } }; class Output : public Layer { @@ -129,9 +126,7 @@ public: tiedParams_[param] = graph_->get(tied); } - void set_shortlist(Ptr<data::Shortlist> shortlist) { - shortlist_ = shortlist; - } + void set_shortlist(Ptr<data::Shortlist> shortlist) { shortlist_ = shortlist; } Expr apply(Expr input) { if(!W_) { @@ -146,15 +141,13 @@ public: W_ = rows(W_, shortlist_->indices()); } else { W_ = graph_->param(name + "_" + nameW, - {input->shape()[-1], dim}, - inits::glorot_uniform); + {input->shape()[-1], dim}, + inits::glorot_uniform); if(shortlist_) W_ = cols(W_, shortlist_->indices()); } - b_ = graph_->param(name + "_b", - {1, dim}, - inits::zeros); + b_ = graph_->param(name + "_b", {1, dim}, inits::zeros); if(shortlist_) b_ = cols(b_, shortlist_->indices()); } @@ -165,10 +158,8 @@ public: virtual Expr apply(const std::vector<Expr>& inputs) { ABORT("Not implemented"); }; - }; - } // namespace mlp struct EmbeddingFactory : public Factory { @@ -195,51 +186,4 @@ struct EmbeddingFactory : public 
Factory { }; typedef Accumulator<EmbeddingFactory> embedding; - -static inline Expr Cost(Expr logits, - Expr indices, - Expr mask, - std::string costType = "cross-entropy", - float smoothing = 0, - Expr weights = nullptr) { - using namespace keywords; - - auto ce = cross_entropy(logits, indices); - - if(weights) - ce = weights * ce; - - if(smoothing > 0) { - // @TODO: add this to CE kernels instead - auto ceq = mean(logsoftmax(logits), axis = -1); - ce = (1 - smoothing) * ce - smoothing * ceq; - } - - if(mask) - ce = ce * mask; - - auto costSum = sum(ce, axis = -3); - - Expr cost; - // axes: - // - time axis (words): -3 - // - batch axis (sentences): -2 - if(costType == "ce-mean" - || costType - == "cross-entropy") { // sum over words; average over sentences - cost = mean(costSum, axis = -2); - } else if(costType == "ce-mean-words") { // average over target tokens - cost = sum(costSum, axis = -2) / sum(sum(mask, axis = -3), axis = -2); - } else if(costType == "ce-sum") { // sum over target tokens - cost = sum(costSum, axis = -2); - } else if(costType == "perplexity") { // ==exp('ce-mean-words') - cost = exp(sum(costSum, axis = -2) / sum(sum(mask, axis = -3), axis = -2)); - } else if(costType == "ce-rescore") { // sum over words, keep batch axis - cost = -costSum; - } else { // same as ce-mean - cost = mean(costSum, axis = -2); - } - - return cost; -} } diff --git a/src/layers/loss.cpp b/src/layers/loss.cpp new file mode 100644 index 00000000..c84895f4 --- /dev/null +++ b/src/layers/loss.cpp @@ -0,0 +1,94 @@ +#include "layers/loss.h" + +namespace marian { + +Ptr<LossBase> LossFactory(Ptr<Options> options, bool inference) { + float smoothing = inference ? 
0.f : options->get<float>("label-smoothing"); + std::string costType = options->get<std::string>("cost-type", "ce-mean"); + if(costType == "ce-mean" || costType == "cross-entropy") { + return New<CrossEntropyMeanLoss>(smoothing); + } else if(costType == "ce-mean-words") { + return New<CrossEntropyMeanWordsLoss>(smoothing); + } else if(costType == "ce-sum") { + return New<CrossEntropySumLoss>(smoothing); + } else if(costType == "perplexity") { + return New<PerplexityLoss>(smoothing); + } else if(costType == "ce-rescore") { + return New<CrossEntropyRescoreLoss>(smoothing); + } else { // same as ce-mean + return New<CrossEntropyMeanLoss>(smoothing); + } +} + +Expr LossBase::getCrossEntropy(Expr logits, + Expr indices, + Expr mask, + Expr weights) { + using namespace keywords; + + auto ce = cross_entropy(logits, indices); + + if(smoothing_ > 0) { + // @TODO: add this to CE kernels instead + auto ceq = mean(logsoftmax(logits), axis = -1); + ce = (1 - smoothing_) * ce - smoothing_ * ceq; + } + + if(mask) + ce = ce * mask; + + if(weights) + ce = ce * weights; + + return ce; +} + +Expr CrossEntropyMeanLoss::getCost(Expr logits, + Expr indices, + Expr mask, + Expr weights) { + using namespace keywords; + auto ce = getCrossEntropy(logits, indices, mask, weights); + // Time axis (words): -3 + // Batch axis (sentences): -2 + return mean(sum(ce, axis = -3), axis = -2); +} + +Expr CrossEntropyMeanWordsLoss::getCost(Expr logits, + Expr indices, + Expr mask, + Expr weights) { + using namespace keywords; + auto ce = getCrossEntropy(logits, indices, mask, weights); + return sum(sum(ce, axis = -3), axis = -2) + / sum(sum(mask, axis = -3), axis = -2); +} + +Expr CrossEntropySumLoss::getCost(Expr logits, + Expr indices, + Expr mask, + Expr weights) { + using namespace keywords; + auto ce = getCrossEntropy(logits, indices, mask, weights); + return sum(sum(ce, axis = -3), axis = -2); +} + +Expr PerplexityLoss::getCost(Expr logits, + Expr indices, + Expr mask, + Expr weights) { + using 
namespace keywords; + auto ce = getCrossEntropy(logits, indices, mask, weights); + return exp(sum(sum(ce, axis = -3), axis = -2) + / sum(sum(mask, axis = -3), axis = -2)); +} + +Expr CrossEntropyRescoreLoss::getCost(Expr logits, + Expr indices, + Expr mask, + Expr weights) { + using namespace keywords; + auto ce = getCrossEntropy(logits, indices, mask, weights); + return -sum(ce, axis = -3); +} +} diff --git a/src/layers/loss.h b/src/layers/loss.h new file mode 100644 index 00000000..9f3bb345 --- /dev/null +++ b/src/layers/loss.h @@ -0,0 +1,70 @@ +#pragma once + +#include "marian.h" + +namespace marian { +class LossBase { +protected: + float smoothing_; + +public: + explicit LossBase(float smoothing = 0) : smoothing_(smoothing){}; + + Expr getCrossEntropy(Expr logits, Expr indices, Expr mask, Expr weights); + virtual Expr getCost(Expr logits, + Expr indices, + Expr mask, + Expr weights = nullptr) + = 0; +}; + +/* + * @brief The cross entropy loss function + * + * A sum over words and average over sentences + */ +class CrossEntropyMeanLoss : public LossBase { +public: + explicit CrossEntropyMeanLoss(float smoothing = 0) : LossBase(smoothing){}; + Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights); +}; + +/* + * @brief The cross entropy loss function as an average over target tokens + */ +class CrossEntropyMeanWordsLoss : public LossBase { +public: + explicit CrossEntropyMeanWordsLoss(float smoothing = 0) + : LossBase(smoothing){}; + Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights); +}; + +/* + * @brief The cross entropy loss function as a sum over target tokens + */ +class CrossEntropySumLoss : public LossBase { +public: + explicit CrossEntropySumLoss(float smoothing = 0) : LossBase(smoothing){}; + Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights); +}; + +/* + * @brief The perplexity loss function + */ +class PerplexityLoss : public LossBase { +public: + explicit PerplexityLoss(float smoothing = 0) : 
LossBase(smoothing){}; + Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights); +}; + +/* + * @brief The cross entropy loss function that keeps sentence-level costs + */ +class CrossEntropyRescoreLoss : public LossBase { +public: + explicit CrossEntropyRescoreLoss(float smoothing = 0) : LossBase(smoothing){}; + Expr getCost(Expr logits, Expr indices, Expr mask, Expr weights); +}; + +Ptr<LossBase> LossFactory(Ptr<Options> options, bool inference); +} diff --git a/src/layers/weight.cpp b/src/layers/weight.cpp new file mode 100644 index 00000000..01d7760e --- /dev/null +++ b/src/layers/weight.cpp @@ -0,0 +1,21 @@ +#include "layers/weight.h" + +namespace marian { + +Ptr<WeightingBase> WeightingFactory(Ptr<Options> options) { + if(options->has("data-weighting")) + return New<DataWeighting>(options->get<std::string>("data-weighting-type")); +} + +Expr DataWeighting::getWeights(Ptr<ExpressionGraph> graph, + Ptr<data::CorpusBatch> batch) { + ABORT_IF(batch->getDataWeights().empty(), + "Vector of weights is unexpectedly empty!"); + bool sentenceWeighting = weightingType_ == "sentence"; + int dimBatch = batch->size(); + int dimWords = sentenceWeighting ? 
1 : batch->back()->batchWidth(); + auto weights = graph->constant({1, dimWords, dimBatch, 1}, + inits::from_vector(batch->getDataWeights())); + return weights; +} +} diff --git a/src/layers/weight.h b/src/layers/weight.h new file mode 100644 index 00000000..0b79ee13 --- /dev/null +++ b/src/layers/weight.h @@ -0,0 +1,33 @@ +#pragma once + +#include "common/options.h" +#include "data/corpus.h" +#include "graph/expression_graph.h" +#include "graph/expression_operators.h" +#include "graph/node_initializers.h" + +namespace marian { + +class WeightingBase { +public: + WeightingBase(){}; + virtual Expr getWeights(Ptr<ExpressionGraph> graph, + Ptr<data::CorpusBatch> batch) + = 0; + virtual void debugWeighting(std::vector<float> weightedMask, + std::vector<float> freqMask, + Ptr<data::CorpusBatch> batch){}; +}; + +class DataWeighting : public WeightingBase { +protected: + std::string weightingType_; + +public: + DataWeighting(std::string weightingType) + : WeightingBase(), weightingType_(weightingType){}; + Expr getWeights(Ptr<ExpressionGraph> graph, Ptr<data::CorpusBatch> batch); +}; + +Ptr<WeightingBase> WeightingFactory(Ptr<Options> options); +} diff --git a/src/models/costs.h b/src/models/costs.h index 207be2e9..9649492e 100644 --- a/src/models/costs.h +++ b/src/models/costs.h @@ -1,8 +1,10 @@ #pragma once -#include "models/encoder_decoder.h" #include "layers/generic.h" #include "layers/guided_alignment.h" +#include "layers/loss.h" +#include "layers/weight.h" +#include "models/encoder_decoder.h" namespace marian { namespace models { @@ -12,58 +14,57 @@ public: virtual Expr apply(Ptr<ModelBase> model, Ptr<ExpressionGraph> graph, Ptr<data::Batch> batch, - bool clearGraph = true) = 0; + bool clearGraph = true) + = 0; }; - class EncoderDecoderCE : public CostBase { protected: Ptr<Options> options_; + bool inference_{false}; + bool toBeWeighted_{false}; + Ptr<LossBase> loss_; + Ptr<WeightingBase> weighter_; + public: EncoderDecoderCE(Ptr<Options> options) - : 
options_(options) {} + : options_(options), inference_(options->get<bool>("inference", false)) { + loss_ = LossFactory(options_, inference_); + + toBeWeighted_ = (options_->has("data-weighting") && !inference_) + || (options_->has("dynamic-weighting") + && options_->get<bool>("dynamic-weighting") + && !inference_); + if(toBeWeighted_) + weighter_ = WeightingFactory(options_); + } Expr apply(Ptr<ModelBase> model, Ptr<ExpressionGraph> graph, Ptr<data::Batch> batch, bool clearGraph = true) { - auto encdec = std::static_pointer_cast<EncoderDecoder>(model); auto corpusBatch = std::static_pointer_cast<data::CorpusBatch>(batch); auto state = encdec->stepAll(graph, corpusBatch, clearGraph); - std::string costType = options_->get<std::string>("cost-type"); - bool inference = options_->get<bool>("inference", false); - - float ls = inference ? 0.f : options_->get<float>("label-smoothing"); + float ls = inference_ ? 0.f : options_->get<float>("label-smoothing"); Expr weights; + Expr cost; bool sentenceWeighting = false; - if(options_->has("data-weighting") && !inference) { - ABORT_IF(corpusBatch->getDataWeights().empty(), - "Vector of weights is unexpectedly empty!"); - - sentenceWeighting - = options_->get<std::string>("data-weighting-type") == "sentence"; - int dimBatch = corpusBatch->size(); - int dimWords = sentenceWeighting ? 
1 : corpusBatch->back()->batchWidth(); - - weights = graph->constant({1, dimWords, dimBatch, 1}, - inits::from_vector(corpusBatch->getDataWeights())); + if(toBeWeighted_) { + weights = weighter_->getWeights(graph, corpusBatch); } - auto cost - = Cost(state->getProbs(), - state->getTargetIndices(), - state->getTargetMask(), - costType, - ls, - weights); + cost = loss_->getCost(state->getProbs(), + state->getTargetIndices(), + state->getTargetMask(), + weights); - if(options_->has("guided-alignment") && !inference) { + if(options_->has("guided-alignment") && !inference_) { auto alignments = encdec->getDecoders()[0]->getAlignments(); ABORT_IF(alignments.empty(), "Model does not seem to support alignments"); @@ -73,8 +74,6 @@ public: } else { return cost; } - - return cost; } }; @@ -85,7 +84,7 @@ protected: public: Trainer(Ptr<ModelBase> model, Ptr<CostBase> cost) - : model_(model), cost_(cost) {} + : model_(model), cost_(cost) {} Ptr<ModelBase> getModel() { return model_; } @@ -104,16 +103,10 @@ public: virtual Expr build(Ptr<ExpressionGraph> graph, Ptr<data::Batch> batch, bool clearGraph = true) { - return cost_->apply(model_, - graph, - batch, - clearGraph); - }; - - virtual void clear(Ptr<ExpressionGraph> graph) { - model_->clear(graph); + return cost_->apply(model_, graph, batch, clearGraph); }; + virtual void clear(Ptr<ExpressionGraph> graph) { model_->clear(graph); }; }; typedef Trainer Scorer; @@ -138,11 +131,11 @@ protected: public: Stepwise(Ptr<EncoderDecoderBase> encdec, Ptr<CostStep> cost) - : encdec_(encdec), cost_(cost) {} + : encdec_(encdec), cost_(cost) {} virtual void load(Ptr<ExpressionGraph> graph, const std::string& name, - bool markedReloaded = true) { + bool markedReloaded = true) { encdec_->load(graph, name, markedReloaded); } @@ -152,9 +145,7 @@ public: encdec_->save(graph, name, saveTranslatorConfig); } - virtual void clear(Ptr<ExpressionGraph> graph) { - encdec_->clear(graph); - } + virtual void clear(Ptr<ExpressionGraph> graph) { 
encdec_->clear(graph); } virtual Expr build(Ptr<ExpressionGraph> graph, Ptr<data::Batch> batch, @@ -174,7 +165,8 @@ public: const std::vector<size_t>& embIndices, int dimBatch, int beamSize) { - auto nextState = encdec_->step(graph, state, hypIndices, embIndices, dimBatch, beamSize); + auto nextState = encdec_->step( + graph, state, hypIndices, embIndices, dimBatch, beamSize); return cost_->apply(nextState); } @@ -185,11 +177,10 @@ public: return nullptr; } - virtual Ptr<Options> getOptions() { - return encdec_->getOptions(); - }; + virtual Ptr<Options> getOptions() { return encdec_->getOptions(); }; - virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) { + virtual void setShortlistGenerator( + Ptr<data::ShortlistGenerator> shortlistGenerator) { encdec_->setShortlistGenerator(shortlistGenerator); }; @@ -197,10 +188,14 @@ public: return encdec_->getShortlist(); }; + virtual std::vector<float> getAlignment() { + return encdec_->getAlignment(); + } }; -static Ptr<ModelBase> add_cost(Ptr<EncoderDecoder> encdec, Ptr<Options> options) { - switch (options->get<usage>("usage", usage::raw)) { +static Ptr<ModelBase> add_cost(Ptr<EncoderDecoder> encdec, + Ptr<Options> options) { + switch(options->get<usage>("usage", usage::raw)) { case usage::training: return New<Trainer>(encdec, New<EncoderDecoderCE>(options)); case usage::scoring: @@ -208,10 +203,8 @@ static Ptr<ModelBase> add_cost(Ptr<EncoderDecoder> encdec, Ptr<Options> options) case usage::translation: return New<Stepwise>(encdec, New<LogsoftmaxStep>()); case usage::raw: - default: - return encdec; + default: return encdec; } } - } } diff --git a/src/models/encoder_decoder.h b/src/models/encoder_decoder.h index e1a07d1f..4eee31c5 100644 --- a/src/models/encoder_decoder.h +++ b/src/models/encoder_decoder.h @@ -44,6 +44,8 @@ public: virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) = 0; virtual Ptr<data::Shortlist> getShortlist() = 0; + + virtual 
std::vector<float> getAlignment() = 0; }; class EncoderDecoder : public EncoderDecoderBase { @@ -113,6 +115,12 @@ public: return decoders_[0]->getShortlist(); }; + virtual std::vector<float> getAlignment() { + std::vector<float> softAlign; + decoders_[0]->getAlignments()[0]->val()->get(softAlign); + return softAlign; + }; + /*********************************************************************/ virtual Ptr<DecoderState> startState(Ptr<ExpressionGraph> graph, diff --git a/src/models/nematus.h b/src/models/nematus.h index 82b77c68..3d23f5fc 100644 --- a/src/models/nematus.h +++ b/src/models/nematus.h @@ -21,6 +21,10 @@ public: ABORT_IF(options_->get<std::string>("dec-cell") != "gru-nematus", "--type nematus does not currently support other rnn cells " "than gru-nematus, use --type s2s"); + + ABORT_IF(options_->get<int>("dec-cell-high-depth") > 1, + "--type nematus does not currently support " + "--dec-cell-high-depth > 1, use --type s2s"); } void load(Ptr<ExpressionGraph> graph, diff --git a/src/python/CMakeLists.txt b/src/python/CMakeLists.txt deleted file mode 100644 index 9d54c01a..00000000 --- a/src/python/CMakeLists.txt +++ /dev/null @@ -1,42 +0,0 @@ -cuda_add_library(pymarian SHARED - mariannmt.cpp - ../3rd_party/cnpy/cnpy.cpp - ../3rd_party/exception.cpp - ../3rd_party/svd/svd.cpp - ../graph/expression_graph.cpp - ../graph/expression_operators.cu - ../graph/node.cu - ../graph/node_operators.cu - ../tensors/tensor.cu - ../tensors/device.cpp - ../kernels/tensor_operators.cu - ../tensors/gpu/dropout.cu - ../tensors/cpu/dropout.cpp - ../kernels/sparse.cu - #../layers/param_initializers.cu - ../rnn/attention.cu - ../rnn/cells.cu - #../optimizers/clippers.cu - #../optimizers/optimizers.cu - ../common/utils.cpp - ../common/logging.cpp - ../common/config.cpp - ../common/config_parser.cpp - ../translator/history.cpp - ../translator/output_collector.cpp - ../translator/nth_element.cu - ../translator/helpers.cu - ../data/vocab.cpp - ../data/corpus.cpp - 
../data/text_input.cpp - #../rescorer/score_collector.cpp - $<TARGET_OBJECTS:libyaml-cpp> -) - -set_target_properties(pymarian PROPERTIES EXCLUDE_FROM_ALL 1) -set_target_properties(pymarian PROPERTIES OUTPUT_NAME mariannmt) -set_target_properties(pymarian PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}") -set_target_properties(pymarian PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}") - -target_link_libraries(pymarian ${EXT_LIBS} marian) -cuda_add_cublas_to_target(pymarian) diff --git a/src/python/mariannmt.cpp b/src/python/mariannmt.cpp deleted file mode 100644 index d5bdd161..00000000 --- a/src/python/mariannmt.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include <cstdlib> -#include <iostream> -#include <string> - -#include <boost/python.hpp> - -#include "common/utils.h" -#include "common/version.h" -#include "translator/beam_search.h" -#include "translator/translator.h" - -using namespace marian; - -Ptr<TranslateServiceMultiGPU<BeamSearch>> task; - -void init(const std::string& argopts) { - auto options = New<Config>(argopts, ConfigMode::translating); - task = New<TranslateServiceMultiGPU<BeamSearch>>(options); - LOG(info, "Translator initialized"); -} - -boost::python::list translate(boost::python::list& pyinput) { - std::vector<std::string> input; - for(int i = 0; i < boost::python::len(pyinput); ++i) { - input.emplace_back( - boost::python::extract<std::string>(boost::python::object(pyinput[i]))); - } - - auto output = task->run(input); - - boost::python::list pyoutput; - pyoutput.append(Join(output, "\n")); - return pyoutput; -} - -std::string version() { - return PROJECT_VERSION; -} - -BOOST_PYTHON_MODULE(libmariannmt) { - boost::python::def("init", init); - boost::python::def("translate", translate); - boost::python::def("version", version); -} diff --git a/src/tensors/allocator.h b/src/tensors/allocator.h index 4285a91b..3ec8c85b 100644 --- a/src/tensors/allocator.h +++ b/src/tensors/allocator.h @@ -16,9 +16,27 @@ namespace marian { class 
AllocationException : public std::exception { +private: + char* message_; + public: - virtual const char* what() const throw() { - return "Memory re-allocation attempted"; + AllocationException(size_t available, size_t asked) { + std::string mstr = "Attempted allocation of " + + std::to_string(asked) + + ", but only " + + std::to_string(available) + + " free"; + + message_ = new char[mstr.size() + 1]; + std::copy(mstr.begin(), mstr.end(), message_); + } + + ~AllocationException() { + delete[] message_; + } + + virtual const char* what() const noexcept { + return message_; } }; @@ -111,7 +129,7 @@ private: auto it = std::lower_bound(gaps_.begin(), gaps_.end(), Gap(nullptr, size)); if(throw_ && it == gaps_.end()) { - throw AllocationException(); + throw AllocationException(available_, size); } while(it == gaps_.end()) { @@ -119,8 +137,11 @@ private: it = std::lower_bound(gaps_.begin(), gaps_.end(), Gap(nullptr, size)); } - available_ -= it->size(); - return *it; + Gap gap = *it; + gaps_.erase(it); + + available_ -= gap.size(); + return gap; } void insertGap(Gap gap, bool consolidate = true) { @@ -186,7 +207,6 @@ public: bytes = align(bytes); Gap gap = getGap(bytes); - gaps_.erase(gap); if(gap.size() > bytes) { insertGap(gap.rest(bytes), false); } diff --git a/src/tensors/cpu/element.h b/src/tensors/cpu/element.h index 23750bcd..a383edb0 100644 --- a/src/tensors/cpu/element.h +++ b/src/tensors/cpu/element.h @@ -23,9 +23,9 @@ template <size_t I = 0> struct E { functional::Array<functional::Tensor<float>, K>& tensors, functional::Array<int, K> indices) { - auto& shape = tensors[0].shape(); + const auto& shape = tensors[0].shape(); - // loop for outer-most dimension + // loop over outer-most dimension for(int i = 0; i < shape[I]; ++i) { // call loop for next-inner dimension @@ -66,7 +66,7 @@ void Element(const Functor& functor, marian::Tensor out, Tensors... tensors) { // call elementwise operation going from outer-most dimension // to inner-most element. 
- E<>::element(functor, gTensors, indices); + E<0>::element(functor, gTensors, indices); } } diff --git a/src/tensors/cpu/prod.cpp b/src/tensors/cpu/prod.cpp index c5d86479..bfdb8573 100644 --- a/src/tensors/cpu/prod.cpp +++ b/src/tensors/cpu/prod.cpp @@ -95,8 +95,9 @@ void Prod(marian::Tensor C, } void ProdBatched(marian::Tensor C, - const marian::Tensor& A, - const marian::Tensor& B, + Ptr<Allocator> allocator, + const marian::Tensor A, + const marian::Tensor B, bool transA, bool transB, float beta, @@ -128,30 +129,21 @@ void ProdBatched(marian::Tensor C, auto strideA = batchA == 1 ? 0 : m * k; auto strideC = n * m; - int steps = std::max(batchA, batchB); - - int offsetA = 0; - int offsetB = 0; - int offsetC = 0; - - for(int i = 0; i < steps; ++i) { + int batchC = std::max(batchA, batchB); + for(int i = 0; i < batchC; ++i) { sgemm(transA, transB, m, n, k, alpha, - A->data() + offsetA, + A->data() + (i % batchA) * strideA, lda, - B->data() + offsetB, + B->data() + (i % batchB) * strideB, ldb, beta, - C->data() + offsetC, + C->data() + i * strideC, ldc); - - offsetA += strideA; - offsetB += strideB; - offsetC += strideC; } #else ABORT("Not implemented!"); diff --git a/src/tensors/cpu/tensor_operators.cpp b/src/tensors/cpu/tensor_operators.cpp index 7310102d..8406f9d0 100644 --- a/src/tensors/cpu/tensor_operators.cpp +++ b/src/tensors/cpu/tensor_operators.cpp @@ -50,12 +50,13 @@ inline void gInsertCols(float* out, size_t cols_out, size_t cols_in, size_t offset_out, - size_t offset_in) { + size_t offset_in, + float beta) { for(int j = 0; j < rows; ++j) { float* rowOut = out + j * cols_out + offset_out; const float* rowIn = in + j * cols_in + offset_in; for(int i = 0; i < cols; ++i) { - rowOut[i] = rowIn[i]; + rowOut[i] = rowIn[i] + beta * rowOut[i]; } } } @@ -71,7 +72,7 @@ void Concatenate1(Tensor out, const std::vector<Tensor>& inputs) { "First dimension must be equal"); int cols_in = in->shape().back(); cpu::gInsertCols( - out->data(), in->data(), rows, cols_in, 
cols_out, cols_in, offset, 0); + out->data(), in->data(), rows, cols_in, cols_out, cols_in, offset, 0, 0); offset += cols_in; } } @@ -91,8 +92,11 @@ void Split1(std::vector<Tensor>& outputs, const Tensor in) { ABORT_IF(rows != out->shape().elements() / out->shape().back(), "First dimension must be equal"); int cols_out = out->shape().back(); + + // set last parameter to 1 to enable += instead of = + // @TODO: do this in a more principled ways accross all/most kernels cpu::gInsertCols( - out->data(), in->data(), rows, cols_out, cols_out, cols_in, 0, offset); + out->data(), in->data(), rows, cols_out, cols_out, cols_in, 0, offset, 1); offset += cols_out; } } @@ -108,9 +112,17 @@ void SplitCont(std::vector<Tensor>& outputs, const Tensor in, int axis) { size_t size = out->shape().elements() / step; size_t offset2 = i * size; - std::copy(in->data() + offset1, - in->data() + offset1 + size, - out->data() + offset2); + // BUG: This overwrites gradients! + //std::copy(in->data() + offset1, + // in->data() + offset1 + size, + // out->data() + offset2); + + // Fixes gradient problem, @TODO: check performance + std::transform(in->data() + offset1, + in->data() + offset1 + size, + out->data() + offset2, + out->data() + offset2, + [](float a, float b){ return a + b; }); offset1 += size; } @@ -124,6 +136,7 @@ void Deconcatenate(std::vector<Tensor>& outputs, const Tensor in, int ax) { SplitCont(outputs, in, ax); } +template <bool add> void Transpose0213(Tensor out, Tensor in) { int cols = in->shape()[-1]; int rows = in->shape().elements() / in->shape()[-1]; @@ -141,7 +154,15 @@ void Transpose0213(Tensor out, Tensor in) { const float* inRow = in->data() + src * cols ; float* outRow = out->data() + dst * cols; - std::copy(inRow, inRow + cols, outRow); + if(!add) { + // mostly for fast forward computation + std::copy(inRow, inRow + cols, outRow); + } + else { + for(int i = 0; i < cols; ++i) { + outRow[i] += inRow[i]; + } + } } } } @@ -186,6 +207,7 @@ void Transpose10(Tensor out, 
const Tensor in) { } // @TODO: optimize this, currently it's quite horrible +template <bool add> void TransposeGeneric(Tensor out, Tensor in, const std::vector<int>& vAxis) { functional::Array<int, functional::Shape::size()> permute; int diff = functional::Shape::size() - vAxis.size(); @@ -207,19 +229,29 @@ void TransposeGeneric(Tensor out, Tensor in, const std::vector<int>& vAxis) { gOut.shape().dims(index, oDims); for(int i = 0; i < N; ++i) pDims[permute[i]] = oDims[i]; - gOut[index] = gIn[pDims]; + if(add) + gOut[index] += gIn[pDims]; + else + gOut[index] = gIn[pDims]; } } void TransposeND(Tensor out, Tensor in, const std::vector<int>& vAxis) { if(vAxis == std::vector<int>({0, 2, 1, 3})) - Transpose0213(out, in); - else if(vAxis == std::vector<int>({1, 0}) - && in->shape()[-1] % 16 == 0 + Transpose0213<false>(out, in); + else if(vAxis == std::vector<int>({1, 0}) + && in->shape()[-1] % 16 == 0 && in->shape()[-2] % 16 == 0) Transpose10(out, in); else - TransposeGeneric(out, in, vAxis); + TransposeGeneric<false>(out, in, vAxis); +} + +void TransposeNDGrad(Tensor out, Tensor in, const std::vector<int>& vAxis) { + if(vAxis == std::vector<int>({0, 2, 1, 3})) + Transpose0213<true>(out, in); + else + TransposeGeneric<true>(out, in, vAxis); } void Softmax(Tensor out_, Tensor in_, Tensor mask_) { @@ -412,9 +444,8 @@ void PasteCols(Tensor out_, const float* rowIn = in + j * colsIn; float* rowOut = out + j * colsOut; - // @TODO: should this be a sum? 
for(int i = 0; i < colsIn; ++i) { - rowOut[indices[i]] = rowIn[i]; + rowOut[indices[i]] += rowIn[i]; } } } @@ -458,7 +489,6 @@ void GRUFastForward(Tensor out_, std::vector<Tensor> inputs, bool final) { #pragma omp simd for(int i = 0; i < cols; ++i) { - // @TODO: stable sigmoid float r = stableSigmoid(xWrow[i] + sUrow[i] + b[i]); int k = i + cols; @@ -901,6 +931,26 @@ void Shift(Tensor out_, Tensor in_, marian::Shape shift, float padValue, bool in } } +void ShiftGrad(Tensor out_, Tensor in_, marian::Shape shift, bool invert) { + int offset = 0; + for(int i = 0; i < shift.size(); ++i) + offset += in_->shape().stride(i) * shift[i]; + + if(invert) + offset = -offset; + + float* out = out_->data(); + const float* in = in_->data(); + + int length = out_->shape().elements(); +#pragma omp parallel for + for(int i = 0; i < length; ++i) { + if(i - offset >= 0 && i - offset < length) { + out[i] += in[i - offset]; + } + } +} + void SetSparse(float* out, const std::vector<size_t>& indices, const std::vector<float>& values) { diff --git a/src/tensors/gpu/add.cu b/src/tensors/gpu/add.cu index 1acb5b54..9f4d0dba 100644 --- a/src/tensors/gpu/add.cu +++ b/src/tensors/gpu/add.cu @@ -160,7 +160,6 @@ void Add(Functor functor, float scale, marian::Tensor out, Tensors... 
tensors) { bool broadcast = false; for(int i = 0; i < K; ++i) broadcast = broadcast || gOut.shape() != gIns[i].shape(); - gAddEqual<<<blocks, threads>>>(functor, gOut, gIns, scale, broadcast); } else { int threads = std::min(MAX_THREADS, length); diff --git a/src/tensors/gpu/element.inc b/src/tensors/gpu/element.inc index 02d269f3..79498466 100644 --- a/src/tensors/gpu/element.inc +++ b/src/tensors/gpu/element.inc @@ -41,3 +41,4 @@ template void Element<Assign<Var<1>, BinaryFunctor<elem::Clip, Assignee<2>, Capt template void Element<Assign<Var<1>, BinaryFunctor<elem::LogAddExp, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor>(Assign<Var<1>, BinaryFunctor<elem::LogAddExp, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor, marian::Tensor); template void Element<Assign<Var<1>, BinaryFunctor<elem::Maximum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor>(Assign<Var<1>, BinaryFunctor<elem::Maximum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor, marian::Tensor); template void Element<Assign<Var<1>, BinaryFunctor<elem::Minimum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor>(Assign<Var<1>, BinaryFunctor<elem::Minimum, Assignee<2>, Assignee<3>>>, marian::Tensor, marian::Tensor, marian::Tensor); +template void Element<Assign<Var<1>, BinaryFunctor<elem::Div, Assignee<1>, Capture>>>(Assign<Var<1>, BinaryFunctor<elem::Div, Assignee<1>, Capture>>, marian::Tensor); diff --git a/src/tensors/gpu/prod.cu b/src/tensors/gpu/prod.cu index e102311d..242dcccc 100644 --- a/src/tensors/gpu/prod.cu +++ b/src/tensors/gpu/prod.cu @@ -67,6 +67,30 @@ void Prod(marian::Tensor C, #endif } +__global__ void gAddBias(float* out, const float* bias, size_t length, size_t cols) { + for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) { + int index = bid + blockDim.x * blockIdx.x + threadIdx.x; + if(index < length) { + size_t index2 = index % cols; + out[index] += bias[index2]; + } + } +} + +void AddBias(marian::Tensor C, const 
marian::Tensor bias) { + cudaSetDevice(C->getDevice().no); + + int length = C->shape().elements(); + int cols = bias->shape().elements(); + + int threads = std::min(MAX_THREADS, length); + int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0)); + + gAddBias<<<blocks, threads>>>(C->data(), bias->data(), length, cols); + + cudaStreamSynchronize(0); +} + void ProdWithBias(marian::Tensor C, const marian::Tensor& A, const marian::Tensor& B, @@ -76,13 +100,13 @@ void ProdWithBias(marian::Tensor C, float beta, float scalar) { marian::gpu::Prod(C, A, B, transA, transB, beta, scalar); - marian::gpu::Add(functional::_1, 1.f, C, bias); + marian::gpu::AddBias(C, bias); } - void ProdBatched(marian::Tensor C, - const marian::Tensor& A, - const marian::Tensor& B, + Ptr<Allocator> allocator, + const marian::Tensor A, + const marian::Tensor B, bool transA, bool transB, float beta, @@ -116,30 +140,57 @@ void ProdBatched(marian::Tensor C, auto cublasHandle = std::static_pointer_cast<gpu::Backend>(C->getBackend()) ->getCublasHandle(); + + int strideA = batchA == 1 ? 0 : m * k; + int strideB = batchB == 1 ? 
0 : n * k; + int strideC = n * m; + int batchC = std::max(batchA, batchB); + + std::vector<const float*> aptr; + std::vector<const float*> bptr; + std::vector<float*> cptr; + + for(int i = 0; i < batchC; i++) { + aptr.push_back(A->data() + (i % batchA) * strideA); + bptr.push_back(B->data() + (i % batchB) * strideB); + cptr.push_back(C->data() + i * strideC); + } + + auto mp_aptr = allocator->alloc<const float*>(aptr.size()); + CudaCopy(aptr.data(), aptr.data() + aptr.size(), mp_aptr->data<const float*>()); + + auto mp_bptr = allocator->alloc<const float*>(bptr.size()); + CudaCopy(bptr.data(), bptr.data() + bptr.size(), mp_bptr->data<const float*>()); + + auto mp_cptr = allocator->alloc<float*>(cptr.size()); + CudaCopy(cptr.data(), cptr.data() + cptr.size(), mp_cptr->data<float*>()); + #if CUDA_VERSION >= 9000 cublasSetMathMode(cublasHandle, CUBLAS_TENSOR_OP_MATH); #endif - cublasSgemmStridedBatched(cublasHandle, - opB, - opA, - n, - m, - k, - &alpha, - B->data(), - ldb, - batchB == 1 ? 0 : n * k, - A->data(), - lda, - batchA == 1 ? 
0 : m * k, - &beta, - C->data(), - ldc, - n * m, - std::max(batchA, batchB)); + cublasSgemmBatched(cublasHandle, + opB, + opA, + n, + m, + k, + &alpha, + mp_bptr->data<const float*>(), + ldb, + mp_aptr->data<const float*>(), + lda, + &beta, + mp_cptr->data<float*>(), + ldc, + batchC); #if CUDA_VERSION >= 9000 cublasSetMathMode(cublasHandle, CUBLAS_DEFAULT_MATH); #endif + + allocator->free(mp_aptr); + allocator->free(mp_bptr); + allocator->free(mp_cptr); } + } } diff --git a/src/tensors/gpu/prod.h b/src/tensors/gpu/prod.h index 5791fb1a..1710914e 100644 --- a/src/tensors/gpu/prod.h +++ b/src/tensors/gpu/prod.h @@ -26,8 +26,9 @@ void ProdWithBias(marian::Tensor C, float scalar = 1); void ProdBatched(marian::Tensor C, - const marian::Tensor& A, - const marian::Tensor& B, + Ptr<Allocator> allocator, + const marian::Tensor A, + const marian::Tensor B, bool transA, bool transB, float beta = 0, diff --git a/src/tensors/gpu/tensor_operators.cu b/src/tensors/gpu/tensor_operators.cu index 87861a1c..2c7ab510 100644 --- a/src/tensors/gpu/tensor_operators.cu +++ b/src/tensors/gpu/tensor_operators.cu @@ -38,6 +38,8 @@ bool IsNan(Tensor in) { } void ConcatCont(Tensor out, const std::vector<Tensor>& inputs, int axis) { + + cudaSetDevice(out->getDevice().no); int step = 1; for(int i = 0; i < axis; ++i) @@ -49,7 +51,7 @@ void ConcatCont(Tensor out, const std::vector<Tensor>& inputs, int axis) { size_t size = in->shape().elements() / step; size_t offset2 = i * size; - cudaMemcpyAsync(out->data() + offset1, + cudaMemcpy(out->data() + offset1, in->data() + offset2, size * sizeof(float), cudaMemcpyDeviceToDevice); @@ -60,14 +62,15 @@ void ConcatCont(Tensor out, const std::vector<Tensor>& inputs, int axis) { cudaStreamSynchronize(0); } +template <bool add> __global__ void gInsertCols(float* out, - const float* in, - size_t rows, - size_t cols, - size_t cols_out, - size_t cols_in, - size_t offset_out, - size_t offset_in) { + const float* in, + size_t rows, + size_t cols, + size_t 
cols_out, + size_t cols_in, + size_t offset_out, + size_t offset_in) { for(int bid = 0; bid < rows; bid += gridDim.x) { int j = bid + blockIdx.x; if(j < rows) { @@ -77,7 +80,10 @@ __global__ void gInsertCols(float* out, for(int tid = 0; tid < cols; tid += blockDim.x) { int i = tid + threadIdx.x; if(i < cols) - rowOut[i] = rowIn[i]; + if(add) + rowOut[i] += rowIn[i]; + else + rowOut[i] = rowIn[i]; } } } @@ -99,16 +105,81 @@ void Concatenate1(Tensor out, const std::vector<Tensor>& inputs) { int blocks = std::min(MAX_BLOCKS, rows); int threads = std::min(MAX_THREADS, cols_in); - gInsertCols<<<blocks, threads>>>( + gInsertCols<false><<<blocks, threads>>>( out->data(), in->data(), rows, cols_in, cols_out, cols_in, offset, 0); offset += cols_in; } cudaStreamSynchronize(0); } + +__global__ void gJoin2(float* out, size_t rowBatch, size_t cols, + const float* in1, size_t inStride1, + const float* in2, size_t inStride2) { + + int outStride = inStride1 + inStride2; + int rows = rowBatch * outStride; + + for(int bid = 0; bid < rows; bid += gridDim.x) { + int j = bid + blockIdx.x; + if(j < rows) { + + float* rowOut = out + j * cols; + + int curBatch = j / outStride; + int curPos = j % outStride; + + int jIn1 = (curBatch * inStride1) + curPos; + int jIn2 = (curBatch * inStride2) + curPos - inStride1; + + const float* rowIn1 = in1 + jIn1 * cols; + const float* rowIn2 = in2 + jIn2 * cols; + + for(int tid = 0; tid < cols; tid += blockDim.x) { + int i = tid + threadIdx.x; + if(i < cols) { + if(curPos < inStride1) + rowOut[i] = rowIn1[i]; + else + rowOut[i] = rowIn2[i]; + } + } + + } + } +} + + +void Concatenate2(Tensor out, Tensor in1, Tensor in2) { + cudaSetDevice(out->getDevice().no); + + size_t rows = out->shape().elements() / out->shape().back(); + size_t cols = out->shape().back(); + + size_t rowStride1 = in1->shape()[-2]; + size_t rowStride2 = in2->shape()[-2]; + + size_t rowBatch = rows / out->shape()[-2]; + + int blocks = std::min(MAX_BLOCKS, (int)rows); + int threads = 
std::min(MAX_THREADS, (int)cols); + + gJoin2<<<blocks, threads>>>(out->data(), + rowBatch, + cols, + in1->data(), + rowStride1, + in2->data(), + rowStride2); + + cudaStreamSynchronize(0); +} + void Concatenate(Tensor out, const std::vector<Tensor>& inputs, int ax) { if(ax == out->shape().size() - 1) Concatenate1(out, inputs); + else if(ax == out->shape().size() - 2 && inputs.size() == 2) + Concatenate2(out, inputs[0], inputs[1]); else ConcatCont(out, inputs, ax); } @@ -127,13 +198,24 @@ void Split1(std::vector<Tensor>& outputs, const Tensor in) { int blocks = std::min(MAX_BLOCKS, rows); int threads = std::min(MAX_THREADS, cols_out); - gInsertCols<<<blocks, threads>>>( + gInsertCols<true><<<blocks, threads>>>( out->data(), in->data(), rows, cols_out, cols_out, cols_in, 0, offset); offset += cols_out; } cudaStreamSynchronize(0); } +// @TODO: this function is just a temporary fix until I come up with +// something better for the situation below. +__global__ void gAddRow(float* out, const float* in, int length) { + for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) { + int index = bid + blockDim.x * blockIdx.x + threadIdx.x; + if(index < length) { + out[index] = in[index] + out[index]; + } + } +} + void SplitCont(std::vector<Tensor>& outputs, const Tensor in, int axis) { cudaSetDevice(in->getDevice().no); @@ -141,17 +223,25 @@ void SplitCont(std::vector<Tensor>& outputs, const Tensor in, int axis) { for(int i = 0; i < axis; ++i) step *= in->shape()[i]; - size_t offset1 = 0; + int offset1 = 0; for(int i = 0; i < step; ++i) { for(auto out : outputs) { - size_t size = out->shape().elements() / step; - size_t offset2 = i * size; - - cudaMemcpyAsync(out->data() + offset2, - in->data() + offset1, - size * sizeof(float), - cudaMemcpyDeviceToDevice); - + int size = out->shape().elements() / step; + int offset2 = i * size; + + // BUG: this is does not add gradients + //cudaMemcpyAsync(out->data() + offset2, + // in->data() + offset1, + // size * sizeof(float), + // 
cudaMemcpyDeviceToDevice); + + // @TODO: this is a quick but bad fix for the above bug + int threads = std::min(MAX_THREADS, size); + int blocks = std::min(MAX_BLOCKS, size / threads + (size % threads != 0)); + + gAddRow<<<blocks, threads>>>(out->data() + offset2, + in->data() + offset1, + size); offset1 += size; } } @@ -165,6 +255,7 @@ void Deconcatenate(std::vector<Tensor>& outputs, const Tensor in, int ax) { SplitCont(outputs, in, ax); } +template <bool add> __global__ void gTransposeND( functional::Tensor<float> out, const functional::Tensor<float> in, @@ -180,27 +271,114 @@ __global__ void gTransposeND( out.shape().dims(index, oDims); for(int i = 0; i < N; ++i) pDims[permute[i]] = oDims[i]; - out[index] = in[pDims]; + if(add) + out[index] += in[pDims]; + else + out[index] = in[pDims]; + } + } +} + +template <bool add> +__global__ void gTranspose0213(float* out, const float* in, + int rows, + int cols, + int stride1, + int stride2) { + + int stride = stride1 * stride2; + for(int bid = 0; bid < rows; bid += gridDim.x) { + int j = bid + blockIdx.x; + if(j < rows) { + float* rowOut = out + j * cols; + + int z = j / stride; + int y = (j % stride) / stride1; + int x = (j % stride) % stride1; + int j2 = z * stride + x * stride2 + y; + + const float* rowIn = in + j2 * cols; + + for(int tid = 0; tid < cols; tid += blockDim.x) { + int i = tid + threadIdx.x; + if(i < cols) { + if(add) + rowOut[i] += rowIn[i]; + else + rowOut[i] = rowIn[i]; + } + } } } + } void TransposeND(Tensor out, Tensor in, const std::vector<int>& vAxis) { cudaSetDevice(out->getDevice().no); + if(vAxis == std::vector<int>({0, 2, 1, 3})) { - functional::Array<int, functional::Shape::size()> axes; - int diff = functional::Shape::size() - vAxis.size(); - for(int i = 0; i < axes.size(); ++i) - if(i < diff) - axes[i] = i; - else - axes[i] = vAxis[i - diff] + diff; + int rows = out->shape().elements() / out->shape().back(); + int cols = out->shape().back(); - int length = out->shape().elements(); - int 
threads = std::min(MAX_THREADS, length); - int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0)); + int blocks = std::min(MAX_BLOCKS, rows); + int threads = std::min(MAX_THREADS, cols); + + int stride1 = out->shape()[-2]; + int stride2 = out->shape()[-3]; + + gTranspose0213<false><<<blocks, threads>>>(out->data(), in->data(), + rows, cols, stride1, stride2); + } + else { + + functional::Array<int, functional::Shape::size()> axes; + int diff = functional::Shape::size() - vAxis.size(); + for(int i = 0; i < axes.size(); ++i) + if(i < diff) + axes[i] = i; + else + axes[i] = vAxis[i - diff] + diff; + + int length = out->shape().elements(); + int threads = std::min(MAX_THREADS, length); + int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0)); + + gTransposeND<false><<<blocks, threads>>>(out, in, axes); + } +} + +void TransposeNDGrad(Tensor out, Tensor in, const std::vector<int>& vAxis) { + cudaSetDevice(out->getDevice().no); + if(vAxis == std::vector<int>({0, 2, 1, 3})) { + + int rows = out->shape().elements() / out->shape().back(); + int cols = out->shape().back(); + + int blocks = std::min(MAX_BLOCKS, rows); + int threads = std::min(MAX_THREADS, cols); + + int stride1 = out->shape()[-2]; + int stride2 = out->shape()[-3]; + + gTranspose0213<true><<<blocks, threads>>>(out->data(), in->data(), + rows, cols, stride1, stride2); + } + else { - gTransposeND<<<blocks, threads>>>(out, in, axes); + functional::Array<int, functional::Shape::size()> axes; + int diff = functional::Shape::size() - vAxis.size(); + for(int i = 0; i < axes.size(); ++i) + if(i < diff) + axes[i] = i; + else + axes[i] = vAxis[i - diff] + diff; + + int length = out->shape().elements(); + int threads = std::min(MAX_THREADS, length); + int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0)); + + gTransposeND<true><<<blocks, threads>>>(out, in, axes); + } } __global__ void gSoftmax(float* out, @@ -697,7 +875,7 @@ __global__ void 
gPasteCols(float* out, for(int tid = 0; tid < colsIn; tid += blockDim.x) { int i = tid + threadIdx.x; if(i < colsIn) - rowOut[targetColIdx[i]] = rowIn[i]; + rowOut[targetColIdx[i]] += rowIn[i]; } } } @@ -764,7 +942,7 @@ __global__ void gInsert(float* out, inShape.dims(index, dims); dims[axis] = d_indices[dims[index]]; int outIndex = outShape.index(dims); - out[outIndex] = in[index]; + out[outIndex] += in[index]; } } } @@ -1558,14 +1736,21 @@ void LayerNormalizationGrad(Tensor gradX, eps); } +template <bool add> __global__ void gShift(float* out, const float* in, int length, int offset, float padValue) { for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) { int index = bid + blockDim.x * blockIdx.x + threadIdx.x; if(index < length) { - if(index - offset < 0 || index - offset >= length) - out[index] = padValue; - else - out[index] = in[index - offset]; + if(add) { + if(index - offset >= 0 && index - offset < length) + out[index] += in[index - offset]; + } + else { + if(index - offset < 0 || index - offset >= length) + out[index] = padValue; + else + out[index] = in[index - offset]; + } } } } @@ -1588,7 +1773,28 @@ void Shift(Tensor out, Tensor in, marian::Shape shift, float padValue, bool inve int threads = std::min(MAX_THREADS, length); int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0)); - gShift<<<blocks, threads>>>(out->data(), in->data(), length, offset, padValue); + gShift<false><<<blocks, threads>>>(out->data(), in->data(), length, offset, padValue); +} + +void ShiftGrad(Tensor out, Tensor in, marian::Shape shift, bool invert) { + ABORT_IF(in->shape().size() != shift.size(), "bad dimensions"); + + // BUGBUG: This can only shift along the first axis. Shifting, e.g., along the last axis cannot be implemented this way. 
+ int offset = 0; + for(int i = 0; i < shift.size(); ++i) + offset += in->shape().stride(i) * shift[i]; + + if(invert) + offset = -offset; + + cudaSetDevice(out->getDevice().no); + + int length = out->shape().elements(); + + int threads = std::min(MAX_THREADS, length); + int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0)); + + gShift<true><<<blocks, threads>>>(out->data(), in->data(), length, offset, 0.f); } __global__ void gSetSparse(float* out, diff --git a/src/tensors/tensor_operators.h b/src/tensors/tensor_operators.h index 9aa9c58b..b605e347 100644 --- a/src/tensors/tensor_operators.h +++ b/src/tensors/tensor_operators.h @@ -64,7 +64,8 @@ void Reduce(Functor functor, marian::Tensor out, Tensors... tensors) { // clang-format off DISPATCH7(Prod, marian::Tensor, const marian::Tensor&, const marian::Tensor&, bool, bool, float, float) DISPATCH8(ProdWithBias, marian::Tensor, const marian::Tensor&, const marian::Tensor&, const marian::Tensor&, bool, bool, float, float) - DISPATCH7(ProdBatched, marian::Tensor, const marian::Tensor&, const marian::Tensor&, bool, bool, float, float) + + DISPATCH8(ProdBatched, marian::Tensor, Ptr<Allocator>, const marian::Tensor, const marian::Tensor, bool, bool, float, float) DISPATCH2(Dropout, marian::Tensor, float) @@ -78,7 +79,10 @@ void Reduce(Functor functor, marian::Tensor out, Tensors... 
tensors) { DISPATCH4(CrossEntropyPickBackward, marian::Tensor, marian::Tensor, marian::Tensor, marian::Tensor) DISPATCH3(TransposeND, marian::Tensor, marian::Tensor, const std::vector<int>&) + DISPATCH3(TransposeNDGrad, marian::Tensor, marian::Tensor, const std::vector<int>&) + DISPATCH5(Shift, marian::Tensor, marian::Tensor, marian::Shape, float, bool) + DISPATCH4(ShiftGrad, marian::Tensor, marian::Tensor, marian::Shape, bool) DISPATCH3(Concatenate, marian::Tensor, const std::vector<marian::Tensor>&, int) // clang-format on diff --git a/src/training/communicator.cpp b/src/training/communicator.cpp new file mode 100644 index 00000000..22ccc8e8 --- /dev/null +++ b/src/training/communicator.cpp @@ -0,0 +1,13 @@ +#include "training/communicator.h" + +namespace marian { + +// Compile this if cuda is not being compiled. +// Version with CUDA and/or NCCL is compiled in communicator.cu +#ifndef CUDA_FOUND +Ptr<Communicator> createCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, bool noNccl) { + return New<DefaultCommunicator>(graphs); +} +#endif + +} diff --git a/src/training/communicator.cu b/src/training/communicator.cu new file mode 100644 index 00000000..c721e0ed --- /dev/null +++ b/src/training/communicator.cu @@ -0,0 +1,239 @@ +#include "training/communicator.h" +#include "functional/functional.h" +#include "tensors/tensor_operators.h" + +#ifdef USE_NCCL +#include "cuda_runtime.h" +#include "nccl.h" +#endif + +namespace marian { + +#ifdef USE_NCCL +class NCCLCommunicator : public Communicator { +private: + std::vector<ncclComm_t> comms_; + std::vector<cudaStream_t> streams_; + std::vector<int> devices_; + + void synchronizeAll() { + for(int i = 0; i < graphs_.size(); ++i) { + cudaSetDevice(devices_[i]); + cudaStreamSynchronize(streams_[i]); + } + } + +public: + NCCLCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs) + : Communicator(graphs), + comms_(graphs.size()), + streams_(graphs.size()), + devices_(graphs.size()) + { + LOG(info, "[comm] 
Using NCCL library for GPU communication"); + + for(int i = 0; i < graphs_.size(); ++i) { + auto device = graphs_[i]->getBackend()->getDevice(); + + ABORT_IF(device.type != DeviceType::gpu, + "NCCL communicator can only be used with GPUs"); + + devices_[i] = device.no; + cudaSetDevice(devices_[i]); + cudaStreamCreate(&streams_[i]); + } + + ncclCommInitAll(comms_.data(), devices_.size(), devices_.data()); + } + + ~NCCLCommunicator() override { + for(int i = 0; i < devices_.size(); ++i) { + cudaSetDevice(devices_[i]); + cudaStreamDestroy(streams_[i]); + ncclCommDestroy(comms_[i]); + } + } + + void scatterReduce() override { + int totalSize = graphs_[0]->params()->vals()->size(); + int shardSize = ceil(totalSize / (float)graphs_.size()); + + int pos = 0; + + ncclGroupStart(); + for(int i = 0; i < graphs_.size(); ++i) { + int size = std::min(shardSize, totalSize); + + const void* sendbuff = (const void*)graphs_[i]->params()->grads()->data(); + auto subgrad = graphs_[i]->params()->grads()->subtensor(pos, size); + void* recvbuff = subgrad->data(); + + ncclReduceScatter(sendbuff, + recvbuff, + shardSize, + ncclFloat, + ncclSum, + comms_[i], + streams_[i]); + + pos += size; + totalSize -= size; + } + ncclGroupEnd(); + + synchronizeAll(); + } + + void allGather() override { + int totalSize = graphs_[0]->params()->vals()->size(); + int shardSize = ceil(totalSize / (float)graphs_.size()); + + int pos = 0; + + ncclGroupStart(); + for(int i = 0; i < graphs_.size(); ++i) { + int size = std::min(shardSize, totalSize); + + auto subparam = graphs_[i]->params()->vals()->subtensor(pos, size); + const void* sendbuff = (const void*)subparam->data(); + void* recvbuff = (void*)graphs_[i]->params()->vals()->data(); + + ncclAllGather(sendbuff, + recvbuff, + shardSize, + ncclFloat, + comms_[i], + streams_[i]); + + pos += size; + totalSize -= size; + } + ncclGroupEnd(); + + synchronizeAll(); + } + + void swapParams(const std::vector<Tensor>& params) override { + // Update all graphs with 
parameter shard + ABORT_IF(graphs_.size() < 2, "Swap requires at least two graphs"); + + auto gather = [this, params](size_t idx, int pos) { + // copy parameter shard to each graph, apart from last graph + for(int i = 0; i < graphs_.size() - 1; ++i) { + auto subParam = graphs_[i]->params()->vals()->subtensor(pos, params[idx]->size()); + subParam->copyFrom(params[idx]); + } + + // back-up shard from last graph + auto subParamLast = graphs_.back()->params()->vals()->subtensor(pos, params[idx]->size()); + params[idx]->copyFrom(subParamLast); + + auto subParamFirst = graphs_[0]->params()->vals()->subtensor(pos, params[idx]->size()); + subParamLast->copyFrom(subParamFirst); + }; + + // execute for each shard + this->foreach(gather); + } + + void pushParams(std::vector<Tensor>& params) override { + // Copy paramter shard from i-th graph to shard params[i]. + // Graphs and shards with the same index live on the same device. + + auto copy = [this, params](size_t idx, int pos) { + // copy parameter shard to each graph + auto subParam = graphs_[idx]->params()->vals()->subtensor(pos, params[idx]->size()); + params[idx]->copyFrom(subParam); + }; + + this->foreach(copy); + } + + void pullParams(const std::vector<Tensor>& params) override { + // Update all graphs with parameter shard + + auto gather = [this, params](size_t idx, int pos) { + // copy parameter shard to each graph + for(auto graph : graphs_) { + auto subParam = graph->params()->vals()->subtensor(pos, params[idx]->size()); + subParam->copyFrom(params[idx]); + } + }; + this->foreach(gather); + } + + // Doesn't work yet with NCCL + // void pushParams(std::vector<Tensor>& params) { + // // Copy paramter shard from i-th graph to shard params[i]. + // // Graphs and shards with the same index live on the same device. 
+ + // int pos = 0; + // for(int i = 0; i < graphs_.size(); ++i) { + // auto subParam = graphs_[i]->params()->vals()->subtensor(pos, params[i]->size()); + // ncclGroupStart(); + // ncclBroadcast((const void*)subParam->data(), + // (void*)params[i]->data(), + // params[i]->size(), + // ncclFloat, + // 0, + // comms_[i], + // streams_[i]); + // ncclGroupEnd(); + // pos += params[i]->size(); + // } + // synchronizeAll(); + // } + + // void pullParams(const std::vector<Tensor>& params) { + // // Update all graphs with parameter shard + + // int totalSize = graphs_[0]->params()->vals()->size(); + // int shardSize = ceil(totalSize / (float)graphs_.size()); + + // ncclGroupStart(); + // for(int i = 0; i < graphs_.size(); ++i) { + + // const void* sendbuff = (const void*)params[i]->data(); + // void* recvbuff = (void*)graphs_[i]->params()->vals()->data(); + + // ncclAllGather(sendbuff, + // recvbuff, + // shardSize, + // ncclFloat, + // comms_[i], + // streams_[i]); + // } + // ncclGroupEnd(); + + // synchronizeAll(); + // } +}; +#endif + +Ptr<Communicator> createCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, bool noNccl) { +#ifdef USE_NCCL + if(noNccl) { + LOG(warn, "[comm] NCCL communicator overridden"); + return New<DefaultCommunicator>(graphs); + } + + // if at least one of the devices is not a gpu, fall-back to default + for(auto& graph : graphs) { + if(graph->getBackend()->getDevice().type == DeviceType::cpu) { + return New<DefaultCommunicator>(graphs); + } + } + + size_t d = graphs.size(); + if((d & (d - 1)) != 0) { + LOG(warn, "[comm] Number of devices {} is not a power of 2 and communication might be slow with NCCL", d); + LOG(warn, "[comm] You can switch off NCCL with --no-nccl option", d); + } + + return New<NCCLCommunicator>(graphs); +#else + return New<DefaultCommunicator>(graphs); +#endif +} + +} diff --git a/src/training/communicator.h b/src/training/communicator.h new file mode 100644 index 00000000..b43de60d --- /dev/null +++ 
b/src/training/communicator.h @@ -0,0 +1,178 @@ +#include "graph/expression_graph.h" +#include "functional/functional.h" +#include "tensors/tensor_operators.h" + +namespace marian { + +class Communicator { +protected: + const std::vector<Ptr<ExpressionGraph>> graphs_; + +public: + Communicator(const std::vector<Ptr<ExpressionGraph>>& graphs) + : graphs_(graphs) {} + + virtual ~Communicator() {} + + virtual void foreach(const std::function<void(size_t, int)>& func) { + int totalSize = graphs_[0]->params()->vals()->size(); + int shardSize = ceil(totalSize / (float)graphs_.size()); + + int pos = 0; + std::vector<std::thread> group; + // iterate over all shards + for(int idx = 0; idx < graphs_.size(); ++idx) { + int size = std::min(shardSize, totalSize); + + group.emplace_back(func, idx, pos); + + pos += size; + totalSize -= size; + } + for(auto& t : group) + t.join(); + } + + virtual void scatterReduce() = 0; + virtual void allGather() = 0; + + virtual void pushParams(std::vector<Tensor>& params) = 0; + virtual void pullParams(const std::vector<Tensor>& params) = 0; + virtual void swapParams(const std::vector<Tensor>& params) = 0; +}; + +class DefaultCommunicator : public Communicator { +private: + std::vector<Ptr<TensorAllocator>> paramsAllocs_; + std::vector<Tensor> tmpTensors_; + + void init() { + if(tmpTensors_.size() == 0) { + int totalSize = graphs_[0]->params()->vals()->size(); + int shardSize = ceil(totalSize / (float)graphs_.size()); + + int pos = 0; + for(auto graph : graphs_) { + int __size__ = std::min(shardSize, totalSize); + + auto paramsAlloc = New<TensorAllocator>(graph->getBackend()); + paramsAllocs_.push_back(paramsAlloc); + + paramsAlloc->reserveExact(__size__ * sizeof(float)); + + Tensor tmp; + + paramsAlloc->allocate(tmp, {1, __size__}); + tmpTensors_.push_back(tmp); + + // move to next shard + pos += __size__; + totalSize -= __size__; + } + } + } + +public: + DefaultCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs) + : 
Communicator(graphs) {} + + ~DefaultCommunicator() override {} + + void scatterReduce() override { + init(); + + int totalSize = graphs_[0]->params()->vals()->size(); + int shardSize = ceil(totalSize / (float)graphs_.size()); + + // Gather gradients from different devices into current gradient shards + auto scatter = [this, shardSize](size_t idx, int pos) { + auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize); + + // collect and sum gradients + // to be replaced with ncclScatterReduce + for(auto graph : graphs_) { + if(graph != graphs_[idx]) { + auto subGrad = graph->params()->grads()->subtensor(pos, shardSize); + tmpTensors_[idx]->copyFrom(subGrad); + + using namespace functional; + Element(_1 = _1 + _2, curGrad, tmpTensors_[idx]); + } + } + }; + + this->foreach(scatter); + } + + void allGather() override { + int totalSize = graphs_[0]->params()->vals()->size(); + int shardSize = ceil(totalSize / (float)graphs_.size()); + + // Update all graphs with parameter shard + auto gather = [this, shardSize](size_t idx, int pos) { + auto curParam = graphs_[idx]->params()->vals()->subtensor(pos, shardSize); + + // copy parameter shard to each graph + for(auto graph : graphs_) { + if(graph != graphs_[idx]) { + auto subParam = graph->params()->vals()->subtensor(pos, shardSize); + subParam->copyFrom(curParam); + } + } + }; + + this->foreach(gather); + } + + void pushParams(std::vector<Tensor>& params) override { + // Copy paramter shard from i-th graph to shard params[i]. + // Graphs and shards with the same index live on the same device. 
+ + auto copy = [this, params](size_t idx, int pos) { + // copy parameter shard to each graph + auto subParam = graphs_[idx]->params()->vals()->subtensor(pos, params[idx]->size()); + params[idx]->copyFrom(subParam); + }; + + this->foreach(copy); + } + + void pullParams(const std::vector<Tensor>& params) override { + // Update all graphs with parameter shard + + auto gather = [this, params](size_t idx, int pos) { + // copy parameter shard to each graph + for(auto graph : graphs_) { + auto subParam = graph->params()->vals()->subtensor(pos, params[idx]->size()); + subParam->copyFrom(params[idx]); + } + }; + this->foreach(gather); + } + + void swapParams(const std::vector<Tensor>& params) override { + // Update all graphs with parameter shard + ABORT_IF(graphs_.size() < 2, "Swap requires at least two graphs"); + + auto gather = [this, params](size_t idx, int pos) { + // copy parameter shard to each graph, apart from last graph + for(int i = 0; i < graphs_.size() - 1; ++i) { + auto subParam = graphs_[i]->params()->vals()->subtensor(pos, params[idx]->size()); + subParam->copyFrom(params[idx]); + } + + // back-up shard from last graph + auto subParamLast = graphs_.back()->params()->vals()->subtensor(pos, params[idx]->size()); + params[idx]->copyFrom(subParamLast); + + auto subParamFirst = graphs_[0]->params()->vals()->subtensor(pos, params[idx]->size()); + subParamLast->copyFrom(subParamFirst); + }; + // execute for each shard + this->foreach(gather); + } +}; + +Ptr<Communicator> createCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, bool noNccl = false); + +} diff --git a/src/training/gradient_dropping/gpu/sparse_algorithm.cu b/src/training/gradient_dropping/gpu/sparse_algorithm.cu index 61f1d563..90bd3661 100644 --- a/src/training/gradient_dropping/gpu/sparse_algorithm.cu +++ b/src/training/gradient_dropping/gpu/sparse_algorithm.cu @@ -1,99 +1,139 @@ #include "training/gradient_dropping/gpu/sparse_algorithm.h" -#include "tensors/gpu/algorithm.h" -#include 
"tensors/gpu/cuda_helpers.h" #include <curand.h> #include <curand_kernel.h> +#include <thrust/binary_search.h> +#include <thrust/copy.h> #include <thrust/device_ptr.h> #include <thrust/device_vector.h> #include <thrust/iterator/counting_iterator.h> -#include <thrust/copy.h> -#include <thrust/binary_search.h> +#include "tensors/gpu/algorithm.h" +#include "tensors/gpu/cuda_helpers.h" namespace marian { - namespace gpu { - struct non_zero - { - __host__ __device__ - bool operator()(const float x) - { - return x != 0; - } - }; - - __global__ void copy_id(float* data, - int* indices, - float* out, - int size) { - int idx = blockDim.x * blockIdx.x + threadIdx.x; - if(idx >= size) - return; - out[idx] = data[indices[idx]]; - } - - __global__ void gScatterAdd(float* denseData, +namespace gpu { +struct non_zero { + __host__ __device__ bool operator()(const float x) { return x != 0; } +}; + +__global__ void copy_id(float* data, int* indices, float* out, int size) { + int idx = blockDim.x * blockIdx.x + threadIdx.x; + if(idx >= size) + return; + out[idx] = data[indices[idx]]; +} + +__global__ void gScatterAdd(float* denseData, float* sparseData, int* sparseIndices, int denseSize, int sparseSize, int offset) { - int idx = blockDim.x * blockIdx.x + threadIdx.x; - if(idx >= sparseSize) - return; - if(sparseIndices[idx] >= -offset - && sparseIndices[idx] + offset < denseSize) - denseData[sparseIndices[idx] + offset] += sparseData[idx]; - } + int idx = blockDim.x * blockIdx.x + threadIdx.x; + if(idx >= sparseSize) + return; + if(sparseIndices[idx] >= -offset && sparseIndices[idx] + offset < denseSize) + denseData[sparseIndices[idx] + offset] += sparseData[idx]; +} +__global__ void gScatterUpdate(float* denseData, + float* sparseData, + int* sparseIndices, + int denseSize, + int sparseSize, + int offset) { + int idx = blockDim.x * blockIdx.x + threadIdx.x; + if(idx >= sparseSize) + return; + if(sparseIndices[idx] >= -offset && sparseIndices[idx] + offset < denseSize) + 
denseData[sparseIndices[idx] + offset] = sparseData[idx]; +} + +__global__ void gGather(float* denseData, + float* sparseData, + int* sparseIndices, + int denseSize, + int sparseSize, + int offset) { + int idx = blockDim.x * blockIdx.x + threadIdx.x; + if(idx >= sparseSize) + return; + if(sparseIndices[idx] >= -offset && sparseIndices[idx] + offset < denseSize) + sparseData[idx] = denseData[sparseIndices[idx] + offset]; +} - std::vector<int> lower_bounds(int* data, std::vector<int> values, int size, DeviceId device) { - cudaSetDevice(device.no); +std::vector<int> lower_bounds(int* data, + std::vector<int> values, + int size, + DeviceId device) { + cudaSetDevice(device.no); - thrust::device_ptr<int> data_ptr(data); - thrust::device_vector<int> d_values(values); - thrust::device_vector<int> d_output(values.size()); + thrust::device_ptr<int> data_ptr(data); + thrust::device_vector<int> d_values(values); + thrust::device_vector<int> d_output(values.size()); - thrust::lower_bound(data_ptr, data_ptr + size, - d_values.begin(), d_values.end(), - d_output.begin()); + thrust::lower_bound(data_ptr, + data_ptr + size, + d_values.begin(), + d_values.end(), + d_output.begin()); - std::vector<int> output(values.size()); - thrust::copy(d_output.begin(), d_output.end(), output.begin()); + std::vector<int> output(values.size()); + thrust::copy(d_output.begin(), d_output.end(), output.begin()); - return output; - } + return output; +} +int buildSparse(Tensor t, float* data, int* indices) { + cudaSetDevice(t->getDevice().no); + using namespace thrust; - int buildSparse(Tensor t, float* data, int* indices) { - cudaSetDevice(t->getDevice().no); - using namespace thrust; + device_ptr<float> grad_ptr(t->data()); + device_ptr<float> sparse_grad_ptr(data); + device_ptr<int> indices_ptr(indices); - device_ptr<float> grad_ptr(t->data()); - device_ptr<float> sparse_grad_ptr(data); - device_ptr<int> indices_ptr(indices); + int sparse_size = copy_if(make_counting_iterator<int>(0), + 
make_counting_iterator<int>(t->size()), + grad_ptr, + indices_ptr, + non_zero()) + - indices_ptr; - int sparse_size = copy_if(make_counting_iterator<int>(0), - make_counting_iterator<int>(t->size()), - grad_ptr, - indices_ptr, - non_zero()) - indices_ptr; + int threads = 512; + int blocks = 1 + t->size() / threads; + copy_id<<<blocks, threads>>>(t->data(), indices, data, sparse_size); - int threads = 512; - int blocks = 1 + t->size() / threads; - copy_id<<<blocks, threads>>>(t->data(), indices, data, sparse_size); + return sparse_size; +} - return sparse_size; - } +void scatterAdd(Tensor t, float* data, int* indices, int size, int offset) { + cudaSetDevice(t->getDevice().no); + int threads = 512; + int blocks = 1 + size / threads; + gScatterAdd<<<blocks, threads>>>( + t->data(), data, indices, t->size(), size, offset); + cudaStreamSynchronize(0); +} - void scatterAdd(Tensor t, float* data, int *indices, int size, int offset) { - cudaSetDevice(t->getDevice().no); +void scatterUpdate(Tensor t, float* data, int* indices, int size, int offset) { + cudaSetDevice(t->getDevice().no); - int threads = 512; - int blocks = 1 + size / threads; - gScatterAdd<<<blocks, threads>>>( - t->data(), data, indices, t->size(), size, offset); - cudaStreamSynchronize(0); - } - } + int threads = 512; + int blocks = 1 + size / threads; + gScatterUpdate<<<blocks, threads>>>( + t->data(), data, indices, t->size(), size, offset); + cudaStreamSynchronize(0); +} + +void gather(Tensor t, float* data, int* indices, int size, int offset) { + cudaSetDevice(t->getDevice().no); + + int threads = 512; + int blocks = 1 + size / threads; + gGather<<<blocks, threads>>>( + t->data(), data, indices, t->size(), size, offset); + cudaStreamSynchronize(0); +} +} } diff --git a/src/training/gradient_dropping/gpu/sparse_algorithm.h b/src/training/gradient_dropping/gpu/sparse_algorithm.h index e6bc13d3..8499e78d 100644 --- a/src/training/gradient_dropping/gpu/sparse_algorithm.h +++ 
b/src/training/gradient_dropping/gpu/sparse_algorithm.h @@ -4,17 +4,24 @@ #include "tensors/backend.h" #include "tensors/tensor.h" - namespace marian { - namespace gpu { - // output is a vector of size values.size. Output[i] is lower_bound of values[i] in data - std::vector<int> lower_bounds(int* data, - std::vector<int> values, - int size, - DeviceId device); +namespace gpu { +/** + * @brief Output[i] is lower_bound of values[i] in data. + * + * @return A vector of size values.size + */ +std::vector<int> lower_bounds(int* data, + std::vector<int> values, + int size, + DeviceId device); + +int buildSparse(Tensor t, float* data, int* indices); + +void scatterAdd(Tensor t, float* data, int* indices, int size, int offset); - int buildSparse(Tensor t, float* data, int* indices); +void scatterUpdate(Tensor t, float* data, int* indices, int size, int offset); - void scatterAdd(Tensor t, float* data, int *indices, int size, int offset); - } -}
\ No newline at end of file +void gather(Tensor t, float* data, int* indices, int size, int offset); +} +} diff --git a/src/training/gradient_dropping/sparse_tensor.h b/src/training/gradient_dropping/sparse_tensor.h index 516b47a0..e3b7eaf1 100644 --- a/src/training/gradient_dropping/sparse_tensor.h +++ b/src/training/gradient_dropping/sparse_tensor.h @@ -1,14 +1,15 @@ #pragma once +#include <algorithm> #include <memory> #include "common/definitions.h" #include "tensors/backend.h" - -#include "tensors/tensor_operators.h" #include "tensors/device.h" +#include "tensors/tensor_operators.h" #ifdef CUDA_FOUND +#include "tensors/gpu/algorithm.h" #include "training/gradient_dropping/gpu/sparse_algorithm.h" #endif @@ -20,26 +21,26 @@ class SparseTensorBase : public std::enable_shared_from_this<SparseTensorBase> { int size_; int capacity_; Ptr<Backend> backend_; - + std::vector<Ptr<Device>> devices; - template<typename T> + template <typename T> T* newData(int size, Ptr<Backend> backend) { Ptr<Device> device = DispatchDevice(backend->getDevice()); device->reserve(size * sizeof(T)); devices.push_back(device); return (T*)device->data(); } - + public: SparseTensorBase(int capacity, Ptr<Backend> backend) - : backend_(backend), capacity_(capacity) { + : backend_(backend), capacity_(capacity) { data_ = newData<float>(capacity, backend); indices_ = newData<int>(capacity, backend); } - SparseTensorBase(float* data, int* indices, int size, Ptr<Backend> backend) - : backend_(backend) { + SparseTensorBase(float* data, int* indices, int size, Ptr<Backend> backend) + : backend_(backend) { data_ = data; indices_ = indices; size_ = size; @@ -60,6 +61,37 @@ public: int* indices() { return indices_; } + // copy to cpu vector + void get(std::vector<float>& g, std::vector<int>& i) { + int s = std::min((int)g.size(), size()); + if(backend_->getDevice().type == DeviceType::cpu) { + std::copy(data(), data() + s, g.data()); + std::copy(indices(), indices() + s, i.data()); + } +#ifdef CUDA_FOUND + 
else { + gpu::copy(backend_, data(), data() + s, g.data()); + gpu::copy(backend_, indices(), indices() + s, i.data()); + } +#endif + } + + // copy from cpu vector + void set(const std::vector<float>& g, const std::vector<int>& i) { + int s = std::min((int)g.size(), capacity()); + size_ = s; + if(backend_->getDevice().type == DeviceType::cpu) { + std::copy(g.data(), g.data() + s, data()); + std::copy(i.data(), i.data() + s, indices()); + } +#ifdef CUDA_FOUND + else { + gpu::copy(backend_, g.data(), g.data() + s, data()); + gpu::copy(backend_, i.data(), i.data() + s, indices()); + } +#endif + } + void copyFrom(float* ndata, int* nindices, int nsize) { size_ = nsize; if(backend_->getDevice().type == DeviceType::cpu) { @@ -77,11 +109,13 @@ public: copyFrom(t->data(), t->indices(), t->size()); } - void toDense(Tensor t, int offset) { + // Convert sparseTensor into a Tensor + void toDense(Tensor t, int offset = 0) { t->set(0); scatterAdd(t, offset); } + // Convert a tensor into a sparse tensor format void fromDense(Tensor t) { if(backend_->getDevice().type == DeviceType::cpu) { ABORT("Gradient Dropping for CPU is not yet supported"); @@ -94,6 +128,7 @@ public: #endif } + // Add t[indices[i]] += data[i] void scatterAdd(Tensor t, int offset = 0) { if(backend_->getDevice().type == DeviceType::cpu) { ABORT("Gradient Dropping for CPU is not yet supported"); @@ -105,6 +140,30 @@ public: #endif } + // Add t[indices[i]] = data[i] + void scatterUpdate(Tensor t, int offset = 0) { + if(backend_->getDevice().type == DeviceType::cpu) { + ABORT("Gradient Dropping for CPU is not yet supported"); + } +#ifdef CUDA_FOUND + else { + gpu::scatterUpdate(t, data(), indices(), size(), offset); + } +#endif + } + + // data[i] = t[indices[i]] + void gather(Tensor t, int offset = 0) { + if(backend_->getDevice().type == DeviceType::cpu) { + ABORT("Gradient Dropping for CPU is not yet supported"); + } +#ifdef CUDA_FOUND + else { + gpu::gather(t, data(), indices(), size(), offset); + } +#endif + } + 
std::shared_ptr<SparseTensorBase> subtensor(int pos, int subsize) { int startOffset = 0; int endOffset = 0; @@ -118,8 +177,8 @@ public: } #ifdef CUDA_FOUND else { - std::vector<int> outputs = gpu::lower_bounds( - indices(), values, size(), backend_->getDevice()); + std::vector<int> outputs + = gpu::lower_bounds(indices(), values, size(), backend_->getDevice()); startOffset = outputs[0]; endOffset = outputs[1]; diff --git a/src/training/graph_group.h b/src/training/graph_group.h index 96642d8b..74acf747 100644 --- a/src/training/graph_group.h +++ b/src/training/graph_group.h @@ -32,7 +32,7 @@ public: virtual ~GraphGroup() {} - virtual void update(Ptr<data::Batch>) = 0; + virtual void update(Ptr<data::Batch> batch) = 0; virtual void load() = 0; diff --git a/src/training/graph_group_async_drop.cpp b/src/training/graph_group_async_drop.cpp index c0704c33..ae535b6b 100644 --- a/src/training/graph_group_async_drop.cpp +++ b/src/training/graph_group_async_drop.cpp @@ -8,113 +8,74 @@ namespace marian { -Tensor AsyncGraphGroupDrop::newTensor(int size, Ptr<Backend> backend) { - Tensor t; - Ptr<TensorAllocator> allocator_ = New<TensorAllocator>(backend); - allocator_->reserveExact(size * sizeof(float)); - allocator_->allocate(t, {1, size}); - allocators.push_back(allocator_); - - return t; -} - void AsyncGraphGroupDrop::fetchParams(Tensor oldParams, const std::vector<Tensor>& params, int device_id) { - using namespace functional; - // @TODO read guard on parameters - int pos = 0; + // Full fetch when fetching moving average OR still in warm-up period. 
+ if(&params == &paramsAvg_ || fetchStep_[device_id]++ <= dropping_warmup) { + AsyncGraphGroup::fetchParams(oldParams, params, device_id); + return; + } std::vector<std::thread> threads; - for(int i = 0; i < devices_.size(); i++) { + int pos = 0; + for(int idx = 0; idx < devices_.size(); idx++) { threads.emplace_back(std::thread( - [&](int idx, int pos) { + [=](int idx, int pos) { + auto sparseGrad = sparseGrads_[device_id][idx]; + auto sparseShard = sparseShards_[device_id][idx]; + // individual mutex per-shard std::lock_guard<std::mutex> guard(shardSync_[idx]); - // normal fetch - if(fetchStep_[device_id] <= dropping_warmup - || &params == &paramsAvg_) { // Do not use sparse fetch when - // fetching from paramsAvg - oldParams->subtensor(pos, params[idx]->size()) - ->copyFrom(params[idx]); - paramsLocal_[device_id][idx]->copyFrom(params[idx]); - return; - } - - // sparse fetch - // get delta : params latest version - current param (locally) - Element(_1 = _2 - _3, - paramsDelta_[idx], - params[idx], - paramsLocal_[device_id][idx]); - - // update current local param - paramsLocal_[device_id][idx]->copyFrom(params[idx]); - - // get sparse delta - fetchDropper[device_id][idx]->dropGraph(paramsDelta_[idx], - fetchSparseGradient_[idx], - droping_rate, - dropping_momentum); - - // move sparse delta - fetchShardedSparseGradient_[device_id][idx]->copyFrom( - fetchSparseGradient_[idx]); - - fetchShardedSparseGradient_[device_id][idx]->scatterAdd( + sparseShard->gather(params[idx]); + sparseGrad->copyFrom(sparseShard); + sparseGrad->scatterUpdate( oldParams->subtensor(pos, params[idx]->size())); }, - i, + idx, pos)); pos += shardSize_; } -#if 0 - for(auto&& t : threads) - t.join(); - // BUGBUG [compiler]: This fails to compile on VS 2015, for the comparison of the iterator with end() -#else - for (size_t i = 0; i < threads.size(); i++) - threads[i].join(); -#endif - fetchStep_[device_id]++; + for(size_t i = 0; i < threads.size(); i++) + threads[i].join(); } void 
AsyncGraphGroupDrop::pushGradients(Tensor newGrads, size_t batch_words, int device_id) { - if(pushStep_[device_id]++ <= dropping_warmup) { + if(pushStep_[device_id]++ < dropping_warmup) { AsyncGraphGroup::pushGradients(newGrads, batch_words, device_id); return; } - // get the sparse gradient - pushDropper_[device_id]->dropGraph(newGrads, - pushSparseGradient_[device_id], - droping_rate, - dropping_momentum); - - SparseTensor newSparseGrads = pushSparseGradient_[device_id]; // add instead of copy? std::vector<std::thread> threads; int pos = 0; for(int idx = 0; idx < devices_.size(); idx++) { threads.emplace_back(std::thread( [=](int idx, int pos) { + auto dropper = droppers_[device_id][idx]; + auto sparseGrad = sparseGrads_[device_id][idx]; + auto sparseShard = sparseShards_[device_id][idx]; + auto tensor = newGrads->subtensor(pos, grads_[idx]->size()); // individual mutex per-shard std::lock_guard<std::mutex> guard(shardSync_[idx]); - // split to shard - SparseTensor subGrad - = newSparseGrads->subtensor(pos, grads_[idx]->size()); + // drop the gradients + dropper->dropGraph( + tensor, sparseGrad, droping_rate, dropping_momentum); // send the sharded sparse tensor - pushShardedSparseGradient_[idx]->copyFrom(subGrad); + sparseShard->copyFrom(sparseGrad); // convert back to dense, store it in grads_[idx] - pushShardedSparseGradient_[idx]->toDense(grads_[idx], -pos); + // sparseShard indices is equal to the indices of the sparse gradient + // which will be used for sparse fetching + sparseShard->toDense(grads_[idx]); + // optimize if(scaleLearningRate_) { shardOpt_[idx]->update( params_[idx], grads_[idx], batch_words / avgBatchWords_); @@ -125,7 +86,6 @@ void AsyncGraphGroupDrop::pushGradients(Tensor newGrads, if(movingAvg_) updateMovingAverage( paramsAvg_[idx], params_[idx], scheduler_->numberOfBatches()); - }, idx, pos)); @@ -140,51 +100,34 @@ void AsyncGraphGroupDrop::init(Ptr<data::Batch> batch) { AsyncGraphGroup::init(batch); // extra inits for gradient dropping 
if(drop_first) { - int totalSize = graphs_[0]->params()->vals()->size(); - int sparseCap = totalSize * 1.5 * (1.0 - droping_rate); - int shardSize = ceil(totalSize / devices_.size()); - - for(int i = 0; i < devices_.size(); i++) - paramsLocal_.push_back(std::vector<Tensor>()); - for(int i = 0; i < devices_.size(); i++) { // warm-up counter fetchStep_.push_back(0); pushStep_.push_back(0); + fetch_ready.push_back(false); - // temporary tensor to compute parameter delta before fetching - paramsDelta_.push_back(newTensor(shardSize, graphs_[i]->getBackend())); - - // tensors to store local params history - for(int h_id = 0; h_id < devices_.size(); h_id++) { - Tensor tmp = newTensor(params_[i]->size(), graphs_[i]->getBackend()); - tmp->copyFrom(params_[i]); - paramsLocal_[h_id].push_back(tmp); - } + // Size of the sparse tensor + int totalSize = graphs_[0]->params()->vals()->size(); + int sparseCap = totalSize * 1.2 * (1.0 - droping_rate); - // individual Gradient dropper per-device - pushDropper_.push_back(PrepareGradientDrop(graphs_[i]->getDevice())); - - // N-dropper for fetch + // prepare droppers std::vector<GradientDrop> tmpDropper; for(auto device : devices_) tmpDropper.push_back(PrepareGradientDrop(graphs_[i]->getDevice())); - fetchDropper.push_back(tmpDropper); - - // sparsetensor to store sparsified gradients per-device - pushSparseGradient_.push_back(SparseTensor( - new SparseTensorBase(sparseCap, graphs_[i]->getBackend()))); - - pushShardedSparseGradient_.push_back(SparseTensor( - new SparseTensorBase(sparseCap, graphs_[i]->getBackend()))); - fetchSparseGradient_.push_back(SparseTensor(new SparseTensorBase( - sparseCap / devices_.size(), graphs_[i]->getBackend()))); + droppers_.push_back(tmpDropper); + // sparsetensor to store sparsified gradients per-device per-shard std::vector<SparseTensor> tmp; for(int j = 0; j < devices_.size(); j++) tmp.push_back(SparseTensor(new SparseTensorBase( sparseCap / devices_.size(), graphs_[i]->getBackend()))); - 
fetchShardedSparseGradient_.push_back(tmp); + sparseGrads_.push_back(tmp); + + std::vector<SparseTensor> tmp2; + for(int j = 0; j < devices_.size(); j++) + tmp2.push_back(SparseTensor(new SparseTensorBase( + sparseCap / devices_.size(), graphs_[j]->getBackend()))); + sparseShards_.push_back(tmp2); } drop_first = false; } diff --git a/src/training/graph_group_async_drop.h b/src/training/graph_group_async_drop.h index bd9b6626..3c1be04d 100644 --- a/src/training/graph_group_async_drop.h +++ b/src/training/graph_group_async_drop.h @@ -10,6 +10,7 @@ namespace marian { class AsyncGraphGroupDrop : public AsyncGraphGroup { std::vector<int> fetchStep_; std::vector<int> pushStep_; + std::vector<bool> fetch_ready; bool drop_first = 1; @@ -17,21 +18,9 @@ class AsyncGraphGroupDrop : public AsyncGraphGroup { float droping_rate; float dropping_momentum; - std::vector<GradientDrop> pushDropper_; - std::vector<std::vector<GradientDrop>> fetchDropper; + std::vector<std::vector<GradientDrop>> droppers_; - std::vector<SparseTensor> pushSparseGradient_; - std::vector<SparseTensor> pushShardedSparseGradient_; - - std::vector<SparseTensor> fetchSparseGradient_; - std::vector<std::vector<SparseTensor>> fetchShardedSparseGradient_; - - std::vector<Tensor> paramsDelta_; - std::vector<std::vector<Tensor>> paramsLocal_; - - std::vector<Ptr<TensorAllocator>> allocators; - - Tensor newTensor(int size, Ptr<Backend> backend); + std::vector<std::vector<SparseTensor>> sparseGrads_, sparseShards_; protected: void init(Ptr<data::Batch> batch); diff --git a/src/training/graph_group_multinode.cpp b/src/training/graph_group_multinode.cpp index 16a128c4..a2879d6c 100644 --- a/src/training/graph_group_multinode.cpp +++ b/src/training/graph_group_multinode.cpp @@ -52,19 +52,19 @@ void MultiNodeGraphGroup::init(Ptr<data::Batch> batch) { } // setup delayed gradient storage - if (tau_ > 1) { + if(tau_ > 1) { delay_count = std::vector<size_t>(mpi_comm_world_size_); totalBatchWords = 
std::vector<int>(mpi_comm_world_size_); optDelayMutex_ = std::vector<std::mutex>(mpi_comm_world_size_); - - for (int i = 0;i < mpi_comm_world_size_; i++) { + + for(int i = 0; i < mpi_comm_world_size_; i++) { // Shard buffers across GPUs auto backend = clientGraphs_[i % devices_.size()]->getBackend(); Tensor accGrad = newTensor(nodeSizes_[i], backend); Tensor accGradBuff = newTensor(nodeSizes_[i], backend); accGradients.push_back(accGrad); accGradientBuffer.push_back(accGradBuff); - } + } } } @@ -221,7 +221,7 @@ void MultiNodeGraphGroup::calculateShardSizes() { */ void MultiNodeGraphGroup::initShardGpuTensors() { size_t offset = 0; - for (int i = 0; i < mpi_my_rank_; i++) { + for(int i = 0; i < mpi_my_rank_; i++) { offset += nodeSizes_[i]; } for(int shard = 0; shard < devices_.size(); shard++) { @@ -242,7 +242,8 @@ void MultiNodeGraphGroup::initShardGpuTensors() { * updated parameters. */ void MultiNodeGraphGroup::launchServerThread() { -#if MPI_FOUND +// @TODO: move CUDA stuff into separate .cu files and remove '&& CUDA_FOUND' +#if MPI_FOUND && CUDA_FOUND serverShardThread_ = new std::thread([this] { // keep track of number of nodes still communicating with this shard int nCommunicatingNodes = mpi_comm_world_size_; @@ -400,7 +401,8 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads, Tensor oldParams, int gpu, size_t batchWords) { -#if MPI_FOUND +// @TODO: move CUDA stuff into separate .cu files and remove '&& CUDA_FOUND' +#if MPI_FOUND && CUDA_FOUND size_t offset = 0; for(int node = 0; node < mpi_comm_world_size_; node++) { size_t nodeSize = nodeSizes_[node]; @@ -410,9 +412,10 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads, Tensor gradient; // Delayed Gradient Update - if (tau_ > 1) { + if(tau_ > 1) { std::lock_guard<std::mutex> guard(optDelayMutex_[node]); - accGradientBuffer[node]->copyFrom(newGrads->subtensor(offset, nodeSize)); + accGradientBuffer[node]->copyFrom( + newGrads->subtensor(offset, nodeSize)); // 
Accumulate the gradient using namespace functional; Element(_1 += _2, accGradients[node], accGradientBuffer[node]); @@ -420,14 +423,14 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads, totalBatchWords[node] += batchWords; delay_count[node]++; - if (delay_count[node] < tau_) + if(delay_count[node] < tau_) continue; delay_count[node] = 0; gradient = accGradients[node]; batchWords = totalBatchWords[node]; - } else { + } else { gradient = newGrads->subtensor(offset, nodeSize); - } + } // Copy grads from GPU to CPU (for MPI sending) cudaMemcpy(clientCommBuffersCPU_[gpu].data(), @@ -455,7 +458,7 @@ void MultiNodeGraphGroup::synchronizeWithServerShards(Tensor newGrads, MPI_TAG_GRAD_PUSH_, MPI_COMM_WORLD); // Reset total gradient and batch words - if (tau_ > 1) { + if(tau_ > 1) { std::lock_guard<std::mutex> guard(optDelayMutex_[node]); accGradients[node]->set(0); totalBatchWords[node] = 0; @@ -554,9 +557,9 @@ void MultiNodeGraphGroup::execute(Ptr<data::Batch> batch) { auto costNode = builder->build(graph, batch); #if MPI_FOUND - if (t == 0) { + if(t == 0) { MPI_Barrier(MPI_COMM_WORLD); - if (my_id != 0) + if(my_id != 0) graph->params()->vals()->copyFrom(clientGraphs_[0]->params()->vals()); MPI_Barrier(MPI_COMM_WORLD); } @@ -628,20 +631,19 @@ void MultiNodeGraphGroup::execute(Ptr<data::Batch> batch) { // Wait until the thread that wants to do validation is finished. 
clientThreadPool_->wait_for_one(lock); - if (options_->get<std::string>("cost-type") != "ce-sum") + if(options_->get<std::string>("cost-type") != "ce-sum") cost /= tau_; - if (tau_ > 1) { + if(tau_ > 1) { std::vector<size_t> fakeLength = {1, 1}; - auto fb = data::CorpusBatch::fakeBatch(fakeLength, - num_seen_sentences, - NULL); + auto fb = data::CorpusBatch::fakeBatch( + fakeLength, num_seen_sentences, NULL); fb->front()->setWords(num_seen_words); scheduler_->update(cost, fb); } else { scheduler_->update(cost, batch); } - + num_seen_words = 0; num_seen_sentences = 0; cost = 0; @@ -653,11 +655,11 @@ void MultiNodeGraphGroup::execute(Ptr<data::Batch> batch) { // a safe state. clientThreadPool_->wait_for_others(lock); #if MPI_FOUND - //wait until other nodes are ready + // wait until other nodes are ready MPI_Barrier(MPI_COMM_WORLD); - + // TODO: Saving is broken - //if(mpi_my_rank_ == 0 && scheduler_->saving()) + // if(mpi_my_rank_ == 0 && scheduler_->saving()) // this->save(graph); if(mpi_my_rank_ == 0 && scheduler_->validating()) diff --git a/src/training/graph_group_multinode.h b/src/training/graph_group_multinode.h index b106bb1a..e34ad959 100644 --- a/src/training/graph_group_multinode.h +++ b/src/training/graph_group_multinode.h @@ -2,6 +2,9 @@ #if MPI_FOUND #include "mpi.h" +#endif + +#ifdef CUDA_FOUND #include "cuda_runtime.h" #endif @@ -124,7 +127,7 @@ protected: int mpi_comm_world_size_{1}; /** - * Flag to indicate that an MPI message contains message info + * Flag to indicate that an MPI message contains message info * before sending the gradient (client -> server). */ static const int MPI_TAG_GRAD_PUSH_MSG_{0}; @@ -233,7 +236,7 @@ protected: /** * LocalOptimizers related variables */ - bool useLocalOpt_; + // bool useLocalOpt_; /** * Allocate new tensor on given GPU and store allocator. 
@@ -405,10 +408,10 @@ public: MultiNodeGraphGroup(Ptr<Config> options) : GraphGroup(options), tau_{options_->get<size_t>("optimizer-delay")}, - useLocalOpt_{options_->get<bool>("multi-node-local-optimizers")}, + // useLocalOpt_{options_->get<bool>("multi-node-local-optimizers")}, clientCommOverlap{options_->get<bool>("multi-node-overlap")} { // Set up devices for this node - setupMPI(); //Setup MPI before creating device vectors + setupMPI(); // Setup MPI before creating device vectors std::vector<size_t> devices; for(auto& d : options_->getDevices()) devices.push_back(d.no); @@ -526,8 +529,6 @@ public: return GraphGroup::collectStats(clientGraphs_[0], clientBuilders_[0]); } - virtual void finalize() { - finalized_ = true; - } + virtual void finalize() { finalized_ = true; } }; } diff --git a/src/training/graph_group_multinode_sync.cpp b/src/training/graph_group_multinode_sync.cpp new file mode 100644 index 00000000..dc3f4718 --- /dev/null +++ b/src/training/graph_group_multinode_sync.cpp @@ -0,0 +1,280 @@ +#include "training/graph_group_multinode_sync.h" +#include "functional/functional.h" +#include "tensors/tensor_operators.h" + +namespace marian { + +void MultiNodeGraphGroupSync::updateMovingAverage(Tensor paramsAvg, + Tensor params, + size_t batches) { + using namespace functional; + float decay + = std::max(mvDecay_, 1.f - (float)(batches + 1) / (float)(batches + 10)); + Element(_1 = ((1.f - decay) * _1) + (decay * _2), paramsAvg, params); +} + +/** + * Set given scheduler to register training observers on the shard optimizers. + */ +void MultiNodeGraphGroupSync::setScheduler(Ptr<Scheduler> scheduler) { + scheduler_ = scheduler; + // optimizer has to be registered last to see a change of learning rate + scheduler_->registerTrainingObserver(scheduler_); + + scheduler_->registerTrainingObserver(syncOptimizer_); + +} + +/** + * Allocate new tensor on given GPU and store allocator. 
+ */ +Tensor MultiNodeGraphGroupSync::newTensor(int size, Ptr<Backend> backend) { + Tensor t; + Ptr<TensorAllocator> allocator = New<TensorAllocator>(backend); + allocator->reserveExact(size * sizeof(float)); + allocator->allocate(t, {1, size}); + allocators_.push_back(allocator); + return t; +} + +/** + * Setup training environment and launch server thread and (if enabled) client + * communication overlap threads. + * Includes setting up MPI, node and shard sizes, clients, server shards and + * communication overlap stuff. + */ +void MultiNodeGraphGroupSync::init(Ptr<data::Batch> batch) { + // Setup clients and shards + setupClients(batch); + int network_size = clientGraphs_[0]->params()->vals()->size(); + LOG(info, "model size = {} float params" , network_size); + if (movingAvg_) + paramsAvg_ = newTensor(network_size, clientGraphs_.back()->getBackend()); + + // setup sync sgd storage, We keep the summed gradient on Node 0 + sumGradientBuffer = newTensor(network_size, clientGraphs_[0]->getBackend()); + accGradientsSync = newTensor(network_size, clientGraphs_[0]->getBackend()); +} + + +/** + * Initialize the CPU arrays, with pinned memory for faster CudaMemCpy operations. + * Requires the graph to be initialized first so we know its size + */ +void MultiNodeGraphGroupSync::initCPUArrays() { + accGradientsSync_cpu = std::vector<float>(clientGraphs_[0]->params()->vals()->size()); + receiveBuffer_cpu = std::vector<float>(clientGraphs_[0]->params()->vals()->size()); +} + +/** + * Setup MPI world size and rank of this node. + */ +void MultiNodeGraphGroupSync::setupMPI() { +#if MPI_FOUND + MPI_Comm_size(MPI_COMM_WORLD, &mpi_comm_world_size_); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_my_rank_); +#endif +} + +/** + * Setup clients that will compute gradients and communicate them with the + * server shards. + * There is one client per GPU. 
+ */ +void MultiNodeGraphGroupSync::setupClients(Ptr<data::Batch> batch) { + runBatchThroughClientGraphs(batch); + initCPUArrays(); +} + +/** + * Initialize the graphs (models) of all clients on this node with the given + * batch. + */ +void MultiNodeGraphGroupSync::runBatchThroughClientGraphs(Ptr<data::Batch> batch) { + for(int i = 0; i < devices_.size(); i++) { + THREAD_GUARD(clientBuilders_[i]->build(clientGraphs_[i], batch); + clientGraphs_[i]->forward(); + clientGraphs_[i]->getBackend()->synchronize();); + } +} + +/** + * Initialize variables required for overlapping client computations and + * communication. + * Includes summed and committed word counts, buffer flags, mutexes and + * condition variables. + */ +void MultiNodeGraphGroupSync::sumGRAD(Tensor gradient) { + std::lock_guard<std::mutex> guard(sumGradientMutex_); + sumGradientBuffer->copyFrom(gradient); + using namespace functional; //@TODO makes more sense to do that on the CPU i think + Element(_1 += _2, accGradientsSync, sumGradientBuffer); +} + +/** + * If it's rank 0, it's a local update, if it's rank one it's remote + * send and receive. Make sure you only call from device 0. 
+ */ +void MultiNodeGraphGroupSync::sendReceiveUpdateSync() { + #if MPI_FOUND + int network_size = accGradientsSync_cpu.size(); + + // Copy the data to the CPU + accGradientsSync->get(accGradientsSync_cpu); + + // Wait until all nodes are ready + MPI_Barrier(MPI_COMM_WORLD); + + int reduce_result = MPI_Allreduce(accGradientsSync_cpu.data(), //CPU buffers + receiveBuffer_cpu.data(), + network_size, + MPI_FLOAT, + MPI_SUM, + MPI_COMM_WORLD); + + // Copy the data back to the GPU and do optimizer update + // Do update with last GPU to distribute the memory + clientGraphs_.back()->params()->grads()->set(receiveBuffer_cpu); + + // Perform optimizer step + syncOptimizer_->update(clientGraphs_.back()); + + if(movingAvg_) + updateMovingAverage( + paramsAvg_, clientGraphs_.back()->params()->vals(), + scheduler_->numberOfBatches()); + + //Distribute the graph to the rest of the devices + std::vector<std::thread> threads; + for(int idx = 0; idx < devices_.size() - 1; idx++) { + threads.emplace_back(std::thread( + [=](int idx) { + clientGraphs_[idx]->params()->vals()->copyFrom( + clientGraphs_.back()->params()->vals()); + }, + idx)); + } + for(auto&& t : threads) { + t.join(); + } + + //set the accumulating buffers to zero; + accGradientsSync->set(0); + std::fill(accGradientsSync_cpu.begin(), accGradientsSync_cpu.end(), 0); + std::fill(receiveBuffer_cpu.begin(), receiveBuffer_cpu.end(), 0); + #endif +} + + +/** + * Execute given batch on this node, pushing/pulling the resulting + * gradients/parameters to/from the server shards + * or -- if comm. overlap enabled -- to/from the communication buffers, summing + * gradients locally if the communication thread is busy + * + * @param batch Batch on which to perform forward and backward passes. 
+ */ +void MultiNodeGraphGroupSync::execute(Ptr<data::Batch> fullBatch) { + if(!initialized_) { + init(fullBatch); + initialized_ = true; + } + + std::vector<Ptr<data::Batch>> batches = fullBatch->split(devices_.size()); + + static int t = 0; + + static float cost = 0; + static size_t num_seen_words = 0; + static size_t num_seen_sentences = 0; + + { + auto task = [this, batches](int my_id) { + auto batch = batches[my_id]; + auto graph = clientGraphs_[my_id]; + auto builder = clientBuilders_[my_id]; + + auto costNode = builder->build(graph, batch); + + if (t == 0) { + if (my_id != 0) + graph->params()->vals()->copyFrom(clientGraphs_[0]->params()->vals()); + } + + graph->forward(); + { + std::lock_guard<std::mutex> guard(sumCostMutex_); + cost += costNode->scalar(); + num_seen_words += batch->words(); + num_seen_sentences += batch->size(); + } + graph->backward(); + + graph->getBackend()->synchronize(); //@Alham do you know why we need this here? + + sumGRAD(graph->params()->grads()); + }; + + ThreadPool pool(devices_.size(), devices_.size()); + for(int idx = 0; idx < devices_.size(); ++idx) + pool.enqueue(task, idx); + } + + if (t % tau_ == 0) + sendReceiveUpdateSync(); + + t++; + + // Run scheduler (if enabled) + if(t % tau_ == 0 && scheduler_) { + if (options_->get<std::string>("cost-type") != "ce-sum") + cost /= (tau_ * devices_.size()); + + if (tau_ > 1) { + std::vector<size_t> fakeLength = {1, 1}; + auto fb = data::CorpusBatch::fakeBatch(fakeLength, + num_seen_sentences, + NULL); + fb->front()->setWords(num_seen_words); + scheduler_->update(cost, fb); + } else { + scheduler_->update(cost, fullBatch); + } + + num_seen_words = 0; + num_seen_sentences = 0; + cost = 0; + + if((scheduler_->saving() || scheduler_->validating())) { + #if MPI_FOUND + //wait until other nodes are ready + MPI_Barrier(MPI_COMM_WORLD); + + // TODO: Saving is broken + //if(mpi_my_rank_ == 0 && scheduler_->saving()) + // this->save(graph); + + if(mpi_my_rank_ == 0 && 
scheduler_->validating()) { + // temporarily save current params + if(movingAvg_) + accGradientsSync->copyFrom(clientGraphs_[0]->params()->vals()); + + if(movingAvg_) + for(auto graph : clientGraphs_) + graph->params()->vals()->copyFrom(paramsAvg_); + + scheduler_->validate(clientGraphs_); + + if(movingAvg_) + for(auto graph : clientGraphs_) + graph->params()->vals()->copyFrom(accGradientsSync); + } + + // inform other nodes to continue + MPI_Barrier(MPI_COMM_WORLD); + #endif + } + } + +} +} diff --git a/src/training/graph_group_multinode_sync.h b/src/training/graph_group_multinode_sync.h new file mode 100644 index 00000000..f13e8890 --- /dev/null +++ b/src/training/graph_group_multinode_sync.h @@ -0,0 +1,305 @@ +#pragma once + +#if MPI_FOUND +#include "mpi.h" +#endif + +#ifdef CUDA_FOUND +#include "cuda_runtime.h" +#endif + +#include <condition_variable> +#include <future> +#include <thread> + +#include <boost/filesystem.hpp> +#include <boost/thread/locks.hpp> +#include <boost/thread/shared_mutex.hpp> + +#include "3rd_party/threadpool.h" +#include "training/graph_group.h" + +namespace marian { + +/** + * Multi-node graph group for asynchronous training over multiple + * machines each with one or multiple GPUs + */ +class MultiNodeGraphGroupSync : public GraphGroup { +public: + virtual void setScheduler(Ptr<Scheduler> scheduler); + +protected: + //////////////////////////////////////////////////////////////////////////// + // General variables. + + /** Number of clients on nodes in MPI world (cluster). */ + std::vector<int> numberClientsOfNodes_; //@TODO not used for now, but might + // be useful maybe? + + /** Whether graph group has been properly initialized with a first batch. */ + bool initialized_{false}; + + /** Memory allocators for tensors (GPUs). */ + std::vector<Ptr<TensorAllocator>> allocators_; + + //////////////////////////////////////////////////////////////////////////// + // Client variables. 
+ + /** Graph builders for clients (which run forward and backward passes). */ + std::vector<Ptr<models::ModelBase>> clientBuilders_; + + /** Graphs of clients. */ + std::vector<Ptr<ExpressionGraph>> clientGraphs_; + + /** Devices (GPUs) on this node. */ + std::vector<size_t> devices_; + + /** Mutex to ensure clients are uniquely assigned to graphs and builders. */ + std::mutex mutexClientInit_; + + /** Mutex to avoid race conditions in scheduler. */ + std::mutex schedulerMutex_; + + /** + * Batch number counter used for evenly distributing mini-batches across + * nodes. + */ + size_t batchIter_ = 0; + + //////////////////////////////////////////////////////////////////////////// + // Communication variables. + + /** MPI rank of this node. */ + int mpi_my_rank_{0}; + + /** Number of nodes in MPI world (cluster). */ + int mpi_comm_world_size_{1}; + + /** + * Variables for optimizer delay and synchronous SGD + */ + size_t tau_{1}; + std::mutex sumGradientMutex_; + std::mutex updateParamsMutex_; + std::mutex sumCostMutex_; + Tensor accGradientsSync; + Tensor sumGradientBuffer; + Tensor paramsAvg_; + std::vector<float> accGradientsSync_cpu; + std::vector<float> receiveBuffer_cpu; + bool synchronization_happened{false}; + + Ptr<OptimizerBase> syncOptimizer_; + + std::vector<std::mutex> optDelayMutex_; + std::vector<size_t> delay_count; + std::vector<int> totalBatchWords; + std::vector<Tensor> accGradients, accGradientBuffer; + + bool movingAvg_{false}; + float mvDecay_{1e-4}; + + /** + * Allocate new tensor on given GPU and store allocator. + */ + Tensor newTensor(int size, Ptr<Backend> backend); + + /* + * exponential smoothing + */ + void updateMovingAverage(Tensor paramsAvg, Tensor params, size_t batches); + + /** + * Setup training environment and launch server thread and (if enabled) client + * communication overlap threads.. + * Includes setting up MPI, node and shard sizes, clients, server shards and + * communication overlap stuff. 
+ */ + virtual void init(Ptr<data::Batch> batch); + + /** + * Setup MPI world size and rank of this node. + */ + void setupMPI(); + + /** + * Setup clients that will compute gradients and communicate them with the + * server shards. + * There is one client per GPU. + */ + void setupClients(Ptr<data::Batch> batch); + + /** + * Initialize the graphs (models) of all clients on this node with the given + * batch. + */ + void runBatchThroughClientGraphs(Ptr<data::Batch> batch); + + /** + * Initialize the CPU arrays, with pinned memory for faster CudaMemCpy + * operations. + */ + void initCPUArrays(); + + /** + * Sums the gradients from a node, taking care of locking + * @param gradient - the gradient + */ + + void sumGRAD(Tensor gradient); + + /** + * Does the MPI Communication, parameter update and copying back parameters. + * @TODO ALHAM. God function too godly? + */ + void sendReceiveUpdateSync(); + + void execute(Ptr<data::Batch> batch); + + /** + * Load the GPU configuration of this node (i.e. which GPUs to use) and the + * number of GPUs on the other nodes. + */ + void loadDeviceConfig(std::vector<size_t> deviceConfig) { + size_t index = 0, node = 0, nClientsSeen = 0; + numberClientsOfNodes_ = std::vector<int>(mpi_comm_world_size_, 0); + while(index < deviceConfig.size()) { + if(numberClientsOfNodes_[node] == 0) { + numberClientsOfNodes_[node] = deviceConfig[index]; + nClientsSeen = 0; + } else if(nClientsSeen < numberClientsOfNodes_[node]) { + if(node == mpi_my_rank_) { + devices_.push_back(deviceConfig[index]); + } + nClientsSeen++; + } else { + node++; + index--; + } + index++; + } + } + +public: + /** + * (Constructor) Call super class and initialize client graphs and builders. 
+ */ + MultiNodeGraphGroupSync(Ptr<Config> options) + : GraphGroup(options), + tau_{options_->get<size_t>("optimizer-delay")}, + movingAvg_{options_->get<float>("exponential-smoothing") > 0}, + mvDecay_{options_->get<float>("exponential-smoothing")}, + syncOptimizer_{Optimizer(options_)} { + // Set up devices for this node + setupMPI(); // Setup MPI before creating device vectors + std::vector<size_t> devices; + for(auto& d : options_->getDevices()) + devices.push_back(d.no); + loadDeviceConfig(devices); + + // Create builders and graphs for clients. + for(size_t i = 0; i < devices_.size(); i++) { + clientGraphs_.push_back(New<ExpressionGraph>()); + clientGraphs_[i]->setDevice({devices_[i], DeviceType::gpu}); + clientGraphs_[i]->reserveWorkspaceMB(options_->get<size_t>("workspace")); + clientBuilders_.push_back( + models::from_config(options_, models::usage::training)); + } + } + + /** + * Update any client model with given batch if batch is assigned to this node. + */ + void update(Ptr<data::Batch> batch) { + ABORT_IF(finalized_, "Training has already finished."); + if(batchIter_ % mpi_comm_world_size_ + == mpi_my_rank_) { // Only take batch assigned to this node + execute(batch); + } + batchIter_++; + } + + /** + * Load models from disk if file exists and setting is not disabled + */ + void load() { + if(!options_->get<bool>("no-reload")) { + std::string name = options_->get<std::string>("model"); + + if(boost::filesystem::exists(name)) { + if(scheduler_) + scheduler_->load(name); + size_t i = 0; + for(auto graph : clientGraphs_) + clientBuilders_[i++]->load(graph, name); + } else if(options_->has("pretrained-model")) { + std::string init = options_->get<std::string>("pretrained-model"); + LOG(info, + "Initialize model weights with the pre-trained model {}", + init); + size_t i = 0; + for(auto graph : clientGraphs_) + clientBuilders_[i++]->load(graph, init, false); + } + } + } + + /** + * Save model of first client's graph to disk + */ + void save(bool final = 
false) { save(clientGraphs_[0], final); } + + /** + * Save model of given graph to disk. + */ + void save(Ptr<ExpressionGraph> graph, bool final = false) { + int idx = 0; + for(int i = 0; i < clientGraphs_.size(); ++i) { + if(graph == clientGraphs_[i]) { + idx = i; + break; + } + } + + if(options_->get<bool>("overwrite")) { + std::string name = options_->get<std::string>("model"); + + clientBuilders_[idx]->save(clientGraphs_[idx], name, true); + if(scheduler_) + scheduler_->save(name); + } else { + std::string name = options_->get<std::string>("model"); + + if(!final) { + std::string numberOfBatches + = scheduler_ ? std::to_string(scheduler_->numberOfBatches()) + : "unknown"; + std::string nameOverwrite = name; + nameOverwrite.replace( + name.size() - 4, 4, ".iter" + numberOfBatches + ".npz"); + clientBuilders_[idx]->save(clientGraphs_[idx], nameOverwrite); + } + + clientBuilders_[idx]->save(clientGraphs_[idx], name, true); + if(scheduler_) + scheduler_->save(name); + } + } + + /** + * Collect statistics from first client's graph. 
+ */ + Ptr<data::BatchStats> collectStats() { + return GraphGroup::collectStats( + clientGraphs_[0], clientBuilders_[0], devices_.size()); + } + + virtual void finalize() { + finalized_ = true; +#if MPI_FOUND + MPI_Finalize(); +#endif + } +}; +} diff --git a/src/training/graph_group_sync.cpp b/src/training/graph_group_sync.cpp index 1f4d4caf..7fe88f7a 100644 --- a/src/training/graph_group_sync.cpp +++ b/src/training/graph_group_sync.cpp @@ -4,6 +4,27 @@ namespace marian { +SyncGraphGroup::SyncGraphGroup(Ptr<Config> config) + : GraphGroup(config), + devices_{options_->getDevices()}, + movingAvg_{options_->get<float>("exponential-smoothing") > 0}, + mvDecay_{options_->get<float>("exponential-smoothing")}, + delay_{options_->get<size_t>("optimizer-delay")} { + + for(auto device : devices_) { + auto graph = New<ExpressionGraph>(); + graph->setDevice(device); + graph->reserveWorkspaceMB(options_->get<size_t>("workspace")); + graph->getBackend()->setClip(options_->get<float>("clip-gemm")); + + graphs_.push_back(graph); + shardOpt_.push_back(Optimizer(options_)); + builders_.push_back(models::from_config(options_, models::usage::training)); + } + + comm_ = createCommunicator(graphs_, options_->get<bool>("no-nccl", false)); +} + void SyncGraphGroup::setScheduler(Ptr<Scheduler> scheduler) { scheduler_ = scheduler; // optimizer has to be registered last to see changes of learning rate @@ -22,169 +43,133 @@ void SyncGraphGroup::updateMovingAverage(Tensor paramsAvg, Element(_1 = ((1.f - decay) * _1) + (decay * _2), paramsAvg, params); } -void SyncGraphGroup::fetchParams(Tensor oldParams, - const std::vector<Tensor>& params) { - // @TODO read guard on parameters - int pos = 0; - std::vector<std::thread> threads; - for(int idx = 0; idx < devices_.size(); idx++) { - threads.emplace_back(std::thread( - [=](int idx, int pos) { - oldParams->subtensor(pos, params[idx]->size())->copyFrom(params[idx]); - }, - idx, - pos)); - pos += shardSize_; - } - for(auto&& t : threads) { - 
t.join(); +void SyncGraphGroup::initialize(const std::vector<Ptr<data::Batch>>& batches) { + // Initialize 0th graph with random weights in one forward step + { + THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]); + graphs_[0]->forward();); + + // Copy weights from 0th graph to all other graphs + // to have equal weights across devices + ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1); + for(size_t i = 1; i < graphs_.size(); ++i) { + auto init = [&](size_t i) { + // initialize i-th graph and weights + builders_[i]->build(graphs_[i], batches[0]); + graphs_[i]->forward(); + // overwrite weights of i-th graph with weights from 0th graph + graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals()); + }; + pool.enqueue(init, i); + } } -} - -void SyncGraphGroup::execute(Ptr<data::Batch> fullBatch) { - std::vector<Ptr<data::Batch>> delayedBatches = - delay_ > 1 ? - fullBatch->split(delay_) : - std::vector<Ptr<data::Batch>>({ fullBatch }); - - std::vector<float> costs(devices_.size(), 0.f); - - size_t t = 1; - for(auto batch : delayedBatches) { - std::vector<Ptr<data::Batch>> batches = batch->split(devices_.size()); - - if(first_) { - { - THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]); - graphs_[0]->forward();); - - ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1); - for(size_t i = 1; i < graphs_.size(); ++i) { - auto init = [&](size_t i) { - builders_[i]->build(graphs_[i], batches[0]); - graphs_[i]->forward(); - graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals()); - }; - pool.enqueue(init, i); - } - } - if(params_.size() == 0) { - int totalSize = graphs_[0]->params()->vals()->size(); - shardSize_ = ceil(totalSize / (float)devices_.size()); + if(movingAvg_ && paramsAvg_.size() == 0) { + int totalSize = graphs_[0]->params()->vals()->size(); + shardSize_ = ceil(totalSize / (float)devices_.size()); - int pos = 0; - for(auto graph : graphs_) { - int __size__ = std::min(shardSize_, totalSize); + int pos = 0; + for(auto 
graph : graphs_) { + int __size__ = std::min(shardSize_, totalSize); - auto paramsAlloc = New<TensorAllocator>(graph->getBackend()); - paramsAllocs_.push_back(paramsAlloc); + auto paramsAlloc = New<TensorAllocator>(graph->getBackend()); + paramsAllocs_.push_back(paramsAlloc); - paramsAlloc->reserveExact(3 * __size__ * sizeof(float)); + paramsAlloc->reserveExact(__size__ * sizeof(float)); - Tensor param, grad, tmp; - paramsAlloc->allocate(param, {1, __size__}); - paramsAlloc->allocate(grad, {1, __size__}); - paramsAlloc->allocate(tmp, {1, __size__}); - params_.push_back(param); + Tensor paramAvg; + paramsAlloc->allocate(paramAvg, {1, __size__}); + paramsAvg_.push_back(paramAvg); - grad->set(0.f); - grads_.push_back(grad); + paramAvg->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__)); - tmpTensors_.push_back(tmp); - - param->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__)); - pos += __size__; - totalSize -= __size__; - } - } - - if(movingAvg_ && paramsAvg_.size() == 0) { - int totalSize = graphs_[0]->params()->vals()->size(); - - int i = 0; - for(auto graph : graphs_) { - int __size__ = std::min(shardSize_, totalSize); - totalSize -= __size__; - Tensor paramAvg; - auto allocator = New<TensorAllocator>(graph->getBackend()); - - allocator->reserveExact(__size__ * sizeof(float)); - allocator->allocate(paramAvg, {1, __size__}); - - paramAvg->copyFrom(params_[i++]); + // move to next shard + pos += __size__; + totalSize -= __size__; + } + } +} - paramsAllocAvg_.push_back(allocator); - paramsAvg_.push_back(paramAvg); - } +void SyncGraphGroup::execute(Ptr<data::Batch> batch) { + size_t devs = devices_.size(); + auto batches = batch->split(delay_ * devs); + + float div = batches.size(); // no. of batches + // do not average gradients if cost type is sum. 
+ if(options_->get<std::string>("cost-type") == "ce-sum") + div = 1; + + std::vector<std::vector<Ptr<data::Batch>>> delayedBatches; + + for(int i = 0; i < delay_; ++i) { + if(i * devs < batches.size()) { + delayedBatches.emplace_back(); + for(int j = 0; j < devs; ++j) { + size_t index = i * devs + j; + if(index < batches.size()) + delayedBatches.back().push_back(batches[i * devs + j]); + else + delayedBatches.back().push_back(nullptr); } - - first_ = false; } + } - { - auto task = [this, &costs, batches](size_t idx) { - auto graph = graphs_[idx]; - auto batch = batches[idx]; - - if(batch->size() > 0) { - auto costNode = builders_[idx]->build(graph, batch); - graph->forward(); - costs[idx] += costNode->scalar(); - graph->backward(); - } - }; + std::vector<float> costs(devices_.size(), 0.f); + size_t t = 1; - ThreadPool pool(devices_.size(), devices_.size()); - for(int idx = 0; idx < batches.size(); ++idx) - pool.enqueue(task, idx); + for(const auto& curBatches : delayedBatches) { + if(first_) { + initialize(curBatches); + first_ = false; } - { - auto task = [this, batches](size_t idx, int pos, bool update) { - int size = params_[idx]->size(); - int i = 0; - - float div = devices_.size(); // no. of GPUs + // Execute single forward/backward step + auto forwardBackward = [this, &costs, curBatches, t](size_t idx, int pos) { + auto graph = graphs_[idx]; + auto batch = curBatches[idx]; - // do not average gradients if cost type is sum. - if(options_->get<std::string>("cost-type") == "ce-sum") { - div = 1; - } + if(batch) { + auto costNode = builders_[idx]->build(graph, batch); + graph->forward(); + costs[idx] += costNode->scalar(); - for(auto graph : graphs_) { - if(batches[i]->size() > 0) { - auto subGrad = graph->params()->grads()->subtensor(pos, size); - tmpTensors_[idx]->copyFrom(subGrad); + // only reset gradients to 0 if t == 1 + graph->backward(t == 1); + } + else { + // handle case of empty batch, execute do-nothing fw-bw step for + // proper inits and resets. 
+ graph->forward(); + graph->backward(t == 1); + } + }; - using namespace functional; - Element(_1 = _1 + (_2 / div), grads_[idx], tmpTensors_[idx]); - } - i++; - } + // Update parameter shard with gradient shard + auto update = [this, div](size_t idx, int pos) { + int totalSize = graphs_[0]->params()->vals()->size(); + int shardSize = ceil(totalSize / (float)devices_.size()); - if(update) { - shardOpt_[idx]->update(params_[idx], grads_[idx]); - grads_[idx]->set(0.f); + int size = std::min(totalSize - pos, shardSize); - if(movingAvg_) - updateMovingAverage( - paramsAvg_[idx], params_[idx], scheduler_->numberOfBatches()); + auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, size); + auto curParam = graphs_[idx]->params()->vals()->subtensor(pos, size); - for(auto graph : graphs_) { - auto subParam = graph->params()->vals()->subtensor(pos, size); - subParam->copyFrom(params_[idx]); - } - } + if(div != 1) { + using namespace functional; + Element(_1 = _1 / div, curGrad); + } - }; + shardOpt_[idx]->update(curParam, curGrad); - ThreadPool pool(devices_.size(), devices_.size()); - int pos = 0; - for(int idx = 0; idx < devices_.size(); ++idx) { - pool.enqueue(task, idx, pos, t == delay_); - pos += params_[idx]->size(); - } + if(movingAvg_) + updateMovingAverage(paramsAvg_[idx], curParam, scheduler_->numberOfBatches()); + }; + + comm_->foreach(forwardBackward); + if(t == delayedBatches.size()) { + comm_->scatterReduce(); + comm_->foreach(update); + comm_->allGather(); } t++; @@ -202,24 +187,108 @@ void SyncGraphGroup::execute(Ptr<data::Batch> fullBatch) { } if(scheduler_) { - scheduler_->update(cost, fullBatch); + scheduler_->update(cost, batches); if(scheduler_->saving()) { this->save(); } if(scheduler_->validating()) { - if(movingAvg_) - for(auto graph : graphs_) - fetchParams(graph->params()->vals(), paramsAvg_); + if(movingAvg_) { + comm_->swapParams(paramsAvg_); + } // safe, because all graphs are idle during validation with sync sgd 
scheduler_->validate(graphs_); - if(movingAvg_) - for(auto graph : graphs_) - fetchParams(graph->params()->vals(), params_); + if(movingAvg_) { + comm_->swapParams(paramsAvg_); + } + } + } +} + +void SyncGraphGroup::load() { + if(!options_->get<bool>("no-reload")) { + std::string name = options_->get<std::string>("model"); + + if(boost::filesystem::exists(name)) { + size_t i = 0; + if(scheduler_) + scheduler_->load(name); + for(auto graph : graphs_) + builders_[i++]->load(graph, name); + + // @TODO: probably we want to have the list of DeviceIds as an attribute + std::vector<Ptr<Backend>> backends; + for(auto graph : graphs_) + backends.push_back(graph->getBackend()); + shardOpt_[0]->load(name + ".optimizer.npz", shardOpt_, backends); + + } else if(options_->has("pretrained-model")) { + std::string init = options_->get<std::string>("pretrained-model"); + LOG(info, + "Initialize model weights with the pre-trained model {}", + init); + size_t i = 0; + for(auto graph : graphs_) + builders_[i++]->load(graph, init, false); } } } + +void SyncGraphGroup::save(bool final) { + if(final && scheduler_) { + if(movingAvg_ && paramsAvg_.size() > 0) + comm_->swapParams(paramsAvg_); + + scheduler_->validate(graphs_, true); + + if(movingAvg_ && paramsAvg_.size() > 0) + comm_->swapParams(paramsAvg_); + } + save(graphs_[0], final); + } + + void SyncGraphGroup::save(Ptr<ExpressionGraph> graph, bool final) { + int idx = 0; + for(int i = 0; i < graphs_.size(); ++i) { + if(graph == graphs_[i]) { + idx = i; + break; + } + } + + if(movingAvg_ && paramsAvg_.size() > 0) + comm_->swapParams(paramsAvg_); + + std::string name = options_->get<std::string>("model"); + + if(options_->get<bool>("overwrite")) { + builders_[idx]->save(graphs_[idx], name, true); + if(scheduler_) + scheduler_->save(name); + } else { + if(!final) { + std::string numberOfBatches + = scheduler_ ? 
std::to_string(scheduler_->numberOfBatches()) + : "unknown"; + std::string nameOverwrite = name; + nameOverwrite.replace( + name.size() - 4, 4, ".iter" + numberOfBatches + ".npz"); + builders_[idx]->save(graphs_[idx], nameOverwrite); + } + + builders_[idx]->save(graphs_[idx], name, true); + if(scheduler_) + scheduler_->save(name); + } + + if(movingAvg_ && paramsAvg_.size() > 0) + comm_->swapParams(paramsAvg_); + + size_t totalSize = graphs_[idx]->params()->vals()->size(); + shardOpt_[idx]->save(name + ".optimizer.npz", shardOpt_, totalSize); + } + } diff --git a/src/training/graph_group_sync.h b/src/training/graph_group_sync.h index b0e2b428..8a7c913f 100644 --- a/src/training/graph_group_sync.h +++ b/src/training/graph_group_sync.h @@ -4,6 +4,7 @@ #include "3rd_party/threadpool.h" #include "training/graph_group.h" +#include "training/communicator.h" namespace marian { @@ -12,26 +13,27 @@ public: virtual void setScheduler(Ptr<Scheduler> scheduler); private: + + Ptr<Communicator> comm_; + std::vector<Ptr<models::ModelBase>> builders_; std::vector<Ptr<ExpressionGraph>> graphs_; std::vector<DeviceId> devices_; - std::vector<Tensor> params_; - std::vector<Tensor> grads_; - std::vector<Tensor> tmpTensors_; - std::vector<Ptr<TensorAllocator>> paramsAllocs_; - std::vector<Ptr<OptimizerBase>> shardOpt_; int shardSize_; bool first_{true}; std::vector<Tensor> paramsAvg_; - std::vector<Ptr<TensorAllocator>> paramsAllocAvg_; + std::vector<Ptr<TensorAllocator>> paramsAllocs_; + bool movingAvg_{false}; float mvDecay_{1e-4f}; size_t delay_{1}; + void initialize(const std::vector<Ptr<data::Batch>>& batches); + void updateMovingAverage(Tensor paramsAvg, Tensor params, size_t batches); void fetchParams(Tensor oldParams, const std::vector<Tensor>& params); @@ -39,117 +41,23 @@ private: void execute(Ptr<data::Batch> batch); public: - SyncGraphGroup(Ptr<Config> config) - : GraphGroup(config), - devices_{options_->getDevices()}, - movingAvg_{options_->get<float>("exponential-smoothing") 
> 0}, - mvDecay_{options_->get<float>("exponential-smoothing")}, - delay_{options_->get<size_t>("optimizer-delay")} { - for(auto device : devices_) { - auto graph = New<ExpressionGraph>(); - graph->setDevice(device); - graph->reserveWorkspaceMB(options_->get<size_t>("workspace")); - graph->getBackend()->setClip(options_->get<float>("clip-gemm")); - - graphs_.push_back(graph); - shardOpt_.push_back(Optimizer(options_)); - builders_.push_back(models::from_config(options_, models::usage::training)); - } - } + SyncGraphGroup(Ptr<Config> config); void update(Ptr<data::Batch> batch) { ABORT_IF(finalized_, "Training has already finished."); execute(batch); } - void load() { - if(!options_->get<bool>("no-reload")) { - std::string name = options_->get<std::string>("model"); - - if(boost::filesystem::exists(name)) { - size_t i = 0; - if(scheduler_) - scheduler_->load(name); - for(auto graph : graphs_) - builders_[i++]->load(graph, name); - - // @TODO: probably we want to have the list of DeviceIds as an attribute - std::vector<Ptr<Backend>> backends; - for(auto graph : graphs_) - backends.push_back(graph->getBackend()); - shardOpt_[0]->load(name + ".optimizer.npz", shardOpt_, backends); - - } else if(options_->has("pretrained-model")) { - std::string init = options_->get<std::string>("pretrained-model"); - LOG(info, - "Initialize model weights with the pre-trained model {}", - init); - size_t i = 0; - for(auto graph : graphs_) - builders_[i++]->load(graph, init, false); - } - } - } - - void save(bool final = false) { - if(final && scheduler_) { - if(movingAvg_ && paramsAvg_.size() > 0) - for(auto graph : graphs_) - fetchParams(graph->params()->vals(), paramsAvg_); - - scheduler_->validate(graphs_, true); - - if(movingAvg_ && paramsAvg_.size() > 0) - for(auto graph : graphs_) - fetchParams(graph->params()->vals(), params_); - } + void load(); + void save(bool final = false); + void save(Ptr<ExpressionGraph> graph, bool final = false); - save(graphs_[0], final); - } - - void 
save(Ptr<ExpressionGraph> graph, bool final = false) { - int idx = 0; - for(int i = 0; i < graphs_.size(); ++i) { - if(graph == graphs_[i]) { - idx = i; - break; - } - } - - if(movingAvg_ && paramsAvg_.size() > 0) - fetchParams(graphs_[idx]->params()->vals(), paramsAvg_); - - std::string name = options_->get<std::string>("model"); - - if(options_->get<bool>("overwrite")) { - builders_[idx]->save(graphs_[idx], name, true); - if(scheduler_) - scheduler_->save(name); - } else { - if(!final) { - std::string numberOfBatches - = scheduler_ ? std::to_string(scheduler_->numberOfBatches()) - : "unknown"; - std::string nameOverwrite = name; - nameOverwrite.replace( - name.size() - 4, 4, ".iter" + numberOfBatches + ".npz"); - builders_[idx]->save(graphs_[idx], nameOverwrite); - } - - builders_[idx]->save(graphs_[idx], name, true); - if(scheduler_) - scheduler_->save(name); - } - - if(movingAvg_ && paramsAvg_.size() > 0) - fetchParams(graphs_[idx]->params()->vals(), params_); - - size_t totalSize = graphs_[idx]->params()->vals()->size(); - shardOpt_[idx]->save(name + ".optimizer.npz", shardOpt_, totalSize); + Ptr<data::BatchStats> collectStats() { + return GraphGroup::collectStats(graphs_[0], builders_[0], numBatches()); } - Ptr<data::BatchStats> collectStats() { - return GraphGroup::collectStats(graphs_[0], builders_[0], devices_.size() * delay_); + size_t numBatches() { + return devices_.size() * delay_; } virtual void finalize() { diff --git a/src/training/scheduler.h b/src/training/scheduler.h index 6dc1f0a9..24dce827 100644 --- a/src/training/scheduler.h +++ b/src/training/scheduler.h @@ -154,10 +154,20 @@ public: } void update(float cost, Ptr<data::Batch> batch) { + update(cost, std::vector<Ptr<data::Batch>>({batch})); + } + + void update(float cost, const std::vector<Ptr<data::Batch>>& batches) { state_->validated = false; - auto batchSize = batch->size(); // number of sentences in batch - auto batchLabels = batch->words(-1); // number of target words in batch + auto 
batchSize = 0; // number of sentences in batch + auto batchLabels = 0; // number of target words in batch + + for(const auto& batch : batches) { + batchSize += batch->size(); + batchLabels += batch->words(-1); + } + // reconstruct sum cost, for displaying epoch-level averages instead of minibatch-level auto costType = options_->get<std::string>("cost-type"); auto dispLabelCounts = options_->get<bool>("disp-label-counts"); // if true then show as "cost per label * number of labels" @@ -178,6 +188,7 @@ public: state_->wordsDisp += batchLabels; // target words processed since last display, for speed display state_->samplesEpoch += batchSize; // sentences processed in this epoch state_->labelsTotal += batchLabels; // total labels processed + state_->newBatch(); if(state_->batches % options_->get<size_t>("disp-freq") == 0) { diff --git a/src/training/validator.h b/src/training/validator.h index b25f462a..5d5bdbca 100644 --- a/src/training/validator.h +++ b/src/training/validator.h @@ -14,7 +14,7 @@ #include "translator/beam_search.h" #include "translator/history.h" #include "translator/output_collector.h" -#include "translator/printer.h" +#include "translator/output_printer.h" #include "translator/scorers.h" namespace marian { @@ -294,9 +294,11 @@ public: boost::timer::cpu_timer timer; { + auto printer = New<OutputPrinter>(options_, vocabs_.back()); auto collector = options_->has("valid-translation-output") ? 
New<OutputCollector>(fileName) : New<OutputCollector>(*tempFile); + if(quiet_) collector->setPrintingStrategy(New<QuietPrinting>()); else @@ -329,7 +331,7 @@ public: for(auto history : histories) { std::stringstream best1; std::stringstream bestn; - Printer(options_, vocabs_.back(), history, best1, bestn); + printer->print(history, best1, bestn); collector->Write(history->GetLineNum(), best1.str(), bestn.str(), diff --git a/src/translator/beam_search.h b/src/translator/beam_search.h index 1fbca214..0ff65917 100644 --- a/src/translator/beam_search.h +++ b/src/translator/beam_search.h @@ -36,16 +36,22 @@ public: const Beams& beams, std::vector<Ptr<ScorerState>>& states, size_t beamSize, - bool first) { + bool first, + Ptr<data::CorpusBatch> batch) { Beams newBeams(beams.size()); - for(int i = 0; i < keys.size(); ++i) { - // keys is contains indices to vocab items in the entire beam. - // values can be between 0 and beamSize * vocabSize. + std::vector<float> alignments; + if(options_->get<float>("alignment", 0.f)) + // Use alignments from the first scorer, even if ensemble + alignments = scorers_[0]->getAlignment(); + + for(int i = 0; i < keys.size(); ++i) { + // Keys contains indices to vocab items in the entire beam. + // Values can be between 0 and beamSize * vocabSize. int embIdx = keys[i] % vocabSize; int beamIdx = i / beamSize; - // retrieve short list for final softmax (based on words aligned + // Retrieve short list for final softmax (based on words aligned // to source sentences). If short list has been set, map the indices // in the sub-selected vocabulary matrix back to their original positions. 
auto shortlist = scorers_[0]->getShortlist(); @@ -72,6 +78,8 @@ public: beamHypIdx = 0; auto hyp = New<Hypothesis>(beam[beamHypIdx], embIdx, hypIdxTrans, cost); + + // Set cost breakdown for n-best lists if(options_->get<bool>("n-best")) { std::vector<float> breakDown(states.size(), 0); beam[beamHypIdx]->GetCostBreakdown().resize(states.size(), 0); @@ -82,12 +90,57 @@ public: } hyp->GetCostBreakdown() = breakDown; } + + // Set alignments + if(!alignments.empty()) { + auto align = getHardAlignmentsForHypothesis( + alignments, batch, beamSize, beamHypIdx, beamIdx); + hyp->SetAlignment(align); + } + newBeam.push_back(hyp); } } return newBeams; } + std::vector<float> getHardAlignmentsForHypothesis( + const std::vector<float> alignments, + Ptr<data::CorpusBatch> batch, + int beamSize, + int beamHypIdx, + int beamIdx) { + // Let's B be the beam size, N be the number of batched sentences, + // and L the number of words in the longest sentence in the batch. + // The alignment vector: + // + // if(first) + // * has length of N x L if it's the first beam + // * stores elements in the following order: + // beam1 = [word1-batch1, word1-batch2, ..., word2-batch1, ...] + // else + // * has length of N x L x B + // * stores elements in the following order: + // beams = [beam1, beam2, ..., beam_n] + // + // The mask vector is always of length N x L and has 1/0s stored like + // in a single beam, i.e.: + // * [word1-batch1, word1-batch2, ..., word2-batch1, ...] 
+ // + size_t batchSize = batch->size(); + size_t batchWidth = batch->width() * batchSize; + std::vector<float> align; + + for(size_t w = 0; w < batchWidth / batchSize; ++w) { + size_t a = ((batchWidth * beamHypIdx) + beamIdx) + (batchSize * w); + size_t m = a % batchWidth; + if(batch->front()->mask()[m] != 0) + align.emplace_back(alignments[a]); + } + + return align; + } + Beams pruneBeam(const Beams& beams) { Beams newBeams; for(auto beam : beams) { @@ -108,7 +161,9 @@ public: Histories histories; for(int i = 0; i < dimBatch; ++i) { size_t sentId = batch->getSentenceIds()[i]; - auto history = New<History>(sentId, options_->get<float>("normalize"), options_->get<float>("word-penalty")); + auto history = New<History>(sentId, + options_->get<float>("normalize"), + options_->get<float>("word-penalty")); histories.push_back(history); } @@ -183,8 +238,12 @@ public: // BUGBUG: it's not cost but score (higher=better) for(int i = 0; i < scorers_.size(); ++i) { - states[i] = scorers_[i]->step( - graph, states[i], hypIndices, embIndices, dimBatch, localBeamSize); + states[i] = scorers_[i]->step(graph, + states[i], + hypIndices, + embIndices, + dimBatch, + localBeamSize); if(scorers_[i]->getWeight() != 1.f) totalCosts @@ -219,14 +278,22 @@ public: nth->getNBestList(beamSizes, totalCosts->val(), outCosts, outKeys, first); int dimTrgVoc = totalCosts->shape()[-1]; - beams = toHyps( - outKeys, outCosts, dimTrgVoc, beams, states, localBeamSize, first); + beams = toHyps(outKeys, + outCosts, + dimTrgVoc, + beams, + states, + localBeamSize, + first, + batch); auto prunedBeams = pruneBeam(beams); for(int i = 0; i < dimBatch; ++i) { if(!beams[i].empty()) { final = final - || histories[i]->size() >= options_->get<float>("max-length-factor") * batch->front()->batchWidth(); + || histories[i]->size() + >= options_->get<float>("max-length-factor") + * batch->front()->batchWidth(); histories[i]->Add(beams[i], trgEosId_, prunedBeams[i].empty() || final); } } diff --git 
a/src/translator/hypothesis.h b/src/translator/hypothesis.h index 1bc4fe47..ac9d6699 100644 --- a/src/translator/hypothesis.h +++ b/src/translator/hypothesis.h @@ -24,6 +24,9 @@ public: float GetCost() const { return cost_; } std::vector<float>& GetCostBreakdown() { return costBreakdown_; } + std::vector<float>& GetAlignment() { return alignment_; } + + void SetAlignment(const std::vector<float>& align) { alignment_ = align; }; private: const Ptr<Hypothesis> prevHyp_; @@ -32,6 +35,7 @@ private: const float cost_; std::vector<float> costBreakdown_; + std::vector<float> alignment_; }; typedef std::vector<Ptr<Hypothesis>> Beam; diff --git a/src/translator/output_printer.cpp b/src/translator/output_printer.cpp new file mode 100644 index 00000000..2230755d --- /dev/null +++ b/src/translator/output_printer.cpp @@ -0,0 +1,64 @@ +#include "output_printer.h" + +namespace marian { + +std::vector<HardAlignment> OutputPrinter::getAlignment( + const Ptr<Hypothesis>& hyp, + float threshold) { + std::vector<SoftAlignment> alignSoft; + // Skip EOS + auto last = hyp->GetPrevHyp(); + // Get soft alignments for each target word + while(last->GetPrevHyp().get() != nullptr) { + alignSoft.push_back(last->GetAlignment()); + last = last->GetPrevHyp(); + } + + std::vector<HardAlignment> align; + // Alignments by maximum value + if(threshold == 1.f) { + for(size_t t = 0; t < alignSoft.size(); ++t) { + // Retrieved alignments are in reversed order + size_t rev = alignSoft.size() - t - 1; + size_t maxArg = 0; + for(size_t s = 0; s < alignSoft[0].size(); ++s) { + if(alignSoft[rev][maxArg] < alignSoft[rev][s]) { + maxArg = s; + } + } + align.push_back(std::make_pair(maxArg, t)); + } + } else { + // Alignments by greather-than-threshold + for(size_t t = 0; t < alignSoft.size(); ++t) { + // Retrieved alignments are in reversed order + size_t rev = alignSoft.size() - t - 1; + for(size_t s = 0; s < alignSoft[0].size(); ++s) { + if(alignSoft[rev][s] > threshold) { + align.push_back(std::make_pair(s, 
t)); + } + } + } + } + + // Sort alignment pairs in ascending order + std::sort(align.begin(), + align.end(), + [](const HardAlignment& a, const HardAlignment& b) { + return (a.first == b.first) ? a.second < b.second + : a.first < b.first; + }); + + return align; +} + +std::string OutputPrinter::getAlignmentString( + const std::vector<HardAlignment>& align) { + std::stringstream alignStr; + alignStr << " |||"; + for(auto p = align.begin(); p != align.end(); ++p) { + alignStr << " " << p->first << "-" << p->second; + } + return alignStr.str(); +} +} diff --git a/src/translator/output_printer.h b/src/translator/output_printer.h new file mode 100644 index 00000000..e309484e --- /dev/null +++ b/src/translator/output_printer.h @@ -0,0 +1,86 @@ +#pragma once + +#include <vector> + +#include "common/config.h" +#include "common/utils.h" +#include "data/vocab.h" +#include "translator/history.h" +#include "translator/hypothesis.h" + +namespace marian { + +typedef std::vector<float> SoftAlignment; +typedef std::pair<size_t, size_t> HardAlignment; + +class OutputPrinter { +public: + OutputPrinter(Ptr<Config> options, Ptr<Vocab> vocab) + : vocab_(vocab), + reverse_(options->get<bool>("right-left")), + nbest_(options->get<bool>("n-best", false) + ? 
options->get<size_t>("beam-size") + : 0), + alignment_(options->get<float>("alignment", 0.f)) {} + + template <class OStream> + void print(Ptr<History> history, OStream& best1, OStream& bestn) { + const auto& nbl = history->NBest(nbest_); + + for(size_t i = 0; i < nbl.size(); ++i) { + const auto& result = nbl[i]; + const auto& words = std::get<0>(result); + const auto& hypo = std::get<1>(result); + + std::string translation = Join((*vocab_)(words), " ", reverse_); + bestn << history->GetLineNum() << " ||| " << translation; + + if(alignment_ > 0.f) { + auto align = getAlignment(hypo, alignment_); + bestn << getAlignmentString(align); + } + + bestn << " |||"; + + if(hypo->GetCostBreakdown().empty()) { + bestn << " F0=" << hypo->GetCost(); + } else { + for(size_t j = 0; j < hypo->GetCostBreakdown().size(); ++j) { + bestn << " F" << j << "= " << hypo->GetCostBreakdown()[j]; + } + } + + float realCost = std::get<2>(result); + bestn << " ||| " << realCost; + + if(i < nbl.size() - 1) + bestn << std::endl; + else + bestn << std::flush; + } + + auto result = history->Top(); + const auto& words = std::get<0>(result); + + std::string translation = Join((*vocab_)(words), " ", reverse_); + + best1 << translation; + if(alignment_ > 0.f) { + const auto& hypo = std::get<1>(result); + auto align = getAlignment(hypo, alignment_); + best1 << getAlignmentString(align); + } + best1 << std::flush; + } + +private: + Ptr<Vocab> vocab_; + bool reverse_{false}; + size_t nbest_{0}; + float alignment_{0.f}; + + std::vector<HardAlignment> getAlignment(const Ptr<Hypothesis>& hyp, + float threshold); + std::string getAlignmentString(const std::vector<HardAlignment>& align); +}; +} diff --git a/src/translator/printer.cpp b/src/translator/printer.cpp deleted file mode 100644 index 50f83d20..00000000 --- a/src/translator/printer.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include "printer.h" - -namespace marian { - -std::vector<size_t> GetAlignment(const HypothesisPtr& hypothesis) { - 
std::vector<SoftAlignment> aligns; - HypothesisPtr last = hypothesis->GetPrevHyp(); - while(last->GetPrevHyp().get() != nullptr) { - aligns.push_back(*(last->GetAlignment(0))); - last = last->GetPrevHyp(); - } - - std::vector<size_t> alignment; - for(auto it = aligns.rbegin(); it != aligns.rend(); ++it) { - size_t maxArg = 0; - for(size_t i = 0; i < it->size(); ++i) { - if((*it)[maxArg] < (*it)[i]) { - maxArg = i; - } - } - alignment.push_back(maxArg); - } - - return alignment; -} - -std::string GetAlignmentString(const std::vector<size_t>& alignment) { - std::stringstream alignString; - alignString << " |||"; - for(size_t wordIdx = 0; wordIdx < alignment.size(); ++wordIdx) { - alignString << " " << wordIdx << "-" << alignment[wordIdx]; - } - return alignString.str(); -} -} diff --git a/src/translator/printer.h b/src/translator/printer.h deleted file mode 100644 index a14d5d96..00000000 --- a/src/translator/printer.h +++ /dev/null @@ -1,56 +0,0 @@ -#pragma once - -#include <vector> - -#include "common/utils.h" -#include "data/vocab.h" -#include "translator/history.h" - -namespace marian { - -template <class OStream> -void Printer(Ptr<Config> options, - Ptr<Vocab> vocab, - Ptr<History> history, - OStream& best1, - OStream& bestn) { - bool reverse = options->get<bool>("right-left"); - - if(options->has("n-best") && options->get<bool>("n-best")) { - const auto& nbl = history->NBest(options->get<size_t>("beam-size")); - - for(size_t i = 0; i < nbl.size(); ++i) { - const auto& result = nbl[i]; - const auto& words = std::get<0>(result); - const auto& hypo = std::get<1>(result); - - float realCost = std::get<2>(result); - - std::string translation = Join((*vocab)(words), " ", reverse); - - bestn << history->GetLineNum() << " ||| " << translation << " |||"; - - if(hypo->GetCostBreakdown().empty()) { - bestn << " F0=" << hypo->GetCost(); - } else { - for(size_t j = 0; j < hypo->GetCostBreakdown().size(); ++j) { - bestn << " F" << j << "= " << hypo->GetCostBreakdown()[j]; - 
} - } - - bestn << " ||| " << realCost; - - if(i < nbl.size() - 1) - bestn << std::endl; - else - bestn << std::flush; - } - } - - auto bestTranslation = history->Top(); - - std::string translation - = Join((*vocab)(std::get<0>(bestTranslation)), " ", reverse); - best1 << translation << std::flush; -} -} diff --git a/src/translator/scorers.h b/src/translator/scorers.h index 30fd9d1d..ae5df297 100644 --- a/src/translator/scorers.h +++ b/src/translator/scorers.h @@ -1,8 +1,9 @@ #pragma once #include "marian.h" -#include "models/model_factory.h" + #include "data/shortlist.h" +#include "models/model_factory.h" namespace marian { @@ -41,8 +42,10 @@ public: virtual void init(Ptr<ExpressionGraph> graph) {} - virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) {}; + virtual void setShortlistGenerator( + Ptr<data::ShortlistGenerator> shortlistGenerator){}; virtual Ptr<data::Shortlist> getShortlist() { return nullptr; }; + virtual std::vector<float> getAlignment() { return {}; }; }; class ScorerWrapperState : public ScorerState { @@ -104,97 +107,19 @@ public: graph, wrappedState, hypIndices, embIndices, dimBatch, beamSize)); } - virtual void setShortlistGenerator(Ptr<data::ShortlistGenerator> shortlistGenerator) { + virtual void setShortlistGenerator( + Ptr<data::ShortlistGenerator> shortlistGenerator) { encdec_->setShortlistGenerator(shortlistGenerator); }; virtual Ptr<data::Shortlist> getShortlist() { return encdec_->getShortlist(); }; -}; -//class WordPenaltyState : public ScorerState { -//private: -// int dimVocab_; -// Expr penalties_; -// -//public: -// WordPenaltyState(int dimVocab, Expr penalties) -// : dimVocab_(dimVocab), penalties_(penalties) {} -// -// virtual Expr getProbs() { return penalties_; }; -// -// virtual float breakDown(size_t i) { -// return getProbs()->val()->get(i % dimVocab_); -// } -//}; -// -//class WordPenalty : public Scorer { -//private: -// int dimVocab_; -// Expr penalties_; -// -//public: -// 
WordPenalty(const std::string& name, float weight, int dimVocab) -// : Scorer(name, weight), dimVocab_(dimVocab) {} -// -// virtual void clear(Ptr<ExpressionGraph> graph) {} -// -// virtual Ptr<ScorerState> startState(Ptr<ExpressionGraph> graph, -// Ptr<data::CorpusBatch> batch) { -// std::vector<float> p(dimVocab_, 1); -// p[0] = 0; -// p[2] = 0; -// -// penalties_ = graph->constant({1, dimVocab_}, inits::from_vector(p)); -// return New<WordPenaltyState>(dimVocab_, penalties_); -// } -// -// virtual Ptr<ScorerState> step(Ptr<ExpressionGraph> graph, -// Ptr<ScorerState> state, -// const std::vector<size_t>& hypIndices, -// const std::vector<size_t>& embIndices, -// int dimBatch, -// int beamSize) { -// return state; -// } -//}; -// -//class UnseenWordPenalty : public Scorer { -//private: -// int batchIndex_; -// int dimVocab_; -// Expr penalties_; -// -//public: -// UnseenWordPenalty(const std::string& name, -// float weight, -// int dimVocab, -// int batchIndex) -// : Scorer(name, weight), dimVocab_(dimVocab), batchIndex_(batchIndex) {} -// -// virtual void clear(Ptr<ExpressionGraph> graph) {} -// -// virtual Ptr<ScorerState> startState(Ptr<ExpressionGraph> graph, -// Ptr<data::CorpusBatch> batch) { -// std::vector<float> p(dimVocab_, -1); -// for(auto i : (*batch)[batchIndex_]->data()) -// p[i] = 0; -// p[2] = 0; -// -// penalties_ = graph->constant({1, dimVocab_}, inits::from_vector(p)); -// return New<WordPenaltyState>(dimVocab_, penalties_); -// } -// -// virtual Ptr<ScorerState> step(Ptr<ExpressionGraph> graph, -// Ptr<ScorerState> state, -// const std::vector<size_t>& hypIndices, -// const std::vector<size_t>& embIndices, -// int dimBatch, -// int beamSize) { -// return state; -// } -//}; + virtual std::vector<float> getAlignment() { + return encdec_->getAlignment(); + } +}; Ptr<Scorer> scorerByType(std::string fname, float weight, diff --git a/src/translator/translator.h b/src/translator/translator.h index 84a85d06..759669ee 100644 --- 
a/src/translator/translator.h +++ b/src/translator/translator.h @@ -8,7 +8,7 @@ #include "3rd_party/threadpool.h" #include "translator/history.h" #include "translator/output_collector.h" -#include "translator/printer.h" +#include "translator/output_printer.h" #include "models/model_task.h" #include "translator/scorers.h" @@ -83,6 +83,7 @@ public: size_t batchId = 0; auto collector = New<OutputCollector>(); + auto printer = New<OutputPrinter>(options_, trgVocab_); if(options_->get<bool>("quiet-translation")) collector->setPrintingStrategy(New<QuietPrinting>()); @@ -111,7 +112,7 @@ public: for(auto history : histories) { std::stringstream best1; std::stringstream bestn; - Printer(options_, trgVocab_, history, best1, bestn); + printer->print(history, best1, bestn); collector->Write(history->GetLineNum(), best1.str(), bestn.str(), @@ -176,6 +177,7 @@ public: data::BatchGenerator<data::TextInput> bg(corpus_, options_); auto collector = New<StringCollector>(); + auto printer = New<OutputPrinter>(options_, trgVocab_); size_t batchId = 0; // @TODO: unify this and get rid of Config object. @@ -205,7 +207,7 @@ public: for(auto history : histories) { std::stringstream best1; std::stringstream bestn; - Printer(options_, trgVocab_, history, best1, bestn); + printer->print(history, best1, bestn); collector->add(history->GetLineNum(), best1.str(), bestn.str()); } }; |