Welcome to mirror list, hosted at ThFree Co, Russian Federation.

SocketSenderPool.cs « Internal « src « Transport.Sockets « Kestrel « Servers « src - github.com/dotnet/aspnetcore.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 7cecda03b2358d938fa8cbc38bac1a5e59a61b90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.IO.Pipelines;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;

internal class SocketSenderPool : IDisposable
{
    private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?

    private readonly ConcurrentQueue<SocketSender>[] _queues;
    private int _count;
    private readonly PipeScheduler _scheduler;
    private bool _disposed;

    public SocketSenderPool(PipeScheduler scheduler)
    {
        _scheduler = scheduler;

        _queues = new ConcurrentQueue<SocketSender>[Environment.ProcessorCount];

        for (var i = 0; i < _queues.Length; i++)
        {
            _queues[i] = new ConcurrentQueue<SocketSender>();
        }
    }

    public SocketSender Rent()
    {
        var partition = Thread.GetCurrentProcessorId() % _queues.Length;

        if (_queues[partition].TryDequeue(out var sender))
        {
            Interlocked.Decrement(ref _count);
            return sender;
        }
        return new SocketSender(_scheduler);
    }

    public void Return(SocketSender sender)
    {
        // This counting isn't accurate, but it's good enough for what we need to avoid using _queue.Count which could be expensive
        if (_disposed || Interlocked.Increment(ref _count) > MaxQueueSize)
        {
            Interlocked.Decrement(ref _count);
            sender.Dispose();
            return;
        }

        var partition = Thread.GetCurrentProcessorId() % _queues.Length;

        sender.Reset();
        _queues[partition].Enqueue(sender);
    }

    public void Dispose()
    {
        if (!_disposed)
        {
            _disposed = true;
            foreach (var queue in _queues)
            {
                while (queue.TryDequeue(out var sender))
                {
                    sender.Dispose();
                }
            }
        }
    }
}