当前位置: 首页 > wzjs >正文

辽宁省住房和城乡建设部网站网页图片怎么保存为pdf文件

辽宁省住房和城乡建设部网站,网页图片怎么保存为pdf文件,做网站 会计分录,wordpress投票插件wp-polls🌊 消息队列处理模式:流式与批处理的艺术 📌 深入解析现代分布式系统中的数据处理范式 一、流式处理:实时数据的"活水" 在大数据时代,流式处理已成为实时分析的核心技术。它将数据视为无限的流,…

🌊 消息队列处理模式:流式与批处理的艺术

📌 深入解析现代分布式系统中的数据处理范式

一、流式处理:实时数据的"活水"

在大数据时代,流式处理已成为实时分析的核心技术。它将数据视为无限的流,而非有限的集合,实现了毫秒级的数据处理响应。

1️⃣ Kafka Streams核心概念

Kafka Streams是构建在Kafka之上的客户端库,提供了强大的流处理能力:

// Kafka Streams应用示例
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders-topic");// 过滤出大额订单并转换为通知消息
KStream<String, Notification> notifications = orders.filter((key, order) -> order.getAmount() > 10000).mapValues(order -> new Notification("大额订单提醒", order));// 输出到通知主题
notifications.to("notifications-topic");

核心抽象

  • KStream:代表无界、连续的记录流
  • KTable:可更新的数据表视图,支持查询
  • GlobalKTable:全局分布式表,适合小规模数据关联

2️⃣ 窗口计算与状态管理

流处理中,窗口是处理时间维度数据的关键机制:

窗口类型特点应用场景
滚动窗口固定大小,不重叠每分钟订单统计
滑动窗口固定大小,可重叠最近5分钟热门商品
会话窗口动态大小,基于活动间隔用户行为分析

状态存储

// 配置状态存储
StoreBuilder<KeyValueStore<String, Long>> countStore =Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("counts"),Serdes.String(),Serdes.Long());// 注册状态存储
builder.addStateStore(countStore);// 使用状态存储进行计算
orders.process(() -> new OrderProcessor(), "counts");

3️⃣ Exactly-Once实现

Kafka Streams通过事务和幂等生产者实现了端到端的精确一次语义:

// 配置Exactly-Once语义
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

实现原理

  • 消费者偏移量与处理结果在同一事务中提交
  • 幂等生产者确保重试不会导致重复
  • 事务协调器管理跨分区的原子性

二、批处理:大规模数据的"蓄水池"

批处理适合处理大量历史数据,或者定期执行的数据处理任务。

1️⃣ 消息积压处理策略

当消息堆积时,系统面临巨大压力,需要合理的处理策略:

// 消费者配置批量拉取
Properties props = new Properties();
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50MB

积压处理最佳实践

  • 临时扩容:增加消费者实例和分区数
  • 跳过非关键消息:设置过滤条件,优先处理重要消息
  • 批量压缩存储:将积压消息归档,延后处理

2️⃣ 消费者并行度调整

合理的并行度设计是批处理性能的关键:

// 动态调整消费者线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy()
);// 根据积压量动态调整线程数
if (getLagSize() > 10000) {executor.setCorePoolSize(executor.getCorePoolSize() + 5);
}

并行度优化公式

  • 理想并行度 = min(分区数, 可用CPU核心数 × (1 + I/O等待比率))
  • 消费者实例数 ≤ 分区数(避免资源浪费)

3️⃣ 背压控制机制

背压(Backpressure)是处理生产速度超过消费速度的关键机制:

// RxJava背压示例
Flowable.create(emitter -> {// 消息源for (Message msg : messageSource) {if (emitter.isCancelled()) return;// 检查背压while (!emitter.requested() > 0) {Thread.sleep(100);}emitter.onNext(msg);}emitter.onComplete();
}, BackpressureStrategy.BUFFER)
.onBackpressureBuffer(10000, () -> log.warn("缓冲区已满"))
.observeOn(Schedulers.io(), false, 512)
.subscribe(message -> process(message));

背压策略对比

策略描述适用场景
缓冲使用队列暂存过多消息短暂峰值,内存充足
丢弃丢弃无法处理的消息非关键数据,如监控
限流降低生产者发送速率关键业务,不允许丢失
采样只处理部分消息统计分析类应用

三、流批融合:未来的趋势

现代数据处理框架正在打破流处理和批处理的界限:

// Flink流批统一处理示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 批处理模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 或流处理模式
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 相同的代码,不同的执行模式
DataStream<Order> orders = env.fromSource(KafkaSource.<Order>builder().setTopics("orders").setValueOnlyDeserializer(new OrderDeserializer()).build(),WatermarkStrategy.noWatermarks(),"Kafka Orders"
);orders.keyBy(Order::getCustomerId).window(TumblingEventTimeWindows.of(Time.minutes(5))).aggregate(new OrderAggregator()).sinkTo(new DatabaseSink());

融合优势

  • 统一的编程模型,降低开发复杂度
  • 灵活切换处理模式,适应不同场景
  • 充分利用历史数据增强实时分析

🔍 关注我,每周解锁更多分布式系统与消息队列的技术干货!


文章转载自:

http://KKTSzLl6.tjmmc.cn
http://4dy08gGR.tjmmc.cn
http://K50yJASq.tjmmc.cn
http://wToB8TQo.tjmmc.cn
http://3jfHuin3.tjmmc.cn
http://ytVPILQ1.tjmmc.cn
http://5pgEfARO.tjmmc.cn
http://ZhErHHor.tjmmc.cn
http://1UtaJi1x.tjmmc.cn
http://mWrjVi4x.tjmmc.cn
http://6G8m0JO0.tjmmc.cn
http://tY7byOYT.tjmmc.cn
http://qabObRNd.tjmmc.cn
http://c3IIu8Us.tjmmc.cn
http://AfUIzIab.tjmmc.cn
http://qo230H9r.tjmmc.cn
http://xNEwkH5T.tjmmc.cn
http://nqNFBKy3.tjmmc.cn
http://uGKHPO5j.tjmmc.cn
http://WJKYEnMm.tjmmc.cn
http://VEfiMPHY.tjmmc.cn
http://kVtbc7yH.tjmmc.cn
http://zjxFNdfG.tjmmc.cn
http://bdWtFRXY.tjmmc.cn
http://JemL2okG.tjmmc.cn
http://qAnelvwi.tjmmc.cn
http://V0QvHq0x.tjmmc.cn
http://hf7gZ5KX.tjmmc.cn
http://DZVdHFBA.tjmmc.cn
http://U6RS3Dja.tjmmc.cn
http://www.dtcms.com/wzjs/734441.html

相关文章:

  • 英文网站制作 官网濮阳网站建设熊掌网络
  • 安阳网站建设优化渠道wordpress 出错
  • 贵州网站建站python搭建个人网站
  • dw怎么做网站相册黄页大全18勿看2000网站
  • 做网站还需要搜狗吗西安网站建设推广
  • 阜阳企业网站推广凡科做网站
  • 专门做婚纱儿童摄影网站网站建设毕业答辩ppt怎么写
  • 找兼职h5网站开发人员杭州seo外包服务
  • 免费ai设计logo网站wordpress zhai主题
  • 山西设计网站公司北京做网站哪家专业
  • wordpress客户端建站南冒网站建设制作推广公司
  • 做网站技术员怎么去推广一个产品
  • 做竞赛的平台或网站大型集团网站建设公司
  • 企业网站制作一般多少钱网站建设更新不及时
  • 上海做网站比较好的网站建设服务合同印花税
  • 建站市场开发商城微信小程序
  • 网站怎么做下拉刷新页面数据wordpress sae 安装主题
  • 网站建设基础策划书建网站做站在
  • 开网站挣不挣钱自助建站源码下载
  • 如何开个公司网站甘肃省和城乡建设厅网站
  • 自己做的网站如何让别人访问牡丹江市建设行业协会网站
  • 惠州网站建设推广wordpress注册文件
  • 网站开发和建设展厅设计制作公司会计分录
  • 上海网站建设框架图凡客诚品app下载
  • 网站视频下载到手机怎么做国家工商局网站官网
  • 建筑工程找工作哪个网站好wordpress评论优化插件
  • 企业信用网站建设凡科网站建设总结
  • 网站建设赶集网网站空间每年继费到哪交
  • 宁夏建设技术职业学院官方网站源码之家网站
  • 沧州做网站公司西安网络科技有限公司有哪些