当前位置: 首页 > news >正文

RocketMQ高并发编程技巧(二)

信号量的使用技巧

信号量是用来控制在同一时间,能够访问某个共享资源的并发线程/进程数量的——在OS中我们常常用PV操作来协调共享内存的资源管理,主要用于实现进程的互斥和同步。

当然OS的这个信号量和Java的 semaphore 本质上还是不同的东西,只是思想上是同源的,Java的 semaphore 本质上还是解决进程内线程的同步机制。

RocketMQ中信号量主要就是做了一件事情——限流。

RocketMQ进行异步发送的时候,就可以通过信号量控制发送的并发量,也就是超过一定的并发量进行限流的处理,阻止新任务的提交。

信号量使用示例

首先我们先来看看 Semaphore 信号量使用示例。

package org.example;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class semaphoreDemo {static Semaphore semaphore = new Semaphore(10);public static void main(String[] args) {for (int i = 0; i < 100; i++){Thread t = new Thread(new Runnable() {@Overridepublic void run() {dosomething();}});t.start();}}private static void dosomething(){boolean acquire = false;try {// 尝试在3000微秒内获取许可acquire = semaphore.tryAcquire(3000, TimeUnit.MICROSECONDS);if (acquire){System.out.println(Thread.currentThread().getName() + ": 获取信号量成功");}else {System.out.println(Thread.currentThread().getName() + ": 获取信号量失败");}} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 如果成功获取了许可,则在使用后释放if (acquire){semaphore.release();}}}
}

try-catch-finally 是非常经典的语句。在这个简单场景下,我们只需要负责好 semaphore 的控制即可。

但是如果这个 dosomething 方法中涉及到多个分支,分支中再一次的异步操作,那么信号量的控制会变得十分复杂。

RocketMQ 的解决方案

RocketMQ希望用 Semaphore 又希望能正确实现信号的获取和归还,如下便是解决方案:

public class SemaphoreReleaseOnlyOnce {private final AtomicBoolean released = new AtomicBoolean(false);private final Semaphore semaphore;public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {this.semaphore = semaphore;}public void release() {if (this.semaphore != null) {// 使用CAS操作,确保信号量只被释放一次if (this.released.compareAndSet(false, true)) {this.semaphore.release();}}}public Semaphore getSemaphore() {return semaphore;}
}

这段代码主要是用 CAS (Compare-And-Set) 的操作,使得 release 方法即使被多次调用也只会实际去进行一次的真正的 release

RocketMQ 的使用场景

上述是解决方案,那么RocketMQ的使用场景是在 异步请求的处理单向请求的发送 进行使用的。

如下就是单向请求发送的代码:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();// 尝试在指定超时时间内获取许可boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {// 使用包装类确保信号量只释放一次final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {// 异步操作完成后释放信号量once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}}});} catch (Exception e) {// 出现异常时也要释放信号量once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {// 如果在超时时间内未能获取到许可,则抛出异常String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTooMuchRequestException(info);}
}
http://www.dtcms.com/a/481697.html

相关文章:

  • 算法沉淀第二天(Catching the Krug)
  • redis-4.0.11-1.ky10.sw_64.rpm安装教程(申威麒麟V10 64位系统详细步骤)
  • 为企业为什么做网站企业网站建设注意什么
  • 从监听风险到绝对隐私:Zoom偷听门后,Briefing+CPolar重新定义远程会议安全标准
  • 网站源代码下载工具网站备案网站前置审批
  • 基于GENESIS64核心可视化组件GraphWorX64的工业图形设计解决方案
  • QML学习笔记(三十七)QML的Slider
  • 3:Django-migrate
  • 【Linux】网络基础概念
  • Go语言技术与应用(三):服务注册发现机制详解
  • 网线学习笔记
  • 【OpenHarmony】存储管理服务模块架构
  • 网站做报表网站维护是谁做的
  • 阿里云k8s部署微服务yaml和Dockerfile文件脚本
  • [Backstage] 后端插件 | 包架构 | 独立微服务 | by HTTP路由
  • java微服务-尚医通-编写接口
  • Go|sync.Pool|临时对象池,实现临时对象的复用,降低GC压力
  • go语言了解
  • 网站页面高度福建住房城乡建设部网站
  • 【Go】--数组和切片
  • 李宏毅机器学习笔记22
  • 重排反应是什么?从分子变化到四大关键特征解析
  • 服务治理与 API 网关:微服务流量管理的艺术
  • 怎样做企业的网站首页网站开发求职简历
  • 程序设计基础第2周上课前预习
  • 谷歌 chrome 浏览器安装crx插件(hackbar为例)
  • 分布式专题——43 ElasticSearch概述
  • Tomcat 启动后只显示 index.jsp,没有进入你的 Servlet 逻辑
  • 分布式之RabbitMQ的使用(3)QueueBuilder
  • 建立自己网站的好处抖音代运营可以相信吗