Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2020-04-28 17:00:08 +0300
committerJacob Vosmaer <jacob@gitlab.com>2020-04-28 17:00:08 +0300
commit8ad75fbb0ae265fb317bb1640070daa6d0adf4e7 (patch)
treea0211d3b320f50f81a5629f31781c07bf09f1a18
parent9bfdd53b6b9beca5f88500c8dd12d031d0fb6bc9 (diff)
parente3316425eec8384c2fdaf22dce920a51fcee2b2d (diff)
Merge branch 'jc-fix-chunker' into 'master'
Modify chunker to send message based on size Closes #2696 See merge request gitlab-org/gitaly!2096
-rw-r--r--changelogs/unreleased/jc-fix-chunker.yml5
-rw-r--r--internal/helper/chunk/chunker.go37
-rw-r--r--internal/helper/chunk/chunker_test.go96
-rw-r--r--internal/helper/chunk/pb/test.pb.go231
-rw-r--r--internal/helper/chunk/pb/test.proto14
-rw-r--r--internal/service/cleanup/apply_bfg_object_map_stream.go5
-rw-r--r--internal/service/commit/between.go6
-rw-r--r--internal/service/commit/commits_by_message.go6
-rw-r--r--internal/service/commit/find_all_commits.go6
-rw-r--r--internal/service/commit/find_commits.go11
-rw-r--r--internal/service/commit/list_commits_by_oid.go5
-rw-r--r--internal/service/commit/list_commits_by_ref_name.go5
-rw-r--r--internal/service/commit/list_files.go7
-rw-r--r--internal/service/commit/tree_entries.go5
-rw-r--r--internal/service/ref/refnames.go12
-rw-r--r--internal/service/ref/refnames_containing.go14
-rw-r--r--internal/service/ref/refs.go5
-rw-r--r--internal/service/remote/remotes.go5
-rw-r--r--internal/service/repository/raw_changes.go5
19 files changed, 422 insertions, 58 deletions
diff --git a/changelogs/unreleased/jc-fix-chunker.yml b/changelogs/unreleased/jc-fix-chunker.yml
new file mode 100644
index 000000000..24fc921a5
--- /dev/null
+++ b/changelogs/unreleased/jc-fix-chunker.yml
@@ -0,0 +1,5 @@
+---
+title: Modify chunker to send message based on size
+merge_request: 2096
+author:
+type: changed
diff --git a/internal/helper/chunk/chunker.go b/internal/helper/chunk/chunker.go
index 48d99f03f..b02bc2a88 100644
--- a/internal/helper/chunk/chunker.go
+++ b/internal/helper/chunk/chunker.go
@@ -1,8 +1,8 @@
package chunk
-// Item could be e.g. a commit in an RPC that returns a chunked stream of
-// commits.
-type Item interface{}
+import (
+ "github.com/golang/protobuf/proto"
+)
// Sender encapsulates a gRPC response stream and the current chunk
// that's being built.
@@ -12,7 +12,7 @@ type Sender interface {
// Reset should create a fresh response message.
Reset()
// Append should append the given item to the slice in the current response message
- Append(Item)
+ Append(proto.Message)
// Send should send the current response message
Send() error
}
@@ -23,35 +23,42 @@ func New(s Sender) *Chunker { return &Chunker{s: s} }
// Chunker lets you spread items you want to send over multiple chunks.
// This type is not thread-safe.
type Chunker struct {
- s Sender
- n int
+ s Sender
+ size int
}
+// maxMessageSize is maximum size per protobuf message
+const maxMessageSize = 1 * 1024 * 1024
+
// Send will append an item to the current chunk and send the chunk if it is full.
-func (c *Chunker) Send(it Item) error {
- if c.n == 0 {
+func (c *Chunker) Send(it proto.Message) error {
+ if c.size == 0 {
c.s.Reset()
}
- c.s.Append(it)
- c.n++
+ itSize := proto.Size(it)
- const chunkSize = 20
- if c.n >= chunkSize {
- return c.sendResponseMsg()
+ if itSize+c.size >= maxMessageSize {
+ if err := c.sendResponseMsg(); err != nil {
+ return err
+ }
+ c.s.Reset()
}
+ c.s.Append(it)
+ c.size += itSize
+
return nil
}
func (c *Chunker) sendResponseMsg() error {
- c.n = 0
+ c.size = 0
return c.s.Send()
}
// Flush sends remaining items in the current chunk, if any.
func (c *Chunker) Flush() error {
- if c.n == 0 {
+ if c.size == 0 {
return nil
}
diff --git a/internal/helper/chunk/chunker_test.go b/internal/helper/chunk/chunker_test.go
new file mode 100644
index 000000000..0dd22e1b5
--- /dev/null
+++ b/internal/helper/chunk/chunker_test.go
@@ -0,0 +1,96 @@
+package chunk
+
+import (
+ "io"
+ "net"
+ "testing"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/wrappers"
+ "github.com/stretchr/testify/require"
+ test "gitlab.com/gitlab-org/gitaly/internal/helper/chunk/pb"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "google.golang.org/grpc"
+)
+
+type testSender struct {
+ stream test.Test_StreamOutputServer
+ output [][]byte
+}
+
+func (ts *testSender) Reset() { ts.output = nil }
+func (ts *testSender) Append(m proto.Message) {
+ ts.output = append(ts.output, m.(*wrappers.BytesValue).Value)
+}
+
+func (ts *testSender) Send() error {
+ return ts.stream.Send(&test.StreamOutputResponse{
+ Msg: ts.output,
+ })
+}
+
+func TestChunker(t *testing.T) {
+ s := &server{}
+ srv, serverSocketPath := runServer(t, s)
+ defer srv.Stop()
+
+ client, conn := newClient(t, serverSocketPath)
+ defer conn.Close()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ stream, err := client.StreamOutput(ctx, &test.StreamOutputRequest{BytesToReturn: 3.5 * maxMessageSize})
+ require.NoError(t, err)
+
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ require.Less(t, proto.Size(resp), maxMessageSize)
+ }
+}
+
+type server struct{}
+
+func (s *server) StreamOutput(req *test.StreamOutputRequest, srv test.Test_StreamOutputServer) error {
+ const kilobyte = 1024
+
+ c := New(&testSender{stream: srv})
+ for numBytes := 0; numBytes < int(req.GetBytesToReturn()); numBytes += kilobyte {
+ if err := c.Send(&wrappers.BytesValue{Value: make([]byte, kilobyte)}); err != nil {
+ return err
+ }
+ }
+
+ if err := c.Flush(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func runServer(t *testing.T, s *server, opt ...grpc.ServerOption) (*grpc.Server, string) {
+ serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
+ grpcServer := grpc.NewServer(opt...)
+ test.RegisterTestServer(grpcServer, s)
+
+ lis, err := net.Listen("unix", serverSocketPath)
+ require.NoError(t, err)
+
+ go grpcServer.Serve(lis)
+
+ return grpcServer, "unix://" + serverSocketPath
+}
+
+func newClient(t *testing.T, serverSocketPath string) (test.TestClient, *grpc.ClientConn) {
+ connOpts := []grpc.DialOption{
+ grpc.WithInsecure(),
+ }
+ conn, err := grpc.Dial(serverSocketPath, connOpts...)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ return test.NewTestClient(conn), conn
+}
diff --git a/internal/helper/chunk/pb/test.pb.go b/internal/helper/chunk/pb/test.pb.go
new file mode 100644
index 000000000..523d1237e
--- /dev/null
+++ b/internal/helper/chunk/pb/test.pb.go
@@ -0,0 +1,231 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: test.proto
+
+package test
+
+import (
+ context "context"
+ fmt "fmt"
+ proto "github.com/golang/protobuf/proto"
+ grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+ math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type StreamOutputRequest struct {
+ BytesToReturn int32 `protobuf:"varint,1,opt,name=bytes_to_return,json=bytesToReturn,proto3" json:"bytes_to_return,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StreamOutputRequest) Reset() { *m = StreamOutputRequest{} }
+func (m *StreamOutputRequest) String() string { return proto.CompactTextString(m) }
+func (*StreamOutputRequest) ProtoMessage() {}
+func (*StreamOutputRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_c161fcfdc0c3ff1e, []int{0}
+}
+
+func (m *StreamOutputRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StreamOutputRequest.Unmarshal(m, b)
+}
+func (m *StreamOutputRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StreamOutputRequest.Marshal(b, m, deterministic)
+}
+func (m *StreamOutputRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StreamOutputRequest.Merge(m, src)
+}
+func (m *StreamOutputRequest) XXX_Size() int {
+ return xxx_messageInfo_StreamOutputRequest.Size(m)
+}
+func (m *StreamOutputRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_StreamOutputRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StreamOutputRequest proto.InternalMessageInfo
+
+func (m *StreamOutputRequest) GetBytesToReturn() int32 {
+ if m != nil {
+ return m.BytesToReturn
+ }
+ return 0
+}
+
+type StreamOutputResponse struct {
+ Msg [][]byte `protobuf:"bytes,1,rep,name=msg,proto3" json:"msg,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StreamOutputResponse) Reset() { *m = StreamOutputResponse{} }
+func (m *StreamOutputResponse) String() string { return proto.CompactTextString(m) }
+func (*StreamOutputResponse) ProtoMessage() {}
+func (*StreamOutputResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_c161fcfdc0c3ff1e, []int{1}
+}
+
+func (m *StreamOutputResponse) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StreamOutputResponse.Unmarshal(m, b)
+}
+func (m *StreamOutputResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StreamOutputResponse.Marshal(b, m, deterministic)
+}
+func (m *StreamOutputResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StreamOutputResponse.Merge(m, src)
+}
+func (m *StreamOutputResponse) XXX_Size() int {
+ return xxx_messageInfo_StreamOutputResponse.Size(m)
+}
+func (m *StreamOutputResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_StreamOutputResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StreamOutputResponse proto.InternalMessageInfo
+
+func (m *StreamOutputResponse) GetMsg() [][]byte {
+ if m != nil {
+ return m.Msg
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*StreamOutputRequest)(nil), "test.StreamOutputRequest")
+ proto.RegisterType((*StreamOutputResponse)(nil), "test.StreamOutputResponse")
+}
+
+func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) }
+
+var fileDescriptor_c161fcfdc0c3ff1e = []byte{
+ // 160 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
+ 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0x95, 0x6c, 0xb9, 0x84, 0x83, 0x4b,
+ 0x8a, 0x52, 0x13, 0x73, 0xfd, 0x4b, 0x4b, 0x0a, 0x4a, 0x4b, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b,
+ 0x4b, 0x84, 0xd4, 0xb8, 0xf8, 0x93, 0x2a, 0x4b, 0x52, 0x8b, 0xe3, 0x4b, 0xf2, 0xe3, 0x8b, 0x52,
+ 0x4b, 0x4a, 0x8b, 0xf2, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x83, 0x78, 0xc1, 0xc2, 0x21, 0xf9,
+ 0x41, 0x60, 0x41, 0x25, 0x0d, 0x2e, 0x11, 0x54, 0xed, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0xa9, 0x42,
+ 0x02, 0x5c, 0xcc, 0xb9, 0xc5, 0xe9, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, 0x3c, 0x41, 0x20, 0xa6, 0x51,
+ 0x20, 0x17, 0x4b, 0x08, 0xc8, 0x64, 0x4f, 0x2e, 0x1e, 0x64, 0x1d, 0x42, 0x92, 0x7a, 0x60, 0x37,
+ 0x61, 0x71, 0x84, 0x94, 0x14, 0x36, 0x29, 0x88, 0x05, 0x4a, 0x0c, 0x06, 0x8c, 0x49, 0x6c, 0x60,
+ 0x8f, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xae, 0x1b, 0x22, 0xff, 0xd6, 0x00, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// TestClient is the client API for Test service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type TestClient interface {
+ StreamOutput(ctx context.Context, in *StreamOutputRequest, opts ...grpc.CallOption) (Test_StreamOutputClient, error)
+}
+
+type testClient struct {
+ cc *grpc.ClientConn
+}
+
+func NewTestClient(cc *grpc.ClientConn) TestClient {
+ return &testClient{cc}
+}
+
+func (c *testClient) StreamOutput(ctx context.Context, in *StreamOutputRequest, opts ...grpc.CallOption) (Test_StreamOutputClient, error) {
+ stream, err := c.cc.NewStream(ctx, &_Test_serviceDesc.Streams[0], "/test.Test/StreamOutput", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &testStreamOutputClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type Test_StreamOutputClient interface {
+ Recv() (*StreamOutputResponse, error)
+ grpc.ClientStream
+}
+
+type testStreamOutputClient struct {
+ grpc.ClientStream
+}
+
+func (x *testStreamOutputClient) Recv() (*StreamOutputResponse, error) {
+ m := new(StreamOutputResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+// TestServer is the server API for Test service.
+type TestServer interface {
+ StreamOutput(*StreamOutputRequest, Test_StreamOutputServer) error
+}
+
+// UnimplementedTestServer can be embedded to have forward compatible implementations.
+type UnimplementedTestServer struct {
+}
+
+func (*UnimplementedTestServer) StreamOutput(req *StreamOutputRequest, srv Test_StreamOutputServer) error {
+ return status.Errorf(codes.Unimplemented, "method StreamOutput not implemented")
+}
+
+func RegisterTestServer(s *grpc.Server, srv TestServer) {
+ s.RegisterService(&_Test_serviceDesc, srv)
+}
+
+func _Test_StreamOutput_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(StreamOutputRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(TestServer).StreamOutput(m, &testStreamOutputServer{stream})
+}
+
+type Test_StreamOutputServer interface {
+ Send(*StreamOutputResponse) error
+ grpc.ServerStream
+}
+
+type testStreamOutputServer struct {
+ grpc.ServerStream
+}
+
+func (x *testStreamOutputServer) Send(m *StreamOutputResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+var _Test_serviceDesc = grpc.ServiceDesc{
+ ServiceName: "test.Test",
+ HandlerType: (*TestServer)(nil),
+ Methods: []grpc.MethodDesc{},
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "StreamOutput",
+ Handler: _Test_StreamOutput_Handler,
+ ServerStreams: true,
+ },
+ },
+ Metadata: "test.proto",
+}
diff --git a/internal/helper/chunk/pb/test.proto b/internal/helper/chunk/pb/test.proto
new file mode 100644
index 000000000..d5e419805
--- /dev/null
+++ b/internal/helper/chunk/pb/test.proto
@@ -0,0 +1,14 @@
+syntax = "proto3";
+
+package test;
+
+service Test {
+ rpc StreamOutput(StreamOutputRequest) returns (stream StreamOutputResponse) {}
+}
+
+message StreamOutputRequest {
+ int32 bytes_to_return = 1;
+}
+message StreamOutputResponse {
+ repeated bytes msg = 1;
+}
diff --git a/internal/service/cleanup/apply_bfg_object_map_stream.go b/internal/service/cleanup/apply_bfg_object_map_stream.go
index e2276a434..204516fe3 100644
--- a/internal/service/cleanup/apply_bfg_object_map_stream.go
+++ b/internal/service/cleanup/apply_bfg_object_map_stream.go
@@ -4,6 +4,7 @@ import (
"fmt"
"io"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/internal/service/cleanup/internalrefs"
@@ -89,10 +90,10 @@ func (r *bfgStreamReader) streamReader() io.Reader {
return streamio.NewReader(r.readOne)
}
-func (w *bfgStreamWriter) Append(it chunk.Item) {
+func (w *bfgStreamWriter) Append(m proto.Message) {
w.entries = append(
w.entries,
- it.(*gitalypb.ApplyBfgObjectMapStreamResponse_Entry),
+ m.(*gitalypb.ApplyBfgObjectMapStreamResponse_Entry),
)
}
diff --git a/internal/service/commit/between.go b/internal/service/commit/between.go
index 9eee9f522..6f6684881 100644
--- a/internal/service/commit/between.go
+++ b/internal/service/commit/between.go
@@ -3,9 +3,9 @@ package commit
import (
"fmt"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -15,8 +15,8 @@ type commitsBetweenSender struct {
}
func (sender *commitsBetweenSender) Reset() { sender.commits = nil }
-func (sender *commitsBetweenSender) Append(it chunk.Item) {
- sender.commits = append(sender.commits, it.(*gitalypb.GitCommit))
+func (sender *commitsBetweenSender) Append(m proto.Message) {
+ sender.commits = append(sender.commits, m.(*gitalypb.GitCommit))
}
func (sender *commitsBetweenSender) Send() error {
diff --git a/internal/service/commit/commits_by_message.go b/internal/service/commit/commits_by_message.go
index 2f95a06ef..861edbf30 100644
--- a/internal/service/commit/commits_by_message.go
+++ b/internal/service/commit/commits_by_message.go
@@ -3,9 +3,9 @@ package commit
import (
"fmt"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -15,8 +15,8 @@ type commitsByMessageSender struct {
}
func (sender *commitsByMessageSender) Reset() { sender.commits = nil }
-func (sender *commitsByMessageSender) Append(it chunk.Item) {
- sender.commits = append(sender.commits, it.(*gitalypb.GitCommit))
+func (sender *commitsByMessageSender) Append(m proto.Message) {
+ sender.commits = append(sender.commits, m.(*gitalypb.GitCommit))
}
func (sender *commitsByMessageSender) Send() error {
diff --git a/internal/service/commit/find_all_commits.go b/internal/service/commit/find_all_commits.go
index 09bc7d9b6..158d7817d 100644
--- a/internal/service/commit/find_all_commits.go
+++ b/internal/service/commit/find_all_commits.go
@@ -3,9 +3,9 @@ package commit
import (
"fmt"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/internal/service/ref"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -19,8 +19,8 @@ type findAllCommitsSender struct {
}
func (sender *findAllCommitsSender) Reset() { sender.commits = nil }
-func (sender *findAllCommitsSender) Append(it chunk.Item) {
- sender.commits = append(sender.commits, it.(*gitalypb.GitCommit))
+func (sender *findAllCommitsSender) Append(m proto.Message) {
+ sender.commits = append(sender.commits, m.(*gitalypb.GitCommit))
}
func (sender *findAllCommitsSender) Send() error {
diff --git a/internal/service/commit/find_commits.go b/internal/service/commit/find_commits.go
index 8b852734b..a35b25719 100644
--- a/internal/service/commit/find_commits.go
+++ b/internal/service/commit/find_commits.go
@@ -7,6 +7,7 @@ import (
"fmt"
"strings"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/command"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
@@ -16,8 +17,6 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-const commitsPerPage int = 20
-
func (s *server) FindCommits(req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) error {
ctx := stream.Context()
@@ -65,7 +64,7 @@ func findCommits(ctx context.Context, req *gitalypb.FindCommitsRequest, stream g
getCommits.Offset(int(req.GetOffset()))
}
- if err := streamPaginatedCommits(getCommits, commitsPerPage, stream); err != nil {
+ if err := streamCommits(getCommits, stream); err != nil {
return fmt.Errorf("error streaming commits: %v", err)
}
return nil
@@ -125,15 +124,15 @@ type findCommitsSender struct {
}
func (s *findCommitsSender) Reset() { s.commits = nil }
-func (s *findCommitsSender) Append(it chunk.Item) {
- s.commits = append(s.commits, it.(*gitalypb.GitCommit))
+func (s *findCommitsSender) Append(m proto.Message) {
+ s.commits = append(s.commits, m.(*gitalypb.GitCommit))
}
func (s *findCommitsSender) Send() error {
return s.stream.Send(&gitalypb.FindCommitsResponse{Commits: s.commits})
}
-func streamPaginatedCommits(getCommits *GetCommits, commitsPerPage int, stream gitalypb.CommitService_FindCommitsServer) error {
+func streamCommits(getCommits *GetCommits, stream gitalypb.CommitService_FindCommitsServer) error {
chunker := chunk.New(&findCommitsSender{stream: stream})
for getCommits.Scan() {
diff --git a/internal/service/commit/list_commits_by_oid.go b/internal/service/commit/list_commits_by_oid.go
index 9d024566d..2dc467496 100644
--- a/internal/service/commit/list_commits_by_oid.go
+++ b/internal/service/commit/list_commits_by_oid.go
@@ -1,6 +1,7 @@
package commit
import (
+ "github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log"
@@ -58,8 +59,8 @@ type commitsByOidSender struct {
stream gitalypb.CommitService_ListCommitsByOidServer
}
-func (c *commitsByOidSender) Append(it chunk.Item) {
- c.response.Commits = append(c.response.Commits, it.(*gitalypb.GitCommit))
+func (c *commitsByOidSender) Append(m proto.Message) {
+ c.response.Commits = append(c.response.Commits, m.(*gitalypb.GitCommit))
}
func (c *commitsByOidSender) Send() error { return c.stream.Send(c.response) }
diff --git a/internal/service/commit/list_commits_by_ref_name.go b/internal/service/commit/list_commits_by_ref_name.go
index baf281214..25ba21fc0 100644
--- a/internal/service/commit/list_commits_by_ref_name.go
+++ b/internal/service/commit/list_commits_by_ref_name.go
@@ -1,6 +1,7 @@
package commit
import (
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log"
"gitlab.com/gitlab-org/gitaly/internal/helper"
@@ -40,8 +41,8 @@ type commitsByRefNameSender struct {
stream gitalypb.CommitService_ListCommitsByRefNameServer
}
-func (c *commitsByRefNameSender) Append(it chunk.Item) {
- c.response.Commits = append(c.response.Commits, it.(*gitalypb.GitCommit))
+func (c *commitsByRefNameSender) Append(m proto.Message) {
+ c.response.Commits = append(c.response.Commits, m.(*gitalypb.GitCommit))
}
func (c *commitsByRefNameSender) Send() error { return c.stream.Send(c.response) }
diff --git a/internal/service/commit/list_files.go b/internal/service/commit/list_files.go
index 441cae9a5..622fd8885 100644
--- a/internal/service/commit/list_files.go
+++ b/internal/service/commit/list_files.go
@@ -4,6 +4,7 @@ import (
"fmt"
"io"
+ "github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/git"
@@ -80,7 +81,7 @@ func listFiles(repo *gitalypb.Repository, revision string, stream gitalypb.Commi
continue
}
- if err := sender.Send([]byte(entry.Path)); err != nil {
+ if err := sender.Send(&gitalypb.ListFilesResponse{Paths: [][]byte{[]byte(entry.Path)}}); err != nil {
return err
}
}
@@ -95,6 +96,6 @@ type listFilesSender struct {
func (s *listFilesSender) Reset() { s.response = &gitalypb.ListFilesResponse{} }
func (s *listFilesSender) Send() error { return s.stream.Send(s.response) }
-func (s *listFilesSender) Append(it chunk.Item) {
- s.response.Paths = append(s.response.Paths, it.([]byte))
+func (s *listFilesSender) Append(m proto.Message) {
+ s.response.Paths = append(s.response.Paths, m.(*gitalypb.ListFilesResponse).Paths...)
}
diff --git a/internal/service/commit/tree_entries.go b/internal/service/commit/tree_entries.go
index 70c386271..e1238c9e3 100644
--- a/internal/service/commit/tree_entries.go
+++ b/internal/service/commit/tree_entries.go
@@ -3,6 +3,7 @@ package commit
import (
"fmt"
+ "github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/git"
@@ -78,8 +79,8 @@ type treeEntriesSender struct {
stream gitalypb.CommitService_GetTreeEntriesServer
}
-func (c *treeEntriesSender) Append(it chunk.Item) {
- c.response.Entries = append(c.response.Entries, it.(*gitalypb.TreeEntry))
+func (c *treeEntriesSender) Append(m proto.Message) {
+ c.response.Entries = append(c.response.Entries, m.(*gitalypb.TreeEntry))
}
func (c *treeEntriesSender) Send() error { return c.stream.Send(c.response) }
diff --git a/internal/service/ref/refnames.go b/internal/service/ref/refnames.go
index 60348e3f8..8a3ece87b 100644
--- a/internal/service/ref/refnames.go
+++ b/internal/service/ref/refnames.go
@@ -4,6 +4,8 @@ import (
"bufio"
"context"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/wrappers"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -22,8 +24,8 @@ type findAllBranchNamesSender struct {
}
func (ts *findAllBranchNamesSender) Reset() { ts.branchNames = nil }
-func (ts *findAllBranchNamesSender) Append(it chunk.Item) {
- ts.branchNames = append(ts.branchNames, []byte(it.(string)))
+func (ts *findAllBranchNamesSender) Append(m proto.Message) {
+ ts.branchNames = append(ts.branchNames, []byte(m.(*wrappers.StringValue).Value))
}
func (ts *findAllBranchNamesSender) Send() error {
@@ -43,8 +45,8 @@ type findAllTagNamesSender struct {
}
func (ts *findAllTagNamesSender) Reset() { ts.tagNames = nil }
-func (ts *findAllTagNamesSender) Append(it chunk.Item) {
- ts.tagNames = append(ts.tagNames, []byte(it.(string)))
+func (ts *findAllTagNamesSender) Append(m proto.Message) {
+ ts.tagNames = append(ts.tagNames, []byte(m.(*wrappers.StringValue).Value))
}
func (ts *findAllTagNamesSender) Send() error {
@@ -74,7 +76,7 @@ func listRefNames(ctx context.Context, chunker *chunk.Chunker, prefix string, re
// Important: don't use scanner.Bytes() because the slice will become
// invalid on the next loop iteration. Instead, use scanner.Text() to
// force a copy.
- if err := chunker.Send(scanner.Text()); err != nil {
+ if err := chunker.Send(&wrappers.StringValue{Value: scanner.Text()}); err != nil {
return err
}
}
diff --git a/internal/service/ref/refnames_containing.go b/internal/service/ref/refnames_containing.go
index 39a21997a..43b91f1e5 100644
--- a/internal/service/ref/refnames_containing.go
+++ b/internal/service/ref/refnames_containing.go
@@ -4,6 +4,8 @@ import (
"fmt"
"strings"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/wrappers"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
@@ -45,8 +47,8 @@ type branchNamesContainingCommitSender struct {
}
func (bs *branchNamesContainingCommitSender) Reset() { bs.branchNames = nil }
-func (bs *branchNamesContainingCommitSender) Append(it chunk.Item) {
- bs.branchNames = append(bs.branchNames, stripPrefix(it, "refs/heads/"))
+func (bs *branchNamesContainingCommitSender) Append(m proto.Message) {
+ bs.branchNames = append(bs.branchNames, stripPrefix(m.(*wrappers.StringValue).Value, "refs/heads/"))
}
func (bs *branchNamesContainingCommitSender) Send() error {
@@ -75,14 +77,14 @@ type tagNamesContainingCommitSender struct {
}
func (ts *tagNamesContainingCommitSender) Reset() { ts.tagNames = nil }
-func (ts *tagNamesContainingCommitSender) Append(it chunk.Item) {
- ts.tagNames = append(ts.tagNames, stripPrefix(it, "refs/tags/"))
+func (ts *tagNamesContainingCommitSender) Append(m proto.Message) {
+ ts.tagNames = append(ts.tagNames, stripPrefix(m.(*wrappers.StringValue).Value, "refs/tags/"))
}
func (ts *tagNamesContainingCommitSender) Send() error {
return ts.stream.Send(&gitalypb.ListTagNamesContainingCommitResponse{TagNames: ts.tagNames})
}
-func stripPrefix(it chunk.Item, prefix string) []byte {
- return []byte(strings.TrimPrefix(it.(string), prefix))
+func stripPrefix(s string, prefix string) []byte {
+ return []byte(strings.TrimPrefix(s, prefix))
}
diff --git a/internal/service/ref/refs.go b/internal/service/ref/refs.go
index f7de0d344..0618a7ac3 100644
--- a/internal/service/ref/refs.go
+++ b/internal/service/ref/refs.go
@@ -8,6 +8,7 @@ import (
"fmt"
"strings"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log"
@@ -69,8 +70,8 @@ func (t *tagSender) Reset() {
t.tags = nil
}
-func (t *tagSender) Append(i chunk.Item) {
- t.tags = append(t.tags, i.(*gitalypb.Tag))
+func (t *tagSender) Append(m proto.Message) {
+ t.tags = append(t.tags, m.(*gitalypb.Tag))
}
func (t *tagSender) Send() error {
diff --git a/internal/service/remote/remotes.go b/internal/service/remote/remotes.go
index 7e18fda41..ba659b741 100644
--- a/internal/service/remote/remotes.go
+++ b/internal/service/remote/remotes.go
@@ -8,6 +8,7 @@ import (
"io/ioutil"
"strings"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/remote"
"gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
@@ -178,8 +179,8 @@ type listRemotesSender struct {
remotes []*gitalypb.ListRemotesResponse_Remote
}
-func (l *listRemotesSender) Append(it chunk.Item) {
- l.remotes = append(l.remotes, it.(*gitalypb.ListRemotesResponse_Remote))
+func (l *listRemotesSender) Append(m proto.Message) {
+ l.remotes = append(l.remotes, m.(*gitalypb.ListRemotesResponse_Remote))
}
func (l *listRemotesSender) Send() error {
diff --git a/internal/service/repository/raw_changes.go b/internal/service/repository/raw_changes.go
index 3c179b90e..5ff3eec93 100644
--- a/internal/service/repository/raw_changes.go
+++ b/internal/service/repository/raw_changes.go
@@ -7,6 +7,7 @@ import (
"strconv"
"unicode/utf8"
+ "github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/internal/git/rawdiff"
@@ -104,8 +105,8 @@ type rawChangesSender struct {
}
func (s *rawChangesSender) Reset() { s.changes = nil }
-func (s *rawChangesSender) Append(it chunk.Item) {
- s.changes = append(s.changes, it.(*gitalypb.GetRawChangesResponse_RawChange))
+func (s *rawChangesSender) Append(m proto.Message) {
+ s.changes = append(s.changes, m.(*gitalypb.GetRawChangesResponse_RawChange))
}
func (s *rawChangesSender) Send() error {