RocketMQ高并发编程技巧(二)
信号量的使用技巧
信号量是用来控制在同一时间,能够访问某个共享资源的并发线程/进程数量的——在OS中我们常常用PV操作来协调共享内存的资源管理,主要用于实现进程的互斥和同步。
当然OS的这个信号量和Java的 semaphore
本质上还是不同的东西,只是思想上是同源的,Java的 semaphore
本质上还是解决进程内线程的同步机制。
RocketMQ中信号量主要就是做了一件事情——限流。
RocketMQ进行异步发送的时候,就可以通过信号量控制发送的并发量,也就是超过一定的并发量进行限流的处理,阻止新任务的提交。
信号量使用示例
首先我们先来看看 Semaphore
信号量使用示例。
package org.example;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class semaphoreDemo {static Semaphore semaphore = new Semaphore(10);public static void main(String[] args) {for (int i = 0; i < 100; i++){Thread t = new Thread(new Runnable() {@Overridepublic void run() {dosomething();}});t.start();}}private static void dosomething(){boolean acquire = false;try {// 尝试在3000微秒内获取许可acquire = semaphore.tryAcquire(3000, TimeUnit.MICROSECONDS);if (acquire){System.out.println(Thread.currentThread().getName() + ": 获取信号量成功");}else {System.out.println(Thread.currentThread().getName() + ": 获取信号量失败");}} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 如果成功获取了许可,则在使用后释放if (acquire){semaphore.release();}}}
}
try-catch-finally
是非常经典的语句。在这个简单场景下,我们只需要负责好 semaphore
的控制即可。
但是如果这个 dosomething
方法中涉及到多个分支,分支中再一次的异步操作,那么信号量的控制会变得十分复杂。
RocketMQ 的解决方案
RocketMQ希望用 Semaphore
又希望能正确实现信号的获取和归还,如下便是解决方案:
public class SemaphoreReleaseOnlyOnce {private final AtomicBoolean released = new AtomicBoolean(false);private final Semaphore semaphore;public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {this.semaphore = semaphore;}public void release() {if (this.semaphore != null) {// 使用CAS操作,确保信号量只被释放一次if (this.released.compareAndSet(false, true)) {this.semaphore.release();}}}public Semaphore getSemaphore() {return semaphore;}
}
这段代码主要是用 CAS (Compare-And-Set) 的操作,使得
release
方法即使被多次调用也只会实际去进行一次的真正的release
。
RocketMQ 的使用场景
上述是解决方案,那么RocketMQ的使用场景是在 异步请求的处理 和 单向请求的发送 进行使用的。
如下就是单向请求发送的代码:
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();// 尝试在指定超时时间内获取许可boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {// 使用包装类确保信号量只释放一次final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {// 异步操作完成后释放信号量once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}}});} catch (Exception e) {// 出现异常时也要释放信号量once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {// 如果在超时时间内未能获取到许可,则抛出异常String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTooMuchRequestException(info);}
}