当前位置: 首页 > wzjs >正文

wordpress开启多站点功大数据获客系统

wordpress开启多站点功,大数据获客系统,南昌专业做网站公司有哪些,巴彦淖尔seo下面是一个使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步的Python解决方案,包含全量同步、增量同步和测试用例。 此解决方案提供了生产级数据同步所需的核心功能,可根据具体场景扩展更多高级特性如:数据转换、字段映射…

下面是一个使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步的Python解决方案,包含全量同步、增量同步和测试用例。

此解决方案提供了生产级数据同步所需的核心功能,可根据具体场景扩展更多高级特性如:数据转换、字段映射、类型转换等。

设计思路

  1. 全量同步:首次运行时将MySQL表完整导入ClickHouse
  2. 增量同步:基于增量字段(如自增ID或时间戳)同步新增数据
  3. 状态管理:使用JSON文件记录同步位置
  4. 错误处理:完善的日志和异常处理机制
import subprocess
import json
import os
import logging
from configparser import ConfigParser# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)class MySQLToClickHouseSync:def __init__(self, config_path='config.ini'):self.config = self._load_config(config_path)self.state_file = self.config['state_file']self.last_state = self._load_state()def _load_config(self, path):"""加载配置文件"""config = ConfigParser()config.read(path)return {'mysql': dict(config['mysql']),'clickhouse': dict(config['clickhouse']),'state_file': config['general']['state_file']}def _load_state(self):"""加载同步状态"""try:if os.path.exists(self.state_file):with open(self.state_file, 'r') as f:return json.load(f)return {'last_id': 0, 'last_timestamp': '1970-01-01 00:00:00'}except Exception as e:logger.error(f"加载状态失败: {e}")return {'last_id': 0, 'last_timestamp': '1970-01-01 00:00:00'}def _save_state(self, state):"""保存同步状态"""try:with open(self.state_file, 'w') as f:json.dump(state, f)logger.info(f"状态已保存: {state}")except Exception as e:logger.error(f"保存状态失败: {e}")def run_clickhouse_command(self, query):"""执行clickhouse-local命令"""cmd = ['clickhouse-local','--query', query]try:result = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True,check=True)logger.debug(f"命令执行成功: {cmd}\n输出: {result.stdout}")return Trueexcept subprocess.CalledProcessError as e:logger.error(f"命令执行失败: {cmd}\n错误: {e.stderr}")return Falsedef full_sync(self):"""全量数据同步"""mysql = self.config['mysql']ch = self.config['clickhouse']query = f"""CREATE TABLE {ch['table']} ENGINE = MergeTree ORDER BY id ASSELECT *FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')"""logger.info("开始全量同步...")if self.run_clickhouse_command(query):# 获取最新ID作为增量起点max_id_query = f"""SELECT max(id) FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')"""cmd = ['clickhouse-local', '--query', max_id_query]result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0:new_state = {'last_id': int(result.stdout.strip())}self._save_state(new_state)self.last_state = new_statelogger.info("全量同步完成")return Truereturn Falsedef incremental_sync(self):"""增量数据同步"""mysql = self.config['mysql']ch = self.config['clickhouse']last_id = self.last_state.get('last_id', 0)query = f"""INSERT INTO {ch['table']}SELECT *FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')WHERE id > {last_id}"""logger.info(f"开始增量同步, 最后ID: {last_id}")if self.run_clickhouse_command(query):# 获取新增的最大IDnew_max_query = f"""SELECT max(id) FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')WHERE id > {last_id}"""cmd = ['clickhouse-local', '--query', new_max_query]result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0 and result.stdout.strip():new_id = int(result.stdout.strip())if new_id > last_id:self._save_state({'last_id': new_id})self.last_state = {'last_id': new_id}logger.info(f"增量同步完成, 新最后ID: {new_id}")else:logger.info("没有新数据需要同步")return Truereturn False# 配置文件示例 (config.ini)
"""
[general]
state_file = sync_state.json[mysql]
host = 127.0.0.1
port = 3306
database = test_db
table = source_table
user = root
password = mysqlpass[clickhouse]
table = default.target_table
"""if __name__ == "__main__":sync = MySQLToClickHouseSync()# 首次运行全量同步if not sync.last_state.get('last_id'):sync.full_sync()# 后续增量同步sync.incremental_sync()

测试用例

import unittest
import sqlite3
from unittest.mock import patch, MagicMock
import tempfile
import os
import jsonclass TestMySQLToClickHouseSync(unittest.TestCase):def setUp(self):self.config = {'state_file': 'test_state.json','mysql': {'host': '127.0.0.1','port': '3306','database': 'test_db','table': 'source_table','user': 'root','password': 'pass'},'clickhouse': {'table': 'target_table'}}# 创建临时状态文件self.state_file = tempfile.NamedTemporaryFile(delete=False)self.config['state_file'] = self.state_file.namedef tearDown(self):os.unlink(self.state_file.name)def test_full_sync(self):"""测试全量同步"""with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch.object(MySQLToClickHouseSync, '_load_state', return_value={'last_id': 0}), \patch('subprocess.run') as mock_run:# 模拟clickhouse-local成功执行mock_run.return_value = MagicMock(returncode=0, stdout="100")sync = MySQLToClickHouseSync()result = sync.full_sync()# 验证命令执行self.assertTrue(mock_run.called)self.assertTrue(result)# 验证状态更新with open(self.state_file.name) as f:state = json.load(f)self.assertEqual(state['last_id'], 100)def test_incremental_sync(self):"""测试增量同步"""# 初始状态with open(self.state_file.name, 'w') as f:json.dump({'last_id': 50}, f)with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch('subprocess.run') as mock_run:# 模拟获取新最大ID为75mock_run.side_effect = [MagicMock(returncode=0),  # INSERT执行MagicMock(returncode=0, stdout="75")  # SELECT max(id)]sync = MySQLToClickHouseSync()result = sync.incremental_sync()# 验证命令执行self.assertEqual(mock_run.call_count, 2)self.assertTrue(result)# 验证状态更新with open(self.state_file.name) as f:state = json.load(f)self.assertEqual(state['last_id'], 75)def test_no_new_data(self):"""测试无新数据的情况"""with open(self.state_file.name, 'w') as f:json.dump({'last_id': 100}, f)with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch('subprocess.run') as mock_run:# 模拟返回空结果mock_run.side_effect = [MagicMock(returncode=0),MagicMock(returncode=0, stdout="")]sync = MySQLToClickHouseSync()result = sync.incremental_sync()self.assertTrue(result)# 状态应保持不变self.assertEqual(sync.last_state['last_id'], 100)def test_command_failure(self):"""测试命令执行失败"""with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch('subprocess.run') as mock_run:mock_run.side_effect = subprocess.CalledProcessError(1, "cmd", output="", stderr="Error")sync = MySQLToClickHouseSync()result = sync.full_sync()self.assertFalse(result)if __name__ == '__main__':unittest.main()

使用说明

  1. 安装依赖:
pip install configparser
  1. 准备配置文件 (config.ini):
[general]
state_file = sync_state.json[mysql]
host = 127.0.0.1
port = 3306
database = your_db
table = source_table
user = root
password = your_mysql_password[clickhouse]
table = default.target_table
  1. 创建ClickHouse表 (自动创建):
-- 首次运行时会自动创建表
-- 表结构自动从MySQL继承
  1. 运行同步:
# 首次运行(全量同步)
python sync.py# 后续运行(增量同步)
python sync.py

关键特性

  1. 高效同步

    • 使用clickhouse-local直接管道传输,无需中间存储
    • 批量数据加载,避免逐行插入
  2. 增量同步机制

    • 基于自增ID的增量检测
    • 支持时间戳字段(需修改WHERE条件)
  3. 状态管理

    • JSON文件记录最后同步位置
    • 支持异常恢复
  4. 错误处理

    • 详细日志记录
    • 子进程错误捕获
    • 状态文件异常处理
  5. 配置驱动

    • 所有参数通过配置文件管理
    • 敏感信息与代码分离

性能优化建议

  1. 大表分批次同步
# 在全量同步中增加分页逻辑
BATCH_SIZE = 100000
for offset in range(0, total_count, BATCH_SIZE):query = f"SELECT * FROM ... LIMIT {BATCH_SIZE} OFFSET {offset}"
  1. 使用时间戳增量
# 修改增量查询条件
WHERE update_time > '{last_timestamp}'
  1. 并行处理
# 使用ThreadPoolExecutor并行处理不同数据分区
from concurrent.futures import ThreadPoolExecutor
  1. 压缩传输
# 在命令中添加压缩选项
clickhouse-local --query "..." | gzip | clickhouse-client --query "INSERT ..."
http://www.dtcms.com/wzjs/124191.html

相关文章:

  • 做天猫网站价格表如何点击优化神马关键词排名
  • 贵州黔东南双控体系建设网站网络营销代运营外包公司
  • 做公司网站 烟台电脑网络优化软件
  • 企业网站建设如何去规划优化新十条
  • 网站建设找工作营销新闻
  • 浦东网站开发培训网络广告网站
  • 网站建设创新点百度权重怎么看
  • window服务器如何做网站访问网站结构有哪几种
  • 专门做防盗门的网站方象科技专注于什么领域
  • 有什么网站可以免费做四六级模拟题电商运营自学网站
  • 网站服务器购买价格快速收录工具
  • 网站建设技术网站建刷粉网站推广便宜
  • wordpress 企业主题下载长春网站优化咨询
  • 做微信的微网站费用多少免费b站推广软件
  • 成都网站建设 erp百度电话号码查询平台
  • cf辅助如何做代理拿网站推广app最快的方法
  • 广州一起做网店网站长沙网络推广外包费用
  • 有和wind一样做用网站seo入口
  • 小伙做钓鱼网站 背警方带走河南今日重大新闻
  • 常德网站建设哪家权威网址查询
  • 动态网站开发语言都有哪些网上营销网站
  • vps的网站打不开廊坊百度快照优化哪家服务好
  • 网站的建设价格营销策略有哪些4种
  • 宁波网站优化价格甘肃seo网站
  • 投票网站怎么制作seo能干一辈子吗
  • 建设政府门户网站的意义有哪些超级优化
  • 重庆网站建设培训机构学费网络推广文案
  • 网站栏目关键词付费推广有几种方式
  • 泰顺做网站google seo实战教程
  • 网站建设代理开发科技企业服务长沙seo网络营销推广