Flink中Kafka连接器的基本应用
文章目录
- 前言
- Kafka连接器基础案例演示
- 前置说明和环境准备步骤
- Kafka连接器基本配置
- 关联数据源
- 映射转换
- 案例效果演示
 
- 基于Kafka连接器同步数据到MySQL
- 案例说明
- 前置准备
- Kafka连接器消费位点调整
- 映射转换与数据投递
- MysqlSlink持久化收集器数据
- 最终效果演示
 
- 小结
- 参考
 
前言
本文将基于内置kafka连接器演示如何使用kafka内置流收集器的api完成Kafka数据的采集,同时我们也会给出一个收集Kafka数据流数据保存到MySQL的示例,希望对你有帮助。
Kafka连接器基础案例演示
前置说明和环境准备步骤
本案例将基于Kafka投递的单词(用逗号分隔),通过flink完成抽取,切割为独立单词,并完成词频统计,例如我们输入hello,world,最终控制台就会输出hello,1和world,1。
在正式演示之前,笔者介绍一些flink的使用版本:
<flink.version>1.16.0</flink.version>
对应还有下面这些依赖分别用于:
- 使用Kafka连接器
- 使用hutool的jdbc连接器
- MySQL驱动包
 <!-- CSV Format for Kafka (因为你的配置中用了 'format' = 'csv') --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- JDBC Connector (用于你的 spend_report 表写入 MySQL) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version> <!-- 推荐使用 8.0.x 版本 --></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency>
完成这些后我们将Kafka等相关环境准备好就可以着手编码工作了。
Kafka连接器基本配置
首先我们基于StreamExecutionEnvironment 初始化环境构建配置:
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();然后我们就可以基于内置的KafkaSource的建造者模式完成如Kafka连接器的构建:
- setBootstrapServers设置- Kafka地址为- broker字符串配置的ip和端口号
- setTopics设置消费的主题为- input-topic
- setGroupId当前kafka消费者组为- my-group
- setStartingOffsets设置为从最早偏移量开始消费
- setValueOnlyDeserializer设置收到- Kafka数据时直接反序列化为字符串
对应的代码如下所示:
	//基于建造者模式完成Kafka连接器的配置KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers)//设置Kafka server端地址.setTopics("input-topic") //指定消费的Topic为input-topic.setGroupId("my-group")//设置消费组ID为my-group.setStartingOffsets(OffsetsInitializer.earliest())//设置从Kafka的最开始位置开始消费.setValueOnlyDeserializer(new SimpleStringSchema())// 设置数据直接反序列化为字符串.build();
这里需要补充一下关于Kafka消费位点的设置,flink已经内置了如下几种消费位点的设置,对应的代码配置示例如下,读者可参阅并进行配置:
KafkaSource.builder()