Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStan Hu <stanhu@gmail.com>2021-10-01 02:57:29 +0300
committerStan Hu <stanhu@gmail.com>2021-10-01 02:57:29 +0300
commit21370ba12030cc21649f6082b5e9830055c334b4 (patch)
tree31e2a9607519beaee00ab3b71a9a4327400d5b56
parent316a0b8bd61bbf5802fa1d0d277220e63ae681ee (diff)
Spike: Send checksums with transactionssh-spike-add-checksums
-rw-r--r--internal/git/updateref/update_with_hooks.go74
-rw-r--r--internal/praefect/coordinator.go6
-rw-r--r--internal/praefect/datastore/repository_store.go25
-rw-r--r--internal/praefect/datastore/repository_store_mock.go9
-rw-r--r--internal/praefect/transactions/subtransaction.go6
-rw-r--r--internal/praefect/transactions/transaction.go27
6 files changed, 146 insertions, 1 deletions
diff --git a/internal/git/updateref/update_with_hooks.go b/internal/git/updateref/update_with_hooks.go
index a1b04aa56..d3359853c 100644
--- a/internal/git/updateref/update_with_hooks.go
+++ b/internal/git/updateref/update_with_hooks.go
@@ -1,10 +1,15 @@
package updateref
import (
+ "bufio"
"bytes"
"context"
+ "crypto/sha1"
+ "encoding/hex"
"errors"
"fmt"
+ "math/big"
+ "regexp"
"strings"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
@@ -18,6 +23,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
)
// UpdaterWithHooks updates a ref with Git hooks.
@@ -89,6 +97,7 @@ func (u *UpdaterWithHooks) UpdateReference(
newrev, oldrev git.ObjectID,
pushOptions ...string,
) error {
+ fmt.Println("=== UpdateReference called")
var transaction *txinfo.Transaction
if tx, err := txinfo.TransactionFromContext(ctx); err == nil {
transaction = &tx
@@ -192,6 +201,13 @@ func (u *UpdaterWithHooks) UpdateReference(
return Error{reference: reference.String()}
}
+ checksum, err := u.calculateChecksum(ctx, repo)
+ if err != nil {
+ return err
+ }
+ ctx = injectChecksum(ctx, checksum)
+ fmt.Printf("=== injecting checksum: %s\n", checksum)
+
if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionCommitted, []string{hooksPayload}, strings.NewReader(changes)); err != nil {
return HookError{err: err}
}
@@ -206,3 +222,61 @@ func (u *UpdaterWithHooks) UpdateReference(
func (u *UpdaterWithHooks) localrepo(repo repository.GitRepo) *localrepo.Repo {
return localrepo.New(u.gitCmdFactory, u.catfileCache, repo, u.cfg)
}
+
+func injectChecksum(ctx context.Context, checksum string) context.Context {
+ md, ok := metadata.FromOutgoingContext(ctx)
+
+ if !ok {
+ md = metadata.New(map[string]string{})
+ } else {
+ md = md.Copy()
+ }
+
+ md.Set("gitaly-repository-checksum", checksum)
+
+ return metadata.NewOutgoingContext(ctx, md)
+}
+
+func (u *UpdaterWithHooks) calculateChecksum(ctx context.Context, repo *gitalypb.Repository) (string, error) {
+ var refWhitelist = regexp.MustCompile(`HEAD|(refs/(heads|tags|keep-around|merge-requests|environments|notes)/)`)
+
+ // Get checksum here and send it along to the committed hook
+ cmd, err := u.gitCmdFactory.New(ctx, repo, git.SubCmd{Name: "show-ref", Flags: []git.Option{git.Flag{Name: "--head"}}})
+ if err != nil {
+ return "", err
+ }
+
+ var checksum *big.Int
+
+ scanner := bufio.NewScanner(cmd)
+ for scanner.Scan() {
+ ref := scanner.Bytes()
+
+ if !refWhitelist.Match(ref) {
+ continue
+ }
+
+ h := sha1.New()
+ // hash.Hash will never return an error.
+ _, _ = h.Write(ref)
+
+ hash := hex.EncodeToString(h.Sum(nil))
+ hashIntBase16, _ := (&big.Int{}).SetString(hash, 16)
+
+ if checksum == nil {
+ checksum = hashIntBase16
+ } else {
+ checksum.Xor(checksum, hashIntBase16)
+ }
+ }
+
+ if err := scanner.Err(); err != nil {
+ return "", status.Errorf(codes.Internal, err.Error())
+ }
+
+ if err := cmd.Wait(); checksum == nil || err != nil {
+ return "", err
+ }
+
+ return hex.EncodeToString(checksum.Bytes()), nil
+}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 938f47fe7..c8398a52a 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -570,6 +570,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
change,
params,
call.fullMethodName,
+ nil,
))
}
@@ -843,7 +844,7 @@ func (c *Coordinator) createTransactionFinalizer(
return c.newRequestFinalizer(
ctx, route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage,
- updated, outdated, change, params, cause)()
+ updated, outdated, change, params, cause, transaction.Checksums())()
}
}
@@ -983,6 +984,7 @@ func (c *Coordinator) newRequestFinalizer(
change datastore.ChangeType,
params datastore.Params,
cause string,
+ checksums map[string]string,
) func() error {
return func() error {
// Use a separate timeout for the database operations. If the request times out, the passed in context is
@@ -1005,6 +1007,8 @@ func (c *Coordinator) newRequestFinalizer(
}
log.Info("queueing replication jobs")
+ c.rs.UpdateChecksums(ctx, virtualStorage, targetRepo.GetRelativePath(), checksums)
+
switch change {
case datastore.UpdateRepo:
// If this fails, the primary might have changes on it that are not recorded in the database. The secondaries will appear
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 9c8dcf30e..42226ab96 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -86,6 +86,8 @@ var ErrNoRowsAffected = errors.New("no rows were affected by the query")
type RepositoryStore interface {
// GetGeneration gets the repository's generation on a given storage.
GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
+ // UpdateChecksums updates the checksums for a given storage for a given node
+ UpdateChecksums(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error
// IncrementGeneration increments the generations of up to date nodes.
IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
// SetGeneration sets the repository's generation on the given storage. If the generation is higher
@@ -168,6 +170,29 @@ AND storage = $3
return gen, nil
}
+func (rs *PostgresRepositoryStore) UpdateChecksums(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error {
+ const q = `
+ UPDATE storage_repositories
+ SET checksum = $1
+ WHERE virtual_storage = $2
+ AND relative_path = $3
+ AND storage = $4
+ `
+ var lastErr error
+
+ // TODO: Perform in transaction with generation
+ for name, checksum := range checksums {
+ _, err := rs.db.ExecContext(ctx, q, checksum, virtualStorage, relativePath, name)
+
+ if err != nil {
+ fmt.Println("=== error updating: %v", err)
+ lastErr = err
+ }
+ }
+
+ return lastErr
+}
+
func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error {
const q = `
WITH updated_replicas AS (
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 0e3c46c92..162cc5af0 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -6,6 +6,7 @@ import "context"
// default to what could be considered success if not set.
type MockRepositoryStore struct {
GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
+ UpdateChecksumsFunc func(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error
IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
@@ -30,6 +31,14 @@ func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage,
return m.GetGenerationFunc(ctx, virtualStorage, relativePath, storage)
}
+func (m MockRepositoryStore) UpdateChecksums(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error {
+ if m.UpdateChecksumsFunc == nil {
+ return nil
+ }
+
+ return m.UpdateChecksumsFunc(ctx, virtualStorage, relativePath, checksums)
+}
+
func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error {
if m.IncrementGenerationFunc == nil {
return nil
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 87b0ae1b0..056c2b38c 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -7,6 +7,7 @@ import (
"sync"
"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
+ "google.golang.org/grpc/metadata"
)
// VoteResult represents the outcome of a transaction for a single voter.
@@ -298,6 +299,11 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
return ctx.Err()
}
+ md, ok := metadata.FromIncomingContext(ctx)
+ if ok {
+ fmt.Printf("=== Got transaction context: %v\n", md)
+ }
+
switch voter.result {
case VoteCommitted:
// Happy case, we are part of the quorum.
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 9b6a15886..2f5fb8193 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -6,6 +6,7 @@ import (
"sync"
"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
+ "google.golang.org/grpc/metadata"
)
var (
@@ -59,6 +60,8 @@ type Transaction interface {
State() (map[string]VoteResult, error)
// DidVote returns whether the given node has cast a vote.
DidVote(string) bool
+ // Checksums for each node for the completed transaction
+ Checksums() map[string]string
}
// transaction is a session where a set of voters votes on one or more
@@ -68,6 +71,7 @@ type Transaction interface {
type transaction struct {
id uint64
threshold uint
+ checksums map[string]string
voters []Voter
lock sync.Mutex
@@ -108,6 +112,7 @@ func newTransaction(id uint64, voters []Voter, threshold uint) (*transaction, er
threshold: threshold,
voters: voters,
state: transactionOpen,
+ checksums: make(map[string]string, len(voters)),
}, nil
}
@@ -177,6 +182,10 @@ func (t *transaction) State() (map[string]VoteResult, error) {
return results, nil
}
+func (t *transaction) Checksums() map[string]string {
+ return t.checksums
+}
+
// CountSubtransactions counts the number of subtransactions created as part of
// the transaction.
func (t *transaction) CountSubtransactions() int {
@@ -271,6 +280,8 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
}
func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) error {
+ t.updateChecksum(ctx, node)
+
subtransaction, err := t.getOrCreateSubtransaction(node)
if err != nil {
return err
@@ -282,3 +293,19 @@ func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) e
return subtransaction.collectVotes(ctx, node)
}
+
+func (t *transaction) updateChecksum(ctx context.Context, node string) {
+ md, ok := metadata.FromIncomingContext(ctx)
+
+ if !ok {
+ return
+ }
+
+ result := md.Get("gitaly-repository-checksum")
+
+ if len(result) == 0 {
+ return
+ }
+
+ t.checksums[node] = result[0]
+}