当前位置: 首页 > news >正文

【MCP 实战4-1】开发 OpenSearch MCP server

文章目录

    • 1. 项目需求分析
      • OpenSearch 简介
      • 需要支持的功能清单
        • 索引管理功能
        • 集群管理功能
        • 文档操作功能
      • 架构设计
    • 2. 项目配置
      • 项目依赖
      • OpenSearch 客户端封装
      • 环境配置
    • 4. 核心功能实现
      • 索引管理工具
      • 集群管理工具
      • 文档操作工具
    • 5. 主服务器实现
    • 6. 包入口点

1. 项目需求分析

OpenSearch 简介

OpenSearch 是一个分布式搜索和分析引擎,基于 Apache Lucene 构建。它提供:

  • 全文搜索功能
  • 实时数据分析
  • 可视化和仪表板
  • 多租户支持

需要支持的功能清单

基于我们的 OpenSearch MCP Server 项目,我们需要实现:

索引管理功能
  • ✅ 列出所有索引
  • ✅ 获取索引映射
  • ✅ 获取索引设置
集群管理功能
  • ✅ 获取集群健康状态
  • ✅ 获取集群统计信息
文档操作功能
  • ✅ 搜索文档

架构设计

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│                 │    │                 │    │                 │
│   MCP Client    │◄──►│  MCP Server     │◄──►│  OpenSearch     │
│ (Claude/Cursor) │    │                 │    │   Cluster       │
│                 │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘││▼┌─────────────────┐│  Tool Modules   ││                 ││ ├─ IndexTools   ││ ├─ ClusterTools ││ └─ DocumentTools│└─────────────────┘

2. 项目配置

项目依赖

# pyproject.toml
[project]
name = "opensearch-mcp-server"
version = "1.0.0"
description = "MCP Server for interacting with OpenSearch"
readme = "README.md"
requires-python = ">=3.10"
dependencies = ["opensearch-py>=2.0.0","python-dotenv>=1.0.0","fastmcp==2.8.1",
][project.license]
file = "LICENSE"[project.scripts]
opensearch-mcp-server = "opensearch_mcp_server:main"[build-system]
requires = ["hatchling",
]
build-backend = "hatchling.build"

OpenSearch 客户端封装

# src/opensearch_mcp_server/opensearch_client.py
import logging
import os
from dotenv import load_dotenv
from opensearchpy import OpenSearch
import warningsclass OpenSearchClient:def __init__(self, logger=None):         self.logger = loggerself.os_client = self._create_opensearch_client()def _get_os_config(self):"""Get OpenSearch configuration from environment variables."""# Load environment variables from .env fileload_dotenv()config = {"host": os.getenv("OPENSEARCH_HOST"),"username": os.getenv("OPENSEARCH_USERNAME"),"password": os.getenv("OPENSEARCH_PASSWORD")}if not all([config["username"], config["password"]]):self.logger.error("Missing required OpenSearch configuration. Please check environment variables:")self.logger.error("OPENSEARCH_USERNAME and OPENSEARCH_PASSWORD are required")raise ValueError("Missing required OpenSearch configuration")return configdef _create_opensearch_client(self) -> OpenSearch:"""Create and return an OpenSearch client using configuration from environment."""config = self._get_os_config()# Disable SSL warningswarnings.filterwarnings("ignore", message=".*SSL.*",)return OpenSearch(hosts=[config["host"]],http_auth=(config["username"], config["password"]),verify_certs=False,ssl_show_warn=False)

环境配置

# .env
OPENSEARCH_HOST=https://localhost:9200
OPENSEARCH_USERNAME=xxxx
OPENSEARCH_PASSWORD=xxxxxx

4. 核心功能实现

索引管理工具

# src/opensearch_mcp_server/tools/index.py
import logging
from typing import Dict, Anyclass IndexTools:def __init__(self, logger=None, os_client=None):"""Initialize IndexTools with logger and OpenSearch client.Args:logger: Logger instanceos_client: OpenSearch client instance"""self.logger = logger or logging.getLogger(__name__)self.os_client = os_clientdef register_tools(self, mcp: Any):"""Register index-related tools."""@mcp.tool(description="List all indices in OpenSearch cluster")async def list_indices() -> str:"""List all indices in the OpenSearch cluster."""self.logger.info("Listing indices...")try:indices = self.os_client.cat.indices(format="json")return str(indices)except Exception as e:self.logger.error(f"Error listing indices: {e}")return f"Error: {str(e)}"@mcp.tool(description="Get index mapping")async def get_mapping(index: str) -> str:"""Get the mapping for an index.Args:index: Name of the index"""self.logger.info(f"Getting mapping for index: {index}")try:response = self.os_client.indices.get_mapping(index=index)return str(response)except Exception as e:self.logger.error(f"Error getting mapping: {e}")return f"Error: {str(e)}"@mcp.tool(description="Get index settings")async def get_settings(index: str) -> str:"""Get the settings for an index.Args:index: Name of the index"""self.logger.info(f"Getting settings for index: {index}")try:response = self.os_client.indices.get_settings(index=index)return str(response)except Exception as e:self.logger.error(f"Error getting settings: {e}")return f"Error: {str(e)}"

集群管理工具

# src/opensearch_mcp_server/tools/cluster.py
import logging
from typing import Dict, Anyclass ClusterTools:def __init__(self, logger=None, os_client=None):"""Initialize ClusterTools with logger and OpenSearch client.Args:logger: Logger instanceos_client: OpenSearch client instance"""self.logger = logger or logging.getLogger(__name__)self.os_client = os_clientdef register_tools(self, mcp: Any):"""Register cluster-related tools."""@mcp.tool(description="Get cluster health status")async def get_cluster_health() -> str:"""Get health status of the OpenSearch cluster.Returns information about the number of nodes, shards, etc."""self.logger.info("Getting cluster health")try:response = self.os_client.cluster.health()return str(response)except Exception as e:self.logger.error(f"Error getting cluster health: {e}")return f"Error getting cluster health: {str(e)}"@mcp.tool(description="Get cluster statistics")async def get_cluster_stats() -> str:"""Get statistics from a cluster wide perspective. The API returns basic index metrics (shard numbers, store size, memory usage) and information about the current nodes that form the cluster (number, roles, os, jvm versions, memory usage, cpu and installed plugins)."""self.logger.info("Getting cluster stats")try:response = self.os_client.cluster.stats()return str(response)except Exception as e:self.logger.error(f"Error getting cluster stats: {e}")return f"Error getting cluster stats: {str(e)}"

文档操作工具

# src/opensearch_mcp_server/tools/document.py
import logging
from typing import Dict, Anyclass DocumentTools:def __init__(self, logger=None, os_client=None):"""Initialize DocumentTools with logger and OpenSearch client.Args:logger: Logger instanceos_client: OpenSearch client instance"""self.logger = logger or logging.getLogger(__name__)self.os_client = os_clientdef register_tools(self, mcp: Any):"""Register document-related tools."""@mcp.tool(description="Search documents in an index with a custom query")async def search_documents(index: str, body: dict) -> str:"""Search documents in a specified index using a custom query.Args:index: Name of the index to searchbody: OpenSearch query DSL"""self.logger.info(f"Searching in index: {index} with query: {body}")try:response = self.os_client.search(index=index, body=body)return str(response)except Exception as e:self.logger.error(f"Error searching documents: {e}")return f"Error: {str(e)}"

5. 主服务器实现

# src/opensearch_mcp_server/server.py
#!/usr/bin/env python3
import logging
import argparse
from fastmcp import FastMCP
from .tools.index import IndexTools
from .tools.cluster import ClusterTools
from .tools.document import DocumentTools
from .opensearch_client import OpenSearchClientclass OpenSearchMCPServer:def __init__(self):self.name = "opensearch_mcp_server"# Configure logginglogging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')self.logger = logging.getLogger(self.name)self.mcp = FastMCP(self.name)# Initialize OpenSearch clientself.os_client = OpenSearchClient(self.logger).os_client# Initialize toolsself._register_tools()def _register_tools(self):"""Register all MCP tools."""# Initialize tool classes with shared OpenSearch clientindex_tools = IndexTools(self.logger, self.os_client)cluster_tools = ClusterTools(self.logger, self.os_client)document_tools = DocumentTools(self.logger, self.os_client)# Register tools from each moduleindex_tools.register_tools(self.mcp)cluster_tools.register_tools(self.mcp)document_tools.register_tools(self.mcp)def run(self, host=None, port=None):"""Run the MCP server with streamable-http transport.Args:host: Optional host address, defaults to 127.0.0.1port: Optional port number, defaults to 8000"""host = host or "127.0.0.1"port = port or 8000self.logger.info(f"OpenSearch MCP service will start on {host}:{port}")self.mcp.run(transport="streamable-http", host=host, port=port)def main():# Parse command line argumentsparser = argparse.ArgumentParser(description='OpenSearch MCP Server')parser.add_argument('--host', type=str, default="127.0.0.1", help='Service listening host (default: 127.0.0.1)')parser.add_argument('--port', type=int, default=8000, help='Service listening port (default: 8000)')args = parser.parse_args()# Create and run the serverserver = OpenSearchMCPServer()server.run(host=args.host, port=args.port)if __name__ == "__main__":main()

6. 包入口点

# src/opensearch_mcp_server/__init__.py
#!/usr/bin/env python3
"""
OpenSearch MCP Server
A server for exposing OpenSearch functionality through MCP
"""from . import serverdef main():"""Main entry point for the package."""server.main()# Optionally expose other important items at package level
__all__ = ["main", "server"]

源码 https://github.com/showjason/opensearch-mcp-server


关键词标签
OpenSearch MCP Python 搜索引擎 AI工具 开发教程 实战项目 技术集成 Claude Cursor FastMCP API开发

相关文章:

  • 太平鸟品牌门户网站建设中小企业网站制作
  • php 网站做分享功能搜狗登录入口
  • 苏州做网站公司排名免费网页在线客服系统
  • 如何在易语言上做网站企业网站设计价格
  • 论文引用网站数据 如何做注释seo管理系统
  • 地推拉新app推广接单平台免费萌新seo
  • 微信小程序中scss、ts、wxml
  • 开源代码修复新标杆——月之暗面最新开源编程模型Kimi-Dev-72B本地部署教程,自博弈修复 Bug
  • Opengauss数据库的安装以及镜像源配置
  • 链表“追及”问题终极指南:快慢指针三部曲
  • 汉字编码之GBK编码详解
  • 数据结构 顺序表与链表
  • Spring Cloud Ribbon核心负载均衡算法详解
  • SDC命令详解:使用write_sdc命令进行输出
  • 高等数学》(同济大学·第7版)第七章 微分方程 第五节可降阶的高阶微分方程
  • Feign源码解析:动态代理与HTTP请求全流程
  • Azure虚拟机添加磁盘
  • 企业级RAG系统架构设计与实现指南(Java技术栈)
  • 深入学习入门--(一)前备知识
  • Java基础黑马进阶综合考试
  • 网络安全漏洞扫描是什么?如何识别目标进行扫描?
  • 理解epoll:水平触发与边沿触发
  • (C++)vector数组相关基础用法(C++教程)(STL库基础教程)
  • 《从0到1:C/C++音视频开发自学指南》
  • 多个 Job 并发运行时共享配置文件导致上下文污染,固化 Jenkins Job 上下文
  • 家用存储怎么选?NAS vS 硬盘柜,备份游戏素材与照片谁更合适?