Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-05-20 12:29:30 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-05-20 12:29:30 +0300
commit507b683fecfb647788417f226323e58a553b9d32 (patch)
tree99522555d064270b51e19d03c2d34cd35dbc2f17
parent0719f8890b72c4a9cb4697c47968b54f941bbe55 (diff)
parentbe5fb6b267c1b0cae3ac18c707de39e48bf3624f (diff)
Merge branch 'smh-run-request-finalizers-on-timeout' into 'master'
Disjoint request finalizer timeout from the RPC Closes #3624 See merge request gitlab-org/gitaly!3515
-rw-r--r--internal/command/command.go12
-rw-r--r--internal/command/command_test.go84
-rw-r--r--internal/gitaly/service/operations/squash.go4
-rw-r--r--internal/helper/suppressed_context.go18
-rw-r--r--internal/helper/suppressed_context_test.go58
-rw-r--r--internal/praefect/coordinator.go8
-rw-r--r--internal/praefect/coordinator_test.go92
7 files changed, 178 insertions, 98 deletions
diff --git a/internal/command/command.go b/internal/command/command.go
index 1a2bc8a9e..5dd4813cc 100644
--- a/internal/command/command.go
+++ b/internal/command/command.go
@@ -397,15 +397,3 @@ func (c *Command) Env() []string {
func (c *Command) Pid() int {
return c.cmd.Process.Pid
}
-
-// suppressedContext suppresses cancellation or expiration of the context.
-type suppressedContext struct{ context.Context }
-
-func (suppressedContext) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false }
-
-func (suppressedContext) Done() <-chan struct{} { return nil }
-
-func (suppressedContext) Err() error { return nil }
-
-// SuppressCancellation returns a context that suppresses cancellation or expiration of the parent context.
-func SuppressCancellation(ctx context.Context) context.Context { return suppressedContext{ctx} }
diff --git a/internal/command/command_test.go b/internal/command/command_test.go
index e3e66b55b..4c2bcaec4 100644
--- a/internal/command/command_test.go
+++ b/internal/command/command_test.go
@@ -392,87 +392,3 @@ func extractMessage(logMessage string) string {
return subMatches[1]
}
-
-func TestUncancellableContext(t *testing.T) {
- t.Run("cancellation", func(t *testing.T) {
- parent, cancel := context.WithCancel(context.Background())
- ctx := SuppressCancellation(parent)
-
- cancel()
- require.Equal(t, context.Canceled, parent.Err(), "sanity check: context should be cancelled")
-
- require.Nil(t, ctx.Err(), "cancellation of the parent shouldn't propagate via Err")
- select {
- case <-ctx.Done():
- require.FailNow(t, "cancellation of the parent shouldn't propagate via Done")
- default:
- }
- })
-
- t.Run("timeout", func(t *testing.T) {
- parent, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
- defer cancel()
-
- ctx := SuppressCancellation(parent)
-
- time.Sleep(time.Millisecond)
- require.Equal(t, context.DeadlineExceeded, parent.Err(), "sanity check: context should be expired after awaiting")
-
- require.Nil(t, ctx.Err(), "timeout on the parent shouldn't propagate via Err")
- select {
- case <-ctx.Done():
- require.FailNow(t, "timeout on the parent shouldn't propagate via Done")
- default:
- }
- _, ok := ctx.Deadline()
- require.False(t, ok, "no deadline should be set")
- })
-
- t.Run("re-cancellation", func(t *testing.T) {
- parent, cancelParent := context.WithCancel(context.Background())
- ctx := SuppressCancellation(parent)
- child, cancelChild := context.WithCancel(ctx)
- defer cancelChild()
-
- cancelParent()
- select {
- case <-child.Done():
- require.FailNow(t, "uncancellable context should suppress cancellation on the parent")
- default:
- // all good
- }
-
- cancelChild()
- require.Equal(t, context.Canceled, child.Err(), "context derived from cancellable could be cancelled")
-
- select {
- case <-child.Done():
- // all good
- default:
- require.FailNow(t, "child context should be canceled despite if parent is uncancellable")
- }
- })
-
- t.Run("context values are preserved", func(t *testing.T) {
- type ctxKey string
- k1 := ctxKey("1")
- k2 := ctxKey("2")
-
- parent, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- parent = context.WithValue(parent, k1, 1)
- parent = context.WithValue(parent, k2, "two")
-
- ctx := SuppressCancellation(parent)
-
- require.Equal(t, 1, ctx.Value(k1))
- require.Equal(t, "two", ctx.Value(k2))
-
- cancel()
- require.Equal(t, context.Canceled, parent.Err(), "sanity check: context should be cancelled")
-
- require.Equal(t, 1, ctx.Value(k1), "should be accessible after parent context cancellation")
- require.Equal(t, "two", ctx.Value(k2))
- })
-}
diff --git a/internal/gitaly/service/operations/squash.go b/internal/gitaly/service/operations/squash.go
index da5921607..92111fecb 100644
--- a/internal/gitaly/service/operations/squash.go
+++ b/internal/gitaly/service/operations/squash.go
@@ -163,7 +163,7 @@ func (s *Server) userSquashWithDiffInFiles(ctx context.Context, req *gitalypb.Us
}
defer func(worktreeName string) {
- ctx, cancel := context.WithCancel(command.SuppressCancellation(ctx))
+ ctx, cancel := context.WithCancel(helper.SuppressCancellation(ctx))
defer cancel()
if err := s.removeWorktree(ctx, repo, worktreeName); err != nil {
@@ -263,7 +263,7 @@ func (s *Server) userSquashWithNoDiff(ctx context.Context, req *gitalypb.UserSqu
}
defer func(worktreeName string) {
- ctx, cancel := context.WithCancel(command.SuppressCancellation(ctx))
+ ctx, cancel := context.WithCancel(helper.SuppressCancellation(ctx))
defer cancel()
if err := s.removeWorktree(ctx, repo, worktreeName); err != nil {
diff --git a/internal/helper/suppressed_context.go b/internal/helper/suppressed_context.go
new file mode 100644
index 000000000..3806b91d8
--- /dev/null
+++ b/internal/helper/suppressed_context.go
@@ -0,0 +1,18 @@
+package helper
+
+import (
+ "context"
+ "time"
+)
+
+// suppressedContext suppresses cancellation or expiration of the context.
+type suppressedContext struct{ context.Context }
+
+func (suppressedContext) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false }
+
+func (suppressedContext) Done() <-chan struct{} { return nil }
+
+func (suppressedContext) Err() error { return nil }
+
+// SuppressCancellation returns a context that suppresses cancellation or expiration of the parent context.
+func SuppressCancellation(ctx context.Context) context.Context { return suppressedContext{ctx} }
diff --git a/internal/helper/suppressed_context_test.go b/internal/helper/suppressed_context_test.go
new file mode 100644
index 000000000..7c910e6ca
--- /dev/null
+++ b/internal/helper/suppressed_context_test.go
@@ -0,0 +1,58 @@
+package helper
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestSuppressCancellation(t *testing.T) {
+ type key struct{}
+
+ parentDeadline := time.Now()
+ parentCtx, cancel := context.WithDeadline(context.WithValue(context.Background(), key{}, "value"), parentDeadline)
+ cancel()
+
+ t.Run("no deadline on suppressed context", func(t *testing.T) {
+ ctx := SuppressCancellation(parentCtx)
+
+ deadline, ok := ctx.Deadline()
+ require.False(t, ok)
+ require.Equal(t, time.Time{}, deadline)
+
+ require.Nil(t, ctx.Done())
+ require.NoError(t, ctx.Err())
+
+ require.Equal(t, ctx.Value(key{}), "value")
+ })
+
+ t.Run("with deadline on suppressed context", func(t *testing.T) {
+ newDeadline := parentDeadline.Add(24 * time.Hour)
+ ctx, cancel := context.WithDeadline(SuppressCancellation(parentCtx), newDeadline)
+
+ deadline, ok := ctx.Deadline()
+ require.True(t, ok)
+ require.Equal(t, newDeadline, deadline)
+
+ require.NoError(t, ctx.Err())
+ select {
+ case <-ctx.Done():
+ t.Fatal("context should not be done yet")
+ default:
+ require.NotNil(t, ctx.Done())
+ }
+
+ require.Equal(t, ctx.Value(key{}), "value")
+
+ cancel()
+
+ require.Error(t, context.Canceled)
+ select {
+ case <-ctx.Done():
+ default:
+ t.Fatal("context should have been done")
+ }
+ })
+}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index ea53a5fd4..3286b3b31 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
+ "time"
"github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
@@ -851,6 +852,13 @@ func (c *Coordinator) newRequestFinalizer(
cause string,
) func() error {
return func() error {
+ // Use a separate timeout for the database operations. If the request times out, the passed in context is
+ // canceled. We need to perform the database updates regardless whether the request was canceled or not as
+ // the primary replica could have been dirtied and secondaries become outdated. Otherwise we'd have no idea of
+ // the possible changes performed on the disk.
+ ctx, cancel := context.WithTimeout(helper.SuppressCancellation(ctx), 30*time.Second)
+ defer cancel()
+
log := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
"replication.cause": cause,
"replication.change": change,
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index fdb4b543d..6f85bd875 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1798,3 +1798,95 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
})
}
}
+
+func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
+ type ctxKey struct{}
+
+ parentDeadline := time.Now()
+ ctx, cancel := context.WithDeadline(context.WithValue(context.Background(), ctxKey{}, "value"), parentDeadline)
+ cancel()
+
+ requireSuppressedCancellation := func(t testing.TB, ctx context.Context) {
+ deadline, ok := ctx.Deadline()
+ require.True(t, ok)
+ require.NotEqual(t, parentDeadline, deadline)
+ require.Equal(t, ctx.Value(ctxKey{}), "value")
+ require.Nil(t, ctx.Err())
+ select {
+ case <-ctx.Done():
+ t.Fatal("context should not be canceled if the parent is canceled")
+ default:
+ require.NotNil(t, ctx.Done())
+ }
+ }
+
+ err := errors.New("error")
+
+ for _, tc := range []struct {
+ change datastore.ChangeType
+ errMsg string
+ }{
+ {
+ change: datastore.UpdateRepo,
+ errMsg: "increment generation: error",
+ },
+ {
+ change: datastore.RenameRepo,
+ errMsg: "rename repository: error",
+ },
+ {
+ change: datastore.DeleteRepo,
+ errMsg: "delete repository: error",
+ },
+ {
+ change: "replication jobs only",
+ errMsg: "enqueue replication event: error",
+ },
+ } {
+ t.Run(string(tc.change), func(t *testing.T) {
+ require.EqualError(t,
+ NewCoordinator(
+ &datastore.MockReplicationEventQueue{
+ EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) {
+ requireSuppressedCancellation(t, ctx)
+ return datastore.ReplicationEvent{}, err
+ },
+ },
+ datastore.MockRepositoryStore{
+ IncrementGenerationFunc: func(ctx context.Context, _, _, _ string, _ []string) error {
+ requireSuppressedCancellation(t, ctx)
+ return err
+ },
+ RenameRepositoryFunc: func(ctx context.Context, _, _, _, _ string) error {
+ requireSuppressedCancellation(t, ctx)
+ return err
+ },
+ DeleteRepositoryFunc: func(ctx context.Context, _, _, _ string) error {
+ requireSuppressedCancellation(t, ctx)
+ return err
+ },
+ CreateRepositoryFunc: func(ctx context.Context, _, _, _ string, _, _ []string, _, _ bool) error {
+ requireSuppressedCancellation(t, ctx)
+ return err
+ },
+ },
+ nil,
+ nil,
+ config.Config{},
+ nil,
+ ).newRequestFinalizer(
+ ctx,
+ "virtual storage",
+ &gitalypb.Repository{},
+ "primary",
+ []string{},
+ []string{"secondary"},
+ tc.change,
+ datastore.Params{"RelativePath": "relative-path"},
+ "rpc-name",
+ )(),
+ tc.errMsg,
+ )
+ })
+ }
+}