当前位置: 首页 > wzjs >正文

商城微网站模板宝鸡网站开发公司

商城微网站模板,宝鸡网站开发公司,做网站用c 还是php,做农产品网站正如ETL这个概念本身所指示的,数据库读写访问是ETL的最常用甚至是最主要的操作。现代信息系统的设计与运行基本都是围绕数据库展开的,很多应用的核心功能都是对数据库的CRUD(创建、检索、更新、删除)操作。 SmartETL框架设计之初…

正如ETL这个概念本身所指示的,数据库读写访问是ETL的最常用甚至是最主要的操作。现代信息系统的设计与运行基本都是围绕数据库展开的,很多应用的核心功能都是对数据库的CRUD(创建、检索、更新、删除)操作。

SmartETL框架设计之初就考虑到了这个情况,在早期就根据团队的技术栈,实现了对MongoDBMySQLElasticSearchClickHouse等数据库的Extract操作(即Loader组件)和Load操作(即Processor组件)。具体来说,是在wikidata_filter.loader模块和wikidata_filter.iterator模块下分别创建了名为database的子模块,分别实现了相应的数据库组件。

ElasticSearch全文索引数据库操作为例,为了实现将数据写入ES,即建立ES全文索引,框架提供了wikidata_filter.iterator.database.elasticsearch.ESWriter组件,提供基于批量模式将一组JSON对象写入ES中。代码如下:

class ESWriter(BufferedWriter):"""数据写入ES索引中"""def __init__(self, host="localhost",port=9200,username=None,password=None,index=None,buffer_size=1000, **kwargs):super().__init__(buffer_size=buffer_size)self.url = f"http://{host}:{port}"if password:self.auth = (username, password)else:self.auth = Noneself.index_name = indexdef write_batch(self, rows: list):header = {"Content-Type":  "application/json"}lines = []for row in rows:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)break# row_meta = json.dumps({"index": action_row})row_meta = json.dumps({"index": action_row})try:row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)except:passbody = '\n'.join(lines)body += '\n'print(f"{self.url}/{self.index_name} bulk")res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

需要注意,为了提高写ES的效率,ESWriter并不是直接实现JsonIterator,而是继承自BufferedWriter,通过重写write_batch方法,基于ES的bulk接口,实现了批量写入ES。

类似的,为了实现从ES读取数据,可以基于ES检索接口(POST /{index}/_search)或Scroll接口(POST /{index}/_search/scroll)实现检索Loader组件或Scroll模式的Loader组件;为了实现ES数据删除,基于ES删除接口(DELETE /{index}/_doc/{id})实现删除Processor组件;为了判断数据是否存在,基于ES详情接口(GET /{index}/_doc/{id})实现是否存在的Processor组件;……

这种方式比较简单直观,也很容易实现,但是随着应用中需要集成的数据库种类越来越多、数据操作越来越多样化,我们就会发现,为了实现对数据库的访问操作,需要针对每一类操作开发Loader组件或Processor组件,最后数据库相关操作代码就会分散在多个模块、函数中,不便于组件使用、维护和扩展。

有没有可能设计一套专门的机制,实现数据库操作与流程节点分离,同时根据需要进行绑定?当然可以!

出于这样的目的,本文设计了独立的数据库接口体系,并基于函数式组件机制实现数据库独立接口与流程的松耦合绑定。核心过程包括3步:

首先,定义一套数据库操作接口Database,包括数据库级别的表格列表list_tables、获取表格元数据desc_table等和表(集合、索引)级别的写入upsert、扫描scroll、检索search、获取详情get、是否存在exists、删除delete等。这类操作可以根据业务需要就行扩展。类图如下所示:
数据库接口类图
注意,Database接口函数通过使用命名参数,这些参数可以通过SmartETLYAML流程进行配置,方便传递特定数据库的配置。

第二,根据实际需要的数据库类型,实现对应的Database类。事实上,这就是“桥接模式”的应用,将数据库SDK提供的接口转换为本项目的Database接口。以下是一个ElasticSearch实现类(基于ES-HTTP接口)的示例代码:

import json
import requests
from requests.auth import HTTPBasicAuth
from .base import Databaseid_keys = ["_id", "id", "mongo_id"]headers = {'Content-Type': 'application/json','Accept': 'application/json'
}
class ES(Database):"""读取ES指定索引全部数据,支持提供查询条件"""def __init__(self, host: str = "localhost",port: int = 9200,username: str = None,password: str = None,index: str = None,secure: bool = False,**kwargs):self.url = f"{'https' if secure else 'http'}://{host}:{port}"if password:self.auth = HTTPBasicAuth(username, password)else:self.auth = Noneself.index = indexdef search(self, query: dict = None,query_body: dict = None,fetch_size: int = 10,index: str = None,**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = fetch_sizeprint("ES search query_body:", query_body)res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)returnres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)returnhits = res['hits']['hits']for hit in hits:# print(hit)doc = hit.get('_source') or {}doc['_id'] = hit['_id']doc['_score'] = hit['_score']if 'fields' in hit:doc.update(hit['fields'])yield docdef scroll(self, query: dict = None,query_body: dict = None,batch_size: int = 10,fetch_size: int = 10000,index: str = None,_scroll: str = "1m",**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = batch_sizeprint("ES scroll query_body:", query_body)scroll_id = Nonetotal = 0while True:if scroll_id:# 后续请求url = f'{self.url}/_search/scroll'res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)else:# 第一次请求 scrollurl = f'{self.url}/{index}/_search?scroll={_scroll}'res = requests.post(url, auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)breakres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)continueif '_scroll_id' in res:scroll_id = res['_scroll_id']hits = res['hits']['hits']for hit in hits:doc = hit.get('_source') or {}doc['_id'] = hit['_id']yield doctotal += len(hits)if len(hits) < batch_size or 0 < fetch_size <= total:breakif scroll_id:# clear scrollurl = f'{self.url}/_search/scroll'requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})def exists(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}?_source=_id'res = requests.get(url, auth=self.auth)if res.status_code == 200:return res.json().get("found") is Truereturn Falsedef delete(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}'res = requests.delete(url, auth=self.auth)return res.status_code == 200def upsert(self, items: dict or list, index: str = None, **kwargs):index = index or self.indexheader = {"Content-Type": "application/json"}if not isinstance(items, list):items = [items]lines = []for row in items:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)breakrow_meta = json.dumps({"index": action_row})row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)body = '\n'.join(lines)body += '\n'print(f"{self.url}/{index} bulk")res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

SmartETL项目中根据目前业务流程需求,初步实现了ElasticSearchClickHouseMongoDBMySQLPostgreSQLQdrantSQLiteMinIO等8类数据库,类结构图如下所示:
数据库类图结构
第三,定义一组数据库操作的代理函数,将流程的数据库调用需求转发给数据库组件。目前定义为gestata.dbops模块,代码很简单,如下所示:

from wikidata_filter.util.database.base import Databasedef tables(db: Database, *database_list, columns: bool = False):if database_list:for database in database_list:for table in db.list_tables(database):yield {"name": table,"columns": db.desc_table(table, database) if columns else [],"database": database}else:for table in db.list_tables():yield {"name": table,"columns": db.desc_table(table) if columns else []}def search(db: Database, **kwargs):return db.search(**kwargs)def scroll(db: Database, **kwargs):return db.scroll(**kwargs)def upsert(row: dict or list, db: Database, **kwargs):
db.upsert(row, **kwargs)
return rowdef delete(_id, db: Database, **kwargs):return db.delete(_id, **kwargs)def exists(_id, db: Database, **kwargs) -> bool:return db.exists(_id, **kwargs)def get(_id, db: Database, **kwargs):return db.get(_id, **kwargs)

这里需要注意各个函数的返回值,大多数直接返回数据库组件对应方法的执行结果,但upsert返回的是输入参数。这是为什么呢?这样就能够将数据库写入操作作为流程的中间节点,也就是说数据经过入库流程,但没有终止,而是继续流入后续节点中,入下图所示:
入库流程示意
至此,我们完成了将数据库组件与流程节点组件进行解耦的设计。下面来看一个应用案例,通过读取arXiv数据集(来自kaggle,可参考这篇文章),写入ElasticSearch(建立全文索引),流程定义如下:

from: local/db_envs.yamlname: load arXiv meta flow
description: 读取arXiv-meta数据集,写入ES
arguments: 1consts:type_mapping:all: .jsonlnodes:es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')select: SelectVal('data')rename: RenameFields(id='_id')change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
# 建立ES全文索引write_es: Map('gestata.dbops.upsert', es)loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label=’total-papers’)

源代码及流程定义详见SmartETL项目。需要注意,由于项目持续演化,目前除了Kafka,其他数据库组件都已经按照新的方式完成重构。

总结:本文阐述了SmartETL项目中的数据库与流程解耦的设计,包括动机、目的、设计思路、应用案例。作为软件设计中的一条基本原则,高内聚、松耦合是我们持续追求的目标,也只有好的设计,才能让我们的代码能够易于维护与扩展,从而快速响应业务需求,降低开发成本。

http://www.dtcms.com/wzjs/291950.html

相关文章:

  • 语文答题模板高中qq排名优化网站
  • 房地产新闻建发360优化大师官方下载
  • 网站交给别人做安全吗北京建公司网站价格
  • 怎么把园林设计网站做的酷炫seo整合营销
  • 建设网站如何挣钱百度网址安全检测中心
  • 企业做网站公司怎么做google网站推广
  • 网站建设取得了优化营商环境心得体会
  • 网站制作合作协议平台交易网
  • 定位网站关键词在线crm软件
  • 广州做啊里巴巴网站多少钱网店运营工作内容
  • 电子商务网站的设计工具沧州网站建设公司
  • 网站如何做百度权重官方百度平台
  • 自建商城网站营销的手段和方法
  • 做网站媒体b2b免费推广平台
  • 怎么做一网站首页seo基础教程使用
  • 帝国手机网站模板seo推广怎么样
  • 广州平台网站建设长春网站建设方案推广
  • 有没有专门做家纺的网站新手怎么入行sem
  • 珠海商城网站制作深圳seo
  • 典型b2c模式的网站保定网站建设公司哪家好
  • 网站顶部展出的大幅广告百度指数功能模块有哪些
  • 政府投资类网站建设单位今日最新新闻摘抄
  • 建设网站业务不好做seo优化搜索推广
  • 蚌埠做网站多少钱公司推广咨询
  • 高职图书馆网站建设大赛广州百度竞价托管
  • 网站建设yuanmus外贸seo
  • 产品视频宣传片长春百度关键词优化
  • wordpress上传网站独立源码seo公司 彼亿营销
  • 水利部网站建设管理司优化推广排名网站教程
  • 开企网站建设公司网站建设公司