当前位置: 首页 > news >正文

SmartETL中数据库操作与流程解耦的设计与应用

正如ETL这个概念本身所指示的,数据库读写访问是ETL的最常用甚至是最主要的操作。现代信息系统的设计与运行基本都是围绕数据库展开的,很多应用的核心功能都是对数据库的CRUD(创建、检索、更新、删除)操作。

SmartETL框架设计之初就考虑到了这个情况,在早期就根据团队的技术栈,实现了对MongoDBMySQLElasticSearchClickHouse等数据库的Extract操作(即Loader组件)和Load操作(即Processor组件)。具体来说,是在wikidata_filter.loader模块和wikidata_filter.iterator模块下分别创建了名为database的子模块,分别实现了相应的数据库组件。

ElasticSearch全文索引数据库操作为例,为了实现将数据写入ES,即建立ES全文索引,框架提供了wikidata_filter.iterator.database.elasticsearch.ESWriter组件,提供基于批量模式将一组JSON对象写入ES中。代码如下:

class ESWriter(BufferedWriter):"""数据写入ES索引中"""def __init__(self, host="localhost",port=9200,username=None,password=None,index=None,buffer_size=1000, **kwargs):super().__init__(buffer_size=buffer_size)self.url = f"http://{host}:{port}"if password:self.auth = (username, password)else:self.auth = Noneself.index_name = indexdef write_batch(self, rows: list):header = {"Content-Type":  "application/json"}lines = []for row in rows:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)break# row_meta = json.dumps({"index": action_row})row_meta = json.dumps({"index": action_row})try:row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)except:passbody = '\n'.join(lines)body += '\n'print(f"{self.url}/{self.index_name} bulk")res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

需要注意,为了提高写ES的效率,ESWriter并不是直接实现JsonIterator,而是继承自BufferedWriter,通过重写write_batch方法,基于ES的bulk接口,实现了批量写入ES。

类似的,为了实现从ES读取数据,可以基于ES检索接口(POST /{index}/_search)或Scroll接口(POST /{index}/_search/scroll)实现检索Loader组件或Scroll模式的Loader组件;为了实现ES数据删除,基于ES删除接口(DELETE /{index}/_doc/{id})实现删除Processor组件;为了判断数据是否存在,基于ES详情接口(GET /{index}/_doc/{id})实现是否存在的Processor组件;……

这种方式比较简单直观,也很容易实现,但是随着应用中需要集成的数据库种类越来越多、数据操作越来越多样化,我们就会发现,为了实现对数据库的访问操作,需要针对每一类操作开发Loader组件或Processor组件,最后数据库相关操作代码就会分散在多个模块、函数中,不便于组件使用、维护和扩展。

有没有可能设计一套专门的机制,实现数据库操作与流程节点分离,同时根据需要进行绑定?当然可以!

出于这样的目的,本文设计了独立的数据库接口体系,并基于函数式组件机制实现数据库独立接口与流程的松耦合绑定。核心过程包括3步:

首先,定义一套数据库操作接口Database,包括数据库级别的表格列表list_tables、获取表格元数据desc_table等和表(集合、索引)级别的写入upsert、扫描scroll、检索search、获取详情get、是否存在exists、删除delete等。这类操作可以根据业务需要就行扩展。类图如下所示:
数据库接口类图
注意,Database接口函数通过使用命名参数,这些参数可以通过SmartETLYAML流程进行配置,方便传递特定数据库的配置。

第二,根据实际需要的数据库类型,实现对应的Database类。事实上,这就是“桥接模式”的应用,将数据库SDK提供的接口转换为本项目的Database接口。以下是一个ElasticSearch实现类(基于ES-HTTP接口)的示例代码:

import json
import requests
from requests.auth import HTTPBasicAuth
from .base import Databaseid_keys = ["_id", "id", "mongo_id"]headers = {'Content-Type': 'application/json','Accept': 'application/json'
}
class ES(Database):"""读取ES指定索引全部数据,支持提供查询条件"""def __init__(self, host: str = "localhost",port: int = 9200,username: str = None,password: str = None,index: str = None,secure: bool = False,**kwargs):self.url = f"{'https' if secure else 'http'}://{host}:{port}"if password:self.auth = HTTPBasicAuth(username, password)else:self.auth = Noneself.index = indexdef search(self, query: dict = None,query_body: dict = None,fetch_size: int = 10,index: str = None,**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = fetch_sizeprint("ES search query_body:", query_body)res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)returnres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)returnhits = res['hits']['hits']for hit in hits:# print(hit)doc = hit.get('_source') or {}doc['_id'] = hit['_id']doc['_score'] = hit['_score']if 'fields' in hit:doc.update(hit['fields'])yield docdef scroll(self, query: dict = None,query_body: dict = None,batch_size: int = 10,fetch_size: int = 10000,index: str = None,_scroll: str = "1m",**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = batch_sizeprint("ES scroll query_body:", query_body)scroll_id = Nonetotal = 0while True:if scroll_id:# 后续请求url = f'{self.url}/_search/scroll'res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)else:# 第一次请求 scrollurl = f'{self.url}/{index}/_search?scroll={_scroll}'res = requests.post(url, auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)breakres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)continueif '_scroll_id' in res:scroll_id = res['_scroll_id']hits = res['hits']['hits']for hit in hits:doc = hit.get('_source') or {}doc['_id'] = hit['_id']yield doctotal += len(hits)if len(hits) < batch_size or 0 < fetch_size <= total:breakif scroll_id:# clear scrollurl = f'{self.url}/_search/scroll'requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})def exists(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}?_source=_id'res = requests.get(url, auth=self.auth)if res.status_code == 200:return res.json().get("found") is Truereturn Falsedef delete(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}'res = requests.delete(url, auth=self.auth)return res.status_code == 200def upsert(self, items: dict or list, index: str = None, **kwargs):index = index or self.indexheader = {"Content-Type": "application/json"}if not isinstance(items, list):items = [items]lines = []for row in items:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)breakrow_meta = json.dumps({"index": action_row})row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)body = '\n'.join(lines)body += '\n'print(f"{self.url}/{index} bulk")res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

SmartETL项目中根据目前业务流程需求,初步实现了ElasticSearchClickHouseMongoDBMySQLPostgreSQLQdrantSQLiteMinIO等8类数据库,类结构图如下所示:
数据库类图结构
第三,定义一组数据库操作的代理函数,将流程的数据库调用需求转发给数据库组件。目前定义为gestata.dbops模块,代码很简单,如下所示:

from wikidata_filter.util.database.base import Databasedef tables(db: Database, *database_list, columns: bool = False):if database_list:for database in database_list:for table in db.list_tables(database):yield {"name": table,"columns": db.desc_table(table, database) if columns else [],"database": database}else:for table in db.list_tables():yield {"name": table,"columns": db.desc_table(table) if columns else []}def search(db: Database, **kwargs):return db.search(**kwargs)def scroll(db: Database, **kwargs):return db.scroll(**kwargs)def upsert(row: dict or list, db: Database, **kwargs):
db.upsert(row, **kwargs)
return rowdef delete(_id, db: Database, **kwargs):return db.delete(_id, **kwargs)def exists(_id, db: Database, **kwargs) -> bool:return db.exists(_id, **kwargs)def get(_id, db: Database, **kwargs):return db.get(_id, **kwargs)

这里需要注意各个函数的返回值,大多数直接返回数据库组件对应方法的执行结果,但upsert返回的是输入参数。这是为什么呢?这样就能够将数据库写入操作作为流程的中间节点,也就是说数据经过入库流程,但没有终止,而是继续流入后续节点中,入下图所示:
入库流程示意
至此,我们完成了将数据库组件与流程节点组件进行解耦的设计。下面来看一个应用案例,通过读取arXiv数据集(来自kaggle,可参考这篇文章),写入ElasticSearch(建立全文索引),流程定义如下:

from: local/db_envs.yamlname: load arXiv meta flow
description: 读取arXiv-meta数据集,写入ES
arguments: 1consts:type_mapping:all: .jsonlnodes:es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')select: SelectVal('data')rename: RenameFields(id='_id')change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
# 建立ES全文索引write_es: Map('gestata.dbops.upsert', es)loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label=’total-papers’)

源代码及流程定义详见SmartETL项目。需要注意,由于项目持续演化,目前除了Kafka,其他数据库组件都已经按照新的方式完成重构。

总结:本文阐述了SmartETL项目中的数据库与流程解耦的设计,包括动机、目的、设计思路、应用案例。作为软件设计中的一条基本原则,高内聚、松耦合是我们持续追求的目标,也只有好的设计,才能让我们的代码能够易于维护与扩展,从而快速响应业务需求,降低开发成本。

相关文章:

  • Stewart并联结构两自由度正逆解计算和工作空间仿真
  • 使用asyncio构建高性能网络爬虫
  • for each循环语句
  • FPGA芯片的配置方法
  • [Java 基础]HashMap
  • Element Plus el-button实例类型详解
  • 什么是数字签名(ECDSA)?
  • 非阻塞 IO
  • Java面试题030:一文深入了解MySQL(2)
  • 将文档转为数据集
  • FPGA内部资源介绍
  • tf serving和torch serve哪个耗时更低
  • 数据结构与算法总概
  • 鸿蒙 Grid 与 GridItem 深度解析:二维网格布局解决方案
  • 操作系统的概述之二
  • 探索钉钉生态中的宜搭:创建与分享应用的新视界
  • SpringMVC系列(六)(Restful架构风格(中))
  • 每日算法刷题Day40 6.27:leetcode前缀和3道题,用时1h20min
  • 鸿蒙5:布局组件
  • Linux环境安装Redis的多种方式分析