1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
package praefect
import (
"io/ioutil"
"testing"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
)
var testLogger = logrus.New()
func init() {
testLogger.SetOutput(ioutil.Discard)
}
func TestSecondaryRotation(t *testing.T) {
t.Skip("secondary rotation will change with the new data model")
}
func TestStreamDirector(t *testing.T) {
datastore := NewMemoryDatastore(config.Config{
Nodes: []*models.Node{
&models.Node{
Address: "tcp://gitaly-primary.example.com",
Storage: "praefect-internal-1",
}, &models.Node{
Address: "tcp://gitaly-backup1.example.com",
Storage: "praefect-internal-2",
}},
})
targetRepo := gitalypb.Repository{
StorageName: "praefect",
RelativePath: "/path/to/hashed/storage",
}
ctx, cancel := testhelper.Context()
defer cancel()
coordinator := NewCoordinator(log.Default(), datastore)
require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{
Repository: &targetRepo,
})
require.NoError(t, err)
cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure())
require.NoError(t, err)
coordinator.setConn(0, cc)
_, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame})
require.NoError(t, err)
require.Equal(t, cc, conn, "stream director should choose the primary as the client connection to use")
jobs, err := datastore.GetJobs(JobStatePending, 1, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
targetNode, err := datastore.GetStorageNode(1)
require.NoError(t, err)
sourceNode, err := datastore.GetStorageNode(0)
require.NoError(t, err)
expectedJob := ReplJob{
ID: 1,
TargetNode: targetNode,
SourceNode: sourceNode,
State: JobStatePending,
Repository: models.Repository{RelativePath: targetRepo.RelativePath, Primary: sourceNode, Replicas: []models.Node{targetNode}},
}
require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")
jobUpdateFunc()
jobs, err = coordinator.datastore.GetJobs(JobStateReady, 1, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
expectedJob.State = JobStateReady
require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady")
}
type mockPeeker struct {
frame []byte
}
func (m *mockPeeker) Peek() ([]byte, error) {
return m.frame, nil
}
func (m *mockPeeker) Modify(payload []byte) error {
return nil
}
|