当前位置: 首页 > news >正文

基于Flink 1.20、StarRocks与TiCDC构建高效数据处理链路教程

在大数据处理领域,实现高效、实时的数据处理与分析至关重要。Flink作为强大的流批一体化计算框架,结合StarRocks这一高性能的实时分析型数据库,再搭配TiCDC(TiDB Change Data Capture)用于捕获数据变更,能够构建出极为高效的数据处理链路。本教程将详细介绍如何利用这些技术实现从MySQL数据源抽取数据,经Flink处理后写入StarRocks的完整流程,并对相关表结构和字段进行合理抽象与调整,以保障数据处理的通用性与安全性。

一、技术简介

Flink 1.20

Flink 1.20是Apache Flink的一个重要版本,它进一步强化了流批一体的计算能力。在流处理方面,其能够以低延迟处理大规模的实时数据流;而在批处理场景下,也具备高效的性能表现。Flink提供了丰富的连接器(Connector),方便与各类数据源和数据存储系统进行对接,同时支持使用SQL进行数据处理操作,大大降低了开发成本,提升了开发效率。

StarRocks

StarRocks是一款高性能的实时分析型数据库,采用MPP(Massively Parallel Processing)架构,能够对海量数据进行亚秒级的查询分析。它支持多种数据模型,包括聚合模型、主键模型等,适用于各类数据分析场景,如报表生成、实时看板、即席查询等。StarRocks通过其高效的存储和查询引擎,以及对多种数据格式的支持,为数据的快速分析提供了有力保障。

TiCDC

TiCDC是TiDB生态中的数据变更捕获工具,它基于TiDB的分布式事务和MVCC(Multi-Version Concurrency Control)机制,能够实时捕获TiDB数据库中的数据变更,包括增、删、改操作。TiCDC将这些变更数据以有序的方式输出,为数据同步、实时数据处理等场景提供了可靠的数据源。在本教程中,虽然我们主要从MySQL数据源抽取数据,但TiCDC的原理和应用思路可作为扩展参考,在涉及TiDB数据源时能够快速迁移应用。

二、环境准备

安装与配置Flink 1.20

  1. 下载Flink 1.20.0:通过curl命令下载安装包,执行 curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
  2. 解压文件:使用命令 tar -xzvf flink-1.20.0-bin-scala_2.12.tgz 解压下载的压缩包。
  3. 移动到目标目录(可选):可将解压后的Flink目录移动到 /opt 或其他目标位置,例如执行 sudo mv flink-1.20.0 /opt/flink
  4. 配置环境变量:编辑 ~/.bashrc 文件,添加如下内容:
export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH

保存并退出文件后,运行 source ~/.bashrc 使修改生效。
5. 配置Flink:Flink默认已配置一些基本设置。若无需集群配置,可跳过 mastersworkers 文件的配置。如需调整参数,如内存配置或其他作业配置,可修改Flink配置文件 config.yaml,该文件位于 /opt/flink/conf 目录下。例如,将 bind-host 设置从 localhost 改为 0.0.0.0,使Flink能够绑定所有网络接口,修改如下:

jobmanager:bind-host: 0.0.0.0
rpc:address: 0.0.0.0port: 6123
memory:process:size: 1600m
execution:failover-strategy: region
taskmanager:bind-host: 0.0.0.0host: 0.0.0.0numberOfTaskSlots: 1
memory:process:size: 1728m
parallelism:address: 0.0.0.0bind-address: 0.0.0.0
  1. 启动Flink:进入Flink目录,执行 ./bin/start-cluster.sh 启动Flink。若要关闭Flink,执行 ./bin/stop-cluster.sh。启动后,可通过浏览器访问Flink Web UI,默认地址为 http://<your_server_ip>:8081(例如 http://192.168.1.1:8081),以查看Flink集群的状态、提交作业等。

安装与配置StarRocks

  1. 下载与部署:从StarRocks官方网站获取安装包,按照官方文档指引进行下载与解压操作。根据实际的生产环境需求,选择合适的部署方式,如单节点部署用于测试环境,集群部署用于生产环境。
  2. 配置参数:在StarRocks的配置文件中,对一些关键参数进行设置,如FE(Frontend)节点的内存分配、BE(Backend)节点的存储路径等。例如,在FE节点的 fe.conf 文件中设置 query_mem_limit = 2147483648 来限制查询内存,在BE节点的 be.conf 文件中设置 storage_root_path = /data/starrocks/be 来指定存储路径。
  3. 启动服务:分别启动FE和BE节点,确保各个节点正常运行且相互通信正常。启动后,可通过MySQL客户端连接到StarRocks,验证其是否正常工作,例如执行 mysql -h <starrocks_fe_host> -P 9030 -u root -p

配置MySQL数据源

  1. 开启Binlog:确保MySQL开启了Binlog功能,在MySQL配置文件(通常为 my.cnfmy.ini)中,添加或修改如下配置:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

修改完成后,重启MySQL服务使配置生效。
2. 创建测试表:在MySQL中创建用于测试的数据表,例如创建一个名为 example_table 的表,表结构如下:

CREATE TABLE example_table (id BIGINT NOT NULL,data_column_1 VARCHAR(255),data_column_2 INT,PRIMARY KEY (id)
);

向表中插入一些测试数据,以便后续进行数据同步与处理测试。

三、表结构设计与调整

StarRocks表结构设计

在StarRocks中创建用于存储数据的表,以用户标签相关数据存储为例,设计如下表结构:

CREATE TABLE table_demo (id BIGINT NOT NULL COMMENT '主键',sign CHAR(32) NOT NULL COMMENT '签名',shop_id BIGINT NOT NULL COMMENT 'shopID',shop_type BIGINT NOT NULL COMMENT '类型',user_id BIGINT NULL COMMENT 'userID',create_time DATETIME NULL COMMENT '记录创建时间',operation_type VARCHAR(20) COMMENT '操作类型',row_change_type VARCHAR(20) COMMENT '行变更类型'
) ENGINE=OLAP
PRIMARY KEY (id)
COMMENT '用户商品表'
DISTRIBUTED BY HASH(`id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","bloom_filter_columns" = "shop_id, user_id","in_memory" = "false","storage_format" = "DEFAULT","enable_persistent_index" = "false","compression" = "LZ4"
);

该表结构设计充分考虑了数据的存储与查询需求,通过主键约束、哈希分布以及相关属性设置,保障数据的高效存储与查询性能。

Flink中MySQL CDC表结构定义

在Flink中通过MySQL CDC连接器读取MySQL数据时,定义如下表结构:

CREATE TABLE mysql_cdc_example (id BIGINT,sign STRING COMMENT '签名',shop_id BIGINT COMMENT 'shopID',shop_type BIGINT COMMENT '类型',user_id BIGINT COMMENT 'userID',create_time TIMESTAMP(0),operation_type STRING COMMENT '业务操作字段',operation_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'operation_timestamp' VIRTUAL,row_change_type STRING METADATA FROM 'row_change_type' VIRTUAL,PRIMARY KEY (`id`) NOT ENFORCED
)
WITH 
('connector' ='mysql-cdc','hostname' = '192.168.0.1','port' = '3306','database-name' = 'your_database_name','table-name' = 'example_table','username' = 'your_username','password' = 'your_password','debezium.snapshot.mode' = 'initial'
);

该表结构定义与StarRocks中的目标表结构相对应,同时通过WITH参数配置了MySQL CDC连接器的相关信息,包括数据源地址、端口、数据库名、表名、用户名、密码以及快照模式等。

Flink中StarRocks Sink表结构定义

在Flink中定义用于将处理后数据写入StarRocks的Sink表结构如下:

CREATE TABLE starrocks_sink_example (id BIGINT PRIMARY KEY NOT ENFORCED,sign STRING,shop_id BIGINT,shop_type BIGINT,user_id bigint,create_time STRING,operation_type STRING,row_change_type STRING
) 
WITH 
('connector'='starrocks','sink.max-retries'='5','jdbc-url' = 'jdbc:mysql://192.168.0.1:9030/your_database_name?useUnicode=true&characterEncoding=utf-8&useSSL=false&connectTimeout=3000&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&&serverTimezone=Asia/Shanghai&sessionVariables=query_timeout=86400','load-url'='192.168.0.1:8030','table-name' = 'table_demo','username'='your_username','password'='your_password','sink.buffer-flush.interval-ms'='5000','sink.parallelism' = '2','database-name'='your_database_name'
);

此Sink表结构与StarRocks中的目标表结构一致,通过WITH参数配置了StarRocks连接器的相关信息,如JDBC URL、Load URL、表名、用户名、密码、缓冲刷新间隔以及并行度等,确保Flink能够将处理后的数据准确高效地写入StarRocks。

四、数据同步与处理流程

使用Flink SQL进行数据抽取与转换

  1. 配置Flink SQL环境:在Flink的SQL客户端或相关集成开发环境中,配置好Flink SQL的运行环境,确保能够执行SQL语句对数据进行操作。
  2. 编写数据抽取与转换SQL:编写SQL语句从MySQL CDC表中抽取数据,并进行必要的转换操作,例如将时间格式进行转换、根据业务规则对某些字段进行计算等。以下是一个简单的示例,将 create_time 字段从 TIMESTAMP 类型转换为字符串类型,并根据 operation_typerow_change_type 字段确定最终的操作类型:
INSERT INTOstarrocks_sink_example
SELECTid,sign,shop_id,shop_typeuser_id,cast(create_time as CHAR) as create_time,CASE WHEN operation_type = 'DELETE' THEN 'DELETE'WHEN row_change_type = '+I' THEN 'INSERT'WHEN row_change_type IN ('-U', '+U') THEN 'UPDATE'WHEN row_change_type = '-D' THEN 'DELETE'ELSE 'UNKNOWN'END AS operation_type,row_change_type 
FROMmysql_cdc_example;

该SQL语句从 mysql_cdc_example 表中读取数据,对 create_time 字段进行类型转换,并根据不同的变更类型确定最终的 operation_type,然后将处理后的数据插入到 starrocks_sink_example 表中。

使用Routine Load进行数据实时摄入(以Kafka数据源为例)

  1. 配置Kafka数据源:在Kafka中创建用于存储数据变更的主题,确保数据源能够正常向该主题发送数据。例如,创建一个名为 user_table_changes 的主题。
  2. 创建StarRocks的Routine Load任务:在StarRocks中创建Routine Load任务,用于实时消费Kafka主题中的数据并写入到StarRocks表中。以下是一个示例:
CREATE ROUTINE LOAD your_load_job_name ON table_demo
COLUMNS (id,sign,shop_id,shop_type,user_id,create_time,operation_type,row_change_type,temp_operation_type=IF(operation_type = 'DELETE', 'DELETE', IF(operation_type = 'UPDATE', 'UPSERT', 'APPEND'))
)
PROPERTIES ("desired_concurrent_number" = "1","max_batch_interval" = "10","max_batch_rows" = "300000","max_batch_size" = "209715200","strict_mode" = "false","format" = "json"
)
FROM
KAFKA ("kafka_broker_list" = "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092","kafka_topic" = "user_table_changes","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

该Routine Load任务配置了从Kafka主题 user_table_changes 中读取数据,按照指定的列映射关系写入到 user_table_mapping 表中,并设置了相关的属性,如期望的并发数、最大批次间隔、最大批次行数、最大批次大小、严格模式以及数据格式等。

http://www.dtcms.com/a/268555.html

相关文章:

  • linux如何下载github的一个项目
  • stm32与tp-linkv2接线、解决识别不到芯片问题
  • C++ -- string类的模拟实现
  • Go的标准库http原理解析
  • 【论文阅读】Few-Shot PPG Signal Generation via Guided Diffusion Models
  • Web Worker:让前端飞起来的隐形引擎
  • 第0章:开篇词 - 嘿,别怕,AI应用开发没那么神!
  • 【PaddleOCR】数据合成工具 Style-Text安装与使用案例介绍
  • 【机器学习笔记 Ⅲ】3 异常检测算法
  • 4D-VLA:具有跨场景标定的时空视觉-语言-动作预训练
  • Linux运维安全新范式:基于TCPIP与SSH密钥的无密码认证实战
  • 【保姆级图文详解】探秘 Prompt 工程:AI 交互的关键密码
  • C++多线程网络编程:助力高并发服务器性能提升
  • 无人机精准降落辅助系统核心技术解析
  • 一文讲清楚React Fiber
  • RAG 相关概念学习
  • VMware 17.0.2-21581411 安装教程(附详细步骤+序列号激活指南)
  • 【牛客算法】 小红的奇偶抽取
  • kotlin+MongoTemplate的时间类型为is_date类型 pymongo如何处理
  • 【vue】用conda配置nodejs,一键开通模版使用权
  • 设计模式分析
  • 1.1_5_1 计算机网络的性能指标(上)
  • 大模型在肾囊肿诊疗全流程预测及应用研究报告
  • kafka总结
  • 【Java编程动手学】Java常用工具类
  • Apache Cloudberry 亮相 2025 IvorySQL 生态大会暨 PostgreSQL 高峰论坛
  • c# Process.Start异常解决办法
  • 【一起来学AI大模型】支持向量机(SVM):核心算法深度解析
  • 支持向量机(SVM)在心脏MRI分类(心肌病检测)中的应用与实现
  • 最简单的实验室资产管理系统,使用Flask,mysql,html(四、知识补充)