Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-03-13 21:03:31 +0300
committerPaul Okstad <pokstad@gitlab.com>2019-03-13 21:03:34 +0300
commitd9ff755e526298a03042d236c578b359802af459 (patch)
tree62bb06c46733bf313ec635388809ffdfbde68826
parent3561b41098f34d364ccc544e4d54e9f8d1b953aa (diff)
move transaction to dedicated package
Transaction will be easy to isolate and work as a WIP if we keep it in a dedicated package. Also, this means using interfaces heavily to avoid relying on future features (coordinator, replication). Coordinator is now an interface to abstract the requirements of transactions.
-rw-r--r--internal/praefect/transaction/transaction.go (renamed from internal/praefect/transaction.go)39
-rw-r--r--internal/praefect/transaction/transaction_test.go64
2 files changed, 83 insertions, 20 deletions
diff --git a/internal/praefect/transaction.go b/internal/praefect/transaction/transaction.go
index 27ba57ba6..9bc49bc24 100644
--- a/internal/praefect/transaction.go
+++ b/internal/praefect/transaction/transaction.go
@@ -1,5 +1,5 @@
-/*Package praefect provides transaction management functionality to coordinate
-one-to-many clients attempting to modify the shards concurrently.
+/*Package transaction provides transaction management functionality to
+coordinate one-to-many clients attempting to modify the shards concurrently.
While Git is distributed in nature, there are some repository wide data points
that can conflict between replicas if something goes wrong. This includes
@@ -7,7 +7,7 @@ references, which is why the transaction manager provides an API that allows
an RPC transaction to read/write lock the references being accessed to prevent
contention.
*/
-package praefect
+package transaction
import (
"context"
@@ -30,16 +30,22 @@ type Verifier interface {
CheckSum(context.Context, Repository) ([]byte, error)
}
+// Coordinator allows the transaction manager to look up the shard for a repo
+// at the beginning of each transaction
+type Coordinator interface {
+ FetchShard(ctx context.Context, repo Repository) (*Shard, error)
+}
+
// ReplicationManager provides services to handle degraded nodes
type ReplicationManager interface {
NotifyDegradation(context.Context, Repository, Node) error
}
-// TransactionManager tracks the progress of RPCs being applied to multiple
+// Manager tracks the progress of RPCs being applied to multiple
// downstream servers that make up a shard. It prevents conflicts that may arise
// from contention between multiple clients trying to modify the same
// references.
-type TransactionManager struct {
+type Manager struct {
mu sync.Mutex
shards map[string]*Shard // shards keyed by project
@@ -48,8 +54,8 @@ type TransactionManager struct {
replman ReplicationManager
}
-func NewTransactionManager(v Verifier, c Coordinator, r ReplicationManager) *TransactionManager {
- return &TransactionManager{
+func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
+ return &Manager{
shards: map[string]*Shard{},
verifier: v,
coordinator: c,
@@ -238,8 +244,8 @@ func (t transaction) RLockRef(ctx context.Context, ref string) error {
// author to mark all relevant assets being modified during a mutating
// transaction to ensure they are locked and protected from other closures
// modifying the same.
-func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
- shard, err := tm.coordinator.FetchShard(ctx, repo)
+func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
+ shard, err := m.coordinator.FetchShard(ctx, repo)
if err != nil {
return err
}
@@ -259,7 +265,7 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu
node := node // rescope iterator var for goroutine closure
eg.Go(func() error {
- return tm.replman.NotifyDegradation(eCtx, repo, node)
+ return m.replman.NotifyDegradation(eCtx, repo, node)
})
}
@@ -269,7 +275,6 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu
}
defer tx.unlockAll()
- // run the transaction function
err = fn(tx)
if err != nil {
return err
@@ -278,8 +283,8 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu
return nil
}
-func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
- shard, err := tm.coordinator.FetchShard(ctx, repo)
+func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
+ shard, err := m.coordinator.FetchShard(ctx, repo)
if err != nil {
return err
}
@@ -299,7 +304,7 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu
node := node // rescope iterator var for goroutine closure
eg.Go(func() error {
- return tm.replman.NotifyDegradation(eCtx, repo, node)
+ return m.replman.NotifyDegradation(eCtx, repo, node)
})
}
@@ -309,7 +314,6 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu
}
defer tx.unlockAll()
- // run the transaction function
err = fn(tx)
if err != nil {
return err
@@ -317,8 +321,3 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu
return nil
}
-
-func (c *Coordinator) FetchShard(ctx context.Context, repo Repository) (*Shard, error) {
- // TODO: move this to coordinator.go
- return nil, nil
-}
diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go
new file mode 100644
index 000000000..ca7064e41
--- /dev/null
+++ b/internal/praefect/transaction/transaction_test.go
@@ -0,0 +1,64 @@
+package transaction_test
+
+import (
+ "context"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect"
+)
+
+func TestReplMan(t *testing.T) {
+ const (
+ projA = "project-A"
+ stor1 = "storage-1"
+ stor2 = "storage-2"
+ stor3 = "storage-3"
+ )
+
+ mv := &mockVerifier{
+ checksums: map[string]map[string][][]byte{
+ projA: map[string][][]byte{
+ stor1: {[]byte{1}},
+ stor2: {[]byte{1}},
+ stor3: {[]byte{1}},
+ },
+ },
+ }
+
+ // A replication manager needs to have the ability to verify the state of
+ // replicas, so it needs a Verifier.
+ rm := praefect.NewReplicationManager(mv)
+
+ // replication managers are typically used within the context of a request
+ // when a mutator RPC is received.
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+}
+
+type mockVerifier struct {
+ // checksums contains ordered checksums keyed by project and then storage
+ checksums map[string]map[string][][]byte
+}
+
+func (mv *mockVerifier) CheckSum(_ context.Context, project, storage string) ([]byte, error) {
+ storages, ok := mv.checksums[project]
+ if !ok {
+ panic("no project " + project)
+ }
+
+ sums, ok := storages[storage]
+ if !ok {
+ panic("no storage " + storage)
+ }
+
+ if len(sums) < 1 {
+ panic("no more checksums for " + project)
+ }
+
+ // pop first checksum off list
+ var sum []byte
+ sum, mv.checksums[project][storage] = sums[len(sums)-1], sums[:len(sums)-1]
+
+ return sum, nil
+}