当前位置: 首页 > news >正文

C#并发集合-ConcurrentQueue

在多线程环境下,使用这个集合接受网络请求

using System.Collections;
using System.Collections.Concurrent;
using System.Runtime.Serialization;

namespace ConsoleApp1.Concurrent;

/*
 * 并发队列 ConcurrentQueue
 * 数据结构:使用 数组 + 单链表
 * 结构图:
 *          tail                           head
 *          segment1 -> segment2 -> ... -> segment...
 *           arr[0]      arr[0]             arr[0]
 *           arr[1]      arr[1]             arr[1]
 *           arr[...]    arr[...]           arr[...]
 *           arr[32]     arr[32]            arr[32]
 * 操作:
 *  进队
 *      tail =>
 *          累加 tail segment 中的 high(入队方向从 0 到 31)
 *          入队完成,if segment 内数组已满(high == 31),则提前将下一个 segment 创建,并且当前 tail.next 指向新的segment, tail 再执行 新的segment
 *  出队
 *      head =>
 *          累加 head segment 中的 low(出队方向从 0 到 31)
 *                   if segment 内数组已空(low == 31),则提前将下一个 segment 创建,并且当前 tail.next 指向新的segment, tail 再执行 新的segment
 */
public class ConcurrentQueue<T> :
    IProducerConsumerCollection<T>
{
    private volatile ConcurrentQueue<T>.Segment m_head;
    private volatile ConcurrentQueue<T>.Segment m_tail;
    internal volatile int m_numSnapshotTakers;
    private const int SEGMENT_SIZE_32 = 32;

    public ConcurrentQueue()
    {
        this.m_head = this.m_tail = new Segment(0L, this);
    }

    private void InitializeFromCollection(IEnumerable<T> collection)
    {
        throw new NotImplementedException();
    }

    public ConcurrentQueue(IEnumerable<T> collection)
    {
        throw new NotImplementedException();
    }

    private void OnSerializing(StreamingContext context)
    {
        throw new NotImplementedException();
    }

    private void OnSerialized(StreamingContext context)
    {
        throw new NotImplementedException();
    }

    void ICollection.CopyTo(Array array, int index)
    {
        throw new NotImplementedException();
    }

    private IEnumerator<T> GetEnumerator(Segment head, Segment tail, int headLow, int tailHigh)
    {
        throw new NotImplementedException();
    }

    public IEnumerator<T> GetEnumerator()
    {
        throw new NotImplementedException();
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    public void CopyTo(Array array, int index)
    {
        throw new NotImplementedException();
    }

    public int Count
    {
        get
        {
            this.GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh);
            return head == tail
                ? tailHigh - headLow + 1
                : SEGMENT_SIZE_32 - headLow + SEGMENT_SIZE_32 * (int)(tail.m_index - head.m_index - 1L) +
                  (tailHigh + 1);
        }
    }

    private void GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh)
    {
        head = this.m_head;
        tail = this.m_tail;
        headLow = head.Low;
        tailHigh = tail.High;
        SpinWait spinWait = new SpinWait();

        while (head != this.m_head || tail != this.m_tail || headLow != head.Low || tailHigh != tail.High ||
               head.m_index > tail.m_index)
        {
            spinWait.SpinOnce();
            head = this.m_head;
            tail = this.m_tail;
            headLow = head.Low;
            tailHigh = tail.High;
        }
    }

    public bool IsSynchronized => false;

    public object SyncRoot => throw new NotSupportedException();

    public void CopyTo(T[] array, int index)
    {
        throw new NotImplementedException();
    }

    public bool TryAdd(T item)
    {
        this.Enqueue(item);
        return true;
    }

    public bool TryTake(out T item) => this.TryDequeue(out item);

    public void Enqueue(T item)
    {
        SpinWait spinWait = new SpinWait();
        while (!this.m_tail.TryAppend(item))
            spinWait.SpinOnce();
    }

    public bool TryDequeue(out T result)
    {
        while (!this.IsEmpty)
        {
            if (this.m_head.TryRemove(out result))
                return true;
        }

        result = default(T);
        return false;
    }

    public bool TryPeek(out T result)
    {
        Interlocked.Increment(ref this.m_numSnapshotTakers);
        while (!this.IsEmpty)
        {
            if (this.m_head.TryPeek(out result))
            {
                Interlocked.Decrement(ref this.m_numSnapshotTakers);
                return true;
            }
        }

        result = default(T);
        Interlocked.Decrement(ref this.m_numSnapshotTakers);
        return false;
    }

    public bool IsEmpty
    {
        get
        {
            ConcurrentQueue<T>.Segment head = this.m_head;
            if (!head.IsEmpty)
                return false;
            if (head.Next == null)
                return true;
            SpinWait spinWait = new SpinWait();
            for (; head.IsEmpty; head = this.m_head)
            {
                if (head.Next == null)
                    return true;
                spinWait.SpinOnce();
            }

            return false;
        }
    }

    public T[] ToArray()
    {
        throw new NotImplementedException();
    }

    internal struct VolatileBool
    {
        public volatile bool m_value;
        public VolatileBool(bool value) => this.m_value = value;
    }

    private class Segment
    {
        // 存在实际的元素 
        internal volatile T[] m_array;
        internal volatile VolatileBool[] m_state;
        
        // 单链表,指向下一个 segment
        internal ConcurrentQueue<T>.Segment Next => this.m_next;
        private volatile ConcurrentQueue<T>.Segment m_next;
        
        // 记录当前 segment 为 第几个(从0开始,计算总Count 会使用)
        internal readonly long m_index;
        
        // 当前 segment 中 的 可出队下标
        internal int Low => Math.Min(this.m_low, SEGMENT_SIZE_32);
        private volatile int m_low;
        
        // 当前 segment 中 的 可进队下标
        internal int High => Math.Max(this.m_high, 31);
        private volatile int m_high;
        
        // 属于哪个队列
        private volatile ConcurrentQueue<T> m_source;

        internal Segment(long index, ConcurrentQueue<T> source)
        {
            this.m_array = new T[SEGMENT_SIZE_32];
            this.m_state = new VolatileBool[SEGMENT_SIZE_32];
            this.m_high = -1;
            this.m_index = index;
            this.m_source = source;
        }
        
        internal bool IsEmpty => this.Low > this.High;

        // 创建下一个 segment
        internal void Grow()
        {
            this.m_next = new Segment(this.m_index + 1L, this.m_source);

            // 将新创建的 segment 设置为 当前 队尾
            this.m_source.m_tail = this.m_next;
        }

        internal bool TryAppend(T value)
        {
            // 当前 segment 队尾 已经到头,无法添加元素
            if (this.m_high >= 31)
                return false;
            int high = SEGMENT_SIZE_32;

            try
            {
            }
            finally
            {
                // 原值性的增加队尾下标
                high = Interlocked.Increment(ref this.m_high);
                if (high <= 31)
                {
                    // 设置队尾元素
                    this.m_array[high] = value;

                    // 设置元素添加标志(由于 value 可能是任意值,这里使用额外的bool 表示是否添加)
                    this.m_state[high].m_value = true;
                }

                // 队尾 已经 到头,开始创建一下 segment
                if (high == 31)
                    this.Grow();
            }

            // 如果抢到的下标超过,则直接返回失败
            return high <= 31;
        }

        internal bool TryRemove(out T result)
        {
            SpinWait spinWait1 = new SpinWait();
            int low = this.Low;
            for (int high = this.High; low <= high; high = this.High)
            {
                // 原子性的尝试对 这个 segment 的 队头下标 + 1,这里可能多个线程同时抢占
                if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low)
                {
                    SpinWait spinWait2 = new SpinWait();

                    // 当前存在进队元素,但是尚未完成进队 的全部操作,进队操作完成 m_value == true
                    while (!this.m_state[low].m_value)
                        spinWait2.SpinOnce();

                    // 获取这个队列元素
                    result = this.m_array[low];

                    /*
                     * 是否有操作持有快照,没有则直接赋值元素为默认值
                     */
                    if (this.m_source.m_numSnapshotTakers <= 0)
                        this.m_array[low] = default(T);

                    /*
                     * 如果已经取到 当前 segment 队列的 队头 => low 到达 数组的最后一个下标
                     * Note:这里为什么不能写成 if(low == 31)?, 其实是可以的
                     */
                    if (low + 1 >= SEGMENT_SIZE_32)
                    {
                        // 等待进入下一个 segment,根据 入队函数可知,当一个segment 队头达到最后一个位置后,会立马创建 下一个 segment
                        SpinWait spinWait3 = new SpinWait();
                        while (this.m_next == null)
                            spinWait3.SpinOnce();

                        // 将下一个 segment 设置为 队头 segment
                        this.m_source.m_head = this.m_next;
                    }

                    // 抢到 队列中的数据
                    return true;
                }

                spinWait1.SpinOnce();

                // 前面 没有 抢到 队列中的数据,重新获取当前的最新 Low => 队头下标
                low = this.Low;
            }

            // 未抢到 队列中的数据,直接返回失败
            result = default(T);
            return false;
        }

        internal bool TryPeek(out T result)
        {
            result = default(T);
            int low = this.Low;

            // 队列无元素
            if (low > this.High)
                return false;

            // 当前队头元素,入队尚未完成,自旋一会
            SpinWait spinWait = new SpinWait();
            while (!this.m_state[low].m_value)
                spinWait.SpinOnce();

            // 获取当前队头元素
            result = this.m_array[low];
            return true;
        }
    }
}

 

相关文章:

  • 2024年第十五届蓝桥杯大赛软件赛省赛Python大学A组真题解析《更新中》
  • kkfileview部署
  • 0x04 jdbc和mybatis
  • 初阶数据结构(C语言实现)——3顺序表和链表(1)
  • 腾讯游戏完成架构调整 IEG新设五大产品事业部
  • 迷你世界脚本生物接口:Creature
  • Navicat连接虚拟机数据库详细教程
  • PAT乙级(1045 快速排序)C语言详解
  • 基于SpringBoot+Vue的医院挂号管理系统+LW示例参考
  • 【UI设计——陕西红富士苹果海报分享】
  • 【深度学习】Hopfield网络:模拟联想记忆
  • 前端开发常用的加密算法
  • MFC: 控件根据文本内容大小自动调整
  • git配置多个SSH key
  • ES scroll=1m:表示快照的有效时间为1分钟。怎么理解
  • 使用hutool将json集合对象转化为对象
  • 02_NLP文本预处理之文本张量表示法
  • SpringBoot3—核心特性:基础特性
  • Spring 原理(很多面试题)
  • (下:补充——五个模型的理论基础)深度学习——图像分类篇章
  • 坚持吃素,是不是就不会得高血脂了?
  • 互降关税后,从中国至美国的集装箱运输预订量飙升近300%
  • 盛和资源海外找稀土矿提速:拟超7亿元收购匹克,加快推动坦桑尼亚项目
  • 4月份全国企业销售收入同比增长4.3%
  • 最新研究:新型合成小分子可“精准杀伤”癌细胞
  • 央媒评网红质疑胖东来玉石定价暴利:对碰瓷式维权不能姑息