
Custom Serializers in Flink

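Flink has its own serializers as well as a Kryo-based serializer. When a type does not meet the requirements of Flink's type system, Flink falls back to Kryo, which is usually much slower than Flink's own serializers.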

For types that do fall back to Kryo, Flink lets you register a custom serializer for a given type; see: https://nightlies.apache.org/flink/flink-docs-release-2.1/zh/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/

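This requires implementing Kryo's Serializer class, whose fully qualified name is com.esotericsoftware.kryo.Serializer. Here is an example implementation that delegates to Apache Fory: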

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.fory.Fory;
import org.apache.fory.ThreadSafeFory;
import org.apache.fory.config.Language;

// Kryo serializer that delegates the actual encoding to Apache Fory.
public class ForySerializer<T> extends Serializer<T> {

    // One thread-safe Fory instance shared by all serializer instances.
    public static final ThreadSafeFory fory = Fory.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(false)            // assumes no shared or circular references
            .requireClassRegistration(false)   // accept classes that were not registered up front
            .buildThreadSafeFory();

    @Override
    public void write(Kryo kryo, Output output, T object) {
        // Length-prefixed payload: 4-byte length, then the Fory bytes.
        byte[] bytes = fory.serialize(object);
        output.writeInt(bytes.length);
        output.writeBytes(bytes);
    }

    @Override
    @SuppressWarnings("unchecked")
    public T read(Kryo kryo, Input input, Class<? extends T> type) {
        int length = input.readInt();
        byte[] bytes = input.readBytes(length);
        return (T) fory.deserialize(bytes);
    }

    @Override
    public boolean isImmutable() {
        return false;
    }
}

Although Flink provides this mechanism for custom serializers, it has no effect for a type like the following:

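import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class CustomDataType {
    private String status;
    private RawValue rawValue;

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public static class RawValue {
        private String key;
        private Object value;   // declared as Object: no concrete type for Flink to analyze
    }
}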
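To see how Flink is likely to classify this class, the sketch below models a simplified subset of its POJO rules in plain Java: a public class with a public no-argument constructor, and a public getter and setter (or public visibility) for every non-static, non-transient field. It is an approximation, not Flink's TypeExtractor; PojoRuleCheck, Samples.Outer and Samples.Inner are hypothetical names, and the two sample classes mirror CustomDataType and RawValue with the accessors Lombok's @Data would generate written out by hand. Both classes pass the check, and the only field without a concrete type is Inner.value, declared as Object.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical approximation of Flink's POJO rules; not Flink's TypeExtractor.
final class PojoRuleCheck {
    private PojoRuleCheck() {}

    /** Public class, public no-arg constructor, and a public field or bean getter/setter per field. */
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // throws if there is no public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : type.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                continue; // ignored by the rules, or accessible without accessors
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);
                type.getMethod("set" + suffix, f.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    /** Fields declared as Object: nothing concrete to analyze, so they can only be handled generically. */
    static List<String> objectTypedFields(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            if (f.getType() == Object.class) {
                names.add(f.getName());
            }
        }
        return names;
    }
}

// Hand-written equivalents of CustomDataType and RawValue (what Lombok's @Data generates).
final class Samples {
    private Samples() {}

    public static class Inner {
        private String key;
        private Object value;

        public Inner() {}
        public String getKey() { return key; }
        public void setKey(String key) { this.key = key; }
        public Object getValue() { return value; }
        public void setValue(Object value) { this.value = value; }
    }

    public static class Outer {
        private String status;
        private Inner rawValue;

        public Outer() {}
        public String getStatus() { return status; }
        public void setStatus(String status) { this.status = status; }
        public Inner getRawValue() { return rawValue; }
        public void setRawValue(Inner rawValue) { this.rawValue = rawValue; }
    }
}
```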

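The registration most likely has no effect because CustomDataType itself satisfies Flink's POJO rules (a public class with a no-argument constructor and the getters and setters Lombok generates). Flink therefore serializes it with its own POJO serializer and hands only the Object-typed value field to Kryo as a generic type, so a Kryo serializer registered for CustomDataType is never consulted. Looking through the official documentation, we found that defining a TypeInfoFactory lets a type like this use a serializer of our own instead: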

https://nightlies.apache.org/flink/flink-docs-release-2.1/zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

Here is the TypeInfoFactory-based implementation:

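import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.lang.reflect.Type;
import java.util.Map;

// Returns our own TypeInformation for CustomDataType instead of the POJO type info Flink would infer.
public class CustomTypeInfoFactory extends TypeInfoFactory<CustomDataType> {
    @Override
    public TypeInformation<CustomDataType> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        return new CustomTypeInformation();
    }
}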
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Treats CustomDataType as a single opaque (non-composite) value handled by CustomTypeSerializer.
public class CustomTypeInformation extends TypeInformation<CustomDataType> {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean isBasicType() {
        return false;
    }

    @Override
    public boolean isTupleType() {
        return false;
    }

    @Override
    public int getArity() {
        return 1;
    }

    @Override
    public int getTotalFields() {
        // Atomic type: one logical field (must be at least 1; Class#getFields() only sees public fields).
        return 1;
    }

    @Override
    public Class<CustomDataType> getTypeClass() {
        return CustomDataType.class;
    }

    @Override
    public boolean isKeyType() {
        return false;
    }

    @Override
    public TypeSerializer<CustomDataType> createSerializer(SerializerConfig config) {
        return new CustomTypeSerializer();
    }

    @Override
    public String toString() {
        return "CustomTypeInformation<CustomDataType>";
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof CustomTypeInformation;
    }

    @Override
    public int hashCode() {
        return CustomTypeInformation.class.hashCode();
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof CustomTypeInformation;
    }
}

This is the core piece, the serializer itself, which uses Apache Fory to serialize the data:

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.fory.Fory;
import org.apache.fory.ThreadSafeFory;
import org.apache.fory.config.Language;

import java.io.IOException;

public class CustomTypeSerializer extends TypeSerializer<CustomDataType> {
    private static final long serialVersionUID = 1L;

    // Thread-safe Fory instance shared by all (duplicated) serializer instances.
    public static final ThreadSafeFory fory = Fory.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(false)
            .requireClassRegistration(false)
            .buildThreadSafeFory();

    public CustomTypeSerializer() {}

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public TypeSerializer<CustomDataType> duplicate() {
        return new CustomTypeSerializer();
    }

    @Override
    public CustomDataType createInstance() {
        return new CustomDataType();
    }

    @Override
    public CustomDataType copy(CustomDataType from) {
        // Copy the nested RawValue so the copy does not share it with the source.
        // Note: value (an arbitrary Object) is still copied by reference.
        CustomDataType.RawValue rawValue = null;
        if (from.getRawValue() != null) {
            rawValue = new CustomDataType.RawValue(
                    from.getRawValue().getKey(), from.getRawValue().getValue());
        }
        return new CustomDataType(from.getStatus(), rawValue);
    }

    @Override
    public CustomDataType copy(CustomDataType from, CustomDataType reuse) {
        CustomDataType copied = copy(from);
        reuse.setStatus(copied.getStatus());
        reuse.setRawValue(copied.getRawValue());
        return reuse;
    }

    @Override
    public int getLength() {
        return -1; // variable length
    }

    @Override
    public void serialize(CustomDataType record, DataOutputView target) throws IOException {
        // Length-prefixed Fory payload
        byte[] bytes = fory.serialize(record);
        target.writeInt(bytes.length);
        target.write(bytes);
    }

    @Override
    public CustomDataType deserialize(DataInputView source) throws IOException {
        int length = source.readInt();
        byte[] bytes = new byte[length];
        source.readFully(bytes); // read(bytes) may return before the buffer is full
        return (CustomDataType) fory.deserialize(bytes);
    }

    @Override
    public CustomDataType deserialize(CustomDataType reuse, DataInputView source) throws IOException {
        // Must consume this record's bytes; Fory creates a new object, so reuse is ignored.
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        serialize(deserialize(source), target);
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof CustomTypeSerializer;
    }

    @Override
    public int hashCode() {
        return CustomTypeSerializer.class.hashCode();
    }

    @Override
    public TypeSerializerSnapshot<CustomDataType> snapshotConfiguration() {
        return new CustomTypeSerializerSnapshot();
    }

    // Serializer snapshot (used for state compatibility checks)
    public static final class CustomTypeSerializerSnapshot
            extends SimpleTypeSerializerSnapshot<CustomDataType> {
        public CustomTypeSerializerSnapshot() {
            super(CustomTypeSerializer::new);
        }
    }
}
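The serializer writes each record as a 4-byte length followed by the Fory bytes, so the read side must consume exactly that many bytes per record. The JDK-only sketch below illustrates this, assuming java.io.DataOutputStream/DataInputStream as stand-ins for Flink's DataOutputView/DataInputView (which extend DataOutput/DataInput) and UTF-8 strings in place of Fory payloads; LengthPrefixedFraming and its methods are hypothetical names. It shows why deserialize should use readFully, and why deserialize(reuse, source) cannot simply return reuse without reading: the next read would then start at the wrong record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of the length-prefixed layout; UTF-8 strings stand in for Fory payloads.
final class LengthPrefixedFraming {
    private LengthPrefixedFraming() {}

    // Same layout as serialize(): 4-byte length, then the payload bytes.
    static void writeRecord(DataOutput out, String record) throws IOException {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Same layout as deserialize(): readFully fills the whole buffer or throws EOFException.
    static String readRecord(DataInput in) throws IOException {
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Writes all records back to back into one byte stream, as a downstream reader sees them.
    static DataInputStream encode(List<String> records) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        for (String r : records) {
            writeRecord(out, r);
        }
        out.flush();
        return new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    }

    /** Correct reader: every record consumes exactly its own bytes. */
    static List<String> roundTrip(List<String> records) {
        try {
            DataInputStream in = encode(records);
            List<String> result = new ArrayList<>();
            for (int i = 0; i < records.size(); i++) {
                result.add(readRecord(in));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Broken reader: "deserializes" the first record without reading its bytes
     *  (like returning reuse as-is), so the read meant for the second record gets the first. */
    static String secondRecordAfterSkippedRead(List<String> records) {
        try {
            DataInputStream in = encode(records);
            // First record: nothing consumed from the stream.
            return readRecord(in); // intended to be the second record
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, roundTrip(List.of("alive", "dead")) returns both records in order, while secondRecordAfterSkippedRead(List.of("alive", "dead")) returns "alive" instead of "dead".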

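Finally, annotate the data type with @TypeInfo(CustomTypeInfoFactory.class) (org.apache.flink.api.common.typeinfo.TypeInfo) so that Flink picks up the factory.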

Next, here is an example that runs the custom serializer end to end:

public class SerializerDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setParallelism(2);
        // Without chaining, records passed between operators go through the serializer.
        env.disableOperatorChaining();

        env.addSource(new SourceFunction<String>() {
                    private volatile boolean flag = true;
                    private final Random rand = new Random();

                    @Override
                    public void run(SourceContext<String> ctx) throws Exception {
                        // Emit "<random number>@<timestamp>" once per second
                        while (flag) {
                            Thread.sleep(1000);
                            ctx.collect(String.format("%s@%s", rand.nextInt(100), LocalDateTime.now()));
                        }
                    }

                    @Override
                    public void cancel() {
                        flag = false;
                    }
                })
                .map(new MapFunction<String, CustomDataType>() {
                    @Override
                    public CustomDataType map(String value) throws Exception {
                        String[] split = value.split("@");
                        CustomDataType customDataType = new CustomDataType();
                        customDataType.setStatus("alive");
                        CustomDataType.RawValue rawValue = new CustomDataType.RawValue();
                        rawValue.setKey(split[0]);
                        rawValue.setValue(split[1]);
                        customDataType.setRawValue(rawValue);
                        return customDataType;
                    }
                })
                .addSink(new SinkFunction<CustomDataType>() {
                    @Override
                    public void invoke(CustomDataType value) throws Exception {
                        System.out.println("Result: " + value);
                    }
                });

        env.execute();
    }
}

Finally, here is the call stack I copied while debugging, which helps in following the overall logic:

createTypeInfo:13, CustomTypeInfoFactory (com.bonree.serializers)
createTypeInfoFromFactory:1385, TypeExtractor (org.apache.flink.api.java.typeutils)
createTypeInfoFromFactory:1353, TypeExtractor (org.apache.flink.api.java.typeutils)
getTypeInfoFactory:1730, TypeExtractor (org.apache.flink.api.java.typeutils)   // reads the @TypeInfo annotation on the POJO class to get its TypeInfoFactory
getClosestFactory:1790, TypeExtractor (org.apache.flink.api.java.typeutils)
createTypeInfoFromFactory:1340, TypeExtractor (org.apache.flink.api.java.typeutils)
createTypeInfoWithTypeHierarchy:882, TypeExtractor (org.apache.flink.api.java.typeutils)
privateCreateTypeInfo:861, TypeExtractor (org.apache.flink.api.java.typeutils)
getUnaryOperatorReturnType:608, TypeExtractor (org.apache.flink.api.java.typeutils)
getMapReturnTypes:184, TypeExtractor (org.apache.flink.api.java.typeutils)
map:425, DataStream (org.apache.flink.streaming.api.datastream)
main:41, SerializerDemo (com.bonree.serializers)
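The annotation lookup in the stack above can be modeled roughly in plain Java. The sketch assumes a simplified view of getTypeInfoFactory and getClosestFactory: read the annotation on the class itself, and if there is none, walk up the superclass chain until Object. ToyTypeInfo, ToyFactory, ToyFactoryLookup and the example classes are hypothetical stand-ins, not Flink's @TypeInfo or TypeInfoFactory, and the real TypeExtractor does considerably more (for example, resolving generic type parameters).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for @TypeInfo; not @Inherited, so subclasses are found by walking up.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ToyTypeInfo {
    Class<? extends ToyFactory> value();
}

// Hypothetical stand-in for TypeInfoFactory: returns a description instead of a TypeInformation.
interface ToyFactory {
    String createTypeInfo(Class<?> type);
}

final class ToyFactoryLookup {
    private ToyFactoryLookup() {}

    // Roughly like getTypeInfoFactory: read the annotation on exactly this class, instantiate its factory.
    static ToyFactory factoryOf(Class<?> type) {
        ToyTypeInfo annotation = type.getAnnotation(ToyTypeInfo.class);
        if (annotation == null) {
            return null;
        }
        try {
            return annotation.value().getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + annotation.value(), e);
        }
    }

    // Roughly like getClosestFactory: walk up the superclasses until a factory is found or Object is reached.
    static ToyFactory closestFactory(Class<?> type) {
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            ToyFactory factory = factoryOf(c);
            if (factory != null) {
                return factory;
            }
        }
        return null; // no factory: type extraction continues with its normal analysis
    }
}

// Hypothetical example types.
final class DescribingFactory implements ToyFactory {
    public DescribingFactory() {}

    @Override
    public String createTypeInfo(Class<?> type) {
        return "Custom<" + type.getSimpleName() + ">";
    }
}

@ToyTypeInfo(DescribingFactory.class)
class AnnotatedType {}

class SubType extends AnnotatedType {}

class PlainType {}
```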

