当前位置: 首页 > wzjs >正文

网站开发需求网百度推广上班怎么样

网站开发需求网,百度推广上班怎么样,中国人民银行网站查询网址,静态网页设计作业成品测试结论 源端增量获取方式包括:bulk、incrementing、timestamp、incrementingtimestamp(混合),各种方式说明如下: bulk: 一次同步整个表的数据 incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新…
  • 测试结论

源端增量获取方式包括:bulk、incrementing、timestamp、incrementing+timestamp(混合),各种方式说明如下:

bulk: 一次同步整个表的数据

incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新和删除

timestamp: 使用时间戳标识增量数据,每次更新数据都要修改时间戳,时间戳严格递增

timestamp+incrementing: 使用两个列,一个为自增列,一个为时间戳列。综合incrementing和timestamp的功能

  • 环境说明

本文在kafka的standalone模式下,适配kafka jdbc connector从源端mysql数据库实时同步数据到kadb中。验证1. 增量数据获取及增量数据获取方式

  1. kadb版本:V8R3
  2. mysql版本:5.7
  3. 操作系统:centos 7.6
  4. jdbc connector版本:10.8.3。下载地址:JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data.
  5. mysql驱动:mysql-connector-java-5.1.39-bin.jar
  6. kadb驱动:postgresql-42.7.4.jar
  7. java版本:17.0.12 (kafka要求必须为17或者18版本,否则kafka安装报错)
  8. kafka版本:kafka_2.13-4.0.0
  9. kafka jdbc connector参考资料:

JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation

  1. kafka connector参考资料

https://kafka.apache.org/documentation/

  • 环境部署
  1. kafka部署

解压

tar -xzf kafka_2.13-4.0.0.tgz

cd kafka_2.13-4.0.0

产生集群UUID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

  1. jdbc connector部署

下载jdbc connector,将解压的内容保存到kafka解压目录的plugins下(plugins目录需自己创建内容如下:

[root@nanri plugins]# ls -l

total 8

drwxr-xr-x. 2 root root   43 Apr 17 21:50 assets

drwxr-xr-x. 3 root root  108 Apr 17 21:50 doc

drwxr-xr-x. 2 root root   90 Apr 17 21:50 etc

drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib

-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json

[root@nanri plugins]# pwd

/root/kafka_2.13-4.0.0/plugins

  1. 源端/目标端jdbc驱动

将源端mysql的jdbc驱动文件和目标端kadb驱动文件拷贝至kafka的解压目录的libs目录下:

[root@nanri libs]# ls -l mysql* postgres*

-rw-r--r--. 1 root root  989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar

-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar

[root@nanri libs]# pwd

/root/kafka_2.13-4.0.0/libs

  1. 配置文件修改
  1. 连接器配置文件:connect-standalone.properties

添加插件路径参数:(绝对路径)

plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar

  1. 源端配置文件:connect-mysql-source.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

#productor名字

name=connect-mysql-source                

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector    //固定值,使用jdbc connector的类

# topic名称列表,源端和目标端的topic必须一致

topics=test

# 配置jdbc连接

connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false

#增量获取方式,支持bulk,incrementing,timestamp等等

mode=incrementing

  1. 目标端配置文件:connect-kadb-sink.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

#consumer名字

name=connect-kadb-sink

# 为当前connector创建的最大线程数

tasks.max=1

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector //固定值,必须设置

# topic名称列表

topics=test

# 配置jdbc连接

connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink

connection.user=mppadmin

# 自动创建表

auto.create=true

# 写入模式

insert.mode=insert

  1. 启动connect

bin/connect-standalone.sh

config/connect-standalone.properties                //connect配置参数

config/connect-mysql-source.properties    //源端配置参数

config/connect-kadb-sink.properties            //目标端参数

  1. 测试
  1. mysql源端创建表,目标端会自动创建对应的表

mysql> desc test

    -> ;

+-------+-------------+------+-----+---------+----------------+

| Field | Type        | Null | Key | Default | Extra          |

+-------+-------------+------+-----+---------+----------------+

| a     | int(11)     | NO   | PRI | NULL    | auto_increment |    //使用increment ing方式,必须是自增列

| b     | varchar(10) | YES  |     | NULL    |                |

+-------+-------------+------+-----+---------+----------------+

2 rows in set (0.00 sec)

  1. 源端插入数据

mysql> insert into test(b) values('dddd');

Query OK, 1 row affected (0.00 sec)

  1. connect日志:

[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)

[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

  1. 使用kafka-console-consumer.sh查看topic中的事件

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}

  1. 目标端数据

1 | aaa

 2 | bbb

 3 | ccc

 4 | ddd

 5 | dddd

(844 rows)

test_sink=#

  1. 源端数据

mysql> select * from test;

+---+------+

| a | b    |

+---+------+

| 5 | dddd |

+---+------+

1 row in set (0.00 sec)

  1. 命令参考

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 –list

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink –state

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

http://www.dtcms.com/wzjs/286115.html

相关文章:

  • 学会计算机编程可以做网站吗深圳防疫措施优化
  • 南宁做网站推广外贸网站建设流程
  • 医院手机网站源码广告营销策略有哪些
  • 大型网站技术架构:核心原理与案例分析网络营销软文案例
  • 查看网站是否做百度推广小红书怎么推广引流
  • 建设网上购物网站百度开户推广多少钱
  • 哪个网站做电子请帖好章鱼磁力链接引擎
  • 物流公司创建百色seo关键词优化公司
  • Adobe Muse 商业网站爆款引流推广软件
  • 新闻门户网站psd模板推广优化关键词
  • 专业做网站产品上架的有吗百度移动端点赞排名软件
  • 手机端网站建设下载一个百度时事新闻
  • 北京网站建设汉邦国内外十大免费crm软件推荐
  • 品牌微信网站建设最近的疫情情况最新消息
  • 属于b2c的网站如何让百度收录自己信息
  • 专门做音箱的网站网络推广的公司是骗局吗
  • 产品公司网站建设方案模板公司开发设计推荐
  • 建立网站请示新站如何让百度快速收录
  • wordpress 汉化版主题益阳网站seo
  • 沧州网站建设的集成商站长域名查询
  • 人民日报客户端下载app兰州seo实战优化
  • 德阳市做网站网络营销的四大基础理论
  • 网站建设论文致谢班级优化大师官网下载
  • 做民宿房东怎样上网站卖房制作网站的步骤和过程
  • 西安模板网站建设套餐数据分析系统
  • 长沙建站价格厦门seo关键词
  • 网站建设的7种流程济南网络优化厂家
  • 南昌网站建设的流程地推放单平台
  • jsp网站开发代码下载搜索广告是什么意思
  • 免费网站下载直播软件大全bt磁力库