flink-cdc connects through the FE and writes data without any problem.
But a Python script doing Stream Load directly against the FE fails with [NOT_AUTHORIZED]no valid Basic authorization,
and switching to a direct BE connection fails instead with [DATA_QUALITY_ERROR]too many filtered rows.
This is the most frustrating part of doing this kind of work: the exceptions carry no concrete detail, so you have no idea what the real cause is and have to guess.
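A plausible cause of the NOT_AUTHORIZED error (my assumption; the message itself doesn't say): Stream Load sent to an FE is answered with a 307 redirect to a BE, and Python's requests drops the Authorization header when it follows a redirect to a different host, so the BE never sees the credentials. A minimal sketch of the usual workaround, disabling automatic redirects and replaying the PUT against the Location target with auth attached:

import requests
from requests.auth import HTTPBasicAuth

def stream_load_via_fe(fe_url, data, headers, auth):
    # Send to the FE without following redirects, then replay the request
    # against the BE address from the Location header, so Basic auth survives.
    resp = requests.put(fe_url, headers=headers, data=data,
                        auth=auth, allow_redirects=False)
    if resp.status_code in (301, 302, 307, 308):
        be_url = resp.headers['Location']  # the FE hands back a BE address
        resp = requests.put(be_url, headers=headers, data=data, auth=auth)
    return resp

The script below sidesteps that question entirely by pointing straight at a BE, which is why only the data-quality error was left to solve.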
from sqlalchemy import create_engine
import pandas as pd
import math
import requests
from loguru import logger
import gzip
import io
import uuid
from requests.auth import HTTPBasicAuth

# MySQL connection settings
ziping_config = {
    'user': 'root',
    'password': '123456',
    'host': 'localhost',
    'port': 3306,
    'database': 'ziping'
}

def sync_zp_bazi_info():
    # Read the source rows from MySQL
    ziping_engine = create_engine(
        "mysql+pymysql://{user}:{password}@{host}:{port}/{database}".format(**ziping_config),
        connect_args={'charset': 'utf8mb4'}
    )
    logger.info("Connected to MySQL")
    query = "SELECT id, year, month, day, hour, year_xun, month_xun, day_xun, hour_xun, " \
            "yg, yz, mg, mz, dg, dz, hg, hz, year_shu, month_shu, day_shu, hour_shu, yz_cs, " \
            "mz_cs, dz_cs, hz_cs, yg_sk, yz_sk, mg_sk, mz_sk, dz_sk, hg_sk, hz_sk, year_ny, " \
            "month_ny, day_ny, hour_ny, jin_cn, mu_cn, shui_cn, huo_cn, tu_cn, zy_cn, py_cn, " \
            "zc_cn, pc_cn, zg_cn, qs_cn, ss_cn, sg_cn, bj_cn, jc_cn, ym_gj, md_gj, dh_gj FROM zp_bazi_info"
    df = pd.read_sql(query, ziping_engine)
    logger.info("Data read successfully")

    # Stream Load endpoint (BE node, HTTP port 8040)
    stream_load_url = "http://10.101.1.31:8040/api/fay/zp_bazi_info/_stream_load"
    auth = HTTPBasicAuth('root', '123456')
    headers = {
        # "Content-Encoding": "gzip",  # only set together with an actually gzip-compressed body
        "Content-Type": "text/csv; charset=UTF-8",  # payload format: CSV
        "Expect": "100-continue",  # support large uploads
    }

    # Import in batches of 100,000 rows
    batch_size = 100000
    total_rows = len(df)
    num_batches = math.ceil(total_rows / batch_size)
    for i in range(num_batches):
        start = i * batch_size
        end = min((i + 1) * batch_size, total_rows)
        batch_df = df[start:end]
        # Convert the batch to CSV; sep='\t' matches Doris's default column separator
        batch_csv = batch_df.to_csv(index=False, header=False, sep='\t').encode('utf-8')
        headers["Content-Length"] = str(len(batch_csv))
        # compressed_data = compress_csv(batch_csv)  # optional gzip compression, see below
        # Every Stream Load request needs a unique label
        headers["label"] = str(uuid.uuid1())
        response = requests.put(stream_load_url, headers=headers, data=batch_csv, auth=auth, timeout=60)
        # Check the import result
        result = response.json()
        if result['Status'] != 'Fail':
            logger.info(result['Message'])
            logger.info(f"Batch {i + 1}/{num_batches} imported successfully")
        else:
            logger.info(f"Batch {i + 1}/{num_batches} import failed")
            logger.info("Error message: {}", result['Message'])
            break

def compress_csv(batch_csv):
    # gzip-compress a CSV payload in memory
    buffer = io.BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode='wb') as f:
        f.write(batch_csv)
    return buffer.getvalue()

if __name__ == '__main__':
    sync_zp_bazi_info()
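One more fix worth flagging in the script: the original headers declared Content-Encoding: gzip while the compress_csv call was commented out, so the BE would have been told to gunzip a plain-text body. In the cleaned-up version above both are disabled together. If compression is actually wanted, they must be re-enabled as a pair; a drop-in sketch for the batch loop, assuming the BE honors the header the way the original code intended (I have not verified this):

# Re-enable compression only as a matched pair: gzip body + gzip header.
batch_csv = batch_df.to_csv(index=False, header=False, sep='\t').encode('utf-8')
compressed = compress_csv(batch_csv)
headers["Content-Encoding"] = "gzip"
headers["Content-Length"] = str(len(compressed))  # length of the compressed body
response = requests.put(stream_load_url, headers=headers, data=compressed, auth=auth, timeout=60)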
The BE logs show that the CSV was being parsed into a single column on the Doris side. So the problem was in batch_df.to_csv(index=False, header=False): sep='\t' had to be added, as in the script above.
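The mismatch is easy to reproduce: pandas defaults to a comma separator, while Stream Load's default column separator is \t, so a comma-separated line lands in Doris as one single column. A quick check, using the id format from the error message further down:

import pandas as pd

df = pd.DataFrame([["丁丑 丁未 丁丑 丁未", "乙丑", "戊子"]])
print(repr(df.to_csv(index=False, header=False)))            # comma-separated -> one Doris column
print(repr(df.to_csv(index=False, header=False, sep='\t')))  # tab-separated -> three columns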
This is the MySQL table structure converted directly into a Doris table:
CREATE TABLE `zp_bazi_info` (
    `id` CHAR(11) NOT NULL COMMENT "ID",
    `year` CHAR(2) NOT NULL COMMENT "Year Pillar",
    `month` CHAR(2) COMMENT "Month Pillar",
    `day` CHAR(2) COMMENT "Day Pillar",
    `hour` CHAR(2) COMMENT "Hour Pillar",
    `year_xun` CHAR(2) COMMENT "Year Pillar Xun",
    `month_xun` CHAR(2) COMMENT "Month Pillar Xun",
    `day_xun` CHAR(2) COMMENT "Day Pillar Xun",
    `hour_xun` CHAR(2) COMMENT "Hour Pillar Xun",
    `yg` CHAR(1) COMMENT "Year Stem",
    `yz` CHAR(1) COMMENT "Year Branch",
    `mg` CHAR(1) COMMENT "Month Stem",
    `mz` CHAR(1) COMMENT "Month Branch",
    `dg` CHAR(1) COMMENT "Day Stem",
    `dz` CHAR(1) COMMENT "Day Branch",
    `hg` CHAR(1) COMMENT "Hour Stem",
    `hz` CHAR(1) COMMENT "Hour Branch",
    `year_shu` TINYINT COMMENT "Year Pillar Number",
    `month_shu` TINYINT COMMENT "Month Pillar Number",
    `day_shu` TINYINT COMMENT "Day Pillar Number",
    `hour_shu` TINYINT COMMENT "Hour Pillar Number",
    `yz_cs` CHAR(2) COMMENT "Year Changsheng",
    `mz_cs` CHAR(2) COMMENT "Month Changsheng",
    `dz_cs` CHAR(2) COMMENT "Day Changsheng",
    `hz_cs` CHAR(2) COMMENT "Hour Changsheng",
    `yg_sk` CHAR(1) COMMENT "Year Stem Sheng/Ke",
    `yz_sk` CHAR(1) COMMENT "Year Branch Sheng/Ke",
    `mg_sk` CHAR(1) COMMENT "Month Stem Sheng/Ke",
    `mz_sk` CHAR(1) COMMENT "Month Branch Sheng/Ke",
    `dz_sk` CHAR(1) COMMENT "Day Branch Sheng/Ke",
    `hg_sk` CHAR(1) COMMENT "Hour Stem Sheng/Ke",
    `hz_sk` CHAR(1) COMMENT "Hour Branch Sheng/Ke",
    `year_ny` CHAR(3) COMMENT "Year Nayin",
    `month_ny` CHAR(3) COMMENT "Month Nayin",
    `day_ny` CHAR(3) COMMENT "Day Nayin",
    `hour_ny` CHAR(3) COMMENT "Hour Nayin",
    `jin_cn` TINYINT COMMENT "Metal count",
    `mu_cn` TINYINT COMMENT "Wood count",
    `shui_cn` TINYINT COMMENT "Water count",
    `huo_cn` TINYINT COMMENT "Fire count",
    `tu_cn` TINYINT COMMENT "Earth count",
    `zy_cn` TINYINT COMMENT "Zhengyin count",
    `py_cn` TINYINT COMMENT "Pianyin count",
    `zc_cn` TINYINT COMMENT "Zhengcai count",
    `pc_cn` TINYINT COMMENT "Piancai count",
    `zg_cn` TINYINT COMMENT "Zhengguan count",
    `qs_cn` TINYINT COMMENT "Qisha count",
    `ss_cn` TINYINT COMMENT "Shishen count",
    `sg_cn` TINYINT COMMENT "Shangguan count",
    `bj_cn` TINYINT COMMENT "Bijian count",
    `jc_cn` TINYINT COMMENT "Jiecai count",
    `ym_gj` CHAR(1) COMMENT "Year-Month Gongjia",
    `md_gj` CHAR(1) COMMENT "Month-Hour Gongjia",
    `dh_gj` CHAR(1) COMMENT "Day-Hour Gongjia"
)
ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");
But the actual write failed with the error below. The cause is that MySQL and Doris measure length differently: MySQL's CHAR(11) limits characters, while Doris's CHAR(11) limits bytes.
Reason: column_name[id], the length of input is too long than schema. first 32 bytes of input str: [丁丑 丁未 丁丑 丁未] schema length: 11; actual length: 27; . src line [];
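The arithmetic is easy to confirm in Python: the id holds 11 characters (8 hanzi plus 3 spaces), which fits MySQL's CHAR(11) because MySQL counts characters, but each hanzi takes 3 bytes in UTF-8, and Doris counts bytes:

s = "丁丑 丁未 丁丑 丁未"
print(len(s))                  # 11 characters -> fits MySQL CHAR(11)
print(len(s.encode('utf-8')))  # 27 bytes (8 * 3 + 3) -> overflows Doris CHAR(11)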
Since id is the key column, running the statement below fails with: Execution failed: Error Failed to execute sql: java.sql.SQLException: (conn=24) errCode = 2, detailMessage = No key column left. index[zp_bazi_info]
ALTER TABLE zp_bazi_info MODIFY COLUMN `id` VARCHAR(32) NOT NULL COMMENT "ID";
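Since Doris refuses the in-place type change on a key column, a generic fallback (just a sketch; zp_bazi_info_new is a hypothetical staging table, and the connection details are placeholders) is to create a table with the widened key and copy the rows over the FE's MySQL-protocol port:

from sqlalchemy import create_engine, text

# The Doris FE speaks the MySQL protocol on its query port (9030 by default).
doris_engine = create_engine(
    "mysql+pymysql://root:123456@10.101.1.31:9030/fay",
    isolation_level="AUTOCOMMIT")
with doris_engine.connect() as conn:
    # Assumes zp_bazi_info_new was created with the same columns but a
    # widened `id` VARCHAR(32) key; then copy everything across.
    conn.execute(text("INSERT INTO zp_bazi_info_new SELECT * FROM zp_bazi_info"))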
Instead, let's look at how flink-cdc handles this.
You can see that the VARCHAR lengths in Doris became a 4x multiple: under utf8mb4 encoding (MySQL's default recommended UTF-8 implementation, which supports the full Unicode character set, emoji included) a character can occupy up to 4 bytes, so Flink adopts the most conservative worst-case strategy, while CHAR stays unchanged.
CREATE TABLE `zp_bazi_info` (
    `id` VARCHAR(27) NOT NULL COMMENT "ID",
    `year` VARCHAR(6) NOT NULL COMMENT "Year Pillar",
    `month` VARCHAR(6) COMMENT "Month Pillar",
    `day` VARCHAR(6) COMMENT "Day Pillar",
    `hour` VARCHAR(6) COMMENT "Hour Pillar",
    `year_xun` VARCHAR(6) COMMENT "Year Pillar Xun",
    `month_xun` VARCHAR(6) COMMENT "Month Pillar Xun",
    `day_xun` VARCHAR(6) COMMENT "Day Pillar Xun",
    `hour_xun` VARCHAR(6) COMMENT "Hour Pillar Xun",
    `yg` VARCHAR(9) COMMENT "Year Stem",
    `yz` VARCHAR(9) COMMENT "Year Branch",
    `mg` VARCHAR(9) COMMENT "Month Stem",
    `mz` VARCHAR(9) COMMENT "Month Branch",
    `dg` VARCHAR(9) COMMENT "Day Stem",
    `dz` VARCHAR(9) COMMENT "Day Branch",
    `hg` VARCHAR(9) COMMENT "Hour Stem",
    `hz` VARCHAR(9) COMMENT "Hour Branch",
    `year_shu` TINYINT COMMENT "Year Pillar Number",
    `month_shu` TINYINT COMMENT "Month Pillar Number",
    `day_shu` TINYINT COMMENT "Day Pillar Number",
    `hour_shu` TINYINT COMMENT "Hour Pillar Number",
    `yz_cs` VARCHAR(6) COMMENT "Year Changsheng",
    `mz_cs` VARCHAR(6) COMMENT "Month Changsheng",
    `dz_cs` VARCHAR(6) COMMENT "Day Changsheng",
    `hz_cs` VARCHAR(6) COMMENT "Hour Changsheng",
    `yg_sk` VARCHAR(9) COMMENT "Year Stem Sheng/Ke",
    `yz_sk` VARCHAR(9) COMMENT "Year Branch Sheng/Ke",
    `mg_sk` VARCHAR(9) COMMENT "Month Stem Sheng/Ke",
    `mz_sk` VARCHAR(9) COMMENT "Month Branch Sheng/Ke",
    `dz_sk` VARCHAR(9) COMMENT "Day Branch Sheng/Ke",
    `hg_sk` VARCHAR(9) COMMENT "Hour Stem Sheng/Ke",
    `hz_sk` VARCHAR(9) COMMENT "Hour Branch Sheng/Ke",
    `year_ny` VARCHAR(9) COMMENT "Year Nayin",
    `month_ny` VARCHAR(9) COMMENT "Month Nayin",
    `day_ny` VARCHAR(9) COMMENT "Day Nayin",
    `hour_ny` VARCHAR(9) COMMENT "Hour Nayin",
    `jin_cn` TINYINT COMMENT "Metal count",
    `mu_cn` TINYINT COMMENT "Wood count",
    `shui_cn` TINYINT COMMENT "Water count",
    `huo_cn` TINYINT COMMENT "Fire count",
    `tu_cn` TINYINT COMMENT "Earth count",
    `zy_cn` TINYINT COMMENT "Zhengyin count",
    `py_cn` TINYINT COMMENT "Pianyin count",
    `zc_cn` TINYINT COMMENT "Zhengcai count",
    `pc_cn` TINYINT COMMENT "Piancai count",
    `zg_cn` TINYINT COMMENT "Zhengguan count",
    `qs_cn` TINYINT COMMENT "Qisha count",
    `ss_cn` TINYINT COMMENT "Shishen count",
    `sg_cn` TINYINT COMMENT "Shangguan count",
    `bj_cn` TINYINT COMMENT "Bijian count",
    `jc_cn` TINYINT COMMENT "Jiecai count",
    `ym_gj` VARCHAR(9) COMMENT "Year-Month Gongjia",
    `md_gj` VARCHAR(9) COMMENT "Month-Hour Gongjia",
    `dh_gj` VARCHAR(9) COMMENT "Day-Hour Gongjia"
)
ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");
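Reading the generated DDL as a rough model (my inference, not the connector's actual source code): take the MySQL declared length, multiply by a worst-case bytes-per-character factor, and emit VARCHAR. Curiously, most columns here work out to a 3x multiplier, utf8's worst case (CHAR(2) becoming VARCHAR(6)), with 4x being the utf8mb4 ceiling mentioned above; the sketch below parameterizes the factor:

def doris_varchar_len(mysql_char_len: int, max_bytes_per_char: int = 4) -> str:
    # Conservative MySQL CHAR/VARCHAR(n) -> Doris VARCHAR mapping.
    # 4 is the utf8mb4 worst case; 3 matches utf8 and most columns above.
    return f"VARCHAR({mysql_char_len * max_bytes_per_char})"

print(doris_varchar_len(2))     # VARCHAR(8)
print(doris_varchar_len(2, 3))  # VARCHAR(6) -- matches the generated `year` column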
With the columns widened, the write succeeded. I then re-ran the job repeatedly to check for problems, and ended up with two copies of each row in Doris. Isn't id the key? Why were duplicate rows written?
Because in Doris, Duplicate Key is a redundancy model: data is stored exactly as it arrives in the import, with no aggregation at all. Even two completely identical rows are both kept. The Duplicate model is fine for logs, but it is the wrong fit for our system tables; here UNIQUE KEY is needed.
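A quick way to confirm what happened after the repeated runs: compare the total row count with the number of distinct keys (same placeholder Doris connection as in the migration sketch above):

import pandas as pd
from sqlalchemy import create_engine

doris_engine = create_engine("mysql+pymysql://root:123456@10.101.1.31:9030/fay")
dup_check = pd.read_sql(
    "SELECT COUNT(*) AS total, COUNT(DISTINCT id) AS distinct_ids FROM zp_bazi_info",
    doris_engine)
# Under DUPLICATE KEY, total is a multiple of distinct_ids after re-runs;
# under UNIQUE KEY, rows sharing an id are merged and the two counts match.
print(dup_check)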
510,000 rows in total. The import log below shows Doris's write speed is quite decent: the six batches complete in about 78 seconds after the MySQL read finishes (15:29:06 to 15:30:24), roughly 6,500 rows per second.
2025-03-01 15:27:18.756 | INFO | __main__:sync_zp_bazi_info:33 - Connected to MySQL
2025-03-01 15:29:06.586 | INFO | __main__:sync_zp_bazi_info:40 - Data read successfully
2025-03-01 15:29:21.388 | INFO | __main__:sync_zp_bazi_info:74 - OK
2025-03-01 15:29:21.388 | INFO | __main__:sync_zp_bazi_info:75 - Batch 1/6 imported successfully
2025-03-01 15:29:36.447 | INFO | __main__:sync_zp_bazi_info:74 - OK
2025-03-01 15:29:36.447 | INFO | __main__:sync_zp_bazi_info:75 - Batch 2/6 imported successfully
2025-03-01 15:29:52.508 | INFO | __main__:sync_zp_bazi_info:74 - OK
2025-03-01 15:29:52.509 | INFO | __main__:sync_zp_bazi_info:75 - Batch 3/6 imported successfully
2025-03-01 15:30:06.747 | INFO | __main__:sync_zp_bazi_info:74 - OK
2025-03-01 15:30:06.747 | INFO | __main__:sync_zp_bazi_info:75 - Batch 4/6 imported successfully
2025-03-01 15:30:22.621 | INFO | __main__:sync_zp_bazi_info:74 - OK
2025-03-01 15:30:22.621 | INFO | __main__:sync_zp_bazi_info:75 - Batch 5/6 imported successfully
2025-03-01 15:30:24.921 | INFO | __main__:sync_zp_bazi_info:74 - OK
2025-03-01 15:30:24.921 | INFO | __main__:sync_zp_bazi_info:75 - Batch 6/6 imported successfully

Process finished with exit code 0
Running select * from zp_bazi_info where year='乙丑' and month='戊子' takes 2.232s in MySQL, but only 92ms in Doris, about 24x faster. So why is Doris that fast?