Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/dotnet/aspnetcore.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastien Ros <sebastienros@gmail.com>2022-03-01 21:18:55 +0300
committerSebastien Ros <sebastienros@gmail.com>2022-03-01 21:18:55 +0300
commit71aa42b08d262355601cab937acab83a6dbc545a (patch)
treebdaf146a09812daceac4abde9682b4772bccf353
parent363be8e20ba828d4e6eb188903a34492f4237a58 (diff)
Partition pools by coresebros/queue
-rw-r--r--src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs24
-rw-r--r--src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs30
2 files changed, 42 insertions, 12 deletions
diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs
index 5a53632d77..7cecda03b2 100644
--- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs
+++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSenderPool.cs
@@ -10,7 +10,7 @@ internal class SocketSenderPool : IDisposable
{
private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?
- private readonly ConcurrentQueue<SocketSender> _queue = new();
+ private readonly ConcurrentQueue<SocketSender>[] _queues;
private int _count;
private readonly PipeScheduler _scheduler;
private bool _disposed;
@@ -18,11 +18,20 @@ internal class SocketSenderPool : IDisposable
public SocketSenderPool(PipeScheduler scheduler)
{
_scheduler = scheduler;
+
+ _queues = new ConcurrentQueue<SocketSender>[Environment.ProcessorCount];
+
+ for (var i = 0; i < _queues.Length; i++)
+ {
+ _queues[i] = new ConcurrentQueue<SocketSender>();
+ }
}
public SocketSender Rent()
{
- if (_queue.TryDequeue(out var sender))
+ var partition = Thread.GetCurrentProcessorId() % _queues.Length;
+
+ if (_queues[partition].TryDequeue(out var sender))
{
Interlocked.Decrement(ref _count);
return sender;
@@ -40,8 +49,10 @@ internal class SocketSenderPool : IDisposable
return;
}
+ var partition = Thread.GetCurrentProcessorId() % _queues.Length;
+
sender.Reset();
- _queue.Enqueue(sender);
+ _queues[partition].Enqueue(sender);
}
public void Dispose()
@@ -49,9 +60,12 @@ internal class SocketSenderPool : IDisposable
if (!_disposed)
{
_disposed = true;
- while (_queue.TryDequeue(out var sender))
+ foreach (var queue in _queues)
{
- sender.Dispose();
+ while (queue.TryDequeue(out var sender))
+ {
+ sender.Dispose();
+ }
}
}
}
diff --git a/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs b/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs
index 6c74b477c8..0ab1eba018 100644
--- a/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs
+++ b/src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs
@@ -32,7 +32,7 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte>
/// Thread-safe collection of blocks which are currently in the pool. A slab will pre-allocate all of the block tracking objects
/// and add them to this collection. When memory is requested it is taken from here first, and when it is returned it is re-added.
/// </summary>
- private readonly ConcurrentQueue<MemoryPoolBlock> _blocks = new ConcurrentQueue<MemoryPoolBlock>();
+ private readonly ConcurrentQueue<MemoryPoolBlock>[] _queues;
/// <summary>
/// This is part of implementing the IDisposable pattern.
@@ -46,6 +46,15 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte>
/// </summary>
private const int AnySize = -1;
+ public PinnedBlockMemoryPool()
+ {
+ _queues = new ConcurrentQueue<MemoryPoolBlock>[Environment.ProcessorCount];
+
+ for (var i = 0; i < _queues.Length; i++)
+ {
+ _queues[i] = new ConcurrentQueue<MemoryPoolBlock>();
+ }
+ }
public override IMemoryOwner<byte> Rent(int size = AnySize)
{
if (size > _blockSize)
@@ -58,7 +67,10 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte>
MemoryPoolThrowHelper.ThrowObjectDisposedException(MemoryPoolThrowHelper.ExceptionArgument.MemoryPool);
}
- if (_blocks.TryDequeue(out var block))
+
+ var partition = Thread.GetCurrentProcessorId() % _queues.Length;
+
+ if (_queues[partition].TryDequeue(out var block))
{
// block successfully taken from the stack - return it
return block;
@@ -84,7 +96,9 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte>
if (!_isDisposed)
{
- _blocks.Enqueue(block);
+ var partition = Thread.GetCurrentProcessorId() % _queues.Length;
+
+ _queues[partition].Enqueue(block);
}
}
@@ -101,11 +115,13 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte>
if (disposing)
{
- // Discard blocks in pool
- while (_blocks.TryDequeue(out _))
+ foreach (var queue in _queues)
{
-
- }
+ while (queue.TryDequeue(out var block))
+ {
+ block.Dispose();
+ }
+ }
}
}
}