Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-05-20 00:43:19 +0300
committerJohn Cai <jcai@gitlab.com>2020-05-20 00:43:19 +0300
commit382ead9c7ef38e7dde4de7a9d2eba37a739060be (patch)
tree6ab7d1148dabbad516b31d97d649daa7af4bc92a
parent33bc0a7191b68d92e290e25a4458054bc7382735 (diff)
parent9824e4669918148a4c73ddbbefdbf1a05feb4ca1 (diff)
Merge branch 'pks-2pc-cleanups' into 'master'
Follow-ups for transactions See merge request gitlab-org/gitaly!2188
-rw-r--r--internal/git/receivepack.go5
-rw-r--r--internal/praefect/metadata/server.go30
-rw-r--r--internal/praefect/metadata/transaction.go17
-rw-r--r--internal/praefect/transactions/manager.go40
-rw-r--r--internal/service/hooks/pre_receive.go3
5 files changed, 76 insertions, 19 deletions
diff --git a/internal/git/receivepack.go b/internal/git/receivepack.go
index 7033d8b12..8e57fea22 100644
--- a/internal/git/receivepack.go
+++ b/internal/git/receivepack.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
- "os"
"strconv"
"github.com/golang/protobuf/jsonpb"
@@ -59,7 +58,7 @@ func ReceivePackHookEnv(ctx context.Context, req ReceivePackRequest) ([]string,
}
env = append(env, praefectEnv)
- } else if !errors.Is(err, os.ErrNotExist) {
+ } else if !errors.Is(err, metadata.ErrPraefectServerNotFound) {
return nil, err
}
@@ -71,7 +70,7 @@ func ReceivePackHookEnv(ctx context.Context, req ReceivePackRequest) ([]string,
}
env = append(env, transactionEnv)
- } else if !errors.Is(err, os.ErrNotExist) {
+ } else if !errors.Is(err, metadata.ErrTransactionNotFound) {
return nil, err
}
diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go
index 671fafe49..ff06a2a95 100644
--- a/internal/praefect/metadata/server.go
+++ b/internal/praefect/metadata/server.go
@@ -4,8 +4,8 @@ import (
"context"
"encoding/base64"
"encoding/json"
+ "errors"
"fmt"
- "os"
"strings"
"gitlab.com/gitlab-org/gitaly/auth"
@@ -16,13 +16,26 @@ import (
)
const (
+ // PraefectMetadataKey is the key used to store Praefect server
+ // information in the gRPC metadata.
PraefectMetadataKey = "praefect-server"
- PraefectEnvKey = "PRAEFECT_SERVER"
+ // PraefectEnvKey is the key used to store Praefect server information
+ // in environment variables.
+ PraefectEnvKey = "PRAEFECT_SERVER"
)
+var (
+ // ErrPraefectServerNotFound indicates the Praefect server metadata
+ // could not be found
+ ErrPraefectServerNotFound = errors.New("Praefect server info not found")
+)
+
+// PraefectServer stores parameters required to connect to a Praefect server
type PraefectServer struct {
+ // Address is the address of the Praefect server
Address string `json:"address"`
- Token string `json:"token"`
+ // Token is the token required to authenticate with the Praefect server
+ Token string `json:"token"`
}
// InjectPraefectServer injects Praefect connection metadata into an incoming context
@@ -56,16 +69,16 @@ func InjectPraefectServer(ctx context.Context, conf config.Config) (context.Cont
}
// ExtractPraefectServer extracts `PraefectServer` from an incoming context. In
-// case the metadata key is not set, the function will return `os.ErrNotExist`.
+// case the metadata key is not set, the function will return `ErrPraefectServerNotFound`.
func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
- return nil, os.ErrNotExist
+ return nil, ErrPraefectServerNotFound
}
encoded := md[PraefectMetadataKey]
if len(encoded) == 0 {
- return nil, os.ErrNotExist
+ return nil, ErrPraefectServerNotFound
}
decoded, err := base64.StdEncoding.DecodeString(encoded[0])
@@ -82,7 +95,7 @@ func ExtractPraefectServer(ctx context.Context) (p *PraefectServer, err error) {
// PraefectFromEnv extracts `PraefectServer` from the environment variable
// `PraefectEnvKey`. In case the variable is not set, the function will return
-// `os.ErrNotExist`.
+// `ErrPraefectServerNotFound`.
func PraefectFromEnv(envvars []string) (*PraefectServer, error) {
praefectKey := fmt.Sprintf("%s=", PraefectEnvKey)
praefectEnv := ""
@@ -93,7 +106,7 @@ func PraefectFromEnv(envvars []string) (*PraefectServer, error) {
}
}
if praefectEnv == "" {
- return nil, os.ErrNotExist
+ return nil, ErrPraefectServerNotFound
}
decoded, err := base64.StdEncoding.DecodeString(praefectEnv)
@@ -120,6 +133,7 @@ func (p PraefectServer) Env() (string, error) {
return fmt.Sprintf("%s=%s", PraefectEnvKey, encoded), nil
}
+// Dial will try to connect to the given Praefect server
func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithBlock(),
diff --git a/internal/praefect/metadata/transaction.go b/internal/praefect/metadata/transaction.go
index 6426fe26a..dff14e786 100644
--- a/internal/praefect/metadata/transaction.go
+++ b/internal/praefect/metadata/transaction.go
@@ -4,8 +4,8 @@ import (
"context"
"encoding/base64"
"encoding/json"
+ "errors"
"fmt"
- "os"
"strings"
"google.golang.org/grpc/metadata"
@@ -20,6 +20,12 @@ const (
TransactionEnvKey = "REFERENCE_TRANSACTION"
)
+var (
+ // ErrTransactionNotFound indicates the transaction metadata could not
+ // be found
+ ErrTransactionNotFound = errors.New("transaction not found")
+)
+
// Transaction stores parameters required to identify a reference
// transaction.
type Transaction struct {
@@ -77,16 +83,17 @@ func InjectTransaction(ctx context.Context, tranasctionID uint64, node string) (
}
// ExtractTransaction extracts `Transaction` from an incoming context. In
-// case the metadata key is not set, the function will return `os.ErrNotExist`.
+// case the metadata key is not set, the function will return
+// `ErrTransactionNotFound`.
func ExtractTransaction(ctx context.Context) (Transaction, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
- return Transaction{}, os.ErrNotExist
+ return Transaction{}, ErrTransactionNotFound
}
serialized := md[TransactionMetadataKey]
if len(serialized) == 0 {
- return Transaction{}, os.ErrNotExist
+ return Transaction{}, ErrTransactionNotFound
}
return FromSerialized(serialized[0])
@@ -114,7 +121,7 @@ func TransactionFromEnv(envvars []string) (Transaction, error) {
}
}
if transactionEnv == "" {
- return Transaction{}, os.ErrNotExist
+ return Transaction{}, ErrTransactionNotFound
}
return FromSerialized(transactionEnv)
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index c7f4e647a..93068100d 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -2,7 +2,9 @@ package transactions
import (
"context"
+ cryptorand "crypto/rand"
"crypto/sha1"
+ "encoding/binary"
"encoding/hex"
"fmt"
"math/rand"
@@ -20,12 +22,40 @@ import (
// for Praefect to handle transactions directly instead of having to reach out
// to reference transaction RPCs.
type Manager struct {
+ txIdGenerator TransactionIdGenerator
lock sync.Mutex
transactions map[uint64]string
counterMetric *prometheus.CounterVec
delayMetric metrics.HistogramVec
}
+// TransactionIdGenerator is an interface for types that can generate transaction IDs.
+type TransactionIdGenerator interface {
+ // Id generates a new transaction identifier
+ Id() uint64
+}
+
+type transactionIdGenerator struct {
+ rand *rand.Rand
+}
+
+func newTransactionIdGenerator() *transactionIdGenerator {
+ var seed [8]byte
+
+ // Ignore any errors. In case we weren't able to generate a seed, the
+ // best we can do is to just use the all-zero seed.
+ cryptorand.Read(seed[:])
+ source := rand.NewSource(int64(binary.LittleEndian.Uint64(seed[:])))
+
+ return &transactionIdGenerator{
+ rand: rand.New(source),
+ }
+}
+
+func (t *transactionIdGenerator) Id() uint64 {
+ return rand.Uint64()
+}
+
// ManagerOpt is a self referential option for Manager
type ManagerOpt func(*Manager)
@@ -43,9 +73,17 @@ func WithDelayMetric(delayMetric metrics.HistogramVec) ManagerOpt {
}
}
+// WithTransactionIdGenerator is an option to set the transaction ID generator
+func WithTransactionIdGenerator(generator TransactionIdGenerator) ManagerOpt {
+ return func(mgr *Manager) {
+ mgr.txIdGenerator = generator
+ }
+}
+
// NewManager creates a new transactions Manager.
func NewManager(opts ...ManagerOpt) *Manager {
mgr := &Manager{
+ txIdGenerator: newTransactionIdGenerator(),
transactions: make(map[uint64]string),
counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}),
delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}),
@@ -84,7 +122,7 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, nodes []string) (ui
// that reset on restart of Praefect would be suboptimal, as the chance
// for collisions is a lot higher in case Praefect restarts when Gitaly
// nodes still have in-flight transactions.
- transactionID := rand.Uint64()
+ transactionID := mgr.txIdGenerator.Id()
if _, ok := mgr.transactions[transactionID]; ok {
return 0, nil, helper.ErrInternalf("transaction exists already")
}
diff --git a/internal/service/hooks/pre_receive.go b/internal/service/hooks/pre_receive.go
index c866dc249..5309028c1 100644
--- a/internal/service/hooks/pre_receive.go
+++ b/internal/service/hooks/pre_receive.go
@@ -5,7 +5,6 @@ import (
"crypto/sha1"
"errors"
"fmt"
- "os"
"os/exec"
"path/filepath"
"time"
@@ -80,7 +79,7 @@ func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectS
func (s *server) voteOnTransaction(stream gitalypb.HookService_PreReceiveHookServer, hash []byte, env []string) error {
tx, err := metadata.TransactionFromEnv(env)
if err != nil {
- if errors.Is(err, os.ErrNotExist) {
+ if errors.Is(err, metadata.ErrTransactionNotFound) {
// No transaction being present is valid, e.g. in case
// there is no Praefect server or the transactions
// feature flag is not set.