Linux 下 Docker 与 ClickHouse 的安装配置及 MySQL 数据同步指南
本文详细介绍了在 Linux 系统上安装配置 Docker 和 ClickHouse 的完整流程,并实现了 MySQL 到 ClickHouse 的数据同步方案。
本文将按以下顺序展开:在 Linux 上安装配置 Docker,接着安装配置 ClickHouse,然后在 ClickHouse 中建立 MySQL 外部表和 MergeTree 内部表,最后给出全量同步以及基于表内时间戳字段的增量同步 SQL 代码。
关键点包括:
- Docker 安装配置:使用官方仓库安装,配置镜像加速
- ClickHouse 部署:通过 Docker 容器化部署,配置数据持久化
- 外部表配置:使用 MySQL 表引擎建立跨数据库连接
- 数据同步策略:
- 全量同步:初始数据迁移
- 增量同步:基于时间戳的变化数据捕获
- 自动化方案:通过 Shell 脚本实现定期同步
这种架构适用于数据分析、实时报表等场景,充分发挥 ClickHouse 的列式存储优势,同时保持与 MySQL 操作数据库的数据一致性。
第一部分:Linux 上安装配置 Docker
1.1 系统要求与准备工作
确保系统为较新的 Linux 发行版(如 Ubuntu 20.04+、CentOS 8+),并更新系统包:
# Ubuntu/Debian
sudo apt update && sudo apt upgrade -y

# CentOS/RHEL
sudo yum update -y
1.2 安装 Docker
# Install prerequisite packages
sudo apt install -y apt-transport-https ca-certificates curl software-properties-common

# Add Docker's official GPG key
# (fixed: the original ran "gpt --dearmor"; the command is "gpg")
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg

# Add the Docker apt repository
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

# Install Docker
sudo apt update
sudo apt install -y docker-ce docker-ce-cli containerd.io

# Start Docker and enable it at boot
sudo systemctl start docker
sudo systemctl enable docker

# Verify the installation
sudo docker --version
1.3 配置 Docker(可选)
# Add the current user to the docker group (avoids sudo for every command);
# newgrp applies the new group to the current shell only — new sessions
# pick it up automatically
sudo usermod -aG docker $USER
newgrp docker

# Configure registry mirrors (recommended for users in mainland China)
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<EOF
{
  "registry-mirrors": [
    "https://docker.mirrors.ustc.edu.cn",
    "https://hub-mirror.c.163.com"
  ]
}
EOF

# Restart Docker so the daemon picks up the new configuration
sudo systemctl daemon-reload
sudo systemctl restart docker
第二部分:安装配置 ClickHouse
2.1 使用 Docker 运行 ClickHouse
# Create host directories for persistent data, logs and configuration
mkdir -p ~/clickhouse/data
mkdir -p ~/clickhouse/log
mkdir -p ~/clickhouse/config

# Download the default ClickHouse configuration files
wget -O ~/clickhouse/config/users.xml https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/programs/server/users.xml
wget -O ~/clickhouse/config/config.xml https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/programs/server/config.xml

# Create a user-defined bridge network so containers can reach each other
# by container name (name-based DNS does NOT work on the default bridge,
# so 'mysql-server:3306' in the MySQL table engine would fail without this).
# Idempotent: the || true makes re-runs harmless.
docker network create ch-sync 2>/dev/null || true

# Run the ClickHouse server container with persistent volumes
docker run -d \
  --name clickhouse-server \
  --network ch-sync \
  --ulimit nofile=262144:262144 \
  -p 8123:8123 \
  -p 9000:9000 \
  -p 9009:9009 \
  -v ~/clickhouse/data:/var/lib/clickhouse \
  -v ~/clickhouse/log:/var/log/clickhouse-server \
  -v ~/clickhouse/config:/etc/clickhouse-server \
  clickhouse/clickhouse-server:latest
2.2 验证 ClickHouse 安装
# Check that the container is running
docker ps | grep clickhouse

# Open an interactive ClickHouse client session inside the container
docker exec -it clickhouse-server clickhouse-client
第三部分:配置 MySQL 外部表和数据同步
3.1 准备测试用的 MySQL 数据
首先运行一个 MySQL 容器用于测试:
# 运行 MySQL 容器
docker run -d \--name mysql-server \-e MYSQL_ROOT_PASSWORD=rootpassword \-e MYSQL_DATABASE=test_db \-e MYSQL_USER=test_user \-e MYSQL_PASSWORD=test_password \-p 3306:3306 \mysql:8.0# 等待 MySQL 启动
sleep 30
创建测试表并插入数据:
# Connect to MySQL
docker exec -it mysql-server mysql -uroot -prootpassword

-- Run the following SQL inside the MySQL client
USE test_db;

-- Source table: one row per user action, JSON payload per activity;
-- updated_at auto-advances on every UPDATE, which drives change capture
CREATE TABLE user_activity (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT NOT NULL,
    activity_type VARCHAR(50) NOT NULL,
    activity_data JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Seed data
INSERT INTO user_activity (user_id, activity_type, activity_data) VALUES
(1, 'login', '{"ip": "192.168.1.1", "device": "mobile"}'),
(2, 'purchase', '{"amount": 99.99, "product": "ebook"}'),
(3, 'logout', '{"session_duration": 3600}'),
(1, 'view', '{"page": "home", "duration": 120}'),
(4, 'login', '{"ip": "192.168.1.2", "device": "desktop"}');

-- Orders table used for the incremental-sync demo;
-- last_updated is the change-tracking column
CREATE TABLE orders (
    order_id INT AUTO_INCREMENT PRIMARY KEY,
    customer_id INT NOT NULL,
    order_amount DECIMAL(10,2) NOT NULL,
    order_status VARCHAR(20) DEFAULT 'pending',
    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

INSERT INTO orders (customer_id, order_amount, order_status) VALUES
(1, 150.00, 'completed'),
(2, 75.50, 'pending'),
(3, 200.00, 'shipped'),
(1, 45.25, 'completed');
3.2 在 ClickHouse 中配置 MySQL 外部表
连接到 ClickHouse:
docker exec -it clickhouse-server clickhouse-client
在 ClickHouse 中执行以下 SQL:
-- Create the analytics database
CREATE DATABASE IF NOT EXISTS analytics;
USE analytics;

-- MySQL table-engine proxy for user_activity: queries against this table
-- are forwarded to MySQL over the wire; no data is stored in ClickHouse.
-- NOTE(review): credentials are inlined for the demo — use a dedicated
-- read-only MySQL user (or named collections) in production.
CREATE TABLE mysql_user_activity (
    id Int32,
    user_id Int32,
    activity_type String,
    activity_data String,
    created_at DateTime,
    updated_at DateTime
) ENGINE = MySQL('mysql-server:3306', 'test_db', 'user_activity', 'root', 'rootpassword');

-- MySQL proxy table for orders
CREATE TABLE mysql_orders (
    order_id Int32,
    customer_id Int32,
    order_amount Decimal(10,2),
    order_status String,
    order_date DateTime,
    last_updated DateTime
) ENGINE = MySQL('mysql-server:3306', 'test_db', 'orders', 'root', 'rootpassword');
3.3 创建 ClickHouse 内部表(MergeTree)
-- 创建用户活动内部表
CREATE TABLE internal_user_activity (id Int32,user_id Int32,activity_type String,activity_data String,created_at DateTime,updated_at DateTime,_version UInt64 DEFAULT 1,_is_deleted UInt8 DEFAULT 0
) ENGINE = ReplacingMergeTree(_version, _is_deleted)
PARTITION BY toYYYYMM(created_at)
ORDER BY (id, created_at);-- 创建订单内部表(用于增量同步演示)
CREATE TABLE internal_orders (order_id Int32,customer_id Int32,order_amount Decimal(10,2),order_status String,order_date DateTime,last_updated DateTime,_sync_version UInt64 DEFAULT 1
) ENGINE = ReplacingMergeTree(_sync_version)
PARTITION BY toYYYYMM(order_date)
ORDER BY (order_id, order_date);
第四部分:数据同步策略
4.1 全量数据同步
-- 用户活动表全量同步
INSERT INTO analytics.internal_user_activity
SELECT id,user_id,activity_type,activity_data,created_at,updated_at,1 as _version,0 as _is_deleted
FROM analytics.mysql_user_activity;-- 订单表全量同步
INSERT INTO analytics.internal_orders
SELECT order_id,customer_id,order_amount,order_status,order_date,last_updated,1 as _sync_version
FROM analytics.mysql_orders;-- 验证数据同步
SELECT count(*) FROM analytics.internal_user_activity;
SELECT count(*) FROM analytics.internal_orders;
4.2 增量数据同步方案
方案一:基于时间戳的增量同步
-- 创建同步元数据表
CREATE TABLE IF NOT EXISTS analytics.sync_metadata (table_name String,last_sync_time DateTime,_updated DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY table_name;-- 初始化同步时间(如果是第一次同步)
INSERT INTO analytics.sync_metadata (table_name, last_sync_time) VALUES
('user_activity', now() - INTERVAL 1 DAY),
('orders', now() - INTERVAL 1 DAY);
增量同步 SQL 语句(ClickHouse 没有存储过程,以下为可重复执行的 SQL)
-- Incremental sync of user_activity: pull rows created or updated since
-- the last watermark. The row's own modification time is used as the
-- ReplacingMergeTree version — monotone, so newer edits always win on
-- merge. (The original hard-coded "2 as _version", which stops
-- deduplicating correctly after the second sync run.)
INSERT INTO analytics.internal_user_activity
WITH (
    SELECT max(last_sync_time)
    FROM analytics.sync_metadata
    WHERE table_name = 'user_activity'
) AS last_sync_time
SELECT
    id,
    user_id,
    activity_type,
    activity_data,
    created_at,
    updated_at,
    toUnixTimestamp(updated_at) AS _version,
    0 AS _is_deleted
FROM analytics.mysql_user_activity
WHERE updated_at > last_sync_time
   OR created_at > last_sync_time;

-- Advance the watermark
INSERT INTO analytics.sync_metadata (table_name, last_sync_time)
VALUES ('user_activity', now());
方案二:带同步日志的完整增量同步流程(ClickHouse 不支持存储过程,以 SQL 语句序列实现)
-- Sync run log. (The original comment called this a "function";
-- ClickHouse has no stored procedures — it is a plain log table.)
CREATE TABLE IF NOT EXISTS analytics.data_sync_log (
    sync_id UUID DEFAULT generateUUIDv4(),
    table_name String,
    sync_type String,
    records_synced UInt64,
    sync_start_time DateTime,
    sync_end_time DateTime,
    status String
) ENGINE = MergeTree()
ORDER BY (table_name, sync_start_time);

-- Incremental sync of orders with per-row version bumping.
-- The original used a correlated subquery ("WHERE order_id = mo.order_id"
-- referencing the outer row), which ClickHouse does not support; a LEFT
-- JOIN against the current max version per order gives the same result.
INSERT INTO analytics.internal_orders
WITH
    (
        SELECT max(last_sync_time)
        FROM analytics.sync_metadata
        WHERE table_name = 'orders'
    ) AS last_sync_time,
    current_versions AS
    (
        SELECT order_id, max(_sync_version) AS max_version
        FROM analytics.internal_orders
        GROUP BY order_id
    )
SELECT
    mo.order_id,
    mo.customer_id,
    mo.order_amount,
    mo.order_status,
    mo.order_date,
    mo.last_updated,
    -- unmatched rows get the UInt64 default 0, so new orders start at 1
    cv.max_version + 1 AS _sync_version
FROM analytics.mysql_orders AS mo
LEFT JOIN current_versions AS cv ON cv.order_id = mo.order_id
WHERE mo.last_updated > last_sync_time;

-- Record the run in the sync log
-- NOTE(review): records_synced is a placeholder constant here; capture the
-- real inserted-row count from the client in a production pipeline.
INSERT INTO analytics.data_sync_log (
    table_name,
    sync_type,
    records_synced,
    sync_start_time,
    sync_end_time,
    status
) VALUES ('orders', 'incremental', 1, now() - INTERVAL 10 SECOND, now(), 'success');

-- Advance the watermark
INSERT INTO analytics.sync_metadata (table_name, last_sync_time)
VALUES ('orders', now());
4.3 自动化同步脚本
创建同步脚本 sync_mysql_to_ch.sh:
#!/bin/bash
# Incremental MySQL -> ClickHouse sync driver.
# Fixes in this revision: the original query hard-coded the orders columns
# (order_id / last_updated) for every table — wrong for user_activity —
# and relied on a correlated subquery, which ClickHouse does not support.

CLICKHOUSE_HOST="localhost"
CLICKHOUSE_PORT="9000"
SYNC_TABLES=("user_activity" "orders")

# Per-table change-tracking column and explicit column list. The internal
# tables carry extra _version columns that take their declared defaults,
# so the insert must name the source columns explicitly.
declare -A TS_COLUMN=(
    [user_activity]="updated_at"
    [orders]="last_updated"
)
declare -A COLUMNS=(
    [user_activity]="id, user_id, activity_type, activity_data, created_at, updated_at"
    [orders]="order_id, customer_id, order_amount, order_status, order_date, last_updated"
)

# Sync one table: copy rows changed since the stored watermark,
# then advance the watermark.
sync_table() {
    local table_name=$1
    local ts_col=${TS_COLUMN[$table_name]}
    local cols=${COLUMNS[$table_name]}
    echo "开始同步表: $table_name"

    docker exec -i clickhouse-server clickhouse-client \
        --host "$CLICKHOUSE_HOST" \
        --port "$CLICKHOUSE_PORT" \
        --multiquery \
        --query "
            INSERT INTO analytics.internal_${table_name} (${cols})
            SELECT ${cols}
            FROM analytics.mysql_${table_name}
            WHERE ${ts_col} > (
                SELECT max(last_sync_time)
                FROM analytics.sync_metadata
                WHERE table_name = '${table_name}'
            );
            INSERT INTO analytics.sync_metadata (table_name, last_sync_time)
            VALUES ('${table_name}', now());"

    echo "完成同步表: $table_name"
}

for table in "${SYNC_TABLES[@]}"; do
    sync_table "$table"
done

echo "所有表同步完成"
给脚本执行权限并运行:
# Make the script executable (first run only), then execute it
chmod +x sync_mysql_to_ch.sh
./sync_mysql_to_ch.sh
第五部分:验证和监控
5.1 数据验证查询
-- 比较 MySQL 和 ClickHouse 数据量
SELECT 'mysql' as source, count(*) as count
FROM analytics.mysql_user_activity
UNION ALL
SELECT 'clickhouse' as source, count(*) as count
FROM analytics.internal_user_activity;-- 查看最新同步的数据
SELECT order_id,customer_id,order_amount,order_status,order_date,last_updated
FROM analytics.internal_orders
ORDER BY last_updated DESC
LIMIT 5;-- 检查同步日志
SELECT table_name,sync_type,records_synced,sync_start_time,sync_end_time,status
FROM analytics.data_sync_log
ORDER BY sync_start_time DESC
LIMIT 10;
5.2 性能优化建议
-- 为常用查询字段创建索引
ALTER TABLE analytics.internal_user_activity
ADD INDEX user_id_index user_id TYPE minmax GRANULARITY 1;ALTER TABLE analytics.internal_orders
ADD INDEX customer_status_index (customer_id, order_status) TYPE bloom_filter GRANULARITY 1;-- 优化表设置
ALTER TABLE analytics.internal_user_activity
MODIFY SETTING index_granularity = 8192;-- 创建物化视图用于常用聚合
CREATE MATERIALIZED VIEW analytics.user_activity_daily
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (day, user_id, activity_type)
AS SELECTtoDate(created_at) as day,user_id,activity_type,count() as activity_count
FROM analytics.internal_user_activity
GROUP BY day, user_id, activity_type;
