网站搭建找谁培训机构在哪个平台找
根据企业需求,自己重新实现分区器
只需要定义类实现Partitioner接口,然后重写partition()方法即可
假设现在有一个需求,发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
package com.example.kafkademo.producer;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** 1. 实现接口Partitioner* 2. 实现3个方法:partition,close,configure* 3. 编写partition方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 重写这个方法* @param topic 主题* @param key 消息的key* @param keyBytes 消息的key序列化后的字节数组* @param value 消息的值* @param valueBytes 消息的值序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return 信息对应的分区*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区return msgValue.contains("cuihaida") ? 0 : 1;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
使用分区器的方法,在生产者的配置中添加分区器参数
package com.example.kafkademo.util;import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;public class CommonUtils {/*** kafka生产者配置配置* @return 配置内容*/public static Properties buildKafkaProperties() {// 1. 创建kafka生产者配置对象Properties properties = new Properties();// 2. 给kafka的配置对象添加信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key, value初始化【必须有】properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// =========> 添加自定义分区器 <============properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafkademo.producer.MyPartitioner")return properties;}
}