diff options
author | John Cai <jcai@gitlab.com> | 2020-05-20 00:43:19 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-05-20 00:43:19 +0300 |
commit | 382ead9c7ef38e7dde4de7a9d2eba37a739060be (patch) | |
tree | 6ab7d1148dabbad516b31d97d649daa7af4bc92a | |
parent | 33bc0a7191b68d92e290e25a4458054bc7382735 (diff) | |
parent | 9824e4669918148a4c73ddbbefdbf1a05feb4ca1 (diff) |
Merge branch 'pks-2pc-cleanups' into 'master'
Follow-ups for transactions
See merge request gitlab-org/gitaly!2188
-rw-r--r-- | internal/git/receivepack.go | 5 | ||||
-rw-r--r-- | internal/praefect/metadata/server.go | 30 | ||||
-rw-r--r-- | internal/praefect/metadata/transaction.go | 17 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 40 | ||||
-rw-r--r-- | internal/service/hooks/pre_receive.go | 3 |
5 files changed, 76 insertions, 19 deletions
diff --git a/internal/git/receivepack.go b/internal/git/receivepack.go index 7033d8b12..8e57fea22 100644 --- a/internal/git/receivepack.go +++ b/internal/git/receivepack.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "os" "strconv" "github.com/golang/protobuf/jsonpb" @@ -59,7 +58,7 @@ func ReceivePackHookEnv(ctx context.Context, req ReceivePackRequest) ([]string, } env = append(env, praefectEnv) - } else if !errors.Is(err, os.ErrNotExist) { + } else if !errors.Is(err, metadata.ErrPraefectServerNotFound) { return nil, err } @@ -71,7 +70,7 @@ func ReceivePackHookEnv(ctx context.Context, req ReceivePackRequest) ([]string, } env = append(env, transactionEnv) - } else if !errors.Is(err, os.ErrNotExist) { + } else if !errors.Is(err, metadata.ErrTransactionNotFound) { return nil, err } diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go index 671fafe49..ff06a2a95 100644 --- a/internal/praefect/metadata/server.go +++ b/internal/praefect/metadata/server.go @@ -4,8 +4,8 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" - "os" "strings" "gitlab.com/gitlab-org/gitaly/auth" @@ -16,13 +16,26 @@ import ( ) const ( + // PraefectMetadataKey is the key used to store Praefect server + // information in the gRPC metadata. PraefectMetadataKey = "praefect-server" - PraefectEnvKey = "PRAEFECT_SERVER" + // PraefectEnvKey is the key used to store Praefect server information + // in environment variables. + PraefectEnvKey = "PRAEFECT_SERVER" ) +var ( + // ErrPraefectServerNotFound indicates the Praefect server metadata + // could not be found + ErrPraefectServerNotFound = errors.New("Praefect server info not found") +) + +// PraefectServer stores parameters required to connect to a Praefect server type PraefectServer struct { + // Address is the address of the Praefect server Address string `json:"address"` - Token string `json:"token"` + // Token is the token required to authenticate with the Praefect server + Token string `json:"token"` } // InjectPraefectServer injects Praefect connection metadata into an incoming context @@ -56,16 +69,16 @@ func InjectPraefectServer(ctx context.Context, conf config.Config) (context.Cont } // ExtractPraefectServer extracts `PraefectServer` from an incoming context. In -// case the metadata key is not set, the function will return `os.ErrNotExist`. +// case the metadata key is not set, the function will return `ErrPraefectServerNotFound`. func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { - return nil, os.ErrNotExist + return nil, ErrPraefectServerNotFound } encoded := md[PraefectMetadataKey] if len(encoded) == 0 { - return nil, os.ErrNotExist + return nil, ErrPraefectServerNotFound } decoded, err := base64.StdEncoding.DecodeString(encoded[0]) @@ -82,7 +95,7 @@ func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) { // PraefectFromEnv extracts `PraefectServer` from the environment variable // `PraefectEnvKey`. In case the variable is not set, the function will return -// `os.ErrNotExist`. +// `ErrPraefectServerNotFound`. func PraefectFromEnv(envvars []string) (*PraefectServer, error) { praefectKey := fmt.Sprintf("%s=", PraefectEnvKey) praefectEnv := "" @@ -93,7 +106,7 @@ func PraefectFromEnv(envvars []string) (*PraefectServer, error) { } } if praefectEnv == "" { - return nil, os.ErrNotExist + return nil, ErrPraefectServerNotFound } decoded, err := base64.StdEncoding.DecodeString(praefectEnv) @@ -120,6 +133,7 @@ func (p PraefectServer) Env() (string, error) { return fmt.Sprintf("%s=%s", PraefectEnvKey, encoded), nil } +// Dial will try to connect to the given Praefect server func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithBlock(), diff --git a/internal/praefect/metadata/transaction.go b/internal/praefect/metadata/transaction.go index 6426fe26a..dff14e786 100644 --- a/internal/praefect/metadata/transaction.go +++ b/internal/praefect/metadata/transaction.go @@ -4,8 +4,8 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" - "os" "strings" "google.golang.org/grpc/metadata" @@ -20,6 +20,12 @@ const ( TransactionEnvKey = "REFERENCE_TRANSACTION" ) +var ( + // ErrTransactionNotFound indicates the transaction metadata could not + // be found + ErrTransactionNotFound = errors.New("transaction not found") +) + // Transaction stores parameters required to identify a reference // transaction. type Transaction struct { @@ -77,16 +83,17 @@ func InjectTransaction(ctx context.Context, tranasctionID uint64, node string) ( } // ExtractTransaction extracts `Transaction` from an incoming context. In -// case the metadata key is not set, the function will return `os.ErrNotExist`. +// case the metadata key is not set, the function will return +// `ErrTransactionNotFound`. func ExtractTransaction(ctx context.Context) (Transaction, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { - return Transaction{}, os.ErrNotExist + return Transaction{}, ErrTransactionNotFound } serialized := md[TransactionMetadataKey] if len(serialized) == 0 { - return Transaction{}, os.ErrNotExist + return Transaction{}, ErrTransactionNotFound } return FromSerialized(serialized[0]) @@ -114,7 +121,7 @@ func TransactionFromEnv(envvars []string) (Transaction, error) { } } if transactionEnv == "" { - return Transaction{}, os.ErrNotExist + return Transaction{}, ErrTransactionNotFound } return FromSerialized(transactionEnv) diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index c7f4e647a..93068100d 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -2,7 +2,9 @@ package transactions import ( "context" + cryptorand "crypto/rand" "crypto/sha1" + "encoding/binary" "encoding/hex" "fmt" "math/rand" @@ -20,12 +22,40 @@ import ( // for Praefect to handle transactions directly instead of having to reach out // to reference transaction RPCs. type Manager struct { + txIdGenerator TransactionIdGenerator lock sync.Mutex transactions map[uint64]string counterMetric *prometheus.CounterVec delayMetric metrics.HistogramVec } +// TransactionIdGenerator is an interface for types that can generate transaction IDs. +type TransactionIdGenerator interface { + // Id generates a new transaction identifier + Id() uint64 +} + +type transactionIdGenerator struct { + rand *rand.Rand +} + +func newTransactionIdGenerator() *transactionIdGenerator { + var seed [8]byte + + // Ignore any errors. In case we weren't able to generate a seed, the + // best we can do is to just use the all-zero seed. + cryptorand.Read(seed[:]) + source := rand.NewSource(int64(binary.LittleEndian.Uint64(seed[:]))) + + return &transactionIdGenerator{ + rand: rand.New(source), + } +} + +func (t *transactionIdGenerator) Id() uint64 { + return rand.Uint64() +} + // ManagerOpt is a self referential option for Manager type ManagerOpt func(*Manager) @@ -43,9 +73,17 @@ func WithDelayMetric(delayMetric metrics.HistogramVec) ManagerOpt { } } +// WithTransactionIdGenerator is an option to set the transaction ID generator +func WithTransactionIdGenerator(generator TransactionIdGenerator) ManagerOpt { + return func(mgr *Manager) { + mgr.txIdGenerator = generator + } +} + // NewManager creates a new transactions Manager. func NewManager(opts ...ManagerOpt) *Manager { mgr := &Manager{ + txIdGenerator: newTransactionIdGenerator(), transactions: make(map[uint64]string), counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}), delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}), @@ -84,7 +122,7 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, nodes []string) (ui // that reset on restart of Praefect would be suboptimal, as the chance // for collisions is a lot higher in case Praefect restarts when Gitaly // nodes still have in-flight transactions. - transactionID := rand.Uint64() + transactionID := mgr.txIdGenerator.Id() if _, ok := mgr.transactions[transactionID]; ok { return 0, nil, helper.ErrInternalf("transaction exists already") } diff --git a/internal/service/hooks/pre_receive.go b/internal/service/hooks/pre_receive.go index c866dc249..5309028c1 100644 --- a/internal/service/hooks/pre_receive.go +++ b/internal/service/hooks/pre_receive.go @@ -5,7 +5,6 @@ import ( "crypto/sha1" "errors" "fmt" - "os" "os/exec" "path/filepath" "time" @@ -80,7 +79,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS func (s *server) voteOnTransaction(stream gitalypb.HookService_PreReceiveHookServer, hash []byte, env []string) error { tx, err := metadata.TransactionFromEnv(env) if err != nil { - if errors.Is(err, os.ErrNotExist) { + if errors.Is(err, metadata.ErrTransactionNotFound) { // No transaction being present is valid, e.g. in case // there is no Praefect server or the transactions // feature flag is not set. |