Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-10-01 23:11:21 +0300
committerJohn Cai <jcai@gitlab.com>2019-10-03 00:07:05 +0300
commit0b150abbaf2da0443e157802f9c5e779ec908a31 (patch)
treecdb3afa53f751b83e6d29aa8bb8dfb4816e69dc9
parente7dce1c30d10ce45e970d81ef9356ce469cd21ae (diff)
Add CleanupOrphanedRepos RPCjc-cleanup-repos
-rw-r--r--cmd/praefect/main.go5
-rw-r--r--internal/praefect/coordinator.go11
-rw-r--r--internal/praefect/coordinator_test.go13
-rw-r--r--internal/praefect/datastore/datastore.go (renamed from internal/praefect/datastore.go)21
-rw-r--r--internal/praefect/datastore/datastore_test.go (renamed from internal/praefect/datastore_test.go)2
-rw-r--r--internal/praefect/replicator.go17
-rw-r--r--internal/praefect/replicator_test.go9
-rw-r--r--internal/praefect/server.go9
-rw-r--r--internal/praefect/server_test.go1
-rw-r--r--internal/praefect/service/info/cleanup.go60
-rw-r--r--internal/praefect/service/info/server.go13
-rw-r--r--internal/service/server/cleanup.go53
-rw-r--r--internal/service/storage/deleteall.go2
-rw-r--r--internal/tempdir/tempdir.go4
-rw-r--r--proto/go/gitalypb/ha-cleanup.pb.go200
-rw-r--r--proto/go/gitalypb/protolist.go1
-rw-r--r--proto/ha-cleanup.proto23
-rw-r--r--ruby/proto/gitaly.rb2
-rw-r--r--ruby/proto/gitaly/ha-cleanup_pb.rb18
-rw-r--r--ruby/proto/gitaly/ha-cleanup_services_pb.rb22
20 files changed, 447 insertions, 39 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 415b37349..fca80fbd4 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -19,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/labkit/tracing"
)
@@ -94,10 +95,10 @@ func run(listeners []net.Listener, conf config.Config) error {
var (
// top level server dependencies
- datastore = praefect.NewMemoryDatastore(conf)
+ datastore = datastore.NewMemoryDatastore(conf)
coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr("default", logger, datastore, coordinator)
- srv = praefect.NewServer(coordinator, repl, nil, logger, conf)
+ srv = praefect.NewServer(coordinator, datastore, repl, nil, logger, conf)
// signal related
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
termCh = make(chan os.Signal, len(signals))
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 1c9ba2e1a..4ccfe7e1d 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -12,6 +12,7 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -32,14 +33,14 @@ type Coordinator struct {
failoverMutex sync.RWMutex
connMutex sync.RWMutex
- datastore Datastore
+ datastore datastore.Datastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, datastore datastore.Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -155,7 +156,7 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git
primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
if err != nil {
- if err != ErrPrimaryNotSet {
+ if err != datastore.ErrPrimaryNotSet {
return nil, err
}
// if there are no primaries for this repository, pick one
@@ -216,8 +217,8 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func()
return func() {
for _, jobID := range jobIDs {
- if err := c.datastore.UpdateReplJob(jobID, JobStateReady); err != nil {
- c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", JobStateReady)
+ if err := c.datastore.UpdateReplJob(jobID, datastore.JobStateReady); err != nil {
+ c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", datastore.JobStateReady)
}
}
}, nil
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index d6e3c519a..418c5b329 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -27,7 +28,7 @@ func TestSecondaryRotation(t *testing.T) {
}
func TestStreamDirector(t *testing.T) {
- datastore := NewMemoryDatastore(config.Config{
+ datastore := datastore.NewMemoryDatastore(config.Config{
Nodes: []*models.Node{
&models.Node{
Address: "tcp://gitaly-primary.example.com",
@@ -65,7 +66,7 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, cc, conn, "stream director should choose the primary as the client connection to use")
- jobs, err := datastore.GetJobs(JobStatePending, 1, 10)
+ jobs, err := datastore.GetJobs(datastore.JobStatePending, 1, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
@@ -74,11 +75,11 @@ func TestStreamDirector(t *testing.T) {
sourceNode, err := datastore.GetStorageNode(0)
require.NoError(t, err)
- expectedJob := ReplJob{
+ expectedJob := datastore.ReplJob{
ID: 1,
TargetNode: targetNode,
SourceNode: sourceNode,
- State: JobStatePending,
+ State: datastore.JobStatePending,
Repository: models.Repository{RelativePath: targetRepo.RelativePath, Primary: sourceNode, Replicas: []models.Node{targetNode}},
}
@@ -86,11 +87,11 @@ func TestStreamDirector(t *testing.T) {
jobUpdateFunc()
- jobs, err = coordinator.datastore.GetJobs(JobStateReady, 1, 10)
+ jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, 1, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- expectedJob.State = JobStateReady
+ expectedJob.State = datastore.JobStateReady
require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady")
}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore/datastore.go
index b336bff0c..7ceb5b5fd 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -1,9 +1,9 @@
-// Package praefect provides data models and datastore persistence abstractions
+// Package datastore provides data models and datastore persistence abstractions
// for tracking the state of repository replicas.
//
// See original design discussion:
// https://gitlab.com/gitlab-org/gitaly/issues/1495
-package praefect
+package datastore
import (
"errors"
@@ -86,6 +86,8 @@ type ReplicasDatastore interface {
RemoveReplica(relativePath string, storageNodeID int) error
GetRepository(relativePath string) (*models.Repository, error)
+
+ GetRepositories() ([]*models.Repository, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -313,6 +315,21 @@ func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repositor
return &repository, nil
}
+// GetRepositories gets the repository for a repository relative path
+func (md *MemoryDatastore) GetRepositories() ([]*models.Repository, error) {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ var repositories []*models.Repository
+
+ for _, repository := range md.repositories.m {
+ repositories = append(repositories, &repository)
+
+ }
+
+ return repositories, nil
+}
+
// ErrReplicasMissing indicates the repository does not have any backup
// replicas
var ErrReplicasMissing = errors.New("repository missing secondary replicas")
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 3634aa969..c2a670d2e 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -1,4 +1,4 @@
-package praefect
+package datastore
import (
"testing"
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 61cfdceed..81f57b666 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -53,14 +54,14 @@ func init() {
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error
+ Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Entry
}
-func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
repository := &gitalypb.Repository{
StorageName: job.TargetNode.Storage,
RelativePath: job.Repository.RelativePath,
@@ -149,7 +150,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Entry
- datastore Datastore
+ datastore datastore.Datastore
coordinator *Coordinator
targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
@@ -163,7 +164,7 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Entry, datastore Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
datastore: datastore,
@@ -234,7 +235,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
}
for _, node := range nodes {
- jobs, err := r.datastore.GetJobs(JobStateReady, node.ID, 10)
+ jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.ID, 10)
if err != nil {
return err
}
@@ -270,8 +271,8 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
}
}
-func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error {
- if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) error {
+ if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateInProgress); err != nil {
return err
}
@@ -302,7 +303,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error {
replDuration := time.Since(replStart)
recordReplicationLatency(float64(replDuration / time.Millisecond))
- if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateComplete); err != nil {
return err
}
return nil
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b50db5194..664dfd412 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -15,6 +15,7 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
@@ -68,9 +69,9 @@ func TestProceessReplicationJob(t *testing.T) {
},
)
- job := jobRecord{state: JobStateReady}
+ job := jobRecord{state: datastore.JobStateReady}
- m := &MemoryDatastore{
+ m := &datastore.MemoryDatastore{
jobs: &struct {
sync.RWMutex
records map[uint64]jobRecord // all jobs indexed by ID
@@ -79,11 +80,11 @@ func TestProceessReplicationJob(t *testing.T) {
},
}
- replJob := ReplJob{ID: 1,
+ replJob := datastore.ReplJob{ID: 1,
TargetNode: models.Node{Storage: backupStorageName, Address: srvSocketPath},
SourceNode: models.Node{Storage: "default", Address: srvSocketPath, Token: testhelper.RepositoryAuthToken},
Repository: models.Repository{Primary: models.Node{Storage: "default", Address: srvSocketPath}, RelativePath: testRepo.GetRelativePath()},
- State: JobStateReady,
+ State: datastore.JobStateReady,
}
ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 60725f568..cbd958dc5 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
server "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -27,17 +28,18 @@ type Server struct {
repl ReplMgr
s *grpc.Server
conf config.Config
+ d datastore.ReplicasDatastore
}
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
-func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, conf config.Config) *Server {
+func NewServer(c *Coordinator, replicasDatastore datastore.ReplicasDatastore, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, conf config.Config) *Server {
grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
grpc_prometheus.StreamServerInterceptor,
- cancelhandler.Stream, // Should be below LogHandler
+ cancelhandler.Stream, // Should
grpctracing.StreamServerTracingInterceptor(),
// Panic handler should remain last so that application panics will be
// converted to errors and logged
@@ -60,6 +62,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
coordinator: c,
repl: repl,
conf: conf,
+ d: replicasDatastore,
}
}
@@ -82,7 +85,7 @@ func (srv *Server) Start(lis net.Listener) error {
// registerServices will register any services praefect needs to handle rpcs on its own
func (srv *Server) registerServices() {
// ServerServiceServer is necessary for the ServerInfo RPC
- gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf))
+ gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.d))
}
// Shutdown will attempt a graceful shutdown of the grpc server. If unable
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index ba7296740..b2b0e0cba 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -92,6 +92,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
)
prf := NewServer(
coordinator,
+ datastore,
replmgr,
nil,
logEntry,
diff --git a/internal/praefect/service/info/cleanup.go b/internal/praefect/service/info/cleanup.go
new file mode 100644
index 000000000..ab281d640
--- /dev/null
+++ b/internal/praefect/service/info/cleanup.go
@@ -0,0 +1,60 @@
+package server
+
+import (
+ "context"
+
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
+)
+
+func (s *Server) CleanupOrphanedRepos(ctx context.Context, in *gitalypb.CleanupOrphanedReposRequest) (*gitalypb.CleanupOrphanedReposResponse, error) {
+ validRepos := make(map[string]struct{})
+
+ for _, repo := range in.GetValidRepositories() {
+ validRepos[repo.GetRelativePath()] = struct{}{}
+ }
+
+ allRepositories, err := s.datastore.GetRepositories()
+ if err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ // map keyed on storage with values of slices representing the repositories to remove for each primary storage
+ repositoriesToRemove := make(map[string][]*models.Repository)
+
+ for _, repositoryOnDisk := range allRepositories {
+ if _, ok := validRepos[repositoryOnDisk.RelativePath]; !ok {
+ repositoriesToRemove[repositoryOnDisk.Primary.Storage] = append(repositoriesToRemove[repositoryOnDisk.Primary.Storage], repositoryOnDisk)
+ }
+ }
+
+ g, ctx := errgroup.WithContext(ctx)
+
+ for storage, repos := range repositoriesToRemove {
+ cc, ok := s.nodes[storage]
+ if !ok {
+ return nil, helper.ErrInternalf("could not find client connection for %s", storage)
+ }
+
+ repositoryClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ g.Go(func() error {
+ for _, repo := range repos {
+ if _, err = repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ Repository: &gitalypb.Repository{StorageName: repo.Primary.Storage, RelativePath: repo.RelativePath},
+ }); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ }
+
+ if err = g.Wait(); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ return &gitalypb.CleanupOrphanedReposResponse{}, nil
+}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 0272d0ed5..9a520b6ed 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -2,20 +2,23 @@ package server
import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
)
// Server is a ServerService server
type Server struct {
- nodes map[string]*grpc.ClientConn
- conf config.Config
+ nodes map[string]*grpc.ClientConn
+ conf config.Config
+ datastore datastore.ReplicasDatastore
}
// NewServer creates a new instance of a grpc ServerServiceServer
-func NewServer(conf config.Config) gitalypb.ServerServiceServer {
+func NewServer(conf config.Config, d datastore.ReplicasDatastore) gitalypb.ServerServiceServer {
return &Server{
- nodes: make(map[string]*grpc.ClientConn),
- conf: conf,
+ nodes: make(map[string]*grpc.ClientConn),
+ conf: conf,
+ datastore: d,
}
}
diff --git a/internal/service/server/cleanup.go b/internal/service/server/cleanup.go
new file mode 100644
index 000000000..fa356761d
--- /dev/null
+++ b/internal/service/server/cleanup.go
@@ -0,0 +1,53 @@
+package server
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/tempdir"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+func (s *server) CleanupOrphanedRepos(ctx context.Context, in *gitalypb.CleanupOrphanedReposRequest) (*gitalypb.CleanupOrphanedReposResponse, error) {
+ validRepos := make(map[string]struct{})
+
+ for _, repo := range in.GetValidRepositories() {
+ fullRepoPath, err := helper.GetRepoPath(repo)
+ if err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+ validRepos[fullRepoPath] = struct{}{}
+ }
+
+ for _, storage := range config.Config.Storages {
+ deleteDir, err := tempdir.ForDeletedRepositories(storage.Name)
+ if err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ filepath.Walk(storage.Path, func(path string, info os.FileInfo, err error) error {
+ if !info.IsDir() || !strings.HasSuffix(path, ".git") {
+ return nil
+ }
+
+ if _, ok := validRepos[path]; !ok {
+ moveDir := filepath.Join(deleteDir, path)
+ if err = os.MkdirAll(filepath.Dir(moveDir), 0755); err != nil {
+ return err
+ }
+
+ if err = os.Rename(path, moveDir); err != nil {
+ return err
+ }
+ }
+
+ return nil
+ })
+ }
+
+ return &gitalypb.CleanupOrphanedReposResponse{}, nil
+}
diff --git a/internal/service/storage/deleteall.go b/internal/service/storage/deleteall.go
index f560b0d16..a3e9f3d10 100644
--- a/internal/service/storage/deleteall.go
+++ b/internal/service/storage/deleteall.go
@@ -21,7 +21,7 @@ func (s *server) DeleteAllRepositories(ctx context.Context, req *gitalypb.Delete
return nil, status.Errorf(codes.InvalidArgument, "storage lookup failed: %v", err)
}
- trashDir, err := tempdir.ForDeleteAllRepositories(req.StorageName)
+ trashDir, err := tempdir.ForDeletedRepositories(req.StorageName)
if err != nil {
return nil, status.Errorf(codes.Internal, "create trash dir: %v", err)
}
diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go
index 55e3be5e5..7a6abfd30 100644
--- a/internal/tempdir/tempdir.go
+++ b/internal/tempdir/tempdir.go
@@ -49,8 +49,8 @@ func StateDir(storage config.Storage) string { return filepath.Join(storage.Path
// TempDir returns the path to the temp dir for a storage location
func TempDir(storage config.Storage) string { return filepath.Join(storage.Path, tmpRootPrefix) }
-// ForDeleteAllRepositories returns a temporary directory for the given storage. It is not context-scoped but it will get removed eventuall (after MaxAge).
-func ForDeleteAllRepositories(storageName string) (string, error) {
+// ForDeletedRepositories returns a temporary directory for the given storage. It is not context-scoped but it will get removed eventually (after MaxAge).
+func ForDeletedRepositories(storageName string) (string, error) {
prefix := fmt.Sprintf("%s-repositories.old.%d.", storageName, time.Now().Unix())
_, path, err := newAsRepository(context.Background(), storageName, prefix)
diff --git a/proto/go/gitalypb/ha-cleanup.pb.go b/proto/go/gitalypb/ha-cleanup.pb.go
new file mode 100644
index 000000000..531288cb9
--- /dev/null
+++ b/proto/go/gitalypb/ha-cleanup.pb.go
@@ -0,0 +1,200 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: ha-cleanup.proto
+
+package gitalypb
+
+import (
+ context "context"
+ fmt "fmt"
+ proto "github.com/golang/protobuf/proto"
+ grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+ math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type CleanupOrphanedReposRequest struct {
+ ValidRepositories []*Repository `protobuf:"bytes,1,rep,name=valid_repositories,json=validRepositories,proto3" json:"valid_repositories,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *CleanupOrphanedReposRequest) Reset() { *m = CleanupOrphanedReposRequest{} }
+func (m *CleanupOrphanedReposRequest) String() string { return proto.CompactTextString(m) }
+func (*CleanupOrphanedReposRequest) ProtoMessage() {}
+func (*CleanupOrphanedReposRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_bebeab995a676ed6, []int{0}
+}
+
+func (m *CleanupOrphanedReposRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CleanupOrphanedReposRequest.Unmarshal(m, b)
+}
+func (m *CleanupOrphanedReposRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CleanupOrphanedReposRequest.Marshal(b, m, deterministic)
+}
+func (m *CleanupOrphanedReposRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CleanupOrphanedReposRequest.Merge(m, src)
+}
+func (m *CleanupOrphanedReposRequest) XXX_Size() int {
+ return xxx_messageInfo_CleanupOrphanedReposRequest.Size(m)
+}
+func (m *CleanupOrphanedReposRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_CleanupOrphanedReposRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CleanupOrphanedReposRequest proto.InternalMessageInfo
+
+func (m *CleanupOrphanedReposRequest) GetValidRepositories() []*Repository {
+ if m != nil {
+ return m.ValidRepositories
+ }
+ return nil
+}
+
+type CleanupOrphanedReposResponse struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *CleanupOrphanedReposResponse) Reset() { *m = CleanupOrphanedReposResponse{} }
+func (m *CleanupOrphanedReposResponse) String() string { return proto.CompactTextString(m) }
+func (*CleanupOrphanedReposResponse) ProtoMessage() {}
+func (*CleanupOrphanedReposResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_bebeab995a676ed6, []int{1}
+}
+
+func (m *CleanupOrphanedReposResponse) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_CleanupOrphanedReposResponse.Unmarshal(m, b)
+}
+func (m *CleanupOrphanedReposResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_CleanupOrphanedReposResponse.Marshal(b, m, deterministic)
+}
+func (m *CleanupOrphanedReposResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_CleanupOrphanedReposResponse.Merge(m, src)
+}
+func (m *CleanupOrphanedReposResponse) XXX_Size() int {
+ return xxx_messageInfo_CleanupOrphanedReposResponse.Size(m)
+}
+func (m *CleanupOrphanedReposResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_CleanupOrphanedReposResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_CleanupOrphanedReposResponse proto.InternalMessageInfo
+
+func init() {
+ proto.RegisterType((*CleanupOrphanedReposRequest)(nil), "gitaly.CleanupOrphanedReposRequest")
+ proto.RegisterType((*CleanupOrphanedReposResponse)(nil), "gitaly.CleanupOrphanedReposResponse")
+}
+
+func init() { proto.RegisterFile("ha-cleanup.proto", fileDescriptor_bebeab995a676ed6) }
+
+var fileDescriptor_bebeab995a676ed6 = []byte{
+ // 223 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xd4, 0x4d,
+ 0xce, 0x49, 0x4d, 0xcc, 0x2b, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x4b, 0xcf,
+ 0x2c, 0x49, 0xcc, 0xa9, 0x94, 0xe2, 0x29, 0xce, 0x48, 0x2c, 0x4a, 0x4d, 0x81, 0x88, 0x2a, 0x25,
+ 0x70, 0x49, 0x3b, 0x43, 0x94, 0xf9, 0x17, 0x15, 0x64, 0x24, 0xe6, 0xa5, 0xa6, 0x04, 0xa5, 0x16,
+ 0xe4, 0x17, 0x07, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0x39, 0x72, 0x09, 0x95, 0x25, 0xe6,
+ 0x64, 0xa6, 0xc4, 0x17, 0x81, 0x44, 0x33, 0x4b, 0xf2, 0x8b, 0x32, 0x53, 0x8b, 0x25, 0x18, 0x15,
+ 0x98, 0x35, 0xb8, 0x8d, 0x84, 0xf4, 0x20, 0x26, 0xea, 0x05, 0xc1, 0xe4, 0x2a, 0x83, 0x04, 0xc1,
+ 0xaa, 0x83, 0x90, 0x14, 0x2b, 0xc9, 0x71, 0xc9, 0x60, 0xb7, 0xa1, 0xb8, 0x20, 0x3f, 0xaf, 0x38,
+ 0xd5, 0xa8, 0x8e, 0x8b, 0xdf, 0xc3, 0x11, 0xaa, 0x22, 0x38, 0xb5, 0xa8, 0x2c, 0x33, 0x55, 0x28,
+ 0x9b, 0x4b, 0x04, 0x9b, 0x16, 0x21, 0x65, 0x98, 0x8d, 0x78, 0x9c, 0x2c, 0xa5, 0x82, 0x5f, 0x11,
+ 0xc4, 0x56, 0x25, 0x8e, 0x5f, 0xd3, 0x35, 0x58, 0x38, 0x18, 0x05, 0x18, 0x9d, 0x0c, 0xa2, 0x40,
+ 0x1a, 0x72, 0x12, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0x21, 0x4c, 0xdd, 0xfc, 0xa2, 0x74, 0x7d,
+ 0x88, 0x31, 0xfa, 0xe0, 0x70, 0xd2, 0x4f, 0xcf, 0x87, 0xf2, 0x0b, 0x92, 0x92, 0xd8, 0xc0, 0x42,
+ 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x1d, 0x57, 0x4d, 0x52, 0x64, 0x01, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// HACleanupServieClient is the client API for HACleanupServie service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type HACleanupServieClient interface {
+ CleanupOrphanedRepos(ctx context.Context, in *CleanupOrphanedReposRequest, opts ...grpc.CallOption) (*CleanupOrphanedReposResponse, error)
+}
+
+type hACleanupServieClient struct {
+ cc *grpc.ClientConn
+}
+
+func NewHACleanupServieClient(cc *grpc.ClientConn) HACleanupServieClient {
+ return &hACleanupServieClient{cc}
+}
+
+func (c *hACleanupServieClient) CleanupOrphanedRepos(ctx context.Context, in *CleanupOrphanedReposRequest, opts ...grpc.CallOption) (*CleanupOrphanedReposResponse, error) {
+ out := new(CleanupOrphanedReposResponse)
+ err := c.cc.Invoke(ctx, "/gitaly.HACleanupServie/CleanupOrphanedRepos", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+// HACleanupServieServer is the server API for HACleanupServie service.
+type HACleanupServieServer interface {
+ CleanupOrphanedRepos(context.Context, *CleanupOrphanedReposRequest) (*CleanupOrphanedReposResponse, error)
+}
+
+// UnimplementedHACleanupServieServer can be embedded to have forward compatible implementations.
+type UnimplementedHACleanupServieServer struct {
+}
+
+func (*UnimplementedHACleanupServieServer) CleanupOrphanedRepos(ctx context.Context, req *CleanupOrphanedReposRequest) (*CleanupOrphanedReposResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method CleanupOrphanedRepos not implemented")
+}
+
+func RegisterHACleanupServieServer(s *grpc.Server, srv HACleanupServieServer) {
+ s.RegisterService(&_HACleanupServie_serviceDesc, srv)
+}
+
+func _HACleanupServie_CleanupOrphanedRepos_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(CleanupOrphanedReposRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(HACleanupServieServer).CleanupOrphanedRepos(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/gitaly.HACleanupServie/CleanupOrphanedRepos",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(HACleanupServieServer).CleanupOrphanedRepos(ctx, req.(*CleanupOrphanedReposRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+var _HACleanupServie_serviceDesc = grpc.ServiceDesc{
+ ServiceName: "gitaly.HACleanupServie",
+ HandlerType: (*HACleanupServieServer)(nil),
+ Methods: []grpc.MethodDesc{
+ {
+ MethodName: "CleanupOrphanedRepos",
+ Handler: _HACleanupServie_CleanupOrphanedRepos_Handler,
+ },
+ },
+ Streams: []grpc.StreamDesc{},
+ Metadata: "ha-cleanup.proto",
+}
diff --git a/proto/go/gitalypb/protolist.go b/proto/go/gitalypb/protolist.go
index c85049d62..ca32af747 100644
--- a/proto/go/gitalypb/protolist.go
+++ b/proto/go/gitalypb/protolist.go
@@ -9,6 +9,7 @@ var GitalyProtos = []string{
"commit.proto",
"conflicts.proto",
"diff.proto",
+ "ha-cleanup.proto",
"namespace.proto",
"objectpool.proto",
"operations.proto",
diff --git a/proto/ha-cleanup.proto b/proto/ha-cleanup.proto
new file mode 100644
index 000000000..4ec0c1a17
--- /dev/null
+++ b/proto/ha-cleanup.proto
@@ -0,0 +1,23 @@
+syntax = "proto3";
+
+package gitaly;
+
+option go_package = "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb";
+
+import "shared.proto";
+
+service HACleanupServie {
+ rpc CleanupOrphanedRepos(CleanupOrphanedReposRequest) returns (CleanupOrphanedReposResponse) {
+ option (op_type) = {
+ op: MUTATOR
+ scope_level: SERVER
+ };
+ }
+}
+
+message CleanupOrphanedReposRequest {
+ repeated Repository valid_repositories = 1;
+}
+
+message CleanupOrphanedReposResponse{}
+
diff --git a/ruby/proto/gitaly.rb b/ruby/proto/gitaly.rb
index c35bc6e06..60b8623a4 100644
--- a/ruby/proto/gitaly.rb
+++ b/ruby/proto/gitaly.rb
@@ -13,6 +13,8 @@ require 'gitaly/conflicts_services_pb'
require 'gitaly/diff_services_pb'
+require 'gitaly/ha-cleanup_services_pb'
+
require 'gitaly/namespace_services_pb'
require 'gitaly/objectpool_services_pb'
diff --git a/ruby/proto/gitaly/ha-cleanup_pb.rb b/ruby/proto/gitaly/ha-cleanup_pb.rb
new file mode 100644
index 000000000..444260184
--- /dev/null
+++ b/ruby/proto/gitaly/ha-cleanup_pb.rb
@@ -0,0 +1,18 @@
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: ha-cleanup.proto
+
+require 'google/protobuf'
+
+require 'shared_pb'
+Google::Protobuf::DescriptorPool.generated_pool.build do
+ add_message "gitaly.CleanupOrphanedReposRequest" do
+ repeated :valid_repositories, :message, 1, "gitaly.Repository"
+ end
+ add_message "gitaly.CleanupOrphanedReposResponse" do
+ end
+end
+
+module Gitaly
+ CleanupOrphanedReposRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.CleanupOrphanedReposRequest").msgclass
+ CleanupOrphanedReposResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.CleanupOrphanedReposResponse").msgclass
+end
diff --git a/ruby/proto/gitaly/ha-cleanup_services_pb.rb b/ruby/proto/gitaly/ha-cleanup_services_pb.rb
new file mode 100644
index 000000000..66595f074
--- /dev/null
+++ b/ruby/proto/gitaly/ha-cleanup_services_pb.rb
@@ -0,0 +1,22 @@
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# Source: ha-cleanup.proto for package 'gitaly'
+
+require 'grpc'
+require 'ha-cleanup_pb'
+
+module Gitaly
+ module HACleanupServie
+ class Service
+
+ include GRPC::GenericService
+
+ self.marshal_class_method = :encode
+ self.unmarshal_class_method = :decode
+ self.service_name = 'gitaly.HACleanupServie'
+
+ rpc :CleanupOrphanedRepos, CleanupOrphanedReposRequest, CleanupOrphanedReposResponse
+ end
+
+ Stub = Service.rpc_stub_class
+ end
+end