diff options
author | John Cai <jcai@gitlab.com> | 2019-07-23 04:44:10 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-07-23 04:46:42 +0300 |
commit | 751bb0923b4dbeafd70ef85a47afd5e957268667 (patch) | |
tree | f5b911156a5f376aa42b6c68d0b09a4f8c7d04c3 | |
parent | cad0664cffe32480bd4988cb086f49073e22ce50 (diff) |
Refactor datastore pacakge structure, adding cache
17 files changed, 232 insertions, 28 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index c974082c1..7b8566d73 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -19,7 +19,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/database" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/database" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/tracing" @@ -98,17 +99,21 @@ func run(listeners []net.Listener, conf config.Config) error { os.Getenv("PRAEFECT_PG_USER"), os.Getenv("PRAEFECT_PG_PASSWORD"), os.Getenv("PRAEFECT_PG_ADDRESS"), - os.Getenv("PRAEFECT_PG_DATABASE")) + os.Getenv("PRAEFECT_PG_DATABASE"), + os.Getenv("PRAEFECT_PG_NOTIFY_CHANNEL"), + ) if err != nil { return fmt.Errorf("failed to create sql datastore: %v", err) } + cachedDatastore := datastore.NewCachedDatastore(sqlDatastore, datastore.WithBustOnUpdate(sqlDatastore.Listener())) + var ( // top level server dependencies - datastore = praefect.NewMemoryDatastore() - coordinator = praefect.NewCoordinator(logger, sqlDatastore, protoregistry.GitalyProtoFileDescriptors...) - repl = praefect.NewReplMgr("default", logger, sqlDatastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) + datastore = datastore.NewMemoryDatastore() + coordinator = praefect.NewCoordinator(logger, cachedDatastore, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr("default", logger, cachedDatastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) srv = praefect.NewServer(coordinator, repl, nil, logger) // signal related signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} @@ -2,7 +2,9 @@ cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -178,6 +180,7 @@ google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= gopkg.in/DataDog/dd-trace-go.v1 v1.7.0 h1:7wbMayb6JXcbAS95RN7MI42W3o1BCxCcdIzZfVWBAiE= gopkg.in/DataDog/dd-trace-go.v1 v1.7.0/go.mod h1:DVp8HmDh8PuTu2Z0fVVlBsyWaC++fzwVCaGWylTe3tg= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 6a2a5b5d5..0f3feb32d 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -7,7 +7,7 @@ import ( "github.com/BurntSushi/toml" "gitlab.com/gitlab-org/gitaly/internal/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" ) // Config is a container for everything found in the TOML config file diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index eace5eb2f..2e6d96ccc 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" ) func TestConfigValidation(t *testing.T) { diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index c004f9ac4..37efefa8a 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -16,7 +16,8 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -34,14 +35,14 @@ type Coordinator struct { failoverMutex sync.RWMutex connMutex sync.RWMutex - datastore ReplicasDatastore + datastore datastore.ReplicasDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore datastore.ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) diff --git a/internal/praefect/datastore/cache.go b/internal/praefect/datastore/cache.go new file mode 100644 index 000000000..e60a84d1a --- /dev/null +++ b/internal/praefect/datastore/cache.go @@ -0,0 +1,38 @@ +package datastore + +import "sync" + +type Cache struct { + sync.RWMutex + d map[string]interface{} +} + +func NewCache() *Cache { + return &Cache{ + d: make(map[string]interface{}), + } +} + +func (c *Cache) Bust() { + c.Lock() + defer c.Unlock() + + c.d = make(map[string]interface{}) +} + +func (c *Cache) GetFromCache(key string, f func() (interface{}, error)) (interface{}, error) { + c.RLock() + defer c.RUnlock() + + if res, ok := c.d[key]; ok { + return res, nil + } + + res, err := f() + if err != nil { + return nil, err + } + c.d[key] = res + + return res, nil +} diff --git a/internal/praefect/datastore/cached_datastore.go b/internal/praefect/datastore/cached_datastore.go new file mode 100644 index 000000000..db9e87082 --- /dev/null +++ b/internal/praefect/datastore/cached_datastore.go @@ -0,0 +1,124 @@ +package datastore + +import ( + "fmt" + "time" + + "github.com/lib/pq" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" +) + +type CacheOption func(*Cache) + +type CachedDatastore struct { + datastore ReplicasDatastore + cache *Cache +} + +func NewCachedDatastore(datastore ReplicasDatastore, options ...CacheOption) *CachedDatastore { + c := &CachedDatastore{ + datastore: datastore, + cache: NewCache(), + } + + for _, option := range options { + option(c.cache) + } + + return c +} + +func WithBustOnUpdate(l *pq.Listener) CacheOption { + return func(c *Cache) { + go waitForNotification(l, c) + } +} + +func waitForNotification(l *pq.Listener, c *Cache) { + for { + select { + case <-l.Notify: + fmt.Println("received an update event. busting the cache") + c.Bust() + fmt.Println("cache busted") + return + case <-time.After(90 * time.Second): + fmt.Println("Received no events for 90 seconds, checking connection") + go func() { + l.Ping() + }() + return + } + } +} + +func key(args ...interface{}) string { + return fmt.Sprint(args...) +} + +func (cd *CachedDatastore) GetSecondaries(relativePath string) ([]models.StorageNode, error) { + res, err := cd.cache.GetFromCache(key("GetSecondaries", relativePath), func() (interface{}, error) { + return cd.datastore.GetNodeStorages() + }) + if err != nil { + return nil, err + } + + return res.([]models.StorageNode), nil +} + +func (cd *CachedDatastore) GetNodesForStorage(storageName string) ([]models.StorageNode, error) { + res, err := cd.cache.GetFromCache(key("GetNodesForStorage", storageName), func() (interface{}, error) { + return cd.datastore.GetNodesForStorage(storageName) + }) + if err != nil { + return nil, err + } + + return res.([]models.StorageNode), nil +} + +func (cd *CachedDatastore) GetNodeStorages() ([]models.StorageNode, error) { + res, err := cd.cache.GetFromCache(key("GetNodesStorages"), func() (interface{}, error) { + return cd.datastore.GetNodeStorages() + }) + if err != nil { + return nil, err + } + + return res.([]models.StorageNode), nil +} + +func (cd *CachedDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + res, err := cd.cache.GetFromCache(key("GetPrimary", relativePath), func() (interface{}, error) { + return cd.datastore.GetPrimary(relativePath) + }) + if err != nil { + return nil, err + } + + return res.(*models.StorageNode), nil +} + +func (cd *CachedDatastore) SetPrimary(relativePath string, storageNodeID int) error { + return cd.datastore.SetPrimary(relativePath, storageNodeID) +} + +func (cd *CachedDatastore) AddSecondary(relativePath string, storageNodeID int) error { + return cd.datastore.AddSecondary(relativePath, storageNodeID) +} + +func (cd *CachedDatastore) RemoveSecondary(relativePath string, storageNodeID int) error { + return cd.datastore.RemoveSecondary(relativePath, storageNodeID) +} + +func (cd *CachedDatastore) GetShard(relativePath string) (*models.Shard, error) { + res, err := cd.cache.GetFromCache(key("GetShard", relativePath), func() (interface{}, error) { + return cd.datastore.GetShard(relativePath) + }) + if err != nil { + return nil, err + } + + return res.(*models.Shard), nil +} diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/datastore/database/sql_datastore.go index ddd63b987..338168191 100644 --- a/internal/praefect/database/sql_datastore.go +++ b/internal/praefect/datastore/database/sql_datastore.go @@ -3,29 +3,61 @@ package database import ( "errors" "fmt" + "log" + "time" "database/sql" // the lib/pg package provides postgres bindings for the sql package + + "github.com/lib/pq" + // we need this for the postgres connection _ "github.com/lib/pq" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" ) // SQLDatastore is a sql based datastore that conforms to the ReplicasDatastore interface type SQLDatastore struct { - db *sql.DB + db *sql.DB + listener *pq.Listener } // NewSQLDatastore instantiates a new sql datastore with environment variables -func NewSQLDatastore(user, password, address, database string) (*SQLDatastore, error) { +func NewSQLDatastore(user, password, address, database, notificationChannel string) (*SQLDatastore, error) { connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", user, password, address, database) db, err := sql.Open("postgres", connStr) if err != nil { return nil, err } - return &SQLDatastore{db: db}, nil + updater := func(ev pq.ListenerEventType, err error) { + if err != nil { + log.Fatal(err.Error()) + } + + switch ev { + case pq.ListenerEventConnected: + log.Print("postgres listener connected") + case pq.ListenerEventDisconnected: + log.Print("postgres listener disconnected") + case pq.ListenerEventReconnected: + log.Print("postgres listener reconnected") + } + } + + listener := pq.NewListener(connStr, 10*time.Second, 20*time.Second, updater) + + if err := listener.Listen(notificationChannel); err != nil { + return nil, err + } + + return &SQLDatastore{db: db, listener: listener}, nil +} + + +func (sd *SQLDatastore) Listener() *pq.Listener { + return sd.listener } // GetSecondaries gets the secondaries for a shard based on the relative path diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore/datastore.go index 9bb760be7..3851f37b1 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -3,7 +3,7 @@ // // See original design discussion: // https://gitlab.com/gitlab-org/gitaly/issues/1495 -package praefect +package datastore import ( "errors" @@ -11,7 +11,7 @@ import ( "sort" "sync" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" ) var ( diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore/datastore_test.go index 5be41bb72..0f17d499c 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -1,10 +1,10 @@ -package praefect +package datastore import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" ) var ( diff --git a/internal/praefect/models/node.go b/internal/praefect/datastore/models/node.go index 878c5ac4b..878c5ac4b 100644 --- a/internal/praefect/models/node.go +++ b/internal/praefect/datastore/models/node.go diff --git a/internal/praefect/models/nodes.go b/internal/praefect/datastore/models/nodes.go index 854254d87..854254d87 100644 --- a/internal/praefect/models/nodes.go +++ b/internal/praefect/datastore/models/nodes.go diff --git a/internal/praefect/models/repository.go b/internal/praefect/datastore/models/repository.go index e11cdbf0a..e11cdbf0a 100644 --- a/internal/praefect/models/repository.go +++ b/internal/praefect/datastore/models/repository.go diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index 09a5e2e46..20167232c 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" ) // TestMemoryDatastoreWhitelist verifies that the in-memory datastore will diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index f82e3c313..2f5ad9085 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -7,7 +7,8 @@ import ( "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" "github.com/sirupsen/logrus" ) @@ -60,8 +61,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Logger - replicasDS ReplicasDatastore - replJobsDS ReplJobsDatastore + replicasDS datastore.ReplicasDatastore + replJobsDS datastore.ReplJobsDatastore coordinator *Coordinator targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic @@ -75,7 +76,7 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Logger, replicasDS ReplicasDatastore, jobsDS ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Logger, replicasDS datastore.ReplicasDatastore, jobsDS datastore.ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, replicasDS: replicasDS, @@ -148,7 +149,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { } for _, node := range nodes { - jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.StorageName, 10) + jobs, err := r.replJobsDS.GetJobs(datastore.JobStatePending|datastore.JobStateReady, node.StorageName, 10) if err != nil { return err } @@ -178,7 +179,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { r.log.WithField(logWithReplJobID, job.ID).WithField("storage", node).Info("got storage") - if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + if err := r.replJobsDS.UpdateReplJob(job.ID, datastore.JobStateInProgress); err != nil { return err } @@ -197,7 +198,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { return err } - if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil { + if err := r.replJobsDS.UpdateReplJob(job.ID, datastore.JobStateComplete); err != nil { return err } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index a0b71d719..0fa148fab 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -18,7 +18,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 921de1ce5..5fa037165 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -11,9 +11,9 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "google.golang.org/grpc" ) |