diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-14 12:15:27 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-20 15:16:41 +0300 |
commit | d781c2e2e76bfcc51cb4b0200d6c59539a574def (patch) | |
tree | 712e691d11988d42cfac2b10e15241276292416c | |
parent | 23378333c7c567b45824c7e52cb064378abc9e37 (diff) |
coordinator: Split out function to compute metadata for Gitaly peers
When proxying a request to Gitaly, then we need to make sure to pass
relevant metadata to Gitaly. Right now, this only requires us to convert
the incoming metadata into outgoing metadata such that it's correctly
passed on as part of any RPC calls. But we need to extend the logic to
provide better handling for feature flags.
Move the logic into a separate function `streamParametersContext()` to
make it easily extensible.
-rw-r--r-- | internal/praefect/coordinator.go | 14 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 172 |
2 files changed, 182 insertions, 4 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 5d9a3a862..27624a7bc 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -406,7 +406,7 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal metrics.ReadDistribution.WithLabelValues(virtualStorage, node.Storage).Inc() return proxy.NewStreamParameters(proxy.Destination{ - Ctx: metadata.IncomingToOutgoing(ctx), + Ctx: streamParametersContext(ctx), Conn: node.Connection, Msg: b, }, nil, nil, nil), nil @@ -487,7 +487,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall var finalizers []func() error primaryDest := proxy.Destination{ - Ctx: metadata.IncomingToOutgoing(ctx), + Ctx: streamParametersContext(ctx), Conn: route.Primary.Connection, Msg: primaryMessage, } @@ -511,7 +511,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall if err != nil { return nil, err } - primaryDest.Ctx = metadata.IncomingToOutgoing(injectedCtx) + primaryDest.Ctx = streamParametersContext(injectedCtx) primaryDest.ErrHandler = func(err error) error { nodeErrors.Lock() defer nodeErrors.Unlock() @@ -532,7 +532,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } secondaryDests = append(secondaryDests, proxy.Destination{ - Ctx: metadata.IncomingToOutgoing(injectedCtx), + Ctx: streamParametersContext(injectedCtx), Conn: secondary.Connection, Msg: secondaryMsg, ErrHandler: func(err error) error { @@ -595,6 +595,12 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return proxy.NewStreamParameters(primaryDest, secondaryDests, reqFinalizer, nil), nil } +// streamParametersContexts converts the contexts with incoming metadata into a context that is +// usable by peer Gitaly nodes. +func streamParametersContext(ctx context.Context) context.Context { + return metadata.IncomingToOutgoing(ctx) +} + // StreamDirector determines which downstream servers receive requests func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { // For phase 1, we need to route messages based on the storage location diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 13fd4263d..18b945834 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -21,6 +21,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/cache" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" @@ -2137,3 +2139,173 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { }) } } + +func TestStreamParametersContext(t *testing.T) { + // Because we're using NewFeatureFlag, they'll end up in the All array. + enabledFF := featureflag.NewFeatureFlag("default-enabled", true) + disabledFF := featureflag.NewFeatureFlag("default-disabled", false) + + type expectedFlag struct { + flag featureflag.FeatureFlag + enabled bool + } + + expectedFlags := func(overrides ...expectedFlag) []expectedFlag { + flagValues := map[featureflag.FeatureFlag]bool{} + for _, flag := range featureflag.All { + flagValues[flag] = flag.OnByDefault + } + for _, override := range overrides { + flagValues[override.flag] = override.enabled + } + + expectedFlags := make([]expectedFlag, 0, len(flagValues)) + for flag, value := range flagValues { + expectedFlags = append(expectedFlags, expectedFlag{ + flag: flag, enabled: value, + }) + } + + return expectedFlags + } + + for _, tc := range []struct { + desc string + setupContext func() context.Context + expectedIncomingMD metadata.MD + expectedOutgoingMD metadata.MD + expectedFlags []expectedFlag + }{ + { + desc: "no metadata", + setupContext: func() context.Context { + return context.Background() + }, + expectedFlags: expectedFlags(), + }, + { + desc: "with incoming metadata", + setupContext: func() context.Context { + ctx := context.Background() + ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("key", "value")) + return ctx + }, + expectedIncomingMD: metadata.Pairs("key", "value"), + expectedOutgoingMD: metadata.Pairs("key", "value"), + expectedFlags: expectedFlags(), + }, + { + desc: "with outgoing metadata", + setupContext: func() context.Context { + ctx := context.Background() + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("key", "value")) + return ctx + }, + expectedOutgoingMD: metadata.Pairs("key", "value"), + expectedFlags: expectedFlags(), + }, + { + desc: "with incoming and outgoing metadata", + setupContext: func() context.Context { + ctx := context.Background() + ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("incoming", "value")) + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("outgoing", "value")) + return ctx + }, + // This behaviour is quite subtle: in the previous test case where we only + // have outgoing metadata, we retain it. But in case we have both incoming + // and outgoing we'd discard the outgoing metadata altogether. It is + // debatable whether this is a bug or feature, so I'll just document this + // weird edge case here for now. + expectedIncomingMD: metadata.Pairs("incoming", "value"), + expectedOutgoingMD: metadata.Pairs("incoming", "value"), + expectedFlags: expectedFlags(), + }, + { + desc: "with flags set to their default values", + setupContext: func() context.Context { + ctx := context.Background() + ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, enabledFF) + ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, disabledFF) + return ctx + }, + expectedIncomingMD: metadata.Pairs( + enabledFF.MetadataKey(), "true", + disabledFF.MetadataKey(), "false", + ), + expectedOutgoingMD: metadata.Pairs( + enabledFF.MetadataKey(), "true", + disabledFF.MetadataKey(), "false", + ), + expectedFlags: expectedFlags(), + }, + { + desc: "with flags set to their reverse default values", + setupContext: func() context.Context { + ctx := context.Background() + ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, enabledFF) + ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, disabledFF) + return ctx + }, + expectedIncomingMD: metadata.Pairs( + enabledFF.MetadataKey(), "false", + disabledFF.MetadataKey(), "true", + ), + expectedOutgoingMD: metadata.Pairs( + enabledFF.MetadataKey(), "false", + disabledFF.MetadataKey(), "true", + ), + expectedFlags: expectedFlags( + expectedFlag{flag: enabledFF, enabled: false}, + expectedFlag{flag: disabledFF, enabled: true}, + ), + }, + { + desc: "mixed flags and metadata", + setupContext: func() context.Context { + ctx := context.Background() + ctx = metadata.NewIncomingContext(ctx, metadata.Pairs( + disabledFF.MetadataKey(), "true", + "incoming", "value"), + ) + return ctx + }, + expectedIncomingMD: metadata.Pairs( + disabledFF.MetadataKey(), "true", + "incoming", "value", + ), + expectedOutgoingMD: metadata.Pairs( + disabledFF.MetadataKey(), "true", + "incoming", "value", + ), + expectedFlags: expectedFlags( + expectedFlag{flag: disabledFF, enabled: true}, + ), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := streamParametersContext(tc.setupContext()) + + incomingMD, ok := metadata.FromIncomingContext(ctx) + if tc.expectedIncomingMD == nil { + require.False(t, ok) + } else { + require.True(t, ok) + } + require.Equal(t, tc.expectedIncomingMD, incomingMD) + + outgoingMD, ok := metadata.FromOutgoingContext(ctx) + if tc.expectedOutgoingMD == nil { + require.False(t, ok) + } else { + require.True(t, ok) + } + require.Equal(t, tc.expectedOutgoingMD, outgoingMD) + + incomingCtx := gitaly_metadata.OutgoingToIncoming(ctx) + for _, expectedFlag := range tc.expectedFlags { + require.Equal(t, expectedFlag.enabled, expectedFlag.flag.IsEnabled(incomingCtx)) + } + }) + } +} |