当前位置: 首页 > wzjs >正文

城乡建设部网站第35号令视觉设计作品

城乡建设部网站第35号令,视觉设计作品,怎样进入wordpress,一对一软件提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解 在现代应用中,特别是像社交平台、金融系统等高并发场景下,如何高效地处理大量实时数据成为了系统设计的一个关键问题。BufferTrigger 是由快手开源的一个工具&#…

提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解

在现代应用中,特别是像社交平台、金融系统等高并发场景下,如何高效地处理大量实时数据成为了系统设计的一个关键问题。BufferTrigger 是由快手开源的一个工具,专为解决大数据流处理场景中的缓冲与批量处理问题。本文将详细讲解如何在 Spring Boot 项目中使用 BufferTrigger,帮助你提高系统的吞吐量与响应速度,减少 I/O 操作,从而提升整体性能。

BufferTrigger 简介:如何高效处理实时数据流

快手开源的 BufferTrigger 是一个用于数据处理,它主要用于实时数据流处理场景。BufferTrigger 的主要作用是为了解决在大数据流处理中常见的问题:如何高效地对连续的数据流进行缓冲,并在满足一定条件时触发下游计算或存储操作。

使用 BufferTrigger 优势如下:

  1. 提高效率:通过批量处理数据而不是逐条处理,可以显著减少 I/O 操作的次数,从而提升整体处理效率。
  2. 资源优化:对于一些需要消耗较多计算资源的操作(如写入数据库、调用外部服务等),通过累积一批数据后再执行一次这样的操作,可以更有效地利用系统资源。
  3. 简化逻辑:对于开发者而言,使用 BufferTrigger 可以帮助简化代码逻辑,将注意力集中在业务逻辑上而不是复杂的缓冲控制逻辑上。
  4. 灵活配置:支持多种触发策略(比如基于时间窗口、基于数据量大小等),使得用户可以根据具体应用场景灵活选择最合适的触发方式。
  5. 易于集成:设计上考虑了与现有数据处理框架的良好兼容性,使得它可以方便地与其他组件一起工作,在现有的技术栈中引入该功能变得更加容易。

如何添加依赖:快速集成到 Spring Boot 项目

只需要在 pom.xml 中添加以下依赖,即可将 BufferTrigger 集成到你的 Spring Boot 项目中:

 <properties>// 省略...<buffertrigger.version>0.2.21</buffertrigger.version></properties><!-- 统一依赖管理 --><dependencyManagement><dependencies>// 省略...<!-- 快手 Buffer Trigger --><dependency><groupId>com.github.phantomthief</groupId><artifactId>buffer-trigger</artifactId><version>${buffertrigger.version}</version></dependency></dependencies></dependencyManagement>

快手 BufferTrigger 使用讲解

  • 核心概念
    1. 缓冲队列:BufferTrigger 会维护一个内部缓冲区,用来缓存从外部接收的数据。它允许指定缓存队列的最大容量,当达到上限时会根据预设的触发策略进行数据的批量处理。
    2. 触发策略:触发策略是指何时将缓存的数据批量提交进行处理。常见的触发策略有:
      • 基于数据量:当缓存的数据达到指定大小时触发处理。
      • 基于时间窗口:每隔一定时间就触发一次处理。
      • 混合触发:同时满足数据量和时间条件时触发。
    3. 数据消费:通过 BufferTrigger 提供的消费者回调机制,开发者可以自定义数据消费的逻辑。一般情况下,消费的过程是对缓存的数据进行处理、存储或其他操作。
    4. 批量处理:将一批数据聚合后一起处理,而不是一条一条地处理。这样能够减少 I/O 操作的次数,从而提高系统的吞吐量。

使用案例

在许多社交平台上,网红或明星的粉丝数通常会发生频繁的波动。比如,当一个网红被大量用户关注或取消关注时,这些信息会通过消息队列(如 RocketMQ)快速传递,系统需要高效地处理这些变化,并更新到缓存或数据库中。在这种场景下,如果每次有粉丝关注或取消关注时都进行一次 I/O 操作,会导致系统的负载过大,尤其是在并发请求较高时。

为了提高系统的性能,减少频繁的 I/O 操作,通常采用 批量处理 的方式来对消息进行合并和延迟处理。BufferTrigger 就是为了应对这种高并发和实时性要求的场景,它能够将多条消息缓存起来,当满足触发条件时(比如缓存队列达到一定大小或时间窗口到期),将这些消息批量处理,从而减少与缓存系统的交互次数,提升系统的吞吐量和响应速度。

@Component
@RocketMQMessageListener(consumerGroup = "xiaohashu_group_" + MQConstants.TOPIC_COUNT_FANS, // Group 组topic = MQConstants.TOPIC_COUNT_FANS // 主题 Topic
)
@Slf4j
public class CountFansConsumer implements RocketMQListener<String> {@Resourceprivate RedisTemplate<String, Object> redisTemplate;private BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking().bufferSize(50000) // 缓存队列的最大容量.batchSize(1000)   // 一批次最多聚合 1000 条.linger(Duration.ofSeconds(1)) // 多久聚合一次.setConsumerEx(this::consumeMessage).build();@Resourceprivate RocketMQTemplate rocketMQTemplate;@Overridepublic void onMessage(String body) {// 往 bufferTrigger 中添加元素bufferTrigger.enqueue(body);}private void consumeMessage(List<String> bodys) {log.info("==> 聚合消息, size: {}", bodys.size());log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys));// List<String> 转 List<CountFollowUnfollowMqDTO>List<CountFollowUnfollowMqDTO> countFollowUnfollowMqDTOS = bodys.stream().map(body -> JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class)).toList();//按照用户进行一个分组Map<Long, List<CountFollowUnfollowMqDTO>> groupMap  =countFollowUnfollowMqDTOS.stream().collect(Collectors.groupingBy(CountFollowUnfollowMqDTO::getUserId));// 按组汇总数据,统计出最终的计数// key 为目标用户ID, value 为最终操作的计数Map<Long, Integer> countMap = Maps.newHashMap();for (Map.Entry<Long, List<CountFollowUnfollowMqDTO>> entry : groupMap.entrySet()) {List<CountFollowUnfollowMqDTO> list = entry.getValue();// 最终的计数值,默认为 0int finalCount = 0;for (CountFollowUnfollowMqDTO countFollowUnfollowMqDTO : list) {Integer type = countFollowUnfollowMqDTO.getType();FollowUnfollowTypeEnum followUnfollowTypeEnum = FollowUnfollowTypeEnum.valueOf(type);// 若枚举为空,跳到下一次循环if (Objects.isNull(followUnfollowTypeEnum)) {continue;}switch (followUnfollowTypeEnum) {case FOLLOW -> finalCount += 1;case UNFOLLOW -> finalCount -= 1;}}// 将分组后统计出的最终计数,存入 countMap 中countMap.put(entry.getKey(), finalCount);}log.info("## 聚合后的计数数据: {}", JsonUtils.toJsonString(countMap));// 更新 RediscountMap.forEach((k, v) -> {// Redis KeyString redisKey = RedisKeyConstants.buildCountUserKey(k);// 判断 Redis 中 Hash 是否存在boolean isExisted = redisTemplate.hasKey(redisKey);// 若存在才会更新// (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)if (isExisted) {// 对目标用户 Hash 中的粉丝数字段进行计数操作redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, v);}});// 发送 MQ, 计数数据落库// 构建消息体 DTOMessage<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap)).build();// 异步发送 MQ 消息,提升接口响应速度rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS_2_DB, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("==> 【计数服务:粉丝数入库】MQ 发送成功,SendResult: {}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.error("==> 【计数服务:粉丝数入库】MQ 发送异常: ", throwable);}});}}

代码解析

  • BufferTrigger 的构建:通过 .batchBlocking() 创建一个 BufferTrigger 实例,该实例设置了缓存队列的最大容量、每批次最多处理的消息数量、以及聚合的时间窗口等配置。
  • enqueue(body):每接收到一条消息,就将消息加入到缓冲队列中,BufferTrigger 会根据设定的策略决定何时批量处理这些数据。
  • consumeMessage(List<String> bodys):当数据满足触发条件时(如缓存队列满或时间窗口到期),consumeMessage 会被调用,处理聚合后的数据。
http://www.dtcms.com/wzjs/593574.html

相关文章:

  • 邢台网站招聘员工123四川手机网站开发
  • 外贸网站推广渠道成都企业模板建站
  • 郑州网站设计的公司长沙商城网站开发
  • 专门做橱柜衣柜效果图的网站网站建设服务提供商
  • 网站建设可以入开发成本吗网站优化工作安排
  • 做网页跳转网站泰安百度推广代理
  • 网站运营方案潍坊学校网站建设
  • 泸州软件开发公司苏州网站优化企业
  • 做外贸做什么网站好中亿丰建设集团股份有限公司官方网站
  • 中山 网站制作做的网站怎么打开是白板
  • 建材网站建设哪家怎么注销网站查备案
  • 长春教做网站带维护的培训机构网站开发国内外研究状况
  • 模板建站费用住建网站需多少钱
  • 网站开发找哪家好在线制作海报网站
  • 门户网站做免费相亲的学生100元保险网站
  • h5网站开发价格网页素材提取
  • 做彩票网站抓到判几年wed是什么意思
  • 苏州网站建设苏州金融网站建设
  • ps怎么做响应式网站布局图广东广东深圳网站建设
  • 资讯类网站建设方案书网站怎么才有alexa排名
  • 杭州有哪些网站建设新手php网站建设
  • 个人网站怎么做支付物流网站怎么做代理
  • 福州网站设计哪家比较好什么做电子书下载网站
  • 做关于车的网站好网站建设源程序代码
  • 门户网站 建设 通知开发公司把已经出售的房子一房二卖卖给股东个人
  • 盐亭做网站珠海网站设计报价
  • 网站建设 599wordpress网址改坏了
  • 在线商城网站怎么做chatgpt网页
  • 网站权重怎么做网站结构该怎么做
  • 网站建设经济效益凡客旗下商城