Welcome to mirror list, hosted at ThFree Co, Russian Federation.

graph_group_multinode_sync.h « training « src - github.com/marian-nmt/marian.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: c4bd97307d55ade61a7433cffe4c577baf748887 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#pragma once

#include "training/graph_group.h"
#include "training/communicator.h"
#include "3rd_party/threadpool.h"

#include <condition_variable>
#include <future>
#include <thread>

namespace marian {

/**
 * Multi-node graph group for synchronous training over multiple
 * machines each with one or multiple GPUs.
 *
 * Mini-batches are distributed round-robin across the MPI processes (see
 * update()); the gradient exchange and parameter update are implemented in
 * the corresponding source file.
 */
class MultiNodeGraphGroupSync : public MultiNodeGraphGroupBase {
  using Base = MultiNodeGraphGroupBase;
public:
  virtual void setScheduler(Ptr<Scheduler> scheduler) override;

private:
  ////////////////////////////////////////////////////////////////////////////
  // General variables.

  /** Whether graph group has been properly initialized with a first batch. */
  bool initialized_{false};

  /** Memory allocators for tensors (GPUs). */
  std::vector<Ptr<TensorAllocator>> allocators_;

  ////////////////////////////////////////////////////////////////////////////
  // Client variables.

  /** Mutex to ensure clients are uniquely assigned to graphs and builders. */
  std::mutex mutexClientInit_;

  /** Mutex to avoid race conditions in scheduler. */
  std::mutex schedulerMutex_;

  /**
   * Batch number counter used for evenly distributing mini-batches across
   * nodes. Incremented on every call to update(), whether or not the batch
   * was processed by this node.
   */
  size_t batchIter_ = 0;

  ////////////////////////////////////////////////////////////////////////////
  // Communication variables.

  /** Number of clients on nodes in MPI world (cluster). */
  std::vector<int> numberClientsOfNodes_;  //@TODO not used for now, but might
                                           // be useful maybe?

  /**
   * Variables for optimizer delay and synchronous SGD
   */
  size_t tau_{1};  // set from the "optimizer-delay" option in the constructor
  std::mutex sumGradientMutex_;
  std::mutex updateParamsMutex_;
  std::mutex sumCostMutex_;
  Tensor accGradientsSync;
  Tensor sumGradientBuffer;
  Tensor paramsAvg_;
  std::vector<float> accGradientsSync_cpu;
  std::vector<float> receiveBuffer_cpu;
  bool synchronization_happened{false};

  /** Optimizer built from the options in the constructor. */
  Ptr<OptimizerBase> syncOptimizer_;

  std::vector<std::mutex> optDelayMutex_;
  std::vector<size_t> delay_count;
  std::vector<int> totalBatchWords;
  std::vector<Tensor> accGradients, accGradientBuffer;

  /** True when the "exponential-smoothing" option is greater than zero. */
  bool movingAvg_{false};
  /** Value of the "exponential-smoothing" option. */
  float mvDecay_{1e-4f};

  /**
   * Allocate new tensor on given GPU and store allocator.
   */
  Tensor newTensor(int size, Ptr<Backend> backend);

  /*
   * exponential smoothing
   */
  void updateAvgParams(Tensor paramsAvg, Tensor params, size_t batches);

  /**
   * Setup training environment and launch server thread and (if enabled) client
   * communication overlap threads.
   * Includes setting up MPI, node and shard sizes, clients, server shards and
   * communication overlap stuff.
   */
  virtual void init(Ptr<data::Batch> batch);

  /**
   * Setup clients that will compute gradients and communicate them with the
   * server shards.
   * There is one client per GPU.
   */
  void setupClients(Ptr<data::Batch> batch);

  /**
   * Initialize the graphs (models) of all clients on this node with the given
   * batch.
   */
  void runBatchThroughClientGraphs(Ptr<data::Batch> batch);

  /**
   * Initialize the CPU arrays, with pinned memory for faster CudaMemCpy
   * operations.
   */
  void initCPUArrays();

  /**
   * Sums the gradients from a node, taking care of locking
   * @param gradient - the gradient
   */
  void sumGRAD(Tensor gradient);

  /**
   * Does the MPI Communication, parameter update and copying back parameters.
   * @TODO ALHAM. God function too godly?
   */
  void sendReceiveUpdateSync();

  void execute(Ptr<data::Batch> batch);

public:
  /**
   * (Constructor) Call super class and read the training settings:
   * "optimizer-delay" (truncated to an integer) and "exponential-smoothing",
   * and create the optimizer from the options.
   */
  MultiNodeGraphGroupSync(Ptr<Options> options, Ptr<IMPIWrapper> mpi)
      : Base(options, mpi),
        tau_{static_cast<size_t>(options_->get<double>("optimizer-delay"))},
        syncOptimizer_{Optimizer(options_)},
        movingAvg_{options_->get<float>("exponential-smoothing") > 0},
        mvDecay_{options_->get<float>("exponential-smoothing")} {
  }

  /**
   * Update any client model with given batch if batch is assigned to this node.
   * Batches are assigned round-robin: batch number k is executed by the MPI
   * process whose rank equals k modulo the number of MPI processes.
   */
  void update(Ptr<data::Batch> batch) override {
    validate();
    if(batchIter_ % mpi_->numMPIProcesses() == mpi_->myMPIRank()) {  // Only take batch assigned to this node
      execute(batch);
    }
    batchIter_++;
  }

  /**
   * Load models from disk if file exists and setting is not disabled.
   *
   * Does nothing when "no-reload" is set. Otherwise, if the file named by the
   * "model" option exists, the scheduler state (if a scheduler is set) and
   * every client graph are loaded from it. If it does not exist but a
   * "pretrained-model" option is present, every client graph is initialized
   * from that file instead.
   */
  void load() override {
    if(options_->get<bool>("no-reload"))
      return;

    std::string name = options_->get<std::string>("model");

    if(filesystem::exists(name)) {
      if(scheduler_)
        scheduler_->load(name);
      size_t i = 0;
      for(auto graph : clientGraphs_)
        clientBuilders_[i++]->load(graph, name);
    } else if(options_->has("pretrained-model")) {
      std::string init = options_->get<std::string>("pretrained-model");
      LOG(info,
          "Initialize model weights with the pre-trained model {}",
          init);
      size_t i = 0;
      for(auto graph : clientGraphs_)
        clientBuilders_[i++]->load(graph, init, false);
    }
  }

  /**
   * Save model of first client's graph to disk
   */
  void save(bool final = false) override { save(clientGraphs_[0], final); }

  /**
   * Save model of given graph to disk.
   *
   * The graph is looked up among the client graphs to select the matching
   * builder; if it is not found, the first client is used.
   *
   * Unless "overwrite" is set or this is the final save, an additional
   * per-iteration copy is written whose file name is derived from the "model"
   * path by replacing its last four characters (the extension) with
   * ".iter<numberOfBatches>.npz". The model itself and the scheduler state
   * (if a scheduler is set) are then always written under the "model" path.
   *
   * @param graph graph whose model should be saved
   * @param final whether this is the final save (suppresses the per-iteration
   *        copy)
   */
  void save(Ptr<ExpressionGraph> graph, bool final = false) {
    // Locate the client that owns the given graph; defaults to client 0.
    size_t idx = 0;
    for(size_t i = 0; i < clientGraphs_.size(); ++i) {
      if(graph == clientGraphs_[i]) {
        idx = i;
        break;
      }
    }

    const bool overwrite = options_->get<bool>("overwrite");
    const std::string name = options_->get<std::string>("model");

    if(!overwrite && !final) {
      const std::string numberOfBatches
          = scheduler_ ? std::to_string(scheduler_->numberOfBatches())
                       : "unknown";
      const std::string suffix = ".iter" + numberOfBatches + ".npz";

      // Length of the extension that gets replaced (e.g. ".npz").
      const size_t extLength = 4;
      std::string nameOverwrite = name;
      if(nameOverwrite.size() >= extLength) {
        nameOverwrite.replace(
            nameOverwrite.size() - extLength, extLength, suffix);
      } else {
        // Path is too short to carry an extension: subtracting would wrap
        // around and make replace() throw, so just append the suffix.
        nameOverwrite += suffix;
      }
      clientBuilders_[idx]->save(clientGraphs_[idx], nameOverwrite);
    }

    clientBuilders_[idx]->save(clientGraphs_[idx], name, true);
    if(scheduler_)
      scheduler_->save(name);
  }

  /**
   * Collect statistics from first client's graph.
   * The number of devices on this node is passed on as a multiplier.
   */
  Ptr<data::BatchStats> collectStats(const std::vector<Ptr<Vocab>>& vocabs) {
    return GraphGroup::collectStats(
        clientGraphs_[0], clientBuilders_[0], vocabs, (double)devices_.size());
  }
};
}  // namespace marian