当前位置: 首页 > news >正文

ElasticSearch迁移至openGauss

Elasticsearch 作为一种高效的全文搜索引擎,广泛应用于实时搜索、日志分析等场景。而 openGauss,作为一款企业级关系型数据库,强调事务处理与数据一致性。那么,当这两者的应用场景和技术架构发生交集时,如何实现它们之间的平滑迁移呢?

本文将探讨 Elasticsearch 基础数据数据迁移至 openGauss 的解决方案,在此,我们首先根据等价实例来看一下 Elasticsearch 和关系型数据库(如 openGauss)的基础数据结构:

关系型数据库操作

CREATE TABLE products (    id INT PRIMARY KEY,    name VARCHAR(100),    price DECIMAL(10,2));
INSERT INTO products VALUES (1, 'Laptop', 999.99);

Elasticsearch等价操作

PUT /products{  "mappings": {    "properties": {      "id": { "type": "integer" },      "name": { "type": "text" },      "price": { "type": "double" }    }  }}
POST /products/_doc/1{  "id": 1,  "name": "Laptop",  "price": 999.99}

数据组织层级

  • 关系型数据库:

    Database → Table → Row/Column

  • Elasticsearch:

    6.x之前:Index → Type → Document (类似Database → Table → Row)

    7.x之后:Index → Document (Type被移除,强化了Index≈Table的对应关系)

Elasticsearch 概念

关系型数据库(如openGauss)概念

说明

索引(Index)库-表(Table)

对应关系

类型(Type)

(已弃用,7.x后无对应)

早期版本中类似表分区

文档(Document)

行(Row)

一条记录

字段(Field)

列(Column)

数据属性

映射(Mapping)

表结构定义(Schema)

定义字段类型等

索引别名(Alias)

视图(View)

虚拟索引/表

分片(Shard)

分区(Partition)

数据水平拆分

检索方式

1、向量检索

Elasticsearch 向量检索

# 1. 创建包含向量字段的索引PUT /image_vectors{  "mappings": {    "properties": {      "image_name": {        "type": "text"      },      "image_vector": {        "type": "dense_vector",        "dims": 512      }    }  }}
# 2. 插入向量数据POST /image_vectors/_doc{  "image_name": "sunset.jpg",  "image_vector": [0.12, 0.34, ..., 0.56]  // 512维向量}
# 3. 精确向量检索 (script_score)GET /image_vectors/_search{  "query": {    "script_score": {      "query": {"match_all": {}},      "script": {        "source": "cosineSimilarity(params.query_vector, 'image_vector') + 1.0",        "params": {          "query_vector": [0.23, 0.45, ..., 0.67]  // 查询向量        }      }    }  }}
# 4. 近似最近邻搜索 (kNN search)GET /image_vectors/_search{  "knn": {    "field": "image_vector",    "query_vector": [0.23, 0.45, ..., 0.67],    "k": 10,    "num_candidates": 100  }}

openGauss 向量检索(openGauss 从 7.0 版本开始支持向量检索功能)

# 1. 创建包含向量字段的表
-- 创建表CREATE TABLE image_vectors (  id SERIAL PRIMARY KEY,  image_name TEXT,  image_vector VECTOR(512)  -- 512维向量);#2. 插入向量数据INSERT INTO image_vectors (image_name, image_vector) VALUES ('sunset.jpg', '[0.12, 0.34, ..., 0.56]');
# 3. 精确向量检索 (余弦相似度)-- 使用余弦相似度SELECT id, image_name,        1 - (image_vector <=> '[0.23, 0.45, ..., 0.67]') AS cosine_similarityFROM image_vectorsORDER BY cosine_similarity DESCLIMIT 10;
# 4. 近似最近邻搜索 (使用IVFFLAT索引)-- 创建IVFFLAT索引CREATE INDEX idx_image_vector ON image_vectors USING IVFFLAT(image_vector) WITH (lists = 100);
-- 近似最近邻查询SELECT id, image_name,        image_vector <=> '[0.23, 0.45, ..., 0.67]' AS distanceFROM image_vectorsORDER BY distanceLIMIT 10;
2、全文检索

es全文检索 相当于 openGauss的LIKE和正则表达式

​​​​​​​​​​​​​​

# es 全文检索GET /products/_search{  "query": {    "match": {      "description": "search term"    }  }}
# openGauss 模糊查询SELECT * FROM products WHERE description LIKE '%search term%';
# openGauss 正则表达式匹配SELECT * FROM logs WHERE message ~ 'error|warning';

因此,根据数据层级及检索方式分析,迁移时将es的索引迁移到openGauss的一张表里。

环境准备

  • 已部署7.3 及以上(支持向量)版本的ElasticSearch实例

  • 已部署7.0.0-RC1 及以上版本(支持向量)的openGauss实例

  • 已安装3.8 及以上版本的Python环境

  • 已安装涉及的Python库

pip3 install psycopg2pip3 install requests pip3 install pyOpenSSL
#如果安装失败,可以考虑在一个新的虚拟环境中重新安装所需的库,执行以下命令:python3 -m venv venvsource venv/bin/activatepip install requests pyOpenSSL

前置条件

远程连接权限:

openGauss端:

​​​​​​​

#修改openGauss配置文件。将迁移脚本所在机器IP地址加入白名单,修改openGauss监听地址。# 执行以下命令gs_guc set -D {DATADIR} -c " listen_addresses = '\*'"
gs_guc set -D {DATADIR} -h "host all all x.x.x.x/32 sha256"
# 修改完毕后重启openGauss。
gs_ctl restart -D {DATADIR}

elasticsearch端:

​​​​​​​

vim /path/to/your_elasticsearch/config/elasticsearch.yml#修改network.hostnetwork.host: 0.0.0.0

openGauss端创建普通用户(赋权)、迁移的目标数据库:

​​​​​​​

 create user mig_test identified by 'Simple@123';
 grant all privileges to mig_test;
 create database es_to_og with owner mig_test;

迁移操作

1、根据本地部署的elasticsearch与openGauss对脚本进行配置修改,需要修改的内容如下:

​​​​​​​

# Elasticsearch 配置信息es_url = 'http://ip:port'  # Elasticsearch 服务器地址es_index = 'your_es_index'  # Elasticsearch 索引名
# openGauss 配置信息db_host = '127.0.0.1'   # openGauss服务器地址db_port = 5432          # openGauss 端口号db_name = 'your_opengauss_db' # 迁移到openGauss的数据库名称db_user = 'user_name'    # 连接openGauss的普通用户db_password = 'xxxxxx'   # 连接openGauss的用户密码

elasticsearchToOpenGauss.py迁移脚本如下:

​​​​​​​

import requestsimport psycopg2import jsonimport refrom typing import List, Dict, Any, Optional, Union
# Elasticsearch 配置信息es_url = 'http://192.168.0.114:9200'  # Elasticsearch 服务器地址es_index = 'my_dynamic_index'  # Elasticsearch 索引名
# openGauss 配置信息db_host = '192.168.0.219'   # openGauss服务器地址db_port = 15620          # openGauss 端口号db_name = 'es_to_og' # 迁移到openGauss的数据库名称db_user = 'mig_test'    # 连接openGauss的普通用户db_password = 'xxxxxx'   # 连接openGauss的用户密码
RESERVED_KEYWORDS = {    "select", "insert", "update", "delete", "drop", "table", "from", "where", "group",    "by", "having", "order", "limit", "join", "inner", "left", "right", "full", "union",    "all", "distinct", "as", "on", "and", "or", "not", "null", "true", "false", "case",    "when", "then", "else", "end", "exists", "like", "in", "between", "is", "like",    "references", "foreign", "primary", "key", "unique", "check", "default", "constraint",    "index", "unique", "varchar", "text", "int", "bigint", "smallint", "boolean", "timestamp"}
# 从 Elasticsearch 获取数据def fetch_data_from_es():    query = {        "query": {            "match_all": {}        },        "_source": True  # 获取所有字段    }    response = requests.get(f'{es_url}/{es_index}/_search', json=query)    if response.status_code == 200:        return response.json()['hits']['hits']    else:        raise Exception(f"Failed to fetch data from Elasticsearch: {response.status_code}, {response.text}")# 获取索引映射信息def fetch_mapping(es_url, es_index):    response = requests.get(f'{es_url}/{es_index}/_mapping')    if response.status_code == 200:        return response.json()    else:        raise Exception(f"Failed to fetch mapping: {response.status_code}, {response.text}")def get_field_type(es_url: str, es_index: str, field_name: str) -> str:    """ 获取 Elasticsearch 字段的类型 """    mappings = fetch_mapping(es_url, es_index)    print(f"Field name: {field_name}")    print(f"map: {mappings}")    # 获取 properties 字段    properties = mappings.get(es_index, {}).get('mappings', {}).get('properties', {})    # 遍历并查找字段的类型    field_type = 'text'  # 默认类型为 'text'    if field_name in properties:        field_type = properties[field_name].get('type', 'text')    elif 'fields' in properties.get(field_name, {}):        # 如果字段有子字段(比如 keyword),获取 'keyword' 类型        field_type = properties[field_name]['fields'].get('keyword', {}).get('type', 'text')    return field_type
def convert_dict_to_jsonb(value):    # 如果 value 是字典类型,递归调用该函数处理其中的每个元素    if isinstance(value, dict):        return json.dumps({k: convert_dict_to_jsonb(v) for k, v in value.items()})    # 如果 value 是列表类型,递归处理其中的每个元素    elif isinstance(value, list):        return json.dumps([convert_dict_to_jsonb(v) for v in value])    # 如果是其他类型(如字符串、数字),直接返回该值    else:        return value
# 映射 Elasticsearch 数据类型到 openGauss 类型def map_to_opengauss_type(es_type: str, dim: Optional[int] = None) -> str:    """Map Elasticsearch types to openGauss types"""    if isinstance(es_type, (dict, list)):  # 如果 es_type 是字典类型,则需要特殊处理        return 'JSONB'    type_map = {        "long": "BIGINT",  # 大整数        "integer": "INTEGER",  # 整数        "short": "SMALLINT",  # 小整数        "byte": "SMALLINT",  # 小字节        "float": "REAL",  # 浮点数        "double": "DOUBLE PRECISION",  # 双精度浮点数        "boolean": "BOOLEAN",  # 布尔值        "keyword": "VARCHAR",  # 关键字(字符串类型)        "text": "TEXT",  # 长文本        "date": "TIMESTAMP",  # 日期类型        "binary": "BYTEA",  # 二进制数据        "geo_point": "POINT",  # 地理坐标(经纬度)        "geo_shape": "GEOMETRY",  # 复杂地理形状        "nested": "JSONB",  # 嵌套对象        "object": "JSONB",  # 对象        "ip": "INET",  # IP 地址        "scaled_float": "REAL",  # 扩展浮动类型(带缩放的浮动)        "float_vector": f"VECTOR({dim})" if dim else "VECTOR",  # 浮动向量类型        "dense_vector": f"VECTOR({dim})" if dim else "VECTOR",  # 稠密向量类型        "binary_vector": f"BIT({dim})" if dim else "BIT",  # 二进制向量类型        "half_float": "REAL",  # 半精度浮动        "unsigned_long": "BIGINT",  # 无符号长整数        "date_nanos": "TIMESTAMP",  # 高精度日期时间        "alias": "TEXT",  # 别名(通常是字段的别名)    }
    # 如果 es_type 在映射表中,直接返回映射后的类型    if es_type in type_map:        print(f"es_type:{es_type} ----- og_type: {type_map[es_type]}")        return type_map[es_type]    else:        print(f"Warning: Unsupported Elasticsearch type '{es_type}', defaulting to 'TEXT'")        return 'TEXT'  # 默认使用 TEXT 类型# 函数:将非法字符替换为下划线def sanitize_name(field_name: str) -> str:    """处理字段名,确保不会与保留字冲突,且将非字母数字字符替换为下划线"""    # 将所有非字母数字字符替换为下划线    sanitized_name = re.sub(r'[^a-zA-Z0-9_]', '_', field_name)
    # 如果是保留字,则加双引号    if sanitized_name.lower() in RESERVED_KEYWORDS:        return f'"{sanitized_name}"'
    return sanitized_name# 创建 openGauss 表def create_table_in_opengauss(es_url, es_index, table_name):    columns_definition = ['id VARCHAR PRIMARY KEY']  # 增加 id 主键字段    seen_fields = set()  # 用于记录已经处理过的字段名
    # 获取 properties 字段    properties = fetch_mapping(es_url, es_index).get(es_index, {}).get('mappings', {}).get('properties', {})
    # 遍历每个字段    for field, field_info in properties.items():        # 如果该字段已经处理过,跳过        if field in seen_fields:            continue        # 获取字段的类型        es_type = field_info.get('type', 'text')        dim = field_info.get('dims', 0) if isinstance(field_info, dict) else 0        field_type = map_to_opengauss_type(es_type, dim)        sanitized_field_name = sanitize_name(field)        seen_fields.add(field)        columns_definition.append(f"{sanitized_field_name} {field_type}")    # 生成表创建 SQL    columns_str = ", ".join(columns_definition)    create_table_sql = f"DROP TABLE IF EXISTS {sanitize_name(table_name)}; CREATE TABLE {sanitize_name(table_name)} ({columns_str});"    try:        # 建立数据库连接并执行创建表 SQL        connection = psycopg2.connect(            host=db_host,            port=db_port,            dbname=db_name,            user=db_user,            password=db_password        )        cursor = connection.cursor()        cursor.execute(create_table_sql)        connection.commit()        print(f"Table {sanitize_name(table_name)} created successfully.")    except Exception as e:        print(f"Error while creating table {sanitize_name(table_name)}: {e}")    finally:        if connection:            cursor.close()            connection.close()
# 将数据插入到 openGauss 表中def insert_data_to_opengauss(table_name, es_source, es_id):    try:        # 建立数据库连接        connection = psycopg2.connect(            host=db_host,            port=db_port,            dbname=db_name,            user=db_user,            password=db_password        )        cursor = connection.cursor()
        # 动态生成插入 SQL 语句        sanitized_columns = ['id'] + [sanitize_name(col) for col in es_source.keys()]  # 清理列名        values = [es_id]
        # 处理每一列的数据类型,必要时进行转换        for column in es_source:            value = es_source[column]            if isinstance(value, (dict, list)):                # 如果是字典类型,转换为 JSONB                value = convert_dict_to_jsonb(value)            values.append(value)
        columns_str = ', '.join(sanitized_columns)        values_str = ', '.join(['%s'] * len(values))
        insert_sql = f"INSERT INTO {sanitize_name(table_name)} ({columns_str}) VALUES ({values_str})"        cursor.execute(insert_sql, values)
        # 提交事务        connection.commit()
    except Exception as e:        print(f"Error while inserting data into {table_name}: {e}")    finally:        if connection:            cursor.close()            connection.close()
# 主函数def main():    try:        es_data = fetch_data_from_es()        table_name = es_index  # 可以使用索引名作为表名
        create_table_in_opengauss(es_url, es_index, table_name)        for record in es_data:            es_source = record['_source']  # 获取 Elasticsearch 文档中的数据            es_id = record['_id']            insert_data_to_opengauss(table_name, es_source, es_id)        print(f"Successfully inserted data into table {table_name}.")    except Exception as e:        print(f"Migration failed: {e}")if __name__ == "__main__":    main()

2、执行脚本

python3 ./elasticsearchToOpenGauss.py

3、openGauss端查看数据​​​​​​​

#切换到迁移目标数据库openGauss=# \c es_to_og
#查看迁移的表es_to_og=# \d
#查看表结构es_to_og=# \d my_dynamic_index
#查看表数据es_to_og=# select c

图片

-END-

相关文章:

  • Spring Boot 整合 Spring Security
  • 【软件】在 macOS 上安装 MySQL
  • Vue3使用vue-web-screen-shot实现截图功能
  • 第六十一节:深度学习-使用 OpenCV DNN 模块
  • 短剧系统开发文案:打造沉浸式互动娱乐新体验
  • Vue Element tree lable组合展示
  • Android JNI开发
  • React基础教程(13):路由的使用
  • HTML5 列表、表格与媒体元素、页面结构分析
  • Android全局网络监控最佳实践(Kotlin实现)
  • t015-预报名管理系统设计与实现 【含源码!!!】
  • Hive在实际应用中,如何选择合适的JOIN优化策略?
  • Hive的GROUP BY操作如何优化?
  • 使用vscode进行c/c++开发的时候,输出报错乱码、cpp文件本身乱码的问题解决
  • split_conversion将json转成yolo训练用的txt,在直接按照8:1:1的比例分成训练集,测试集,验证集
  • 实现一个免费可用的文生图的MCP Server
  • Spring Boot 中 @RequestParam 和 @RequestPart 的区别详解(含实际项目案例)
  • 长短期记忆(LSTM)网络模型
  • 408考研逐题详解:2009年第25题
  • GSR 手环能耗数据实测:STM32 与 SD NAND 的功耗优化成果
  • 怎样做自己的网站和发布网站/长沙网站优化对策
  • 网站后台管理系统安装/自己建网页
  • 深圳做网站要/网络推广理实一体化软件
  • 江门制作网站公司/杭州网站优化服务
  • 农家乐网站模板/百度网盘app怎么打开链接
  • 的建站公司/搜索引擎优化seo公司