当前位置: 首页 > news >正文

Logstash数据迁移之mysql-to-kafka.conf详细配置

在 Logstash 中配置从 MySQL 到 Kafka 的数据传输是一个非常经典且强大的用例,常用于数据同步、CDC(变更数据捕获)和实时数据管道

下面我将详细解析配置文件的每个部分,并提供多个场景的示例。

核心架构与组件

数据流:MySQL → Logstash (jdbc inputfilterkafka output) → Kafka

为了实现高效的增量同步,其核心工作机制如下所示:

在这里插入图片描述

基础配置文件详解 (mysql-to-kafka.conf)

input {jdbc {# 【必需】JDBC 连接字符串jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"# 【必需】数据库用户名和密码jdbc_user => "your_username"jdbc_password => "your_password"# 【必需】MySQL JDBC 驱动路径# 需要手动下载 https://dev.mysql.com/downloads/connector/j/jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 注意类名# 【必需】要执行的 SQL 语句# 1. 使用增量字段(如update_time, id)进行增量查询# 2. :sql_last_value 是Logstash提供的变量,记录上一次执行的值statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"# 【强烈建议】定时执行,使用cron表达式。例如每分钟一次。schedule => "* * * * *"# 【强烈建议】记录上次查询结果的字段值(如最大的update_time或id)# 此文件由Logstash管理,用于下次查询的:sql_last_valuerecord_last_run => truelast_run_metadata_path => "/path/to/.logstash_jdbc_last_run" # 【可选】是否强制将JDBC列的字符串转换为UTF-8jdbc_default_timezone => "UTC"jdbc_force_standard_timezone => true# 【可选】分页查询,用于处理大表jdbc_paging_enabled => truejdbc_page_size => 100000}
}filter {# 此处是进行数据清洗和转换的地方,根据需求添加。# 例如:# 1. 删除不必要的字段mutate {remove_field => ["@version", "@timestamp"]}# 2. 如果需要,可以将记录转换为JSON字符串(如果Kafka希望接收字符串消息)# json {#   source => "message"#   target => "value"# }
}output {kafka {# 【必需】Kafka集群的broker列表bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"# 【必需】目标Topic的名称topic_id => "mysql-your_table-topic"# 【必需】指定消息的序列化格式。通常使用JSON。codec => json# 【可选】消息
http://www.dtcms.com/a/354707.html

相关文章:

  • 卷积神经网络(CNN)搭建详解
  • 区块链+隐私计算护航“东数西算”数据安全报告
  • AppScan扫描电脑上的客户端,C/S架构客户端等
  • 深度学习----卷积神经网络实现数字识别
  • RAW API 的 TCP 总结2
  • 数据结构8---排序
  • 鸿蒙OS与Rust整合开发流程
  • 【边缘计算】RK3576算力评估
  • 排序(Sort)方法详解(冒泡、插入、希尔、选择、堆、快速、归并)
  • 详细介绍Linux 内存管理 struct page数据结构中有一个锁,请问trylock_page()和lock_page()有什么区别?
  • 开源工具新玩法:cpolar提升Penpot协作流畅度
  • 8.28日QT
  • 分布式锁过期危机:4大续命方案拯救超时任务
  • 2025年机械工程与机器人国际研讨会(CMER2025)
  • PAT 1086 Tree Traversals Again
  • React 动画库
  • 2025.8.28总结
  • Docker Swarm vs Kubernetes vs Nomad:容器编排方案对比与选型建议
  • GitHub宕机自救指南技术文章大纲
  • 图论基础篇
  • Oracle 数据库权限管理的艺术:从入门到精通
  • 【第四章】BS 架构测试全解析:从功能验证到问题定位​
  • @HAProxy 介绍部署使用
  • DM LSN 与 Oracle SCN 对比
  • UNIX网络编程笔记:共享内存区和远程过程调用
  • 机器学习基本概述
  • 小白入门:支持深度学习的视觉数据库管理系统
  • 神经网络为何能 “学习”?从神经元到深度学习模型的层级结构解析
  • 【OS】IO
  • 不同业务怎么选服务器?CPU / 内存 / 带宽配置表