当前位置: 首页 > news >正文

Logstash 从 MySQL 同步数据到 Kafka

📋 配置概述

本指南提供完整的 Logstash 配置,用于将 MySQL 数据同步到 Kafka。

⚙️ 基础配置

1. 全量数据同步配置

# mysql_to_kafka_full.conf
input {jdbc {# MySQL 连接配置 - 请修改以下信息jdbc_connection_string => "jdbc:mysql://数据库IP:3306/数据库名"jdbc_user => "数据库用户名"jdbc_password => "数据库密码"# JDBC 驱动配置jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# SQL 查询 - 请修改表名statement => "SELECT * FROM 你的表名"# 分页设置(提高大表性能)jdbc_paging_enabled => truejdbc_page_size => 50000# 连接池配置jdbc_pool_timeout => 300connection_retry_attempts => 3connection_retry_attempts_wait_time => 10# 调度(每分钟执行一次)schedule => "* * * * *"# 元数据记录last_run_metadata_path => "/tmp/last_run_metadata"clean_run => falserecord_last_run => true}
}filter {# 添加处理时间戳 - 请根据实际字段修改date {match => [ "create_time", "yyyy-MM-dd HH:mm:ss" ]target => "@timestamp"timezone => "UTC"}# 移除不需要的字段mutate {remove_field => ["@version", "jdbc_url"]}# 添加自定义字段mutate {add_field => {"source_database" => "你的数据库名""source_table" => "你的表名""sync_timestamp" => "%{@timestamp}"}}
}output {# 输出到 Kafka - 请修改Kafka地址和Topickafka {bootstrap_servers => "kafka1:9092,kafka2:9092"topic_id => "你的kafka_topic名称"codec => json# Kafka 生产者配置acks => "1"batch_size => 16384linger_ms => 100compression_type => "snappy"# 消息键设置(可选)message_key => "%{id}"  # 使用记录ID作为消息键}# 调试输出(可选)stdout { codec => rubydebug }
}

2. 增量数据同步配置

# mysql_to_kafka_incremental.conf
input {jdbc {# 数据库连接信息 - 请修改jdbc_connection_string => "jdbc:mysql://数据库IP:3306/数据库名"jdbc_user => "数据库用户名"jdbc_password => "数据库密码"jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# 增量查询 - 基于更新时间戳(请修改表名和字段名)statement => "SELECT * FROM 你的表名 WHERE update_time > :sql_last_value AND update_time < NOW() ORDER BY update_time ASC"# 跟踪列配置(请修改为实际的增量字段)use_column_value => truetracking_column => "update_time"tracking_column_type => "timestamp"last_run_metadata_path => "/tmp/last_run_incremental"# 分页设置jdbc_paging_enabled => truejdbc_page_size => 10000# 更频繁的调度schedule => "*/5 * * * * *"  # 每5秒# 连接设置connection_retry_attempts => 5jdbc_validate_connection => true}
}filter {# 日期处理(请修改字段名)date {match => [ "update_time", "yyyy-MM-dd HH:mm:ss" ]target => "@timestamp"}# 数据转换mutate {convert => {"id" => "integer"# 添加其他需要转换的字段}remove_field => ["@version"]}# 添加元数据mutate {add_field => {"sync_type" => "incremental""batch_timestamp" => "%{@timestamp}"}}
}output {kafka {# Kafka配置 - 请修改bootstrap_servers => "kafka1:9092,kafka2:9092"topic_id => "你的增量数据topic"codec => jsonmessage_key => "%{id}"  # 请修改为主键字段# 生产配置acks => "1"batch_size => 16384compression_type => "snappy"}
}

🔄 多表同步配置

# mysql_multiple_tables.conf
input {# 表1配置 - 请修改jdbc {jdbc_connection_string => "jdbc:mysql://数据库IP:3306/数据库名"jdbc_user => "数据库用户名"jdbc_password => "数据库密码"jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"statement => "SELECT * FROM 表1 WHERE updated_at > :sql_last_value"use_column_value => truetracking_column => "updated_at"tracking_column_type => "timestamp"last_run_metadata_path => "/tmp/last_run_table1"schedule => "*/10 * * * * *"type => "table1"  # 表标识}# 表2配置 - 请修改jdbc {jdbc_connection_string => "jdbc:mysql://数据库IP:3306/数据库名"jdbc_user => "数据库用户名"jdbc_password => "数据库密码"jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"statement => "SELECT * FROM 表2 WHERE updated_at > :sql_last_value"use_column_value => truetracking_column => "updated_at"tracking_column_type => "timestamp"last_run_metadata_path => "/tmp/last_run_table2"schedule => "*/15 * * * * *"type => "table2"  # 表标识}
}filter {# 根据类型添加不同处理if [type] == "table1" {mutate {add_field => { "topic_suffix" => "table1" }}}if [type] == "table2" {mutate {add_field => { "topic_suffix" => "table2" }}}# 通用处理mutate {add_field => {"source_host" => "数据库IP""database" => "数据库名"}}
}output {if [type] == "table1" {kafka {bootstrap_servers => "kafka1:9092"topic_id => "kafka_topic_table1"codec => jsonmessage_key => "%{id}"  # 请修改为主键}}if [type] == "table2" {kafka {bootstrap_servers => "kafka1:9092"topic_id => "kafka_topic_table2"codec => jsonmessage_key => "%{id}"  # 请修改为主键}}
}

🔧 配置参数说明

需要修改的关键参数:

参数说明示例
jdbc_connection_stringMySQL连接字符串jdbc:mysql://192.168.1.100:3306/mydb
jdbc_user数据库用户名admin
jdbc_password数据库密码password123
statementSQL查询语句SELECT * FROM users
tracking_column增量同步字段update_timeid
bootstrap_serversKafka集群地址kafka1:9092,kafka2:9092
topic_idKafka主题名称user-data

可选调优参数:

  • jdbc_page_size: 分页大小,根据数据量调整
  • schedule: 执行频率,根据实时性要求调整
  • batch_size: Kafka批处理大小
  • compression_type: 压缩方式 (none, gzip, snappy, lz4)
http://www.dtcms.com/a/596336.html

相关文章:

  • 通过 HelloWorld 深入剖析 JVM 启动过程
  • css-文字背景渐变色
  • Tailwind CSS的grid布局
  • LangGraph基础教程(4)---LangGraph的核心能力
  • 百度网站推广费用多少物流网站前端模板下载
  • Docker-镜像存储机制-网络
  • 线性代数 - 从方程组到行列式
  • 景德镇做网站公司中国邮政做特产的网站
  • 【Linux】进程间通信(三)System V 共享内存完全指南:原理、系统调用与 C++ 封装实现
  • 记一次cssd无法启动故障处理
  • 开源 Objective-C IOS 应用开发(一)macOS 的使用
  • ElasticSearch详解(篇一)
  • flash网站价格网站推广的特点
  • 【C++ 面试题】内存对齐
  • busybox:启动阶段的静态 IP 配置过程
  • k8s 中遇到Calico CrashLoopBackOff 的解决方法
  • zookeeper单机版安装
  • 【Excel导入】读取WPS格式嵌入单元格内的图片
  • 福清建设银行网站网红营销的作用
  • 34节点配电网牛顿-拉夫逊潮流计算 + 分布式电源(DG)多场景分析的 MATLAB
  • 分布式专题——53 ElasticSearch高可用集群架构实战
  • 电子商务网站建设与设计网站常州建设
  • 学习编程好么 | 编程的好处与学习路径分析
  • 从中间件的历史来看移动App开发的未来
  • Faster-Whisper:更快更好的开源Asr模型
  • ubuntu部署whisper+speaker_large+qwen【gradio界面版】
  • 阿里云通过中国信通院首批安全可信中间件评估
  • 正点原子【第四期】Linux之驱动开发学习笔记-12.1 Linux 阻塞和非阻塞 IO 实验
  • 做网站fjfzwl门户wordpress主题下载
  • Elasticsearch的用法