1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
package internalgitaly
import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// serverWrapper embeds a gitalypb.InternalGitalyServer and carries a
// replaceable WalkRepos implementation; its WalkRepos method calls
// WalkReposFunc, so a test can intercept that RPC.
type serverWrapper struct {
gitalypb.InternalGitalyServer
// WalkReposFunc receives the request and server stream of each WalkRepos call.
WalkReposFunc func(*gitalypb.WalkReposRequest, gitalypb.InternalGitaly_WalkReposServer) error
}
// WalkRepos hands the request and stream straight to the configured
// WalkReposFunc and returns whatever error it reports.
func (sw *serverWrapper) WalkRepos(req *gitalypb.WalkReposRequest, stream gitalypb.InternalGitaly_WalkReposServer) error {
	walk := sw.WalkReposFunc
	return walk(req, stream)
}
// streamWrapper embeds a gitalypb.InternalGitaly_WalkReposServer stream and
// carries a replaceable Send implementation; its Send method calls SendFunc,
// so a test can observe each response as it is streamed.
type streamWrapper struct {
gitalypb.InternalGitaly_WalkReposServer
// SendFunc receives every response passed to Send.
SendFunc func(*gitalypb.WalkReposResponse) error
}
// Send passes the response to the configured SendFunc and returns its error.
func (sw *streamWrapper) Send(resp *gitalypb.WalkReposResponse) error {
	send := sw.SendFunc
	return send(resp)
}
// TestWalkRepos exercises the WalkRepos RPC against a storage that holds three
// repositories ('a', 'b' and 'c'). It verifies that an unknown storage name is
// rejected with codes.NotFound, and that when 'b' is removed while the walk is
// already streaming results, exactly 'a' and 'c' are reported together with
// their modification times.
func TestWalkRepos(t *testing.T) {
	cfg := testcfg.Build(t)
	storageName := cfg.Storages[0].Name
	storageRoot := cfg.Storages[0].Path

	// file walk happens lexicographically, so we delete repository in the middle
	// of the sequence to ensure the walk proceeds normally
	testRepo1, testRepo1Path := gittest.CloneRepo(t, cfg, cfg.Storages[0], gittest.CloneRepoOpts{
		RelativePath: "a",
	})
	deletedRepo, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0], gittest.CloneRepoOpts{
		RelativePath: "b",
	})
	testRepo2, testRepo2Path := gittest.CloneRepo(t, cfg, cfg.Storages[0], gittest.CloneRepoOpts{
		RelativePath: "c",
	})

	// Pin a known modification time on the repositories we expect to be
	// reported so it can be compared against the streamed responses.
	modifiedDate := time.Now().Add(-1 * time.Hour)
	require.NoError(
		t,
		os.Chtimes(testRepo1Path, time.Now(), modifiedDate),
	)
	require.NoError(
		t,
		os.Chtimes(testRepo2Path, time.Now(), modifiedDate),
	)

	// to test a directory being deleted during a walk, we must delete a directory after
	// the file walk has started. To achieve that, we wrap the server to pass down a wrapped
	// stream that allows us to hook in to stream responses. We then delete 'b' when
	// the first repo 'a' is being streamed to the client.
	var deleteOnce sync.Once
	srv := NewServer([]config.Storage{{Name: storageName, Path: storageRoot}})
	wsrv := &serverWrapper{
		InternalGitalyServer: srv,
		WalkReposFunc: func(r *gitalypb.WalkReposRequest, s gitalypb.InternalGitaly_WalkReposServer) error {
			return srv.WalkRepos(r, &streamWrapper{
				InternalGitaly_WalkReposServer: s,
				SendFunc: func(resp *gitalypb.WalkReposResponse) error {
					// This hook is invoked from the server side of the RPC rather
					// than from the test goroutine, so it must not call
					// require/FailNow. Record the failure with t.Errorf and
					// surface the error through the stream instead.
					var removeErr error
					deleteOnce.Do(func() {
						removeErr = os.RemoveAll(filepath.Join(storageRoot, deletedRepo.RelativePath))
					})
					if removeErr != nil {
						t.Errorf("removing repository %q during walk: %v", deletedRepo.RelativePath, removeErr)
						return removeErr
					}
					return s.Send(resp)
				},
			})
		},
	}

	client := setupInternalGitalyService(t, cfg, wsrv)
	ctx := testhelper.Context(t)

	// An unknown storage must be rejected; the error only surfaces on the
	// first Recv because the call is a server-streaming RPC.
	stream, err := client.WalkRepos(ctx, &gitalypb.WalkReposRequest{
		StorageName: "invalid storage name",
	})
	require.NoError(t, err)

	_, err = stream.Recv()
	require.Error(t, err)
	s, ok := status.FromError(err)
	require.True(t, ok)
	require.Equal(t, codes.NotFound, s.Code())

	stream, err = client.WalkRepos(ctx, &gitalypb.WalkReposRequest{
		StorageName: storageName,
	})
	require.NoError(t, err)

	actualRepos := consumeWalkReposStream(t, stream)
	// Only 'a' and 'c' may be reported. Checking the length also guards the
	// index accesses below against a panic on a short result.
	require.Len(t, actualRepos, 2)

	require.Equal(t, testRepo1.GetRelativePath(), actualRepos[0].GetRelativePath())
	require.Equal(t, modifiedDate.UTC(), actualRepos[0].GetModificationTime().AsTime())
	require.Equal(t, testRepo2.GetRelativePath(), actualRepos[1].GetRelativePath())
	require.Equal(t, modifiedDate.UTC(), actualRepos[1].GetModificationTime().AsTime())
}
// consumeWalkReposStream drains stream and returns every response in the
// order it was received. Reading stops at io.EOF; any other receive error
// fails the test via require.NoError.
func consumeWalkReposStream(t *testing.T, stream gitalypb.InternalGitaly_WalkReposClient) []*gitalypb.WalkReposResponse {
	var received []*gitalypb.WalkReposResponse

	for {
		msg, recvErr := stream.Recv()
		if recvErr == io.EOF {
			return received
		}
		require.NoError(t, recvErr)

		received = append(received, msg)
	}
}
|