Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-09-23 15:17:49 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-10-12 12:02:54 +0300
commit810e6a261dde1e19e945042d6679e4b3a3d3cfc2 (patch)
tree55682c2a7df29c94352ae6a987c7cfc1a29c22ec
parent0d2ea9ff5c3651dd152a258ecae5d9238457cc6a (diff)
transactions: Expose RPC to gracefully stop transactions
This commit adds the ability to stop transactions via the RefTransaction service. In contrast to state cancellation, this is considered to be a graceful stop of transactions: nodes which cast votes on stopped transactions are not supposed to treat it as an error if the transaction has been stopped, but should instead just terminate it and stop proceeding. To tell apart "real" errors from a stopped transaction, this commit thus adds a new "STOP" state which tells the client to stop processing it.
-rw-r--r--changelogs/unreleased/pks-transaction-stop.yml5
-rw-r--r--internal/gitaly/service/hook/reference_transaction_test.go1
-rw-r--r--internal/gitaly/service/operations/branches_test.go1
-rw-r--r--internal/gitaly/service/smarthttp/receive_pack_test.go1
-rw-r--r--internal/praefect/service/transaction/server.go25
-rw-r--r--internal/praefect/transaction_test.go213
-rw-r--r--proto/go/gitalypb/transaction.pb.go169
-rw-r--r--proto/transaction.proto16
-rw-r--r--ruby/proto/gitaly/transaction_pb.rb9
-rw-r--r--ruby/proto/gitaly/transaction_services_pb.rb1
10 files changed, 417 insertions, 24 deletions
diff --git a/changelogs/unreleased/pks-transaction-stop.yml b/changelogs/unreleased/pks-transaction-stop.yml
new file mode 100644
index 000000000..fa4dba282
--- /dev/null
+++ b/changelogs/unreleased/pks-transaction-stop.yml
@@ -0,0 +1,5 @@
+---
+title: 'transactions: Implement RPC to gracefully stop transactions'
+merge_request: 2532
+author:
+type: added
diff --git a/internal/gitaly/service/hook/reference_transaction_test.go b/internal/gitaly/service/hook/reference_transaction_test.go
index 4afef9783..67ce79d8d 100644
--- a/internal/gitaly/service/hook/reference_transaction_test.go
+++ b/internal/gitaly/service/hook/reference_transaction_test.go
@@ -17,6 +17,7 @@ import (
)
type testTransactionServer struct {
+ gitalypb.UnimplementedRefTransactionServer
handler func(in *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error)
}
diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go
index bc5e57eed..e07ae5ec3 100644
--- a/internal/gitaly/service/operations/branches_test.go
+++ b/internal/gitaly/service/operations/branches_test.go
@@ -23,6 +23,7 @@ import (
)
type testTransactionServer struct {
+ gitalypb.UnimplementedRefTransactionServer
called int
}
diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go
index 40bbc6b8c..eb3dbbfee 100644
--- a/internal/gitaly/service/smarthttp/receive_pack_test.go
+++ b/internal/gitaly/service/smarthttp/receive_pack_test.go
@@ -548,6 +548,7 @@ func TestPostReceiveWithTransactionsViaPraefect(t *testing.T) {
}
type testTransactionServer struct {
+ gitalypb.UnimplementedRefTransactionServer
called int
}
diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go
index 96fd12efd..0a3edaea0 100644
--- a/internal/praefect/service/transaction/server.go
+++ b/internal/praefect/service/transaction/server.go
@@ -31,6 +31,10 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti
return nil, helper.ErrNotFound(err)
case errors.Is(err, transactions.ErrTransactionCanceled):
return nil, helper.DecorateError(codes.Canceled, err)
+ case errors.Is(err, transactions.ErrTransactionStopped):
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_STOP,
+ }, nil
case errors.Is(err, transactions.ErrTransactionVoteFailed):
return &gitalypb.VoteTransactionResponse{
State: gitalypb.VoteTransactionResponse_ABORT,
@@ -44,3 +48,24 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti
State: gitalypb.VoteTransactionResponse_COMMIT,
}, nil
}
+
+// StopTransaction is called by a client who wants to gracefully stop a
+// transaction. All voters waiting for quorum will be stopped and new votes
+// will not get accepted anymore. It is fine to call this RPC multiple times on
+// the same transaction.
+func (s *Server) StopTransaction(ctx context.Context, in *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+ if err := s.txMgr.StopTransaction(ctx, in.TransactionId); err != nil {
+ switch {
+ case errors.Is(err, transactions.ErrNotFound):
+ return nil, helper.ErrNotFound(err)
+ case errors.Is(err, transactions.ErrTransactionCanceled):
+ return nil, helper.DecorateError(codes.Canceled, err)
+ case errors.Is(err, transactions.ErrTransactionStopped):
+ return &gitalypb.StopTransactionResponse{}, nil
+ default:
+ return nil, helper.ErrInternal(err)
+ }
+ }
+
+ return &gitalypb.StopTransactionResponse{}, nil
+}
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 71f5e3bc0..a1a1920b5 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -38,7 +38,7 @@ func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Ma
}
type counterMetrics struct {
- registered, started, invalid, committed int
+ registered, started, invalid, committed, stopped int
}
func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) {
@@ -52,6 +52,7 @@ func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected
{"started", expected.started},
{"invalid", expected.invalid},
{"committed", expected.committed},
+ {"stopped", expected.stopped},
}
var expectedMetric bytes.Buffer
@@ -680,3 +681,213 @@ func TestTransactionCancellation(t *testing.T) {
})
}
}
+
+func TestStopTransaction(t *testing.T) {
+ hash := sha1.Sum([]byte("foo"))
+
+ t.Run("stopping nonexisting transaction fails", func(t *testing.T) {
+ cc, _, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: 1234,
+ })
+ require.Equal(t, codes.NotFound, status.Code(err))
+ })
+
+ t.Run("stopping transaction multiple times succeeds", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ for i := 0; i < 5; i++ {
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+ }
+
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ registered: 2,
+ stopped: 5,
+ })
+ })
+
+ t.Run("stopping a single voter", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 1)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+
+ results := transaction.State()
+ require.Equal(t, results["voter"], false)
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ registered: 1,
+ started: 1,
+ stopped: 2,
+ })
+ })
+
+ t.Run("stopping in-progress transaction", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "successful-voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State)
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ response, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "failing-voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+
+ results := transaction.State()
+ require.True(t, results["successful-voter"], "Successful voter should succeed")
+ require.False(t, results["failing-voter"], "Failing voter should fail")
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ committed: 1,
+ registered: 2,
+ started: 2,
+ stopped: 2,
+ })
+ })
+
+ t.Run("stopping cancelled transaction fails", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+
+ cancelTransaction()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.Error(t, err)
+ require.Equal(t, codes.NotFound, status.Code(err))
+ })
+
+ t.Run("stopping concurrent voter", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ // This create a single voter waiting for the threshold to be
+ // reached. As the second voter will never appear, the node
+ // will instead be stopped by the call to `StopTransaction`
+ // below.
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ hash := sha1.Sum([]byte("hash"))
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "1",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+ }()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ wg.Wait()
+ })
+}
diff --git a/proto/go/gitalypb/transaction.pb.go b/proto/go/gitalypb/transaction.pb.go
index 4048f2eba..6f70b1cfa 100644
--- a/proto/go/gitalypb/transaction.pb.go
+++ b/proto/go/gitalypb/transaction.pb.go
@@ -31,16 +31,19 @@ type VoteTransactionResponse_TransactionState int32
const (
VoteTransactionResponse_COMMIT VoteTransactionResponse_TransactionState = 0
VoteTransactionResponse_ABORT VoteTransactionResponse_TransactionState = 1
+ VoteTransactionResponse_STOP VoteTransactionResponse_TransactionState = 2
)
var VoteTransactionResponse_TransactionState_name = map[int32]string{
0: "COMMIT",
1: "ABORT",
+ 2: "STOP",
}
var VoteTransactionResponse_TransactionState_value = map[string]int32{
"COMMIT": 0,
"ABORT": 1,
+ "STOP": 2,
}
func (x VoteTransactionResponse_TransactionState) String() string {
@@ -156,38 +159,122 @@ func (m *VoteTransactionResponse) GetState() VoteTransactionResponse_Transaction
return VoteTransactionResponse_COMMIT
}
+type StopTransactionRequest struct {
+ Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"`
+ // ID of the transaction we're processing
+ TransactionId uint64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StopTransactionRequest) Reset() { *m = StopTransactionRequest{} }
+func (m *StopTransactionRequest) String() string { return proto.CompactTextString(m) }
+func (*StopTransactionRequest) ProtoMessage() {}
+func (*StopTransactionRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2cc4e03d2c28c490, []int{2}
+}
+
+func (m *StopTransactionRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StopTransactionRequest.Unmarshal(m, b)
+}
+func (m *StopTransactionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StopTransactionRequest.Marshal(b, m, deterministic)
+}
+func (m *StopTransactionRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StopTransactionRequest.Merge(m, src)
+}
+func (m *StopTransactionRequest) XXX_Size() int {
+ return xxx_messageInfo_StopTransactionRequest.Size(m)
+}
+func (m *StopTransactionRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_StopTransactionRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StopTransactionRequest proto.InternalMessageInfo
+
+func (m *StopTransactionRequest) GetRepository() *Repository {
+ if m != nil {
+ return m.Repository
+ }
+ return nil
+}
+
+func (m *StopTransactionRequest) GetTransactionId() uint64 {
+ if m != nil {
+ return m.TransactionId
+ }
+ return 0
+}
+
+type StopTransactionResponse struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StopTransactionResponse) Reset() { *m = StopTransactionResponse{} }
+func (m *StopTransactionResponse) String() string { return proto.CompactTextString(m) }
+func (*StopTransactionResponse) ProtoMessage() {}
+func (*StopTransactionResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2cc4e03d2c28c490, []int{3}
+}
+
+func (m *StopTransactionResponse) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StopTransactionResponse.Unmarshal(m, b)
+}
+func (m *StopTransactionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StopTransactionResponse.Marshal(b, m, deterministic)
+}
+func (m *StopTransactionResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StopTransactionResponse.Merge(m, src)
+}
+func (m *StopTransactionResponse) XXX_Size() int {
+ return xxx_messageInfo_StopTransactionResponse.Size(m)
+}
+func (m *StopTransactionResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_StopTransactionResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StopTransactionResponse proto.InternalMessageInfo
+
func init() {
proto.RegisterEnum("gitaly.VoteTransactionResponse_TransactionState", VoteTransactionResponse_TransactionState_name, VoteTransactionResponse_TransactionState_value)
proto.RegisterType((*VoteTransactionRequest)(nil), "gitaly.VoteTransactionRequest")
proto.RegisterType((*VoteTransactionResponse)(nil), "gitaly.VoteTransactionResponse")
+ proto.RegisterType((*StopTransactionRequest)(nil), "gitaly.StopTransactionRequest")
+ proto.RegisterType((*StopTransactionResponse)(nil), "gitaly.StopTransactionResponse")
}
func init() { proto.RegisterFile("transaction.proto", fileDescriptor_2cc4e03d2c28c490) }
var fileDescriptor_2cc4e03d2c28c490 = []byte{
- // 339 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x51, 0xcd, 0x4e, 0xf2, 0x40,
- 0x14, 0xfd, 0x86, 0xaf, 0x34, 0x72, 0x45, 0xc4, 0x89, 0xc1, 0x86, 0x85, 0x36, 0x4d, 0x4c, 0x6a,
- 0xa2, 0x2d, 0x41, 0x17, 0x6e, 0xc5, 0xc4, 0xc8, 0x82, 0x90, 0x8c, 0xe8, 0x82, 0x0d, 0x19, 0xe8,
- 0x85, 0x36, 0xa9, 0x9d, 0x3a, 0x33, 0x2c, 0x78, 0x07, 0xf7, 0xfa, 0x3e, 0x26, 0x3e, 0x94, 0x2b,
- 0x43, 0xab, 0xd8, 0x60, 0x88, 0xbb, 0x7b, 0xcf, 0xcf, 0xcc, 0x39, 0x33, 0xb0, 0xa7, 0x25, 0x4f,
- 0x14, 0x9f, 0xe8, 0x48, 0x24, 0x5e, 0x2a, 0x85, 0x16, 0xd4, 0x9c, 0x45, 0x9a, 0xc7, 0x8b, 0x26,
- 0xc4, 0x51, 0xa2, 0x73, 0xac, 0x59, 0x55, 0x21, 0x97, 0x18, 0xe4, 0x9b, 0xf3, 0x46, 0xa0, 0xf1,
- 0x20, 0x34, 0x0e, 0x7e, 0xbc, 0x0c, 0x9f, 0xe6, 0xa8, 0x34, 0xbd, 0x04, 0x90, 0x98, 0x0a, 0x15,
- 0x69, 0x21, 0x17, 0x16, 0xb1, 0x89, 0xbb, 0xdd, 0xa6, 0x5e, 0x7e, 0xa2, 0xc7, 0x56, 0x4c, 0xc7,
- 0x78, 0x7d, 0x3f, 0x25, 0xac, 0xa0, 0xa5, 0xc7, 0x50, 0x2b, 0x64, 0x19, 0x45, 0x81, 0x55, 0xb2,
- 0x89, 0x6b, 0xb0, 0x9d, 0x02, 0xda, 0x0d, 0x28, 0x05, 0x23, 0x11, 0x01, 0x5a, 0xff, 0x6d, 0xe2,
- 0x56, 0x58, 0x36, 0xd3, 0x0b, 0x68, 0x48, 0x9c, 0xa2, 0xc4, 0x64, 0x82, 0xa3, 0x79, 0x1a, 0x70,
- 0x8d, 0x6a, 0x14, 0x72, 0x15, 0x5a, 0x86, 0x4d, 0xdc, 0x2a, 0xdb, 0x5f, 0xb1, 0xf7, 0x39, 0x79,
- 0xcb, 0x55, 0xe8, 0x3c, 0x13, 0x38, 0xf8, 0xd5, 0x42, 0xa5, 0x22, 0x51, 0x48, 0x6f, 0xa0, 0xac,
- 0x34, 0xd7, 0x98, 0x35, 0xa8, 0xb5, 0x5b, 0xdf, 0x0d, 0x36, 0xe8, 0xbd, 0x02, 0x76, 0xb7, 0xf4,
- 0xb1, 0xdc, 0xee, 0x9c, 0x40, 0x7d, 0x9d, 0xa2, 0x00, 0xe6, 0x75, 0xbf, 0xd7, 0xeb, 0x0e, 0xea,
- 0xff, 0x68, 0x05, 0xca, 0x57, 0x9d, 0x3e, 0x1b, 0xd4, 0x49, 0x3b, 0x86, 0x1a, 0xc3, 0x69, 0x41,
- 0x4d, 0x87, 0xb0, 0xbb, 0x76, 0x1f, 0x3d, 0xdc, 0x18, 0x24, 0x7b, 0xfe, 0xe6, 0xd1, 0x1f, 0x41,
- 0x1d, 0xf3, 0xe3, 0xc5, 0x2d, 0x6d, 0x91, 0x4e, 0x6b, 0xb8, 0x54, 0xc6, 0x7c, 0xec, 0x4d, 0xc4,
- 0xa3, 0x9f, 0x8f, 0x67, 0x42, 0xce, 0xfc, 0xdc, 0xef, 0x67, 0x1f, 0xed, 0xcf, 0xc4, 0xd7, 0x9e,
- 0x8e, 0xc7, 0x66, 0x06, 0x9d, 0x7f, 0x06, 0x00, 0x00, 0xff, 0xff, 0xbe, 0x43, 0x1d, 0x41, 0x32,
- 0x02, 0x00, 0x00,
+ // 385 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x52, 0xc1, 0x6e, 0xda, 0x40,
+ 0x14, 0xec, 0x52, 0x63, 0xc1, 0x2b, 0xa5, 0xee, 0xaa, 0x02, 0x97, 0x43, 0x6b, 0x59, 0xaa, 0xe4,
+ 0x43, 0x6b, 0x23, 0xe8, 0xa1, 0xd7, 0x52, 0xa9, 0x2a, 0x07, 0x44, 0xb5, 0x38, 0x39, 0x70, 0x41,
+ 0x0b, 0x5e, 0xb0, 0x25, 0xe2, 0x75, 0x76, 0x97, 0x03, 0x3f, 0x92, 0xe4, 0x7f, 0x22, 0x45, 0xf9,
+ 0xa6, 0x9c, 0x22, 0xbc, 0x01, 0x2c, 0x08, 0xca, 0x31, 0xb7, 0x7d, 0x33, 0x6f, 0xc6, 0x6f, 0x46,
+ 0x86, 0x8f, 0x4a, 0xd0, 0x54, 0xd2, 0x99, 0x4a, 0x78, 0xea, 0x67, 0x82, 0x2b, 0x8e, 0xcd, 0x45,
+ 0xa2, 0xe8, 0x72, 0xdd, 0x82, 0x65, 0x92, 0x2a, 0x8d, 0xb5, 0x6a, 0x32, 0xa6, 0x82, 0x45, 0x7a,
+ 0x72, 0x6f, 0x11, 0x34, 0xce, 0xb9, 0x62, 0xe1, 0x5e, 0x4b, 0xd8, 0xe5, 0x8a, 0x49, 0x85, 0x7f,
+ 0x01, 0x08, 0x96, 0x71, 0x99, 0x28, 0x2e, 0xd6, 0x36, 0x72, 0x90, 0xf7, 0xae, 0x83, 0x7d, 0xed,
+ 0xe8, 0x93, 0x1d, 0xd3, 0x33, 0x6e, 0xee, 0xbe, 0x23, 0x52, 0xd8, 0xc5, 0xdf, 0xa0, 0x5e, 0xb8,
+ 0x65, 0x92, 0x44, 0x76, 0xc9, 0x41, 0x9e, 0x41, 0xde, 0x17, 0xd0, 0x7e, 0x84, 0x31, 0x18, 0x29,
+ 0x8f, 0x98, 0xfd, 0xd6, 0x41, 0x5e, 0x95, 0xe4, 0x6f, 0xfc, 0x13, 0x1a, 0x82, 0xcd, 0x99, 0x60,
+ 0xe9, 0x8c, 0x4d, 0x56, 0x59, 0x44, 0x15, 0x93, 0x93, 0x98, 0xca, 0xd8, 0x36, 0x1c, 0xe4, 0xd5,
+ 0xc8, 0xa7, 0x1d, 0x7b, 0xa6, 0xc9, 0x7f, 0x54, 0xc6, 0xee, 0x15, 0x82, 0xe6, 0x51, 0x0a, 0x99,
+ 0xf1, 0x54, 0x32, 0xfc, 0x17, 0xca, 0x52, 0x51, 0xc5, 0xf2, 0x04, 0xf5, 0x4e, 0x7b, 0x9b, 0xe0,
+ 0xc4, 0xbe, 0x5f, 0xc0, 0x46, 0x1b, 0x1d, 0xd1, 0x72, 0xb7, 0x0b, 0xd6, 0x21, 0x85, 0x01, 0xcc,
+ 0x3f, 0xc3, 0xc1, 0xa0, 0x1f, 0x5a, 0x6f, 0x70, 0x15, 0xca, 0xbf, 0x7b, 0x43, 0x12, 0x5a, 0x08,
+ 0x57, 0xc0, 0x18, 0x85, 0xc3, 0xff, 0x56, 0xc9, 0x5d, 0x43, 0x63, 0xa4, 0x78, 0xf6, 0x0a, 0xed,
+ 0xba, 0x9f, 0xa1, 0x79, 0xf4, 0x69, 0x1d, 0xb1, 0x73, 0x8f, 0xa0, 0x4e, 0xd8, 0xbc, 0x40, 0xe1,
+ 0x31, 0x7c, 0x38, 0x28, 0x04, 0x7f, 0x39, 0xd9, 0x54, 0x9e, 0xa0, 0xf5, 0xf5, 0x85, 0x26, 0x5d,
+ 0xf3, 0xe1, 0xda, 0x2b, 0x55, 0xd0, 0xc6, 0xfb, 0xe0, 0x92, 0xbd, 0xf7, 0xf3, 0xed, 0xec, 0xbd,
+ 0x4f, 0x44, 0xd8, 0x7a, 0xf7, 0xda, 0xe3, 0xcd, 0xe6, 0x92, 0x4e, 0xfd, 0x19, 0xbf, 0x08, 0xf4,
+ 0xf3, 0x07, 0x17, 0x8b, 0x40, 0xeb, 0x83, 0xfc, 0x2f, 0x0f, 0x16, 0xfc, 0x69, 0xce, 0xa6, 0x53,
+ 0x33, 0x87, 0xba, 0x8f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x19, 0xe6, 0x7d, 0x2f, 0x03, 0x00,
+ 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -203,6 +290,7 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RefTransactionClient interface {
VoteTransaction(ctx context.Context, in *VoteTransactionRequest, opts ...grpc.CallOption) (*VoteTransactionResponse, error)
+ StopTransaction(ctx context.Context, in *StopTransactionRequest, opts ...grpc.CallOption) (*StopTransactionResponse, error)
}
type refTransactionClient struct {
@@ -222,9 +310,19 @@ func (c *refTransactionClient) VoteTransaction(ctx context.Context, in *VoteTran
return out, nil
}
+func (c *refTransactionClient) StopTransaction(ctx context.Context, in *StopTransactionRequest, opts ...grpc.CallOption) (*StopTransactionResponse, error) {
+ out := new(StopTransactionResponse)
+ err := c.cc.Invoke(ctx, "/gitaly.RefTransaction/StopTransaction", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// RefTransactionServer is the server API for RefTransaction service.
type RefTransactionServer interface {
VoteTransaction(context.Context, *VoteTransactionRequest) (*VoteTransactionResponse, error)
+ StopTransaction(context.Context, *StopTransactionRequest) (*StopTransactionResponse, error)
}
// UnimplementedRefTransactionServer can be embedded to have forward compatible implementations.
@@ -234,6 +332,9 @@ type UnimplementedRefTransactionServer struct {
func (*UnimplementedRefTransactionServer) VoteTransaction(ctx context.Context, req *VoteTransactionRequest) (*VoteTransactionResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VoteTransaction not implemented")
}
+func (*UnimplementedRefTransactionServer) StopTransaction(ctx context.Context, req *StopTransactionRequest) (*StopTransactionResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method StopTransaction not implemented")
+}
func RegisterRefTransactionServer(s *grpc.Server, srv RefTransactionServer) {
s.RegisterService(&_RefTransaction_serviceDesc, srv)
@@ -257,6 +358,24 @@ func _RefTransaction_VoteTransaction_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
+func _RefTransaction_StopTransaction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(StopTransactionRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(RefTransactionServer).StopTransaction(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/gitaly.RefTransaction/StopTransaction",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(RefTransactionServer).StopTransaction(ctx, req.(*StopTransactionRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
var _RefTransaction_serviceDesc = grpc.ServiceDesc{
ServiceName: "gitaly.RefTransaction",
HandlerType: (*RefTransactionServer)(nil),
@@ -265,6 +384,10 @@ var _RefTransaction_serviceDesc = grpc.ServiceDesc{
MethodName: "VoteTransaction",
Handler: _RefTransaction_VoteTransaction_Handler,
},
+ {
+ MethodName: "StopTransaction",
+ Handler: _RefTransaction_StopTransaction_Handler,
+ },
},
Streams: []grpc.StreamDesc{},
Metadata: "transaction.proto",
diff --git a/proto/transaction.proto b/proto/transaction.proto
index 1e7f1f5d7..fb11879c4 100644
--- a/proto/transaction.proto
+++ b/proto/transaction.proto
@@ -16,6 +16,13 @@ service RefTransaction {
};
}
+ rpc StopTransaction (StopTransactionRequest) returns (StopTransactionResponse) {
+ option (op_type) = {
+ op: MUTATOR
+ scope_level: REPOSITORY
+ };
+ }
+
}
message VoteTransactionRequest {
@@ -34,7 +41,16 @@ message VoteTransactionResponse {
enum TransactionState {
COMMIT = 0;
ABORT = 1;
+ STOP = 2;
}
TransactionState state = 1;
}
+
+message StopTransactionRequest {
+ Repository repository = 1[(target_repository)=true];
+ // ID of the transaction we're processing
+ uint64 transaction_id = 2;
+}
+
+message StopTransactionResponse {}
diff --git a/ruby/proto/gitaly/transaction_pb.rb b/ruby/proto/gitaly/transaction_pb.rb
index 77eff56bd..59c7f5626 100644
--- a/ruby/proto/gitaly/transaction_pb.rb
+++ b/ruby/proto/gitaly/transaction_pb.rb
@@ -19,6 +19,13 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
add_enum "gitaly.VoteTransactionResponse.TransactionState" do
value :COMMIT, 0
value :ABORT, 1
+ value :STOP, 2
+ end
+ add_message "gitaly.StopTransactionRequest" do
+ optional :repository, :message, 1, "gitaly.Repository"
+ optional :transaction_id, :uint64, 2
+ end
+ add_message "gitaly.StopTransactionResponse" do
end
end
end
@@ -27,4 +34,6 @@ module Gitaly
VoteTransactionRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionRequest").msgclass
VoteTransactionResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionResponse").msgclass
VoteTransactionResponse::TransactionState = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionResponse.TransactionState").enummodule
+ StopTransactionRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionRequest").msgclass
+ StopTransactionResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionResponse").msgclass
end
diff --git a/ruby/proto/gitaly/transaction_services_pb.rb b/ruby/proto/gitaly/transaction_services_pb.rb
index 8867d9076..174948246 100644
--- a/ruby/proto/gitaly/transaction_services_pb.rb
+++ b/ruby/proto/gitaly/transaction_services_pb.rb
@@ -15,6 +15,7 @@ module Gitaly
self.service_name = 'gitaly.RefTransaction'
rpc :VoteTransaction, Gitaly::VoteTransactionRequest, Gitaly::VoteTransactionResponse
+ rpc :StopTransaction, Gitaly::StopTransactionRequest, Gitaly::StopTransactionResponse
end
Stub = Service.rpc_stub_class