
Time zone issues when a scheduled Logstash job incrementally syncs MySQL data to ES

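This article uses the modification-time column modify_date as the incremental-sync tracking field. This detects inserts and updates but not deletes; to capture deletes, use canal to read the MySQL binlog and sync from that.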
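To see why deletes slip through, here is a minimal in-memory sketch of the tracking logic. It assumes each scheduled run applies the same `modify_date > :sql_last_value AND modify_date < NOW()` filter used in the configs below and upserts rows by id (as `document_id => "%{id}"` would). The class and method names are hypothetical. Taking the largest modify_date as the new tracking value is a simplification: as far as I know the plugin keeps the value from the last row it processed, which only equals the maximum when the statement orders by modify_date.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class IncrementalSyncSketch {
    // Hypothetical row: only the primary key and modify_date are modelled
    static final class Row {
        final int id;
        final LocalDateTime modifyDate;

        Row(int id, LocalDateTime modifyDate) {
            this.id = id;
            this.modifyDate = modifyDate;
        }
    }

    // One scheduled run: applies "modify_date > lastValue AND modify_date < now",
    // upserts matching rows into the index by id and returns the new tracking value
    static LocalDateTime runOnce(List<Row> table, Map<Integer, LocalDateTime> index,
                                 LocalDateTime lastValue, LocalDateTime now) {
        LocalDateTime newLast = lastValue;
        for (Row r : table) {
            if (r.modifyDate.isAfter(lastValue) && r.modifyDate.isBefore(now)) {
                index.put(r.id, r.modifyDate);
                // Simplification: keep the largest modify_date seen
                if (r.modifyDate.isAfter(newLast)) {
                    newLast = r.modifyDate;
                }
            }
        }
        return newLast;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, LocalDateTime.of(2024, 12, 31, 10, 0)));
        table.add(new Row(2, LocalDateTime.of(2024, 12, 31, 10, 5)));
        Map<Integer, LocalDateTime> index = new TreeMap<>();

        LocalDateTime last = LocalDateTime.of(1970, 1, 1, 0, 0);
        last = runOnce(table, index, last, LocalDateTime.of(2024, 12, 31, 11, 0));

        // Between runs: row 1 is deleted and row 2 is updated
        table.remove(0);
        table.set(0, new Row(2, LocalDateTime.of(2024, 12, 31, 11, 30)));
        last = runOnce(table, index, last, LocalDateTime.of(2024, 12, 31, 12, 0));

        // Row 1 is still in the index because the delete was never observed
        System.out.println(index); // prints {1=2024-12-31T10:00, 2=2024-12-31T11:30}
    }
}
```

The update to row 2 propagates, but row 1 stays in the index forever, which is why deletes need a binlog-based tool.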

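If the modification-time column is a varchar, you can create the index first and give the field the following mapping (this sidesteps the time zone problem):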

...
"time": {
  "type": "date",
  "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
}
...
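Here is a minimal Java sketch of what that mapping does to a varchar value. It assumes Elasticsearch treats a date string without an offset as UTC. The `yyyy-MM-dd HH:mm:ss` alternative matches, so the stored instant carries exactly the MySQL wall-clock digits and nothing shifts. `VarcharDateSketch` is a hypothetical name, and it models only the first format alternative.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class VarcharDateSketch {
    // Same pattern as the first alternative in the mapping's "format"
    private static final DateTimeFormatter PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // A zone-less string is read as UTC (assumed ES behavior),
    // so the digits of the MySQL value are kept unchanged
    static Instant toStoredInstant(String varcharValue) {
        return LocalDateTime.parse(varcharValue, PATTERN).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(toStoredInstant("2024-12-31 10:13:30")); // prints 2024-12-31T10:13:30Z
    }
}
```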

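If the modification-time column is a datetime, you need to watch out for time zones.

Premise: times are stored as usual, MySQL is on UTC+8, and Logstash and ES both use UTC.

The tests below lead to these conclusions: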

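With the Test 4 settings, a client connecting to ES does not need to deal with time zones (intuitively, you ignore the fact that ES stores dates as UTC and treat the stored values as UTC+8).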
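Under the Test 4 settings, a client reads and writes the stored values as if the trailing Z meant UTC+8. Here is a minimal sketch of that convention; the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

class Test4ClientSketch {
    // Read: the "Z" value already carries the UTC+8 wall-clock digits,
    // so drop the offset instead of converting it
    static LocalDateTime asWallClock(String esValue) {
        return OffsetDateTime.parse(esValue).toLocalDateTime();
    }

    // Query: label a UTC+8 wall-clock bound as UTC without shifting it
    static String asQueryBound(LocalDateTime wallClock) {
        return wallClock.toInstant(ZoneOffset.UTC).toString();
    }

    public static void main(String[] args) {
        System.out.println(asWallClock("2024-12-31T10:13:30.000Z"));            // prints 2024-12-31T10:13:30
        System.out.println(asQueryBound(LocalDateTime.of(2021, 1, 1, 8, 0, 0))); // prints 2021-01-01T08:00:00Z
    }
}
```

The catch is that the stored instants are eight hours off from the real ones. Anything that treats them as true UTC will look shifted by eight hours. Examples include Kibana showing dates in the browser's time zone (its default setting, as far as I know) and date math based on `now`.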

With the Test 1 settings, remember that ES stores UTC. A client must convert UTC+8 times to UTC before querying, for example with RestHighLevelClient:

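import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

SearchRequest searchRequest = new SearchRequest("stu_sign");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// Query bounds as UTC+8 wall-clock times
LocalDateTime localDateTime1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
LocalDateTime localDateTime2 = LocalDateTime.of(2023, 3, 1, 8, 0, 0);
ZonedDateTime zonedDateTime1 = localDateTime1.atZone(ZoneId.of("UTC+8"));
ZonedDateTime zonedDateTime2 = localDateTime2.atZone(ZoneId.of("UTC+8"));

// Convert the ZonedDateTime values to UTC, which is what ES stores
ZonedDateTime utcZonedDateTime1 = zonedDateTime1.withZoneSameInstant(ZoneId.of("UTC"));
ZonedDateTime utcZonedDateTime2 = zonedDateTime2.withZoneSameInstant(ZoneId.of("UTC"));

searchSourceBuilder.query(QueryBuilders.rangeQuery("sing_date").gte(utcZonedDateTime1).lte(utcZonedDateTime2));
searchRequest.source(searchSourceBuilder);
try {
    // restHighLevelClient is an existing RestHighLevelClient instance
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    // getHits().getHits() is an array, so print each hit's source rather than the array reference
    for (SearchHit hit : searchResponse.getHits().getHits()) {
        System.out.println(hit.getSourceAsString());
    }
} catch (IOException e) {
    e.printStackTrace();
}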
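To check the conversion without an ES cluster, this stdlib-only sketch does the same shift as the client code above. It also does the reverse shift for displaying hits. It assumes Asia/Shanghai, which is a fixed UTC+8 offset for these dates. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

class Test1ClientSketch {
    // Asia/Shanghai: a fixed UTC+8 offset for the dates used here
    private static final ZoneId LOCAL = ZoneId.of("Asia/Shanghai");

    // Query direction: a UTC+8 wall-clock bound becomes the UTC instant ES compares against
    static Instant toUtcBound(LocalDateTime localBound) {
        return localBound.atZone(LOCAL).toInstant();
    }

    // Result direction: a UTC value returned by ES is shown as UTC+8 again
    static LocalDateTime toLocalDisplay(String esValue) {
        return Instant.parse(esValue).atZone(LOCAL).toLocalDateTime();
    }

    public static void main(String[] args) {
        System.out.println(toUtcBound(LocalDateTime.of(2021, 1, 1, 8, 0, 0))); // prints 2021-01-01T00:00:00Z
        System.out.println(toLocalDisplay("2024-12-31T02:13:30.000Z"));       // prints 2024-12-31T10:13:30
    }
}
```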
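Test 1:

No mapping created

Both time zone settings are Asia/Shanghai. All times stored in ES are UTC, eight hours earlier than the local (UTC+8) time.

The tracking record (the last_run_metadata_path file) stores the time in UTC: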

--- !ruby/object:DateTime '2024-12-31 02:13:30.000000000 Z'
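To check what the tracking file currently holds, a small hypothetical helper (`LastRunFileSketch`, not part of Logstash) can read the quoted value. It only assumes the single-line form shown above and ignores fractional seconds.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LastRunFileSketch {
    // Captures the date and time inside the quotes; fractional seconds are ignored
    private static final Pattern QUOTED =
            Pattern.compile("'(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})(?:\\.\\d+)? Z'");
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses a line such as: --- !ruby/object:DateTime '2024-12-31 02:13:30.000000000 Z'
    static Instant parse(String line) {
        Matcher m = QUOTED.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("Unexpected tracking value: " + line);
        }
        // The trailing "Z" means the value is UTC
        return LocalDateTime.parse(m.group(1), FORMAT).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        String line = "--- !ruby/object:DateTime '2024-12-31 02:13:30.000000000 Z'";
        System.out.println(parse(line)); // prints 2024-12-31T02:13:30Z
    }
}
```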

The scheduled job runs normally:

SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > '2024-12-31 10:13:30' AND modify_date < NOW() ) AS t1 LIMIT 1
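The tracking file holds 02:13:30 Z, yet the query uses '2024-12-31 10:13:30'. One model fits all three SQL statements in this article (Tests 1, 3 and 4): the plugin renders sql_last_value in the jdbc_default_timezone zone before substituting it into the query. The sketch below encodes that assumption and is not taken from the plugin's source. `SqlLiteralSketch` is a hypothetical name.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class SqlLiteralSketch {
    private static final DateTimeFormatter SQL_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Assumed behavior: the UTC tracking value is rendered as a wall-clock
    // literal in the jdbc_default_timezone zone before it replaces :sql_last_value
    static String toSqlLiteral(Instant trackingValue, String jdbcDefaultTimezone) {
        return "'" + SQL_FORMAT.format(trackingValue.atZone(ZoneId.of(jdbcDefaultTimezone))) + "'";
    }

    public static void main(String[] args) {
        // Test 1: jdbc_default_timezone => "Asia/Shanghai"
        System.out.println(toSqlLiteral(Instant.parse("2024-12-31T02:13:30Z"), "Asia/Shanghai")); // prints '2024-12-31 10:13:30'
        // Test 3: jdbc_default_timezone => "UTC"
        System.out.println(toSqlLiteral(Instant.parse("2024-12-31T18:13:30Z"), "UTC")); // prints '2024-12-31 18:13:30'
    }
}
```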

The configuration file:

input {
    jdbc {
      # Database connection settings
      jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_user => "root"
      jdbc_password => "123456"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      jdbc_default_timezone => "Asia/Shanghai"
      # Location of the MySQL driver jar
      jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"
      # SQL statement to execute
      statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "
      use_column_value => true
      tracking_column => "modify_date"
      tracking_column_type => "timestamp"
      last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"
      schedule => "*/3 * * * * Asia/Shanghai"
      lowercase_column_names => false
    }
}

output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "stu_sign_test"
        # document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}
Test 2:

Use a mapping that replaces the auto-mapped date format with other allowed formats:

"modify_date": {
    "type": "date",
    "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
  }

The result is exactly the same as in Test 1.
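A likely reason the mapping changes nothing: the stdout JSON shows that Logstash sends datetime columns as ISO-8601 strings with a trailing Z. The `yyyy-MM-dd HH:mm:ss` alternative therefore never matches, and the value falls through to strict_date_optional_time with its own explicit offset. This hypothetical sketch shows that first-match behavior. It models only two of the three alternatives and approximates strict_date_optional_time with java.time's ISO instant parser.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class MultiFormatSketch {
    private static final DateTimeFormatter PLAIN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Tries the alternatives in mapping order; the first one that parses wins.
    // epoch_millis is not modelled here.
    static Instant parse(String value) {
        try {
            // "yyyy-MM-dd HH:mm:ss": no offset, read as UTC
            return LocalDateTime.parse(value, PLAIN).toInstant(ZoneOffset.UTC);
        } catch (DateTimeParseException notPlain) {
            // Stand-in for strict_date_optional_time: ISO-8601 instant with its own offset
            return Instant.parse(value);
        }
    }

    public static void main(String[] args) {
        // The value Logstash sends in Test 1 keeps its own "Z" offset
        System.out.println(parse("2024-12-31T02:13:30.000Z")); // prints 2024-12-31T02:13:30Z
    }
}
```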

Test 3:

No mapping; time zone settings adjusted

MySQL value: "modify_date": "2024-12-31 10:13:30"

ES stores a value eight hours later: "modify_date": "2024-12-31T18:13:30.000Z"

{"id":20120,"appeal_time":null,"teacher_name":"黄雅平","stu_id":736,"stu_name":"郑欣欣","modify_date":"2024-12-31T18:13:30.000Z","sing_date":"2024-12-31T18:13:30.000Z","teach_time":"2024-12-31T08:00:00.000Z","teach_time_str":"20241231","stu_sign_status":0,"@timestamp":"2025-04-01T01:54:25.599Z","teacher_id":731,"@version":"1","class_num_end":null,"leave_status":null,"sign_status":0,"stu_num":"20233003","course_name":"数字系统基础","appeal_msg":null,"course_sched_id":186641,"grade_id":"DE44AF38ADB74D9BBF42A6C8285B8285","appeal_status":null,"academy_id":471,"classroom_id":440428,"roll_call_status":0,"roll_call_date":"2024-12-31T18:13:30.000Z","classroom":"304","class_end_time":"2024-12-31 18:30:00","create_date":"2024-12-31T18:13:30.000Z","course_id":1373,"academy_superior_id":6,"late_status":null,"sign_type":null,"class_id":5407,"class_num_begin":null,"class_begin_time":"2024-12-31 16:35:00","semester_id":1,"leave_statusersss":null,"leave_statuser":null}

The tracking file records: --- !ruby/object:DateTime '2024-12-31 18:13:30.000000000 Z'

Executed SQL: SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > '2024-12-31 18:13:30' AND modify_date < NOW() ) AS t1 LIMIT 1
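Test 3 also has a side effect beyond the wrong stored time. The SQL literal '2024-12-31 18:13:30' is eight hours ahead of the newest modify_date actually synced (10:13:30), while MySQL compares it against its own UTC+8 values. A small sketch with a hypothetical row edited at 12:00 shows that such a row would not be picked up by the next run. The class name and timestamps are illustrative only.

```java
import java.time.LocalDateTime;

class Test3GapSketch {
    // True when a row with this modify_date satisfies
    // "modify_date > literal AND modify_date < NOW()"
    static boolean picked(LocalDateTime modifyDate, LocalDateTime literal, LocalDateTime now) {
        return modifyDate.isAfter(literal) && modifyDate.isBefore(now);
    }

    public static void main(String[] args) {
        LocalDateTime literal = LocalDateTime.of(2024, 12, 31, 18, 13, 30); // literal from Test 3's SQL
        LocalDateTime now = LocalDateTime.of(2024, 12, 31, 20, 0, 0);       // hypothetical NOW()
        LocalDateTime edited = LocalDateTime.of(2024, 12, 31, 12, 0, 0);    // hypothetical edit after 10:13:30
        System.out.println(picked(edited, literal, now)); // prints false
    }
}
```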

The corresponding configuration file:

input {
    jdbc {
      # Database connection settings
      jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_user => "root"
      jdbc_password => "123456"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      jdbc_default_timezone => "UTC"
      # Location of the MySQL driver jar
      jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"
      # SQL statement to execute
      statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "
      use_column_value => true
      tracking_column => "modify_date"
      tracking_column_type => "timestamp"
      last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"
      schedule => "*/3 * * * * Asia/Shanghai"
      lowercase_column_names => false
    }
}
Test 4:

No mapping; time zone settings adjusted

MySQL value: "modify_date": "2024-12-31 10:13:30"

ES stores: "modify_date": "2024-12-31T10:13:30.000Z"

{"teacher_id":731,"class_num_end":null,"stu_sign_status":0,"roll_call_status":0,"semester_id":1,"stu_id":749,"leave_statusersss":null,"classroom_id":440428,"course_sched_id":186641,"teacher_name":"黄雅平","create_date":"2024-12-31T10:13:30.000Z","appeal_msg":null,"appeal_status":null,"class_end_time":"2024-12-31 18:30:00","modify_date":"2024-12-31T10:13:30.000Z","class_begin_time":"2024-12-31 16:35:00","leave_statuser":null,"teach_time":"2024-12-31T00:00:00.000Z","appeal_time":null,"academy_superior_id":6,"sign_type":null,"roll_call_date":"2024-12-31T10:13:30.000Z","late_status":null,"classroom":"304","@timestamp":"2025-04-01T02:03:36.223Z","class_id":5407,"stu_num":"2024008","@version":"1","id":20107,"teach_time_str":"20241231","sign_status":0,"sing_date":"2024-12-31T10:13:30.000Z","course_name":"数字系统基础","course_id":1373,"stu_name":"张恒","class_num_begin":null,"academy_id":471,"leave_status":null,"grade_id":"DE44AF38ADB74D9BBF42A6C8285B8285"}

The tracking file records: --- !ruby/object:DateTime '2024-12-31 10:13:30.000000000 Z'

Executed SQL: SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > '2024-12-31 10:13:30' AND modify_date < NOW() ) AS t1 LIMIT 1

The corresponding configuration file:

input {
    jdbc {
      # Database connection settings
      jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_user => "root"
      jdbc_password => "123456"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      jdbc_default_timezone => "UTC"
      # Location of the MySQL driver jar
      jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"
      # SQL statement to execute
      statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "
      use_column_value => true
      tracking_column => "modify_date"
      tracking_column_type => "timestamp"
      last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"
      schedule => "*/3 * * * * Asia/Shanghai"
      lowercase_column_names => false
    }
}

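The stored times are correct and meet the requirement.

For the scheduled job, these three schedule settings behave identically:
schedule => "*/3 * * * * Asia/Shanghai"
schedule => "*/3 * * * * UTC"
schedule => "*/3 * * * *"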
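A likely reason the schedule zone makes no difference: `*/3 * * * *` only constrains the minute field. Asia/Shanghai and UTC differ by a whole number of hours, so the minute of the hour is the same in both zones and the fire instants coincide. This hypothetical sketch lists the matching minutes in each zone and compares them. It models only the minute field, not rufus-scheduler itself.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

class CronMinuteSketch {
    // Instants in [start, start + hours) at which a "*/3 * * * *" schedule,
    // evaluated in the given zone, would fire: every minute divisible by 3
    static List<Instant> fireTimes(Instant start, int hours, ZoneId zone) {
        List<Instant> result = new ArrayList<>();
        Instant end = start.plus(hours, ChronoUnit.HOURS);
        for (Instant t = start.truncatedTo(ChronoUnit.MINUTES); t.isBefore(end); t = t.plus(1, ChronoUnit.MINUTES)) {
            if (t.atZone(zone).getMinute() % 3 == 0) {
                result.add(t);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-12-31T00:00:00Z");
        List<Instant> shanghai = fireTimes(start, 24, ZoneId.of("Asia/Shanghai"));
        List<Instant> utc = fireTimes(start, 24, ZoneId.of("UTC"));
        System.out.println(shanghai.equals(utc)); // prints true
    }
}
```

With an hour field the zone does matter. For example, "0 2 * * * Asia/Shanghai" would fire at 18:00 UTC on the previous day, while "0 2 * * * UTC" fires at 02:00 UTC.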

Test 5:
No mapping; time zone settings adjusted
The opposite of Test 1: when the time zone settings are all UTC, every time stored in ES is eight hours later.
For example:
MySQL value: "modify_date": "2024-12-31 18:13:30"
ES stores: "modify_date": "2024-12-31T18:13:30.000Z"
The tracking file records: --- !ruby/object:DateTime '2024-12-31 18:13:30.000000000 Z'
Executed SQL: SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > '2024-12-31 18:13:30' AND modify_date < NOW() ) AS t1 LIMIT 1

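The conclusion you can draw for yourself:
jdbc_connection_string (its serverTimezone parameter) and jdbc_default_timezone have to be configured together, because their effects stack. As for why this happens, if anyone understands it, please explain in the comments so I can learn too.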
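Here is one model that reproduces Tests 1, 3 and 4. It is offered as an assumption, not a confirmed account of Connector/J or the jdbc input internals. Assume the Logstash JVM's default zone is Asia/Shanghai, as is typical on a Windows machine set to China time. The driver reads the DATETIME wall clock as a time in serverTimezone and hands over that instant's JVM-local wall clock. The jdbc input then reads that wall clock as jdbc_default_timezone time and converts it to UTC. That makes two conversions, hence the stacking. `DoubleConversionSketch` is a hypothetical name.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

class DoubleConversionSketch {
    // Hypothetical model of the two conversions described above
    static Instant storedInEs(LocalDateTime mysqlValue, String serverTimezone,
                              String jdbcDefaultTimezone, String jvmZone) {
        // Step 1 (driver): DATETIME read as serverTimezone time, exposed as JVM-local wall clock
        Instant instant = mysqlValue.atZone(ZoneId.of(serverTimezone)).toInstant();
        LocalDateTime jvmWallClock = instant.atZone(ZoneId.of(jvmZone)).toLocalDateTime();
        // Step 2 (jdbc input): that wall clock read as jdbc_default_timezone time, converted to UTC
        return jvmWallClock.atZone(ZoneId.of(jdbcDefaultTimezone)).toInstant();
    }

    public static void main(String[] args) {
        LocalDateTime v = LocalDateTime.of(2024, 12, 31, 10, 13, 30);
        String jvm = "Asia/Shanghai"; // assumed JVM default zone
        System.out.println(storedInEs(v, "Asia/Shanghai", "Asia/Shanghai", jvm)); // Test 1: prints 2024-12-31T02:13:30Z
        System.out.println(storedInEs(v, "UTC", "UTC", jvm));                     // Test 3: prints 2024-12-31T18:13:30Z
        System.out.println(storedInEs(v, "Asia/Shanghai", "UTC", jvm));           // Test 4: prints 2024-12-31T10:13:30Z
    }
}
```

In this model, when serverTimezone matches the JVM zone the first step is a no-op. Only jdbc_default_timezone then decides what ES stores.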

Environment used in this article:
elasticsearch-7.16.3
kibana-7.16.3-windows-x86_64
logstash-7.16.3
mysql 5.7.38

Client: RestHighLevelClient

 <dependency>
     <groupId>org.elasticsearch.client</groupId>
     <artifactId>elasticsearch-rest-high-level-client</artifactId>
     <version>7.16.3</version>
 </dependency>

Startup command:

logstash.bat -f E:\software\logstash\logstash-7.16.3\conf\stu_sign.conf

Finally, here is an annotated configuration file.
Logstash conf file explained:

input {
    jdbc {
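      # MySQL connection URL
      jdbc_connection_string => "jdbc:mysql://localhost:3306/database?characterEncoding=utf8"
      # Username and password
      jdbc_user => "xxx"
      jdbc_password => "xxxx"
      # Driver jar
      jdbc_driver_library => "D:/xx/xx/logstash-6.2.4/config/mysql-connector-java-8.0.18.jar"
      # Driver class name (Connector/J 8.x uses com.mysql.cj.jdbc.Driver)
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # Path + file name of an SQL file to execute (alternative to statement)
      #statement_filepath => ""
      parameters => { "sql_last_value" => "UpdateTime" }
      statement => "SELECT * FROM (SELECT * FROM table1 ) t WHERE t.updatetime > :sql_last_value"
      # Polling schedule in cron syntax; fields from left to right: minute, hour,
      # day of month, month, day of week. All * means run every minute
      schedule => "* * * * *"
      # Type
      #type => "article"
      # Keep column names as-is instead of converting them to lowercase
      lowercase_column_names => false
      # Record the last run
      record_last_run => true
      # Use a column value (rather than the last run time) as sql_last_value
      use_column_value => true
      # Tracking column name
      tracking_column => "updatetime"
      # Tracking column type
      tracking_column_type => "timestamp"
      # Where the last run's metadata is saved
      last_run_metadata_path => "./logstash_last_id"
      # Whether to discard the saved state; true resets sql_last_value
      clean_run => false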
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "indexname"
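    # document_type is deprecated in Elasticsearch 7.x, where mapping types are being removed; omit it there
    document_type => "articles"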
    document_id => "%{articleid}"
    template_overwrite => true
  }
  # Debug output; comment it out for production runs
  stdout {
      codec => json_lines
  }
}
