From 21370ba12030cc21649f6082b5e9830055c334b4 Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Thu, 30 Sep 2021 16:57:29 -0700 Subject: Spike: Send checksums with transactions --- internal/git/updateref/update_with_hooks.go | 74 ++++++++++++++++++++++ internal/praefect/coordinator.go | 6 +- internal/praefect/datastore/repository_store.go | 25 ++++++++ .../praefect/datastore/repository_store_mock.go | 9 +++ internal/praefect/transactions/subtransaction.go | 6 ++ internal/praefect/transactions/transaction.go | 27 ++++++++ 6 files changed, 146 insertions(+), 1 deletion(-) diff --git a/internal/git/updateref/update_with_hooks.go b/internal/git/updateref/update_with_hooks.go index a1b04aa56..d3359853c 100644 --- a/internal/git/updateref/update_with_hooks.go +++ b/internal/git/updateref/update_with_hooks.go @@ -1,10 +1,15 @@ package updateref import ( + "bufio" "bytes" "context" + "crypto/sha1" + "encoding/hex" "errors" "fmt" + "math/big" + "regexp" "strings" "gitlab.com/gitlab-org/gitaly/v14/internal/git" @@ -18,6 +23,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) // UpdaterWithHooks updates a ref with Git hooks. @@ -89,6 +97,7 @@ func (u *UpdaterWithHooks) UpdateReference( newrev, oldrev git.ObjectID, pushOptions ...string, ) error { + fmt.Println("=== UpdateReference called") var transaction *txinfo.Transaction if tx, err := txinfo.TransactionFromContext(ctx); err == nil { transaction = &tx @@ -192,6 +201,13 @@ func (u *UpdaterWithHooks) UpdateReference( return Error{reference: reference.String()} } + checksum, err := u.calculateChecksum(ctx, repo) + if err != nil { + return err + } + ctx = injectChecksum(ctx, checksum) + fmt.Printf("=== injecting checksum: %s\n", checksum) + if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionCommitted, []string{hooksPayload}, strings.NewReader(changes)); err != nil { return HookError{err: err} } @@ -206,3 +222,61 @@ func (u *UpdaterWithHooks) UpdateReference( func (u *UpdaterWithHooks) localrepo(repo repository.GitRepo) *localrepo.Repo { return localrepo.New(u.gitCmdFactory, u.catfileCache, repo, u.cfg) } + +func injectChecksum(ctx context.Context, checksum string) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + + if !ok { + md = metadata.New(map[string]string{}) + } else { + md = md.Copy() + } + + md.Set("gitaly-repository-checksum", checksum) + + return metadata.NewOutgoingContext(ctx, md) +} + +func (u *UpdaterWithHooks) calculateChecksum(ctx context.Context, repo *gitalypb.Repository) (string, error) { + var refWhitelist = regexp.MustCompile(`HEAD|(refs/(heads|tags|keep-around|merge-requests|environments|notes)/)`) + + // Get checksum here and send it along to the committed hook + cmd, err := u.gitCmdFactory.New(ctx, repo, git.SubCmd{Name: "show-ref", Flags: []git.Option{git.Flag{Name: "--head"}}}) + if err != nil { + return "", err + } + + var checksum *big.Int + + scanner := bufio.NewScanner(cmd) + for scanner.Scan() { + ref := scanner.Bytes() + + if !refWhitelist.Match(ref) { + continue + } + + h := sha1.New() + // hash.Hash will never return an error. + _, _ = h.Write(ref) + + hash := hex.EncodeToString(h.Sum(nil)) + hashIntBase16, _ := (&big.Int{}).SetString(hash, 16) + + if checksum == nil { + checksum = hashIntBase16 + } else { + checksum.Xor(checksum, hashIntBase16) + } + } + + if err := scanner.Err(); err != nil { + return "", status.Errorf(codes.Internal, err.Error()) + } + + if err := cmd.Wait(); checksum == nil || err != nil { + return "", err + } + + return hex.EncodeToString(checksum.Bytes()), nil +} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 938f47fe7..c8398a52a 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -570,6 +570,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall change, params, call.fullMethodName, + nil, )) } @@ -843,7 +844,7 @@ func (c *Coordinator) createTransactionFinalizer( return c.newRequestFinalizer( ctx, route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage, - updated, outdated, change, params, cause)() + updated, outdated, change, params, cause, transaction.Checksums())() } } @@ -983,6 +984,7 @@ func (c *Coordinator) newRequestFinalizer( change datastore.ChangeType, params datastore.Params, cause string, + checksums map[string]string, ) func() error { return func() error { // Use a separate timeout for the database operations. If the request times out, the passed in context is @@ -1005,6 +1007,8 @@ func (c *Coordinator) newRequestFinalizer( } log.Info("queueing replication jobs") + c.rs.UpdateChecksums(ctx, virtualStorage, targetRepo.GetRelativePath(), checksums) + switch change { case datastore.UpdateRepo: // If this fails, the primary might have changes on it that are not recorded in the database. The secondaries will appear diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 9c8dcf30e..42226ab96 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -86,6 +86,8 @@ var ErrNoRowsAffected = errors.New("no rows were affected by the query") type RepositoryStore interface { // GetGeneration gets the repository's generation on a given storage. GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) + // UpdateChecksums updates the checksums for a given storage for a given node + UpdateChecksums(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error // IncrementGeneration increments the generations of up to date nodes. IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error // SetGeneration sets the repository's generation on the given storage. If the generation is higher @@ -168,6 +170,29 @@ AND storage = $3 return gen, nil } +func (rs *PostgresRepositoryStore) UpdateChecksums(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error { + const q = ` + UPDATE storage_repositories + SET checksum = $1 + WHERE virtual_storage = $2 + AND relative_path = $3 + AND storage = $4 + ` + var lastErr error + + // TODO: Perform in transaction with generation + for name, checksum := range checksums { + _, err := rs.db.ExecContext(ctx, q, checksum, virtualStorage, relativePath, name) + + if err != nil { + fmt.Println("=== error updating: %v", err) + lastErr = err + } + } + + return lastErr +} + func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error { const q = ` WITH updated_replicas AS ( diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 0e3c46c92..162cc5af0 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -6,6 +6,7 @@ import "context" // default to what could be considered success if not set. type MockRepositoryStore struct { GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) + UpdateChecksumsFunc func(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error @@ -30,6 +31,14 @@ func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, return m.GetGenerationFunc(ctx, virtualStorage, relativePath, storage) } +func (m MockRepositoryStore) UpdateChecksums(ctx context.Context, virtualStorage, relativePath string, checksums map[string]string) error { + if m.UpdateChecksumsFunc == nil { + return nil + } + + return m.UpdateChecksumsFunc(ctx, virtualStorage, relativePath, checksums) +} + func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error { if m.IncrementGenerationFunc == nil { return nil diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 87b0ae1b0..056c2b38c 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -7,6 +7,7 @@ import ( "sync" "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" + "google.golang.org/grpc/metadata" ) // VoteResult represents the outcome of a transaction for a single voter. @@ -298,6 +299,11 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { return ctx.Err() } + md, ok := metadata.FromIncomingContext(ctx) + if ok { + fmt.Printf("=== Got transaction context: %v\n", md) + } + switch voter.result { case VoteCommitted: // Happy case, we are part of the quorum. diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 9b6a15886..2f5fb8193 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -6,6 +6,7 @@ import ( "sync" "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" + "google.golang.org/grpc/metadata" ) var ( @@ -59,6 +60,8 @@ type Transaction interface { State() (map[string]VoteResult, error) // DidVote returns whether the given node has cast a vote. DidVote(string) bool + // Checksums for each node for the completed transaction + Checksums() map[string]string } // transaction is a session where a set of voters votes on one or more @@ -68,6 +71,7 @@ type Transaction interface { type transaction struct { id uint64 threshold uint + checksums map[string]string voters []Voter lock sync.Mutex @@ -108,6 +112,7 @@ func newTransaction(id uint64, voters []Voter, threshold uint) (*transaction, er threshold: threshold, voters: voters, state: transactionOpen, + checksums: make(map[string]string, len(voters)), }, nil } @@ -177,6 +182,10 @@ func (t *transaction) State() (map[string]VoteResult, error) { return results, nil } +func (t *transaction) Checksums() map[string]string { + return t.checksums +} + // CountSubtransactions counts the number of subtransactions created as part of // the transaction. func (t *transaction) CountSubtransactions() int { @@ -271,6 +280,8 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e } func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) error { + t.updateChecksum(ctx, node) + subtransaction, err := t.getOrCreateSubtransaction(node) if err != nil { return err @@ -282,3 +293,19 @@ func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) e return subtransaction.collectVotes(ctx, node) } + +func (t *transaction) updateChecksum(ctx context.Context, node string) { + md, ok := metadata.FromIncomingContext(ctx) + + if !ok { + return + } + + result := md.Get("gitaly-repository-checksum") + + if len(result) == 0 { + return + } + + t.checksums[node] = result[0] +} -- cgit v1.2.3