diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-09-23 15:17:49 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-10-12 12:02:54 +0300 |
commit | 810e6a261dde1e19e945042d6679e4b3a3d3cfc2 (patch) | |
tree | 55682c2a7df29c94352ae6a987c7cfc1a29c22ec | |
parent | 0d2ea9ff5c3651dd152a258ecae5d9238457cc6a (diff) |
transactions: Expose RPC to gracefully stop transactions
This commit adds the ability to stop transactions via the RefTransaction
service. In contrast to state cancellation, this is considered to be a
graceful stop of transactions: nodes which cast votes on stopped
transactions are not supposed to treat it as an error if the transaction
has been stopped, but should instead just terminate it and stop
proceeding. To tell apart "real" errors from a stopped transaction, this
commit thus adds a new "STOP" state which tells the client to stop
processing it.
-rw-r--r-- | changelogs/unreleased/pks-transaction-stop.yml | 5 | ||||
-rw-r--r-- | internal/gitaly/service/hook/reference_transaction_test.go | 1 | ||||
-rw-r--r-- | internal/gitaly/service/operations/branches_test.go | 1 | ||||
-rw-r--r-- | internal/gitaly/service/smarthttp/receive_pack_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/service/transaction/server.go | 25 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 213 | ||||
-rw-r--r-- | proto/go/gitalypb/transaction.pb.go | 169 | ||||
-rw-r--r-- | proto/transaction.proto | 16 | ||||
-rw-r--r-- | ruby/proto/gitaly/transaction_pb.rb | 9 | ||||
-rw-r--r-- | ruby/proto/gitaly/transaction_services_pb.rb | 1 |
10 files changed, 417 insertions, 24 deletions
diff --git a/changelogs/unreleased/pks-transaction-stop.yml b/changelogs/unreleased/pks-transaction-stop.yml new file mode 100644 index 000000000..fa4dba282 --- /dev/null +++ b/changelogs/unreleased/pks-transaction-stop.yml @@ -0,0 +1,5 @@ +--- +title: 'transactions: Implement RPC to gracefully stop transactions' +merge_request: 2532 +author: +type: added diff --git a/internal/gitaly/service/hook/reference_transaction_test.go b/internal/gitaly/service/hook/reference_transaction_test.go index 4afef9783..67ce79d8d 100644 --- a/internal/gitaly/service/hook/reference_transaction_test.go +++ b/internal/gitaly/service/hook/reference_transaction_test.go @@ -17,6 +17,7 @@ import ( ) type testTransactionServer struct { + gitalypb.UnimplementedRefTransactionServer handler func(in *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) } diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index bc5e57eed..e07ae5ec3 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -23,6 +23,7 @@ import ( ) type testTransactionServer struct { + gitalypb.UnimplementedRefTransactionServer called int } diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index 40bbc6b8c..eb3dbbfee 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -548,6 +548,7 @@ func TestPostReceiveWithTransactionsViaPraefect(t *testing.T) { } type testTransactionServer struct { + gitalypb.UnimplementedRefTransactionServer called int } diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go index 96fd12efd..0a3edaea0 100644 --- a/internal/praefect/service/transaction/server.go +++ b/internal/praefect/service/transaction/server.go @@ -31,6 +31,10 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti return nil, helper.ErrNotFound(err) case errors.Is(err, transactions.ErrTransactionCanceled): return nil, helper.DecorateError(codes.Canceled, err) + case errors.Is(err, transactions.ErrTransactionStopped): + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_STOP, + }, nil case errors.Is(err, transactions.ErrTransactionVoteFailed): return &gitalypb.VoteTransactionResponse{ State: gitalypb.VoteTransactionResponse_ABORT, @@ -44,3 +48,24 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti State: gitalypb.VoteTransactionResponse_COMMIT, }, nil } + +// StopTransaction is called by a client who wants to gracefully stop a +// transaction. All voters waiting for quorum will be stopped and new votes +// will not get accepted anymore. It is fine to call this RPC multiple times on +// the same transaction. +func (s *Server) StopTransaction(ctx context.Context, in *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + if err := s.txMgr.StopTransaction(ctx, in.TransactionId); err != nil { + switch { + case errors.Is(err, transactions.ErrNotFound): + return nil, helper.ErrNotFound(err) + case errors.Is(err, transactions.ErrTransactionCanceled): + return nil, helper.DecorateError(codes.Canceled, err) + case errors.Is(err, transactions.ErrTransactionStopped): + return &gitalypb.StopTransactionResponse{}, nil + default: + return nil, helper.ErrInternal(err) + } + } + + return &gitalypb.StopTransactionResponse{}, nil +} diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 71f5e3bc0..a1a1920b5 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -38,7 +38,7 @@ func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Ma } type counterMetrics struct { - registered, started, invalid, committed int + registered, started, invalid, committed, stopped int } func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) { @@ -52,6 +52,7 @@ func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected {"started", expected.started}, {"invalid", expected.invalid}, {"committed", expected.committed}, + {"stopped", expected.stopped}, } var expectedMetric bytes.Buffer @@ -680,3 +681,213 @@ func TestTransactionCancellation(t *testing.T) { }) } } + +func TestStopTransaction(t *testing.T) { + hash := sha1.Sum([]byte("foo")) + + t.Run("stopping nonexisting transaction fails", func(t *testing.T) { + cc, _, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: 1234, + }) + require.Equal(t, codes.NotFound, status.Code(err)) + }) + + t.Run("stopping transaction multiple times succeeds", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + for i := 0; i < 5; i++ { + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + } + + verifyCounterMetrics(t, txMgr, counterMetrics{ + registered: 2, + stopped: 5, + }) + }) + + t.Run("stopping a single voter", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 1) + require.NoError(t, err) + defer cancelTransaction() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + + results := transaction.State() + require.Equal(t, results["voter"], false) + verifyCounterMetrics(t, txMgr, counterMetrics{ + registered: 1, + started: 1, + stopped: 2, + }) + }) + + t.Run("stopping in-progress transaction", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "successful-voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State) + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + response, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "failing-voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + + results := transaction.State() + require.True(t, results["successful-voter"], "Successful voter should succeed") + require.False(t, results["failing-voter"], "Failing voter should fail") + verifyCounterMetrics(t, txMgr, counterMetrics{ + committed: 1, + registered: 2, + started: 2, + stopped: 2, + }) + }) + + t.Run("stopping cancelled transaction fails", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + + cancelTransaction() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.Error(t, err) + require.Equal(t, codes.NotFound, status.Code(err)) + }) + + t.Run("stopping concurrent voter", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + // This create a single voter waiting for the threshold to be + // reached. As the second voter will never appear, the node + // will instead be stopped by the call to `StopTransaction` + // below. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + hash := sha1.Sum([]byte("hash")) + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "1", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + }() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + wg.Wait() + }) +} diff --git a/proto/go/gitalypb/transaction.pb.go b/proto/go/gitalypb/transaction.pb.go index 4048f2eba..6f70b1cfa 100644 --- a/proto/go/gitalypb/transaction.pb.go +++ b/proto/go/gitalypb/transaction.pb.go @@ -31,16 +31,19 @@ type VoteTransactionResponse_TransactionState int32 const ( VoteTransactionResponse_COMMIT VoteTransactionResponse_TransactionState = 0 VoteTransactionResponse_ABORT VoteTransactionResponse_TransactionState = 1 + VoteTransactionResponse_STOP VoteTransactionResponse_TransactionState = 2 ) var VoteTransactionResponse_TransactionState_name = map[int32]string{ 0: "COMMIT", 1: "ABORT", + 2: "STOP", } var VoteTransactionResponse_TransactionState_value = map[string]int32{ "COMMIT": 0, "ABORT": 1, + "STOP": 2, } func (x VoteTransactionResponse_TransactionState) String() string { @@ -156,38 +159,122 @@ func (m *VoteTransactionResponse) GetState() VoteTransactionResponse_Transaction return VoteTransactionResponse_COMMIT } +type StopTransactionRequest struct { + Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` + // ID of the transaction we're processing + TransactionId uint64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StopTransactionRequest) Reset() { *m = StopTransactionRequest{} } +func (m *StopTransactionRequest) String() string { return proto.CompactTextString(m) } +func (*StopTransactionRequest) ProtoMessage() {} +func (*StopTransactionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_2cc4e03d2c28c490, []int{2} +} + +func (m *StopTransactionRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StopTransactionRequest.Unmarshal(m, b) +} +func (m *StopTransactionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StopTransactionRequest.Marshal(b, m, deterministic) +} +func (m *StopTransactionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StopTransactionRequest.Merge(m, src) +} +func (m *StopTransactionRequest) XXX_Size() int { + return xxx_messageInfo_StopTransactionRequest.Size(m) +} +func (m *StopTransactionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StopTransactionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StopTransactionRequest proto.InternalMessageInfo + +func (m *StopTransactionRequest) GetRepository() *Repository { + if m != nil { + return m.Repository + } + return nil +} + +func (m *StopTransactionRequest) GetTransactionId() uint64 { + if m != nil { + return m.TransactionId + } + return 0 +} + +type StopTransactionResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StopTransactionResponse) Reset() { *m = StopTransactionResponse{} } +func (m *StopTransactionResponse) String() string { return proto.CompactTextString(m) } +func (*StopTransactionResponse) ProtoMessage() {} +func (*StopTransactionResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_2cc4e03d2c28c490, []int{3} +} + +func (m *StopTransactionResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StopTransactionResponse.Unmarshal(m, b) +} +func (m *StopTransactionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StopTransactionResponse.Marshal(b, m, deterministic) +} +func (m *StopTransactionResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StopTransactionResponse.Merge(m, src) +} +func (m *StopTransactionResponse) XXX_Size() int { + return xxx_messageInfo_StopTransactionResponse.Size(m) +} +func (m *StopTransactionResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StopTransactionResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StopTransactionResponse proto.InternalMessageInfo + func init() { proto.RegisterEnum("gitaly.VoteTransactionResponse_TransactionState", VoteTransactionResponse_TransactionState_name, VoteTransactionResponse_TransactionState_value) proto.RegisterType((*VoteTransactionRequest)(nil), "gitaly.VoteTransactionRequest") proto.RegisterType((*VoteTransactionResponse)(nil), "gitaly.VoteTransactionResponse") + proto.RegisterType((*StopTransactionRequest)(nil), "gitaly.StopTransactionRequest") + proto.RegisterType((*StopTransactionResponse)(nil), "gitaly.StopTransactionResponse") } func init() { proto.RegisterFile("transaction.proto", fileDescriptor_2cc4e03d2c28c490) } var fileDescriptor_2cc4e03d2c28c490 = []byte{ - // 339 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x51, 0xcd, 0x4e, 0xf2, 0x40, - 0x14, 0xfd, 0x86, 0xaf, 0x34, 0x72, 0x45, 0xc4, 0x89, 0xc1, 0x86, 0x85, 0x36, 0x4d, 0x4c, 0x6a, - 0xa2, 0x2d, 0x41, 0x17, 0x6e, 0xc5, 0xc4, 0xc8, 0x82, 0x90, 0x8c, 0xe8, 0x82, 0x0d, 0x19, 0xe8, - 0x85, 0x36, 0xa9, 0x9d, 0x3a, 0x33, 0x2c, 0x78, 0x07, 0xf7, 0xfa, 0x3e, 0x26, 0x3e, 0x94, 0x2b, - 0x43, 0xab, 0xd8, 0x60, 0x88, 0xbb, 0x7b, 0xcf, 0xcf, 0xcc, 0x39, 0x33, 0xb0, 0xa7, 0x25, 0x4f, - 0x14, 0x9f, 0xe8, 0x48, 0x24, 0x5e, 0x2a, 0x85, 0x16, 0xd4, 0x9c, 0x45, 0x9a, 0xc7, 0x8b, 0x26, - 0xc4, 0x51, 0xa2, 0x73, 0xac, 0x59, 0x55, 0x21, 0x97, 0x18, 0xe4, 0x9b, 0xf3, 0x46, 0xa0, 0xf1, - 0x20, 0x34, 0x0e, 0x7e, 0xbc, 0x0c, 0x9f, 0xe6, 0xa8, 0x34, 0xbd, 0x04, 0x90, 0x98, 0x0a, 0x15, - 0x69, 0x21, 0x17, 0x16, 0xb1, 0x89, 0xbb, 0xdd, 0xa6, 0x5e, 0x7e, 0xa2, 0xc7, 0x56, 0x4c, 0xc7, - 0x78, 0x7d, 0x3f, 0x25, 0xac, 0xa0, 0xa5, 0xc7, 0x50, 0x2b, 0x64, 0x19, 0x45, 0x81, 0x55, 0xb2, - 0x89, 0x6b, 0xb0, 0x9d, 0x02, 0xda, 0x0d, 0x28, 0x05, 0x23, 0x11, 0x01, 0x5a, 0xff, 0x6d, 0xe2, - 0x56, 0x58, 0x36, 0xd3, 0x0b, 0x68, 0x48, 0x9c, 0xa2, 0xc4, 0x64, 0x82, 0xa3, 0x79, 0x1a, 0x70, - 0x8d, 0x6a, 0x14, 0x72, 0x15, 0x5a, 0x86, 0x4d, 0xdc, 0x2a, 0xdb, 0x5f, 0xb1, 0xf7, 0x39, 0x79, - 0xcb, 0x55, 0xe8, 0x3c, 0x13, 0x38, 0xf8, 0xd5, 0x42, 0xa5, 0x22, 0x51, 0x48, 0x6f, 0xa0, 0xac, - 0x34, 0xd7, 0x98, 0x35, 0xa8, 0xb5, 0x5b, 0xdf, 0x0d, 0x36, 0xe8, 0xbd, 0x02, 0x76, 0xb7, 0xf4, - 0xb1, 0xdc, 0xee, 0x9c, 0x40, 0x7d, 0x9d, 0xa2, 0x00, 0xe6, 0x75, 0xbf, 0xd7, 0xeb, 0x0e, 0xea, - 0xff, 0x68, 0x05, 0xca, 0x57, 0x9d, 0x3e, 0x1b, 0xd4, 0x49, 0x3b, 0x86, 0x1a, 0xc3, 0x69, 0x41, - 0x4d, 0x87, 0xb0, 0xbb, 0x76, 0x1f, 0x3d, 0xdc, 0x18, 0x24, 0x7b, 0xfe, 0xe6, 0xd1, 0x1f, 0x41, - 0x1d, 0xf3, 0xe3, 0xc5, 0x2d, 0x6d, 0x91, 0x4e, 0x6b, 0xb8, 0x54, 0xc6, 0x7c, 0xec, 0x4d, 0xc4, - 0xa3, 0x9f, 0x8f, 0x67, 0x42, 0xce, 0xfc, 0xdc, 0xef, 0x67, 0x1f, 0xed, 0xcf, 0xc4, 0xd7, 0x9e, - 0x8e, 0xc7, 0x66, 0x06, 0x9d, 0x7f, 0x06, 0x00, 0x00, 0xff, 0xff, 0xbe, 0x43, 0x1d, 0x41, 0x32, - 0x02, 0x00, 0x00, + // 385 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x52, 0xc1, 0x6e, 0xda, 0x40, + 0x14, 0xec, 0x52, 0x63, 0xc1, 0x2b, 0xa5, 0xee, 0xaa, 0x02, 0x97, 0x43, 0x6b, 0x59, 0xaa, 0xe4, + 0x43, 0x6b, 0x23, 0xe8, 0xa1, 0xd7, 0x52, 0xa9, 0x2a, 0x07, 0x44, 0xb5, 0x38, 0x39, 0x70, 0x41, + 0x0b, 0x5e, 0xb0, 0x25, 0xe2, 0x75, 0x76, 0x97, 0x03, 0x3f, 0x92, 0xe4, 0x7f, 0x22, 0x45, 0xf9, + 0xa6, 0x9c, 0x22, 0xbc, 0x01, 0x2c, 0x08, 0xca, 0x31, 0xb7, 0x7d, 0x33, 0x6f, 0xc6, 0x6f, 0x46, + 0x86, 0x8f, 0x4a, 0xd0, 0x54, 0xd2, 0x99, 0x4a, 0x78, 0xea, 0x67, 0x82, 0x2b, 0x8e, 0xcd, 0x45, + 0xa2, 0xe8, 0x72, 0xdd, 0x82, 0x65, 0x92, 0x2a, 0x8d, 0xb5, 0x6a, 0x32, 0xa6, 0x82, 0x45, 0x7a, + 0x72, 0x6f, 0x11, 0x34, 0xce, 0xb9, 0x62, 0xe1, 0x5e, 0x4b, 0xd8, 0xe5, 0x8a, 0x49, 0x85, 0x7f, + 0x01, 0x08, 0x96, 0x71, 0x99, 0x28, 0x2e, 0xd6, 0x36, 0x72, 0x90, 0xf7, 0xae, 0x83, 0x7d, 0xed, + 0xe8, 0x93, 0x1d, 0xd3, 0x33, 0x6e, 0xee, 0xbe, 0x23, 0x52, 0xd8, 0xc5, 0xdf, 0xa0, 0x5e, 0xb8, + 0x65, 0x92, 0x44, 0x76, 0xc9, 0x41, 0x9e, 0x41, 0xde, 0x17, 0xd0, 0x7e, 0x84, 0x31, 0x18, 0x29, + 0x8f, 0x98, 0xfd, 0xd6, 0x41, 0x5e, 0x95, 0xe4, 0x6f, 0xfc, 0x13, 0x1a, 0x82, 0xcd, 0x99, 0x60, + 0xe9, 0x8c, 0x4d, 0x56, 0x59, 0x44, 0x15, 0x93, 0x93, 0x98, 0xca, 0xd8, 0x36, 0x1c, 0xe4, 0xd5, + 0xc8, 0xa7, 0x1d, 0x7b, 0xa6, 0xc9, 0x7f, 0x54, 0xc6, 0xee, 0x15, 0x82, 0xe6, 0x51, 0x0a, 0x99, + 0xf1, 0x54, 0x32, 0xfc, 0x17, 0xca, 0x52, 0x51, 0xc5, 0xf2, 0x04, 0xf5, 0x4e, 0x7b, 0x9b, 0xe0, + 0xc4, 0xbe, 0x5f, 0xc0, 0x46, 0x1b, 0x1d, 0xd1, 0x72, 0xb7, 0x0b, 0xd6, 0x21, 0x85, 0x01, 0xcc, + 0x3f, 0xc3, 0xc1, 0xa0, 0x1f, 0x5a, 0x6f, 0x70, 0x15, 0xca, 0xbf, 0x7b, 0x43, 0x12, 0x5a, 0x08, + 0x57, 0xc0, 0x18, 0x85, 0xc3, 0xff, 0x56, 0xc9, 0x5d, 0x43, 0x63, 0xa4, 0x78, 0xf6, 0x0a, 0xed, + 0xba, 0x9f, 0xa1, 0x79, 0xf4, 0x69, 0x1d, 0xb1, 0x73, 0x8f, 0xa0, 0x4e, 0xd8, 0xbc, 0x40, 0xe1, + 0x31, 0x7c, 0x38, 0x28, 0x04, 0x7f, 0x39, 0xd9, 0x54, 0x9e, 0xa0, 0xf5, 0xf5, 0x85, 0x26, 0x5d, + 0xf3, 0xe1, 0xda, 0x2b, 0x55, 0xd0, 0xc6, 0xfb, 0xe0, 0x92, 0xbd, 0xf7, 0xf3, 0xed, 0xec, 0xbd, + 0x4f, 0x44, 0xd8, 0x7a, 0xf7, 0xda, 0xe3, 0xcd, 0xe6, 0x92, 0x4e, 0xfd, 0x19, 0xbf, 0x08, 0xf4, + 0xf3, 0x07, 0x17, 0x8b, 0x40, 0xeb, 0x83, 0xfc, 0x2f, 0x0f, 0x16, 0xfc, 0x69, 0xce, 0xa6, 0x53, + 0x33, 0x87, 0xba, 0x8f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x19, 0xe6, 0x7d, 0x2f, 0x03, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -203,6 +290,7 @@ const _ = grpc.SupportPackageIsVersion4 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type RefTransactionClient interface { VoteTransaction(ctx context.Context, in *VoteTransactionRequest, opts ...grpc.CallOption) (*VoteTransactionResponse, error) + StopTransaction(ctx context.Context, in *StopTransactionRequest, opts ...grpc.CallOption) (*StopTransactionResponse, error) } type refTransactionClient struct { @@ -222,9 +310,19 @@ func (c *refTransactionClient) VoteTransaction(ctx context.Context, in *VoteTran return out, nil } +func (c *refTransactionClient) StopTransaction(ctx context.Context, in *StopTransactionRequest, opts ...grpc.CallOption) (*StopTransactionResponse, error) { + out := new(StopTransactionResponse) + err := c.cc.Invoke(ctx, "/gitaly.RefTransaction/StopTransaction", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // RefTransactionServer is the server API for RefTransaction service. type RefTransactionServer interface { VoteTransaction(context.Context, *VoteTransactionRequest) (*VoteTransactionResponse, error) + StopTransaction(context.Context, *StopTransactionRequest) (*StopTransactionResponse, error) } // UnimplementedRefTransactionServer can be embedded to have forward compatible implementations. @@ -234,6 +332,9 @@ type UnimplementedRefTransactionServer struct { func (*UnimplementedRefTransactionServer) VoteTransaction(ctx context.Context, req *VoteTransactionRequest) (*VoteTransactionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method VoteTransaction not implemented") } +func (*UnimplementedRefTransactionServer) StopTransaction(ctx context.Context, req *StopTransactionRequest) (*StopTransactionResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StopTransaction not implemented") +} func RegisterRefTransactionServer(s *grpc.Server, srv RefTransactionServer) { s.RegisterService(&_RefTransaction_serviceDesc, srv) @@ -257,6 +358,24 @@ func _RefTransaction_VoteTransaction_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _RefTransaction_StopTransaction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StopTransactionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RefTransactionServer).StopTransaction(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.RefTransaction/StopTransaction", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RefTransactionServer).StopTransaction(ctx, req.(*StopTransactionRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _RefTransaction_serviceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.RefTransaction", HandlerType: (*RefTransactionServer)(nil), @@ -265,6 +384,10 @@ var _RefTransaction_serviceDesc = grpc.ServiceDesc{ MethodName: "VoteTransaction", Handler: _RefTransaction_VoteTransaction_Handler, }, + { + MethodName: "StopTransaction", + Handler: _RefTransaction_StopTransaction_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "transaction.proto", diff --git a/proto/transaction.proto b/proto/transaction.proto index 1e7f1f5d7..fb11879c4 100644 --- a/proto/transaction.proto +++ b/proto/transaction.proto @@ -16,6 +16,13 @@ service RefTransaction { }; } + rpc StopTransaction (StopTransactionRequest) returns (StopTransactionResponse) { + option (op_type) = { + op: MUTATOR + scope_level: REPOSITORY + }; + } + } message VoteTransactionRequest { @@ -34,7 +41,16 @@ message VoteTransactionResponse { enum TransactionState { COMMIT = 0; ABORT = 1; + STOP = 2; } TransactionState state = 1; } + +message StopTransactionRequest { + Repository repository = 1[(target_repository)=true]; + // ID of the transaction we're processing + uint64 transaction_id = 2; +} + +message StopTransactionResponse {} diff --git a/ruby/proto/gitaly/transaction_pb.rb b/ruby/proto/gitaly/transaction_pb.rb index 77eff56bd..59c7f5626 100644 --- a/ruby/proto/gitaly/transaction_pb.rb +++ b/ruby/proto/gitaly/transaction_pb.rb @@ -19,6 +19,13 @@ Google::Protobuf::DescriptorPool.generated_pool.build do add_enum "gitaly.VoteTransactionResponse.TransactionState" do value :COMMIT, 0 value :ABORT, 1 + value :STOP, 2 + end + add_message "gitaly.StopTransactionRequest" do + optional :repository, :message, 1, "gitaly.Repository" + optional :transaction_id, :uint64, 2 + end + add_message "gitaly.StopTransactionResponse" do end end end @@ -27,4 +34,6 @@ module Gitaly VoteTransactionRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionRequest").msgclass VoteTransactionResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionResponse").msgclass VoteTransactionResponse::TransactionState = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionResponse.TransactionState").enummodule + StopTransactionRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionRequest").msgclass + StopTransactionResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionResponse").msgclass end diff --git a/ruby/proto/gitaly/transaction_services_pb.rb b/ruby/proto/gitaly/transaction_services_pb.rb index 8867d9076..174948246 100644 --- a/ruby/proto/gitaly/transaction_services_pb.rb +++ b/ruby/proto/gitaly/transaction_services_pb.rb @@ -15,6 +15,7 @@ module Gitaly self.service_name = 'gitaly.RefTransaction' rpc :VoteTransaction, Gitaly::VoteTransactionRequest, Gitaly::VoteTransactionResponse + rpc :StopTransaction, Gitaly::StopTransactionRequest, Gitaly::StopTransactionResponse end Stub = Service.rpc_stub_class |