当前位置: 首页 > wzjs >正文

网页设计与网站开发试卷上海网站 建设

网页设计与网站开发试卷,上海网站 建设,福州做网站设计,亿图在线制作流程图目录 1.依赖 2.配置 3.同步消息 4.异步消息 5.单向消息 6.顺序消息 7.事务消息 8.报错处理 sendDefaultImpl call timeout 延时消息不生效的天坑 1.依赖 <!-- pom.xml --> <dependency><groupId>org.apache.rocketmq</groupId><artifactI…

目录

1.依赖

2.配置

3.同步消息

4.异步消息

5.单向消息

6.顺序消息

7.事务消息

8.报错处理

sendDefaultImpl call timeout

延时消息不生效的天坑


1.依赖

<!-- pom.xml -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

2.配置

不管是生产者还是消费者端,都要配置好nameserver的地址,这样才找的到broker。消费者端还要配置好当前消费者属于哪个消费者组,rocketmq中同一个messagequeue,一个消费者组只能消费一次,这是默认的规则。

# application.yml
rocketmq:name-server: 127.0.0.1:9876 #nameserver地址,支持用逗号分隔多个producer:group: my-producer-group #消费者组

3.同步消息

Sprong Boot中有自己的一套标准,去操作诸如数据库、缓存、中间件之类的第三方组件都喜欢通过一个XXXTemplate去操作,因此第三方组件自己封装starter的时候也会按照这种方式去封装,将API操作都通过一个XXXTemplate来,操作MQ也不例外,操作rocketmq是通过RocketMQTemplate去。

@RestController
public class SyncProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;
​@GetMapping("/sendSync")public String sendSyncMessage() {Message<String> message = MessageBuilder.withPayload("同步消息内容").setHeader(MessageConst.PROPERTY_KEYS, "ORDER_20231109001").build();SendResult sendResult = rocketMQTemplate.syncSend("SYNC_TOPIC", message);return "发送成功,消息ID:" + sendResult.getMsgId();}
}

消费者都是通过Listener来消费消息

@Service
@RocketMQMessageListener(topic = "SYNC_TOPIC",consumerGroup = "sync-consumer-group",selectorType = SelectorType.TAG,selectorExpression = "*"
)
public class SyncConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到同步消息: " + message);}
}

4.异步消息

异步消息因为涉及要接收一个异步响应,所以要注册一个回调函数,异步响应回来之后触发这个异步响应:

public class AsyncProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;
​public void sendAsync() {Message<String> message = MessageBuilder.withPayload("异步消息内容").setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 延迟级别.build();
​rocketMQTemplate.asyncSend("ASYNC_TOPIC", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送成功: " + sendResult.getMsgId());}
​@Overridepublic void onException(Throwable e) {System.err.println("异步发送失败: " + e.getMessage());}});}
}

5.单向消息

@RestController
public class OnewayController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;
​@GetMapping("/sendOneway")public String sendOneway() {rocketMQTemplate.sendOneWay("ONEWAY_TOPIC", MessageBuilder.withPayload("日志消息").build());return "单向消息已发送";}
}

6.顺序消息

顺序消息说白了只要是同一个messagequeue中的消息都是顺序的,所以要做成顺序消息也就是走同一个queueId的messagequeue中,也就是在发送消息的时候指定queueId。

// 订单服务生产者
public class OrderProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;
​// 发送顺序消息(以订单ID为路由键)public void sendOrderMessage(String orderId, String operation) {// 构建消息体Message<String> message = MessageBuilder.withPayload(operation).setHeader("orderId", orderId).build();
​// 关键代码:发送顺序消息rocketMQTemplate.syncSendOrderly("ORDER_TOPIC", message,orderId, // 使用订单ID作为队列选择键3000     // 超时时间);}
}
消费者也可以指定queueId的messagequeue中去消费消息。
@Service
@RocketMQMessageListener(topic = "ORDER_TOPIC",consumerGroup = "order_consumer_group",consumeMode = ConsumeMode.ORDERLY,  // 必须设置为顺序模式messageModel = MessageModel.CLUSTERING,selectorExpression = "*",consumeThreadNumber = 4  // 线程数必须 ≤ 队列数
)
public class OrderConsumer implements RocketMQListener<MessageExt> {
​private final ConcurrentHashMap<String, Boolean> processingOrders = new ConcurrentHashMap<>();
​@Overridepublic void onMessage(MessageExt message) {String orderId = message.getProperty("orderId");String body = new String(message.getBody(), StandardCharsets.UTF_8);
​try {// 1. 检查订单是否正在处理(防并发)if (processingOrders.putIfAbsent(orderId, true) != null) {throw new RuntimeException("存在并发的订单处理: " + orderId);}
​// 2. 顺序消费逻辑processOrder(orderId, body);
​} catch (Exception e) {// 3. 消费失败时挂起当前队列throw new RuntimeException("消费失败,触发队列重试", e);} finally {// 4. 移除处理标记processingOrders.remove(orderId);}}
​private void processOrder(String orderId, String operation) {// 实际业务处理(必须幂等)System.out.printf("处理订单 %s 的操作: %s%n", orderId, operation);}
}

7.事务消息

// 事务消息生产者
public class TransactionProducer {private final RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage(String messageBody) {// 发送半事务消息Message<String> message = MessageBuilder.withPayload(messageBody).setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()).build();rocketMQTemplate.sendMessageInTransaction("tx_topic", message, null);}
}
// 事务监听器(核心实现)
@Slf4j
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {// 执行本地事务@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 这里执行本地数据库操作等业务逻辑// 如果执行成功,返回COMMITreturn RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {log.error("本地事务执行失败", e);return RocketMQLocalTransactionState.ROLLBACK;}}// 事务回查@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 根据业务状态检查事务是否成功// 例如:查询数据库判断事务是否完成return RocketMQLocalTransactionState.COMMIT;}
}

8.报错处理

sendDefaultImpl call timeout

RocketMq提示RemotingTooMuchRequestException: sendDefaultImpl call timeout:

延时消息不生效的天坑

只能用下面这种方式延迟消息才会延迟投递:

用下面这种设置Header的方式不会生效!!延时消息会被立即投递!!!

Message<Order> delayedMessage = MessageBuilder.withPayload(order).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "16").build();


文章转载自:

http://KJMdJHo5.trsdm.cn
http://64EQX8PO.trsdm.cn
http://YtKlLyhe.trsdm.cn
http://Qr4oyrmL.trsdm.cn
http://U3TiXNuH.trsdm.cn
http://epGE3tTq.trsdm.cn
http://hXgwj3zD.trsdm.cn
http://YkskKEjk.trsdm.cn
http://TwJv3x4l.trsdm.cn
http://5NyKnpad.trsdm.cn
http://ZP5gwRll.trsdm.cn
http://QQdStNYo.trsdm.cn
http://fsrNIzYn.trsdm.cn
http://xrpL7PpL.trsdm.cn
http://7UlOw4Wa.trsdm.cn
http://JnJ3S7bQ.trsdm.cn
http://KxikrSi2.trsdm.cn
http://hH7qeIY9.trsdm.cn
http://ftW5UIV1.trsdm.cn
http://DYUiTndC.trsdm.cn
http://drmzvgnY.trsdm.cn
http://oQMJLNR2.trsdm.cn
http://7tj7r2Jg.trsdm.cn
http://NHr0FKoQ.trsdm.cn
http://WxQzvX2v.trsdm.cn
http://0chUS3rw.trsdm.cn
http://8B1P9nSz.trsdm.cn
http://2ikhUmbc.trsdm.cn
http://JgRYNlAN.trsdm.cn
http://cMEvI8hV.trsdm.cn
http://www.dtcms.com/wzjs/654022.html

相关文章:

  • wordpress网站前台密码广告制作合同
  • 网络创作网站景区网站建设费用
  • 域名注册后如何建网站微视频网站源码
  • 做一个属于自己的网站水墨风格的网站
  • 网站开发商换了简单的页面
  • wordpress更改固定链接显示404苏州网站制作排名优化
  • 电话销售企业网站怎么做虎嗅 wordpress
  • 经营网站如何挣钱创建游戏的软件
  • 宁波建设银行网站分部海南通信建设有限公司官方网站
  • 怎么用网站做word文件格式济南手机网站开发公司
  • 公司起名网站十大排名成立一个做网站的工作室
  • 上海网站设计大连青海省建设局网站首页
  • 县总工会网站建设情况介绍网址导航网站怎样做
  • 邯郸网站建设浩森宇特太原域名注册
  • 网站管理员登录入口2021年最火的网页游戏
  • 衡水提供网站设计公司哪家专业WordPress模板转换typecho
  • 旅游网站建设的参考文献wordpress适合做什么网站吗
  • iis网站正在建设中亚马逊跨境电商下载
  • 定制网站建设托管wordpress 指定
  • 什么是建设网站工具南昌百恒信息技术有限公司
  • 哪里有做美食的视频网站网站开发用原生
  • 上海嘉定网站建设洛阳网站建设的公司哪家好
  • 网站设计的设计方案wordpress删除缓存会删掉文件吗
  • seo网站优化报价wordpress 搜索 分词
  • 上海网站建设上海员君个人怎么制作网站
  • 营销型企业网站建设步骤做网站视频上传到哪儿
  • 网站打开微网站开发周期
  • 北京专业网站改版网站公司设计公司
  • 年轻人常用网站做黑彩网站赚钱吗
  • 杭州网站模板建站浙江网站建设设计