当前位置: 首页 > news >正文

kafka自定义分区器

根据企业需求,自己重新实现分区器

只需要定义类实现Partitioner接口,然后重写partition()方法即可

假设现在有一个需求,发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区

package com.example.kafkademo.producer;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** 1. 实现接口Partitioner* 2. 实现3个方法:partition,close,configure* 3. 编写partition方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 重写这个方法* @param topic 主题* @param key 消息的key* @param keyBytes 消息的key序列化后的字节数组* @param value 消息的值* @param valueBytes 消息的值序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return 信息对应的分区*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区return msgValue.contains("cuihaida") ? 0 : 1;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

使用分区器的方法,在生产者的配置中添加分区器参数

package com.example.kafkademo.util;import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;public class CommonUtils {/*** kafka生产者配置配置* @return 配置内容*/public static Properties buildKafkaProperties() {// 1. 创建kafka生产者配置对象Properties properties = new Properties();// 2. 给kafka的配置对象添加信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key, value初始化【必须有】properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// =========> 添加自定义分区器 <============properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafkademo.producer.MyPartitioner")return properties;}
}
http://www.dtcms.com/a/264157.html

相关文章:

  • Webpack的插件机制Tapable
  • 华为认证二选一:物联网 VS 人工智能,你的赛道在哪里?
  • 打造 AI 产品的前端架构:响应式、流式、智能交互三合一
  • uv介绍以及与anaconda/venv的区别
  • C#系统学习第七章——数组
  • python 继承
  • 《UE5_C++多人TPS完整教程》学习笔记39 ——《P40 远程过程调用(Remote Procedure Calls)》
  • 增材制造研究领域:3D 打印设计国际会议
  • 责任链模式 Go 语言实战
  • 电脑系统重装有什么用?
  • 动手实践:如何提取Python代码中的字符串变量的值
  • AI问答-vue3:如何选择使用reactive或ref
  • 【HarmonyOS】鸿蒙使用仓颉编程入门
  • 基于Halcon平台的常规OCR与深度OCR性能对比分析
  • 设计模式(行为型)-访问者模式
  • python训练day46 通道注意力
  • 【kernel8】spi协议,验证,模型,设备树处理,spidev,衍生协议
  • AI人工客服实战指南:基于大模型构建生产级智能对话系统
  • Hadoop、Spark、Flink 三大大数据处理框架的能力与应用场景
  • ESP32-S3开发板深度评测:AI语音识别与图像处理全面解析
  • C++ 第四阶段 STL 容器 - 第九讲:详解 std::map 与 std::unordered_map —— 关联容器的深度解析
  • Springboot整合高德地图
  • NeurIPS-2023《A Definition of Continual Reinforcement Learning》
  • 基于GD32 MCU的IAP差分升级方案
  • 迎战 AI Overviews:SEO 不被淘汰的实战策略
  • SpringBoot全局异常详解
  • Electron 应用打包与分发:从开发到交付的完整指南
  • 多容器应用与编排——AI教你学Docker
  • Java-String类静态成员方法深度解析
  • AR 地产互动沙盘:为地产沙盘带来变革​