Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-05-12 13:28:55 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-05-19 08:52:05 +0300
commit248294d8a432a265ad9f3b7edb63546a80d822cf (patch)
tree2536f51382658f94e1bb21d87981d71ecaea06d9
parent0993bb34b98ba0053bb027d09ff551c255f1b738 (diff)
hooks: Let pre-receive hook reach out to the transaction manager
In order to allow for strong consistency for Git reference updates, Gitaly nodes are expected to perform voting on the updates they intend to perform. While the server side for this exists already in form of the transaction manager and reference transaction service, the client-side is not yet wired up. In this first iteration, we implement the client side by reaching out to the reference transaction service from the pre-receive hook. While this won't yet catch all reference updates but only those performed as the result of git-push(1), it serves as a first minimally viable demonstration that a two-phase commit via Git hooks works. One interesting bit is that starting the transaction is _not_ guarded by a feature flag on the hook side, but only by presence of environment variables that carry transaction information. As this environment variable is only set in case Praefect has registered a transaction, which in turn only happens if the "reference_transactions" feature flag is set, we are still guarded. Further note the hook is only implemented in the Go hook service, which is in turn guarded by the "hooks_rpc" feature flag. As the work is currently of experimental nature, it's deemed as a nice proof of concept for the Go hooks RPC.
-rw-r--r--changelogs/unreleased/pks-2pc-single-transaction.yml5
-rw-r--r--internal/praefect/metadata/server.go14
-rw-r--r--internal/service/hooks/pre_receive.go93
-rw-r--r--internal/service/hooks/server.go16
-rw-r--r--internal/service/smarthttp/receive_pack_test.go85
5 files changed, 208 insertions, 5 deletions
diff --git a/changelogs/unreleased/pks-2pc-single-transaction.yml b/changelogs/unreleased/pks-2pc-single-transaction.yml
new file mode 100644
index 000000000..5f5d9b360
--- /dev/null
+++ b/changelogs/unreleased/pks-2pc-single-transaction.yml
@@ -0,0 +1,5 @@
+---
+title: Single-node transactions via pre-receive hook
+merge_request: 2147
+author:
+type: added
diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go
index de13c6fb7..671fafe49 100644
--- a/internal/praefect/metadata/server.go
+++ b/internal/praefect/metadata/server.go
@@ -8,7 +8,10 @@ import (
"os"
"strings"
+ "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@@ -116,3 +119,14 @@ func (p PraefectServer) Env() (string, error) {
encoded := base64.StdEncoding.EncodeToString(marshalled)
return fmt.Sprintf("%s=%s", PraefectEnvKey, encoded), nil
}
+
+func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) {
+ opts := []grpc.DialOption{
+ grpc.WithBlock(),
+ }
+ if p.Token != "" {
+ opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.Token)))
+ }
+
+ return client.DialContext(ctx, p.Address, opts)
+}
diff --git a/internal/service/hooks/pre_receive.go b/internal/service/hooks/pre_receive.go
index a8261115f..c866dc249 100644
--- a/internal/service/hooks/pre_receive.go
+++ b/internal/service/hooks/pre_receive.go
@@ -1,16 +1,23 @@
package hook
import (
+ "context"
+ "crypto/sha1"
"errors"
+ "fmt"
+ "os"
"os/exec"
"path/filepath"
+ "time"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git/alternates"
"gitlab.com/gitlab-org/gitaly/internal/gitlabshell"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
+ "google.golang.org/grpc"
)
type hookRequest interface {
@@ -44,6 +51,75 @@ func gitlabShellHook(hookName string) string {
return filepath.Join(config.Config.Ruby.Dir, "gitlab-shell", "hooks", hookName)
}
+func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectServer) (*grpc.ClientConn, error) {
+ s.mutex.RLock()
+ conn, ok := s.praefectConnPool[server.Address]
+ s.mutex.RUnlock()
+
+ if ok {
+ return conn, nil
+ }
+
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ conn, ok = s.praefectConnPool[server.Address]
+ if !ok {
+ var err error
+ conn, err = server.Dial(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ s.praefectConnPool[server.Address] = conn
+ }
+
+ return conn, nil
+}
+
+func (s *server) voteOnTransaction(stream gitalypb.HookService_PreReceiveHookServer, hash []byte, env []string) error {
+ tx, err := metadata.TransactionFromEnv(env)
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ // No transaction being present is valid, e.g. in case
+ // there is no Praefect server or the transactions
+ // feature flag is not set.
+ return nil
+ }
+ return fmt.Errorf("could not extract transaction: %w", err)
+ }
+
+ praefectServer, err := metadata.PraefectFromEnv(env)
+ if err != nil {
+ return fmt.Errorf("could not extract Praefect server: %w", err)
+ }
+
+ ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second)
+ defer cancel()
+
+ praefectConn, err := s.getPraefectConn(ctx, praefectServer)
+ if err != nil {
+ return err
+ }
+
+ praefectClient := gitalypb.NewRefTransactionClient(praefectConn)
+
+ response, err := praefectClient.StartTransaction(ctx, &gitalypb.StartTransactionRequest{
+ TransactionId: tx.ID,
+ Node: tx.Node,
+ ReferenceUpdatesHash: hash,
+ })
+ if err != nil {
+ return err
+ }
+
+ if response.State != gitalypb.StartTransactionResponse_COMMIT {
+ return errors.New("transaction was aborted")
+ }
+
+ return nil
+}
+
func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer) error {
firstRequest, err := stream.Recv()
if err != nil {
@@ -54,9 +130,19 @@ func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer
return helper.ErrInvalidArgument(err)
}
+ referenceUpdatesHasher := sha1.New()
stdin := streamio.NewReader(func() ([]byte, error) {
req, err := stream.Recv()
- return req.GetStdin(), err
+ if err != nil {
+ return nil, err
+ }
+
+ stdin := req.GetStdin()
+ if _, err := referenceUpdatesHasher.Write(stdin); err != nil {
+ return stdin, err
+ }
+
+ return stdin, nil
})
stdout := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.PreReceiveHookResponse{Stdout: p}) })
stderr := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.PreReceiveHookResponse{Stderr: p}) })
@@ -81,11 +167,14 @@ func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer
c,
env,
)
-
if err != nil {
return helper.ErrInternal(err)
}
+ if err := s.voteOnTransaction(stream, referenceUpdatesHasher.Sum(nil), env); err != nil {
+ return helper.ErrInternalf("error voting on transaction: %w", err)
+ }
+
if err := stream.SendMsg(&gitalypb.PreReceiveHookResponse{
ExitStatus: &gitalypb.ExitStatus{Value: status},
}); err != nil {
diff --git a/internal/service/hooks/server.go b/internal/service/hooks/server.go
index 1977af1df..0acd1d502 100644
--- a/internal/service/hooks/server.go
+++ b/internal/service/hooks/server.go
@@ -1,10 +1,20 @@
package hook
-import "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+import (
+ "sync"
-type server struct{}
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+type server struct {
+ mutex sync.RWMutex
+ praefectConnPool map[string]*grpc.ClientConn
+}
// NewServer creates a new instance of a gRPC namespace server
func NewServer() gitalypb.HookServiceServer {
- return &server{}
+ return &server{
+ praefectConnPool: make(map[string]*grpc.ClientConn),
+ }
}
diff --git a/internal/service/smarthttp/receive_pack_test.go b/internal/service/smarthttp/receive_pack_test.go
index e9dc039d8..d60672ade 100644
--- a/internal/service/smarthttp/receive_pack_test.go
+++ b/internal/service/smarthttp/receive_pack_test.go
@@ -460,3 +460,88 @@ func runSmartHTTPHookServiceServer(t *testing.T) (*grpc.Server, string) {
return server, "unix://" + serverSocketPath
}
+
+func TestPostReceiveWithTransactions(t *testing.T) {
+ defer func(cfg config.Cfg) {
+ config.Config = cfg
+ }(config.Config)
+
+ defer func(override string) {
+ hooks.Override = override
+ }(hooks.Override)
+ hooks.Override = ""
+
+ secretToken := "secret token"
+ glID := "key-1234"
+ glRepository := "some_repo"
+ gitlabUser := "gitlab_user-1234"
+ gitlabPassword := "gitlabsecret9887"
+
+ featureSets, err := testhelper.NewFeatureSets([]string{featureflag.HooksRPC, featureflag.ReferenceTransactions})
+ require.NoError(t, err)
+
+ for _, features := range featureSets {
+ t.Run(fmt.Sprintf("features:%s", features), func(t *testing.T) {
+ repo, repoPath, cleanup := testhelper.NewTestRepo(t)
+ defer cleanup()
+
+ opts := testhelper.GitlabTestServerOptions{
+ User: gitlabUser,
+ Password: gitlabPassword,
+ SecretToken: secretToken,
+ GLID: glID,
+ GLRepository: glRepository,
+ RepoPath: repoPath,
+ }
+
+ gitlabServer := testhelper.NewGitlabTestServer(t, opts)
+ defer gitlabServer.Close()
+
+ gitlabShellDir, cleanup := testhelper.CreateTemporaryGitlabShellDir(t)
+ defer cleanup()
+ config.Config.GitlabShell.Dir = gitlabShellDir
+ testhelper.WriteTemporaryGitlabShellConfigFile(t,
+ gitlabShellDir,
+ testhelper.GitlabShellConfig{
+ GitlabURL: gitlabServer.URL,
+ HTTPSettings: testhelper.HTTPSettings{
+ User: gitlabUser,
+ Password: gitlabPassword,
+ },
+ })
+ testhelper.WriteShellSecretFile(t, gitlabShellDir, secretToken)
+
+ gitalyServer := testhelper.NewServerWithAuth(t, nil, nil, config.Config.Auth.Token)
+ gitalypb.RegisterSmartHTTPServiceServer(gitalyServer.GrpcServer(), NewServer())
+ gitalypb.RegisterHookServiceServer(gitalyServer.GrpcServer(), hook.NewServer())
+ reflection.Register(gitalyServer.GrpcServer())
+ require.NoError(t, gitalyServer.Start())
+ defer gitalyServer.Stop()
+
+ internalSocket := config.GitalyInternalSocketPath()
+ internalListener, err := net.Listen("unix", internalSocket)
+ require.NoError(t, err)
+
+ go func() {
+ gitalyServer.GrpcServer().Serve(internalListener)
+ }()
+
+ client, conn := newSmartHTTPClient(t, "unix://"+gitalyServer.Socket())
+ defer conn.Close()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx = features.WithParent(ctx)
+
+ stream, err := client.PostReceivePack(ctx)
+ require.NoError(t, err)
+
+ push := newTestPush(t, nil)
+ request := &gitalypb.PostReceivePackRequest{Repository: repo, GlId: glID, GlRepository: glRepository}
+ response := doPush(t, stream, request, push.body)
+
+ expectedResponse := "0030\x01000eunpack ok\n0019ok refs/heads/master\n00000000"
+ require.Equal(t, expectedResponse, string(response), "Expected response to be %q, got %q", expectedResponse, response)
+ })
+ }
+}