Welcome to mirror list, hosted at ThFree Co, Russian Federation.

repository.go « repocleaner « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: afc8fd31778ffd2140bb32a4d16122caaccd7c01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package repocleaner

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)

// StateOwner checks whether repositories found on a storage are known to the database.
type StateOwner interface {
	// DoesntExist returns the subset of replicaPaths on the given virtualStorage/storage
	// that don't exist in the database, determined by querying the repositories and
	// storage_repositories tables.
	DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]string, error)
}

// Acquirer acquires a storage for processing. Once acquired, no other Acquirer can acquire
// the same storage until it is released.
type Acquirer interface {
	// Populate adds the provided storage to the pool of entries that can be acquired.
	Populate(ctx context.Context, virtualStorage, storage string) error
	// AcquireNextStorage acquires the next storage to process, chosen by how long it has been
	// inactive. Runner passes Cfg.RunInterval as inactive and Cfg.LivenessInterval as
	// updatePeriod.
	//
	// The returned func releases the acquired storage. Runner treats a nil *ClusterPath
	// (with a nil error) as "no storages to verify", but still calls the release func in
	// that case. Implementations must therefore return a non-nil release func whenever the
	// error is nil.
	AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*datastore.ClusterPath, func() error, error)
}

// Action is a procedure executed on repositories that don't exist in the praefect database.
type Action interface {
	// Perform runs the action for the given non-existing replicaPaths on
	// virtualStorage/storage. Runner calls it for every processed batch, so replicaPaths
	// may be empty.
	Perform(ctx context.Context, virtualStorage, storage string, replicaPaths []string) error
}

// Runner scans healthy gitaly nodes for the repositories, verifies if
// found repositories are known by praefect and runs a special action.
//
// Each pass acquires one storage through acquirer and lists its repositories with walker
// over conns. It asks stateOwner which of them are unknown to the database and hands those
// to action. Behaviour is tuned through cfg; logger is scoped to this component.
type Runner struct {
	cfg           Cfg
	logger        logrus.FieldLogger
	healthChecker praefect.HealthChecker
	conns         praefect.Connections
	walker        *Walker
	stateOwner    StateOwner
	acquirer      Acquirer
	action        Action
}

// Cfg contains the configuration parameters used by Runner.
type Cfg struct {
	// RunInterval is the minimum time that must have passed since the previous check of a
	// storage before it is checked again.
	RunInterval time.Duration
	// LivenessInterval is the period at which the locked entity is updated to signal that
	// it is still in use.
	LivenessInterval time.Duration
	// RepositoriesInBatch is the number of repositories passed together as one batch for
	// processing.
	RepositoriesInBatch int
}

// NewRunner assembles a *Runner from its collaborators. The repository walker it creates
// batches repositories by cfg.RepositoriesInBatch and skips those modified within the last
// 24 hours.
func NewRunner(cfg Cfg, logger logrus.FieldLogger, healthChecker praefect.HealthChecker, conns praefect.Connections, stateOwner StateOwner, acquirer Acquirer, action Action) *Runner {
	// Recently modified repositories may still be in the process of being created and
	// not yet have a record in Praefect, so the walker ignores them for this long.
	const walkerGracePeriod = 24 * time.Hour

	walker := NewWalker(conns, cfg.RepositoriesInBatch, walkerGracePeriod)
	componentLogger := logger.WithField("component", "repocleaner.repository_existence")

	return &Runner{
		cfg:           cfg,
		logger:        componentLogger,
		healthChecker: healthChecker,
		conns:         conns,
		walker:        walker,
		stateOwner:    stateOwner,
		acquirer:      acquirer,
		action:        action,
	}
}

// Run registers every configured storage with the acquirer and then repeatedly performs a
// verification pass. A pass looks for repositories on healthy gitaly nodes that are unknown
// to praefect and runs the configured action on them.
//
// The first pass starts immediately; each later pass waits for the next tick of ticker.
// Run returns the context error once ctx is cancelled, or an error if a storage cannot be
// registered. The ticker is stopped before Run returns.
func (gs *Runner) Run(ctx context.Context, ticker helper.Ticker) error {
	gs.logger.Info("started")
	defer gs.logger.Info("completed")
	defer ticker.Stop()

	for virtualStorage, storages := range gs.conns {
		for storage := range storages {
			if err := gs.acquirer.Populate(ctx, virtualStorage, storage); err != nil {
				return fmt.Errorf("populate database: %w", err)
			}
		}
	}

	gs.run(ctx)
	for {
		// Re-arm the ticker before every wait so each pass is followed by a full period.
		ticker.Reset()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C():
		}

		gs.run(ctx)
	}
}

// run performs a single verification pass. It acquires the next storage due for a check,
// walks its repositories in batches, asks the StateOwner which of them are unknown to the
// database and hands those to the Action.
//
// Every failure is logged rather than returned. A failing batch does not prevent the
// remaining batches from being processed.
func (gs *Runner) run(ctx context.Context) {
	clusterPath, release, err := gs.acquirer.AcquireNextStorage(ctx, gs.cfg.RunInterval, gs.cfg.LivenessInterval)
	if err != nil {
		gs.logger.WithError(err).Error("unable to acquire next storage to verify")
		return
	}

	// The deferred release reads log when it fires. Once a storage has been acquired,
	// a release failure is therefore reported with that storage's fields attached.
	log := gs.logger
	defer func() {
		if releaseErr := release(); releaseErr != nil {
			log.WithError(releaseErr).Error("failed to release storage acquired to verify")
		}
	}()

	if clusterPath == nil {
		gs.logger.Debug("no storages to verify")
		return
	}

	virtualStorage, storage := clusterPath.VirtualStorage, clusterPath.Storage
	log = gs.loggerWith(virtualStorage, storage)

	verifyBatch := func(_, _ string, relativePaths []string) error {
		notExisting, checkErr := gs.stateOwner.DoesntExist(ctx, virtualStorage, storage, relativePaths)
		if checkErr != nil {
			log.WithError(checkErr).WithField("repositories", relativePaths).Error("failed to check existence")
			return nil
		}

		if actErr := gs.action.Perform(ctx, virtualStorage, storage, notExisting); actErr != nil {
			log.WithError(actErr).WithField("existence", notExisting).Error("perform action")
		}

		// Batch-level problems are only logged so the walk continues with the next batch.
		return nil
	}

	if err := gs.walker.ExecOnRepositories(ctx, virtualStorage, storage, verifyBatch); err != nil {
		log.WithError(err).Error("failed to exec action on repositories")
	}
}

// loggerWith returns the runner's logger annotated with the virtual storage and storage
// being processed.
func (gs *Runner) loggerWith(virtualStorage, storage string) logrus.FieldLogger {
	fields := logrus.Fields{
		"virtual_storage": virtualStorage,
		"storage":         storage,
	}
	return gs.logger.WithFields(fields)
}

// Walker walks the repositories of a gitaly storage.
//
// conns provides the connection for each virtual storage/storage pair. batchSize bounds how
// many relative paths are handed to an action at once. Repositories whose modification time
// falls within gracePeriod are skipped.
type Walker struct {
	conns       praefect.Connections
	gracePeriod time.Duration
	batchSize   int
}

// NewWalker builds a *Walker that lists repositories through conns. It hands them to the
// caller in batches of batchSize and ignores repositories modified within gracePeriod.
func NewWalker(conns praefect.Connections, batchSize int, gracePeriod time.Duration) *Walker {
	w := new(Walker)
	w.conns = conns
	w.batchSize = batchSize
	w.gracePeriod = gracePeriod
	return w
}

// ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the
// provided action on them in batches to reduce the cost of operations.
//
// action receives virtualStorage, storage and up to batchSize relative paths per call; the
// final call may carry fewer. A non-positive batch size is treated as 1. Repositories
// modified within the walker's grace period are skipped.
//
// The walk stops at the first error returned by action, and that error is returned
// unwrapped. Each batch is a freshly allocated slice, so action may retain it after
// returning.
func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func(string, string, []string) error) error {
	gclient, err := wr.getInternalGitalyClient(virtualStorage, storage)
	if err != nil {
		return fmt.Errorf("setup gitaly client: %w", err)
	}

	// Cancelling on return releases the WalkRepos stream even when the walk ends early
	// (e.g. action fails) before the stream has been fully drained.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage})
	if err != nil {
		return fmt.Errorf("unable to walk repos: %w", err)
	}

	batchSize := wr.batchSize
	if batchSize <= 0 {
		// make() panics on a negative capacity; a batch of one is the smallest useful unit.
		batchSize = 1
	}

	batch := make([]string, 0, batchSize)
	for {
		res, err := resp.Recv()
		if err != nil {
			if !errors.Is(err, io.EOF) {
				return fmt.Errorf("failure on walking repos: %w", err)
			}
			break
		}

		// Skip repositories modified within the grace period. They may be repositories
		// that are in the process of being created, where they do not yet have a record
		// in Praefect.
		if res.GetModificationTime().AsTime().After(time.Now().Add(-wr.gracePeriod)) {
			continue
		}

		batch = append(batch, res.GetRelativePath())

		if len(batch) == batchSize {
			if err := action(virtualStorage, storage, batch); err != nil {
				return err
			}
			// Start a new slice instead of truncating the old one so that a batch
			// retained by action is never overwritten by later repositories.
			batch = make([]string, 0, batchSize)
		}
	}

	if len(batch) > 0 {
		if err := action(virtualStorage, storage, batch); err != nil {
			return err
		}
	}
	return nil
}

// getInternalGitalyClient returns an InternalGitaly client for the connection registered
// under virtualStorage/storage. It returns an error when no such connection is known.
func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
	byStorage := wr.conns[virtualStorage]
	conn, ok := byStorage[storage]
	if !ok {
		return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
	}

	client := gitalypb.NewInternalGitalyClient(conn)
	return client, nil
}