当前位置: 首页 > wzjs >正文

android 旅游网站开发skr搜索引擎入口

android 旅游网站开发,skr搜索引擎入口,网站建设 证书,wordpress写文章失败测试结论 源端增量获取方式包括:bulk、incrementing、timestamp、incrementingtimestamp(混合),各种方式说明如下: bulk: 一次同步整个表的数据 incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新…
  • 测试结论

源端增量获取方式包括:bulk、incrementing、timestamp、incrementing+timestamp(混合),各种方式说明如下:

bulk: 一次同步整个表的数据

incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新和删除

timestamp: 使用时间戳标识增量数据,每次更新数据都要修改时间戳,时间戳严格递增

timestamp+incrementing: 使用两个列,一个为自增列,一个为时间戳列。综合incrementing和timestamp的功能

  • 环境说明

本文在kafka的standalone模式下,适配kafka jdbc connector从源端mysql数据库实时同步数据到kadb中。验证1. 增量数据获取及增量数据获取方式

  1. kadb版本:V8R3
  2. mysql版本:5.7
  3. 操作系统:centos 7.6
  4. jdbc connector版本:10.8.3。下载地址:JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data.
  5. mysql驱动:mysql-connector-java-5.1.39-bin.jar
  6. kadb驱动:postgresql-42.7.4.jar
  7. java版本:17.0.12 (kafka要求必须为17或者18版本,否则kafka安装报错)
  8. kafka版本:kafka_2.13-4.0.0
  9. kafka jdbc connector参考资料:

JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation

  1. kafka connector参考资料

https://kafka.apache.org/documentation/

  • 环境部署
  1. kafka部署

解压

tar -xzf kafka_2.13-4.0.0.tgz

cd kafka_2.13-4.0.0

产生集群UUID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

  1. jdbc connector部署

下载jdbc connector,将解压的内容保存到kafka解压目录的plugins下(plugins目录需自己创建内容如下:

[root@nanri plugins]# ls -l

total 8

drwxr-xr-x. 2 root root   43 Apr 17 21:50 assets

drwxr-xr-x. 3 root root  108 Apr 17 21:50 doc

drwxr-xr-x. 2 root root   90 Apr 17 21:50 etc

drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib

-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json

[root@nanri plugins]# pwd

/root/kafka_2.13-4.0.0/plugins

  1. 源端/目标端jdbc驱动

将源端mysql的jdbc驱动文件和目标端kadb驱动文件拷贝至kafka的解压目录的libs目录下:

[root@nanri libs]# ls -l mysql* postgres*

-rw-r--r--. 1 root root  989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar

-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar

[root@nanri libs]# pwd

/root/kafka_2.13-4.0.0/libs

  1. 配置文件修改
  1. 连接器配置文件:connect-standalone.properties

添加插件路径参数:(绝对路径)

plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar

  1. 源端配置文件:connect-mysql-source.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

#productor名字

name=connect-mysql-source                

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector    //固定值,使用jdbc connector的类

# topic名称列表,源端和目标端的topic必须一致

topics=test

# 配置jdbc连接

connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false

#增量获取方式,支持bulk,incrementing,timestamp等等

mode=incrementing

  1. 目标端配置文件:connect-kadb-sink.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

#consumer名字

name=connect-kadb-sink

# 为当前connector创建的最大线程数

tasks.max=1

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector //固定值,必须设置

# topic名称列表

topics=test

# 配置jdbc连接

connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink

connection.user=mppadmin

# 自动创建表

auto.create=true

# 写入模式

insert.mode=insert

  1. 启动connect

bin/connect-standalone.sh

config/connect-standalone.properties                //connect配置参数

config/connect-mysql-source.properties    //源端配置参数

config/connect-kadb-sink.properties            //目标端参数

  1. 测试
  1. mysql源端创建表,目标端会自动创建对应的表

mysql> desc test

    -> ;

+-------+-------------+------+-----+---------+----------------+

| Field | Type        | Null | Key | Default | Extra          |

+-------+-------------+------+-----+---------+----------------+

| a     | int(11)     | NO   | PRI | NULL    | auto_increment |    //使用increment ing方式,必须是自增列

| b     | varchar(10) | YES  |     | NULL    |                |

+-------+-------------+------+-----+---------+----------------+

2 rows in set (0.00 sec)

  1. 源端插入数据

mysql> insert into test(b) values('dddd');

Query OK, 1 row affected (0.00 sec)

  1. connect日志:

[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)

[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

  1. 使用kafka-console-consumer.sh查看topic中的事件

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}

  1. 目标端数据

1 | aaa

 2 | bbb

 3 | ccc

 4 | ddd

 5 | dddd

(844 rows)

test_sink=#

  1. 源端数据

mysql> select * from test;

+---+------+

| a | b    |

+---+------+

| 5 | dddd |

+---+------+

1 row in set (0.00 sec)

  1. 命令参考

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 –list

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink –state

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

http://www.dtcms.com/wzjs/261168.html

相关文章:

  • 做网站的公司不给域名电脑软件推广平台
  • 网站建设公司哪家靠谱自制网站 免费
  • 公司网站如何做的美丽手机游戏性能优化软件
  • 推销网站话术精准引流怎么推广
  • 网站被k申诉网站模板免费下载
  • 成都德阳网站建设软件外包平台
  • php网站开发实例教程源码汽车营销策划方案ppt
  • 汕头潮南网站建设搜索排名优化软件
  • 做设计比较好的网站腾讯推广平台
  • 网站建设3d插件抖音权重查询
  • 郑州电子商务网站建设全网营销系统是干什么的
  • 太原微商网站建设自己做网站网页归档
  • 自豪地采用wordpress更改seo优化网络公司
  • 淄博百度网站seo是什么意思职业
  • 捷讯官网 网站建设服装品牌营销策划方案
  • axurerp如何做网站nba常规赛
  • 网站 刷流量 SEO优书网首页
  • 小白怎么做网站搬家教程首页优化排名
  • 网站内容更改教程网站建设合同模板
  • 济南网站建设凡科广州seo优化公司
  • asp网站压缩百度广告位价格表
  • 零食店网站构建策划报告google搜索优化方法
  • 国外做美食视频网站电商网站对比表格
  • 大气好看的网站开发网站
  • 网站开发制作全包逆冬seo
  • 网站分哪些种类网站页面的优化
  • web网站开发与管理企业网站模板免费下载
  • 深圳市网站建设科技公司自动提取关键词的软件
  • 网站设计用户体验网站seo快速排名优化的软件
  • 网站左侧 导航百度seo软件首选帝搜软件