diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-12 13:28:55 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-19 08:52:05 +0300 |
commit | 248294d8a432a265ad9f3b7edb63546a80d822cf (patch) | |
tree | 2536f51382658f94e1bb21d87981d71ecaea06d9 | |
parent | 0993bb34b98ba0053bb027d09ff551c255f1b738 (diff) |
hooks: Let pre-receive hook reach out to the transaction manager
In order to allow for strong consistency for Git reference updates,
Gitaly nodes are expected to perform voting on the updates they intend
to perform. While the server side for this exists already in form of the
transaction manager and reference transaction service, the client-side
is not yet wired up.
In this first iteration, we implement the client side by reaching out to
the reference transaction service from the pre-receive hook. While this
won't yet catch all reference updates but only those performed as the
result of git-push(1), it serves as a first minimally viable
demonstration that a two-phase commit via Git hooks works.
One interesting bit is that starting the transaction is _not_ guarded by
a feature flag on the hook side, but only by presence of environment
variables that carry transaction information. As this environment
variable is only set in case Praefect has registered a transaction,
which in turn only happens if the "reference_transactions" feature flag
is set, we are still guarded.
Further note the hook is only implemented in the Go hook service, which
is in turn guarded by the "hooks_rpc" feature flag. As the work is
currently of experimental nature, it's deemed as a nice proof of concept
for the Go hooks RPC.
-rw-r--r-- | changelogs/unreleased/pks-2pc-single-transaction.yml | 5 | ||||
-rw-r--r-- | internal/praefect/metadata/server.go | 14 | ||||
-rw-r--r-- | internal/service/hooks/pre_receive.go | 93 | ||||
-rw-r--r-- | internal/service/hooks/server.go | 16 | ||||
-rw-r--r-- | internal/service/smarthttp/receive_pack_test.go | 85 |
5 files changed, 208 insertions, 5 deletions
diff --git a/changelogs/unreleased/pks-2pc-single-transaction.yml b/changelogs/unreleased/pks-2pc-single-transaction.yml new file mode 100644 index 000000000..5f5d9b360 --- /dev/null +++ b/changelogs/unreleased/pks-2pc-single-transaction.yml @@ -0,0 +1,5 @@ +--- +title: Single-node transactions via pre-receive hook +merge_request: 2147 +author: +type: added diff --git a/internal/praefect/metadata/server.go b/internal/praefect/metadata/server.go index de13c6fb7..671fafe49 100644 --- a/internal/praefect/metadata/server.go +++ b/internal/praefect/metadata/server.go @@ -8,7 +8,10 @@ import ( "os" "strings" + "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -116,3 +119,14 @@ func (p PraefectServer) Env() (string, error) { encoded := base64.StdEncoding.EncodeToString(marshalled) return fmt.Sprintf("%s=%s", PraefectEnvKey, encoded), nil } + +func (p PraefectServer) Dial(ctx context.Context) (*grpc.ClientConn, error) { + opts := []grpc.DialOption{ + grpc.WithBlock(), + } + if p.Token != "" { + opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.Token))) + } + + return client.DialContext(ctx, p.Address, opts) +} diff --git a/internal/service/hooks/pre_receive.go b/internal/service/hooks/pre_receive.go index a8261115f..c866dc249 100644 --- a/internal/service/hooks/pre_receive.go +++ b/internal/service/hooks/pre_receive.go @@ -1,16 +1,23 @@ package hook import ( + "context" + "crypto/sha1" "errors" + "fmt" + "os" "os/exec" "path/filepath" + "time" "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git/alternates" "gitlab.com/gitlab-org/gitaly/internal/gitlabshell" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" + "google.golang.org/grpc" ) type hookRequest interface { @@ -44,6 +51,75 @@ func gitlabShellHook(hookName string) string { return filepath.Join(config.Config.Ruby.Dir, "gitlab-shell", "hooks", hookName) } +func (s *server) getPraefectConn(ctx context.Context, server *metadata.PraefectServer) (*grpc.ClientConn, error) { + s.mutex.RLock() + conn, ok := s.praefectConnPool[server.Address] + s.mutex.RUnlock() + + if ok { + return conn, nil + } + + s.mutex.Lock() + defer s.mutex.Unlock() + + conn, ok = s.praefectConnPool[server.Address] + if !ok { + var err error + conn, err = server.Dial(ctx) + if err != nil { + return nil, err + } + + s.praefectConnPool[server.Address] = conn + } + + return conn, nil +} + +func (s *server) voteOnTransaction(stream gitalypb.HookService_PreReceiveHookServer, hash []byte, env []string) error { + tx, err := metadata.TransactionFromEnv(env) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // No transaction being present is valid, e.g. in case + // there is no Praefect server or the transactions + // feature flag is not set. + return nil + } + return fmt.Errorf("could not extract transaction: %w", err) + } + + praefectServer, err := metadata.PraefectFromEnv(env) + if err != nil { + return fmt.Errorf("could not extract Praefect server: %w", err) + } + + ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second) + defer cancel() + + praefectConn, err := s.getPraefectConn(ctx, praefectServer) + if err != nil { + return err + } + + praefectClient := gitalypb.NewRefTransactionClient(praefectConn) + + response, err := praefectClient.StartTransaction(ctx, &gitalypb.StartTransactionRequest{ + TransactionId: tx.ID, + Node: tx.Node, + ReferenceUpdatesHash: hash, + }) + if err != nil { + return err + } + + if response.State != gitalypb.StartTransactionResponse_COMMIT { + return errors.New("transaction was aborted") + } + + return nil +} + func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer) error { firstRequest, err := stream.Recv() if err != nil { @@ -54,9 +130,19 @@ func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer return helper.ErrInvalidArgument(err) } + referenceUpdatesHasher := sha1.New() stdin := streamio.NewReader(func() ([]byte, error) { req, err := stream.Recv() - return req.GetStdin(), err + if err != nil { + return nil, err + } + + stdin := req.GetStdin() + if _, err := referenceUpdatesHasher.Write(stdin); err != nil { + return stdin, err + } + + return stdin, nil }) stdout := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.PreReceiveHookResponse{Stdout: p}) }) stderr := streamio.NewWriter(func(p []byte) error { return stream.Send(&gitalypb.PreReceiveHookResponse{Stderr: p}) }) @@ -81,11 +167,14 @@ func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer c, env, ) - if err != nil { return helper.ErrInternal(err) } + if err := s.voteOnTransaction(stream, referenceUpdatesHasher.Sum(nil), env); err != nil { + return helper.ErrInternalf("error voting on transaction: %w", err) + } + if err := stream.SendMsg(&gitalypb.PreReceiveHookResponse{ ExitStatus: &gitalypb.ExitStatus{Value: status}, }); err != nil { diff --git a/internal/service/hooks/server.go b/internal/service/hooks/server.go index 1977af1df..0acd1d502 100644 --- a/internal/service/hooks/server.go +++ b/internal/service/hooks/server.go @@ -1,10 +1,20 @@ package hook -import "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +import ( + "sync" -type server struct{} + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type server struct { + mutex sync.RWMutex + praefectConnPool map[string]*grpc.ClientConn +} // NewServer creates a new instance of a gRPC namespace server func NewServer() gitalypb.HookServiceServer { - return &server{} + return &server{ + praefectConnPool: make(map[string]*grpc.ClientConn), + } } diff --git a/internal/service/smarthttp/receive_pack_test.go b/internal/service/smarthttp/receive_pack_test.go index e9dc039d8..d60672ade 100644 --- a/internal/service/smarthttp/receive_pack_test.go +++ b/internal/service/smarthttp/receive_pack_test.go @@ -460,3 +460,88 @@ func runSmartHTTPHookServiceServer(t *testing.T) (*grpc.Server, string) { return server, "unix://" + serverSocketPath } + +func TestPostReceiveWithTransactions(t *testing.T) { + defer func(cfg config.Cfg) { + config.Config = cfg + }(config.Config) + + defer func(override string) { + hooks.Override = override + }(hooks.Override) + hooks.Override = "" + + secretToken := "secret token" + glID := "key-1234" + glRepository := "some_repo" + gitlabUser := "gitlab_user-1234" + gitlabPassword := "gitlabsecret9887" + + featureSets, err := testhelper.NewFeatureSets([]string{featureflag.HooksRPC, featureflag.ReferenceTransactions}) + require.NoError(t, err) + + for _, features := range featureSets { + t.Run(fmt.Sprintf("features:%s", features), func(t *testing.T) { + repo, repoPath, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + + opts := testhelper.GitlabTestServerOptions{ + User: gitlabUser, + Password: gitlabPassword, + SecretToken: secretToken, + GLID: glID, + GLRepository: glRepository, + RepoPath: repoPath, + } + + gitlabServer := testhelper.NewGitlabTestServer(t, opts) + defer gitlabServer.Close() + + gitlabShellDir, cleanup := testhelper.CreateTemporaryGitlabShellDir(t) + defer cleanup() + config.Config.GitlabShell.Dir = gitlabShellDir + testhelper.WriteTemporaryGitlabShellConfigFile(t, + gitlabShellDir, + testhelper.GitlabShellConfig{ + GitlabURL: gitlabServer.URL, + HTTPSettings: testhelper.HTTPSettings{ + User: gitlabUser, + Password: gitlabPassword, + }, + }) + testhelper.WriteShellSecretFile(t, gitlabShellDir, secretToken) + + gitalyServer := testhelper.NewServerWithAuth(t, nil, nil, config.Config.Auth.Token) + gitalypb.RegisterSmartHTTPServiceServer(gitalyServer.GrpcServer(), NewServer()) + gitalypb.RegisterHookServiceServer(gitalyServer.GrpcServer(), hook.NewServer()) + reflection.Register(gitalyServer.GrpcServer()) + require.NoError(t, gitalyServer.Start()) + defer gitalyServer.Stop() + + internalSocket := config.GitalyInternalSocketPath() + internalListener, err := net.Listen("unix", internalSocket) + require.NoError(t, err) + + go func() { + gitalyServer.GrpcServer().Serve(internalListener) + }() + + client, conn := newSmartHTTPClient(t, "unix://"+gitalyServer.Socket()) + defer conn.Close() + + ctx, cancel := testhelper.Context() + defer cancel() + ctx = features.WithParent(ctx) + + stream, err := client.PostReceivePack(ctx) + require.NoError(t, err) + + push := newTestPush(t, nil) + request := &gitalypb.PostReceivePackRequest{Repository: repo, GlId: glID, GlRepository: glRepository} + response := doPush(t, stream, request, push.body) + + expectedResponse := "0030\x01000eunpack ok\n0019ok refs/heads/master\n00000000" + require.Equal(t, expectedResponse, string(response), "Expected response to be %q, got %q", expectedResponse, response) + }) + } +} |