blob: 7cecda03b2358d938fa8cbc38bac1a5e59a61b90 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.IO.Pipelines;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
/// <summary>
/// A bounded pool of reusable <see cref="SocketSender"/> instances.
/// Pooled senders are spread over one <see cref="ConcurrentQueue{T}"/> per processor so that
/// threads running on different processors usually touch different queues.
/// </summary>
internal class SocketSenderPool : IDisposable
{
    // Upper bound on the number of pooled senders across ALL partitions (tracked by _count).
    private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?

    // One queue per processor; indexed by GetPartitionIndex().
    private readonly ConcurrentQueue<SocketSender>[] _queues;

    // Approximate number of senders currently pooled, summed over every queue.
    private int _count;

    private readonly PipeScheduler _scheduler;
    private bool _disposed;

    /// <summary>
    /// Creates an empty pool with <see cref="Environment.ProcessorCount"/> partitions.
    /// </summary>
    /// <param name="scheduler">Scheduler handed to every <see cref="SocketSender"/> this pool creates.</param>
    public SocketSenderPool(PipeScheduler scheduler)
    {
        _scheduler = scheduler;

        _queues = new ConcurrentQueue<SocketSender>[Environment.ProcessorCount];
        for (var i = 0; i < _queues.Length; i++)
        {
            _queues[i] = new ConcurrentQueue<SocketSender>();
        }
    }

    /// <summary>
    /// Takes a pooled sender if any partition has one, otherwise creates a new one.
    /// </summary>
    /// <returns>A pooled <see cref="SocketSender"/>, or a freshly constructed one when the pool is empty.</returns>
    public SocketSender Rent()
    {
        var queues = _queues;
        var start = GetPartitionIndex();

        // Probe the current processor's queue first, then fall back to the other partitions.
        // The size cap is global, so looking only at the local queue could leave up to
        // MaxQueueSize senders stranded in other partitions while this call allocates.
        for (var offset = 0; offset < queues.Length; offset++)
        {
            var index = start + offset;
            if (index >= queues.Length)
            {
                index -= queues.Length;
            }

            if (queues[index].TryDequeue(out var sender))
            {
                Interlocked.Decrement(ref _count);
                return sender;
            }
        }

        return new SocketSender(_scheduler);
    }

    /// <summary>
    /// Hands a sender back to the pool. The sender is disposed instead of pooled when the pool
    /// has been disposed or already holds <see cref="MaxQueueSize"/> senders.
    /// </summary>
    /// <param name="sender">The sender to reset and pool, or to dispose.</param>
    public void Return(SocketSender sender)
    {
        // Checked on its own so that a disposed pool never touches _count: the counter is only
        // decremented on paths that incremented it first.
        // NOTE(review): a Dispose() racing between this check and the Enqueue below can still
        // leave one sender in a queue undisposed - confirm whether callers can race this way.
        if (_disposed)
        {
            sender.Dispose();
            return;
        }

        // This counting isn't accurate, but it's good enough for what we need to avoid summing
        // ConcurrentQueue.Count over every partition, which could be expensive.
        if (Interlocked.Increment(ref _count) > MaxQueueSize)
        {
            Interlocked.Decrement(ref _count);
            sender.Dispose();
            return;
        }

        sender.Reset();
        _queues[GetPartitionIndex()].Enqueue(sender);
    }

    /// <summary>
    /// Marks the pool as disposed and disposes every sender currently held in any partition.
    /// Calling it again is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        foreach (var queue in _queues)
        {
            while (queue.TryDequeue(out var sender))
            {
                sender.Dispose();
            }
        }
    }

    // Maps the processor the current thread is running on to a queue index. The unsigned modulo
    // keeps the result inside [0, _queues.Length) whatever id the runtime reports.
    private int GetPartitionIndex()
        => (int)((uint)Thread.GetCurrentProcessorId() % (uint)_queues.Length);
}
|