Welcome to mirror list, hosted at ThFree Co, Russian Federation.

coordinator_test.go « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: f9cdcf2bccee40a5fa8bcbf1e4bb4eb1c0f3519a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package praefect

import (
	"io/ioutil"
	"testing"

	"github.com/golang/protobuf/proto"
	"github.com/sirupsen/logrus"
	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/internal/log"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
	"google.golang.org/grpc"
)

// testLogger is a package-level logrus logger whose output is routed to
// ioutil.Discard, so anything written through it is dropped.
var testLogger = func() *logrus.Logger {
	quiet := logrus.New()
	quiet.SetOutput(ioutil.Discard)
	return quiet
}()

// TestSecondaryRotation is currently a placeholder: it records the reason and
// marks itself as skipped without exercising any code.
func TestSecondaryRotation(t *testing.T) {
	const reason = "secondary rotation will change with the new data model"

	// Log followed by SkipNow is what t.Skip does in a single call.
	t.Log(reason)
	t.SkipNow()
}

// TestStreamDirector checks the coordinator's streamDirector for a
// GarbageCollect request against a two-node in-memory datastore. It asserts
// that:
//   - the connection registered for node 0 is the one returned for the call,
//   - exactly one pending replication job is created, with node 0 as source
//     and node 1 as target,
//   - invoking the returned job update function moves that job to
//     JobStateReady.
func TestStreamDirector(t *testing.T) {
	datastore := NewMemoryDatastore(config.Config{
		Nodes: []*models.Node{
			{
				Address: "tcp://gitaly-primary.example.com",
				Storage: "praefect-internal-1",
			},
			{
				Address: "tcp://gitaly-backup1.example.com",
				Storage: "praefect-internal-2",
			},
		},
	})

	targetRepo := gitalypb.Repository{
		StorageName:  "praefect",
		RelativePath: "/path/to/hashed/storage",
	}

	ctx, cancel := testhelper.Context()
	defer cancel()

	coordinator := NewCoordinator(log.Default(), datastore)
	require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))

	// The serialized request is what the stream director gets to peek at.
	frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{
		Repository: &targetRepo,
	})
	require.NoError(t, err)

	// No grpc.WithBlock option is passed, so Dial does not wait for the
	// (non-existent) server; the connection is only used for identity checks.
	cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure())
	require.NoError(t, err)
	// Release the client connection when the test ends. The Close error is
	// deliberately ignored: nothing was ever sent over this connection.
	defer cc.Close()

	coordinator.setConn(0, cc)

	_, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame})
	require.NoError(t, err)
	require.Equal(t, cc, conn, "stream director should choose the primary as the client connection to use")

	jobs, err := datastore.GetJobs(JobStatePending, 1, 10)
	require.NoError(t, err)
	require.Len(t, jobs, 1)

	targetNode, err := datastore.GetStorageNode(1)
	require.NoError(t, err)
	sourceNode, err := datastore.GetStorageNode(0)
	require.NoError(t, err)

	expectedJob := ReplJob{
		ID:         1,
		TargetNode: targetNode,
		SourceNode: sourceNode,
		State:      JobStatePending,
		Repository: models.Repository{
			RelativePath: targetRepo.RelativePath,
			Primary:      sourceNode,
			Replicas:     []models.Node{targetNode},
		},
	}

	require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")

	jobUpdateFunc()

	jobs, err = coordinator.datastore.GetJobs(JobStateReady, 1, 10)
	require.NoError(t, err)
	require.Len(t, jobs, 1)

	// Only the state is expected to differ after the update function ran.
	expectedJob.State = JobStateReady
	require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updated to JobStateReady")
}

// mockPeeker is a test double that serves a fixed, pre-built message frame.
// NOTE(review): presumably stands in for the peeker argument expected by
// streamDirector; confirm against that function's signature.
type mockPeeker struct {
	frame []byte
}

// Peek hands back the stored frame unchanged and never fails.
func (p *mockPeeker) Peek() ([]byte, error) {
	return p.frame, nil
}

// Modify is a no-op: the supplied payload is ignored, the stored frame is
// left as it was, and nil is always returned.
func (p *mockPeeker) Modify(_ []byte) error {
	return nil
}