diff options
author | James Fargher <proglottis@gmail.com> | 2021-07-26 03:29:28 +0300 |
---|---|---|
committer | James Fargher <proglottis@gmail.com> | 2021-07-26 03:29:28 +0300 |
commit | a7e4fcccdaffda7284b0325bb5f86cde77b99aed (patch) | |
tree | 36e4c41f8e658e1139d5d36613b3e6e5ca51e180 | |
parent | dd968975406e7ba801e186c2c1414467ebaeb64f (diff) | |
parent | a9801375db1253f7413449e9dbc9a12b7325cf31 (diff) |
Merge branch 'ps-backup-supports-cloud-sink' into 'master'
backup: support of the Cloud storages
Closes #3633
See merge request gitlab-org/gitaly!3687
-rw-r--r-- | cmd/gitaly-backup/create.go | 9 | ||||
-rw-r--r-- | cmd/gitaly-backup/restore.go | 9 | ||||
-rw-r--r-- | internal/backup/backup.go | 27 | ||||
-rw-r--r-- | internal/backup/backup_test.go | 93 |
4 files changed, 134 insertions, 4 deletions
diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go index 03afe4fda..2885bf33f 100644 --- a/cmd/gitaly-backup/create.go +++ b/cmd/gitaly-backup/create.go @@ -34,10 +34,15 @@ func (cmd *createSubcommand) Flags(fs *flag.FlagSet) { } func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error { - fsBackup := backup.NewManager(backup.NewFilesystemSink(cmd.backupPath)) + sink, err := backup.ResolveSink(ctx, cmd.backupPath) + if err != nil { + return fmt.Errorf("create: resolve sink: %w", err) + } + + manager := backup.NewManager(sink) var pipeline backup.CreatePipeline - pipeline = backup.NewPipeline(log.StandardLogger(), fsBackup) + pipeline = backup.NewPipeline(log.StandardLogger(), manager) if cmd.parallel > 0 { pipeline = backup.NewParallelCreatePipeline(pipeline, cmd.parallel) } diff --git a/cmd/gitaly-backup/restore.go b/cmd/gitaly-backup/restore.go index 1814afdcd..98fe58773 100644 --- a/cmd/gitaly-backup/restore.go +++ b/cmd/gitaly-backup/restore.go @@ -31,8 +31,13 @@ func (cmd *restoreSubcommand) Flags(fs *flag.FlagSet) { } func (cmd *restoreSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error { - fsBackup := backup.NewManager(backup.NewFilesystemSink(cmd.backupPath)) - pipeline := backup.NewPipeline(log.StandardLogger(), fsBackup) + sink, err := backup.ResolveSink(ctx, cmd.backupPath) + if err != nil { + return fmt.Errorf("restore: resolve sink: %w", err) + } + + manager := backup.NewManager(sink) + pipeline := backup.NewPipeline(log.StandardLogger(), manager) decoder := json.NewDecoder(stdin) for { diff --git a/internal/backup/backup.go b/internal/backup/backup.go index ffcd78f94..adca22866 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "net/url" "path/filepath" "strings" @@ -12,6 +13,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" + "gocloud.dev/blob/azureblob" + "gocloud.dev/blob/gcsblob" + "gocloud.dev/blob/s3blob" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -32,6 +36,29 @@ type Sink interface { GetReader(ctx context.Context, relativePath string) (io.ReadCloser, error) } +// ResolveSink returns a sink implementation based on the provided path. +func ResolveSink(ctx context.Context, path string) (Sink, error) { + parsed, err := url.Parse(path) + if err != nil { + return nil, err + } + scheme := parsed.Scheme + if i := strings.LastIndex(scheme, "+"); i > 0 { + // the url may include additional configuration options like service name + // we don't include it into the scheme definition as it will push us to create + // a full set of variations. Instead we trim it up to the service option only. + scheme = scheme[i+1:] + } + + switch scheme { + case s3blob.Scheme, azureblob.Scheme, gcsblob.Scheme: + sink, err := NewStorageServiceSink(ctx, path) + return sink, err + default: + return NewFilesystemSink(path), nil + } +} + // Manager manages process of the creating/restoring backups. type Manager struct { sink Sink diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index d1c67419f..b0c16c35b 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -1,6 +1,7 @@ package backup import ( + "context" "errors" "io/ioutil" "os" @@ -201,3 +202,95 @@ func TestManager_Restore(t *testing.T) { }) } } + +func TestResolveSink(t *testing.T) { + isStorageServiceSink := func(expErrMsg string) func(t *testing.T, sink Sink) { + return func(t *testing.T, sink Sink) { + t.Helper() + sssink, ok := sink.(*StorageServiceSink) + require.True(t, ok) + _, err := sssink.bucket.List(nil).Next(context.TODO()) + ierr, ok := err.(interface{ Unwrap() error }) + require.True(t, ok) + terr := ierr.Unwrap() + require.Contains(t, terr.Error(), expErrMsg) + } + } + + tmpDir := testhelper.TempDir(t) + gsCreds := filepath.Join(tmpDir, "gs.creds") + require.NoError(t, ioutil.WriteFile(gsCreds, []byte(` +{ + "type": "service_account", + "project_id": "hostfactory-179005", + "private_key_id": "6253b144ccd94f50ce1224a73ffc48bda256d0a7", + "private_key": "-----BEGIN PRIVATE KEY-----\nXXXX<KEY CONTENT OMMIT HERR> \n-----END PRIVATE KEY-----\n", + "client_email": "303721356529-compute@developer.gserviceaccount.com", + "client_id": "116595416948414952474", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://accounts.google.com/o/oauth2/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/303724477529-compute%40developer.gserviceaccount.com" +}`), 0655)) + + for _, tc := range []struct { + desc string + envs map[string]string + path string + verify func(t *testing.T, sink Sink) + errMsg string + }{ + { + desc: "AWS S3", + envs: map[string]string{ + "AWS_ACCESS_KEY_ID": "test", + "AWS_SECRET_ACCESS_KEY": "test", + "AWS_REGION": "us-east-1", + }, + path: "s3://bucket", + verify: isStorageServiceSink("The AWS Access Key Id you provided does not exist in our records."), + }, + { + desc: "Google Cloud Storage", + envs: map[string]string{ + "GOOGLE_APPLICATION_CREDENTIALS": gsCreds, + }, + path: "blob+gs://bucket", + verify: isStorageServiceSink("storage.googleapis.com"), + }, + { + desc: "Azure Cloud File Storage", + envs: map[string]string{ + "AZURE_STORAGE_ACCOUNT": "test", + "AZURE_STORAGE_KEY": "test", + "AZURE_STORAGE_SAS_TOKEN": "test", + }, + path: "blob+bucket+azblob://bucket", + verify: isStorageServiceSink("https://test.blob.core.windows.net"), + }, + { + desc: "Filesystem", + path: "/some/path", + verify: func(t *testing.T, sink Sink) { + require.IsType(t, &FilesystemSink{}, sink) + }, + }, + { + desc: "undefined", + path: "some:invalid:path\x00", + errMsg: `parse "some:invalid:path\x00": net/url: invalid control character in URL`, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + for k, v := range tc.envs { + t.Cleanup(testhelper.ModifyEnvironment(t, k, v)) + } + sink, err := ResolveSink(context.TODO(), tc.path) + if tc.errMsg != "" { + require.EqualError(t, err, tc.errMsg) + return + } + tc.verify(t, sink) + }) + } +} |