当前位置: 首页 > wzjs >正文

影响网站速度因素 dns北京seo顾问推推蛙

影响网站速度因素 dns,北京seo顾问推推蛙,免费网站在线客服系统,免费国外医疗静态网站模板下载正如ETL这个概念本身所指示的,数据库读写访问是ETL的最常用甚至是最主要的操作。现代信息系统的设计与运行基本都是围绕数据库展开的,很多应用的核心功能都是对数据库的CRUD(创建、检索、更新、删除)操作。 SmartETL框架设计之初…

正如ETL这个概念本身所指示的,数据库读写访问是ETL的最常用甚至是最主要的操作。现代信息系统的设计与运行基本都是围绕数据库展开的,很多应用的核心功能都是对数据库的CRUD(创建、检索、更新、删除)操作。

SmartETL框架设计之初就考虑到了这个情况,在早期就根据团队的技术栈,实现了对MongoDBMySQLElasticSearchClickHouse等数据库的Extract操作(即Loader组件)和Load操作(即Processor组件)。具体来说,是在wikidata_filter.loader模块和wikidata_filter.iterator模块下分别创建了名为database的子模块,分别实现了相应的数据库组件。

ElasticSearch全文索引数据库操作为例,为了实现将数据写入ES,即建立ES全文索引,框架提供了wikidata_filter.iterator.database.elasticsearch.ESWriter组件,提供基于批量模式将一组JSON对象写入ES中。代码如下:

class ESWriter(BufferedWriter):"""数据写入ES索引中"""def __init__(self, host="localhost",port=9200,username=None,password=None,index=None,buffer_size=1000, **kwargs):super().__init__(buffer_size=buffer_size)self.url = f"http://{host}:{port}"if password:self.auth = (username, password)else:self.auth = Noneself.index_name = indexdef write_batch(self, rows: list):header = {"Content-Type":  "application/json"}lines = []for row in rows:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)break# row_meta = json.dumps({"index": action_row})row_meta = json.dumps({"index": action_row})try:row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)except:passbody = '\n'.join(lines)body += '\n'print(f"{self.url}/{self.index_name} bulk")res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

需要注意,为了提高写ES的效率,ESWriter并不是直接实现JsonIterator,而是继承自BufferedWriter,通过重写write_batch方法,基于ES的bulk接口,实现了批量写入ES。

类似的,为了实现从ES读取数据,可以基于ES检索接口(POST /{index}/_search)或Scroll接口(POST /{index}/_search/scroll)实现检索Loader组件或Scroll模式的Loader组件;为了实现ES数据删除,基于ES删除接口(DELETE /{index}/_doc/{id})实现删除Processor组件;为了判断数据是否存在,基于ES详情接口(GET /{index}/_doc/{id})实现是否存在的Processor组件;……

这种方式比较简单直观,也很容易实现,但是随着应用中需要集成的数据库种类越来越多、数据操作越来越多样化,我们就会发现,为了实现对数据库的访问操作,需要针对每一类操作开发Loader组件或Processor组件,最后数据库相关操作代码就会分散在多个模块、函数中,不便于组件使用、维护和扩展。

有没有可能设计一套专门的机制,实现数据库操作与流程节点分离,同时根据需要进行绑定?当然可以!

出于这样的目的,本文设计了独立的数据库接口体系,并基于函数式组件机制实现数据库独立接口与流程的松耦合绑定。核心过程包括3步:

首先,定义一套数据库操作接口Database,包括数据库级别的表格列表list_tables、获取表格元数据desc_table等和表(集合、索引)级别的写入upsert、扫描scroll、检索search、获取详情get、是否存在exists、删除delete等。这类操作可以根据业务需要就行扩展。类图如下所示:
数据库接口类图
注意,Database接口函数通过使用命名参数,这些参数可以通过SmartETLYAML流程进行配置,方便传递特定数据库的配置。

第二,根据实际需要的数据库类型,实现对应的Database类。事实上,这就是“桥接模式”的应用,将数据库SDK提供的接口转换为本项目的Database接口。以下是一个ElasticSearch实现类(基于ES-HTTP接口)的示例代码:

import json
import requests
from requests.auth import HTTPBasicAuth
from .base import Databaseid_keys = ["_id", "id", "mongo_id"]headers = {'Content-Type': 'application/json','Accept': 'application/json'
}
class ES(Database):"""读取ES指定索引全部数据,支持提供查询条件"""def __init__(self, host: str = "localhost",port: int = 9200,username: str = None,password: str = None,index: str = None,secure: bool = False,**kwargs):self.url = f"{'https' if secure else 'http'}://{host}:{port}"if password:self.auth = HTTPBasicAuth(username, password)else:self.auth = Noneself.index = indexdef search(self, query: dict = None,query_body: dict = None,fetch_size: int = 10,index: str = None,**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = fetch_sizeprint("ES search query_body:", query_body)res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)returnres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)returnhits = res['hits']['hits']for hit in hits:# print(hit)doc = hit.get('_source') or {}doc['_id'] = hit['_id']doc['_score'] = hit['_score']if 'fields' in hit:doc.update(hit['fields'])yield docdef scroll(self, query: dict = None,query_body: dict = None,batch_size: int = 10,fetch_size: int = 10000,index: str = None,_scroll: str = "1m",**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = batch_sizeprint("ES scroll query_body:", query_body)scroll_id = Nonetotal = 0while True:if scroll_id:# 后续请求url = f'{self.url}/_search/scroll'res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)else:# 第一次请求 scrollurl = f'{self.url}/{index}/_search?scroll={_scroll}'res = requests.post(url, auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)breakres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)continueif '_scroll_id' in res:scroll_id = res['_scroll_id']hits = res['hits']['hits']for hit in hits:doc = hit.get('_source') or {}doc['_id'] = hit['_id']yield doctotal += len(hits)if len(hits) < batch_size or 0 < fetch_size <= total:breakif scroll_id:# clear scrollurl = f'{self.url}/_search/scroll'requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})def exists(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}?_source=_id'res = requests.get(url, auth=self.auth)if res.status_code == 200:return res.json().get("found") is Truereturn Falsedef delete(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}'res = requests.delete(url, auth=self.auth)return res.status_code == 200def upsert(self, items: dict or list, index: str = None, **kwargs):index = index or self.indexheader = {"Content-Type": "application/json"}if not isinstance(items, list):items = [items]lines = []for row in items:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)breakrow_meta = json.dumps({"index": action_row})row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)body = '\n'.join(lines)body += '\n'print(f"{self.url}/{index} bulk")res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

SmartETL项目中根据目前业务流程需求,初步实现了ElasticSearchClickHouseMongoDBMySQLPostgreSQLQdrantSQLiteMinIO等8类数据库,类结构图如下所示:
数据库类图结构
第三,定义一组数据库操作的代理函数,将流程的数据库调用需求转发给数据库组件。目前定义为gestata.dbops模块,代码很简单,如下所示:

from wikidata_filter.util.database.base import Databasedef tables(db: Database, *database_list, columns: bool = False):if database_list:for database in database_list:for table in db.list_tables(database):yield {"name": table,"columns": db.desc_table(table, database) if columns else [],"database": database}else:for table in db.list_tables():yield {"name": table,"columns": db.desc_table(table) if columns else []}def search(db: Database, **kwargs):return db.search(**kwargs)def scroll(db: Database, **kwargs):return db.scroll(**kwargs)def upsert(row: dict or list, db: Database, **kwargs):
db.upsert(row, **kwargs)
return rowdef delete(_id, db: Database, **kwargs):return db.delete(_id, **kwargs)def exists(_id, db: Database, **kwargs) -> bool:return db.exists(_id, **kwargs)def get(_id, db: Database, **kwargs):return db.get(_id, **kwargs)

这里需要注意各个函数的返回值,大多数直接返回数据库组件对应方法的执行结果,但upsert返回的是输入参数。这是为什么呢?这样就能够将数据库写入操作作为流程的中间节点,也就是说数据经过入库流程,但没有终止,而是继续流入后续节点中,入下图所示:
入库流程示意
至此,我们完成了将数据库组件与流程节点组件进行解耦的设计。下面来看一个应用案例,通过读取arXiv数据集(来自kaggle,可参考这篇文章),写入ElasticSearch(建立全文索引),流程定义如下:

from: local/db_envs.yamlname: load arXiv meta flow
description: 读取arXiv-meta数据集,写入ES
arguments: 1consts:type_mapping:all: .jsonlnodes:es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')select: SelectVal('data')rename: RenameFields(id='_id')change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
# 建立ES全文索引write_es: Map('gestata.dbops.upsert', es)loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label=’total-papers’)

源代码及流程定义详见SmartETL项目。需要注意,由于项目持续演化,目前除了Kafka,其他数据库组件都已经按照新的方式完成重构。

总结:本文阐述了SmartETL项目中的数据库与流程解耦的设计,包括动机、目的、设计思路、应用案例。作为软件设计中的一条基本原则,高内聚、松耦合是我们持续追求的目标,也只有好的设计,才能让我们的代码能够易于维护与扩展,从而快速响应业务需求,降低开发成本。

http://www.dtcms.com/wzjs/190570.html

相关文章:

  • wordpress打电话聊插件太原seo
  • 淘宝网站都是怎么做的搜狐视频
  • seo研究中心南宁线下seo软件全套
  • 专门做书籍设计的网站搜索引擎网站优化推广
  • 石家庄网站建设电话求好用的seo软件
  • h5页面设计软件搜索引擎环境优化
  • 任丘住房建设局网站时空seo助手
  • cs5怎么做网站网络营销软件排行
  • 中英文网站建设品牌推广
  • 哪些网站做彩票预测途径南京百度关键字优化价格
  • 给别人做的网站涉及到违法seo网站优化课程
  • 学做网站能赚多少钱阿里巴巴指数查询
  • 做地税电子签章的网站社交网络的推广方法
  • 做网站的价格贵吗今天大事件新闻
  • 可以建站的网站头条新闻 最新消息条
  • 网络技术服务郑州seo公司排名
  • 微信视频网站怎么做微博营销案例
  • drupal做新闻网站长沙百度关键词搜索
  • 广东高端网站建设报价建立个人网站
  • 廊坊网站建设总部在哪里网络营销怎么推广
  • 官方网站开发seo的方法
  • 东莞网站建设最牛百度视频推广
  • 医院网站加快建设方案龙华线上推广
  • 夜夜做新郎网站百度推广有哪些售后服务
  • 广州10大网站开发海南seo排名优化公司
  • 临沂网站建设网站推广seo网站推广推荐
  • wordpress资讯网站模板微信软文范例
  • 通过高权重网站做长尾关键词如何在网上做销售推广
  • 网站建设实际总结百度搜索关键词排名优化推广
  • 网站开发分层抖音seo系统