diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-05-20 12:29:30 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-05-20 12:29:30 +0300 |
commit | 507b683fecfb647788417f226323e58a553b9d32 (patch) | |
tree | 99522555d064270b51e19d03c2d34cd35dbc2f17 | |
parent | 0719f8890b72c4a9cb4697c47968b54f941bbe55 (diff) | |
parent | be5fb6b267c1b0cae3ac18c707de39e48bf3624f (diff) |
Merge branch 'smh-run-request-finalizers-on-timeout' into 'master'
Disjoint request finalizer timeout from the RPC
Closes #3624
See merge request gitlab-org/gitaly!3515
-rw-r--r-- | internal/command/command.go | 12 | ||||
-rw-r--r-- | internal/command/command_test.go | 84 | ||||
-rw-r--r-- | internal/gitaly/service/operations/squash.go | 4 | ||||
-rw-r--r-- | internal/helper/suppressed_context.go | 18 | ||||
-rw-r--r-- | internal/helper/suppressed_context_test.go | 58 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 8 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 92 |
7 files changed, 178 insertions, 98 deletions
diff --git a/internal/command/command.go b/internal/command/command.go index 1a2bc8a9e..5dd4813cc 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -397,15 +397,3 @@ func (c *Command) Env() []string { func (c *Command) Pid() int { return c.cmd.Process.Pid } - -// suppressedContext suppresses cancellation or expiration of the context. -type suppressedContext struct{ context.Context } - -func (suppressedContext) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false } - -func (suppressedContext) Done() <-chan struct{} { return nil } - -func (suppressedContext) Err() error { return nil } - -// SuppressCancellation returns a context that suppresses cancellation or expiration of the parent context. -func SuppressCancellation(ctx context.Context) context.Context { return suppressedContext{ctx} } diff --git a/internal/command/command_test.go b/internal/command/command_test.go index e3e66b55b..4c2bcaec4 100644 --- a/internal/command/command_test.go +++ b/internal/command/command_test.go @@ -392,87 +392,3 @@ func extractMessage(logMessage string) string { return subMatches[1] } - -func TestUncancellableContext(t *testing.T) { - t.Run("cancellation", func(t *testing.T) { - parent, cancel := context.WithCancel(context.Background()) - ctx := SuppressCancellation(parent) - - cancel() - require.Equal(t, context.Canceled, parent.Err(), "sanity check: context should be cancelled") - - require.Nil(t, ctx.Err(), "cancellation of the parent shouldn't propagate via Err") - select { - case <-ctx.Done(): - require.FailNow(t, "cancellation of the parent shouldn't propagate via Done") - default: - } - }) - - t.Run("timeout", func(t *testing.T) { - parent, cancel := context.WithTimeout(context.Background(), time.Nanosecond) - defer cancel() - - ctx := SuppressCancellation(parent) - - time.Sleep(time.Millisecond) - require.Equal(t, context.DeadlineExceeded, parent.Err(), "sanity check: context should be expired after awaiting") - - require.Nil(t, ctx.Err(), "timeout on the parent shouldn't propagate via Err") - select { - case <-ctx.Done(): - require.FailNow(t, "timeout on the parent shouldn't propagate via Done") - default: - } - _, ok := ctx.Deadline() - require.False(t, ok, "no deadline should be set") - }) - - t.Run("re-cancellation", func(t *testing.T) { - parent, cancelParent := context.WithCancel(context.Background()) - ctx := SuppressCancellation(parent) - child, cancelChild := context.WithCancel(ctx) - defer cancelChild() - - cancelParent() - select { - case <-child.Done(): - require.FailNow(t, "uncancellable context should suppress cancellation on the parent") - default: - // all good - } - - cancelChild() - require.Equal(t, context.Canceled, child.Err(), "context derived from cancellable could be cancelled") - - select { - case <-child.Done(): - // all good - default: - require.FailNow(t, "child context should be canceled despite if parent is uncancellable") - } - }) - - t.Run("context values are preserved", func(t *testing.T) { - type ctxKey string - k1 := ctxKey("1") - k2 := ctxKey("2") - - parent, cancel := context.WithCancel(context.Background()) - defer cancel() - - parent = context.WithValue(parent, k1, 1) - parent = context.WithValue(parent, k2, "two") - - ctx := SuppressCancellation(parent) - - require.Equal(t, 1, ctx.Value(k1)) - require.Equal(t, "two", ctx.Value(k2)) - - cancel() - require.Equal(t, context.Canceled, parent.Err(), "sanity check: context should be cancelled") - - require.Equal(t, 1, ctx.Value(k1), "should be accessible after parent context cancellation") - require.Equal(t, "two", ctx.Value(k2)) - }) -} diff --git a/internal/gitaly/service/operations/squash.go b/internal/gitaly/service/operations/squash.go index da5921607..92111fecb 100644 --- a/internal/gitaly/service/operations/squash.go +++ b/internal/gitaly/service/operations/squash.go @@ -163,7 +163,7 @@ func (s *Server) userSquashWithDiffInFiles(ctx context.Context, req *gitalypb.Us } defer func(worktreeName string) { - ctx, cancel := context.WithCancel(command.SuppressCancellation(ctx)) + ctx, cancel := context.WithCancel(helper.SuppressCancellation(ctx)) defer cancel() if err := s.removeWorktree(ctx, repo, worktreeName); err != nil { @@ -263,7 +263,7 @@ func (s *Server) userSquashWithNoDiff(ctx context.Context, req *gitalypb.UserSqu } defer func(worktreeName string) { - ctx, cancel := context.WithCancel(command.SuppressCancellation(ctx)) + ctx, cancel := context.WithCancel(helper.SuppressCancellation(ctx)) defer cancel() if err := s.removeWorktree(ctx, repo, worktreeName); err != nil { diff --git a/internal/helper/suppressed_context.go b/internal/helper/suppressed_context.go new file mode 100644 index 000000000..3806b91d8 --- /dev/null +++ b/internal/helper/suppressed_context.go @@ -0,0 +1,18 @@ +package helper + +import ( + "context" + "time" +) + +// suppressedContext suppresses cancellation or expiration of the context. +type suppressedContext struct{ context.Context } + +func (suppressedContext) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false } + +func (suppressedContext) Done() <-chan struct{} { return nil } + +func (suppressedContext) Err() error { return nil } + +// SuppressCancellation returns a context that suppresses cancellation or expiration of the parent context. +func SuppressCancellation(ctx context.Context) context.Context { return suppressedContext{ctx} } diff --git a/internal/helper/suppressed_context_test.go b/internal/helper/suppressed_context_test.go new file mode 100644 index 000000000..7c910e6ca --- /dev/null +++ b/internal/helper/suppressed_context_test.go @@ -0,0 +1,58 @@ +package helper + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSuppressCancellation(t *testing.T) { + type key struct{} + + parentDeadline := time.Now() + parentCtx, cancel := context.WithDeadline(context.WithValue(context.Background(), key{}, "value"), parentDeadline) + cancel() + + t.Run("no deadline on suppressed context", func(t *testing.T) { + ctx := SuppressCancellation(parentCtx) + + deadline, ok := ctx.Deadline() + require.False(t, ok) + require.Equal(t, time.Time{}, deadline) + + require.Nil(t, ctx.Done()) + require.NoError(t, ctx.Err()) + + require.Equal(t, ctx.Value(key{}), "value") + }) + + t.Run("with deadline on suppressed context", func(t *testing.T) { + newDeadline := parentDeadline.Add(24 * time.Hour) + ctx, cancel := context.WithDeadline(SuppressCancellation(parentCtx), newDeadline) + + deadline, ok := ctx.Deadline() + require.True(t, ok) + require.Equal(t, newDeadline, deadline) + + require.NoError(t, ctx.Err()) + select { + case <-ctx.Done(): + t.Fatal("context should not be done yet") + default: + require.NotNil(t, ctx.Done()) + } + + require.Equal(t, ctx.Value(key{}), "value") + + cancel() + + require.Error(t, context.Canceled) + select { + case <-ctx.Done(): + default: + t.Fatal("context should have been done") + } + }) +} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index ea53a5fd4..3286b3b31 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" @@ -851,6 +852,13 @@ func (c *Coordinator) newRequestFinalizer( cause string, ) func() error { return func() error { + // Use a separate timeout for the database operations. If the request times out, the passed in context is + // canceled. We need to perform the database updates regardless whether the request was canceled or not as + // the primary replica could have been dirtied and secondaries become outdated. Otherwise we'd have no idea of + // the possible changes performed on the disk. + ctx, cancel := context.WithTimeout(helper.SuppressCancellation(ctx), 30*time.Second) + defer cancel() + log := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ "replication.cause": cause, "replication.change": change, diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index fdb4b543d..6f85bd875 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1798,3 +1798,95 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { }) } } + +func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { + type ctxKey struct{} + + parentDeadline := time.Now() + ctx, cancel := context.WithDeadline(context.WithValue(context.Background(), ctxKey{}, "value"), parentDeadline) + cancel() + + requireSuppressedCancellation := func(t testing.TB, ctx context.Context) { + deadline, ok := ctx.Deadline() + require.True(t, ok) + require.NotEqual(t, parentDeadline, deadline) + require.Equal(t, ctx.Value(ctxKey{}), "value") + require.Nil(t, ctx.Err()) + select { + case <-ctx.Done(): + t.Fatal("context should not be canceled if the parent is canceled") + default: + require.NotNil(t, ctx.Done()) + } + } + + err := errors.New("error") + + for _, tc := range []struct { + change datastore.ChangeType + errMsg string + }{ + { + change: datastore.UpdateRepo, + errMsg: "increment generation: error", + }, + { + change: datastore.RenameRepo, + errMsg: "rename repository: error", + }, + { + change: datastore.DeleteRepo, + errMsg: "delete repository: error", + }, + { + change: "replication jobs only", + errMsg: "enqueue replication event: error", + }, + } { + t.Run(string(tc.change), func(t *testing.T) { + require.EqualError(t, + NewCoordinator( + &datastore.MockReplicationEventQueue{ + EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { + requireSuppressedCancellation(t, ctx) + return datastore.ReplicationEvent{}, err + }, + }, + datastore.MockRepositoryStore{ + IncrementGenerationFunc: func(ctx context.Context, _, _, _ string, _ []string) error { + requireSuppressedCancellation(t, ctx) + return err + }, + RenameRepositoryFunc: func(ctx context.Context, _, _, _, _ string) error { + requireSuppressedCancellation(t, ctx) + return err + }, + DeleteRepositoryFunc: func(ctx context.Context, _, _, _ string) error { + requireSuppressedCancellation(t, ctx) + return err + }, + CreateRepositoryFunc: func(ctx context.Context, _, _, _ string, _, _ []string, _, _ bool) error { + requireSuppressedCancellation(t, ctx) + return err + }, + }, + nil, + nil, + config.Config{}, + nil, + ).newRequestFinalizer( + ctx, + "virtual storage", + &gitalypb.Repository{}, + "primary", + []string{}, + []string{"secondary"}, + tc.change, + datastore.Params{"RelativePath": "relative-path"}, + "rpc-name", + )(), + tc.errMsg, + ) + }) + } +} |