Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-14 12:15:27 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-20 15:16:41 +0300
commitd781c2e2e76bfcc51cb4b0200d6c59539a574def (patch)
tree712e691d11988d42cfac2b10e15241276292416c
parent23378333c7c567b45824c7e52cb064378abc9e37 (diff)
coordinator: Split out function to compute metadata for Gitaly peers
When proxying a request to Gitaly, then we need to make sure to pass relevant metadata to Gitaly. Right now, this only requires us to convert the incoming metadata into outgoing metadata such that it's correctly passed on as part of any RPC calls. But we need to extend the logic to provide better handling for feature flags. Move the logic into a separate function `streamParametersContext()` to make it easily extensible.
-rw-r--r--internal/praefect/coordinator.go14
-rw-r--r--internal/praefect/coordinator_test.go172
2 files changed, 182 insertions, 4 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 5d9a3a862..27624a7bc 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -406,7 +406,7 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal
metrics.ReadDistribution.WithLabelValues(virtualStorage, node.Storage).Inc()
return proxy.NewStreamParameters(proxy.Destination{
- Ctx: metadata.IncomingToOutgoing(ctx),
+ Ctx: streamParametersContext(ctx),
Conn: node.Connection,
Msg: b,
}, nil, nil, nil), nil
@@ -487,7 +487,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
var finalizers []func() error
primaryDest := proxy.Destination{
- Ctx: metadata.IncomingToOutgoing(ctx),
+ Ctx: streamParametersContext(ctx),
Conn: route.Primary.Connection,
Msg: primaryMessage,
}
@@ -511,7 +511,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
if err != nil {
return nil, err
}
- primaryDest.Ctx = metadata.IncomingToOutgoing(injectedCtx)
+ primaryDest.Ctx = streamParametersContext(injectedCtx)
primaryDest.ErrHandler = func(err error) error {
nodeErrors.Lock()
defer nodeErrors.Unlock()
@@ -532,7 +532,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
}
secondaryDests = append(secondaryDests, proxy.Destination{
- Ctx: metadata.IncomingToOutgoing(injectedCtx),
+ Ctx: streamParametersContext(injectedCtx),
Conn: secondary.Connection,
Msg: secondaryMsg,
ErrHandler: func(err error) error {
@@ -595,6 +595,12 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil), nil
}
+// streamParametersContexts converts the contexts with incoming metadata into a context that is
+// usable by peer Gitaly nodes.
+func streamParametersContext(ctx context.Context) context.Context {
+ return metadata.IncomingToOutgoing(ctx)
+}
+
// StreamDirector determines which downstream servers receive requests
func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
// For phase 1, we need to route messages based on the storage location
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 13fd4263d..18b945834 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -21,6 +21,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/cache"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
@@ -2137,3 +2139,173 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
})
}
}
+
+func TestStreamParametersContext(t *testing.T) {
+ // Because we're using NewFeatureFlag, they'll end up in the All array.
+ enabledFF := featureflag.NewFeatureFlag("default-enabled", true)
+ disabledFF := featureflag.NewFeatureFlag("default-disabled", false)
+
+ type expectedFlag struct {
+ flag featureflag.FeatureFlag
+ enabled bool
+ }
+
+ expectedFlags := func(overrides ...expectedFlag) []expectedFlag {
+ flagValues := map[featureflag.FeatureFlag]bool{}
+ for _, flag := range featureflag.All {
+ flagValues[flag] = flag.OnByDefault
+ }
+ for _, override := range overrides {
+ flagValues[override.flag] = override.enabled
+ }
+
+ expectedFlags := make([]expectedFlag, 0, len(flagValues))
+ for flag, value := range flagValues {
+ expectedFlags = append(expectedFlags, expectedFlag{
+ flag: flag, enabled: value,
+ })
+ }
+
+ return expectedFlags
+ }
+
+ for _, tc := range []struct {
+ desc string
+ setupContext func() context.Context
+ expectedIncomingMD metadata.MD
+ expectedOutgoingMD metadata.MD
+ expectedFlags []expectedFlag
+ }{
+ {
+ desc: "no metadata",
+ setupContext: func() context.Context {
+ return context.Background()
+ },
+ expectedFlags: expectedFlags(),
+ },
+ {
+ desc: "with incoming metadata",
+ setupContext: func() context.Context {
+ ctx := context.Background()
+ ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("key", "value"))
+ return ctx
+ },
+ expectedIncomingMD: metadata.Pairs("key", "value"),
+ expectedOutgoingMD: metadata.Pairs("key", "value"),
+ expectedFlags: expectedFlags(),
+ },
+ {
+ desc: "with outgoing metadata",
+ setupContext: func() context.Context {
+ ctx := context.Background()
+ ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("key", "value"))
+ return ctx
+ },
+ expectedOutgoingMD: metadata.Pairs("key", "value"),
+ expectedFlags: expectedFlags(),
+ },
+ {
+ desc: "with incoming and outgoing metadata",
+ setupContext: func() context.Context {
+ ctx := context.Background()
+ ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("incoming", "value"))
+ ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("outgoing", "value"))
+ return ctx
+ },
+ // This behaviour is quite subtle: in the previous test case where we only
+ // have outgoing metadata, we retain it. But in case we have both incoming
+ // and outgoing we'd discard the outgoing metadata altogether. It is
+ // debatable whether this is a bug or feature, so I'll just document this
+ // weird edge case here for now.
+ expectedIncomingMD: metadata.Pairs("incoming", "value"),
+ expectedOutgoingMD: metadata.Pairs("incoming", "value"),
+ expectedFlags: expectedFlags(),
+ },
+ {
+ desc: "with flags set to their default values",
+ setupContext: func() context.Context {
+ ctx := context.Background()
+ ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, enabledFF)
+ ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, disabledFF)
+ return ctx
+ },
+ expectedIncomingMD: metadata.Pairs(
+ enabledFF.MetadataKey(), "true",
+ disabledFF.MetadataKey(), "false",
+ ),
+ expectedOutgoingMD: metadata.Pairs(
+ enabledFF.MetadataKey(), "true",
+ disabledFF.MetadataKey(), "false",
+ ),
+ expectedFlags: expectedFlags(),
+ },
+ {
+ desc: "with flags set to their reverse default values",
+ setupContext: func() context.Context {
+ ctx := context.Background()
+ ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, enabledFF)
+ ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, disabledFF)
+ return ctx
+ },
+ expectedIncomingMD: metadata.Pairs(
+ enabledFF.MetadataKey(), "false",
+ disabledFF.MetadataKey(), "true",
+ ),
+ expectedOutgoingMD: metadata.Pairs(
+ enabledFF.MetadataKey(), "false",
+ disabledFF.MetadataKey(), "true",
+ ),
+ expectedFlags: expectedFlags(
+ expectedFlag{flag: enabledFF, enabled: false},
+ expectedFlag{flag: disabledFF, enabled: true},
+ ),
+ },
+ {
+ desc: "mixed flags and metadata",
+ setupContext: func() context.Context {
+ ctx := context.Background()
+ ctx = metadata.NewIncomingContext(ctx, metadata.Pairs(
+ disabledFF.MetadataKey(), "true",
+ "incoming", "value"),
+ )
+ return ctx
+ },
+ expectedIncomingMD: metadata.Pairs(
+ disabledFF.MetadataKey(), "true",
+ "incoming", "value",
+ ),
+ expectedOutgoingMD: metadata.Pairs(
+ disabledFF.MetadataKey(), "true",
+ "incoming", "value",
+ ),
+ expectedFlags: expectedFlags(
+ expectedFlag{flag: disabledFF, enabled: true},
+ ),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := streamParametersContext(tc.setupContext())
+
+ incomingMD, ok := metadata.FromIncomingContext(ctx)
+ if tc.expectedIncomingMD == nil {
+ require.False(t, ok)
+ } else {
+ require.True(t, ok)
+ }
+ require.Equal(t, tc.expectedIncomingMD, incomingMD)
+
+ outgoingMD, ok := metadata.FromOutgoingContext(ctx)
+ if tc.expectedOutgoingMD == nil {
+ require.False(t, ok)
+ } else {
+ require.True(t, ok)
+ }
+ require.Equal(t, tc.expectedOutgoingMD, outgoingMD)
+
+ incomingCtx := gitaly_metadata.OutgoingToIncoming(ctx)
+ for _, expectedFlag := range tc.expectedFlags {
+ require.Equal(t, expectedFlag.enabled, expectedFlag.flag.IsEnabled(incomingCtx))
+ }
+ })
+ }
+}