Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author John Cai <jcai@gitlab.com> 2021-12-13 18:58:21 +0300
committer John Cai <jcai@gitlab.com> 2021-12-13 18:58:21 +0300
commit 15a1323ae16dffd3ba6b078f6cb81e283a96c72d (patch)
tree 64e7810c28e3e27fc2245924619fab0888ec1998
parent 896612200106b19068dfdb4f8f67de0dd4807c0a (diff)
parent dffc3458ce32ede5ccfea0ade0282962b3370d38 (diff)
Merge branch 'jc-replicate-immediately' into 'master'
praefect: replicate immediately after track-repository. Closes #3892 and #3909. See merge request gitlab-org/gitaly!4183.
-rw-r--r-- cmd/praefect/subcmd.go 2
-rw-r--r-- cmd/praefect/subcmd_track_repository.go 97
-rw-r--r-- cmd/praefect/subcmd_track_repository_test.go 176
-rw-r--r-- internal/praefect/replicator.go 7
4 files changed, 198 insertions, 84 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index f6b63ee2c..b11d155bd 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -40,7 +40,7 @@ var subcommands = map[string]subcmd{
acceptDatalossCmdName: &acceptDatalossSubcommand{},
setReplicationFactorCmdName: newSetReplicatioFactorSubcommand(os.Stdout),
removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout),
- trackRepositoryCmdName: newTrackRepository(logger),
+ trackRepositoryCmdName: newTrackRepository(logger, os.Stdout),
listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
checkCmdName: newCheckSubcommand(
os.Stdout,
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index 810df39a8..ec4e3489a 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -6,14 +6,18 @@ import (
"errors"
"flag"
"fmt"
+ "io"
"math/rand"
"time"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/metadata"
@@ -24,16 +28,18 @@ const (
)
type trackRepository struct {
+ w io.Writer
logger logrus.FieldLogger
virtualStorage string
relativePath string
authoritativeStorage string
+ replicateImmediately bool
}
var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
-func newTrackRepository(logger logrus.FieldLogger) *trackRepository {
- return &trackRepository{logger: logger}
+func newTrackRepository(logger logrus.FieldLogger, w io.Writer) *trackRepository {
+ return &trackRepository{w: w, logger: logger}
}
func (cmd *trackRepository) FlagSet() *flag.FlagSet {
@@ -41,11 +47,14 @@ func (cmd *trackRepository) FlagSet() *flag.FlagSet {
fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative")
+ fs.BoolVar(&cmd.replicateImmediately, "replicate-immediately", false, "kick off a replication immediately")
fs.Usage = func() {
printfErr("Description:\n" +
" This command adds a given repository to be tracked by Praefect.\n" +
- " It checks if the repository exists on disk on the authoritative storage, " +
- " and whether database records are absent from tracking the repository.")
+ " It checks if the repository exists on disk on the authoritative storage,\n" +
+ " and whether database records are absent from tracking the repository.\n" +
+ " If -replicate-immediately is used, the command will attempt to replicate the repository to the secondaries.\n" +
+ " Otherwise, replication jobs will be created and will be excuted eventually by Praefect itself.\n")
fs.PrintDefaults()
}
return fs
@@ -137,18 +146,79 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
}
- if err := cmd.trackRepository(
+ nodeSet, err := praefect.DialNodes(
ctx,
- datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
+ cfg.VirtualStorages,
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
+ if err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+ defer nodeSet.Close()
+
+ store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ replMgr := praefect.NewReplMgr(
+ cmd.logger,
+ cfg.StorageNames(),
+ queue,
+ store,
+ praefect.StaticHealthChecker(cfg.StorageNames()),
+ nodeSet,
+ )
+
+ repositoryID, err := cmd.trackRepository(
+ ctx,
+ store,
+ queue,
primary,
secondaries,
savePrimary,
variableReplicationFactorEnabled,
- ); err != nil {
+ )
+ if err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
- logger.Debug("finished adding new repository to be tracked in praefect database.")
+ fmt.Fprintln(cmd.w, "Finished adding new repository to be tracked in praefect database.")
+
+ correlationID := correlation.SafeRandomID()
+ connections := nodeSet.Connections()[cmd.virtualStorage]
+
+ for _, secondary := range secondaries {
+ event := datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RepositoryID: repositoryID,
+ Change: datastore.UpdateRepo,
+ RelativePath: cmd.relativePath,
+ VirtualStorage: cmd.virtualStorage,
+ SourceNodeStorage: primary,
+ TargetNodeStorage: secondary,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
+ }
+ if cmd.replicateImmediately {
+ conn, ok := connections[secondary]
+ if !ok {
+ return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary)
+ }
+
+ if err := replMgr.ProcessReplicationEvent(ctx, event, conn); err != nil {
+ return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err)
+ }
+
+ fmt.Fprintf(cmd.w, "Finished replicating repository to %q.\n", secondary)
+ continue
+ }
+
+ if _, err := queue.Enqueue(ctx, event); err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+ fmt.Fprintf(cmd.w, "Added replication job to replicate repository to %q.\n", secondary)
+ }
return nil
}
@@ -156,19 +226,20 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
func (cmd *trackRepository) trackRepository(
ctx context.Context,
ds *datastore.PostgresRepositoryStore,
+ queue datastore.ReplicationEventQueue,
primary string,
secondaries []string,
savePrimary bool,
variableReplicationFactorEnabled bool,
-) error {
+) (int64, error) {
repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath)
if err != nil {
if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
cmd.logger.Print("repository is already tracked in praefect database")
- return nil
+ return 0, nil
}
- return fmt.Errorf("ReserveRepositoryID: %w", err)
+ return 0, fmt.Errorf("ReserveRepositoryID: %w", err)
}
if err := ds.CreateRepository(
@@ -183,10 +254,10 @@ func (cmd *trackRepository) trackRepository(
savePrimary,
variableReplicationFactorEnabled,
); err != nil {
- return fmt.Errorf("CreateRepository: %w", err)
+ return 0, fmt.Errorf("CreateRepository: %w", err)
}
- return nil
+ return repositoryID, nil
}
func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index 0e2667af3..ed55c1ce7 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -1,8 +1,8 @@
package main
import (
+ "bytes"
"flag"
- "fmt"
"io"
"path/filepath"
"testing"
@@ -76,6 +76,8 @@ func TestAddRepository_Exec(t *testing.T) {
t.Parallel()
g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+ testcfg.BuildGitalyHooks(t, g2Cfg)
+ testcfg.BuildGitalySSH(t, g2Cfg)
g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
@@ -102,6 +104,10 @@ func TestAddRepository_Exec(t *testing.T) {
},
},
DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
}
gitalyCC, err := client.Dial(g1Addr, nil)
@@ -126,67 +132,77 @@ func TestAddRepository_Exec(t *testing.T) {
return err
}
- testCases := map[string]struct {
- failoverConfig config.Failover
- authoritativeStorage string
- }{
- "sql election": {
- failoverConfig: config.Failover{
- Enabled: true,
- ElectionStrategy: config.ElectionStrategySQL,
- },
- authoritativeStorage: "",
- },
- "per repository election": {
- failoverConfig: config.Failover{
- Enabled: true,
- ElectionStrategy: config.ElectionStrategyPerRepository,
- },
- authoritativeStorage: g1Cfg.Storages[0].Name,
- },
- }
-
+ authoritativeStorage := g1Cfg.Storages[0].Name
logger := testhelper.NewDiscardingLogger(t)
- for tn, tc := range testCases {
- t.Run(tn, func(t *testing.T) {
- addCmdConf := conf
- addCmdConf.Failover = tc.failoverConfig
- t.Run("ok", func(t *testing.T) {
- nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ t.Run("ok", func(t *testing.T) {
+ testCases := []struct {
+ relativePath string
+ desc string
+ replicateImmediately bool
+ expectedOutput string
+ }{
+ {
+ relativePath: "path/to/test/repo1",
+ desc: "force replication",
+ replicateImmediately: true,
+ expectedOutput: "Finished replicating repository to \"gitaly-2\".\n",
+ },
+ {
+ relativePath: "path/to/test/repo2",
+ desc: "do not force replication",
+ replicateImmediately: false,
+ expectedOutput: "Added replication job to replicate repository to \"gitaly-2\".\n",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(
+ testhelper.NewDiscardingLogEntry(t),
+ conf,
+ db.DB,
+ nil,
+ promtest.NewMockHistogramVec(),
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
rmRepoCmd := &removeRepository{
logger: logger,
virtualStorage: virtualStorageName,
- relativePath: relativePath,
+ relativePath: tc.relativePath,
w: io.Discard,
apply: true,
}
-
require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
// create the repo on Gitaly without Praefect knowing
- require.NoError(t, createRepoThroughGitaly1(relativePath))
- require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
- require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+ require.NoError(t, createRepoThroughGitaly1(tc.relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, tc.relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, tc.relativePath))
+ var stdout bytes.Buffer
addRepoCmd := &trackRepository{
logger: logger,
virtualStorage: virtualStorageName,
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
+ relativePath: tc.relativePath,
+ authoritativeStorage: authoritativeStorage,
+ replicateImmediately: tc.replicateImmediately,
+ w: &stdout,
}
- require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
as := datastore.NewAssignmentStore(db, conf.StorageNames())
- repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath)
+ repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, tc.relativePath)
require.NoError(t, err)
assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID)
@@ -195,45 +211,67 @@ func TestAddRepository_Exec(t *testing.T) {
assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
- exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath)
require.NoError(t, err)
assert.True(t, exists)
- })
-
- t.Run("repository does not exist", func(t *testing.T) {
- relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
-
- cmd := &trackRepository{
- logger: testhelper.NewDiscardingLogger(t),
- virtualStorage: "praefect",
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
+ assert.Contains(t, stdout.String(), tc.expectedOutput)
+
+ if !tc.replicateImmediately {
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1)
+ require.NoError(t, err)
+ assert.Len(t, events, 1)
+ assert.Equal(t, tc.relativePath, events[0].Job.RelativePath)
}
-
- assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist)
})
+ }
+ })
- t.Run("records already exist", func(t *testing.T) {
- relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
+ t.Run("repository does not exist", func(t *testing.T) {
+ relativePath := "path/to/test/repo_1"
- require.NoError(t, createRepoThroughGitaly1(relativePath))
- require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
- require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+ cmd := &trackRepository{
+ w: &bytes.Buffer{},
+ logger: testhelper.NewDiscardingLogger(t),
+ virtualStorage: "praefect",
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
- ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
- require.NoError(t, err)
- require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+ assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), errAuthoritativeRepositoryNotExist)
+ })
- cmd := &trackRepository{
- logger: testhelper.NewDiscardingLogger(t),
- virtualStorage: virtualStorageName,
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
- }
+ t.Run("records already exist", func(t *testing.T) {
+ relativePath := "path/to/test/repo_2"
- assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
- })
- })
- }
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ require.NoError(t, ds.CreateRepository(
+ ctx,
+ id,
+ virtualStorageName,
+ relativePath,
+ relativePath,
+ g1Cfg.Storages[0].Name,
+ nil,
+ nil,
+ true,
+ true,
+ ))
+
+ cmd := &trackRepository{
+ w: &bytes.Buffer{},
+ logger: testhelper.NewDiscardingLogger(t),
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
+
+ assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ })
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 09f797d15..2660373fe 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -440,7 +440,7 @@ func WithParallelStorageProcessingWorkers(n uint) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(log logrus.FieldLogger, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log.WithField("component", "replication_manager"),
queue: queue,
@@ -780,6 +780,11 @@ func (r ReplMgr) backfillReplicaPath(ctx context.Context, event datastore.Replic
}
}
+// ProcessReplicationEvent processes a single replication event given the target client connection
+func (r ReplMgr) ProcessReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
+ return r.processReplicationEvent(ctx, event, targetCC)
+}
+
func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
var cancel func()