diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-07-19 02:05:26 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-07-19 02:05:26 +0300 |
commit | d882ac830951e0ca861ff357921ea0ff8a0a793c (patch) | |
tree | 5dd9c91812a0e7440bc9654e66a282d25c365510 | |
parent | 2212d1aff8ed94c23d08377ea67ab626bc9f0c97 (diff) |
Unary gRPC interceptor for cache invalidation
-rw-r--r-- | changelogs/unreleased/po-cache-unary-intercept.yml | 5 | ||||
-rw-r--r-- | internal/middleware/cache/cache.go | 42 | ||||
-rw-r--r-- | internal/middleware/cache/cache_test.go | 35 | ||||
-rw-r--r-- | internal/middleware/cache/testdata/stream.pb.go | 106 | ||||
-rw-r--r-- | internal/middleware/cache/testdata/stream.proto | 13 |
5 files changed, 178 insertions, 23 deletions
diff --git a/changelogs/unreleased/po-cache-unary-intercept.yml b/changelogs/unreleased/po-cache-unary-intercept.yml new file mode 100644 index 000000000..6aa547554 --- /dev/null +++ b/changelogs/unreleased/po-cache-unary-intercept.yml @@ -0,0 +1,5 @@ +--- +title: Unary gRPC interceptor for cache invalidation +merge_request: 1371 +author: +type: performance diff --git a/internal/middleware/cache/cache.go b/internal/middleware/cache/cache.go index 6ad16a59c..af0dd38e5 100644 --- a/internal/middleware/cache/cache.go +++ b/internal/middleware/cache/cache.go @@ -1,6 +1,7 @@ package cache import ( + "context" "fmt" "sync" @@ -75,6 +76,47 @@ func StreamInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.StreamS } } +// UnaryInvalidator will invalidate any mutating RPC that targets a +// repository in a gRPC unary RPC +func UnaryInvalidator(ci Invalidator, reg *protoregistry.Registry) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + mInfo, err := reg.LookupMethod(info.FullMethod) + countRPCType(mInfo) + if err != nil { + logrus.WithField("full_method_name", info.FullMethod).Errorf("unable to lookup method information for %+v", info) + } + + if mInfo.Operation == protoregistry.OpAccessor { + return handler(ctx, req) + } + + pbReq, ok := req.(proto.Message) + if !ok { + return nil, fmt.Errorf("cache invalidation expected protobuf request, but got %T", req) + } + + target, err := mInfo.TargetRepo(pbReq) + if err != nil { + return nil, err + } + + le, err := ci.StartLease(target) + if err != nil { + return nil, err + } + + // wrap the handler to ensure the lease is always ended + return func() (resp interface{}, err error) { + defer func() { + if err := le.EndLease(ctx); err != nil { + logrus.Errorf("unable to end lease: %q", err) + } + }() + return handler(ctx, req) + }() + } +} + type recvMsgCallback func(interface{}, error) error func invalidateCache(ci Invalidator, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler) (grpc.StreamHandler, recvMsgCallback) { diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go index c04cb4d6a..332c9316a 100644 --- a/internal/middleware/cache/cache_test.go +++ b/internal/middleware/cache/cache_test.go @@ -21,7 +21,7 @@ import ( ) //go:generate make testdata/stream.pb.go -func TestStreamInvalidator(t *testing.T) { +func TestInvalidators(t *testing.T) { mCache := newMockCache() reg := protoregistry.New() @@ -33,6 +33,11 @@ func TestStreamInvalidator(t *testing.T) { cache.StreamInvalidator(mCache, reg), ), ), + grpc.UnaryInterceptor( + grpc.UnaryServerInterceptor( + cache.UnaryInvalidator(mCache, reg), + ), + ), ) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -70,8 +75,8 @@ func TestStreamInvalidator(t *testing.T) { StorageName: "3", } - expectedSvcRequests := []gitalypb.Repository{*repo1, *repo2, *repo3} - expectedInvalidations := []gitalypb.Repository{*repo2, *repo3} + expectedSvcRequests := []gitalypb.Repository{*repo1, *repo2, *repo3, *repo1, *repo2} + expectedInvalidations := []gitalypb.Repository{*repo2, *repo3, *repo1} // Should NOT trigger cache invalidation c, err := cli.ClientStreamRepoAccessor(ctx, &testdata.Request{ @@ -97,9 +102,21 @@ func TestStreamInvalidator(t *testing.T) { _, err = c.Recv() // make client call synchronous by waiting for close assert.Equal(t, err, io.EOF) + // Should trigger cache invalidation + _, err = cli.ClientUnaryRepoMutator(ctx, &testdata.Request{ + Destination: repo1, + }) + require.NoError(t, err) + + // Should NOT trigger cache invalidation + _, err = cli.ClientUnaryRepoAccessor(ctx, &testdata.Request{ + Destination: repo2, + }) + require.NoError(t, err) + require.Equal(t, expectedInvalidations, mCache.(*mockCache).invalidatedRepos) require.Equal(t, expectedSvcRequests, svc.repoRequests) - require.Equal(t, 2, mCache.(*mockCache).endedLeases.count) + require.Equal(t, 3, mCache.(*mockCache).endedLeases.count) } // mockCache allows us to relay back via channel which repos are being @@ -181,3 +198,13 @@ func (ts *testSvc) ClientStreamRepoAccessor(req *testdata.Request, _ testdata.Te ts.repoRequests = append(ts.repoRequests, *req.GetDestination()) return nil } + +func (ts *testSvc) ClientUnaryRepoMutator(_ context.Context, req *testdata.Request) (*testdata.Response, error) { + ts.repoRequests = append(ts.repoRequests, *req.GetDestination()) + return &testdata.Response{}, nil +} + +func (ts *testSvc) ClientUnaryRepoAccessor(_ context.Context, req *testdata.Request) (*testdata.Response, error) { + ts.repoRequests = append(ts.repoRequests, *req.GetDestination()) + return &testdata.Response{}, nil +} diff --git a/internal/middleware/cache/testdata/stream.pb.go b/internal/middleware/cache/testdata/stream.pb.go index cbce32ff3..f7e052bc6 100644 --- a/internal/middleware/cache/testdata/stream.pb.go +++ b/internal/middleware/cache/testdata/stream.pb.go @@ -101,24 +101,25 @@ func init() { func init() { proto.RegisterFile("stream.proto", fileDescriptor_bb17ef3f514bfe54) } var fileDescriptor_bb17ef3f514bfe54 = []byte{ - // 257 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0xd0, 0xb1, 0x4e, 0xc3, 0x30, - 0x10, 0x06, 0x60, 0xb9, 0x12, 0x25, 0xb8, 0x5d, 0xf0, 0x42, 0x95, 0x09, 0x65, 0xca, 0x82, 0x5d, - 0x0a, 0x7b, 0x55, 0x18, 0x51, 0x97, 0x94, 0x89, 0xed, 0xea, 0x9c, 0x52, 0x4b, 0x8e, 0x1d, 0x7c, - 0x57, 0x50, 0x9f, 0xa4, 0xcf, 0xc0, 0x2b, 0x76, 0x42, 0x34, 0x54, 0xaa, 0x98, 0xd8, 0xac, 0x5f, - 0xd6, 0x77, 0xff, 0x9d, 0x1c, 0x13, 0x27, 0x84, 0x56, 0x77, 0x29, 0x72, 0x54, 0x19, 0x23, 0x71, - 0x0d, 0x0c, 0xf9, 0x98, 0x36, 0x90, 0xb0, 0xee, 0xf3, 0x62, 0x2e, 0x2f, 0x2b, 0x7c, 0xdf, 0x22, - 0xb1, 0x7a, 0x94, 0xa3, 0x1a, 0x89, 0x5d, 0x00, 0x76, 0x31, 0x4c, 0xc4, 0xad, 0x28, 0x47, 0x33, - 0xa5, 0x1b, 0xc7, 0xe0, 0x77, 0xba, 0xc2, 0x2e, 0x92, 0xe3, 0x98, 0x76, 0xd5, 0xf9, 0xb7, 0x42, - 0xca, 0xac, 0x42, 0xea, 0x62, 0x20, 0x9c, 0x7d, 0x09, 0x39, 0x7a, 0x45, 0xe2, 0x15, 0xa6, 0x0f, - 0x67, 0x51, 0x2d, 0xe5, 0xcd, 0xb3, 0x77, 0x18, 0x78, 0x75, 0xac, 0xf2, 0x43, 0x2c, 0xb7, 0x0c, - 0x1c, 0x93, 0xba, 0xd6, 0xa7, 0x42, 0xfa, 0x77, 0x7e, 0xae, 0xce, 0xa3, 0x5e, 0x2c, 0xae, 0x0e, - 0xfb, 0xf2, 0x22, 0x13, 0xb9, 0xb8, 0x9f, 0x0a, 0xf5, 0x22, 0x27, 0x7f, 0xb9, 0x85, 0xb5, 0x48, - 0xf4, 0x7f, 0x6f, 0x78, 0xd8, 0x97, 0x83, 0x6c, 0x30, 0x15, 0x4f, 0x8b, 0xb7, 0x79, 0xe3, 0xd8, - 0xc3, 0x5a, 0xdb, 0xd8, 0x9a, 0xfe, 0x79, 0x17, 0x53, 0x63, 0xfa, 0x7d, 0x8d, 0x0b, 0x8c, 0x29, - 0x80, 0x37, 0xad, 0xab, 0x6b, 0x8f, 0x9f, 0x90, 0xd0, 0x58, 0xb0, 0x1b, 0x34, 0x27, 0x75, 0x3d, - 0x3c, 0x9e, 0xf0, 0xe1, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x77, 0x8a, 0x5d, 0x72, 0x6a, 0x01, 0x00, - 0x00, + // 281 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x91, 0x31, 0x4f, 0xf3, 0x30, + 0x10, 0x86, 0xe5, 0x4a, 0x5f, 0xbf, 0xe0, 0x74, 0xc1, 0x03, 0x54, 0x99, 0x50, 0xa6, 0x2c, 0xd8, + 0xa5, 0xb0, 0x57, 0x85, 0x05, 0x81, 0xba, 0xa4, 0xb0, 0xb0, 0x5d, 0x9d, 0x53, 0x6a, 0x29, 0xb1, + 0x83, 0x7d, 0x05, 0xe5, 0x97, 0xf4, 0xe7, 0xf0, 0xbf, 0x3a, 0x21, 0x1a, 0x82, 0x2a, 0x26, 0x04, + 0xdb, 0xe9, 0xd5, 0xe9, 0xb9, 0xe7, 0xf4, 0xf2, 0x51, 0x20, 0x8f, 0x50, 0xcb, 0xc6, 0x3b, 0x72, + 0x22, 0x22, 0x0c, 0x54, 0x00, 0x41, 0x32, 0x0a, 0x6b, 0xf0, 0x58, 0x74, 0x79, 0x3a, 0xe3, 0xff, + 0x73, 0x7c, 0xde, 0x60, 0x20, 0x71, 0xc5, 0xe3, 0x02, 0x03, 0x19, 0x0b, 0x64, 0x9c, 0x1d, 0xb3, + 0x33, 0x96, 0xc5, 0x53, 0x21, 0x4b, 0x43, 0x50, 0xb5, 0x32, 0xc7, 0xc6, 0x05, 0x43, 0xce, 0xb7, + 0xf9, 0xe1, 0x5a, 0xca, 0x79, 0x94, 0x63, 0x68, 0x9c, 0x0d, 0x38, 0x7d, 0x1b, 0xf0, 0xf8, 0x01, + 0x03, 0x2d, 0xd1, 0xbf, 0x18, 0x8d, 0x62, 0xc1, 0x4f, 0x6f, 0x2a, 0x83, 0x96, 0x96, 0x7b, 0x95, + 0x0f, 0xc4, 0x62, 0x43, 0x40, 0xce, 0x8b, 0x63, 0xd9, 0x0b, 0xc9, 0xcf, 0xfb, 0x89, 0x38, 0x8c, + 0x3a, 0x62, 0x7a, 0xb4, 0xdb, 0x66, 0xff, 0x22, 0x96, 0xb0, 0x8b, 0x09, 0x13, 0xf7, 0x7c, 0xfc, + 0x1d, 0x37, 0xd7, 0x1a, 0x43, 0xf8, 0x39, 0x6f, 0xb8, 0xdb, 0x66, 0x83, 0x68, 0x30, 0x61, 0xe2, + 0x8e, 0x9f, 0x74, 0xb0, 0x47, 0x0b, 0xbe, 0xfd, 0x93, 0x9a, 0xb8, 0xed, 0xff, 0xfc, 0x62, 0xfd, + 0xd2, 0xeb, 0x7a, 0xfe, 0x34, 0x2b, 0x0d, 0x55, 0xb0, 0x92, 0xda, 0xd5, 0xaa, 0x1b, 0xcf, 0x9d, + 0x2f, 0x55, 0xd7, 0x82, 0x32, 0x96, 0xd0, 0x5b, 0xa8, 0x54, 0x6d, 0x8a, 0xa2, 0xc2, 0x57, 0xf0, + 0xa8, 0x34, 0xe8, 0x35, 0xaa, 0x9e, 0xb9, 0x1a, 0xee, 0x8b, 0xbd, 0x7c, 0x0f, 0x00, 0x00, 0xff, + 0xff, 0xa3, 0x8a, 0x30, 0x23, 0x00, 0x02, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -135,6 +136,8 @@ const _ = grpc.SupportPackageIsVersion4 type TestServiceClient interface { ClientStreamRepoMutator(ctx context.Context, in *Request, opts ...grpc.CallOption) (TestService_ClientStreamRepoMutatorClient, error) ClientStreamRepoAccessor(ctx context.Context, in *Request, opts ...grpc.CallOption) (TestService_ClientStreamRepoAccessorClient, error) + ClientUnaryRepoMutator(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) + ClientUnaryRepoAccessor(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) } type testServiceClient struct { @@ -209,10 +212,30 @@ func (x *testServiceClientStreamRepoAccessorClient) Recv() (*Response, error) { return m, nil } +func (c *testServiceClient) ClientUnaryRepoMutator(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/testdata.TestService/ClientUnaryRepoMutator", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *testServiceClient) ClientUnaryRepoAccessor(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/testdata.TestService/ClientUnaryRepoAccessor", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // TestServiceServer is the server API for TestService service. type TestServiceServer interface { ClientStreamRepoMutator(*Request, TestService_ClientStreamRepoMutatorServer) error ClientStreamRepoAccessor(*Request, TestService_ClientStreamRepoAccessorServer) error + ClientUnaryRepoMutator(context.Context, *Request) (*Response, error) + ClientUnaryRepoAccessor(context.Context, *Request) (*Response, error) } func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { @@ -261,10 +284,55 @@ func (x *testServiceClientStreamRepoAccessorServer) Send(m *Response) error { return x.ServerStream.SendMsg(m) } +func _TestService_ClientUnaryRepoMutator_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestServiceServer).ClientUnaryRepoMutator(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/testdata.TestService/ClientUnaryRepoMutator", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestServiceServer).ClientUnaryRepoMutator(ctx, req.(*Request)) + } + return interceptor(ctx, in, info, handler) +} + +func _TestService_ClientUnaryRepoAccessor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestServiceServer).ClientUnaryRepoAccessor(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/testdata.TestService/ClientUnaryRepoAccessor", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestServiceServer).ClientUnaryRepoAccessor(ctx, req.(*Request)) + } + return interceptor(ctx, in, info, handler) +} + var _TestService_serviceDesc = grpc.ServiceDesc{ ServiceName: "testdata.TestService", HandlerType: (*TestServiceServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "ClientUnaryRepoMutator", + Handler: _TestService_ClientUnaryRepoMutator_Handler, + }, + { + MethodName: "ClientUnaryRepoAccessor", + Handler: _TestService_ClientUnaryRepoAccessor_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "ClientStreamRepoMutator", diff --git a/internal/middleware/cache/testdata/stream.proto b/internal/middleware/cache/testdata/stream.proto index f10aa08af..20f2c5dcb 100644 --- a/internal/middleware/cache/testdata/stream.proto +++ b/internal/middleware/cache/testdata/stream.proto @@ -25,4 +25,17 @@ service TestService { op: ACCESSOR }; } + + rpc ClientUnaryRepoMutator(Request) returns (Response) { + option (gitaly.op_type) = { + op: MUTATOR + target_repository_field: "1" + }; + } + + rpc ClientUnaryRepoAccessor(Request) returns (Response) { + option (gitaly.op_type) = { + op: ACCESSOR + }; + } } |