C#基础15-线程安全集合
1、ConcurrentBag
(1)核心特点
线程安全的无序集合 专为多线程设计,无需手动加锁即可支持并发读写操作。 元素无序:存储和取出顺序无保证,适合不依赖顺序的场景。 高效的数据操作 同类线程优先操作本地数据:每个线程维护独立的本地队列(ThreadLocalList
),添加或移除自身线程的数据时几乎无锁竞争。 跨线程“偷取”机制:当线程自身无数据时,会从其他线程的队列中移除元素(Steal
方法),确保资源利用率。 惰性数据迁移 仅在当前线程无数据时,才遍历其他线程的队列迁移数据,减少同步开销。
(2)常用方法与操作
var bag = new ConcurrentBag< string > ( ) ;
bag. Add ( "Item1" ) ;
if ( bag. TryTake ( out string item) )
{ Console. WriteLine ( $"取出: { item } " ) ;
}
if ( bag. TryPeek ( out string peekItem) )
{ Console. WriteLine ( $"查看: { peekItem } " ) ;
}
(3)适用场景与限制
场景 说明 ✅ 同线程生产-消费 同一组线程既添加又移除数据时性能最佳(如线程池任务共享中间结果)。 ✅ 临时数据收集 多线程日志聚合、并行计算的阶段性结果合并。 ❌ 需顺序访问的场景 元素无序,不适用于需 FIFO/LIFO 的场景(改用 ConcurrentQueue
/ConcurrentStack
)。 ❌ 单线程操作 性能低于 List<T>
(因维护线程安全机制的开销)。
(4)注意事项
内存占用:每个线程独立存储数据,可能因线程过多导致内存碎片。 遍历性能:GetEnumerator()
会复制所有元素到新列表再遍历,大集合时效率低。 跨线程操作延迟:当线程从其他线程“偷取”数据时,会有额外同步开销。
(5)对比其他并发集合
集合类型 数据结构 顺序保证 适用场景 ConcurrentBag<T>
无序集合 ❌ 无 同线程生产-消费 ConcurrentQueue<T>
队列 (FIFO) ✅ 先进先出 任务调度、顺序处理 ConcurrentStack<T>
栈 (LIFO) ✅ 后进先出 撤销操作、递归结果 ConcurrentDictionary
哈希表 ✅ 按键访问 共享缓存、计数器
2、ConcurrentDictionary<TKey, TValue>
(1)核心特点
线程安全的键值对集合 专为多线程设计,支持并发读写操作无需手动加锁。 无序存储:不保证元素的遍历顺序,适合不依赖顺序的共享数据场景。 分段锁优化性能 内部通过多个锁分区(锁数量 ≈ CPU 核数 × 4)减少竞争。 操作不同分区的键时线程无需等待,提升并发吞吐量。 原子操作方法 提供线程安全的原子操作,避免复合操作中的竞态条件:
var dict = new ConcurrentDictionary< string , int > ( ) ;
dict. AddOrUpdate ( "key" , k => 1 , ( k, old) => old + 1
) ;
int value = dict. GetOrAdd ( "key" , k => 10 ) ;
(2)常用方法与场景
方法 作用 示例 TryAdd(key, value)
尝试添加键值对(成功返回 true
) dict.TryAdd("a", 1)
TryRemove(key, out value)
尝试移除键值对 dict.TryRemove("a", out int val)
TryUpdate(key, newVal, oldVal)
仅当当前值匹配 oldVal
时更新为 newVal
dict.TryUpdate("a", 2, 1)
GetOrAdd(key, valueFactory)
若键不存在则添加,否则返回现有值 dict.GetOrAdd("a", k => 5)
AddOrUpdate
结合添加与更新逻辑 见上方原子操作示例
(3)适用场景与限制
场景 推荐度 说明 ✅ 共享缓存 ⭐⭐⭐⭐⭐ 多线程读写键值数据(如配置项、用户会话)。 ✅ 并发计数器 ⭐⭐⭐⭐ 使用 AddOrUpdate
实现线程安全的计数(如访问统计)。 ❌ 高频遍历操作 ⭐ Keys
/Values
属性会复制全量数据,大集合时性能差。❌ 顺序敏感场景 ⭐ 元素遍历顺序不确定,需有序访问时改用 ConcurrentQueue
或锁控 Dictionary
。
(4)注意事项
内存开销:每个分区独立计数(m_countPerLock
),可能因分区过多导致内存碎片。 更新操作的原子性:AddOrUpdate
中的 updateValueFactory
可能被多次执行,需确保幂等性。
dict. AddOrUpdate ( "count" , 1 , ( k, old) => old + ExternalService. GetIncrement ( ) ) ;
dict. AddOrUpdate ( "count" , 1 , ( k, old) => old + 1 ) ;
资源释放:值类型为 IDisposable
时,需手动处理移除对象的释放。
(5)与其他并发容器的对比
集合类型 最佳场景 线程安全机制 ConcurrentDictionary
键值缓存/计数器 分段锁 + 原子操作 ConcurrentBag<T>
线程本地生产-消费 线程本地存储 + 偷取算法 ConcurrentQueue<T>
任务队列(FIFO) 无锁 CAS 操作 BlockingCollection<T>
生产者-消费者模式 阻塞操作 + 容量限制
(6)性能优化建议
避免频繁遍历:改用 foreach (var pair in dict)
而非 dict.Keys
或 dict.Values
。 控制分区数量:构造函数中指定 concurrencyLevel
(如 Environment.ProcessorCount * 2
)优化锁竞争。 优先使用原子方法:用 GetOrAdd
代替 ContainsKey
+ Add
组合,避免竞态条件。
3、ConcurrentQueue
(1)核心特性
线程安全性 采用无锁(lock-free)算法实现多线程安全,支持并发入队(Enqueue)和出队(TryDequeue)操作,无需额外同步机制(如锁)。 通过原子操作(如 Interlocked
)和内存屏障(memory barriers)避免竞争条件。 分段存储(Segmented Array) 内部由多个固定大小的 Segment
(默认 32 个元素)组成链表结构,避免扩容时复制整个数组。 新元素入队时若当前 Segment
已满,自动追加新 Segment
;出队后空 Segment
由 GC 回收。 高性能设计 相比 Queue<T> + lock
,减少锁争用和上下文切换开销,适合高并发场景。 单线程环境下性能略低于普通 Queue<T>
(无锁算法额外开销)。
(2)关键 API 及用法
方法/属性 说明 Enqueue(T item)
元素追加至队尾,若当前 Segment
满则自动扩容。 bool TryDequeue(out T result)
尝试移除队首元素,成功返回 true
;队列空时返回 false
。 bool TryPeek(out T result)
尝试查看队首元素(不移除)。 bool IsEmpty
高效判断队列空(O(1) 复杂度),优先于 Count == 0
。 int Count
获取元素数量(计算需遍历 Segment
,高并发时可能不精确)。
ConcurrentQueue< int > queue = new ConcurrentQueue< int > ( ) ;
Parallel. Invoke ( ( ) => queue. Enqueue ( 1 ) , ( ) => queue. Enqueue ( 2 )
) ;
if ( queue. TryDequeue ( out int result) )
{ Console. WriteLine ( result) ;
}
(3)与 Queue<T>
的对比
特性 ConcurrentQueue<T>
Queue<T>
线程安全 ✅ 内置无锁并发 ❌ 需手动加锁 扩容开销 低(分段追加,无全量复制) 高(数组双倍扩容+复制) 适用场景 多线程生产者-消费者模型 单线程或低并发场景
(4)使用注意事项
避免 Count
判空:高并发时 Count
计算开销大,必须用 IsEmpty
代替。 批量处理优化:循环调用 TryDequeue
批量取出元素,减少单次操作开销。
while ( queue. TryDequeue ( out var item) )
{ Process ( item) ;
}
配合 Task
实现高效并行:结合 Task.WhenAll
或 Parallel.ForEach
处理队列任务。
var tasks = new List< Task> ( ) ;
while ( ! queue. IsEmpty)
{ tasks. Add ( Task. Run ( ( ) => { if ( queue. TryDequeue ( out var item) ) Process ( item) ; } ) ) ;
}
await Task. WhenAll ( tasks) ;
(5)适用场景
高并发数据缓冲:如日志异步写入、实时消息分发。 任务调度队列:多线程任务分配(生产者提交任务,消费者执行)。 流式数据处理:并行管道中的中间数据存储。
4、ConcurrentStack
(1)核心特性与作用
线程安全的 LIFO 栈 实现后进先出(LIFO)的栈结构,专为多线程并发访问设计,无需额外同步锁。 通过细粒度锁或无锁算法(如 Interlocked.CompareExchange
)保证操作的原子性。 核心 API 方法 入栈 Push(T item)
:单元素入栈。PushRange(T[] items)
:批量入栈(原子操作)。 出栈 bool TryPop(out T result)
:尝试弹出栈顶元素(成功返回 true
)。int TryPopRange(T[] items)
:批量弹出元素,返回实际弹出数量。 查看栈顶 bool TryPeek(out T result)
:查看但不移除栈顶元素(栈空时返回 false
)。 其他属性 Count
:获取近似元素数量(并发环境下可能不精确)。IsEmpty
:检查栈是否为空(推荐代替 Count == 0
判断)。 性能优化 内部采用链表结构,避免数组扩容开销,减少内存拷贝。 使用自旋等待(SpinWait
)处理短暂竞争,降低线程阻塞概率。
(2)适用场景
场景 说明 多线程撤销操作 逆序处理操作历史(如用户操作回滚)。 并行任务调度 多个线程同时提交任务,按后进先出顺序执行。 资源池管理 高效复用资源对象(如数据库连接池),避免频繁创建销毁。 高并发数据缓冲 作为 BlockingCollection<T>
的底层存储,支持阻塞式生产者-消费者模型。
(3)实现原理与性能要点
数据结构 基于单向链表实现,每个节点(Node
)存储元素和指向下一节点的引用。 通过 head
指针维护栈顶,利用 Interlocked
原子操作更新指针。 线程安全机制
while ( true ) { Node oldHead = _head; var newNode = new Node ( item, oldHead) ; if ( Interlocked. CompareExchange ( ref _head, newNode, oldHead) == oldHead) break ; else SpinWait. SpinOnce ( ) ;
}
- 通过 `CompareExchange` 确保入栈/出栈操作的原子性。
- 批量操作(如 `PushRange`)通过一次性插入链表段减少竞争。
注意事项 非严格实时计数:Count
属性在并发下是近似值,需用 IsEmpty
判空。 批量操作限制:TryPopRange
可能返回少于请求数量的元素(栈元素不足时)。 内存开销:链表节点分散存储,相比数组栈(Stack<T>
)内存局部性较差。
(4)使用示例
using System. Collections. Concurrent ; var stack = new ConcurrentStack< int > ( ) ;
Parallel. For ( 0 , 10 , i => stack. Push ( i) ) ;
int [ ] poppedItems = new int [ 5 ] ;
int count = stack. TryPopRange ( poppedItems, 0 , 5 ) ; Console. WriteLine ( $"Popped { count } items: { string . Join ( "," , poppedItems) } " ) ;
(5)适用性建议
优先选择场景:需严格 LIFO 顺序且高并发(如任务调度、撤销栈)。 替代方案: 需阻塞/容量限制 → BlockingCollection<T>
(包装 ConcurrentStack<T>
)。 无序且线程本地操作 → ConcurrentBag<T>
。