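Syncing Data from MySQL to Kafka with Logstash
📋 Overview
This guide provides complete Logstash configurations for syncing MySQL data to Kafka: a one-off full load, incremental sync, and multi-table sync.
⚙️ Basic Configuration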
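1. Full Data Sync Configuration
```
# mysql_to_kafka_full.conf
input {
  jdbc {
    # MySQL connection: replace with your own values
    jdbc_connection_string => "jdbc:mysql://DB_HOST:3306/DB_NAME"
    jdbc_user => "DB_USER"
    jdbc_password => "DB_PASSWORD"

    # JDBC driver
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    # SQL query: replace the table name
    statement => "SELECT * FROM your_table"

    # Paging (improves performance on large tables)
    jdbc_paging_enabled => true
    jdbc_page_size => 50000

    # Connection pool and retries
    jdbc_pool_timeout => 300
    connection_retry_attempts => 3
    connection_retry_attempts_wait_time => 10

    # No schedule: the query runs once, which is what a one-off full load needs.
    # With schedule => "* * * * *" the whole table would be re-sent every minute.

    # Run metadata (use a persistent path in production; /tmp may be cleared on reboot)
    last_run_metadata_path => "/tmp/last_run_metadata"
    clean_run => false
    record_last_run => true
  }
}

filter {
  # Set @timestamp from a record field: change create_time to your column.
  # The jdbc input typically converts DATETIME columns to Logstash timestamps;
  # listing ISO8601 as well covers that case.
  date {
    match => [ "create_time", "ISO8601", "yyyy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
    timezone => "UTC"
  }

  # Remove unneeded fields
  mutate {
    remove_field => ["@version", "jdbc_url"]
  }

  # Add custom fields
  mutate {
    add_field => {
      "source_database" => "DB_NAME"
      "source_table" => "your_table"
      "sync_timestamp" => "%{@timestamp}"
    }
  }
}

output {
  # Send to Kafka: replace the broker addresses and topic
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "your_kafka_topic"
    codec => json

    # Kafka producer settings
    acks => "1"
    batch_size => 16384
    linger_ms => 100
    compression_type => "snappy"

    # Message key (optional): use the record ID
    message_key => "%{id}"
  }

  # Debug output (optional; remove in production)
  stdout { codec => rubydebug }
}
```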
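With jdbc_paging_enabled, the plugin reads the result set page by page instead of all at once. The Python sketch below is a rough, hypothetical model of that behavior: the helper name paged_queries and the exact wrapping SQL are assumptions for illustration, and the statements Logstash actually issues (through the Sequel library) may differ in detail.

```python
from typing import List


def paged_queries(statement: str, page_size: int, total_rows: int) -> List[str]:
    """Hypothetical model of paged reads: one LIMIT/OFFSET query per page."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    queries = []
    # Advance the offset one page at a time until every row is covered.
    for offset in range(0, total_rows, page_size):
        queries.append(
            f"SELECT * FROM ({statement}) AS t1 LIMIT {page_size} OFFSET {offset}"
        )
    return queries


if __name__ == "__main__":
    # 120,000 rows with jdbc_page_size => 50000 need three pages.
    for sql in paged_queries("SELECT * FROM your_table", 50000, 120000):
        print(sql)
```

Because each later page uses a larger OFFSET, MySQL still scans and discards the skipped rows, so the last pages of a very large table get progressively slower. That is why jdbc_page_size is worth tuning rather than treating paging as a free speedup.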
2. Incremental Data Sync Configuration
```
# mysql_to_kafka_incremental.conf
input {
  jdbc {
    # Database connection: replace with your own values
    jdbc_connection_string => "jdbc:mysql://DB_HOST:3306/DB_NAME"
    jdbc_user => "DB_USER"
    jdbc_password => "DB_PASSWORD"
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    # Incremental query on an update timestamp (replace table and column names).
    # ORDER BY ... ASC matters: :sql_last_value is taken from the last row read.
    statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value AND update_time < NOW() ORDER BY update_time ASC"

    # Tracking column (replace with your actual incremental column)
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/tmp/last_run_incremental"

    # Paging
    jdbc_paging_enabled => true
    jdbc_page_size => 10000

    # More frequent runs: six fields, the first is seconds (every 5 seconds)
    schedule => "*/5 * * * * *"

    # Connection settings
    connection_retry_attempts => 5
    jdbc_validate_connection => true
  }
}

filter {
  # Date handling (replace the field name)
  date {
    match => [ "update_time", "ISO8601", "yyyy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
  }

  # Type conversion
  mutate {
    convert => {
      "id" => "integer"
      # add other fields that need conversion here
    }
    remove_field => ["@version"]
  }

  # Add metadata
  mutate {
    add_field => {
      "sync_type" => "incremental"
      "batch_timestamp" => "%{@timestamp}"
    }
  }
}

output {
  kafka {
    # Kafka settings: replace with your own values
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "your_incremental_topic"
    codec => json
    message_key => "%{id}"   # change to your primary key field

    # Producer settings
    acks => "1"
    batch_size => 16384
    compression_type => "snappy"
  }
}
```
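The incremental pattern boils down to three steps: bind the saved value to :sql_last_value, run the query, then store the tracking column of the last row returned. The following Python sketch models one scheduled run under stated assumptions: SQLite stands in for MySQL, and the records table, its sample rows, and the run_once and make_demo_db helpers are hypothetical.

```python
import sqlite3
from typing import List, Tuple

# Assumption: SQLite stands in for MySQL, and a hypothetical "records" table
# stores update_time as 'YYYY-MM-DD HH:MM:SS' text, so string comparison
# follows chronological order.
QUERY = (
    "SELECT id, update_time FROM records "
    "WHERE update_time > ? AND update_time < ? "
    "ORDER BY update_time ASC"
)

# Epoch start, analogous to the initial value of a timestamp tracking column.
INITIAL_LAST_VALUE = "1970-01-01 00:00:00"


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory table with two hypothetical sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, update_time TEXT)")
    conn.executemany(
        "INSERT INTO records VALUES (?, ?)",
        [(1, "2024-01-01 10:00:00"), (2, "2024-01-01 10:05:00")],
    )
    return conn


def run_once(conn: sqlite3.Connection, last_value: str, now: str) -> Tuple[List[tuple], str]:
    """One scheduled run: bind last_value, fetch newer rows, then advance it.

    The new last value comes from the last row returned, which is why the
    query sorts by update_time ascending.
    """
    rows = conn.execute(QUERY, (last_value, now)).fetchall()
    if rows:
        last_value = rows[-1][1]
    return rows, last_value


if __name__ == "__main__":
    conn = make_demo_db()
    rows, last = run_once(conn, INITIAL_LAST_VALUE, "2024-01-01 10:10:00")
    print(rows, last)
    # Row 1 is modified and row 3 is inserted before the next run.
    conn.execute("UPDATE records SET update_time = '2024-01-01 10:11:00' WHERE id = 1")
    conn.execute("INSERT INTO records VALUES (3, '2024-01-01 10:12:00')")
    rows, last = run_once(conn, last, "2024-01-01 10:15:00")
    print(rows, last)
```

In the second run only the updated row (id 1) and the new row (id 3) come back; row 2 is not re-sent. The strict `>` comparison can still skip a row committed later with exactly the same update_time as the saved value, which is a trade-off of timestamp tracking.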
🔄 Multi-Table Sync Configuration
```
# mysql_multiple_tables.conf
input {
  # Table 1: replace with your own values
  jdbc {
    jdbc_connection_string => "jdbc:mysql://DB_HOST:3306/DB_NAME"
    jdbc_user => "DB_USER"
    jdbc_password => "DB_PASSWORD"
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    statement => "SELECT * FROM table_1 WHERE updated_at > :sql_last_value ORDER BY updated_at ASC"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/tmp/last_run_table1"   # each input needs its own file
    schedule => "*/10 * * * * *"
    type => "table1"   # table identifier
  }

  # Table 2: replace with your own values
  jdbc {
    jdbc_connection_string => "jdbc:mysql://DB_HOST:3306/DB_NAME"
    jdbc_user => "DB_USER"
    jdbc_password => "DB_PASSWORD"
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    statement => "SELECT * FROM table_2 WHERE updated_at > :sql_last_value ORDER BY updated_at ASC"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/tmp/last_run_table2"
    schedule => "*/15 * * * * *"
    type => "table2"   # table identifier
  }
}

filter {
  # Per-table handling based on type
  if [type] == "table1" {
    mutate { add_field => { "topic_suffix" => "table1" } }
  }
  if [type] == "table2" {
    mutate { add_field => { "topic_suffix" => "table2" } }
  }

  # Common fields
  mutate {
    add_field => {
      "source_host" => "DB_HOST"
      "database" => "DB_NAME"
    }
  }
}

output {
  if [type] == "table1" {
    kafka {
      bootstrap_servers => "kafka1:9092"
      topic_id => "kafka_topic_table1"
      codec => json
      message_key => "%{id}"   # change to the primary key
    }
  }
  if [type] == "table2" {
    kafka {
      bootstrap_servers => "kafka1:9092"
      topic_id => "kafka_topic_table2"
      codec => json
      message_key => "%{id}"   # change to the primary key
    }
  }
}
```
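Both topic_id and message_key accept %{field} references that Logstash fills in from each event. The Python function below is a simplified, hypothetical emulation of that substitution (top-level fields only). It illustrates two points: a single kafka output with a template such as "kafka_topic_%{[type]}" should be able to route both tables without the if blocks, and a misspelled key field is left as the literal text "%{id}", so every such message gets the same key.

```python
import re
from typing import Any, Dict

# Matches %{name} or %{[name]}; nested references such as %{[a][b]} are not supported here.
_REF = re.compile(r"%\{([^}]+)\}")


def sprintf(template: str, event: Dict[str, Any]) -> str:
    """Hypothetical, simplified stand-in for Logstash field references."""

    def substitute(match):
        ref = match.group(1)
        # Accept both %{type} and %{[type]} for a top-level field.
        key = ref[1:-1] if ref.startswith("[") and ref.endswith("]") else ref
        if key in event:
            return str(event[key])
        return match.group(0)  # missing field: keep the reference unchanged

    return _REF.sub(substitute, template)


if __name__ == "__main__":
    event = {"type": "table2", "id": 42}
    print(sprintf("kafka_topic_%{[type]}", event))  # kafka_topic_table2
    print(sprintf("%{id}", event))                  # 42
    print(sprintf("%{user_id}", event))             # %{user_id}
```

With an identical key on every message, Kafka's default partitioner sends them all to one partition, which quietly removes consumer parallelism.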
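🔧 Parameter Reference
Key parameters you must change:
| Parameter | Description | Example |
|---|---|---|
| jdbc_connection_string | MySQL connection string | jdbc:mysql://192.168.1.100:3306/mydb |
| jdbc_user | Database user name | admin |
| jdbc_password | Database password | password123 |
| statement | SQL query | SELECT * FROM users |
| tracking_column | Column used for incremental sync; set tracking_column_type to timestamp or numeric to match | update_time or id |
| bootstrap_servers | Kafka broker addresses | kafka1:9092,kafka2:9092 |
| topic_id | Kafka topic name | user-data |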
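Optional tuning parameters:
- jdbc_page_size: rows per page for paged queries; adjust to the data volume
- schedule: run frequency in cron syntax (a sixth, leading field means seconds); adjust to how fresh the data must be
- batch_size: Kafka producer batch size, in bytes
- compression_type: compression codec (none, gzip, snappy, lz4)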
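The schedule values in this guide use two cron forms: five fields (minute-level, as in "* * * * *") and six fields, where the jdbc input's scheduler (rufus-scheduler) reads an extra leading field as seconds (as in "*/5 * * * * *"). The small, hypothetical Python helper below converts the simple "*" and "*/N" forms used here into an interval in seconds; it is a reading aid under those assumptions, not a cron parser.

```python
from typing import Optional


def interval_seconds(schedule: str) -> Optional[int]:
    """Hypothetical reading aid: interval for simple "*" / "*/N" schedules.

    Assumes 5 fields = minute-level cron and 6 fields = leading seconds field.
    Returns None for anything more complex (for example "0 * * * *").
    """
    fields = schedule.split()
    if len(fields) == 5:
        unit = 60  # smallest field is minutes
    elif len(fields) == 6:
        unit = 1   # smallest field is seconds
    else:
        raise ValueError(f"expected 5 or 6 fields, got {len(fields)}")

    smallest, rest = fields[0], fields[1:]
    if any(field != "*" for field in rest):
        return None
    if smallest == "*":
        return unit
    if smallest.startswith("*/") and smallest[2:].isdigit():
        step = int(smallest[2:])
        return unit * step if step > 0 else None
    return None


if __name__ == "__main__":
    for expr in ["* * * * *", "*/5 * * * * *", "*/10 * * * * *", "*/15 * * * * *"]:
        print(expr, "->", interval_seconds(expr), "s")
```

For steps that do not divide 60 evenly (for example */7), cron restarts counting at the top of each minute or hour, so the gap between runs is not constant and this helper's answer is only approximate.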
