Logstash数据迁移之mysql-to-kafka.conf详细配置
在 Logstash 中配置从 MySQL 到 Kafka 的数据传输是一个非常经典且强大的用例,常用于数据同步、CDC(变更数据捕获)和实时数据管道。
下面我将详细解析配置文件的每个部分,并提供多个场景的示例。
核心架构与组件
数据流:MySQL → Logstash (jdbc input
→ filter
→ kafka output
) → Kafka
为了实现高效的增量同步,其核心工作机制如下所示:
基础配置文件详解 (mysql-to-kafka.conf
)
input {jdbc {# 【必需】JDBC 连接字符串jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"# 【必需】数据库用户名和密码jdbc_user => "your_username"jdbc_password => "your_password"# 【必需】MySQL JDBC 驱动路径# 需要手动下载 https://dev.mysql.com/downloads/connector/j/jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 注意类名# 【必需】要执行的 SQL 语句# 1. 使用增量字段(如update_time, id)进行增量查询# 2. :sql_last_value 是Logstash提供的变量,记录上一次执行的值statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"# 【强烈建议】定时执行,使用cron表达式。例如每分钟一次。schedule => "* * * * *"# 【强烈建议】记录上次查询结果的字段值(如最大的update_time或id)# 此文件由Logstash管理,用于下次查询的:sql_last_valuerecord_last_run => truelast_run_metadata_path => "/path/to/.logstash_jdbc_last_run" # 【可选】是否强制将JDBC列的字符串转换为UTF-8jdbc_default_timezone => "UTC"jdbc_force_standard_timezone => true# 【可选】分页查询,用于处理大表jdbc_paging_enabled => truejdbc_page_size => 100000}
}filter {# 此处是进行数据清洗和转换的地方,根据需求添加。# 例如:# 1. 删除不必要的字段mutate {remove_field => ["@version", "@timestamp"]}# 2. 如果需要,可以将记录转换为JSON字符串(如果Kafka希望接收字符串消息)# json {# source => "message"# target => "value"# }
}output {kafka {# 【必需】Kafka集群的broker列表bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"# 【必需】目标Topic的名称topic_id => "mysql-your_table-topic"# 【必需】指定消息的序列化格式。通常使用JSON。codec => json# 【可选】消息