diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-12-17 14:07:55 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-12-17 14:07:55 +0300 |
commit | 27703f4874296dfe9c346c80d175c36b2a5e71e1 (patch) | |
tree | e67f34c3930390c0274992fe9ab66555892cd71e | |
parent | 008843a1129b4657c38cb0bae770d34046f3ac38 (diff) | |
parent | ab7a446c24a3bba7eadd347750eb232415d01f0c (diff) |
Merge branch 'jc-repl-info-attributes' into 'master'
Sync info attributes in ReplicateRepository
Closes #1655
See merge request gitlab-org/gitaly!1693
-rw-r--r-- | changelogs/unreleased/jc-repl-info-attributes.yml | 5 | ||||
-rw-r--r-- | internal/helper/storage.go | 27 | ||||
-rw-r--r-- | internal/safe/file_writer.go | 2 | ||||
-rw-r--r-- | internal/service/repository/replicate.go | 79 | ||||
-rw-r--r-- | internal/service/repository/replicate_test.go | 14 |
5 files changed, 121 insertions, 6 deletions
diff --git a/changelogs/unreleased/jc-repl-info-attributes.yml b/changelogs/unreleased/jc-repl-info-attributes.yml new file mode 100644 index 000000000..bdd5046c1 --- /dev/null +++ b/changelogs/unreleased/jc-repl-info-attributes.yml @@ -0,0 +1,5 @@ +--- +title: Sync info attributes in ReplicateRepository +merge_request: 1693 +author: +type: other diff --git a/internal/helper/storage.go b/internal/helper/storage.go index f74a3302f..a4994f7d3 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/internal/storage" + "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -60,3 +62,28 @@ func InjectGitalyServers(ctx context.Context, name, address, token string) (cont return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))), nil } + +// ClientConnection creates a grpc.ClientConn from the injected gitaly-servers metadata +func ClientConnection(ctx context.Context, storageName string) (*grpc.ClientConn, error) { + gitalyServersInfo, err := ExtractGitalyServers(ctx) + if err != nil { + return nil, err + } + + repoStorageInfo, ok := gitalyServersInfo[storageName] + if !ok { + return nil, fmt.Errorf("gitaly server info for %q not found", storageName) + } + + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(repoStorageInfo["token"])), + } + + conn, err := grpc.Dial(repoStorageInfo["address"], connOpts...) + if err != nil { + return nil, fmt.Errorf("could not dial source: %v", err) + } + + return conn, nil +} diff --git a/internal/safe/file_writer.go b/internal/safe/file_writer.go index 018fea3c9..5f7e45926 100644 --- a/internal/safe/file_writer.go +++ b/internal/safe/file_writer.go @@ -91,7 +91,7 @@ func (fw *FileWriter) syncDir() error { return f.Sync() } -// Close will close and remove the temp file artifact iff it exists. If the file +// Close will close and remove the temp file artifact if it exists. If the file // was already committed, an ErrAlreadyClosed error will be returned and no // changes will be made to the filesystem. func (fw *FileWriter) Close() error { diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go index ede8a213f..9cc5eae01 100644 --- a/internal/service/repository/replicate.go +++ b/internal/service/repository/replicate.go @@ -4,11 +4,18 @@ import ( "context" "errors" "fmt" + "io" + "os" + "path/filepath" + + "gitlab.com/gitlab-org/gitaly/internal/safe" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/streamio" + "golang.org/x/sync/errgroup" ) func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) { @@ -22,9 +29,18 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate return nil, helper.ErrInternal(err) } - outCtx := helper.IncomingToOutgoing(ctx) + g, ctx := errgroup.WithContext(ctx) + outgoingCtx := helper.IncomingToOutgoing(ctx) + + for _, f := range []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{ + syncRepository, + syncInfoAttributes, + } { + f := f // rescoping f + g.Go(func() error { return f(outgoingCtx, in) }) + } - if err := syncRepository(outCtx, in); err != nil { + if err := g.Wait(); err != nil { return nil, helper.ErrInternal(err) } @@ -67,6 +83,55 @@ func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest return nil } +func syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName()) + if err != nil { + return err + } + + repoPath, err := helper.GetRepoPath(in.GetRepository()) + if err != nil { + return err + } + + infoPath := filepath.Join(repoPath, "info") + attributesPath := filepath.Join(infoPath, "attributes") + + if err := os.MkdirAll(infoPath, 0755); err != nil { + return err + } + + fw, err := safe.CreateFileWriter(attributesPath) + if err != nil { + return err + } + defer fw.Close() + + stream, err := repoClient.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{ + Repository: in.GetSource(), + }) + if err != nil { + return err + } + + if _, err := io.Copy(fw, streamio.NewReader(func() ([]byte, error) { + resp, err := stream.Recv() + return resp.GetAttributes(), err + })); err != nil { + return err + } + + if err = fw.Commit(); err != nil { + return err + } + + if err := os.Chmod(attributesPath, attributesFileMode); err != nil { + return err + } + + return os.Rename(attributesPath, attributesPath) +} + // newRemoteClient creates a new RemoteClient that talks to the same gitaly server func newRemoteClient() (gitalypb.RemoteServiceClient, error) { conn, err := client.Dial(fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), nil) @@ -76,3 +141,13 @@ func newRemoteClient() (gitalypb.RemoteServiceClient, error) { return gitalypb.NewRemoteServiceClient(conn), nil } + +// newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository +func newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) { + conn, err := helper.ClientConnection(ctx, storageName) + if err != nil { + return nil, err + } + + return gitalypb.NewRepositoryServiceClient(conn), nil +} diff --git a/internal/service/repository/replicate_test.go b/internal/service/repository/replicate_test.go index b48975a50..b82c96ac3 100644 --- a/internal/service/repository/replicate_test.go +++ b/internal/service/repository/replicate_test.go @@ -2,7 +2,9 @@ package repository_test import ( "fmt" + "io/ioutil" "os" + "path" "path/filepath" "testing" @@ -49,6 +51,11 @@ func TestReplicateRepository(t *testing.T) { repoClient, conn := repository.NewRepositoryClient(t, serverSocketPath) defer conn.Close() + // write info attributes + attrFilePath := path.Join(testRepoPath, "info", "attributes") + attrData := []byte("*.pbxproj binary\n") + require.NoError(t, ioutil.WriteFile(attrFilePath, attrData, 0644)) + targetRepo := *testRepo targetRepo.StorageName = "replica" @@ -68,9 +75,10 @@ func TestReplicateRepository(t *testing.T) { testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "fsck") - sourceRefs := testhelper.GetRepositoryRefs(t, testRepoPath) - targetRefs := testhelper.GetRepositoryRefs(t, targetRepoPath) - require.Equal(t, sourceRefs, targetRefs) + replicatedAttrFilePath := path.Join(targetRepoPath, "info", "attributes") + replicatedAttrData, err := ioutil.ReadFile(replicatedAttrFilePath) + require.NoError(t, err) + require.Equal(t, string(attrData), string(replicatedAttrData), "info/attributes files must match") // create another branch _, anotherNewBranch := testhelper.CreateCommitOnNewBranch(t, testRepoPath) |