diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2020-04-28 17:00:08 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2020-04-28 17:00:08 +0300 |
commit | 8ad75fbb0ae265fb317bb1640070daa6d0adf4e7 (patch) | |
tree | a0211d3b320f50f81a5629f31781c07bf09f1a18 | |
parent | 9bfdd53b6b9beca5f88500c8dd12d031d0fb6bc9 (diff) | |
parent | e3316425eec8384c2fdaf22dce920a51fcee2b2d (diff) |
Merge branch 'jc-fix-chunker' into 'master'
Modify chunker to send message based on size
Closes #2696
See merge request gitlab-org/gitaly!2096
-rw-r--r-- | changelogs/unreleased/jc-fix-chunker.yml | 5 | ||||
-rw-r--r-- | internal/helper/chunk/chunker.go | 37 | ||||
-rw-r--r-- | internal/helper/chunk/chunker_test.go | 96 | ||||
-rw-r--r-- | internal/helper/chunk/pb/test.pb.go | 231 | ||||
-rw-r--r-- | internal/helper/chunk/pb/test.proto | 14 | ||||
-rw-r--r-- | internal/service/cleanup/apply_bfg_object_map_stream.go | 5 | ||||
-rw-r--r-- | internal/service/commit/between.go | 6 | ||||
-rw-r--r-- | internal/service/commit/commits_by_message.go | 6 | ||||
-rw-r--r-- | internal/service/commit/find_all_commits.go | 6 | ||||
-rw-r--r-- | internal/service/commit/find_commits.go | 11 | ||||
-rw-r--r-- | internal/service/commit/list_commits_by_oid.go | 5 | ||||
-rw-r--r-- | internal/service/commit/list_commits_by_ref_name.go | 5 | ||||
-rw-r--r-- | internal/service/commit/list_files.go | 7 | ||||
-rw-r--r-- | internal/service/commit/tree_entries.go | 5 | ||||
-rw-r--r-- | internal/service/ref/refnames.go | 12 | ||||
-rw-r--r-- | internal/service/ref/refnames_containing.go | 14 | ||||
-rw-r--r-- | internal/service/ref/refs.go | 5 | ||||
-rw-r--r-- | internal/service/remote/remotes.go | 5 | ||||
-rw-r--r-- | internal/service/repository/raw_changes.go | 5 |
19 files changed, 422 insertions, 58 deletions
diff --git a/changelogs/unreleased/jc-fix-chunker.yml b/changelogs/unreleased/jc-fix-chunker.yml new file mode 100644 index 000000000..24fc921a5 --- /dev/null +++ b/changelogs/unreleased/jc-fix-chunker.yml @@ -0,0 +1,5 @@ +--- +title: Modify chunker to send message based on size +merge_request: 2096 +author: +type: changed diff --git a/internal/helper/chunk/chunker.go b/internal/helper/chunk/chunker.go index 48d99f03f..b02bc2a88 100644 --- a/internal/helper/chunk/chunker.go +++ b/internal/helper/chunk/chunker.go @@ -1,8 +1,8 @@ package chunk -// Item could be e.g. a commit in an RPC that returns a chunked stream of -// commits. -type Item interface{} +import ( + "github.com/golang/protobuf/proto" +) // Sender encapsulates a gRPC response stream and the current chunk // that's being built. @@ -12,7 +12,7 @@ type Sender interface { // Reset should create a fresh response message. Reset() // Append should append the given item to the slice in the current response message - Append(Item) + Append(proto.Message) // Send should send the current response message Send() error } @@ -23,35 +23,42 @@ func New(s Sender) *Chunker { return &Chunker{s: s} } // Chunker lets you spread items you want to send over multiple chunks. // This type is not thread-safe. type Chunker struct { - s Sender - n int + s Sender + size int } +// maxMessageSize is maximum size per protobuf message +const maxMessageSize = 1 * 1024 * 1024 + // Send will append an item to the current chunk and send the chunk if it is full. -func (c *Chunker) Send(it Item) error { - if c.n == 0 { +func (c *Chunker) Send(it proto.Message) error { + if c.size == 0 { c.s.Reset() } - c.s.Append(it) - c.n++ + itSize := proto.Size(it) - const chunkSize = 20 - if c.n >= chunkSize { - return c.sendResponseMsg() + if itSize+c.size >= maxMessageSize { + if err := c.sendResponseMsg(); err != nil { + return err + } + c.s.Reset() } + c.s.Append(it) + c.size += itSize + return nil } func (c *Chunker) sendResponseMsg() error { - c.n = 0 + c.size = 0 return c.s.Send() } // Flush sends remaining items in the current chunk, if any. func (c *Chunker) Flush() error { - if c.n == 0 { + if c.size == 0 { return nil } diff --git a/internal/helper/chunk/chunker_test.go b/internal/helper/chunk/chunker_test.go new file mode 100644 index 000000000..0dd22e1b5 --- /dev/null +++ b/internal/helper/chunk/chunker_test.go @@ -0,0 +1,96 @@ +package chunk + +import ( + "io" + "net" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/wrappers" + "github.com/stretchr/testify/require" + test "gitlab.com/gitlab-org/gitaly/internal/helper/chunk/pb" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" +) + +type testSender struct { + stream test.Test_StreamOutputServer + output [][]byte +} + +func (ts *testSender) Reset() { ts.output = nil } +func (ts *testSender) Append(m proto.Message) { + ts.output = append(ts.output, m.(*wrappers.BytesValue).Value) +} + +func (ts *testSender) Send() error { + return ts.stream.Send(&test.StreamOutputResponse{ + Msg: ts.output, + }) +} + +func TestChunker(t *testing.T) { + s := &server{} + srv, serverSocketPath := runServer(t, s) + defer srv.Stop() + + client, conn := newClient(t, serverSocketPath) + defer conn.Close() + + ctx, cancel := testhelper.Context() + defer cancel() + + stream, err := client.StreamOutput(ctx, &test.StreamOutputRequest{BytesToReturn: 3.5 * maxMessageSize}) + require.NoError(t, err) + + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + require.Less(t, proto.Size(resp), maxMessageSize) + } +} + +type server struct{} + +func (s *server) StreamOutput(req *test.StreamOutputRequest, srv test.Test_StreamOutputServer) error { + const kilobyte = 1024 + + c := New(&testSender{stream: srv}) + for numBytes := 0; numBytes < int(req.GetBytesToReturn()); numBytes += kilobyte { + if err := c.Send(&wrappers.BytesValue{Value: make([]byte, kilobyte)}); err != nil { + return err + } + } + + if err := c.Flush(); err != nil { + return err + } + return nil +} + +func runServer(t *testing.T, s *server, opt ...grpc.ServerOption) (*grpc.Server, string) { + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + grpcServer := grpc.NewServer(opt...) + test.RegisterTestServer(grpcServer, s) + + lis, err := net.Listen("unix", serverSocketPath) + require.NoError(t, err) + + go grpcServer.Serve(lis) + + return grpcServer, "unix://" + serverSocketPath +} + +func newClient(t *testing.T, serverSocketPath string) (test.TestClient, *grpc.ClientConn) { + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + } + conn, err := grpc.Dial(serverSocketPath, connOpts...) + if err != nil { + t.Fatal(err) + } + + return test.NewTestClient(conn), conn +} diff --git a/internal/helper/chunk/pb/test.pb.go b/internal/helper/chunk/pb/test.pb.go new file mode 100644 index 000000000..523d1237e --- /dev/null +++ b/internal/helper/chunk/pb/test.pb.go @@ -0,0 +1,231 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: test.proto + +package test + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type StreamOutputRequest struct { + BytesToReturn int32 `protobuf:"varint,1,opt,name=bytes_to_return,json=bytesToReturn,proto3" json:"bytes_to_return,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StreamOutputRequest) Reset() { *m = StreamOutputRequest{} } +func (m *StreamOutputRequest) String() string { return proto.CompactTextString(m) } +func (*StreamOutputRequest) ProtoMessage() {} +func (*StreamOutputRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_c161fcfdc0c3ff1e, []int{0} +} + +func (m *StreamOutputRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StreamOutputRequest.Unmarshal(m, b) +} +func (m *StreamOutputRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StreamOutputRequest.Marshal(b, m, deterministic) +} +func (m *StreamOutputRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamOutputRequest.Merge(m, src) +} +func (m *StreamOutputRequest) XXX_Size() int { + return xxx_messageInfo_StreamOutputRequest.Size(m) +} +func (m *StreamOutputRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StreamOutputRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamOutputRequest proto.InternalMessageInfo + +func (m *StreamOutputRequest) GetBytesToReturn() int32 { + if m != nil { + return m.BytesToReturn + } + return 0 +} + +type StreamOutputResponse struct { + Msg [][]byte `protobuf:"bytes,1,rep,name=msg,proto3" json:"msg,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StreamOutputResponse) Reset() { *m = StreamOutputResponse{} } +func (m *StreamOutputResponse) String() string { return proto.CompactTextString(m) } +func (*StreamOutputResponse) ProtoMessage() {} +func (*StreamOutputResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_c161fcfdc0c3ff1e, []int{1} +} + +func (m *StreamOutputResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StreamOutputResponse.Unmarshal(m, b) +} +func (m *StreamOutputResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StreamOutputResponse.Marshal(b, m, deterministic) +} +func (m *StreamOutputResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamOutputResponse.Merge(m, src) +} +func (m *StreamOutputResponse) XXX_Size() int { + return xxx_messageInfo_StreamOutputResponse.Size(m) +} +func (m *StreamOutputResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StreamOutputResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamOutputResponse proto.InternalMessageInfo + +func (m *StreamOutputResponse) GetMsg() [][]byte { + if m != nil { + return m.Msg + } + return nil +} + +func init() { + proto.RegisterType((*StreamOutputRequest)(nil), "test.StreamOutputRequest") + proto.RegisterType((*StreamOutputResponse)(nil), "test.StreamOutputResponse") +} + +func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) } + +var fileDescriptor_c161fcfdc0c3ff1e = []byte{ + // 160 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e, + 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0x95, 0x6c, 0xb9, 0x84, 0x83, 0x4b, + 0x8a, 0x52, 0x13, 0x73, 0xfd, 0x4b, 0x4b, 0x0a, 0x4a, 0x4b, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, + 0x4b, 0x84, 0xd4, 0xb8, 0xf8, 0x93, 0x2a, 0x4b, 0x52, 0x8b, 0xe3, 0x4b, 0xf2, 0xe3, 0x8b, 0x52, + 0x4b, 0x4a, 0x8b, 0xf2, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x83, 0x78, 0xc1, 0xc2, 0x21, 0xf9, + 0x41, 0x60, 0x41, 0x25, 0x0d, 0x2e, 0x11, 0x54, 0xed, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0xa9, 0x42, + 0x02, 0x5c, 0xcc, 0xb9, 0xc5, 0xe9, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, 0x3c, 0x41, 0x20, 0xa6, 0x51, + 0x20, 0x17, 0x4b, 0x08, 0xc8, 0x64, 0x4f, 0x2e, 0x1e, 0x64, 0x1d, 0x42, 0x92, 0x7a, 0x60, 0x37, + 0x61, 0x71, 0x84, 0x94, 0x14, 0x36, 0x29, 0x88, 0x05, 0x4a, 0x0c, 0x06, 0x8c, 0x49, 0x6c, 0x60, + 0x8f, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xae, 0x1b, 0x22, 0xff, 0xd6, 0x00, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// TestClient is the client API for Test service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type TestClient interface { + StreamOutput(ctx context.Context, in *StreamOutputRequest, opts ...grpc.CallOption) (Test_StreamOutputClient, error) +} + +type testClient struct { + cc *grpc.ClientConn +} + +func NewTestClient(cc *grpc.ClientConn) TestClient { + return &testClient{cc} +} + +func (c *testClient) StreamOutput(ctx context.Context, in *StreamOutputRequest, opts ...grpc.CallOption) (Test_StreamOutputClient, error) { + stream, err := c.cc.NewStream(ctx, &_Test_serviceDesc.Streams[0], "/test.Test/StreamOutput", opts...) + if err != nil { + return nil, err + } + x := &testStreamOutputClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Test_StreamOutputClient interface { + Recv() (*StreamOutputResponse, error) + grpc.ClientStream +} + +type testStreamOutputClient struct { + grpc.ClientStream +} + +func (x *testStreamOutputClient) Recv() (*StreamOutputResponse, error) { + m := new(StreamOutputResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TestServer is the server API for Test service. +type TestServer interface { + StreamOutput(*StreamOutputRequest, Test_StreamOutputServer) error +} + +// UnimplementedTestServer can be embedded to have forward compatible implementations. +type UnimplementedTestServer struct { +} + +func (*UnimplementedTestServer) StreamOutput(req *StreamOutputRequest, srv Test_StreamOutputServer) error { + return status.Errorf(codes.Unimplemented, "method StreamOutput not implemented") +} + +func RegisterTestServer(s *grpc.Server, srv TestServer) { + s.RegisterService(&_Test_serviceDesc, srv) +} + +func _Test_StreamOutput_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(StreamOutputRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(TestServer).StreamOutput(m, &testStreamOutputServer{stream}) +} + +type Test_StreamOutputServer interface { + Send(*StreamOutputResponse) error + grpc.ServerStream +} + +type testStreamOutputServer struct { + grpc.ServerStream +} + +func (x *testStreamOutputServer) Send(m *StreamOutputResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _Test_serviceDesc = grpc.ServiceDesc{ + ServiceName: "test.Test", + HandlerType: (*TestServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamOutput", + Handler: _Test_StreamOutput_Handler, + ServerStreams: true, + }, + }, + Metadata: "test.proto", +} diff --git a/internal/helper/chunk/pb/test.proto b/internal/helper/chunk/pb/test.proto new file mode 100644 index 000000000..d5e419805 --- /dev/null +++ b/internal/helper/chunk/pb/test.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package test; + +service Test { + rpc StreamOutput(StreamOutputRequest) returns (stream StreamOutputResponse) {} +} + +message StreamOutputRequest { + int32 bytes_to_return = 1; +} +message StreamOutputResponse { + repeated bytes msg = 1; +} diff --git a/internal/service/cleanup/apply_bfg_object_map_stream.go b/internal/service/cleanup/apply_bfg_object_map_stream.go index e2276a434..204516fe3 100644 --- a/internal/service/cleanup/apply_bfg_object_map_stream.go +++ b/internal/service/cleanup/apply_bfg_object_map_stream.go @@ -4,6 +4,7 @@ import ( "fmt" "io" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/internal/service/cleanup/internalrefs" @@ -89,10 +90,10 @@ func (r *bfgStreamReader) streamReader() io.Reader { return streamio.NewReader(r.readOne) } -func (w *bfgStreamWriter) Append(it chunk.Item) { +func (w *bfgStreamWriter) Append(m proto.Message) { w.entries = append( w.entries, - it.(*gitalypb.ApplyBfgObjectMapStreamResponse_Entry), + m.(*gitalypb.ApplyBfgObjectMapStreamResponse_Entry), ) } diff --git a/internal/service/commit/between.go b/internal/service/commit/between.go index 9eee9f522..6f6684881 100644 --- a/internal/service/commit/between.go +++ b/internal/service/commit/between.go @@ -3,9 +3,9 @@ package commit import ( "fmt" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -15,8 +15,8 @@ type commitsBetweenSender struct { } func (sender *commitsBetweenSender) Reset() { sender.commits = nil } -func (sender *commitsBetweenSender) Append(it chunk.Item) { - sender.commits = append(sender.commits, it.(*gitalypb.GitCommit)) +func (sender *commitsBetweenSender) Append(m proto.Message) { + sender.commits = append(sender.commits, m.(*gitalypb.GitCommit)) } func (sender *commitsBetweenSender) Send() error { diff --git a/internal/service/commit/commits_by_message.go b/internal/service/commit/commits_by_message.go index 2f95a06ef..861edbf30 100644 --- a/internal/service/commit/commits_by_message.go +++ b/internal/service/commit/commits_by_message.go @@ -3,9 +3,9 @@ package commit import ( "fmt" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -15,8 +15,8 @@ type commitsByMessageSender struct { } func (sender *commitsByMessageSender) Reset() { sender.commits = nil } -func (sender *commitsByMessageSender) Append(it chunk.Item) { - sender.commits = append(sender.commits, it.(*gitalypb.GitCommit)) +func (sender *commitsByMessageSender) Append(m proto.Message) { + sender.commits = append(sender.commits, m.(*gitalypb.GitCommit)) } func (sender *commitsByMessageSender) Send() error { diff --git a/internal/service/commit/find_all_commits.go b/internal/service/commit/find_all_commits.go index 09bc7d9b6..158d7817d 100644 --- a/internal/service/commit/find_all_commits.go +++ b/internal/service/commit/find_all_commits.go @@ -3,9 +3,9 @@ package commit import ( "fmt" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/internal/service/ref" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -19,8 +19,8 @@ type findAllCommitsSender struct { } func (sender *findAllCommitsSender) Reset() { sender.commits = nil } -func (sender *findAllCommitsSender) Append(it chunk.Item) { - sender.commits = append(sender.commits, it.(*gitalypb.GitCommit)) +func (sender *findAllCommitsSender) Append(m proto.Message) { + sender.commits = append(sender.commits, m.(*gitalypb.GitCommit)) } func (sender *findAllCommitsSender) Send() error { diff --git a/internal/service/commit/find_commits.go b/internal/service/commit/find_commits.go index 8b852734b..a35b25719 100644 --- a/internal/service/commit/find_commits.go +++ b/internal/service/commit/find_commits.go @@ -7,6 +7,7 @@ import ( "fmt" "strings" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/command" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" @@ -16,8 +17,6 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) -const commitsPerPage int = 20 - func (s *server) FindCommits(req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) error { ctx := stream.Context() @@ -65,7 +64,7 @@ func findCommits(ctx context.Context, req *gitalypb.FindCommitsRequest, stream g getCommits.Offset(int(req.GetOffset())) } - if err := streamPaginatedCommits(getCommits, commitsPerPage, stream); err != nil { + if err := streamCommits(getCommits, stream); err != nil { return fmt.Errorf("error streaming commits: %v", err) } return nil @@ -125,15 +124,15 @@ type findCommitsSender struct { } func (s *findCommitsSender) Reset() { s.commits = nil } -func (s *findCommitsSender) Append(it chunk.Item) { - s.commits = append(s.commits, it.(*gitalypb.GitCommit)) +func (s *findCommitsSender) Append(m proto.Message) { + s.commits = append(s.commits, m.(*gitalypb.GitCommit)) } func (s *findCommitsSender) Send() error { return s.stream.Send(&gitalypb.FindCommitsResponse{Commits: s.commits}) } -func streamPaginatedCommits(getCommits *GetCommits, commitsPerPage int, stream gitalypb.CommitService_FindCommitsServer) error { +func streamCommits(getCommits *GetCommits, stream gitalypb.CommitService_FindCommitsServer) error { chunker := chunk.New(&findCommitsSender{stream: stream}) for getCommits.Scan() { diff --git a/internal/service/commit/list_commits_by_oid.go b/internal/service/commit/list_commits_by_oid.go index 9d024566d..2dc467496 100644 --- a/internal/service/commit/list_commits_by_oid.go +++ b/internal/service/commit/list_commits_by_oid.go @@ -1,6 +1,7 @@ package commit import ( + "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log" @@ -58,8 +59,8 @@ type commitsByOidSender struct { stream gitalypb.CommitService_ListCommitsByOidServer } -func (c *commitsByOidSender) Append(it chunk.Item) { - c.response.Commits = append(c.response.Commits, it.(*gitalypb.GitCommit)) +func (c *commitsByOidSender) Append(m proto.Message) { + c.response.Commits = append(c.response.Commits, m.(*gitalypb.GitCommit)) } func (c *commitsByOidSender) Send() error { return c.stream.Send(c.response) } diff --git a/internal/service/commit/list_commits_by_ref_name.go b/internal/service/commit/list_commits_by_ref_name.go index baf281214..25ba21fc0 100644 --- a/internal/service/commit/list_commits_by_ref_name.go +++ b/internal/service/commit/list_commits_by_ref_name.go @@ -1,6 +1,7 @@ package commit import ( + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log" "gitlab.com/gitlab-org/gitaly/internal/helper" @@ -40,8 +41,8 @@ type commitsByRefNameSender struct { stream gitalypb.CommitService_ListCommitsByRefNameServer } -func (c *commitsByRefNameSender) Append(it chunk.Item) { - c.response.Commits = append(c.response.Commits, it.(*gitalypb.GitCommit)) +func (c *commitsByRefNameSender) Append(m proto.Message) { + c.response.Commits = append(c.response.Commits, m.(*gitalypb.GitCommit)) } func (c *commitsByRefNameSender) Send() error { return c.stream.Send(c.response) } diff --git a/internal/service/commit/list_files.go b/internal/service/commit/list_files.go index 441cae9a5..622fd8885 100644 --- a/internal/service/commit/list_files.go +++ b/internal/service/commit/list_files.go @@ -4,6 +4,7 @@ import ( "fmt" "io" + "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/git" @@ -80,7 +81,7 @@ func listFiles(repo *gitalypb.Repository, revision string, stream gitalypb.Commi continue } - if err := sender.Send([]byte(entry.Path)); err != nil { + if err := sender.Send(&gitalypb.ListFilesResponse{Paths: [][]byte{[]byte(entry.Path)}}); err != nil { return err } } @@ -95,6 +96,6 @@ type listFilesSender struct { func (s *listFilesSender) Reset() { s.response = &gitalypb.ListFilesResponse{} } func (s *listFilesSender) Send() error { return s.stream.Send(s.response) } -func (s *listFilesSender) Append(it chunk.Item) { - s.response.Paths = append(s.response.Paths, it.([]byte)) +func (s *listFilesSender) Append(m proto.Message) { + s.response.Paths = append(s.response.Paths, m.(*gitalypb.ListFilesResponse).Paths...) } diff --git a/internal/service/commit/tree_entries.go b/internal/service/commit/tree_entries.go index 70c386271..e1238c9e3 100644 --- a/internal/service/commit/tree_entries.go +++ b/internal/service/commit/tree_entries.go @@ -3,6 +3,7 @@ package commit import ( "fmt" + "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/git" @@ -78,8 +79,8 @@ type treeEntriesSender struct { stream gitalypb.CommitService_GetTreeEntriesServer } -func (c *treeEntriesSender) Append(it chunk.Item) { - c.response.Entries = append(c.response.Entries, it.(*gitalypb.TreeEntry)) +func (c *treeEntriesSender) Append(m proto.Message) { + c.response.Entries = append(c.response.Entries, m.(*gitalypb.TreeEntry)) } func (c *treeEntriesSender) Send() error { return c.stream.Send(c.response) } diff --git a/internal/service/ref/refnames.go b/internal/service/ref/refnames.go index 60348e3f8..8a3ece87b 100644 --- a/internal/service/ref/refnames.go +++ b/internal/service/ref/refnames.go @@ -4,6 +4,8 @@ import ( "bufio" "context" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/wrappers" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -22,8 +24,8 @@ type findAllBranchNamesSender struct { } func (ts *findAllBranchNamesSender) Reset() { ts.branchNames = nil } -func (ts *findAllBranchNamesSender) Append(it chunk.Item) { - ts.branchNames = append(ts.branchNames, []byte(it.(string))) +func (ts *findAllBranchNamesSender) Append(m proto.Message) { + ts.branchNames = append(ts.branchNames, []byte(m.(*wrappers.StringValue).Value)) } func (ts *findAllBranchNamesSender) Send() error { @@ -43,8 +45,8 @@ type findAllTagNamesSender struct { } func (ts *findAllTagNamesSender) Reset() { ts.tagNames = nil } -func (ts *findAllTagNamesSender) Append(it chunk.Item) { - ts.tagNames = append(ts.tagNames, []byte(it.(string))) +func (ts *findAllTagNamesSender) Append(m proto.Message) { + ts.tagNames = append(ts.tagNames, []byte(m.(*wrappers.StringValue).Value)) } func (ts *findAllTagNamesSender) Send() error { @@ -74,7 +76,7 @@ func listRefNames(ctx context.Context, chunker *chunk.Chunker, prefix string, re // Important: don't use scanner.Bytes() because the slice will become // invalid on the next loop iteration. Instead, use scanner.Text() to // force a copy. - if err := chunker.Send(scanner.Text()); err != nil { + if err := chunker.Send(&wrappers.StringValue{Value: scanner.Text()}); err != nil { return err } } diff --git a/internal/service/ref/refnames_containing.go b/internal/service/ref/refnames_containing.go index 39a21997a..43b91f1e5 100644 --- a/internal/service/ref/refnames_containing.go +++ b/internal/service/ref/refnames_containing.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/wrappers" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" @@ -45,8 +47,8 @@ type branchNamesContainingCommitSender struct { } func (bs *branchNamesContainingCommitSender) Reset() { bs.branchNames = nil } -func (bs *branchNamesContainingCommitSender) Append(it chunk.Item) { - bs.branchNames = append(bs.branchNames, stripPrefix(it, "refs/heads/")) +func (bs *branchNamesContainingCommitSender) Append(m proto.Message) { + bs.branchNames = append(bs.branchNames, stripPrefix(m.(*wrappers.StringValue).Value, "refs/heads/")) } func (bs *branchNamesContainingCommitSender) Send() error { @@ -75,14 +77,14 @@ type tagNamesContainingCommitSender struct { } func (ts *tagNamesContainingCommitSender) Reset() { ts.tagNames = nil } -func (ts *tagNamesContainingCommitSender) Append(it chunk.Item) { - ts.tagNames = append(ts.tagNames, stripPrefix(it, "refs/tags/")) +func (ts *tagNamesContainingCommitSender) Append(m proto.Message) { + ts.tagNames = append(ts.tagNames, stripPrefix(m.(*wrappers.StringValue).Value, "refs/tags/")) } func (ts *tagNamesContainingCommitSender) Send() error { return ts.stream.Send(&gitalypb.ListTagNamesContainingCommitResponse{TagNames: ts.tagNames}) } -func stripPrefix(it chunk.Item, prefix string) []byte { - return []byte(strings.TrimPrefix(it.(string), prefix)) +func stripPrefix(s string, prefix string) []byte { + return []byte(strings.TrimPrefix(s, prefix)) } diff --git a/internal/service/ref/refs.go b/internal/service/ref/refs.go index f7de0d344..0618a7ac3 100644 --- a/internal/service/ref/refs.go +++ b/internal/service/ref/refs.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log" @@ -69,8 +70,8 @@ func (t *tagSender) Reset() { t.tags = nil } -func (t *tagSender) Append(i chunk.Item) { - t.tags = append(t.tags, i.(*gitalypb.Tag)) +func (t *tagSender) Append(m proto.Message) { + t.tags = append(t.tags, m.(*gitalypb.Tag)) } func (t *tagSender) Send() error { diff --git a/internal/service/remote/remotes.go b/internal/service/remote/remotes.go index 7e18fda41..ba659b741 100644 --- a/internal/service/remote/remotes.go +++ b/internal/service/remote/remotes.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "strings" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/remote" "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" @@ -178,8 +179,8 @@ type listRemotesSender struct { remotes []*gitalypb.ListRemotesResponse_Remote } -func (l *listRemotesSender) Append(it chunk.Item) { - l.remotes = append(l.remotes, it.(*gitalypb.ListRemotesResponse_Remote)) +func (l *listRemotesSender) Append(m proto.Message) { + l.remotes = append(l.remotes, m.(*gitalypb.ListRemotesResponse_Remote)) } func (l *listRemotesSender) Send() error { diff --git a/internal/service/repository/raw_changes.go b/internal/service/repository/raw_changes.go index 3c179b90e..5ff3eec93 100644 --- a/internal/service/repository/raw_changes.go +++ b/internal/service/repository/raw_changes.go @@ -7,6 +7,7 @@ import ( "strconv" "unicode/utf8" + "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/internal/git/rawdiff" @@ -104,8 +105,8 @@ type rawChangesSender struct { } func (s *rawChangesSender) Reset() { s.changes = nil } -func (s *rawChangesSender) Append(it chunk.Item) { - s.changes = append(s.changes, it.(*gitalypb.GetRawChangesResponse_RawChange)) +func (s *rawChangesSender) Append(m proto.Message) { + s.changes = append(s.changes, m.(*gitalypb.GetRawChangesResponse_RawChange)) } func (s *rawChangesSender) Send() error { |