当前位置: 首页 > news >正文

网站建设合同报价花果园营销型网站建设

网站建设合同报价,花果园营销型网站建设,wordpress链接跳转等待,网站建设和网站编辑是什么工作最近因为需要对接企业微信的消息记录缓存,由于消息量较多,就设计了分表的功能。且由于时saas系统,有租户,所以 消息的分表较为复杂 message_租户_年月,但在es中索引只按照租户分索引 由于同步不需要实时性所以在同步到…

最近因为需要对接企业微信的消息记录缓存,由于消息量较多,就设计了分表的功能。且由于时saas系统,有租户,所以 消息的分表较为复杂 message_租户_年月,但在es中索引只按照租户分索引
由于同步不需要实时性所以在同步到es中时使用了es家族的logstash
前置:安装elk

获取所有要生成的表

-- 获取we_sync_message所有流水表名
SELECT table_name FROM information_schema.tables WHERE table_schema = 'ft-cloud-wecom' AND table_name LIKE 'we_sync_message_%' AND RIGHT(table_name, 6) IN (DATE_FORMAT(CURDATE(), '%Y%m'),DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 MONTH), '%Y%m'))

写入自动生成conf文件的配置

#我的 logstash位置
cd /usr/local/elk/logstash-9.1.2/config
touch logstashgen.conf
vim -v logstashgen.conf

完整代码

input {jdbc {jdbc_connection_string => "jdbc:mysql://localhost:3306/information_schema?useSSL=false&serverTimezone=UTC"jdbc_user => "user"jdbc_password => "pass"jdbc_driver_library => "/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"statement => "SELECT table_name FROM information_schema.tables WHERE table_schema = 'ft-cloud-wecom' AND table_name LIKE 'we_sync_message_%' AND RIGHT(table_name, 6) IN (DATE_FORMAT(CURDATE(), '%Y%m'),DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 MONTH), '%Y%m'))"# 每个月的一号凌晨20分执行schedule => "20 0 1 * *"}
}
filter {ruby {code => "# 确保table_names是数组类型table_names = event.get('table_name')table_names = [table_names] if table_names.is_a?(String)# 动态生成配置并写入独立文件table_names.each do |table|# 截取表名(移除最后7位)-- 移除时间只保留租户processed_table = table[0..-8] if table.length > 7next unless processed_table  # 跳过无效表名# 设置租户table_tenete = processed_table[26..]  if processed_table.length > 26next unless table_tenete        # 创建独立文件名(确保目录存在)output_dir = 'config/bin/'Dir.mkdir(output_dir) unless Dir.exist?(output_dir)#创建最后更新字段持久化文件夹last_dir = 'last_run'Dir.mkdir(last_dir) unless Dir.exist?(last_dir)# 生成的conf文件名filename = output_dir + processed_table + '.conf'config = <<-CONFIGinput {jdbc {jdbc_connection_string => 'jdbc:mysql://localhost:3306/ft-cloud-wecom?useSSL=false&serverTimezone=UTC'jdbc_user => 'user'jdbc_password => 'pass'jdbc_driver_library => '/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar'jdbc_driver_class => 'com.mysql.cj.jdbc.Driver'statement => 'SELECT * FROM #{table} WHERE updated_at > :sql_last_value'schedule => '1-59/5 * * * *'tracking_column => 'updated_at'tracking_column_type => 'timestamp'use_column_value => truerecord_last_run => truelast_run_metadata_path => 'last_run/#{processed_table}.txt'}}filter {mutate {convert => {\"id\" => \"integer\"\"msgid\" => \"string\"\"sender_type\" => \"integer\"\"sender_id\"=> \"string\"\"chatid\"=> \"string\"\"receiver_type\" => \"integer\"\"receiver_id\"=> \"string\"\"receiver_list\"=> \"string\"\"msgtype\" => \"integer\"\"encrypted_secret_key\" => \"string\"\"public_key_ver\" => \"string\"\"extra_info\" => \"integer\"\"corpid\" => \"string\"\"role_code\" => \"integer\"\"tenant_id\" => \"integer\"\"deleted\" => \"boolean\"}}date {match => [\"send_time\", \"ISO8601\"]target => \"@timestamp\"}}output {if [tenant_id] == #{table_tenete} {elasticsearch {hosts => ['http://localhost:19200']index => '#{processed_table}'ilm_pattern => '1'ilm_policy => 'logs_policy'user => 'elastic'password => 'admin'document_id => '%{id}'}}}CONFIG# 安全写入配置文件beginFile.write(filename, config)rescue => eevent.set('error', e.message)endend"}
}

解释

输入:配置jdbc信息,以及需要查询的sql

input {jdbc {# 账号密码数据库ipjdbc_connection_string => "jdbc:mysql://localhost:3306/information_schema"jdbc_user => "user"jdbc_password => "pass"# 你自己的jar包位置(下载或者导入过来的地址)jdbc_driver_library => "/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar"# 驱动jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# 执行的sqlstatement => "SELECT table_name FROM information_schema.tables WHERE table_schema = 'ft-cloud-wecom' AND table_name LIKE 'we_sync_message_%' AND RIGHT(table_name, 6) IN (DATE_FORMAT(CURDATE(), '%Y%m'),DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 MONTH), '%Y%m'))"# 每个月的一号凌晨20分执行schedule => "20 0 1 * *"}
}

在filter中使用ruby 生成规则

filter {ruby {code => "# 确保table_names是数组类型table_names = event.get('table_name')# 如果只有一个表也强制转成数组table_names = [table_names] if table_names.is_a?(String)# 动态生成配置并写入独立文件 (for循环开始)table_names.each do |table|# 截取表名(移除最后7位)-- 移除时间只保留租户processed_table = table[0..-8] if table.length > 7# 跳过无效表名next unless processed_table  # 创建独立文件名(确保目录存在)output_dir = 'config/bin/'# 如果没有文件地址则创建文件夹Dir.mkdir(output_dir) unless Dir.exist?(output_dir)# 生成的conf文件名filename = output_dir + processed_table + '.conf'config = <<-CONFIG# ...  要写入的config文件内容CONFIG# 安全写入配置文件begin# 根据filename 和写入的config创建文件File.write(filename, config)rescue => e# 错误处理event.set('error', e.message)endend"}
}

config = <<-CONFIG 。。。。 CONFIG中的东西就是要生成的conf,并通过 #{table}传入参数
输入(从mysql中获取数据)
过滤器 (更改字段类型、计算等等)
输出 (过滤器后的字段添加到es中,设置主键 = 表中的主键)

input {
# jdbc指向的数据库jdbc {jdbc_connection_string => 'jdbc:mysql://localhost:3306/ft-cloud-wecom?useSSL=false&serverTimezone=UTC'jdbc_user => 'user'jdbc_password => 'pass'jdbc_driver_library => '/usr/local/elk/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar'jdbc_driver_class => 'com.mysql.cj.jdbc.Driver'statement => 'SELECT * FROM #{table} WHERE send_time > :sql_last_value'# 执行时间schedule => '1-59/5 * * * *'#设置记录字段 可以是id、时间等等,用于logstash执行扫表的时候的指针,尽量不要全标扫描tracking_column => 'send_time'tracking_column_type => 'timestamp'use_column_value => truerecord_last_run => true# 持久化防止重新启动全扫last_run_metadata_path => 'last_run/#{processed_table}.txt'}
}
#过滤器
filter {mutate {#将sql中的字段对应到es中convert => {"id" => "integer""msgid" => "string""sender_type" => "integer""sender_id"=> "string""chatid"=> "string""receiver_type" => "integer""receiver_id"=> "string""receiver_list"=> "string""msgtype" => "integer""encrypted_secret_key" => "string""public_key_ver" => "string""extra_info" => "integer""corpid" => "string""role_code" => "integer""tenant_id" => "integer""deleted" => "boolean"}}# 翻译时间类型字段date {match => [\"send_time\", \"ISO8601\"]target => \"@timestamp\"}
}
# 输出到es中
output {# 判断当前租户与查询表的租户是否一致if [tenant_id] == #{table_tenete} {elasticsearch {hosts => ['http://localhost:19200']index => '#{processed_table}'ilm_pattern => '1'ilm_policy => 'logs_policy'user => 'elastic'password => 'admin'document_id => '%{id}'}}
}

table的值为table_names.each do |table| 循环得到

运行logstash

cd /usr/local/elk
sudo -u elasticsearch ./logstash-9.1.2/bin/logstash -f config/logstashgen.conf

会在/usr/local/elk/logstash-9.1.2/ config/bin/下生成新的conf文件

在这里插入图片描述

根据生成conf文件运行,并动态监控conf文件

  1. 根据现在生成的conf文件运行,由于是按月份的流水表,所以logstashgen.conf会在每个月的凌晨更新conf文件,那么就需要使用logstash的自动加载配置文件 --config.reload.automatic
  2. 由于logstashgen.conf一直在运行,则需要在运行生成的conf时需要指定path.data的地址不能与logstashgen.conf重复,否则会报错
  3. 可以通过通配符直接启动config/bin下的所有conf

运行

cd /usr/local/elk
sudo -u elasticsearch ./logstash-9.1.2/bin/logstash   -f config/bin/*.conf   --config.reload.automatic   --verbose   --path.data /usr/local/elk/logstash/data/we_sync_message

最后

当使用config/bin/*.conf 通配符运行时,logstash会将bin内的所有配置融合,所以需要在输入进行隔离租户,否则会出现所有的es索引都是同一个值得问题
在这里插入图片描述
结束output会生成如下内容
在这里插入图片描述

http://www.dtcms.com/a/431209.html

相关文章:

  • 最小二乘问题详解2:线性最小二乘求解
  • Multi-Arith数据集:数学推理评估的关键基准与挑战
  • 基于 Spring Security OAuth2 + JWT 实现 SSO
  • 数智经济时代医疗领域医学影像系统现状与趋势研究:多模态融合技术方向
  • 解读 2025 《高质量数据集 分类指南》
  • 为什么说这个是6dB de-emphasis”(即“6dB去加重”)--Con‘t
  • Eclipse 快捷键
  • 樟木头网站网络安全维护公司
  • 【EE初阶 - 网络原理】网络通信
  • 方案网站有哪些盗用别的公司网站模块
  • 做网站是否要去工商备案做网站群
  • Less resolver error:‘~antd/es/style/themes/index.less‘ wasn‘t found.
  • php网站验证码错误网站改版对用户的影响
  • vue中如何实现异步加载组件
  • 网站地图seo石城网站建设
  • 怎么防止网站被镜像wordpress seo 主题
  • 那些钓鱼网站是怎么做的页面设计上边距在哪里找
  • 中国移动idc建设网站wordpress 导航栏
  • @RequestBody与@PathVariable什么时候加
  • 2011 年真题配套词汇单词笔记(考研真相)
  • “AMQP协议深度解析:消息队列背后的通信魔法”之核心概念与SpringBoot落地实战
  • 网规答题点【summer解析】华为5G空口新技术有F-OFDM和SCMA,F-OFDM是基于OFDM的改进版本,可以 实现空口物理层分片,兼容LTE 4G。
  • 简约智能设备制造公司网站今天东营发生的重大新闻
  • Matrixport DAT与XBIT携DEX赋能生态,共赴新加坡TOKEN2049
  • 做网站需要什么营业执照中国建设企业协会网站首页
  • 微服务项目->在线oj系统(Java-Spring)--增删改(前端)
  • 软件网站开发评估免费拿货的代理商
  • C#基础05-控制语句
  • 网站域名过期还能用吗wordpress主题管理插件
  • 扩展BaseMapper类