diff options
author | Jacob Vosmaer <contact@jacobvosmaer.nl> | 2020-01-26 00:08:34 +0300 |
---|---|---|
committer | Jacob Vosmaer <contact@jacobvosmaer.nl> | 2020-01-26 21:00:38 +0300 |
commit | 8957852814d264963fd26d3652ee85e7b858f048 (patch) | |
tree | 291a720258a9b9f10e8740474275b1a003ac7fd4 | |
parent | 84c42ffbeb0fc1e2cfe73997f7681b366daa6b11 (diff) |
Post to gitlab url after writejv-alt-ha
-rw-r--r-- | internal/config/config.go | 10 | ||||
-rw-r--r-- | internal/middleware/notifier/notifier.go | 191 | ||||
-rw-r--r-- | internal/server/server.go | 4 |
3 files changed, 203 insertions, 2 deletions
diff --git a/internal/config/config.go b/internal/config/config.go index 037c96027..7c37bbdd8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -46,8 +46,9 @@ type Cfg struct { GitlabShell GitlabShell `toml:"gitlab-shell"` Concurrency []Concurrency `toml:"concurrency"` GracefulRestartTimeout time.Duration - GracefulRestartTimeoutToml duration `toml:"graceful_restart_timeout"` - InternalSocketDir string `toml:"internal_socket_dir"` + GracefulRestartTimeoutToml duration `toml:"graceful_restart_timeout"` + InternalSocketDir string `toml:"internal_socket_dir"` + GitlabRails GitlabRails `toml:"gitlab-rails"` } // TLS configuration @@ -61,6 +62,11 @@ type GitlabShell struct { Dir string `toml:"dir"` } +// GitlabRails bla +type GitlabRails struct { + URL string `toml:"url"` +} + // Git contains the settings for the Git executable type Git struct { BinPath string `toml:"bin_path"` diff --git a/internal/middleware/notifier/notifier.go b/internal/middleware/notifier/notifier.go new file mode 100644 index 000000000..d63cc4a5e --- /dev/null +++ b/internal/middleware/notifier/notifier.go @@ -0,0 +1,191 @@ +package notifier + +import ( + "context" + "encoding/base64" + "fmt" + "net/http" + "strings" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" +) + +func shouldIgnore(fullMethod string) bool { + return strings.HasPrefix(fullMethod, "/grpc.health") +} + +// StreamNotifier bla +func StreamNotifier(gitlab config.GitlabRails, reg *protoregistry.Registry) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if shouldIgnore(info.FullMethod) { + return handler(srv, ss) + } + + mInfo, err := reg.LookupMethod(info.FullMethod) + if err != nil { + logrus.WithError(err).Error("method lookup") + return handler(srv, ss) + } + + if mInfo.Scope != protoregistry.ScopeRepository || mInfo.Operation == protoregistry.OpAccessor { + return handler(srv, ss) + } + + handler, callback := notifyChange(gitlab, mInfo, handler) + peeker := newStreamPeeker(ss, callback) + return handler(srv, peeker) + } +} + +const contentType = "application/json" + +// UnaryNotifier bla +func UnaryNotifier(gitlab config.GitlabRails, reg *protoregistry.Registry) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + if shouldIgnore(info.FullMethod) { + return handler(ctx, req) + } + + mInfo, err := reg.LookupMethod(info.FullMethod) + if err != nil { + logrus.WithError(err).Error("method lookup") + } + + if mInfo.Scope != protoregistry.ScopeRepository || mInfo.Operation == protoregistry.OpAccessor { + return handler(ctx, req) + } + + pbReq, ok := req.(proto.Message) + if !ok { + logrus.Errorf("expected protobuf message but got %T", req) + return handler(ctx, req) + } + + target, err := mInfo.TargetRepo(pbReq) + if err != nil { + logrus.WithError(err).Error("expected target repository") + return handler(ctx, req) + } + + // TODO: notify gitlab-rails of start of write + + // wrap the handler to ensure the lease is always ended + return func() (resp interface{}, err error) { + defer notifyAPI(gitlab, target) + return handler(ctx, req) + }() + } +} + +func notifyAPI(gitlab config.GitlabRails, repo *gitalypb.Repository) { + data, err := proto.Marshal(repo) + if err != nil { + logrus.WithError(err).Error("marshal repo") + return + } + + body := strings.NewReader(fmt.Sprintf( + `{"payload":"%s"}`, + base64.StdEncoding.EncodeToString(data), + )) + + notifyResp, err := http.Post( + gitlab.URL+`/api/v4/internal/praefect/finish-write`, + contentType, + body, + ) + if err != nil { + logrus.WithError(err).Error("http post") + return + } + + notifyResp.Body.Close() // Important! Prevent resource leak. +} + +type recvMsgCallback func(interface{}, error) + +func notifyChange(gitlab config.GitlabRails, mInfo protoregistry.MethodInfo, handler grpc.StreamHandler) (grpc.StreamHandler, recvMsgCallback) { + var repo struct { + sync.Mutex + *gitalypb.Repository + } + + // ensures that the lease ender is invoked after the original handler + wrappedHandler := func(srv interface{}, stream grpc.ServerStream) error { + defer func() { + repo.Lock() + defer repo.Unlock() + + if repo.Repository == nil { + return + } + notifyAPI(gitlab, repo.Repository) + }() + return handler(srv, stream) + } + + // starts the cache lease and sets the lease ender iff the request's target + // repository can be determined from the first request message + peekerCallback := func(firstReq interface{}, err error) { + if err != nil { + logrus.WithError(err).Error("peeker callback") + return + } + + pbFirstReq, ok := firstReq.(proto.Message) + if !ok { + logrus.WithError(fmt.Errorf("cache invalidation expected protobuf request, but got %T", firstReq)).Error("cast to proto") + return + } + + target, err := mInfo.TargetRepo(pbFirstReq) + if err != nil { + logrus.WithError(err).Error("get target") + return + } + + // TODO: notify gitlab-rails of start of write + + repo.Lock() + defer repo.Unlock() + + repo.Repository = target + } + + return wrappedHandler, peekerCallback +} + +// streamPeeker allows a stream interceptor to insert peeking logic to perform +// an action when the first RecvMsg +type streamPeeker struct { + grpc.ServerStream + + // onFirstRecvCallback is called the first time the server stream's RecvMsg + // is invoked. It passes the results of the stream's RecvMsg as the + // callback's parameters. + onFirstRecvOnce sync.Once + onFirstRecvCallback recvMsgCallback +} + +// newStreamPeeker returns a wrapped stream that allows a callback to be called +// on the first invocation of RecvMsg. +func newStreamPeeker(stream grpc.ServerStream, callback recvMsgCallback) grpc.ServerStream { + return &streamPeeker{ + ServerStream: stream, + onFirstRecvCallback: callback, + } +} + +// RecvMsg overrides the embedded grpc.ServerStream's method of the same name so +// that the callback is called on the first call. +func (sp *streamPeeker) RecvMsg(m interface{}) error { + err := sp.ServerStream.RecvMsg(m) + sp.onFirstRecvOnce.Do(func() { sp.onFirstRecvCallback(m, err) }) + return err +} diff --git a/internal/server/server.go b/internal/server/server.go index 250749248..233b291fe 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/notifier" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -75,6 +76,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { } lh := limithandler.New(concurrencyKeyFn) + gitlabRails := config.Config.GitlabRails opts := []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( @@ -89,6 +91,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { auth.StreamServerInterceptor(config.Config.Auth), grpctracing.StreamServerTracingInterceptor(), cache.StreamInvalidator(diskcache.LeaseKeyer{}, protoregistry.GitalyProtoPreregistered), + notifier.StreamNotifier(gitlabRails, protoregistry.GitalyProtoPreregistered), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.StreamPanicHandler, @@ -105,6 +108,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { auth.UnaryServerInterceptor(config.Config.Auth), grpctracing.UnaryServerTracingInterceptor(), cache.UnaryInvalidator(diskcache.LeaseKeyer{}, protoregistry.GitalyProtoPreregistered), + notifier.UnaryNotifier(gitlabRails, protoregistry.GitalyProtoPreregistered), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.UnaryPanicHandler, |