当前位置: 首页 > wzjs >正文

国外mod大型网站建设团购网站费用

国外mod大型网站,建设团购网站费用,做我的世界皮肤壁纸的网站,如何制作手机购物网站好长时间没更新了。。。。。。 背景:举个例子在某个接口执行完成后只需要前半段返回结果,后半段可能是日志记录、下游系统调用等功能的情况下,将耗时的消息进行异步发送就显得很有必要,这时就有很多种选择,单体项目甚至…

好长时间没更新了。。。。。。

背景:举个例子在某个接口执行完成后只需要前半段返回结果,后半段可能是日志记录、下游系统调用等功能的情况下,将耗时的消息进行异步发送就显得很有必要,这时就有很多种选择,单体项目甚至可以选择自定义线程池+DelayQueue 这种操作去进行异步操作,而大多数人会在第一时间想到消息丢列,但是消息引入消息队列这件事对于一个并发量不大、后半段消息允许失败的情况单独引入一个中间件对系统的开发维护难度都会提升一个等级,所以我就想到应用 Redis Stream 这种方式来实现异步任务的执行

废话不多说,直接上代码

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisCountdownTaskProducer implements CountdownStrategy {private final StringRedisTemplate stringRedisTemplate;@Overridepublic void startCountdown(long duration, Runnable onFinish, String userId, String taskId) {log.info("使用redis stream 的延时任务开始执行 userId:{},taskId:{}",userId,taskId);Map<String,String> producerMap = Map.of("userId",userId,"taskId",taskId,"duration",String.valueOf(duration));// 发送延迟消费信息  topic: pickUpTheLightRecordstringRedisTemplate.opsForStream().add("streamKey", producerMap);}@Overridepublic void cancelCountdown(String userId, String taskId) {}
}

以我的应用场景为例,大家可以忽略这个继承的 CountdownStrategy 的接口,我这是用策略模式来实现多种方式的动态切换

最核心的代码就是一行 stringRedisTemplate.opsForStream().add(“pickUpTheLightRecord”, producerMap);
应用 redis 提供的 stream 功能,直接发送你的 topic 和你的 key(这个 key 可以是你的某个实体,某个信息,或者说某种标识,以便后续取出的时候可以知道自己要进行什么操作)

@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {private final RedisConnectionFactory redisConnectionFactory;private final RedisCountdownTaskConsumer redisCountdownTaskConsumer;@Beanpublic ExecutorService asyncStreamConsumer() {AtomicInteger index = new AtomicInteger();int processors = Runtime.getRuntime().availableProcessors();return new ThreadPoolExecutor(processors,processors + processors >> 1,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),runnable -> {Thread thread = new Thread(runnable);thread.setName("stream_consumer_countdown_task_" + index.incrementAndGet());thread.setDaemon(true);return thread;});}@Bean(initMethod = "start", destroyMethod = "stop")public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(ExecutorService asyncStreamConsumer) {// 创建消费者组String consumerGroup = "your_comsumer_group";String streamKey = "streamKey"; // 与生产者topic一致try {redisConnectionFactory.getConnection().xGroupCreate(streamKey.getBytes(), consumerGroup, ReadOffset.from("0-0"), true);} catch (Exception e) {// 捕获异常,检查是否是因为消费者组已存在导致的错误if (e.getMessage().contains("BUSYGROUP")) {// 如果消费者组已存在,则复用现有的消费者组log.warn("消费者组已存在,复用现有的消费者组: {}", consumerGroup);} else {// 如果是其他错误,则记录日志log.warn("消费者组创建失败: {}", e.getMessage());}}StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(10)// 执行从 Stream 拉取到消息的任务流程.executor(asyncStreamConsumer)// 如果没有拉取到消息,需要阻塞的时间。不能大于 ${spring.data.redis.timeout},否则会超时.pollTimeout(Duration.ofSeconds(3)).build();StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =StreamMessageListenerContainer.create(redisConnectionFactory, options);streamMessageListenerContainer.receiveAutoAck(Consumer.from(consumerGroup,"countdownTaskConsumer"),StreamOffset.create(streamKey, ReadOffset.lastConsumed()),redisCountdownTaskConsumer);return streamMessageListenerContainer;}
}

以上是对消费者的配置,配置过后我们通过 Bean 的形式直接注入 Spring 容器,方便在应用启动时它可以自动创建,应用结束时可以自动销毁,避免资源浪费

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisCountdownTaskConsumer implements StreamListener<String, MapRecord<String, String, String>> {private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;private final TaskCommonPomodoroTechniqueMapper taskCommonPomodoroTechniqueMapper;private final StringRedisTemplate stringRedisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {log.info("receive message:{},倒计时任务开始执行 userId:{},taskId:{}",message);// 执行倒计时任务String stream = message.getStream();RecordId id = message.getId();String consumerGroup = "pickUpTheLightRecord_consumer_group";//消息幂等处理if(!messageQueueIdempotentHandler.isMessageProcessed(id.toString())){//判断当前消息是否执行完成if(messageQueueIdempotentHandler.isAccomplish(id.toString())){log.info("消息已处理完成: {}", id);stringRedisTemplate.opsForStream().acknowledge(stream, consumerGroup, id); // 显式ACreturn;}throw new RuntimeException("消息未完成流程,选择消息队列重试");}try {Map<String, String> value = message.getValue();String userId = value.get("userId");String taskId = value.get("taskId");LambdaUpdateWrapper<TaskCommonPomodoroTechnique> wrapper = new LambdaUpdateWrapper<>();wrapper.eq(TaskCommonPomodoroTechnique::getUserId,userId);wrapper.eq(TaskCommonPomodoroTechnique::getId,taskId);TaskCommonPomodoroTechnique taskCommonPomodoroTechnique = taskCommonPomodoroTechniqueMapper.selectOne(wrapper);if(taskCommonPomodoroTechnique == null){log.error("倒计时任务不存在,userId:{},taskId:{}",userId,taskId);return;}taskCommonPomodoroTechnique.setCompletionTimes(taskCommonPomodoroTechnique.getCompletionTimes()+1);taskCommonPomodoroTechniqueMapper.updateById(taskCommonPomodoroTechnique);log.info("倒计时任务执行成功,userId:{},taskId:{}",userId,taskId);stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());}catch (Throwable e){messageQueueIdempotentHandler.deleteMessageProcessed(id.toString());log.error("记录倒计时任务异常",e);throw e;}messageQueueIdempotentHandler.setAccomplish(id.toString());}
}

最后我们来实现具体的消费者,直接继承 StreamListener 这个类,重写 onMessage 方法,在这个方法中定义你要执行的业务逻辑(大家可以忽略幂等的处理,这个不是讲解的重点)

这样就可以使用基于 Redis Stream 的消息丢列啦

总的来说和 RocketMQ 的使用差不多,但是多了一些配置的过程


文章转载自:

http://sZYdyieX.qhkdt.cn
http://dC2ncuuF.qhkdt.cn
http://o2Q6xBeO.qhkdt.cn
http://62BdqCW7.qhkdt.cn
http://B896l1Nc.qhkdt.cn
http://A3sUM37m.qhkdt.cn
http://3rUifuTE.qhkdt.cn
http://uYG0H0GH.qhkdt.cn
http://JWfCE9RQ.qhkdt.cn
http://ECtbQnkC.qhkdt.cn
http://6u2lcMLv.qhkdt.cn
http://s3Nh9L4g.qhkdt.cn
http://IsrOCDE2.qhkdt.cn
http://FcZZx77v.qhkdt.cn
http://3tUdw3Dp.qhkdt.cn
http://lUv9uhuL.qhkdt.cn
http://pWgnwXVL.qhkdt.cn
http://NLXaiTNW.qhkdt.cn
http://MKVe8f2O.qhkdt.cn
http://BEcCqr7W.qhkdt.cn
http://S1PFlaWr.qhkdt.cn
http://P3ya5YCf.qhkdt.cn
http://3irEByNl.qhkdt.cn
http://s6XuqQG4.qhkdt.cn
http://j8FJfWqa.qhkdt.cn
http://UYG42zwn.qhkdt.cn
http://3r8R3ebW.qhkdt.cn
http://emRra6Jt.qhkdt.cn
http://qhS6g8W0.qhkdt.cn
http://h6wTSzu6.qhkdt.cn
http://www.dtcms.com/wzjs/637421.html

相关文章:

  • 凡科网站建设套餐报价仿5173网站
  • 做网站app价格多少钱网页云原神
  • 遂宁网站开发如何查询网站空间
  • 哪里有手机网站制作公司网站页面设计制作
  • 长沙做网站哪个最好重庆开县网站建设公司
  • 想学习网站建设网页界面设计公司
  • 网络营销跟做网站有什么区别[网络收集]form表单及网站开发中常用js表单取值方法
  • 企业网站的作用和目的办公室装修设计方案
  • 巩义服务专业网站建设广州去东莞回来要隔离吗
  • 做淘宝团购的网站专门做企业名录的网站
  • 广西医院响应式网站建设方案农业推广项目
  • 牛商网网站建设多少钱鄂尔多斯网站开发
  • 新开传奇网站大全app开发软件外包
  • 蚌埠做网站深圳餐饮设计公司排名
  • 提供模板网站制作多少钱福州短视频seo推荐
  • 国家开发银行生源地助学贷款网站哈尔滨建设工程信息网查询
  • 网站建设黄页免费观看dw做一个小网站教程
  • 大同建设网站网站建设可以给公司带来
  • wordpress用户管理插件湖南seo网站多少钱
  • 电子商务网站建设规划说明书网线制作过程简述
  • 建设网站怎样做注册安全工程师科目
  • 深圳找工作哪个网站好电商服务
  • 赣州大余做网站建设广州企业网站建设多少钱
  • 怎做网站手机六安发布最新通告
  • 网站降权etc工程承包模式
  • 云南网站开发培训机构网络服务器可提供的常见服务哪四个
  • 网站建设属于销售费用wordpress 文件夹
  • 旅游网站开发意义和背景莱芜一中谭苗苗事件
  • 高端医院网站建设wordpress header.php在哪里
  • 长沙做网站改版价格昆明招聘网站建设普工小工