diff options
-rw-r--r-- | cmd/praefect/subcmd_accept_dataloss_test.go | 20 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dataloss_test.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_set_replication_factor_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/server.go | 2 | ||||
-rw-r--r-- | internal/praefect/service/info/server.go | 23 |
5 files changed, 5 insertions, 44 deletions
diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index 8f8531833..cb3e9bf2b 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "testing" "github.com/stretchr/testify/require" @@ -54,22 +53,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, generation)) } - q := &datastore.MockReplicationEventQueue{ - EnqueueFunc: func(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { - if event.Job.TargetNodeStorage == st2 { - return event, fmt.Errorf("replication event scheduled for authoritative storage %q", st2) - } - - generation, err := rs.GetGeneration(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage) - if err != nil { - return event, err - } - - return event, rs.SetGeneration(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage, generation) - }, - } - - ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(conf, q, rs, nil, nil, nil))}) + ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(conf, rs, nil, nil, nil))}) defer clean() conf.SocketPath = ln.Addr().String() @@ -152,7 +136,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { desc: "success", args: []string{"-virtual-storage=test-virtual-storage-1", "-repository=test-repository-1", "-authoritative-storage=test-physical-storage-2"}, matchError: matchNoError(), - expectedGenerations: map[string]int{st1: 2, st2: 2, st3: 2}, + expectedGenerations: map[string]int{st1: 1, st2: 2, st3: datastore.GenerationUnknown}, }, } { t.Run(tc.desc, func(t *testing.T) { diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index a66f69d11..19acc6459 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -80,7 +80,7 @@ func TestDatalossSubcommand(t *testing.T) { require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0)) ln, clean := listenAndServe(t, []svcRegistrar{ - registerPraefectInfoServer(info.NewServer(cfg, nil, gs, nil, nil, nil))}) + registerPraefectInfoServer(info.NewServer(cfg, gs, nil, nil, nil))}) defer clean() for _, tc := range []struct { desc string diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go index f55316de0..5b0d07605 100644 --- a/cmd/praefect/subcmd_set_replication_factor_test.go +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -96,7 +96,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { ) ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer( - info.NewServer(config.Config{}, nil, nil, store, nil, nil), + info.NewServer(config.Config{}, nil, store, nil, nil), )}) defer clean() diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 017d6742f..4eb4b1c4c 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -154,7 +154,7 @@ func registerServices( ) { // ServerServiceServer is necessary for the ServerInfo RPC gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, conns)) - gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(conf, queue, rs, assignmentStore, conns, primaryGetter)) + gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(conf, rs, assignmentStore, conns, primaryGetter)) gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm)) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index 3e32d9135..8ab27ab55 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -37,7 +37,6 @@ type PrimaryGetter interface { type Server struct { gitalypb.UnimplementedPraefectInfoServiceServer conf config.Config - queue datastore.ReplicationEventQueue rs datastore.RepositoryStore assignmentStore AssignmentStore conns service.Connections @@ -47,7 +46,6 @@ type Server struct { // NewServer creates a new instance of a grpc InfoServiceServer func NewServer( conf config.Config, - queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, assignmentStore AssignmentStore, conns service.Connections, @@ -55,7 +53,6 @@ func NewServer( ) gitalypb.PraefectInfoServiceServer { return &Server{ conf: conf, - queue: queue, rs: rs, assignmentStore: assignmentStore, conns: conns, @@ -89,25 +86,5 @@ func (s *Server) SetAuthoritativeStorage(ctx context.Context, req *gitalypb.SetA return nil, helper.ErrInternal(err) } - // Schedule replication jobs to other physical storages to get them consistent with the - // new authoritative repository. - for _, storage := range storages { - if storage == req.AuthoritativeStorage { - continue - } - - if _, err := s.queue.Enqueue(ctx, datastore.ReplicationEvent{ - Job: datastore.ReplicationJob{ - Change: datastore.UpdateRepo, - VirtualStorage: req.VirtualStorage, - RelativePath: req.RelativePath, - SourceNodeStorage: req.AuthoritativeStorage, - TargetNodeStorage: storage, - }, - }); err != nil { - return nil, helper.ErrInternal(err) - } - } - return &gitalypb.SetAuthoritativeStorageResponse{}, nil } |