diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-19 19:42:51 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-20 11:32:36 +0300 |
commit | be5fb6b267c1b0cae3ac18c707de39e48bf3624f (patch) | |
tree | 6562148dff79ace43488dc99a9881fc7ea09eb2a | |
parent | ba363664cfa363f50e76c6d1dcaf4e275f9f37f3 (diff) |
Disjoint request finalizer timeout from the RPC
Request finalizer currently uses the same context as the RPC. If the
RPC times out, the request finalizer's context is also canceled. This
prevents the request finalizer from updating the database state when
an RPC has timed out. The RPC could have performed some disk modifications
even if it timed out so we have to update the database state. This
commit gives the request finalizer a 30 second timeout independent of the
RPC that triggered the finalizer. This should be long enough to run the
database operations. The database state could still be left unupdated
due to various reasons like Praefect crashing or reaching it's graceful
shutdown limit.
Changelog: fixed
-rw-r--r-- | internal/praefect/coordinator.go | 8 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 92 |
2 files changed, 100 insertions, 0 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 107b61f1a..bd23b22fb 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" @@ -844,6 +845,13 @@ func (c *Coordinator) newRequestFinalizer( cause string, ) func() error { return func() error { + // Use a separate timeout for the database operations. If the request times out, the passed in context is + // canceled. We need to perform the database updates regardless whether the request was canceled or not as + // the primary replica could have been dirtied and secondaries become outdated. Otherwise we'd have no idea of + // the possible changes performed on the disk. + ctx, cancel := context.WithTimeout(helper.SuppressCancellation(ctx), 30*time.Second) + defer cancel() + log := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ "replication.cause": cause, "replication.change": change, diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index fdb4b543d..6f85bd875 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1798,3 +1798,95 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { }) } } + +func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { + type ctxKey struct{} + + parentDeadline := time.Now() + ctx, cancel := context.WithDeadline(context.WithValue(context.Background(), ctxKey{}, "value"), parentDeadline) + cancel() + + requireSuppressedCancellation := func(t testing.TB, ctx context.Context) { + deadline, ok := ctx.Deadline() + require.True(t, ok) + require.NotEqual(t, parentDeadline, deadline) + require.Equal(t, ctx.Value(ctxKey{}), "value") + require.Nil(t, ctx.Err()) + select { + case <-ctx.Done(): + t.Fatal("context should not be canceled if the parent is canceled") + default: + require.NotNil(t, ctx.Done()) + } + } + + err := errors.New("error") + + for _, tc := range []struct { + change datastore.ChangeType + errMsg string + }{ + { + change: datastore.UpdateRepo, + errMsg: "increment generation: error", + }, + { + change: datastore.RenameRepo, + errMsg: "rename repository: error", + }, + { + change: datastore.DeleteRepo, + errMsg: "delete repository: error", + }, + { + change: "replication jobs only", + errMsg: "enqueue replication event: error", + }, + } { + t.Run(string(tc.change), func(t *testing.T) { + require.EqualError(t, + NewCoordinator( + &datastore.MockReplicationEventQueue{ + EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { + requireSuppressedCancellation(t, ctx) + return datastore.ReplicationEvent{}, err + }, + }, + datastore.MockRepositoryStore{ + IncrementGenerationFunc: func(ctx context.Context, _, _, _ string, _ []string) error { + requireSuppressedCancellation(t, ctx) + return err + }, + RenameRepositoryFunc: func(ctx context.Context, _, _, _, _ string) error { + requireSuppressedCancellation(t, ctx) + return err + }, + DeleteRepositoryFunc: func(ctx context.Context, _, _, _ string) error { + requireSuppressedCancellation(t, ctx) + return err + }, + CreateRepositoryFunc: func(ctx context.Context, _, _, _ string, _, _ []string, _, _ bool) error { + requireSuppressedCancellation(t, ctx) + return err + }, + }, + nil, + nil, + config.Config{}, + nil, + ).newRequestFinalizer( + ctx, + "virtual storage", + &gitalypb.Repository{}, + "primary", + []string{}, + []string{"secondary"}, + tc.change, + datastore.Params{"RelativePath": "relative-path"}, + "rpc-name", + )(), + tc.errMsg, + ) + }) + } +} |