C#并发集合-ConcurrentQueue
在多线程环境下,使用这个集合接受网络请求
using System.Collections;
using System.Collections.Concurrent;
using System.Runtime.Serialization;
namespace ConsoleApp1.Concurrent;
/*
* 并发队列 ConcurrentQueue
* 数据结构:使用 数组 + 单链表
* 结构图:
* tail head
* segment1 -> segment2 -> ... -> segment...
* arr[0] arr[0] arr[0]
* arr[1] arr[1] arr[1]
* arr[...] arr[...] arr[...]
* arr[32] arr[32] arr[32]
* 操作:
* 进队
* tail =>
* 累加 tail segment 中的 high(入队方向从 0 到 31)
* 入队完成,if segment 内数组已满(high == 31),则提前将下一个 segment 创建,并且当前 tail.next 指向新的segment, tail 再执行 新的segment
* 出队
* head =>
* 累加 head segment 中的 low(出队方向从 0 到 31)
* if segment 内数组已空(low == 31),则提前将下一个 segment 创建,并且当前 tail.next 指向新的segment, tail 再执行 新的segment
*/
public class ConcurrentQueue<T> :
IProducerConsumerCollection<T>
{
private volatile ConcurrentQueue<T>.Segment m_head;
private volatile ConcurrentQueue<T>.Segment m_tail;
internal volatile int m_numSnapshotTakers;
private const int SEGMENT_SIZE_32 = 32;
public ConcurrentQueue()
{
this.m_head = this.m_tail = new Segment(0L, this);
}
private void InitializeFromCollection(IEnumerable<T> collection)
{
throw new NotImplementedException();
}
public ConcurrentQueue(IEnumerable<T> collection)
{
throw new NotImplementedException();
}
private void OnSerializing(StreamingContext context)
{
throw new NotImplementedException();
}
private void OnSerialized(StreamingContext context)
{
throw new NotImplementedException();
}
void ICollection.CopyTo(Array array, int index)
{
throw new NotImplementedException();
}
private IEnumerator<T> GetEnumerator(Segment head, Segment tail, int headLow, int tailHigh)
{
throw new NotImplementedException();
}
public IEnumerator<T> GetEnumerator()
{
throw new NotImplementedException();
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
public void CopyTo(Array array, int index)
{
throw new NotImplementedException();
}
public int Count
{
get
{
this.GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh);
return head == tail
? tailHigh - headLow + 1
: SEGMENT_SIZE_32 - headLow + SEGMENT_SIZE_32 * (int)(tail.m_index - head.m_index - 1L) +
(tailHigh + 1);
}
}
private void GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh)
{
head = this.m_head;
tail = this.m_tail;
headLow = head.Low;
tailHigh = tail.High;
SpinWait spinWait = new SpinWait();
while (head != this.m_head || tail != this.m_tail || headLow != head.Low || tailHigh != tail.High ||
head.m_index > tail.m_index)
{
spinWait.SpinOnce();
head = this.m_head;
tail = this.m_tail;
headLow = head.Low;
tailHigh = tail.High;
}
}
public bool IsSynchronized => false;
public object SyncRoot => throw new NotSupportedException();
public void CopyTo(T[] array, int index)
{
throw new NotImplementedException();
}
public bool TryAdd(T item)
{
this.Enqueue(item);
return true;
}
public bool TryTake(out T item) => this.TryDequeue(out item);
public void Enqueue(T item)
{
SpinWait spinWait = new SpinWait();
while (!this.m_tail.TryAppend(item))
spinWait.SpinOnce();
}
public bool TryDequeue(out T result)
{
while (!this.IsEmpty)
{
if (this.m_head.TryRemove(out result))
return true;
}
result = default(T);
return false;
}
public bool TryPeek(out T result)
{
Interlocked.Increment(ref this.m_numSnapshotTakers);
while (!this.IsEmpty)
{
if (this.m_head.TryPeek(out result))
{
Interlocked.Decrement(ref this.m_numSnapshotTakers);
return true;
}
}
result = default(T);
Interlocked.Decrement(ref this.m_numSnapshotTakers);
return false;
}
public bool IsEmpty
{
get
{
ConcurrentQueue<T>.Segment head = this.m_head;
if (!head.IsEmpty)
return false;
if (head.Next == null)
return true;
SpinWait spinWait = new SpinWait();
for (; head.IsEmpty; head = this.m_head)
{
if (head.Next == null)
return true;
spinWait.SpinOnce();
}
return false;
}
}
public T[] ToArray()
{
throw new NotImplementedException();
}
internal struct VolatileBool
{
public volatile bool m_value;
public VolatileBool(bool value) => this.m_value = value;
}
private class Segment
{
// 存在实际的元素
internal volatile T[] m_array;
internal volatile VolatileBool[] m_state;
// 单链表,指向下一个 segment
internal ConcurrentQueue<T>.Segment Next => this.m_next;
private volatile ConcurrentQueue<T>.Segment m_next;
// 记录当前 segment 为 第几个(从0开始,计算总Count 会使用)
internal readonly long m_index;
// 当前 segment 中 的 可出队下标
internal int Low => Math.Min(this.m_low, SEGMENT_SIZE_32);
private volatile int m_low;
// 当前 segment 中 的 可进队下标
internal int High => Math.Max(this.m_high, 31);
private volatile int m_high;
// 属于哪个队列
private volatile ConcurrentQueue<T> m_source;
internal Segment(long index, ConcurrentQueue<T> source)
{
this.m_array = new T[SEGMENT_SIZE_32];
this.m_state = new VolatileBool[SEGMENT_SIZE_32];
this.m_high = -1;
this.m_index = index;
this.m_source = source;
}
internal bool IsEmpty => this.Low > this.High;
// 创建下一个 segment
internal void Grow()
{
this.m_next = new Segment(this.m_index + 1L, this.m_source);
// 将新创建的 segment 设置为 当前 队尾
this.m_source.m_tail = this.m_next;
}
internal bool TryAppend(T value)
{
// 当前 segment 队尾 已经到头,无法添加元素
if (this.m_high >= 31)
return false;
int high = SEGMENT_SIZE_32;
try
{
}
finally
{
// 原值性的增加队尾下标
high = Interlocked.Increment(ref this.m_high);
if (high <= 31)
{
// 设置队尾元素
this.m_array[high] = value;
// 设置元素添加标志(由于 value 可能是任意值,这里使用额外的bool 表示是否添加)
this.m_state[high].m_value = true;
}
// 队尾 已经 到头,开始创建一下 segment
if (high == 31)
this.Grow();
}
// 如果抢到的下标超过,则直接返回失败
return high <= 31;
}
internal bool TryRemove(out T result)
{
SpinWait spinWait1 = new SpinWait();
int low = this.Low;
for (int high = this.High; low <= high; high = this.High)
{
// 原子性的尝试对 这个 segment 的 队头下标 + 1,这里可能多个线程同时抢占
if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low)
{
SpinWait spinWait2 = new SpinWait();
// 当前存在进队元素,但是尚未完成进队 的全部操作,进队操作完成 m_value == true
while (!this.m_state[low].m_value)
spinWait2.SpinOnce();
// 获取这个队列元素
result = this.m_array[low];
/*
* 是否有操作持有快照,没有则直接赋值元素为默认值
*/
if (this.m_source.m_numSnapshotTakers <= 0)
this.m_array[low] = default(T);
/*
* 如果已经取到 当前 segment 队列的 队头 => low 到达 数组的最后一个下标
* Note:这里为什么不能写成 if(low == 31)?, 其实是可以的
*/
if (low + 1 >= SEGMENT_SIZE_32)
{
// 等待进入下一个 segment,根据 入队函数可知,当一个segment 队头达到最后一个位置后,会立马创建 下一个 segment
SpinWait spinWait3 = new SpinWait();
while (this.m_next == null)
spinWait3.SpinOnce();
// 将下一个 segment 设置为 队头 segment
this.m_source.m_head = this.m_next;
}
// 抢到 队列中的数据
return true;
}
spinWait1.SpinOnce();
// 前面 没有 抢到 队列中的数据,重新获取当前的最新 Low => 队头下标
low = this.Low;
}
// 未抢到 队列中的数据,直接返回失败
result = default(T);
return false;
}
internal bool TryPeek(out T result)
{
result = default(T);
int low = this.Low;
// 队列无元素
if (low > this.High)
return false;
// 当前队头元素,入队尚未完成,自旋一会
SpinWait spinWait = new SpinWait();
while (!this.m_state[low].m_value)
spinWait.SpinOnce();
// 获取当前队头元素
result = this.m_array[low];
return true;
}
}
}