diff options
author | John Cai <jcai@gitlab.com> | 2019-07-25 01:55:06 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-07-25 01:55:06 +0300 |
commit | 3f83088f0e12979ca45b6128450ddb1e9d8f35bf (patch) | |
tree | fd258965aaab4004c137ea8559a4b2a0b8ea3cad | |
parent | 38e5e9cb4ff4cd01938264bba42220db0b6f1f6b (diff) |
rewrite repositoryjc-sql-data-store-expierment
-rw-r--r-- | internal/praefect/coordinator.go | 16 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker.go | 1 |
2 files changed, 16 insertions, 1 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 86a254b5a..6287641e0 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -92,11 +92,15 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var primary *models.StorageNode if mi.Scope == protoregistry.ScopeRepository { - targetRepo, err := targetRepo(mi, frames[0]) + m, err := mi.UnmarshalRequestProto(frames[0]) if err != nil { return nil, nil, err } + targetRepo, err := mi.TargetRepo(m) + if err != nil { + return nil, nil, err + } primary, err = c.datastore.GetPrimary(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) if err != nil { @@ -122,6 +126,16 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, } primary = &newPrimary + + } + + targetRepo.StorageName = primary.Storage + b, err := proxy.Codec().Marshal(m) + if err != nil { + return nil, nil, err + } + if err := peeker.ReplaceFrames(1, b); err != nil { + return nil, nil, err } } else { //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go index 45cc4a5d3..86292efb0 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker.go +++ b/internal/praefect/grpc-proxy/proxy/peeker.go @@ -15,6 +15,7 @@ type StreamPeeker interface { // removing those messages from the stream that will be forwarded to // the backend server. Peek(ctx context.Context, n uint) (frames [][]byte, _ error) + ReplaceFrames(n int, payloads ...[]byte) error } type partialStream struct { |