当前位置: 首页 > news >正文

Kafka消息自定义序列化

文章目录

  • 1. 默认序列化
  • 2.自定义序列化
  • 3.示例
  • 4.自定义解序列化器


1. 默认序列化

在网络中发送数据都是以字节的方式,Kafka也不例外。Apache Kafka支持用户给broker发送各种类型的消息。它可以是一个字符串、一个整数、一个数组或是其他任意的对象类型。序列化器(serializer)负责在producer发送前将消息转换成字节数组;而与之相反,解序列化器(deserializer)则用于将consumer接收到的字节数组转换成相应的对象。
常见的serializer有:

  • ByteArraySerializer:本质上什么都不用做,因为已经是字节数组了。
  • ByteBufferSerializer:列化ByteBuffer。
  • BytesSerializer:序列化Kafka自定义的 Bytes 类。
  • DoubleSerializer:列化 Double 类型
  • IntegerSerializer:列化Integer 类型
  • LongSerializer:序列化Long类型。
  • StringSerializer:序列化 String 类型。

producer的序列化机制使用起来非常简单,只需要在构造producer时同时指定参数key.serializer 和 value.serializer的值即可,用户可以为消息的key和value 指定不同类型的 serializer,只要与解序列类型分别保持一致就可以。

2.自定义序列化

Kafka支持用户自定义消息序列化。若要编写一个自定义的serializer,需要完成以下3件事情。
1)定义数据对象格式。
2)创建自定义序列化类,实现 org.apache.kafka.common.serialization.Serializer 接口,在serializer方法中实现序列化逻辑。
3)在用于构造KafkaProducer 的Properties 对象中设置 key.serializer 或 value.serializer取决于是为消息key还是 value 做自定义序列化。

3.示例

下面结合一个实例来说明如何创建自定义的serializer。首先定义待序列化的数据对象。本例中使用一个简单的Java POJO对象,如下面的代码所示:

public class User {
    private String firstName;
    private String lastName;
    private int age;
    private String address;

    public User(String firstName, String lastName, int age, String address) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.address = address;
    }
    @Override
    public String toString() {
        return "User{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", age=" + age +
                ", address='" + address + '\'' +
                '}';
    }
}

接下来创建 serializer。本例中使用了jackson-mapper-asl包的 ObjectMapper 帮助我们直接把对象转成字节数组。为了使用该类,你需要在producer工程中增加依赖:

<dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-mapper-asl</artifactId>
       <version>1.9.13</version>
</dependency>

UserSerializer代码如下:

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
    private ObjectMapper objectMapper;
    @Override
    public void configure(Map configs, boolean isKey) {
        objectMapper=new ObjectMapper();
    }
    @Override
    public byte[] serialize(String topic, User data) {
        byte[] ret =null;
        try {
            if (data == null){
                System.out.println("Null received at serializing");
                return null;
            }
            ret=objectMapper.writeValueAsString(data).getBytes();
        } catch (IOException e) {
           e.printStackTrace();
        }
        return ret;
    }

    @Override
    public void close() {
    }
}

指定Serializer,然后构建消息发送:

import com.exm.collectcodenew.kafka.producer.customSerializer.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//必须指定
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定
        props.put("value.serializer", "com.exm.collectcodenew.kafka.producer.customSerializer.UserSerializer");//必须指定
        props.put("acks", "-1");
        props.put("retries", 3);
        props.put("batch.size", 323840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("max.block.ms", 3000);
        props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.customPartitioner.AuditPartitioner");
        Producer<String, String> producer = new KafkaProducer<>(props);
        //构建User对象
        User user = new User("Z","tt",18,"Beijing,China");
        ProducerRecord record = new ProducerRecord("topic-test",user);
        producer.send(record);
        producer.close();
    }
}

4.自定义解序列化器

Kafka支持用户自定义消息的deserializer。成功编写一个自定义的deserializer需要完成以下3件事情。
1)定义或复用 serializer 的数据对象格式,
2) 创建自定义 deserializer 类,令其实现 org.apache.kafka.common.serialization.Deserializer接口。在deserializer方法中实现 deserialize 逻辑。
3)在构造KafkaConsumer的Properties对象中设置key.deserializer和(或)value.deserializer为上一步的实现类。
依然使用序列化中的User 例子来实现自定义的 deserializer。代码如下。

import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class UserDeserializer implements Deserializer {
    private ObjectMapper objectMapper;
    @Override
    public void configure(Map configs, boolean isKey) {
        objectMapper = new ObjectMapper();
    }
    @Override
    public Object deserialize(String topic, byte[] data) {
        User user =null;
        try {
            user=objectMapper.readValue(data,User.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }finally {
            return user;
        }
    }
    @Override
    public void close() {
    }
}

消费者代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//必须指定
        props.put("group.id","test-group");//必须指定
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必须指定
        props.put("value.deserializer", "com.exm.collectcodenew.kafka.producer.customSerializer.UserDeserializer");//必须指定
        props.put("enable.auto.commit","true");
        props.put("auto.commit.interval.ms","1000");
        props.put("auto.offset.reset","earliest");//从最早的消息开始读取
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //创建consumer实例
        consumer.subscribe(Arrays.asList("topic-test"));
        while(true){
            ConsumerRecords<String,String> records=consumer.poll(1000);
            for (ConsumerRecord<String, String> record: records){
                System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
            }
        }
    }
}

相关文章:

  • Android <queries>声明的作用和配置方法
  • 【yolo】YOLO训练参数输入之模型输入尺寸
  • wordpress表单插件CF7调用方式
  • 启动方法jupyter(Anaconda)
  • 【设计模式】装饰模式
  • Apache Tomcat CVE-2025-24813 安全漏洞
  • AI视频是否会影响原创价值
  • 如何提升库存系统的高并发和稳定性:算法与设计模式
  • 6.5840 Lab 3: Raft
  • 使用 Docker 构建 LangChain 开发环镜及 ChatOllama 示例
  • vscode/cursor中python运行路径设置 模块导入问题
  • vscode git 管理
  • PostgreSQL_数据表结构设计并创建
  • Unity将运行时Mesh导出为fbx
  • Nordic nRF 蓝牙的 Direct Test Mode (DTM) 测试介绍
  • 强大的AI网站推荐(第二集)—— V0.dev
  • 第六篇:Setup:组件渲染前的初始化过程是怎样的?
  • 【网络安全】从浏览器到服务端讲JavaScript安全
  • 星型拓扑网络发生网络风暴
  • 鸿蒙ArkTS+ArkUI实现五子棋游戏
  • 时代中国控股:前4个月销售额18.1亿元,境外债重组协议押后聆讯至5月底
  • 华泰柏瑞基金总经理韩勇因工作调整卸任,董事长贾波代为履职
  • 秦洪看盘|交易型资金收缩,释放短线压力
  • 71岁导演詹姆斯・弗雷病逝,曾执导《纸牌屋》、麦当娜MV
  • 正荣地产:前4个月销售14.96亿元,控股股东已获委任联合清盘人
  • Meta正为AI眼镜开发人脸识别功能