diff options
author | John Cai <jcai@gitlab.com> | 2019-07-18 20:11:24 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-07-18 20:13:22 +0300 |
commit | cde0d11a45591a8853d535d6f2a7d781a085d432 (patch) | |
tree | 14f95dd8b1eb42a0460f83c3a48d68ff83e5b87e | |
parent | 796e147be69c222bebe634d12f0ceefdb0bec201 (diff) |
repository->shard, collapsing some tables into columnsjc-praefect-sql
-rw-r--r-- | internal/praefect/database/schema/initial.sql | 45 | ||||
-rw-r--r-- | internal/praefect/database/sql_datastore.go | 136 | ||||
-rw-r--r-- | internal/praefect/models/node.go | 13 |
3 files changed, 164 insertions, 30 deletions
diff --git a/internal/praefect/database/schema/initial.sql b/internal/praefect/database/schema/initial.sql index 077baf03b..cc2138979 100644 --- a/internal/praefect/database/schema/initial.sql +++ b/internal/praefect/database/schema/initial.sql @@ -1,44 +1,29 @@ -CREATE TABLE storages ( - name TEXT PRIMARY KEY -); - -CREATE TABLE nodes ( - address TEXT PRIMARY KEY -); - -CREATE TABLE storage_nodes ( +CREATE TABLE IF NOT EXISTS node_storages ( id SERIAL PRIMARY KEY, - storage_name TEXT REFERENCES storages(name), - node_address TEXT REFERENCES nodes(address) + address TEXT NOT NULL, + storage_name TEXT NOT NULL ); -CREATE TABLE repositories ( +CREATE TABLE IF NOT EXISTS shards ( relative_path TEXT PRIMARY KEY, - "primary" INTEGER REFERENCES storage_nodes(id) + "primary" INTEGER REFERENCES node_storages(id) ); -CREATE TABLE repository_secondaries ( - repository_relative_path TEXT REFERENCES repositories(relative_path), - storage_node_id INTEGER REFERENCES storage_nodes(id), - PRIMARY KEY(repository_relative_path, storage_node_id) +CREATE TABLE IF NOT EXISTS shard_secondaries ( + shard_relative_path TEXT REFERENCES shards(relative_path), + node_storage_id INTEGER REFERENCES node_storages(id), + PRIMARY KEY(shard_relative_path, node_storage_id) ); -INSERT INTO storages ("name") VALUES ('default'); -INSERT INTO storages ("name") VALUES ('backup1'); -INSERT INTO storages ("name") VALUES ('backup2'); - -INSERT INTO nodes (address) VALUES('tcp://127.0.0.1:9999'); -INSERT INTO nodes (address) VALUES('tcp://127.0.0.1:9998'); - -INSERT INTO storage_nodes (id, node_address, storage_name) VALUES(1, 'tcp://127.0.0.1:9999', 'default'); -INSERT INTO storage_nodes (id, node_address, storage_name) VALUES(2, 'tcp://127.0.0.1:9998', 'default'); -INSERT INTO storage_nodes (id, node_address, storage_name) VALUES(3, 'tcp://127.0.0.1:9999', 'backup1'); -INSERT INTO storage_nodes (id, node_address, storage_name) VALUES(4, 'tcp://127.0.0.1:9998', 'backup2'); +INSERT INTO node_storages (id, address, storage_name) VALUES(1, 'tcp://127.0.0.1:9999', 'default'); +INSERT INTO node_storages (id, address, storage_name) VALUES(2, 'tcp://127.0.0.1:9998', 'default'); +INSERT INTO node_storages (id, address, storage_name) VALUES(3, 'tcp://127.0.0.1:9999', 'backup1'); +INSERT INTO node_storages (id, address, storage_name) VALUES(4, 'tcp://127.0.0.1:9998', 'backup2'); -INSERT INTO repositories(relative_path, "primary") +INSERT INTO shards(relative_path, "primary") VALUES('@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git', 1); -INSERT INTO repository_secondaries(repository_relative_path, storage_node_id) VALUES +INSERT INTO shard_secondaries(shard_relative_path, storage_node_id) VALUES ('@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git', 2), ('@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git', 3), ('@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git', 4);
\ No newline at end of file diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/database/sql_datastore.go new file mode 100644 index 000000000..1a2ccac8f --- /dev/null +++ b/internal/praefect/database/sql_datastore.go @@ -0,0 +1,136 @@ +package database + +import ( + "errors" + "fmt" + "os" + + "database/sql" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" +) + +type SQLDatastore struct { + db *sql.DB +} + +func NewSQLDatastore(addr string) (*sql.DB, error) { + connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disabled", + os.Getenv("PRAEFECT_PG_USER"), + os.Getenv("PRAEFECT_PG_PASSWORD"), + addr, + os.Getenv("PRAEFECT_PG_DB")) + + return sql.Open("postgres", connStr) +} + +func (sd *SQLDatastore) GetSecondaries(relativePath string) ([]models.StorageNode, error) { + var secondaries []models.StorageNode + + rows, err := sd.db.Query(` + SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage_name FROM repositories + INNER JOIN repository_secondaries ON repositories.relative_path = repository_secondaries.repository_relative_path + INNER JOIN storage_nodes ON storage_nodes.id = repository_secondaries.storage_node_id WHERE repositories.relative_path = $1 + `, relativePath) + + if err != nil { + return nil, err + } + + for rows.Next() { + var s models.StorageNode + err = rows.Scan(&s.ID, &s.StorageName, &s.Address) + if err != nil { + return nil, err + } + secondaries = append(secondaries, s) + } + + return secondaries, nil +} + +func (sd *SQLDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + + row := sd.db.QueryRow(` + SELECT storage_nodes.id, storage_nodes.adddress, storage_nodes.storage_name FROM repositories + INNER JOIN storage_nodes ON repositories.primary = storage_nodes.id + WHERE repositories.relative_path = $1 + `, relativePath) + + var s models.StorageNode + if err := row.Scan(&s.ID, &s.Address, &s.StorageName); err != nil { + return nil, err + } + + return &s, nil +} + +func (sd *SQLDatastore) SetPrimary(relativePath, storageNodeID int) error { + res, err := sd.db.Exec(`UPDATE repositories SET "primary" = ? WHERE relative_path= ?`, storageNodeID, relativePath) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return errors.New("repository does not exist") + } + + return nil +} + +func (sd *SQLDatastore) GetDefaultPrimary(storage string) (*models.StorageNode, error) { + row := sd.db.QueryRow("SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage_name from stroage_nodes where storage_name = ?", storage) + + var s models.StorageNode + if err := row.Scan(&s.ID, &s.Address, &s.StorageName); err != nil { + return nil, err + } + + return &s, nil +} + +func (sd *SQLDatastore) AddSecondary(relativePath string, storageNodeID int) error { + res, err := sd.db.Exec("INSERT INTO repository_secondaries (repository_relative_path, node_storage_id) VALUES(?, ?)", relativePath, storageNodeID) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return errors.New("secondary already exists") + } + + return nil +} + +func (sd *SQLDatastore) RemoveSecondary(relativePath string, storageNodeID int) error { + res, err := sd.db.Exec("DELETE FROM repository_secondaries (repository_relative_path, node_storage_id) VALUES(?, ?)", relativePath, storageNodeID) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return errors.New("secondary did not exist") + } + + return nil +} + +func (sd *SQLDatastore) GetShard(relativePath string) (*models.Shard, error) { + primary, err := sd.GetPrimary(relativePath) + if err != nil { + return nil, fmt.Errorf("getting primary: %v", err) + } + + secondaries, err := sd.GetSecondaries(relativePath) + if err != nil { + return nil, fmt.Errorf("getting secondaries: %v", err) + } + + return &models.Shard{RelativePath: relativePath, Primary: *primary, Secondaries: secondaries}, nil +} diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go new file mode 100644 index 000000000..851a28a4e --- /dev/null +++ b/internal/praefect/models/node.go @@ -0,0 +1,13 @@ +package models + +type StorageNode struct { + ID int + StorageName string + Address string +} + +type Shard struct { + RelativePath string + Primary StorageNode + Secondaries []StorageNode +} |