【MCP 实战4-1】开发 OpenSearch MCP server
文章目录
- 1. 项目需求分析
- OpenSearch 简介
- 需要支持的功能清单
- 索引管理功能
- 集群管理功能
- 文档操作功能
- 架构设计
- 2. 项目配置
- 项目依赖
- OpenSearch 客户端封装
- 环境配置
- 4. 核心功能实现
- 索引管理工具
- 集群管理工具
- 文档操作工具
- 5. 主服务器实现
- 6. 包入口点
1. 项目需求分析
OpenSearch 简介
OpenSearch 是一个基于 Apache Lucene 构建的分布式搜索和分析引擎。它提供：
- 全文搜索功能
- 实时数据分析
- 可视化和仪表板
- 多租户支持
需要支持的功能清单
基于我们的 OpenSearch MCP Server 项目，我们需要实现以下功能：
索引管理功能
- ✅ 列出所有索引
- ✅ 获取索引映射
- ✅ 获取索引设置
集群管理功能
- ✅ 获取集群健康状态
- ✅ 获取集群统计信息
文档操作功能
- ✅ 搜索文档
架构设计
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│                 │    │                 │    │                 │
│   MCP Client    │◄──►│   MCP Server    │◄──►│   OpenSearch    │
│ (Claude/Cursor) │    │                 │    │    Cluster      │
│                 │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                │
                                ▼
                       ┌─────────────────┐
                       │  Tool Modules   │
                       │                 │
                       │ ├─ IndexTools   │
                       │ ├─ ClusterTools │
                       │ └─ DocumentTools│
                       └─────────────────┘
2. 项目配置
项目依赖
# pyproject.toml
[project]
name = "opensearch-mcp-server"
version = "1.0.0"
description = "MCP Server for interacting with OpenSearch"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "opensearch-py>=2.0.0",
    "python-dotenv>=1.0.0",
    "fastmcp==2.8.1",
]

[project.license]
file = "LICENSE"

[project.scripts]
opensearch-mcp-server = "opensearch_mcp_server:main"

[build-system]
requires = [
    "hatchling",
]
build-backend = "hatchling.build"
OpenSearch 客户端封装
# src/opensearch_mcp_server/opensearch_client.py
import logging
import os
from dotenv import load_dotenv
from opensearchpy import OpenSearch
import warnings


class OpenSearchClient:
    """Build an OpenSearch client from environment configuration.

    The constructed client is exposed as the ``os_client`` attribute.
    """

    def __init__(self, logger=None):
        """Read the configuration and create the OpenSearch client.

        Args:
            logger: Logger used to report configuration problems. When
                omitted, this module's logger is used.

        Raises:
            ValueError: If a required environment variable is missing.
        """
        # Fall back to a module logger so the error path in _get_os_config
        # never calls .error() on None.
        self.logger = logger or logging.getLogger(__name__)
        self.os_client = self._create_opensearch_client()

    def _get_os_config(self):
        """Get OpenSearch configuration from environment variables.

        Returns:
            A dict with the keys "host", "username" and "password".

        Raises:
            ValueError: If OPENSEARCH_HOST, OPENSEARCH_USERNAME or
                OPENSEARCH_PASSWORD is unset or empty.
        """
        # Load environment variables from .env file
        load_dotenv()
        config = {
            "host": os.getenv("OPENSEARCH_HOST"),
            "username": os.getenv("OPENSEARCH_USERNAME"),
            "password": os.getenv("OPENSEARCH_PASSWORD"),
        }
        # The host is validated together with the credentials: without it
        # the client would be constructed with hosts=[None].
        missing = [
            f"OPENSEARCH_{key.upper()}"
            for key, value in config.items()
            if not value
        ]
        if missing:
            self.logger.error(
                "Missing required OpenSearch configuration. "
                "Please check environment variables:"
            )
            self.logger.error("%s must be set", ", ".join(missing))
            raise ValueError("Missing required OpenSearch configuration")
        return config

    def _create_opensearch_client(self) -> OpenSearch:
        """Create and return an OpenSearch client using configuration from environment.

        Raises:
            ValueError: Propagated from _get_os_config when configuration
                is incomplete.
        """
        config = self._get_os_config()
        # Disable SSL warnings
        warnings.filterwarnings(
            "ignore",
            message=".*SSL.*",
        )
        # NOTE(review): certificate verification is switched off here
        # (verify_certs=False). Confirm this is only used against
        # development clusters before relying on it elsewhere.
        return OpenSearch(
            hosts=[config["host"]],
            http_auth=(config["username"], config["password"]),
            verify_certs=False,
            ssl_show_warn=False,
        )
环境配置
# .env
OPENSEARCH_HOST=https://localhost:9200
OPENSEARCH_USERNAME=xxxx
OPENSEARCH_PASSWORD=xxxxxx
4. 核心功能实现
索引管理工具
# src/opensearch_mcp_server/tools/index.py
import logging
import asyncio
from typing import Dict, Any


class IndexTools:
    """MCP tools for inspecting OpenSearch indices."""

    def __init__(self, logger=None, os_client=None):
        """Initialize IndexTools with logger and OpenSearch client.

        Args:
            logger: Logger instance; defaults to this module's logger.
            os_client: OpenSearch client instance
        """
        self.logger = logger or logging.getLogger(__name__)
        self.os_client = os_client

    def register_tools(self, mcp: Any):
        """Register index-related tools.

        Args:
            mcp: Object providing a ``tool(description=...)`` decorator
                factory, used to register each coroutine below.
        """

        @mcp.tool(description="List all indices in OpenSearch cluster")
        async def list_indices() -> str:
            """List all indices in the OpenSearch cluster.

            Returns:
                ``str()`` of the client response, or "Error: ..." on failure.
            """
            self.logger.info("Listing indices...")
            try:
                # The client call is synchronous; run it in a worker thread
                # so the event loop is not blocked while waiting on it.
                indices = await asyncio.to_thread(
                    self.os_client.cat.indices, format="json"
                )
                return str(indices)
            except Exception as e:
                # Errors are reported to the caller as text rather than raised.
                self.logger.error(f"Error listing indices: {e}")
                return f"Error: {str(e)}"

        @mcp.tool(description="Get index mapping")
        async def get_mapping(index: str) -> str:
            """Get the mapping for an index.

            Args:
                index: Name of the index

            Returns:
                ``str()`` of the client response, or "Error: ..." on failure.
            """
            self.logger.info(f"Getting mapping for index: {index}")
            try:
                response = await asyncio.to_thread(
                    self.os_client.indices.get_mapping, index=index
                )
                return str(response)
            except Exception as e:
                self.logger.error(f"Error getting mapping: {e}")
                return f"Error: {str(e)}"

        @mcp.tool(description="Get index settings")
        async def get_settings(index: str) -> str:
            """Get the settings for an index.

            Args:
                index: Name of the index

            Returns:
                ``str()`` of the client response, or "Error: ..." on failure.
            """
            self.logger.info(f"Getting settings for index: {index}")
            try:
                response = await asyncio.to_thread(
                    self.os_client.indices.get_settings, index=index
                )
                return str(response)
            except Exception as e:
                self.logger.error(f"Error getting settings: {e}")
                return f"Error: {str(e)}"
集群管理工具
# src/opensearch_mcp_server/tools/cluster.py
import logging
import asyncio
from typing import Dict, Any


class ClusterTools:
    """MCP tools reporting OpenSearch cluster health and statistics."""

    def __init__(self, logger=None, os_client=None):
        """Initialize ClusterTools with logger and OpenSearch client.

        Args:
            logger: Logger instance; defaults to this module's logger.
            os_client: OpenSearch client instance
        """
        self.logger = logger or logging.getLogger(__name__)
        self.os_client = os_client

    def register_tools(self, mcp: Any):
        """Register cluster-related tools.

        Args:
            mcp: Object providing a ``tool(description=...)`` decorator
                factory, used to register each coroutine below.
        """

        @mcp.tool(description="Get cluster health status")
        async def get_cluster_health() -> str:
            """Get health status of the OpenSearch cluster.

            Returns information about the number of nodes, shards, etc.
            On failure an "Error getting cluster health: ..." string is
            returned instead of raising.
            """
            self.logger.info("Getting cluster health")
            try:
                # The client call is synchronous; run it in a worker thread
                # so the event loop is not blocked while waiting on it.
                response = await asyncio.to_thread(self.os_client.cluster.health)
                return str(response)
            except Exception as e:
                self.logger.error(f"Error getting cluster health: {e}")
                return f"Error getting cluster health: {str(e)}"

        @mcp.tool(description="Get cluster statistics")
        async def get_cluster_stats() -> str:
            """Get statistics from a cluster wide perspective.

            The API returns basic index metrics (shard numbers, store size,
            memory usage) and information about the current nodes that form
            the cluster (number, roles, os, jvm versions, memory usage, cpu
            and installed plugins). On failure an
            "Error getting cluster stats: ..." string is returned instead
            of raising.
            """
            self.logger.info("Getting cluster stats")
            try:
                response = await asyncio.to_thread(self.os_client.cluster.stats)
                return str(response)
            except Exception as e:
                self.logger.error(f"Error getting cluster stats: {e}")
                return f"Error getting cluster stats: {str(e)}"
文档操作工具
# src/opensearch_mcp_server/tools/document.py
import logging
import asyncio
from typing import Dict, Any


class DocumentTools:
    """MCP tools for querying documents stored in OpenSearch."""

    def __init__(self, logger=None, os_client=None):
        """Initialize DocumentTools with logger and OpenSearch client.

        Args:
            logger: Logger instance; defaults to this module's logger.
            os_client: OpenSearch client instance
        """
        self.logger = logger or logging.getLogger(__name__)
        self.os_client = os_client

    def register_tools(self, mcp: Any):
        """Register document-related tools.

        Args:
            mcp: Object providing a ``tool(description=...)`` decorator
                factory, used to register the coroutine below.
        """

        @mcp.tool(description="Search documents in an index with a custom query")
        async def search_documents(index: str, body: dict) -> str:
            """Search documents in a specified index using a custom query.

            Args:
                index: Name of the index to search
                body: OpenSearch query DSL

            Returns:
                ``str()`` of the client response, or "Error: ..." on failure.
            """
            self.logger.info(f"Searching in index: {index} with query: {body}")
            try:
                # The client call is synchronous; run it in a worker thread
                # so the event loop is not blocked while waiting on it.
                response = await asyncio.to_thread(
                    self.os_client.search, index=index, body=body
                )
                return str(response)
            except Exception as e:
                # Errors are reported to the caller as text rather than raised.
                self.logger.error(f"Error searching documents: {e}")
                return f"Error: {str(e)}"
5. 主服务器实现
# src/opensearch_mcp_server/server.py
#!/usr/bin/env python3
import logging
import argparse
from fastmcp import FastMCP
from .tools.index import IndexTools
from .tools.cluster import ClusterTools
from .tools.document import DocumentTools
from .opensearch_client import OpenSearchClient


class OpenSearchMCPServer:
    """MCP server that registers the index, cluster and document tools."""

    def __init__(self):
        """Configure logging, create the OpenSearch client and register tools."""
        self.name = "opensearch_mcp_server"
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        )
        self.logger = logging.getLogger(self.name)
        self.mcp = FastMCP(self.name)
        # Initialize OpenSearch client
        self.os_client = OpenSearchClient(self.logger).os_client
        # Initialize tools
        self._register_tools()

    def _register_tools(self):
        """Register all MCP tools."""
        # Initialize tool classes with shared OpenSearch client
        index_tools = IndexTools(self.logger, self.os_client)
        cluster_tools = ClusterTools(self.logger, self.os_client)
        document_tools = DocumentTools(self.logger, self.os_client)
        # Register tools from each module
        index_tools.register_tools(self.mcp)
        cluster_tools.register_tools(self.mcp)
        document_tools.register_tools(self.mcp)

    def run(self, host=None, port=None):
        """Run the MCP server with streamable-http transport.

        Args:
            host: Optional host address, defaults to 127.0.0.1
            port: Optional port number, defaults to 8000
        """
        host = host or "127.0.0.1"
        # Compare against None instead of using `or`: an explicit port of 0
        # is falsy and would otherwise be silently replaced by 8000.
        if port is None:
            port = 8000
        self.logger.info(f"OpenSearch MCP service will start on {host}:{port}")
        self.mcp.run(transport="streamable-http", host=host, port=port)


def main():
    """Parse --host/--port from the command line and run the server."""
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='OpenSearch MCP Server')
    parser.add_argument(
        '--host',
        type=str,
        default="127.0.0.1",
        help='Service listening host (default: 127.0.0.1)',
    )
    parser.add_argument(
        '--port',
        type=int,
        default=8000,
        help='Service listening port (default: 8000)',
    )
    args = parser.parse_args()
    # Create and run the server
    server = OpenSearchMCPServer()
    server.run(host=args.host, port=args.port)


if __name__ == "__main__":
    main()
6. 包入口点
# src/opensearch_mcp_server/__init__.py
#!/usr/bin/env python3
"""
OpenSearch MCP Server
A server for exposing OpenSearch functionality through MCP
"""
from . import server


def main():
    """Main entry point for the package; delegates to ``server.main()``."""
    server.main()


# Optionally expose other important items at package level
__all__ = ["main", "server"]
源码 https://github.com/showjason/opensearch-mcp-server
关键词标签
OpenSearch
MCP
Python
搜索引擎
AI工具
开发教程
实战项目
技术集成
Claude
Cursor
FastMCP
API开发