正如ETL这个概念本身所指示的,数据库读写访问是ETL的最常用甚至是最主要的操作。现代信息系统的设计与运行基本都是围绕数据库展开的,很多应用的核心功能都是对数据库的CRUD(创建、检索、更新、删除)操作。
SmartETL框架设计之初就考虑到了这个情况,在早期就根据团队的技术栈,实现了对MongoDB
、MySQL
、ElasticSearch
、ClickHouse
等数据库的Extract操作(即Loader
组件)和Load操作(即Processor
组件)。具体来说,是在wikidata_filter.loader
模块和wikidata_filter.iterator
模块下分别创建了名为database
的子模块,分别实现了相应的数据库组件。
以ElasticSearch
全文索引数据库操作为例,为了实现将数据写入ES,即建立ES全文索引,框架提供了wikidata_filter.iterator.database.elasticsearch.ESWriter
组件,提供基于批量模式将一组JSON对象写入ES中。代码如下:
class ESWriter(BufferedWriter):
"""
数据写入ES索引中
"""
def __init__(self, host="localhost",
port=9200,
username=None,
password=None,
index=None,
buffer_size=1000, **kwargs):
super().__init__(buffer_size=buffer_size)
self.url = f"http://{host}:{port}"
if password:
self.auth = (username, password)
else:
self.auth = None
self.index_name = index
def write_batch(self, rows: list):
header = {
"Content-Type": "application/json"
}
lines = []
for row in rows:
action_row = {}
for key in id_keys:
if key in row:
action_row["_id"] = row.pop(key)
break
# row_meta = json.dumps({"index": action_row})
row_meta = json.dumps({"index": action_row})
try:
row_data = json.dumps(row)
lines.append(row_meta)
lines.append(row_data)
except:
pass
body = '\n'.join(lines)
body += '\n'
print(f"{self.url}/{self.index_name} bulk")
res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)
if res.status_code != 200:
print("Warning, ES bulk load failed:", res.text)
return False
return True
需要注意,为了提高写ES的效率,ESWriter并不是直接实现JsonIterator
,而是继承自BufferedWriter
,通过重写write_batch
方法,基于ES的bulk
接口,实现了批量写入ES。
类似的,为了实现从ES读取数据,可以基于ES检索接口(POST /{index}/_search
)或Scroll接口(POST /{index}/_search/scroll
)实现检索Loader组件或Scroll模式的Loader组件;为了实现ES数据删除,基于ES删除接口(DELETE /{index}/_doc/{id}
)实现删除Processor组件;为了判断数据是否存在,基于ES详情接口(GET /{index}/_doc/{id}
)实现是否存在的Processor组件;……
这种方式比较简单直观,也很容易实现,但是随着应用中需要集成的数据库种类越来越多、数据操作越来越多样化,我们就会发现,为了实现对数据库的访问操作,需要针对每一类操作开发Loader组件或Processor组件,最后数据库相关操作代码就会分散在多个模块、函数中,不便于组件使用、维护和扩展。
有没有可能设计一套专门的机制,实现数据库操作与流程节点分离,同时根据需要进行绑定?当然可以!
出于这样的目的,本文设计了独立的数据库接口体系,并基于函数式组件机制实现数据库独立接口与流程的松耦合绑定。核心过程包括3步:
首先,定义一套数据库操作接口Database
,包括数据库级别的表格列表list_tables
、获取表格元数据desc_table
等和表(集合、索引)级别的写入upsert
、扫描scroll
、检索search
、获取详情get
、是否存在exists
、删除delete
等。这类操作可以根据业务需要就行扩展。类图如下所示:
注意,Database
接口函数通过使用命名参数,这些参数可以通过SmartETL的YAML流程进行配置,方便传递特定数据库的配置。
第二,根据实际需要的数据库类型,实现对应的Database
类。事实上,这就是“桥接模式”的应用,将数据库SDK提供的接口转换为本项目的Database
接口。以下是一个ElasticSearch实现类(基于ES-HTTP接口)的示例代码:
import json
import requests
from requests.auth import HTTPBasicAuth
from .base import Database
id_keys = ["_id", "id", "mongo_id"]
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
class ES(Database):
"""
读取ES指定索引全部数据,支持提供查询条件
"""
def __init__(self, host: str = "localhost",
port: int = 9200,
username: str = None,
password: str = None,
index: str = None,
secure: bool = False,
**kwargs):
self.url = f"{'https' if secure else 'http'}://{host}:{port}"
if password:
self.auth = HTTPBasicAuth(username, password)
else:
self.auth = None
self.index = index
def search(self, query: dict = None,
query_body: dict = None,
fetch_size: int = 10,
index: str = None,
**kwargs):
index = index or self.index
query_body = query_body or {}
if query:
query_body['query'] = query
elif 'query' not in query_body:
query_body['query'] = {"match_all": {}}
if 'size' not in query_body:
query_body['size'] = fetch_size
print("ES search query_body:", query_body)
res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)
if res.status_code != 200:
print("Error:", res.text)
return
res = res.json()
if 'hits' not in res or 'hits' not in res['hits']:
print('ERROR', res)
return
hits = res['hits']['hits']
for hit in hits:
# print(hit)
doc = hit.get('_source') or {}
doc['_id'] = hit['_id']
doc['_score'] = hit['_score']
if 'fields' in hit:
doc.update(hit['fields'])
yield doc
def scroll(self, query: dict = None,
query_body: dict = None,
batch_size: int = 10,
fetch_size: int = 10000,
index: str = None,
_scroll: str = "1m",
**kwargs):
index = index or self.index
query_body = query_body or {}
if query:
query_body['query'] = query
elif 'query' not in query_body:
query_body['query'] = {"match_all": {}}
if 'size' not in query_body:
query_body['size'] = batch_size
print("ES scroll query_body:", query_body)
scroll_id = None
total = 0
while True:
if scroll_id:
# 后续请求
url = f'{self.url}/_search/scroll'
res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)
else:
# 第一次请求 scroll
url = f'{self.url}/{index}/_search?scroll={_scroll}'
res = requests.post(url, auth=self.auth, json=query_body, **kwargs)
if res.status_code != 200:
print("Error:", res.text)
break
res = res.json()
if 'hits' not in res or 'hits' not in res['hits']:
print('ERROR', res)
continue
if '_scroll_id' in res:
scroll_id = res['_scroll_id']
hits = res['hits']['hits']
for hit in hits:
doc = hit.get('_source') or {}
doc['_id'] = hit['_id']
yield doc
total += len(hits)
if len(hits) < batch_size or 0 < fetch_size <= total:
break
if scroll_id:
# clear scroll
url = f'{self.url}/_search/scroll'
requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})
def exists(self, _id, index: str = None, **kwargs):
index = index or self.index
if isinstance(_id, dict):
_id = _id.get("_id") or _id.get("id")
url = f'{self.url}/{index}/_doc/{_id}?_source=_id'
res = requests.get(url, auth=self.auth)
if res.status_code == 200:
return res.json().get("found") is True
return False
def delete(self, _id, index: str = None, **kwargs):
index = index or self.index
if isinstance(_id, dict):
_id = _id.get("_id") or _id.get("id")
url = f'{self.url}/{index}/_doc/{_id}'
res = requests.delete(url, auth=self.auth)
return res.status_code == 200
def upsert(self, items: dict or list, index: str = None, **kwargs):
index = index or self.index
header = {
"Content-Type": "application/json"
}
if not isinstance(items, list):
items = [items]
lines = []
for row in items:
action_row = {}
for key in id_keys:
if key in row:
action_row["_id"] = row.pop(key)
break
row_meta = json.dumps({"index": action_row})
row_data = json.dumps(row)
lines.append(row_meta)
lines.append(row_data)
body = '\n'.join(lines)
body += '\n'
print(f"{self.url}/{index} bulk")
res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=header, auth=self.auth)
if res.status_code != 200:
print("Warning, ES bulk load failed:", res.text)
return False
return True
SmartETL
项目中根据目前业务流程需求,初步实现了ElasticSearch、ClickHouse、MongoDB、MySQL、PostgreSQL、Qdrant、SQLite、MinIO等8类数据库,类结构图如下所示:
第三,定义一组数据库操作的代理函数,将流程的数据库调用需求转发给数据库组件。目前定义为gestata.dbops
模块,代码很简单,如下所示:
from wikidata_filter.util.database.base import Database
def tables(db: Database, *database_list, columns: bool = False):
if database_list:
for database in database_list:
for table in db.list_tables(database):
yield {
"name": table,
"columns": db.desc_table(table, database) if columns else [],
"database": database
}
else:
for table in db.list_tables():
yield {
"name": table,
"columns": db.desc_table(table) if columns else []
}
def search(db: Database, **kwargs):
return db.search(**kwargs)
def scroll(db: Database, **kwargs):
return db.scroll(**kwargs)
def upsert(row: dict or list, db: Database, **kwargs):
db.upsert(row, **kwargs)
return row
def delete(_id, db: Database, **kwargs):
return db.delete(_id, **kwargs)
def exists(_id, db: Database, **kwargs) -> bool:
return db.exists(_id, **kwargs)
def get(_id, db: Database, **kwargs):
return db.get(_id, **kwargs)
这里需要注意各个函数的返回值,大多数直接返回数据库组件对应方法的执行结果,但upsert
返回的是输入参数。这是为什么呢?这样就能够将数据库写入操作作为流程的中间节点,也就是说数据经过入库流程,但没有终止,而是继续流入后续节点中,入下图所示:
至此,我们完成了将数据库组件与流程节点组件进行解耦的设计。下面来看一个应用案例,通过读取arXiv数据集(来自kaggle,可参考这篇文章),写入ElasticSearch(建立全文索引),流程定义如下:
from: local/db_envs.yaml
name: load arXiv meta flow
description: 读取arXiv-meta数据集,写入ES
arguments: 1
consts:
type_mapping:
all: .jsonl
nodes:
es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')
select: SelectVal('data')
rename: RenameFields(id='_id')
change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
# 建立ES全文索引
write_es: Map('gestata.dbops.upsert', es)
loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label=’total-papers’)
源代码及流程定义详见SmartETL项目。需要注意,由于项目持续演化,目前除了Kafka
,其他数据库组件都已经按照新的方式完成重构。
总结:本文阐述了SmartETL项目中的数据库与流程解耦的设计,包括动机、目的、设计思路、应用案例。作为软件设计中的一条基本原则,高内聚、松耦合是我们持续追求的目标,也只有好的设计,才能让我们的代码能够易于维护与扩展,从而快速响应业务需求,降低开发成本。