当前位置: 首页 > news >正文

从零开始学Flink:数据源

在实时数据处理场景中,数据源(Source)是整个数据处理流程的起点。Flink作为流批一体的计算框架,提供了丰富的Source接口支持,其中通过Kafka获取实时数据是最常见的场景之一。本文将以Flink DataStream API为核心,带你从0到1实现“从Kafka消费数据并输出到日志”的完整流程,掌握Flink Source的核心用法。

一、为什么选择Kafka作为Flink的数据源?

Kafka作为分布式流处理平台,具备高吞吐量、低延迟、持久化存储等特性,是实时数据管道的首选。Flink与Kafka的集成方案经过多年优化,支持:

  • 高吞吐量:单集群可处理数十万条/秒的消息,满足大规模实时数据处理需求;
  • 持久化存储:数据按时间顺序写入磁盘并保留一定周期,支持离线重放和故障恢复;
  • 精确一次(Exactly-Once)消费语义:通过Kafka偏移量(Offset)管理和Flink检查点(Checkpoint)机制保证数据一致性;
  • 动态分区发现:自动感知Kafka主题的分区变化(如新增分区),无需重启任务;
  • 灵活的消费模式:支持从指定偏移量、时间戳或最新位置开始消费。

二、环境准备与依赖配置

1. 版本说明

本文基于以下版本实现(需保持版本兼容):

  • Flink:1.20.1(最新稳定版)
  • Kafka:3.4.0(Flink Kafka Connector兼容Kafka 2.8+)
  • JDK:17+
  • gradle 8.3+

2. gradle依赖

在gradle添加Flink核心依赖及Kafka Connector依赖,build.gradle配置可以是如下:

plugins {id 'java' // Java项目插件id 'application' // 支持main方法运行}// 设置主类(可选,用于application插件)application {mainClass.set('com.cn.daimajiangxin.flink.source.KafkaSourceDemo') // 替换为你的主类全限定名}// 依赖仓库(Maven中央仓库)repositories {mavenCentral()}// 依赖配置dependencies {// Flink核心依赖(生产环境通常标记为provided,由Flink运行时提供)implementation 'org.apache.flink:flink-java:1.20.1'implementation 'org.apache.flink:flink-streaming-java_2.12:1.20.1'// Flink Kafka Connector(新版API,兼容Kafka 2.8+)implementation 'org.apache.flink:flink-connector-kafka_2.12:1.20.1'// SLF4J日志门面 + Log4j实现(避免日志警告)implementation 'org.apache.logging.log4j:log4j-api:2.17.1'implementation 'org.apache.logging.log4j:log4j-core:2.17.1'implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'}// 编译配置(可选,根据需要调整)tasks.withType(JavaCompile) {options.encoding = 'UTF-8' // 指定编码sourceCompatibility = JavaVersion.VERSION_17 // 兼容Java 8targetCompatibility = JavaVersion.VERSION_17}

三、核心概念:Flink Kafka Source的工作原理

在深入代码前,需理解Flink Kafka Source的核心组件:
  • KafkaSource:Flink提供的Kafka数据源连接器,负责与Kafka Broker建立连接、拉取消息;
  • 反序列化器(Deserializer):将Kafka消息的字节数组(byte[])转换为Flink可处理的数据类型(如String、POJO、Row等);
  • 偏移量管理:记录已消费的Kafka消息位置(Offset),确保故障恢复时能从断点继续消费;
  • 检查点(Checkpoint):Flink的容错机制,定期将状态(包括偏移量)持久化到存储系统(如HDFS),保证Exactly-Once语义。

四、核心代码实现:从Kafka读取数据并输出到日志

1. 流程概述

整个流程分为5步:

  1. 配置Kafka连接参数(如Broker地址、主题、消费者组);
  2. 创建Flink流执行环境(StreamExecutionEnvironment);
  3. 定义Kafka Source(使用新版KafkaSource);
  4. 将Source添加到执行环境,并处理数据(如打印到日志);
  5. 触发任务执行。

2.代码详解

以下是完整的示例代码,包含详细注释:

package com.cn.daimajiangxin.flink.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class KafkaSourceDemo {private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceDemo.class);public static void main(String[] args) throws Exception {// 1. 创建Flink流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 可选:启用检查点(生产环境必选,保证Exactly-Once语义)env.enableCheckpointing(5000); // 每5秒做一次检查点// 启用检查点env.enableCheckpointing(5000); // 每5秒做一次检查点// 设置检查点超时时间env.getCheckpointConfig().setCheckpointTimeout(60000);// 2. 配置Kafka参数String kafkaBootstrapServers = "172.30.244.152:9092"; // Kafka Broker地址String topic = "test_topic"; // 目标主题String consumerGroup = "flink-consumer-group"; // 消费者组IDLOG.info("Connecting to Kafka at " + kafkaBootstrapServers);LOG.info("Consuming topic: " + topic);LOG.info("Consumer group: " + consumerGroup);// 3. 定义Kafka Source(新版API)KafkaSource`<String>` kafkaSourceDemo = KafkaSource.`<String>`builder().setBootstrapServers(kafkaBootstrapServers) // Kafka Broker地址.setTopics(topic) // 订阅的主题.setGroupId(consumerGroup) // 消费者组.setProperty("enable.auto.commit", "true").setProperty("auto.commit.interval.ms", "1000").setProperty("session.timeout.ms", "30000").setProperty("retry.backoff.ms", "1000").setProperty("reconnect.backoff.max.ms", "10000").setDeserializer(new KafkaRecordDeserializationSchema `<String>`() {@Overridepublic void deserialize(ConsumerRecord<byte[], byte[]> record, Collector `<String>` out) throws IOException {// 从ConsumerRecord中提取值(字节数组),并转为字符串String value = new String(record.value(), StandardCharsets.UTF_8);LOG.info("Received message: " + value);out.collect(value); // 将反序列化后的数据收集到Flink流中}@Overridepublic TypeInformation`<String>` getProducedType() {return TypeInformation.of(String.class);}})// 从最早偏移量开始消费(这样即使没有新消息也会读取历史数据).setStartingOffsets(OffsetsInitializer.earliest()).build();// 4. 将Kafka Source添加到Flink流环境,并处理数据DataStream`<String>` kafkaStream = env.fromSource(kafkaSourceDemo,WatermarkStrategy.noWatermarks(), // 无水印(适用于无序数据场景)"Kafka Source" // Source名称(用于监控));LOG.info("Kafka source created successfully");// 5. 处理数据:将每条数据打印到日志(实际生产中可替换为写入数据库、消息队列等)kafkaStream.print("KafkaData");LOG.info("Flink Kafka Source Demo started.");// 6. 触发任务执行env.execute("Flink Kafka Source Demo");}}

3. 关键配置说明

  • KafkaSource.Builder:新版Kafka Source的核心构建器,支持灵活配置;
  • setDeserializer:指定反序列化方式,deserialize 接收Kafka的ConsumerRecord(包含键、值、偏移量等信息),提取值(record.value())并反序列化为字符,getProducedType声明输出数据的类型(此处为String);
  • setStartingOffsets:控制消费起始位置(latest()从最新数据开始,earliest()从最早数据开始,生产环境常用OffsetsInitializer.committedOffsets()从上次提交的偏移量继续);
  • WatermarkStrategy:用于事件时间(Event Time)处理,示例中无时间窗口需求,故使用noWatermarks();
  • PrintSinkFunction:Flink内置的日志打印Sink(true表示打印完整上下文,包含Subtask信息)。

五、运行与测试

在WSL2的Ubuntu 环境中安装Kafka。

1. 安装Kafka服务

  • 下载Kafka二进制包
    访问Apache Kafka官网,选择最新稳定版(如3.9.0),使用wget下载:
  wget https://mirrors.aliyun.com/apache/kafka/3.9.0/kafka_2.12-3.9.0.tgz
  • 解压并配置环境变量
  # 解压到/opt/kafka(全局可访问)sudo mkdir -p /opt/kafkatar -zxvf kafka_2.12-3.9.0.tgz -C /opt/kafka --strip-components=1# 永久生效(编辑~/.bashrc)echo 'export KAFKA_HOME=/opt/kafka' >> /etc/profileecho 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profilesource /etc/profile

2. 配置Kafka

Kafka的核心配置文件位于$KAFKA_HOME/config目录,需修改以下两个文件:

配置Kafka Broker(server.properties)

修改以下关键参数以适配WSL2环境:

# ==================== 核心角色与ID配置 ====================# 启用KRaft模式(默认已启用)# 单节点同时担任Broker和控制器process.roles=broker,controller# 节点唯一ID(单节点必须设为0)node.id=0# 控制器ID(与node.id一致,单节点唯一)controller.id=0# ==================== 监听端口配置 ====================# 全局监听端口(客户端读写请求)和控制器监听端口# 多个监听器使用逗号分隔,每个监听器都需要指定安全协议listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093# 对外暴露的地址(Windows主机通过localhost访问)# 多个公布的监听器使用逗号分隔advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093# 指定CONTROLLER监听器的安全协议listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT# 定义控制器监听器的名称(KRaft模式必需)controller.listener.names=CONTROLLER# ==================== ZooKeeper兼容配置(可选) ====================# 若需兼容旧客户端,可保留ZooKeeper配置(但KRaft模式无需ZooKeeper)# zookeeper.connect=localhost:2181# ==================== 日志与分区配置 ====================# 数据存储目录配置(Kafka的核心配置参数)# Kafka将主题数据、索引文件等存储在该目录下log.dirs=/opt/kafka/datanum.partitions=1# 副本数(单节点设为1)default.replication.factor=1# 最小同步副本数(单节点设为1)min.insync.replicas=1# ==================== 日志存储高级配置 ====================# 日志保留时间(默认7天,生产环境根据存储容量和需求调整)# log.retention.hours=168# 或按大小限制保留(单位:字节)# log.retention.bytes=107374182400  # 100GB# 单个分区日志段大小(默认1GB,可根据实际需求调整)# log.segment.bytes=1073741824# 日志段检查和清理的时间间隔(默认300000ms=5分钟)# log.retention.check.interval.ms=300000# 控制是否自动创建主题(生产环境建议禁用,改为手动创建)# auto.create.topics.enable=false# ==================== 控制器引导配置 ====================# 控制器引导服务器(单节点指向自己,格式:host:port)# 与控制器监听端口一致controller.quorum.bootstrap.servers=localhost:9093# 控制器投票者配置(单节点设为0@localhost:9093)controller.quorum.voters=0@localhost:9093

3.启动Kafka服务

3.1初始化KRaft存储目录(首次启动必需)

在KRaft模式下,需要先初始化元数据存储:

# 生成集群ID并保存到变量CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)echo "生成的集群ID: $CLUSTER_ID"# 使用生成的集群ID格式化存储目录$KAFKA_HOME/bin/kafka-storage.sh format -t $CLUSTER_ID -c $KAFKA_HOME/config/server.properties

注意: 如果手动运行命令,请确保先执行生成集群ID的命令,然后使用实际生成的ID替换"$CLUSTER_ID"。

3.2启动Kafka Broker
# 启动Broker(日志输出到$KAFKA_HOME/logs/server.log)     
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
3.3验证服务状态
检查Kafka Broker进程:
ps -ef | grep kafka  # 应看到Kafka进程
3.4创建测试主题

确保Kafka服务已启动,并创建测试主题 test_topic

$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic
3.5发送测试数据

使用Kafka内置的生产者工具发送测试消息到 test_topic

# 启动Kafka生产者控制台
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic# 输入几条测试消息(每行一条)
> hello flink
> flink kafka integration
> real-time data processing
3.6运行Flink程序

在IDE中直接运行 KafkaSourceDemo类的 main方法,或通过Gradle构建并运行:

# 构建项目./gradlew clean build# 运行Flink作业./gradlew run
3.7验证结果

成功运行后,你应该能在控制台看到类似如下输出:

20250918103316

六、进阶配置与优化

1. 消费语义保证

在生产环境中,为了确保数据一致性,需要配置Flink的检查点机制和Kafka偏移量提交策略:

// 1. 启用检查点env.enableCheckpointing(5000); // 每5秒做一次检查点// 2. 获取检查点配置对象(Flink 1.20.1及以上版本推荐方式)CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 3. 配置检查点模式为EXACTLY_ONCE(精确一次语义)checkpointConfig.setMode(CheckpointingMode.EXACTLY_ONCE);// 4. 设置检查点超时时间checkpointConfig.setCheckpointTimeout(Duration.ofSeconds(60));// 4. 配置从上次提交的偏移量继续消费(生产环境推荐).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

2. 并行度与资源配置

合理设置并行度可充分利用集群资源并提高吞吐量:

// 设置Flink作业的全局并行度env.setParallelism(3); // 与Kafka主题分区数匹配// 或单独设置Source的并行度KafkaSource`<String>` kafkaSource = KafkaSource.`<String>`builder()// ... 其他配置 ....build();DataStream`<String>` stream = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"Kafka Source").setParallelism(3); // Source并行度

3. 高级反序列化

除了基础的字符串反序列化,还可以使用更灵活的反序列化方式:

3.1 使用预定义反序列化器
// 使用Flink提供的String反序列化器     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))  
3.2 自定义POJO反序列化

如果Kafka消息是JSON格式,可以使用Jackson等库将其反序列化为POJO对象:

public class User {private String id;private String name;private int age;// getters, setters, constructors...}// 自定义POJO反序列化器.setDeserializer(new KafkaRecordDeserializationSchema`<User>`() {private final ObjectMapper mapper = new ObjectMapper();@Overridepublic void deserialize(ConsumerRecord<byte[], byte[]> record, Collector`<User>` out) throws IOException {User user = mapper.readValue(record.value(), User.class);out.collect(user);}@Overridepublic TypeInformation`<User>` getProducedType() {return TypeInformation.of(User.class);}})

七、常见问题与解决方案

1. 连接超时问题

问题现象:程序启动后报 org.apache.kafka.common.errors.TimeoutException

解决方案

  • 检查Kafka服务是否正常运行:ps -ef | grep kafka
  • 确认 bootstrap.servers配置正确,特别是在WSL2环境中确保端口映射正确
  • 检查防火墙设置,确保9092端口开放

2. 数据消费不完整

问题现象:部分Kafka消息未被Flink消费

解决方案

  • 检查Kafka主题的分区数与Flink Source并行度是否匹配
  • 确认 setStartingOffsets配置正确,生产环境建议使用 OffsetsInitializer.committedOffsets()
  • 检查检查点机制是否正常启用,确保偏移量正确提交

3. 性能优化

对于高吞吐量场景,可以通过以下方式优化性能:

  • 增加Kafka主题分区数(与Flink并行度匹配)
  • 调大 fetch.max.bytesmax.partition.fetch.bytes参数,增加单次拉取的数据量
  • 启用增量检查点,减少检查点开销
  • 使用 setUnboundedUsePreviousEventTimeWatermark()优化水印生成

八、总结与扩展

本文详细介绍了如何使用Flink从Kafka读取数据,包括环境准备、代码实现、运行测试以及进阶配置。通过本文的学习,你应该能够掌握Flink数据源的核心用法,为构建企业级实时数据处理应用打下坚实基础。

在实际应用中,Flink还支持多种其他数据源,如:

  • 文件系统(HDFS、本地文件)
  • 数据库(MySQL、PostgreSQL、MongoDB等)
  • 消息队列(RabbitMQ、Pulsar等)
  • 自定义数据源(通过实现 SourceFunction接口)

后续文章将继续深入探讨Flink的数据转换、窗口计算、状态管理等核心概念,敬请关注!


源文来自:http://blog.daimajiangxin.com.cn

源码地址:https://gitee.com/daimajiangxin/flink-learning

http://www.dtcms.com/a/389024.html

相关文章:

  • GRPO算法复现
  • AI+Flask博客项目实战提示词笔记 20250918
  • 无人设备遥控器之时间戳技术篇
  • 模块四 展望微服务
  • RN 添加 <NavigationContainer>组件报错
  • 深入理解 AVL 树
  • 软考中级习题与解答——第八章_计算机网络(2)
  • FinalShell远程连接CentOS下方文件列表信息不显示且刷新报空指针异常
  • 贪心算法应用:线性规划贪心舍入问题详解
  • 设计模式学习笔记(二)
  • 轻量化录屏插件,MP4输出格式
  • 静态代理 设计模式
  • Salesforce知识点:触发器:自动化业务逻辑的核心工具详解
  • CentOS 8.5部署Zabbix6.0 agent2端
  • 【TestCenter】设置DHCP Option
  • Jenkins 安全清理孤立工作区(workspace)的 Shell 脚本:原理、实现与实战
  • WebDancer论文阅读
  • Node.js、npm 和 npx:前端开发的三剑客
  • Node.js 创建 UDP 服务
  • 【NodeJS 二维码】node.js 怎样读取二维码信息?
  • IRN论文阅读笔记
  • pacote:Node.js 生态中的包获取工具
  • 使用 Ansible 管理 Docker 容器:开关机、定时开关机及 VNC 控制
  • 【Spring AI】实现一个基于 Streamable HTTP 的 MCP Server
  • 云手机:概念、历史、内容与发展战略
  • linux服务器上安装oss对象存储(命令行工具使用oss)
  • 强化学习1.1 使用Gymnasium库
  • 日语学习-日语知识点小记-进阶-JLPT-N1阶段蓝宝书,共120语法(11):101-110语法 +(考え方15)
  • 运维分享:神卓 N600 如何实现 NAS 安全稳定访问
  • 系统集成项目管理工程师:第十四章 收尾过程组