diff options
author | Andrew Newdigate <andrew@gitlab.com> | 2019-02-20 18:22:51 +0300 |
---|---|---|
committer | Andrew Newdigate <andrew@gitlab.com> | 2019-02-22 10:56:46 +0300 |
commit | ecf355c711defc3096dc37eb532e279a77d882db (patch) | |
tree | d29eac80e44e09ad60c16b9f07d244db69137c14 | |
parent | bdafb162b31ddae5a0123d30b3138360b1fa3c83 (diff) |
Support distributed tracing in gitaly-ssh
-rw-r--r-- | .gitignore | 4 | ||||
-rw-r--r-- | cmd/gitaly-ssh/main.go | 61 | ||||
-rw-r--r-- | cmd/gitaly-ssh/main_test.go | 103 | ||||
-rw-r--r-- | cmd/gitaly-ssh/receive_pack.go | 4 | ||||
-rw-r--r-- | cmd/gitaly-ssh/upload_archive.go | 4 | ||||
-rw-r--r-- | cmd/gitaly-ssh/upload_pack.go | 4 | ||||
-rw-r--r-- | internal/service/repository/fork.go | 5 |
7 files changed, 167 insertions, 18 deletions
diff --git a/.gitignore b/.gitignore index 0c41b8995..eff022ea3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ /_build /gitaly -/**/gitaly-ssh +cmd/gitaly-ssh/gitaly-ssh +/gitaly-ssh +**/testdata/gitaly-libexec/ /*.deb /_support/package/bin /_support/bin diff --git a/cmd/gitaly-ssh/main.go b/cmd/gitaly-ssh/main.go index 9f4fdbc12..4049cc47b 100644 --- a/cmd/gitaly-ssh/main.go +++ b/cmd/gitaly-ssh/main.go @@ -1,16 +1,21 @@ package main import ( + "context" "fmt" "log" "os" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/client" + grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" + "gitlab.com/gitlab-org/labkit/tracing" + grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" "google.golang.org/grpc" ) -type packFn func(_ *grpc.ClientConn, _ string) (int32, error) +type packFn func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) // GITALY_ADDRESS="tcp://1.2.3.4:9999" or "unix:/var/run/gitaly.sock" // GITALY_TOKEN="foobar1234" @@ -24,8 +29,9 @@ func main() { log.Fatalf("invalid number of arguments, expected at least 1, got %d", n-1) } + command := os.Args[1] var packer packFn - switch os.Args[1] { + switch command { case "upload-pack": packer = uploadPack case "receive-pack": @@ -33,27 +39,47 @@ func main() { case "upload-archive": packer = uploadArchive default: - log.Fatalf("invalid pack command: %q", os.Args[1]) + log.Fatalf("invalid pack command: %q", command) } - if wd := os.Getenv("GITALY_WD"); wd != "" { - if err := os.Chdir(wd); err != nil { - log.Fatalf("change to : %v", err) + gitalyWorkingDir := os.Getenv("GITALY_WD") + gitalyAddress := os.Getenv("GITALY_ADDRESS") + gitalyPayload := os.Getenv("GITALY_PAYLOAD") + + code, err := run(packer, gitalyWorkingDir, gitalyAddress, gitalyPayload) + if err != nil { + log.Printf("%s: %v", command, err) + } + + os.Exit(code) +} + +func run(packer packFn, gitalyWorkingDir string, gitalyAddress string, gitalyPayload string) (int, error) { + // Configure distributed tracing + closer := tracing.Initialize(tracing.WithServiceName("gitaly-ssh")) + defer closer.Close() + + ctx, finished := tracing.ExtractFromEnv(context.Background()) + defer finished() + + if gitalyWorkingDir != "" { + if err := os.Chdir(gitalyWorkingDir); err != nil { + return 1, fmt.Errorf("unable to chdir to %v", gitalyWorkingDir) } } - conn, err := getConnection(os.Getenv("GITALY_ADDRESS")) + conn, err := getConnection(gitalyAddress) if err != nil { - log.Fatalf("%s: %v", os.Args[1], err) + return 1, err } defer conn.Close() - code, err := packer(conn, os.Getenv("GITALY_PAYLOAD")) + code, err := packer(ctx, conn, gitalyPayload) if err != nil { - log.Fatalf("%s: %v", os.Args[1], err) + return 1, err } - os.Exit(int(code)) + return int(code), nil } func getConnection(url string) (*grpc.ClientConn, error) { @@ -70,5 +96,18 @@ func dialOpts() []grpc.DialOption { connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token))) } + // Add grpc client interceptors + connOpts = append(connOpts, grpc.WithStreamInterceptor( + grpc_middleware.ChainStreamClient( + grpctracing.StreamClientTracingInterceptor(), // Tracing + grpccorrelation.StreamClientCorrelationInterceptor(), // Correlation + )), + + grpc.WithUnaryInterceptor( + grpc_middleware.ChainUnaryClient( + grpctracing.UnaryClientTracingInterceptor(), // Tracing + grpccorrelation.UnaryClientCorrelationInterceptor(), // Correlation + ))) + return connOpts } diff --git a/cmd/gitaly-ssh/main_test.go b/cmd/gitaly-ssh/main_test.go new file mode 100644 index 000000000..97073794e --- /dev/null +++ b/cmd/gitaly-ssh/main_test.go @@ -0,0 +1,103 @@ +package main + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" +) + +func TestRun(t *testing.T) { + var successPacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 0, nil } + var exitCodePacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 123, nil } + var errorPacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 1, fmt.Errorf("fail") } + + gitalyTCPAddress := "tcp://localhost:9999" + gitalyUnixAddress := fmt.Sprintf("unix://%s", testhelper.GetTemporaryGitalySocketFileName()) + + tests := []struct { + name string + workingDir string + gitalyAddress string + packer packFn + wantCode int + wantErr bool + }{ + { + name: "trivial_tcp", + packer: successPacker, + gitalyAddress: gitalyTCPAddress, + wantCode: 0, + wantErr: false, + }, + { + name: "trivial_unix", + packer: successPacker, + gitalyAddress: gitalyUnixAddress, + wantCode: 0, + wantErr: false, + }, + { + name: "with_working_dir", + workingDir: os.TempDir(), + gitalyAddress: gitalyTCPAddress, + packer: successPacker, + wantCode: 0, + wantErr: false, + }, + { + name: "incorrect_working_dir", + workingDir: "directory_does_not_exist", + gitalyAddress: gitalyTCPAddress, + packer: successPacker, + wantCode: 1, + wantErr: true, + }, + { + name: "empty_gitaly_address", + gitalyAddress: "", + packer: successPacker, + wantCode: 1, + wantErr: true, + }, + { + name: "invalid_gitaly_address", + gitalyAddress: "invalid_gitaly_address", + packer: successPacker, + wantCode: 1, + wantErr: true, + }, + { + name: "exit_code", + gitalyAddress: gitalyTCPAddress, + packer: exitCodePacker, + wantCode: 123, + wantErr: false, + }, + { + name: "error", + gitalyAddress: gitalyTCPAddress, + packer: errorPacker, + wantCode: 1, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotCode, err := run(tt.packer, tt.workingDir, tt.gitalyAddress, "{}") + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, tt.wantCode, gotCode) + }) + } +} diff --git a/cmd/gitaly-ssh/receive_pack.go b/cmd/gitaly-ssh/receive_pack.go index cb4f404ed..910eb7628 100644 --- a/cmd/gitaly-ssh/receive_pack.go +++ b/cmd/gitaly-ssh/receive_pack.go @@ -11,13 +11,13 @@ import ( "google.golang.org/grpc" ) -func receivePack(conn *grpc.ClientConn, req string) (int32, error) { +func receivePack(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) { var request gitalypb.SSHReceivePackRequest if err := jsonpb.UnmarshalString(req, &request); err != nil { return 0, fmt.Errorf("json unmarshal: %v", err) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() return client.ReceivePack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) diff --git a/cmd/gitaly-ssh/upload_archive.go b/cmd/gitaly-ssh/upload_archive.go index b7bb86466..104227d36 100644 --- a/cmd/gitaly-ssh/upload_archive.go +++ b/cmd/gitaly-ssh/upload_archive.go @@ -11,13 +11,13 @@ import ( "google.golang.org/grpc" ) -func uploadArchive(conn *grpc.ClientConn, req string) (int32, error) { +func uploadArchive(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) { var request gitalypb.SSHUploadArchiveRequest if err := jsonpb.UnmarshalString(req, &request); err != nil { return 0, fmt.Errorf("json unmarshal: %v", err) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() return client.UploadArchive(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) diff --git a/cmd/gitaly-ssh/upload_pack.go b/cmd/gitaly-ssh/upload_pack.go index d883260e1..98c688758 100644 --- a/cmd/gitaly-ssh/upload_pack.go +++ b/cmd/gitaly-ssh/upload_pack.go @@ -11,13 +11,13 @@ import ( "google.golang.org/grpc" ) -func uploadPack(conn *grpc.ClientConn, req string) (int32, error) { +func uploadPack(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) { var request gitalypb.SSHUploadPackRequest if err := jsonpb.UnmarshalString(req, &request); err != nil { return 0, fmt.Errorf("json unmarshal: %v", err) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() return client.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) diff --git a/internal/service/repository/fork.go b/internal/service/repository/fork.go index fedef0227..ce480e7d1 100644 --- a/internal/service/repository/fork.go +++ b/internal/service/repository/fork.go @@ -11,12 +11,15 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/labkit/tracing" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const gitalyInternalURL = "ssh://gitaly/internal.git" +var envInjector = tracing.NewEnvInjector() + func (s *server) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) { targetRepository := req.Repository sourceRepository := req.SourceRepository @@ -73,6 +76,8 @@ func (s *server) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest fmt.Sprintf("GITALY_TOKEN=%s", sourceRepositoryGitalyToken), fmt.Sprintf("GIT_SSH_COMMAND=%s upload-pack", gitalySSHPath), } + env = envInjector(ctx, env) + args := []string{ "clone", "--bare", |