Welcome to mirror list, hosted at ThFree Co, Russian Federation.

partition_manager.go « gitaly « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 408d5f4a32467c0eee70b6127313b5034f1c04f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package gitaly

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync"

	"github.com/dgraph-io/badger/v3"
	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
	repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
)

var (
	// ErrPartitionManagerStopped is returned by PartitionManager.Begin once Stop has been called,
	// signalling that no further transactions will be processed.
	ErrPartitionManagerStopped = errors.New("partition manager stopped")
)

// PartitionManager is responsible for managing the lifecycle of each TransactionManager.
// It lazily starts one TransactionManager per repository partition when a transaction begins and
// stops it again once the partition has no pending transactions.
type PartitionManager struct {
	// mu is the mutex to synchronize access to the partitions. It also guards the stopped flag
	// and the mutable state (shuttingDown, pendingTransactionCount) of every partition.
	mu sync.Mutex
	// db is the handle to the key-value store used for storing the write-ahead log related state.
	// It is used to create each transaction manager.
	db *badger.DB
	// partitions contains all the active partitions for which there are pending transactions.
	// Each repository can have up to one partition. The map is keyed by getPartitionKey.
	partitions map[string]*partition
	// localRepoFactory is used by PartitionManager to construct `localrepo.Repo`.
	localRepoFactory localrepo.Factory
	// logger handles all logging for PartitionManager.
	logger logrus.FieldLogger
	// stopped tracks whether the PartitionManager has been stopped. If the manager is stopped,
	// no new transactions are allowed to begin.
	stopped bool
	// partitionsWG keeps track of running partitions. Each partition goroutine started by Begin
	// adds one to it, and Stop waits on it.
	partitionsWG sync.WaitGroup
	// stagingDirectory is the directory where all of the TransactionManager staging directories
	// should be created.
	stagingDirectory string
	// storages are the storages configured in this Gitaly server. They are keyed by the name and the
	// value is the storage's path. In this file it is only written by NewPartitionManager and is
	// read by Begin without holding mu.
	storages map[string]string
}

// partition contains the transaction manager and tracks the number of in-flight transactions for the partition.
// Its mutable fields (shuttingDown, pendingTransactionCount) are accessed by PartitionManager while holding
// PartitionManager.mu.
type partition struct {
	// shuttingDown is set when the partition shutdown was initiated, either because the partition became
	// idle (no pending transactions) or because the PartitionManager is being stopped.
	shuttingDown bool
	// shutdown is closed to signal when the partition is finished shutting down. Clients stumbling on the
	// partition when it is shutting down wait on this channel to know when the partition has shut down and they
	// should retry. It is closed after the TransactionManager's Run has returned and the partition has been
	// removed from PartitionManager.partitions.
	shutdown chan struct{}
	// transactionManager manages all transactions for the partition.
	transactionManager *TransactionManager
	// pendingTransactionCount holds the current number of in flight transactions being processed by the manager.
	// Begin increments it; the finalizer returned by transactionFinalizerFactory decrements it and stops the
	// partition when it reaches zero.
	pendingTransactionCount uint
}

// NewPartitionManager returns a new PartitionManager. The configured storages are indexed by name
// so Begin can resolve a repository's storage path; if several storages share a name, the one
// listed last wins. Staging directories for each TransactionManager are created under stagingDir.
func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger, stagingDir string) *PartitionManager {
	pm := &PartitionManager{
		db:               db,
		partitions:       map[string]*partition{},
		localRepoFactory: localRepoFactory,
		logger:           logger,
		stagingDirectory: stagingDir,
		storages:         make(map[string]string, len(storages)),
	}

	for i := range storages {
		pm.storages[storages[i].Name] = storages[i].Path
	}

	return pm
}

// getPartitionKey returns a partition's key: the storage name and the repository's relative path
// joined by a colon.
func getPartitionKey(storageName, relativePath string) string {
	const separator = ":"
	return storageName + separator + relativePath
}

// Begin gets the TransactionManager for the specified repository and starts a Transaction. If a
// TransactionManager is not already running, a new one is created and used. The partition tracks
// the number of pending transactions and this counter gets incremented when Begin is invoked.
//
// Begin returns:
//   - a NotFound error if the repository's storage is not configured,
//   - an InvalidArgument error if the repository's relative path fails validation,
//   - ErrPartitionManagerStopped once Stop has been called,
//   - the context's error if ctx is done while waiting for a shutting-down partition,
//   - otherwise whatever error creating the partition or TransactionManager.Begin produces.
func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Transaction, error) {
	storagePath, ok := pm.storages[repo.GetStorageName()]
	if !ok {
		return nil, structerr.NewNotFound("unknown storage: %q", repo.GetStorageName())
	}

	relativePath, err := storage.ValidateRelativePath(storagePath, repo.GetRelativePath())
	if err != nil {
		return nil, structerr.NewInvalidArgument("validate relative path: %w", err)
	}

	partitionKey := getPartitionKey(repo.GetStorageName(), relativePath)

	for {
		pm.mu.Lock()
		if pm.stopped {
			pm.mu.Unlock()
			return nil, ErrPartitionManagerStopped
		}

		ptn, ok := pm.partitions[partitionKey]
		if !ok {
			// Scope the factory before creating the staging directory: if scoping fails we return
			// early, and doing it first guarantees no orphaned staging directory is left behind.
			storageScopedFactory, err := pm.localRepoFactory.ScopeByStorage(repo.GetStorageName())
			if err != nil {
				pm.mu.Unlock()
				return nil, fmt.Errorf("scope by storage: %w", err)
			}

			stagingDir, err := os.MkdirTemp(pm.stagingDirectory, "")
			if err != nil {
				pm.mu.Unlock()
				return nil, fmt.Errorf("create staging directory: %w", err)
			}

			ptn = &partition{
				shutdown: make(chan struct{}),
			}

			mgr := NewTransactionManager(pm.db, storagePath, relativePath, stagingDir, storageScopedFactory, pm.transactionFinalizerFactory(ptn))
			ptn.transactionManager = mgr

			pm.partitions[partitionKey] = ptn

			pm.partitionsWG.Add(1)
			go func() {
				defer pm.partitionsWG.Done()

				if err := mgr.Run(); err != nil {
					pm.logger.WithError(err).Error("partition failed")
				}

				// In the event that TransactionManager stops running, a new TransactionManager will
				// need to be started in order to continue processing transactions. The partition is
				// deleted allowing the next transaction for the repository to create a new partition
				// and TransactionManager.
				pm.mu.Lock()
				delete(pm.partitions, partitionKey)
				pm.mu.Unlock()

				// Wake up any Begin calls waiting for this partition to finish shutting down.
				close(ptn.shutdown)

				if err := os.RemoveAll(stagingDir); err != nil {
					pm.logger.WithError(err).Error("failed removing partition's staging directory")
				}
			}()
		}

		if ptn.shuttingDown {
			// If the partition is in the process of shutting down, the partition should not be
			// used. The lock is released while waiting for the partition to complete shutdown as to
			// not block other partitions from processing transactions. Once shutdown is complete, a
			// new attempt is made to get a valid partition.
			pm.mu.Unlock()
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-ptn.shutdown:
			}

			continue
		}

		ptn.pendingTransactionCount++
		pm.mu.Unlock()

		transaction, err := ptn.transactionManager.Begin(ctx)
		if err != nil {
			// The pending transaction count needs to be decremented since the transaction is no longer
			// inflight. A transaction failing does not necessarily mean the transaction manager has
			// stopped running. Consequently, if there are no other pending transactions the partition
			// should be stopped.
			pm.transactionFinalizerFactory(ptn)()

			return nil, err
		}

		return transaction, nil
	}
}

// Stop marks the PartitionManager as stopped, so Begin refuses new transactions and no new
// partitions are spawned. It then asks every running partition's TransactionManager to stop and
// blocks until all partition goroutines have exited.
func (pm *PartitionManager) Stop() {
	func() {
		pm.mu.Lock()
		defer pm.mu.Unlock()

		pm.stopped = true
		for key := range pm.partitions {
			pm.partitions[key].stop()
		}
	}()

	// The lock must be released before waiting: each partition goroutine takes pm.mu to remove
	// itself from the partitions map before it finishes.
	pm.partitionsWG.Wait()
}

// stop flags the partition as shutting down and then stops its transaction manager. Callers in
// this file invoke it while holding PartitionManager.mu, which guards shuttingDown.
func (ptn *partition) stop() {
	mgr := ptn.transactionManager
	ptn.shuttingDown = true
	mgr.Stop()
}

// transactionFinalizerFactory returns the function to run when a transaction on ptn completes. The
// returned function decrements the partition's pending transaction counter under pm.mu and stops
// the partition's TransactionManager once no pending transactions remain. Begin passes it to
// NewTransactionManager and also calls it directly when starting a transaction fails.
func (pm *PartitionManager) transactionFinalizerFactory(ptn *partition) func() {
	finalize := func() {
		pm.mu.Lock()
		defer pm.mu.Unlock()

		ptn.pendingTransactionCount--
		if ptn.pendingTransactionCount > 0 {
			// Other transactions are still in flight; keep the partition running.
			return
		}

		ptn.stop()
	}

	return finalize
}