当前位置: 首页 > news >正文

PostgreSQL10 逻辑复制实战:构建高可用数据同步架构!

PostgreSQL10 逻辑复制实战:打造高可用数据同步架构!

概述

PostgreSQL 10 引入了逻辑复制(Logical Replication),为数据库高可用和数据同步提供了更灵活的选择。PostgreSQL 复制机制主要分为物理复制和逻辑复制两种:物理复制(又称流复制/物理块复制)在实例级别同步数据,而逻辑复制则支持更精细的复制粒度。逻辑复制通过逻辑解码插件解析 WAL 日志,提取 DML 语句并在订阅端执行,从而实现表级别的数据同步,适用于分库分表、实时数据同步、异构数据同步等高可用场景。

具备特性

逻辑复制过程中使用限制:

1. 不支持复制DDL。
2. 不支持复制序列、索引。
3. 不支持双向复制。
4. 发布节点和订阅节点表的模式名、表名必须一致,订阅节点允许表有额外字段。

逻辑复制与物理复制区别:

1. 物理复制不能垮操作系统(Linux-Windows),而逻辑复制可以。
2. 无法在不同的PG版本之间进行物理复制(例如10-12),逻辑复制可以支持。因此PostgreSQL大版本升级可以使用逻辑复制。
3. 物理复制是实例级别的复制,而逻辑复制可以基于对象级别(具体到某个表)。
4. 物理复制备库只能读,逻辑复制的备库可以写入。

应用场景

可基于表级别复制,是一种粒度可细的复制,主要用在以下场景

1. 满足业务上需求,实现某些指定表数据同步。
2. PostgreSQL 跨版本数据同步。
3. PostgreSQL 大版本升级。

具体流程

逻辑复制的流程图

在这里插入图片描述

PostgreSQL数据库逻辑复制使用发布者/订阅者模型,使用订阅复制槽技术,可并行的传输WAL日志,通过在订阅端回放WAL日志中的逻辑条目,保持复制表的数据同步,订阅端通过逻辑解码对数据进行REDO。

PUBLICATION对象

CREATE PUBLICATION 名称
[ FOR TABLE [ ONLY ] 表名 [ * ] [, ...]
| FOR ALL TABLES ]
[ WITH ( publication_parameter [= 值] [, ... ] ) ]

参数说明

FOR TABLE:表示要复制的表,可以通过’,’定义多个表。
FOR ALL TABLES:表示数据库的所有表都要复制。
WITH:表的DML操作行为,忽略表示全部DML操作。

一个PUBLICATION对象可以注册一个或多个表,也可以选择DML操作进行复制,一个表同时也可以被多个PUBLICATION注册。

SUBSCRIPTION对象

CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= 值] [, ... ] ) ]

参数说明

CONNECTION:连接master节点的字符串信息。eg. 'host=ip port=5432 user=xxx dbname=xxx'
PUBLICATION:对应发布端的PUBLICATION对象
WITH:表示DML操作,忽略表示全部DML操作

SUBSCRIPTION对象是逻辑复制过程汇总,由订阅节点创建的对象,用于连接发布节点的PUBLICATION对象。

逻辑解码

逻辑解码是使用一个输出插件将 Postgres 的预写日志 (WAL) 转换为可读格式。逻辑解码过程如图:

在这里插入图片描述

当 Postgres 数据库表中的一行发生更改时,该更改会记录在 WAL 中。如果启用了逻辑解码,则该更改的记录将传递给输出插件。输出插件将记录从 WAL 格式更改为插件的格式(例如 JSON 对象)。然后重新格式化的更改通过复制槽退出 Postgres。最后是消费者。消费者是您选择的任何连接到 Postgres 并接收逻辑解码输出的应用程序。

pgout插件解码wal后效果:

testdb=# select * from pg_logical_slot_get_changes('test', pg_current_wal_lsn(), 10); 
    lsn     |  xid   |                                                 data                                                 
------------+--------+------------------------------------------------------------------------------------------------------
 0/3DAE5178 | 377183 | BEGIN 377183
 0/3DAE5178 | 377183 | table public.users: UPDATE: id[character varying]:'4' name[character varying]:'anna' age[integer]:21
 0/3DAE5348 | 377183 | COMMIT 377183
 0/3DAE65F0 | 377184 | BEGIN 377184
 0/3DAE65F0 | 377184 | table public.users: INSERT: id[character varying]:'5' name[character varying]:'5' age[integer]:55
 0/3DAE6728 | 377184 | COMMIT 377184
(6 rows)

wal2json 插件可解码为json格式。

https://docs.microsoft.com/zh-cn/azure/postgresql/concepts-logical

使用示例

测试构建PostgreSQL10的逻辑复制环境。

角色数据库操作系统版本和数据库版本复制用户
发布节点:172.168.98.107testdbCentos 7/ PostgreSQL 10replication
订阅节点:172.168.98.115testdbCentos 7/ PostgreSQL 10replication

1、首先需要在发布角色节点设置 postgresql.conf 相关参数。

wal_level = logical

2、配置主和复制节点的pg_hba.conf文件配置replication用户连接不受限

host replication all 0.0.0.0/0 trust

3、主库和复制库上都创建replication角色并具有复制权限

[root@localhost ~]# su postgres
bash-4.2$ psql
postgres=# CREATE ROLE replication WITH replication PASSWORD '123456' LOGIN;

4、主库和复制库上都创建测试库和测试表

postgres=# CREATE USER testdb WITH ENCRYPTED PASSWORD 'testdb!123';
postgres=# CREATE DATABASE testdb OWNER testdb;

执行\c命令切换至testdb数据库

postgres=# \c testdb testdb
You are now connected to database "testdb" as user "testdb".
testdb=>

继续执行创建测试表

CREATE TABLE users (
        id varchar(10) NOT NULL,
        name varchar(35) NOT NULL,
        age integer
) ;
ALTER TABLE public.users ADD CONSTRAINT users_id_pkey PRIMARY KEY ("id");

主库插入测试数据

INSERT INTO public.users (id, name, age) VALUES ('1', 'zhangsan', 20);
INSERT INTO public.users (id, name, age) VALUES ('2', 'lisi', 30);
INSERT INTO public.users (id, name, age) VALUES ('3', 'wangwu', 21);

5、主库和复制库上都给replication用户授权数据库权限

testdb=> GRANT SELECT ON ALL tables IN SCHEMA PUBLIC TO replication;

6、在主库上创建发布并指定users表

testdb=> CREATE PUBLICATION testpub FOR TABLE users;
CREATE PUBLICATION
testdb=> \dRp
                    List of publications
  Name   | Owner  | All tables | Inserts | Updates | Deletes 
---------+--------+------------+---------+---------+---------
 testpub | testdb | f          | t       | t       | t
(1 row)

testdb=>

7、在复制库上创建订阅

创建订阅,指定连接到主库上的发布。使用superuser来创建订阅,通过命令\c切换至postgres用户。

testdb=> \c testdb postgres
You are now connected to database "testdb" as user "postgres".
testdb=# CREATE SUBSCRIPTION testsub CONNECTION 'host=172.168.98.107 port=5432 dbname=testdb user=replication password=123456' PUBLICATION testpub;
NOTICE:  created replication slot "testsub" on publisher
CREATE SUBSCRIPTION

创建订阅时可指定已经存在的slot

CREATE SUBSCRIPTION testsub CONNECTION 'host=172.168.98.107 port=5432 dbname=testdb user=replication password=123456' PUBLICATION testpub WITH (slot_name=test, create_slot=false);

复制库上面查看订阅情况

testdb=# \dRs
           List of subscriptions
  Name   |  Owner   | Enabled | Publication 
---------+----------+---------+-------------
 testsub | postgres | t       | {testpub}
(1 row)

创建成功之后数据会自动复制过来。

testdb=# SELECT * FROM users;
 id |   name   | age 
----+----------+-----
 1  | zhangsan |  20
 2  | lisi     |  30
 3  | wangwu   |  21
 (3 rows)

8、测试增删改

-- 主库插入记录
testdb=> INSERT INTO users VALUES('4','anna', 17);
INSERT 0 1

-- 从库查询,记录'anna'已插入。
testdb=> select * from users;
 id |   name   | age 
----+----------+-----
 1  | zhangsan |  20
 2  | lisi     |  30
 3  | wangwu   |  21
 4  | anna     |  17
(4 rows)

-- 主库修改'anna'
testdb=> update users set age=18 where id='4';
-- 从库查询'anna'age已经同步修改...

-- 主库删除'anna'
testdb=> delete from users where id='4';
-- 从库查询'anna'这行数据同步删除...

常用命令

查看当前数据库已有发布

SELECT * FROM pg_publication;

查看当前数据库已有订阅

SELECT * FROM pg_subscription;

删除发布

DROP PUBLICATION testpub;

删除订阅

DROP SUBSCRIPTION testsub;

禁用订阅

ALTER SUBSCRIPTION testsub disable;

启动订阅

ALTER SUBSCRIPTION testsub enable;

如果逻辑复制操作中一张表缺少主键,就需要执行这条语句,代表使用整行作为标识

ALTER TABLE table REPLICA IDENTITY FULL;

解除复制槽与订阅的关联

ALTER SUBSCRIPTION testsub disable;
ALTER SUBSCRIPTION testsub SET (slot_name = NONE);
DROP SUBSCRIPTION testsub;

显示当前服务的所有复制连接(发布端执行)

SELECT * FROM pg_stat_replication;

显示订阅者的状态信息

SELECT * FROM pg_stat_subscription;

显示所有复制槽(发布端执行)

SELECT * FROM pg_replication_slots;

创建复制槽

select pg_create_logical_replication_slot('test','test_decoding');

创建复制槽

SELECT pg_create_logical_replication_slot('sub_iuser', 'pgoutput');

在这个示例中:

  • ‘sub_iuser’ 是要创建的复制槽的名称。
  • ‘pgoutput’ 是指定的输出插件名称,它用于将逻辑复制的 WAL 记录转换为适合于逻辑复制的格式。

执行上述 SQL 命令后,将创建名为 sub_iuser 的逻辑复制槽。

请确保在创建复制槽之前,已经启用了逻辑复制,并且已经将逻辑复制参数配置为允许创建复制槽。

删除复制槽

SELECT * FROM pg_drop_replication_slot('test');

脚本实践

使用脚本发布订阅相关数据库和相关表

逻辑复制发布脚本

#!/bin/sh

# DB密码
PASSWORD=PG_PWD
# 查询所有数据库
database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")
if [ -z "$database_name_result" ]; then
    # 查询结果为空
    exit 1
fi

while IFS= read -r database_name; do
    database_name=$(echo "$database_name" | sed 's/ //g')
    # 删除可能存在的发布、订阅关系
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"
    # 重新创建发布关系
    table_names=""
    # 主备需要排除表(例如排除日志表)
    table_names=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
        SELECT string_agg(table_name, ',') AS table_names
            FROM information_schema.tables
        WHERE table_schema = 'public'
            AND table_type = 'BASE TABLE'
            AND table_name NOT LIKE 't_log%'
            AND table_name != 't_xxx';")
    # 获取要发布的表清单信息
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "CREATE PUBLICATION pub_$database_name FOR TABLE $table_names;"
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT * FROM pg_publication;"
    echo "PUBLICATION $database_name done."
echo "PUBLICATION done."

逻辑复制订阅脚本

注意:需要传参主库服务器ip和主库数据库密码。

#!/bin/sh
# 主机
MASTER_HOST=$1
# 主机数据库密码
MASTER_PASSWORD=$2
# 当前备机旧密码
PASSWORD=PG_PWD
# 查询iuser用户下的所有数据库
database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")
if [ -z "$database_name_result" ]; then
# 查询结果为空
    exit 1
fi
# 遍历每个数据库
while IFS= read -r database_name; do
    database_name=$(echo "$database_name" | sed 's/ //g')
    # 删除存在的发布、订阅关系
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"
    # 尝试连接数据库
    echo "尝试连接主机数据库[$database_name]..."
    PGPASSWORD=$MASTER_PASSWORD psql -h $MASTER_HOST -p 5432 -U iuser -d $database_name -c "SELECT 1;"
    if [ $? -eq 0 ]; then
        echo "主机数据库连接[$database_name]成功..."
    else
        echo "主机数据库连接[$database_name]失败..."
        continue
    fi
    # 清空表数据
    # 主备需要排除表(例如排除日志表)
    table_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
        SELECT table_name
            FROM information_schema.tables
        WHERE table_schema = 'public'
            AND table_type = 'BASE TABLE'
            AND table_name NOT LIKE 't_log%'
            AND table_name != 't_xxx';")
    while IFS= read -r table_name; do
        echo "Processing table: $table_name"
        # 清空表数据
        PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "TRUNCATE TABLE $table_name;"
    # 重新创建订阅关系
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
    CREATE SUBSCRIPTION sub_$database_name
    CONNECTION 'host=$MASTER_HOST port=5432 user=iuser password=$MASTER_PASSWORD dbname=$database_name'
    PUBLICATION pub_$database_name;"
    # 查询订阅情况
    echo "SELECT SUBSCRIPTION"
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT * FROM pg_subscription;"
    echo "SUBSCRIPTION $database_name done."
done <<< "$database_name_result"
echo "SUBSCRIPTION done."

逻辑复制停止脚本

#!/bin/sh
# 本机密码
PASSWORD=PG_PWD
# 查询iuser用户下的所有数据库
database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")
# 检查查询结果是否为空
if [ -z "$database_name_result" ]; then
    # 查询结果为空
    exit 1
fi
# 遍历每个数据库
while IFS= read -r database_name; do
    database_name=$(echo "$database_name" | sed 's/ //g')
    echo "DROP PUBLICATION&SUBSCRIPTION"
    # 删除之前可能存在的发布、订阅关系
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"
    PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"
    echo "DROP $database_name PUBLICATION&SUBSCRIPTION done."
done <<< "$database_name_result"
echo "PUBLICATION done."

延迟测试

100w测试

delete from users;
insert into users select generate_series(1,1000000), 'anna', 18;
delete from users;
> Affected rows: 1000000
> 时间: 2.024s

删除100w行需要时间2s,观测延迟

select * from users limit 1;  
\watch 1  

开始时间
2022年02月18日 星期五 18时18分03秒 (每 1s)
 id | name | age 
----+------+-----
 1  | anna |  18
(1 行记录)
结束时间  
2022年02月18日 星期五 18时18分25秒 (每 1s)

 id | name | age 
----+------+-----
(0 行记录)

延迟20s左右

10w测试

insert into users select generate_series(1,100000), 'anna', 18;
delete from users;
> Affected rows: 100000
> 时间: 0.229s

删除10w行需要时间0.2s,观测延迟

select * from users limit 1;  
\watch 1  

开始时间
2022年02月18日 星期五 18时29分20秒 (每 1s)

 id | name | age 
----+------+-----
 1  | anna |  18
(1 行记录)

结束时间  

2022年02月18日 星期五 18时29分22秒 (每 1s)

 id | name | age 
----+------+-----
(0 行记录)

延迟2s左右,取决于一次性事务大小。

相关问题

逻辑复制配置双向复制WAL循环

-- 正向发布
CREATE TABLE t(a SERIAL, b CHAR);
create publication testpub1 FOR table t; 
-- 正向订阅
CREATE TABLE t(a SERIAL, b CHAR);
create subscription testsub1 connection 'host=172.168.98.107 port=5432 dbname=testdb user=replication' publication testpub1;
-- 反向发布
create publication testpub2 FOR table t; 
-- 反正订阅
create subscription testsub2 connection 'host=172.168.98.115 port=5432 dbname=testdb user=replication' publication testpub2;

至此已经创建了一个双向循环复制,如图所示。

在这里插入图片描述

此时我在发布端插入一条数据就会出现环绕现像。此时就会出现循环。不停的从A复制到B,再从B复制到A,直到把数据库搞崩。双向复制需要使用不同的表来实现。使用同样的表会产生WAL循环。

相关链接

1. PostgreSQL10官网文档
https://www.postgresql.org/docs/10/index.html
2. PostgreSQL10逻辑特性
https://www.postgresql.org/docs/10/logical-replication.html
3. 逻辑解码
https://docs.microsoft.com/zh-cn/azure/postgresql/concepts-logical

总结

逻辑复制是PostgreSQL10引入的重要特性,为数据库提供了更灵活的同步方式。在高可用架构中,逻辑复制可用于数据同步、灾备切换、实时分析等场景,提升数据库的可扩展性和业务连续性。合理规划依然可以打造稳定高效的高可用架构。💡 想让你的数据库更可靠?逻辑复制值得一试!🚀

相关文章:

  • Android JNI开发指南
  • IP段转CIDR:原理Java实现
  • 优云智算:借助强大镜像社区,开启AI算力新纪元!
  • Flutter的permission_handler插件检查iOS的通知权限不准确
  • iphone上ios设备开启safari开发者debug模式,配合mac电脑使用
  • SLAM网站连接
  • playbin之autoplug_factories源码剖析
  • Windows文件资源管理器左侧导航窗格没有WSL的Linux图标的解决方法
  • eNSP中AR2220、AR201、AR1220、AR2240、AR3260、Router、NE40E、NE5000E、NE9000、CX路由器学习笔记
  • 算法-二叉树篇15-最大二叉树
  • 蓝桥杯 路径之谜
  • spineNET模型详解及代码复现
  • 六、索引优化实战案例
  • 自然语言处理NLP入门 -- 第五节词向量与嵌入
  • 2025计算机考研复试资料(附:网课+历年复试真题+140所高校真题+机试)
  • python量化交易——金融数据管理最佳实践——qteasy创建本地数据源
  • Spring 源码硬核解析系列专题(六):Spring MVC 的请求处理源码解析
  • Python 中,将十进制整数转换为二进制
  • 机器视觉线阵相机分时频闪选型/机器视觉线阵相机分时频闪选型
  • Apollo Cyber 学习笔记
  • 网站的ftp帐号/如何进行电子商务网站推广
  • 建网站是怎么造成的/网页模板图片
  • 绍兴建设开发有限公司网站/关键词排名优化工具
  • pcb高端网站建设/湖南竞价优化专业公司
  • 武汉做商城网站建设/网络优化报告
  • 如何看出网站用dede做的/长沙百度地图