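Syncing monthly MySQL log tables to ES with Logstash
I recently had to cache WeCom (企业微信) message records. Because of the message volume, I designed the storage around sharded tables. Since this is a SaaS system with tenants, the sharding is fairly involved: tables are named message_<tenant>_<YYYYMM>, while in ES the indices are split by tenant only.
The sync does not need to be real-time, so I used Logstash from the Elastic family to push the data into ES.
Prerequisite: ELK is installed
Get all the tables that need a generated config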
-- Get the names of all we_sync_message monthly tables (current and previous month)
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'ft-cloud-wecom'
  AND table_name LIKE 'we_sync_message_%'
  AND RIGHT(table_name, 6) IN (
    DATE_FORMAT(CURDATE(), '%Y%m'),
    DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 MONTH), '%Y%m')
  )
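The suffix rule in this query is easy to check outside the database. Below is a minimal Ruby sketch of the same selection, not part of the pipeline itself; the method name tables_for_sync and the sample table names are made up for illustration.

```ruby
require 'date'

# Hypothetical helper: keep only tables whose name ends in the YYYYMM
# of the given day's month or of the month before it.
def tables_for_sync(table_names, today)
  # Date#<< moves back by whole months, like DATE_SUB(..., INTERVAL 1 MONTH)
  suffixes = [today, today << 1].map { |d| d.strftime('%Y%m') }
  table_names.select do |name|
    name.start_with?('we_sync_message_') && suffixes.include?(name[-6..])
  end
end

# Sample names (assumed, for illustration only)
names = %w[
  we_sync_message_1001_202501
  we_sync_message_1001_202412
  we_sync_message_1001_202411
  other_table_202501
]
selected = tables_for_sync(names, Date.new(2025, 1, 15))
puts selected
```

Passing the date in as an argument, rather than calling Date.today inside the method, keeps the check reproducible.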
Write the config that auto-generates the conf files
# My Logstash location
cd /usr/local/elk/logstash-9.1.2/config
touch logstashgen.conf
vim -v logstashgen.conf
Complete code
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/information_schema?useSSL=false&serverTimezone=UTC"
    jdbc_user => "user"
    jdbc_password => "pass"
    jdbc_driver_library => "/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    statement => "SELECT table_name FROM information_schema.tables WHERE table_schema = 'ft-cloud-wecom' AND table_name LIKE 'we_sync_message_%' AND RIGHT(table_name, 6) IN (DATE_FORMAT(CURDATE(), '%Y%m'),DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 MONTH), '%Y%m'))"
    # Run at 00:20 on the 1st of every month
    schedule => "20 0 1 * *"
  }
}
filter {
  ruby {
    code => "
      # Make sure table_names is an array
      table_names = event.get('table_name')
      table_names = [table_names] if table_names.is_a?(String)
      # Generate one config per table and write each to its own file
      table_names.each do |table|
        # Trim the table name (remove the last 7 characters): drop the date, keep the tenant
        processed_table = table[0..-8] if table.length > 7
        next unless processed_table # skip invalid table names
        # Extract the tenant: everything after the first 26 characters.
        # Adjust the offset to your prefix length (the 'we_sync_message_' prefix
        # used in this article is 16 characters long).
        table_tenete = processed_table[26..] if processed_table.length > 26
        next unless table_tenete
        # Output directory for the generated files (create it if missing)
        output_dir = 'config/bin/'
        Dir.mkdir(output_dir) unless Dir.exist?(output_dir)
        # Directory that persists the last-run tracking value
        last_dir = 'last_run'
        Dir.mkdir(last_dir) unless Dir.exist?(last_dir)
        # Name of the generated conf file
        filename = output_dir + processed_table + '.conf'
        config = <<-CONFIG
input {
  jdbc {
    jdbc_connection_string => 'jdbc:mysql://localhost:3306/ft-cloud-wecom?useSSL=false&serverTimezone=UTC'
    jdbc_user => 'user'
    jdbc_password => 'pass'
    jdbc_driver_library => '/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar'
    jdbc_driver_class => 'com.mysql.cj.jdbc.Driver'
    statement => 'SELECT * FROM #{table} WHERE updated_at > :sql_last_value'
    schedule => '1-59/5 * * * *'
    tracking_column => 'updated_at'
    tracking_column_type => 'timestamp'
    use_column_value => true
    record_last_run => true
    last_run_metadata_path => 'last_run/#{processed_table}.txt'
  }
}
filter {
  mutate {
    convert => {
      \"id\" => \"integer\"
      \"msgid\" => \"string\"
      \"sender_type\" => \"integer\"
      \"sender_id\" => \"string\"
      \"chatid\" => \"string\"
      \"receiver_type\" => \"integer\"
      \"receiver_id\" => \"string\"
      \"receiver_list\" => \"string\"
      \"msgtype\" => \"integer\"
      \"encrypted_secret_key\" => \"string\"
      \"public_key_ver\" => \"string\"
      \"extra_info\" => \"integer\"
      \"corpid\" => \"string\"
      \"role_code\" => \"integer\"
      \"tenant_id\" => \"integer\"
      \"deleted\" => \"boolean\"
    }
  }
  date {
    match => [\"send_time\", \"ISO8601\"]
    target => \"@timestamp\"
  }
}
output {
  if [tenant_id] == #{table_tenete} {
    elasticsearch {
      hosts => ['http://localhost:19200']
      index => '#{processed_table}'
      ilm_pattern => '1'
      ilm_policy => 'logs_policy'
      user => 'elastic'
      password => 'admin'
      document_id => '%{id}'
    }
  }
}
        CONFIG
        # Write the config file, recording any error on the event
        begin
          File.write(filename, config)
        rescue => e
          event.set('error', e.message)
        end
      end
    "
  }
}
Explanation
Input: configure the JDBC connection and the SQL query to run
input {
  jdbc {
    # Database host, user and password
    jdbc_connection_string => "jdbc:mysql://localhost:3306/information_schema"
    jdbc_user => "user"
    jdbc_password => "pass"
    # Location of your own driver jar (wherever you downloaded or copied it)
    jdbc_driver_library => "/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar"
    # Driver class
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # SQL to execute
    statement => "SELECT table_name FROM information_schema.tables WHERE table_schema = 'ft-cloud-wecom' AND table_name LIKE 'we_sync_message_%' AND RIGHT(table_name, 6) IN (DATE_FORMAT(CURDATE(), '%Y%m'),DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 MONTH), '%Y%m'))"
    # Run at 00:20 on the 1st of every month
    schedule => "20 0 1 * *"
  }
}
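Use ruby in the filter to generate the configs
filter {
  ruby {
    code => "
      # Make sure table_names is an array
      table_names = event.get('table_name')
      # Even a single table is wrapped into an array
      table_names = [table_names] if table_names.is_a?(String)
      # Generate one config per table and write each to its own file (start of the loop)
      table_names.each do |table|
        # Trim the table name (remove the last 7 characters): drop the date, keep the tenant
        processed_table = table[0..-8] if table.length > 7
        # Skip invalid table names
        next unless processed_table
        # Output directory for the generated files
        output_dir = 'config/bin/'
        # Create the directory if it does not exist yet
        Dir.mkdir(output_dir) unless Dir.exist?(output_dir)
        # Name of the generated conf file
        filename = output_dir + processed_table + '.conf'
        config = <<-CONFIG
          # ... contents of the conf file to write
        CONFIG
        # Write the config file safely
        begin
          # Create the file from filename and the generated config
          File.write(filename, config)
        rescue => e
          # Error handling
          event.set('error', e.message)
        end
      end
    "
  }
}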
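The name handling in that loop can be tried on its own. Here is a minimal sketch of splitting a table name into the index name and the tenant, assuming names follow we_sync_message_<tenant>_<YYYYMM>; the PREFIX constant, the method name split_table_name and the sample names are illustrative and do not appear in the original config.

```ruby
# Hypothetical constant: the fixed prefix in front of the tenant ID.
PREFIX = 'we_sync_message_'

# Returns [index_name, tenant], or nil when the name does not fit
# the <prefix><tenant>_<YYYYMM> pattern.
def split_table_name(table)
  return nil unless table.start_with?(PREFIX) && table =~ /_\d{6}\z/

  index_name = table[0..-8]             # drop the trailing "_YYYYMM"
  tenant = index_name[PREFIX.length..]  # what is left after the prefix
  return nil if tenant.nil? || tenant.empty?

  [index_name, tenant]
end

p split_table_name('we_sync_message_1001_202501')
p split_table_name('we_sync_message_202501') # no tenant part
```

Deriving the offset from the prefix length, instead of hard-coding a number, keeps the slice correct if the prefix ever changes.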
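Everything between config = <<-CONFIG and the closing CONFIG is the conf to be generated; values such as #{table} are interpolated into it. The generated conf has three parts:
Input (reads the data from MySQL)
Filter (changes field types, does calculations, and so on)
Output (writes the filtered fields to ES, with the document ID set to the table's primary key)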
input {
  # Database that the jdbc input reads from
  jdbc {
    jdbc_connection_string => 'jdbc:mysql://localhost:3306/ft-cloud-wecom?useSSL=false&serverTimezone=UTC'
    jdbc_user => 'user'
    jdbc_password => 'pass'
    jdbc_driver_library => '/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar'
    jdbc_driver_class => 'com.mysql.cj.jdbc.Driver'
    statement => 'SELECT * FROM #{table} WHERE send_time > :sql_last_value'
    # Schedule: every 5 minutes, starting at minute 1 of each hour
    schedule => '1-59/5 * * * *'
    # Tracking column: can be an id, a timestamp, etc. Logstash uses it as a cursor
    # when scanning the table, which avoids a full table scan on every run
    tracking_column => 'send_time'
    tracking_column_type => 'timestamp'
    use_column_value => true
    record_last_run => true
    # Persist the cursor so that a restart does not trigger a full rescan
    last_run_metadata_path => 'last_run/#{processed_table}.txt'
  }
}
# Filter
filter {
  mutate {
    # Map the SQL columns to ES field types
    # (double quotes are escaped because this text sits inside the double-quoted code string)
    convert => {
      \"id\" => \"integer\"
      \"msgid\" => \"string\"
      \"sender_type\" => \"integer\"
      \"sender_id\" => \"string\"
      \"chatid\" => \"string\"
      \"receiver_type\" => \"integer\"
      \"receiver_id\" => \"string\"
      \"receiver_list\" => \"string\"
      \"msgtype\" => \"integer\"
      \"encrypted_secret_key\" => \"string\"
      \"public_key_ver\" => \"string\"
      \"extra_info\" => \"integer\"
      \"corpid\" => \"string\"
      \"role_code\" => \"integer\"
      \"tenant_id\" => \"integer\"
      \"deleted\" => \"boolean\"
    }
  }
  # Parse the time field into @timestamp
  date {
    match => [\"send_time\", \"ISO8601\"]
    target => \"@timestamp\"
  }
}
# Output to ES
output {
  # Only write events whose tenant matches the tenant of the queried table
  if [tenant_id] == #{table_tenete} {
    elasticsearch {
      hosts => ['http://localhost:19200']
      index => '#{processed_table}'
      ilm_pattern => '1'
      ilm_policy => 'logs_policy'
      user => 'elastic'
      password => 'admin'
      document_id => '%{id}'
    }
  }
}
The value of table comes from the table_names.each do |table| loop.
Run Logstash
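The interpolation step can be seen in isolation with a much shorter template. The sketch below is only an illustration: render_conf is a made-up method name, the template keeps just the lines that depend on the table, and it uses the squiggly heredoc (<<~) to strip indentation, whereas the config above uses <<-.

```ruby
# Hypothetical, heavily shortened template: only the table-dependent
# parts of the generated conf are shown.
def render_conf(table, index_name, tenant)
  <<~CONF
    input {
      jdbc {
        statement => 'SELECT * FROM #{table} WHERE send_time > :sql_last_value'
        last_run_metadata_path => 'last_run/#{index_name}.txt'
      }
    }
    output {
      if [tenant_id] == #{tenant} {
        elasticsearch { index => '#{index_name}' }
      }
    }
  CONF
end

conf = render_conf('we_sync_message_1001_202501', 'we_sync_message_1001', '1001')
puts conf
```

Ruby fills in each #{...} when the heredoc is evaluated, so every pass through the loop produces a conf bound to one table, one index and one tenant.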
cd /usr/local/elk
sudo -u elasticsearch ./logstash-9.1.2/bin/logstash -f config/logstashgen.conf
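New conf files are generated under /usr/local/elk/logstash-9.1.2/config/bin/
Run from the generated conf files and watch them for changes
- Run from the conf files generated so far. Because the source tables are monthly, logstashgen.conf rewrites the conf files in the early hours of the 1st of each month, so Logstash's automatic config reloading (--config.reload.automatic) is needed.
- Because logstashgen.conf keeps running, the instance that runs the generated conf files must be given its own path.data, different from the one used by logstashgen.conf; otherwise Logstash reports an error.
- A wildcard can start all conf files under config/bin at once.
Run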
cd /usr/local/elk
sudo -u elasticsearch ./logstash-9.1.2/bin/logstash -f "config/bin/*.conf" --config.reload.automatic --verbose --path.data /usr/local/elk/logstash/data/we_sync_message
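Finally
When started with the config/bin/*.conf wildcard, Logstash merges all the configs in bin into a single pipeline, so events have to be isolated per tenant (the tenant_id condition in the output block does this). Otherwise every ES index ends up holding the same data.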
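The effect of the merge can be illustrated with a small simulation. This is plain Ruby, not Logstash code; the OUTPUTS list, the route method and the sample events are made up to stand in for two generated conf files.

```ruby
# Hypothetical stand-ins: each generated conf contributes one output,
# described here by its tenant ID and target index.
OUTPUTS = [
  { tenant: 1001, index: 'we_sync_message_1001' },
  { tenant: 1002, index: 'we_sync_message_1002' }
].freeze

# In a merged pipeline every event is offered to every output.
# With guarded = false the tenant check is skipped.
def route(events, guarded)
  written = Hash.new { |hash, key| hash[key] = [] }
  events.each do |event|
    OUTPUTS.each do |out|
      next if guarded && event[:tenant_id] != out[:tenant]

      written[out[:index]] << event[:id]
    end
  end
  written
end

events = [{ id: 1, tenant_id: 1001 }, { id: 2, tenant_id: 1002 }]
p route(events, false) # without the condition: both indices receive both events
p route(events, true)  # with the condition: each index receives only its tenant
```

The guarded call corresponds to the if [tenant_id] == ... condition wrapped around each elasticsearch output.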
Once it finishes, the output produces the following: