字节-面试
字节-实习-java-三面-商业平台
如何保证RabbitMQ消息的顺序性?
- 单一队列和单一消费者模式:确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。因为队列本身就是一个先进先出的结构。
- 消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。创建了一个连接和一个通道,然后声明了一个队列。之后,我们发布了一个简单的消息到队列中。为了保证消息的顺序性,我们需要确保所有消息都是通过同一个通道发送,并且在消费端也是由同一个消费者按顺序接收处理。
- 如果你的应用程序能够容忍部分消息无序,但对一组相关消息的顺序有严格要求,那么可以考虑将消息分组,并为每个组指定一个唯一的标识符。然后,确保同一组内的所有消息由同一个消费者处理。
- RabbitMQ支持优先级队列,你可以设置消息的优先级。虽然这不是为了保证消息的顺序性而设计的,但在某些场景下可以通过调整消息的优先级来间接控制消息处理的顺序。
哪如果同类消息数量过多,是都放在同一个队列中吗?
- 分区队列(Sharding):可以按照某种规则(如用户ID、消息类型或其他业务逻辑相关的键值)对消息进行分片,然后将这些分片分配到不同的队列中。每个队列可以有自己独立的消费者组,这样可以在保持一定顺序性的前提下提高并发处理能力。
- 对于一些不需要立即响应的消息,可以考虑批量处理的方式。通过合并多条消息为一批次进行处理,可以减少I/O操作次数,提高效率。
java的线程池怎么设计的?
Java 线程池详解,图文并茂,还有谁不会?!_java公共线程池-CSDN博客
主要的addwork()方法:
-
状态检查:首先判断线程池的状态是否允许添加新的工作线程。只有在特定条件下(如线程池处于 RUNNING 或 SHUTDOWN 且满足“善后条件”),才会继续执行。
-
CAS 增加线程数:使用 CAS 操作尝试原子性地增加线程池中的工作线程数量,确保多线程环境下的安全性。如果当前线程数已达到限制,则直接返回失败。
-
创建并启动线程:成功更新线程数后,创建一个新的
Worker
对象,并将其加入到线程池中。随后启动这个新线程,使其开始执行任务。 -
异常处理:如果在任何阶段出现问题(比如线程启动失败),都会进行相应的清理工作,确保线程池的状态一致性。
“我在项目中用线程池比较多,也看过 ThreadPoolExecutor
的源码。比如 addWorker
方法,它会先看线程池是不是还在运行,然后再用 CAS 去抢一个线程名额,抢到了就 new 一个 Worker 并启动。Worker 内部用了 AQS 来控制中断,防止任务执行中被误中断……”
- 4.
Worker
类为什么既实现Runnable
又继承AQS
?有什么用?
✅ 答:
这是 JDK 设计的一个巧妙之处:
特性 | 作用 |
---|---|
implements Runnable | 使 Worker 可以作为任务提交给线程执行(t.start() 实际调用 run() ) |
extends AbstractQueuedSynchronizer | 实现了一个不可重入的互斥锁,用于控制任务执行期间的中断行为 |
📌 关键用途:
- 防止任务执行过程中被意外中断(比如调用
setCorePoolSize
时); - 实现
interruptIfStarted()
:只有当state >= 0
时才允许中断,而初始state = -1
,确保线程启动前不被中断; - 锁机制简单高效,避免使用
ReentrantLock
导致任务重入加锁的问题。
unWorker()
是线程池中真正执行任务的核心方法。
📌 主要流程:
- 先执行
firstTask
(如果有); - 循环调用
getTask()
从队列中获取后续任务; - 执行前调用
beforeExecute()
(钩子方法,可重写); - 执行
task.run()
; - 执行后调用
afterExecute()
(可用于异常处理或监控); - 更新完成任务数
completedTasks++
; - 异常退出时进入
processWorkerExit()
处理线程回收。
📌 加锁意义:
w.lock()
是为了防止在任务执行期间被外部中断(如shutdown()
);- 不是为了并发同步,而是保护任务执行上下文。
- 6.
getTask()
方法的作用是什么?它是如何控制线程超时退出的?
✅ 答:
getTask()
负责从任务队列中获取下一个任务,如果获取不到,可能返回 null
,导致线程退出。
📌 核心逻辑:
- 判断是否需要超时控制:
- 核心线程:默认永不超时(除非设置了
allowCoreThreadTimeOut
); - 非核心线程:一定时间没拿到任务就退出;
- 核心线程:默认永不超时(除非设置了
- 使用
queue.poll(keepAliveTime, TimeUnit)
实现超时获取; - 如果超时仍未获取到任务,返回
null
,触发线程退出流程; - 同时检查线程池状态:
- 若为
STOP
及以上状态,直接返回null
; - 若为
SHUTDOWN
且队列为空,也返回null
。
- 若为
📌 返回 null 的后果:
runWorker()
中的循环结束;- 调用
processWorkerExit()
回收该线程; - 实现了线程池的动态缩容。