diff options
author | Sebastien Ros <sebastienros@gmail.com> | 2022-03-01 21:18:55 +0300 |
---|---|---|
committer | Sebastien Ros <sebastienros@gmail.com> | 2022-03-01 21:18:55 +0300 |
commit | 71aa42b08d262355601cab937acab83a6dbc545a (patch) | |
tree | bdaf146a09812daceac4abde9682b4772bccf353 | |
parent | 363be8e20ba828d4e6eb188903a34492f4237a58 (diff) |
Partition pools by coresebros/queue
-rw-r--r-- | src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs | 24 | ||||
-rw-r--r-- | src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs | 30 |
2 files changed, 42 insertions, 12 deletions
diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs index 5a53632d77..7cecda03b2 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs @@ -10,7 +10,7 @@ internal class SocketSenderPool : IDisposable { private const int MaxQueueSize = 1024; // REVIEW: Is this good enough? - private readonly ConcurrentQueue<SocketSender> _queue = new(); + private readonly ConcurrentQueue<SocketSender>[] _queues; private int _count; private readonly PipeScheduler _scheduler; private bool _disposed; @@ -18,11 +18,20 @@ internal class SocketSenderPool : IDisposable public SocketSenderPool(PipeScheduler scheduler) { _scheduler = scheduler; + + _queues = new ConcurrentQueue<SocketSender>[Environment.ProcessorCount]; + + for (var i = 0; i < _queues.Length; i++) + { + _queues[i] = new ConcurrentQueue<SocketSender>(); + } } public SocketSender Rent() { - if (_queue.TryDequeue(out var sender)) + var partition = Thread.GetCurrentProcessorId() % _queues.Length; + + if (_queues[partition].TryDequeue(out var sender)) { Interlocked.Decrement(ref _count); return sender; @@ -40,8 +49,10 @@ internal class SocketSenderPool : IDisposable return; } + var partition = Thread.GetCurrentProcessorId() % _queues.Length; + sender.Reset(); - _queue.Enqueue(sender); + _queues[partition].Enqueue(sender); } public void Dispose() @@ -49,9 +60,12 @@ internal class SocketSenderPool : IDisposable if (!_disposed) { _disposed = true; - while (_queue.TryDequeue(out var sender)) + foreach (var queue in _queues) { - sender.Dispose(); + while (queue.TryDequeue(out var sender)) + { + sender.Dispose(); + } } } } diff --git a/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs b/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs index 6c74b477c8..0ab1eba018 100644 --- a/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs +++ b/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs @@ -32,7 +32,7 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte> /// Thread-safe collection of blocks which are currently in the pool. A slab will pre-allocate all of the block tracking objects /// and add them to this collection. When memory is requested it is taken from here first, and when it is returned it is re-added. /// </summary> - private readonly ConcurrentQueue<MemoryPoolBlock> _blocks = new ConcurrentQueue<MemoryPoolBlock>(); + private readonly ConcurrentQueue<MemoryPoolBlock>[] _queues; /// <summary> /// This is part of implementing the IDisposable pattern. @@ -46,6 +46,15 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte> /// </summary> private const int AnySize = -1; + public PinnedBlockMemoryPool() + { + _queues = new ConcurrentQueue<MemoryPoolBlock>[Environment.ProcessorCount]; + + for (var i = 0; i < _queues.Length; i++) + { + _queues[i] = new ConcurrentQueue<MemoryPoolBlock>(); + } + } public override IMemoryOwner<byte> Rent(int size = AnySize) { if (size > _blockSize) @@ -58,7 +67,10 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte> MemoryPoolThrowHelper.ThrowObjectDisposedException(MemoryPoolThrowHelper.ExceptionArgument.MemoryPool); } - if (_blocks.TryDequeue(out var block)) + + var partition = Thread.GetCurrentProcessorId() % _queues.Length; + + if (_queues[partition].TryDequeue(out var block)) { // block successfully taken from the stack - return it return block; @@ -84,7 +96,9 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte> if (!_isDisposed) { - _blocks.Enqueue(block); + var partition = Thread.GetCurrentProcessorId() % _queues.Length; + + _queues[partition].Enqueue(block); } } @@ -101,11 +115,13 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte> if (disposing) { - // Discard blocks in pool - while (_blocks.TryDequeue(out _)) + foreach (var queue in _queues) { - - } + while (queue.TryDequeue(out var block)) + { + block.Dispose(); + } + } } } } |