diff options
author | Karthik Nayak <knayak@gitlab.com> | 2023-12-15 17:29:43 +0300 |
---|---|---|
committer | Karthik Nayak <knayak@gitlab.com> | 2023-12-19 11:50:57 +0300 |
commit | 0cc4d3eac9f9f6d318653e4ac293ea87c1289341 (patch) | |
tree | 715c10e8a84ce56cd26a551cf50a51f86d47a87e | |
parent | 6eabdd5280a34d90d4e78d736f80819e4cdde3bb (diff) |
hook: Use `context.Context` in `ProcReceiveRegistry.Transmit`
The `Transmit()` function provided by `ProcReceiveRegistry` is a
blocking function, which ensures that a handler is transmitted to any
waiters.
The problem is, that there is a potential scenario where the waiter
might have exited early. Hence blocking this call forever. So to avoid
that, we expand the function to receive a context. With this, if the
main RPC (waiter) exits early, the context will be canceled and we wont
be blocked forever.
-rw-r--r-- | internal/gitaly/hook/procreceive_registry.go | 11 | ||||
-rw-r--r-- | internal/gitaly/hook/procreceive_registry_test.go | 44 |
2 files changed, 49 insertions, 6 deletions
diff --git a/internal/gitaly/hook/procreceive_registry.go b/internal/gitaly/hook/procreceive_registry.go index 56021d311..ac3daaf3d 100644 --- a/internal/gitaly/hook/procreceive_registry.go +++ b/internal/gitaly/hook/procreceive_registry.go @@ -1,6 +1,7 @@ package hook import ( + "context" "fmt" "sync" @@ -90,7 +91,7 @@ func (r *ProcReceiveRegistry) RegisterWaiter(id storage.TransactionID) (<-chan P } // Transmit transmits a handler to its waiter. -func (r *ProcReceiveRegistry) Transmit(handler ProcReceiveHandler) error { +func (r *ProcReceiveRegistry) Transmit(ctx context.Context, handler ProcReceiveHandler) error { r.m.Lock() defer r.m.Unlock() @@ -99,7 +100,13 @@ func (r *ProcReceiveRegistry) Transmit(handler ProcReceiveHandler) error { return fmt.Errorf("no waiters for id: %d", handler.TransactionID()) } - ch <- handler + // It is possible that the RPC (waiter) returned because receive-pack + // returned an error. In such scenarios, we don't want to block indefinitely. + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- handler: + } return nil } diff --git a/internal/gitaly/hook/procreceive_registry_test.go b/internal/gitaly/hook/procreceive_registry_test.go index 339e4a6f1..980722585 100644 --- a/internal/gitaly/hook/procreceive_registry_test.go +++ b/internal/gitaly/hook/procreceive_registry_test.go @@ -2,6 +2,7 @@ package hook import ( "bytes" + "context" "fmt" "sync" "testing" @@ -68,11 +69,46 @@ func TestProcReceiveRegistry(t *testing.T) { registry := NewProcReceiveRegistry() handler, _ := newHandler(1) - err := registry.Transmit(handler) + err := registry.Transmit(ctx, handler) require.Equal(t, fmt.Errorf("no waiters for id: 1"), err) }) + t.Run("transmit with context cancelled", func(t *testing.T) { + t.Parallel() + registry := NewProcReceiveRegistry() + + handler, _ := newHandler(1) + + recvCh, cleanup, err := registry.RegisterWaiter(1) + require.NoError(t, err) + defer cleanup() + + ctx, cancel := context.WithCancel(ctx) + cancel() + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + assert.Equal(t, context.Canceled, ctx.Err()) + case <-recvCh: + assert.Fail(t, "handler wasn't expected") + } + }() + + go func() { + defer wg.Done() + err := registry.Transmit(ctx, handler) + assert.Equal(t, context.Canceled, err) + }() + + wg.Wait() + }) + t.Run("waiter registered twice", func(t *testing.T) { t.Parallel() registry := NewProcReceiveRegistry() @@ -128,17 +164,17 @@ func TestProcReceiveRegistry(t *testing.T) { go func() { defer wg.Done() - assert.NoError(t, registry.Transmit(handler1)) + assert.NoError(t, registry.Transmit(ctx, handler1)) }() go func() { defer wg.Done() - assert.NoError(t, registry.Transmit(handler2)) + assert.NoError(t, registry.Transmit(ctx, handler2)) }() go func() { defer wg.Done() - assert.NoError(t, registry.Transmit(handler3)) + assert.NoError(t, registry.Transmit(ctx, handler3)) }() wg.Wait() |