MCP与智能问数技术全面指南:从协议设计到智能化数据查询
引言
在人工智能与数据科学快速发展的今天,传统的数据查询方式正面临着前所未有的挑战。用户不再满足于编写复杂的SQL语句或学习特定的查询语法,而是希望能够用自然语言直接与数据对话,获得智能化的分析结果。MCP(Model Context Protocol)作为一种新兴的协议标准,为构建智能化的数据查询系统提供了强大的技术基础。
MCP协议通过标准化的接口定义,使得大语言模型能够与各种数据源和工具进行无缝集成,而智能问数技术则利用自然语言处理和机器学习技术,将用户的自然语言查询转换为精确的数据操作。两者的结合,正在重新定义人机交互的方式,让数据分析变得更加直观、高效和智能。
为什么MCP与智能问数如此重要?
1. 降低数据访问门槛
传统的数据查询需要用户掌握SQL、Python等技术技能,而基于MCP的智能问数系统允许业务用户用自然语言直接查询数据,大大降低了数据访问的技术门槛。
2. 提升查询效率和准确性
智能问数系统能够理解用户意图,自动优化查询逻辑,并提供上下文相关的建议,显著提升查询效率和结果准确性。
3. 实现真正的对话式分析
通过MCP协议的标准化接口,系统能够维护查询上下文,支持多轮对话,实现真正的对话式数据分析体验。
4. 支持复杂的跨系统集成
MCP协议的标准化特性使得系统能够轻松集成多种数据源、分析工具和可视化组件,构建完整的智能数据分析生态。
本文的价值与结构
本文将从MCP协议的基础概念出发,深入探讨智能问数系统的设计原理和实现技术,并提供大量实际代码示例和最佳实践。无论您是系统架构师、数据工程师,还是AI应用开发者,都能从本文中获得有价值的技术洞察和实用指导。
目录
- MCP协议基础与核心概念
- 智能问数系统架构设计
- 自然语言查询解析引擎
- MCP服务器端实现
- 智能查询优化与执行
- 多模态数据交互
- 上下文管理与会话状态
- 安全性与权限控制
- 性能优化与缓存策略
- 可视化与结果呈现
- 企业级部署与集成
- 实际应用案例分析
- 最佳实践与设计模式
- 未来发展趋势与展望
MCP协议基础与核心概念
MCP协议概述
MCP(Model Context Protocol)是一种开放标准协议,旨在为大语言模型与外部工具、数据源之间的交互提供统一的接口规范。该协议定义了标准化的消息格式、工具调用机制和资源访问方式,使得AI系统能够安全、高效地与各种外部系统进行集成。
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, asdict
from enum import Enum
import json
import asyncio
from abc import ABC, abstractmethod


class MCPMessageType(Enum):
    """Message types defined by the MCP protocol."""
    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    ERROR = "error"


class MCPResourceType(Enum):
    """Kinds of resources an MCP server can expose."""
    DATABASE = "database"
    FILE = "file"
    API = "api"
    TOOL = "tool"
    MEMORY = "memory"


@dataclass
class MCPMessage:
    """Base structure of an MCP protocol message.

    Carries the routing envelope (id, type, method), the method
    parameters, a unix timestamp and optional conversational context.
    """
    id: str
    type: MCPMessageType
    method: str
    params: Dict[str, Any]
    timestamp: float
    context: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dict.

        Fix: ``asdict`` keeps ``type`` as an Enum member, which breaks
        ``json.dumps`` on the resulting dict. Flatten it to its string
        value; ``from_dict`` accepts either form, so this is backward
        compatible.
        """
        data = asdict(self)
        data['type'] = self.type.value
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':
        """Rebuild a message from a dict produced by :meth:`to_dict`."""
        return cls(
            id=data['id'],
            type=MCPMessageType(data['type']),
            method=data['method'],
            params=data['params'],
            timestamp=data['timestamp'],
            context=data.get('context'),
        )


@dataclass
class MCPTool:
    """Declarative description of a callable MCP tool."""
    name: str
    description: str
    parameters: Dict[str, Any]  # JSON-schema-like spec; may contain a 'required' list
    required_permissions: List[str]
    category: str
    version: str = "1.0.0"

    def validate_parameters(self, params: Dict[str, Any]) -> bool:
        """Return True when every parameter listed as required is present."""
        required_params = self.parameters.get('required', [])
        return all(param in params for param in required_params)


@dataclass
class MCPResource:
    """Declarative description of a resource exposed over MCP."""
    uri: str
    type: MCPResourceType
    name: str
    description: str
    metadata: Dict[str, Any]
    access_permissions: List[str]

    def is_accessible(self, user_permissions: List[str]) -> bool:
        """True when the user holds every permission the resource demands."""
        return all(perm in user_permissions for perm in self.access_permissions)


class MCPServer(ABC):
    """Abstract base class for MCP servers.

    Concrete servers register tools/resources and implement request
    handling plus tool execution.
    """

    def __init__(self, name: str, version: str = "1.0.0"):
        self.name = name
        self.version = version
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        # Capabilities advertised to clients during the MCP handshake.
        self.capabilities = {
            'tools': True,
            'resources': True,
            'prompts': True,
            'logging': True
        }

    def register_tool(self, tool: MCPTool):
        """Register a tool under its name (later registrations overwrite)."""
        self.tools[tool.name] = tool

    def register_resource(self, resource: MCPResource):
        """Register a resource under its URI."""
        self.resources[resource.uri] = resource

    @abstractmethod
    async def handle_request(self, message: MCPMessage) -> MCPMessage:
        """Handle one protocol request and return the response message."""

    @abstractmethod
    async def list_tools(self) -> List[MCPTool]:
        """List the tools this server offers."""

    @abstractmethod
    async def list_resources(self) -> List[MCPResource]:
        """List the resources this server offers."""

    async def call_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and dispatch a tool invocation.

        Raises:
            ValueError: unknown tool name, or required parameters missing.
        """
        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        tool = self.tools[tool_name]
        if not tool.validate_parameters(parameters):
            raise ValueError(f"Invalid parameters for tool '{tool_name}'")
        return await self._execute_tool(tool, parameters)

    @abstractmethod
    async def _execute_tool(self, tool: MCPTool, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Actually run the tool; implemented by concrete servers."""
智能问数系统的核心组件
智能问数系统基于MCP协议构建,包含以下核心组件:
class IntelligentQuerySystem:
    """End-to-end natural-language query pipeline built on an MCP server.

    Orchestrates parse -> optimize -> execute -> format, then records the
    exchange so follow-up questions can reuse the conversation context.
    """

    def __init__(self, mcp_server: 'MCPServer'):
        self.mcp_server = mcp_server
        self.query_parser = NaturalLanguageQueryParser()
        self.query_optimizer = QueryOptimizer()
        self.execution_engine = QueryExecutionEngine()
        self.context_manager = ContextManager()
        self.result_formatter = ResultFormatter()

    async def process_query(self, natural_query: str, user_context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Run one natural-language query through the full pipeline."""
        # Stage 1: free text -> structured query.
        parsed_query = await self.query_parser.parse(natural_query, user_context)
        # Stage 2: optimize the structured query.
        optimized_query = await self.query_optimizer.optimize(parsed_query)
        # Stage 3: execute against the data sources.
        raw_results = await self.execution_engine.execute(optimized_query)
        # Stage 4: shape raw results for presentation.
        formatted_results = await self.result_formatter.format(raw_results, parsed_query)
        # Stage 5: remember this exchange for multi-turn dialogue.
        await self.context_manager.update_context(natural_query, formatted_results)
        return formatted_results


class NaturalLanguageQueryParser:
    """Turns a free-text question into a :class:`ParsedQuery`."""

    def __init__(self):
        self.intent_classifier = IntentClassifier()
        self.entity_extractor = EntityExtractor()
        self.query_builder = QueryBuilder()

    async def parse(self, query: str, context: Dict[str, Any] = None) -> 'ParsedQuery':
        """Classify intent, extract entities, then assemble a structured query."""
        intent = await self.intent_classifier.classify(query, context)
        entities = await self.entity_extractor.extract(query, context)
        structured_query = await self.query_builder.build(intent, entities, context)
        return ParsedQuery(
            original_query=query,
            intent=intent,
            entities=entities,
            structured_query=structured_query,
            # Overall confidence is capped by the weakest stage.
            confidence=min(intent.confidence, entities.confidence),
        )


@dataclass
class ParsedQuery:
    """Result of parsing a natural-language query."""
    original_query: str
    intent: 'QueryIntent'
    entities: 'ExtractedEntities'
    structured_query: Dict[str, Any]
    confidence: float

    def is_valid(self) -> bool:
        """Usable when parsing confidence is high enough and the intent checks out."""
        return self.confidence > 0.7 and self.intent.is_valid()


@dataclass
class QueryIntent:
    """Classified intent of a query."""
    type: str  # SELECT, INSERT, UPDATE, DELETE, ANALYZE, VISUALIZE
    action: str  # concrete action within the intent type
    confidence: float
    parameters: Dict[str, Any]

    def is_valid(self) -> bool:
        """Valid when confident enough and the type is one we support."""
        return self.confidence > 0.8 and self.type in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'ANALYZE', 'VISUALIZE']


@dataclass
class ExtractedEntities:
    """Entities pulled out of the query text."""
    tables: List[str]
    columns: List[str]
    values: List[Any]
    conditions: List[Dict[str, Any]]
    aggregations: List[str]
    time_ranges: List[Dict[str, Any]]
    confidence: float
智能问数系统架构设计
系统整体架构
class IntelligentQueryArchitecture:
    """Wires together the top-level components of the query platform."""

    def __init__(self):
        self.components = {
            'frontend': FrontendInterface(),
            'api_gateway': APIGateway(),
            'query_processor': QueryProcessor(),
            'mcp_orchestrator': MCPOrchestrator(),
            'data_connectors': DataConnectorManager(),
            'cache_layer': CacheLayer(),
            'security_manager': SecurityManager(),
            'monitoring': MonitoringSystem()
        }

    def initialize_system(self):
        """Initialize every component, then connect them."""
        for name, component in self.components.items():
            component.initialize()
            print(f"Component {name} initialized")
        self._setup_component_connections()

    def _setup_component_connections(self):
        """Chain gateway -> processor -> orchestrator -> connectors."""
        self.components['api_gateway'].connect(self.components['query_processor'])
        self.components['query_processor'].connect(self.components['mcp_orchestrator'])
        self.components['mcp_orchestrator'].connect(self.components['data_connectors'])


class QueryProcessor:
    """Runs a natural-language query through NLP, planning and execution."""

    def __init__(self):
        self.nlp_engine = NLPEngine()
        self.query_planner = QueryPlanner()
        self.execution_coordinator = ExecutionCoordinator()

    async def process_natural_query(self, query: str, user_id: str) -> Dict[str, Any]:
        """Preprocess, analyze, plan and execute one query for a user."""
        preprocessed_query = await self._preprocess_query(query, user_id)
        semantic_analysis = await self.nlp_engine.analyze(preprocessed_query)
        execution_plan = await self.query_planner.create_plan(semantic_analysis)
        results = await self.execution_coordinator.execute(execution_plan)
        return results

    async def _preprocess_query(self, query: str, user_id: str) -> Dict[str, Any]:
        """Attach metadata and a normalized form to the raw query text."""
        import time  # fix: `time` was used here but never imported in this module
        return {
            'original_query': query,
            'user_id': user_id,
            'timestamp': time.time(),
            'normalized_query': self._normalize_query(query),
            # NOTE(review): _get_user_context is not defined in this file — confirm.
            'user_context': await self._get_user_context(user_id)
        }

    def _normalize_query(self, query: str) -> str:
        """Collapse whitespace and lowercase the query."""
        import re
        normalized = re.sub(r'\s+', ' ', query.strip())
        return normalized.lower()


class NLPEngine:
    """Natural-language analysis: tokenize, tag, extract, classify, parse."""

    def __init__(self):
        self.tokenizer = Tokenizer()
        self.pos_tagger = POSTagger()
        self.ner_model = NERModel()
        self.intent_classifier = IntentClassifier()
        self.dependency_parser = DependencyParser()

    async def analyze(self, query_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the full NLP pipeline over the normalized query."""
        query = query_data['normalized_query']
        tokens = self.tokenizer.tokenize(query)
        pos_tags = self.pos_tagger.tag(tokens)
        entities = self.ner_model.extract(tokens, pos_tags)
        intent = self.intent_classifier.classify(query, query_data['user_context'])
        dependencies = self.dependency_parser.parse(tokens, pos_tags)
        return {
            'tokens': tokens,
            'pos_tags': pos_tags,
            'entities': entities,
            'intent': intent,
            'dependencies': dependencies,
            'semantic_structure': self._build_semantic_structure(
                tokens, pos_tags, entities, intent, dependencies)
        }

    def _build_semantic_structure(self, tokens, pos_tags, entities, intent, dependencies):
        """Condense NLP outputs into the structure the planner consumes."""
        # NOTE(review): the _extract_* helpers below are not defined in this
        # file — presumably implemented elsewhere; confirm.
        return {
            'query_type': intent['type'],
            'target_entities': [e for e in entities if e['type'] in ['TABLE', 'COLUMN']],
            'filter_conditions': self._extract_conditions(dependencies, entities),
            'aggregation_functions': self._extract_aggregations(tokens, pos_tags),
            'temporal_constraints': self._extract_temporal_info(entities),
            'grouping_criteria': self._extract_grouping(dependencies, entities)
        }


class QueryPlanner:
    """Builds and selects an execution plan from the semantic analysis."""

    def __init__(self):
        self.schema_manager = SchemaManager()
        self.cost_estimator = CostEstimator()
        self.plan_optimizer = PlanOptimizer()

    async def create_plan(self, semantic_analysis: Dict[str, Any]) -> 'ExecutionPlan':
        """Match schema, enumerate candidate plans, pick the cheapest, optimize it."""
        import uuid  # fix: `uuid` was used here but never imported in this module
        semantic_structure = semantic_analysis['semantic_structure']
        # 1. Match the query against the known schema.
        schema_info = await self.schema_manager.match_schema(semantic_structure)
        # 2. Enumerate candidate plans.
        candidate_plans = await self._generate_candidate_plans(semantic_structure, schema_info)
        # 3. Cost every candidate.
        costed_plans = []
        for plan in candidate_plans:
            cost = await self.cost_estimator.estimate(plan)
            costed_plans.append((plan, cost))
        # 4. Keep the cheapest plan.
        best_plan = min(costed_plans, key=lambda x: x[1])[0]
        # 5. Final optimization pass.
        optimized_plan = await self.plan_optimizer.optimize(best_plan)
        return ExecutionPlan(
            plan_id=str(uuid.uuid4()),
            steps=optimized_plan['steps'],
            estimated_cost=optimized_plan['cost'],
            expected_result_schema=optimized_plan['result_schema'],
            metadata=optimized_plan['metadata'])

    async def _generate_candidate_plans(self, semantic_structure: Dict[str, Any],
                                        schema_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Dispatch plan generation by query type."""
        plans = []
        query_type = semantic_structure['query_type']
        if query_type == 'SELECT':
            plans.extend(await self._generate_select_plans(semantic_structure, schema_info))
        elif query_type == 'ANALYZE':
            plans.extend(await self._generate_analysis_plans(semantic_structure, schema_info))
        elif query_type == 'VISUALIZE':
            plans.extend(await self._generate_visualization_plans(semantic_structure, schema_info))
        return plans

    async def _generate_select_plans(self, semantic_structure: Dict[str, Any],
                                     schema_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Build a scan/filter/aggregate plan, plus a join plan for multi-table queries."""
        plans = []
        base_plan = {
            'type': 'SELECT',
            'steps': [{
                'operation': 'TABLE_SCAN',
                'table': schema_info['primary_table'],
                'columns': semantic_structure['target_entities']
            }]
        }
        if semantic_structure['filter_conditions']:
            base_plan['steps'].append({
                'operation': 'FILTER',
                'conditions': semantic_structure['filter_conditions']
            })
        if semantic_structure['aggregation_functions']:
            base_plan['steps'].append({
                'operation': 'AGGREGATE',
                'functions': semantic_structure['aggregation_functions'],
                'group_by': semantic_structure['grouping_criteria']
            })
        plans.append(base_plan)
        # NOTE(review): _generate_join_plan is not defined in this file — confirm.
        if len(schema_info['involved_tables']) > 1:
            join_plan = await self._generate_join_plan(semantic_structure, schema_info)
            plans.append(join_plan)
        return plans


@dataclass
class ExecutionPlan:
    """Ordered list of execution steps plus cost/schema metadata."""
    plan_id: str
    steps: List[Dict[str, Any]]
    estimated_cost: float
    expected_result_schema: Dict[str, Any]
    metadata: Dict[str, Any]

    def get_step(self, index: int) -> Dict[str, Any]:
        """Return step *index*; raise IndexError when out of range."""
        if 0 <= index < len(self.steps):
            return self.steps[index]
        raise IndexError(f"Step index {index} out of range")

    def add_step(self, step: Dict[str, Any], position: Optional[int] = None):
        """Append a step, or insert it at *position* when given."""
        if position is None:
            self.steps.append(step)
        else:
            self.steps.insert(position, step)
自然语言查询解析引擎
意图识别与实体提取
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
class IntentClassifier:
    """BERT-based intent classifier for natural-language data queries."""

    def __init__(self, model_name: str = "bert-base-uncased"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.classifier = self._build_classifier()
        self.intent_labels = [
            'SELECT_DATA', 'AGGREGATE_DATA', 'FILTER_DATA', 'JOIN_TABLES',
            'CREATE_VISUALIZATION', 'ANALYZE_TRENDS', 'COMPARE_VALUES',
            'FIND_ANOMALIES', 'GENERATE_REPORT', 'UPDATE_DATA'
        ]

    def _build_classifier(self) -> nn.Module:
        """Small MLP head mapping BERT embeddings to intent logits."""
        return nn.Sequential(
            nn.Linear(768, 256),  # BERT hidden size
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, len(self.intent_labels))
        )

    async def classify(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Classify the query's intent, optionally adjusted by context."""
        inputs = self.tokenizer(query, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            # BERT embedding: mean-pool the last hidden states.
            outputs = self.model(**inputs)
            embeddings = outputs.last_hidden_state.mean(dim=1)
            # Intent logits and probabilities.
            logits = self.classifier(embeddings)
            probabilities = torch.softmax(logits, dim=-1)
        predicted_idx = torch.argmax(probabilities, dim=-1).item()
        confidence = probabilities[0][predicted_idx].item()
        if context:
            confidence = self._adjust_confidence_with_context(query, predicted_idx, confidence, context)
        return {
            'type': self.intent_labels[predicted_idx],
            'confidence': confidence,
            'all_probabilities': {
                label: prob.item()
                for label, prob in zip(self.intent_labels, probabilities[0])
            },
            'reasoning': self._explain_classification(query, predicted_idx, confidence)
        }

    def _adjust_confidence_with_context(self, query: str, predicted_idx: int,
                                        confidence: float, context: Dict[str, Any]) -> float:
        """Nudge confidence up when history or preferences agree with the prediction."""
        # Recent query pattern: same intent in the last three queries.
        if 'query_history' in context:
            recent_intents = [q.get('intent', {}).get('type') for q in context['query_history'][-3:]]
            current_intent = self.intent_labels[predicted_idx]
            if current_intent in recent_intents:
                confidence = min(1.0, confidence * 1.1)
        # User preference: intent among the user's common operations.
        if 'user_preferences' in context:
            preferred_operations = context['user_preferences'].get('common_operations', [])
            current_intent = self.intent_labels[predicted_idx]
            if current_intent in preferred_operations:
                confidence = min(1.0, confidence * 1.05)
        return confidence

    def _explain_classification(self, query: str, predicted_idx: int, confidence: float) -> str:
        """Human-readable (Chinese) explanation for the predicted intent."""
        intent = self.intent_labels[predicted_idx]
        explanations = {
            'SELECT_DATA': f"查询包含数据检索关键词,意图是获取特定数据",
            'AGGREGATE_DATA': f"查询包含聚合函数关键词(如sum、count、average),意图是数据汇总",
            'FILTER_DATA': f"查询包含条件筛选关键词(如where、filter),意图是数据过滤",
            'CREATE_VISUALIZATION': f"查询包含可视化关键词(如chart、graph、plot),意图是创建图表",
            'ANALYZE_TRENDS': f"查询包含趋势分析关键词(如trend、pattern),意图是趋势分析"
        }
        return explanations.get(intent, f"基于语义分析,识别为{intent}操作")


class EntityExtractor:
    """spaCy-based extractor for standard, custom, numeric and temporal entities."""

    def __init__(self):
        # Fix: import spacy lazily so the module loads without this heavy
        # optional dependency; it is only needed once an extractor is built.
        import spacy
        self.nlp = spacy.load("en_core_web_sm")
        self.custom_entities = self._load_custom_entities()

    def _load_custom_entities(self) -> Dict[str, List[str]]:
        """Domain vocabulary matched verbatim against the query text."""
        return {
            'TABLE_NAMES': ['users', 'orders', 'products', 'customers', 'sales', 'inventory'],
            'COLUMN_NAMES': ['id', 'name', 'email', 'date', 'amount', 'price', 'quantity'],
            'AGGREGATION_FUNCTIONS': ['sum', 'count', 'average', 'max', 'min', 'median'],
            'TIME_PERIODS': ['today', 'yesterday', 'last week', 'last month', 'last year'],
            'COMPARISON_OPERATORS': ['greater than', 'less than', 'equal to', 'between']
        }

    async def extract(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Extract and merge all entity categories with an overall confidence."""
        doc = self.nlp(query)
        standard_entities = self._extract_standard_entities(doc)
        custom_entities = self._extract_custom_entities(query)
        numerical_entities = self._extract_numerical_entities(doc)
        # NOTE(review): _extract_temporal_entities, _calculate_entity_confidence
        # and _calculate_coverage are not defined anywhere in this file — this
        # method raises AttributeError as written; confirm where they live.
        temporal_entities = self._extract_temporal_entities(doc)
        all_entities = {
            **standard_entities,
            **custom_entities,
            **numerical_entities,
            **temporal_entities
        }
        confidence = self._calculate_entity_confidence(all_entities)
        return {
            'entities': all_entities,
            'confidence': confidence,
            'entity_count': sum(len(v) if isinstance(v, list) else 1 for v in all_entities.values()),
            'coverage': self._calculate_coverage(query, all_entities)
        }

    def _extract_standard_entities(self, doc) -> Dict[str, List[Dict[str, Any]]]:
        """Collect spaCy named entities of the types we care about."""
        entities = {
            'PERSON': [],
            'ORG': [],
            'GPE': [],  # geopolitical entities
            'MONEY': [],
            'DATE': [],
            'TIME': []
        }
        for ent in doc.ents:
            if ent.label_ in entities:
                entities[ent.label_].append({
                    'text': ent.text,
                    'start': ent.start_char,
                    'end': ent.end_char,
                    'confidence': 0.9  # default confidence for spaCy entities
                })
        return entities

    def _extract_custom_entities(self, query: str) -> Dict[str, List[Dict[str, Any]]]:
        """Case-insensitive substring match of the domain vocabulary."""
        entities = {}
        for entity_type, entity_list in self.custom_entities.items():
            entities[entity_type] = []
            for entity in entity_list:
                if entity.lower() in query.lower():
                    start_idx = query.lower().find(entity.lower())
                    entities[entity_type].append({
                        'text': entity,
                        'start': start_idx,
                        'end': start_idx + len(entity),
                        'confidence': 0.8
                    })
        return entities

    def _extract_numerical_entities(self, doc) -> Dict[str, List[Dict[str, Any]]]:
        """Find numbers and percentages in the token stream."""
        numerical_entities = {
            'NUMBERS': [],
            'PERCENTAGES': [],
            'RANGES': []
        }
        for token in doc:
            if token.like_num:
                numerical_entities['NUMBERS'].append({
                    'text': token.text,
                    'value': self._parse_number(token.text),
                    'start': token.idx,
                    'end': token.idx + len(token.text),
                    'confidence': 0.95
                })
            elif '%' in token.text:
                numerical_entities['PERCENTAGES'].append({
                    'text': token.text,
                    'value': self._parse_percentage(token.text),
                    'start': token.idx,
                    'end': token.idx + len(token.text),
                    'confidence': 0.9
                })
        return numerical_entities

    def _parse_number(self, text: str) -> float:
        """Parse a (possibly comma-grouped) number; 0.0 on failure."""
        try:
            return float(text.replace(',', ''))
        except ValueError:
            return 0.0

    def _parse_percentage(self, text: str) -> float:
        """Parse 'NN%' into a fraction; 0.0 on failure."""
        try:
            return float(text.replace('%', '')) / 100
        except ValueError:
            return 0.0


class QueryBuilder:
    """Builds a structured (SQL/visualization/analysis) query from intent + entities."""

    def __init__(self):
        self.sql_generator = SQLGenerator()
        self.nosql_generator = NoSQLGenerator()
        self.api_generator = APIQueryGenerator()

    async def build(self, intent: Dict[str, Any], entities: Dict[str, Any],
                    context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Dispatch by intent type to the matching builder."""
        query_type = intent['type']
        # NOTE(review): _build_visualization_query, _build_analysis_query and
        # _build_generic_query are not defined in this file — confirm.
        if query_type in ['SELECT_DATA', 'AGGREGATE_DATA', 'FILTER_DATA']:
            return await self._build_sql_query(intent, entities, context)
        elif query_type == 'CREATE_VISUALIZATION':
            return await self._build_visualization_query(intent, entities, context)
        elif query_type == 'ANALYZE_TRENDS':
            return await self._build_analysis_query(intent, entities, context)
        else:
            return await self._build_generic_query(intent, entities, context)

    async def _build_sql_query(self, intent: Dict[str, Any], entities: Dict[str, Any],
                               context: Dict[str, Any]) -> Dict[str, Any]:
        """Assemble the pieces of a SQL query and render the SQL string."""
        tables = entities.get('TABLE_NAMES', [])
        if not tables and context and 'default_table' in context:
            tables = [context['default_table']]
        columns = entities.get('COLUMN_NAMES', [])
        if not columns:
            columns = ['*']  # default to all columns
        conditions = self._build_where_conditions(entities)
        aggregations = self._build_aggregations(entities, intent)
        # NOTE(review): _build_group_by, _build_order_by and _extract_limit are
        # not defined in this file — confirm.
        group_by = self._build_group_by(entities)
        order_by = self._build_order_by(entities)
        return {
            'type': 'SQL',
            'operation': intent['type'],
            'tables': tables,
            'columns': columns,
            'conditions': conditions,
            'aggregations': aggregations,
            'group_by': group_by,
            'order_by': order_by,
            'limit': self._extract_limit(entities),
            'sql': self.sql_generator.generate({
                'tables': tables,
                'columns': columns,
                'conditions': conditions,
                'aggregations': aggregations,
                'group_by': group_by,
                'order_by': order_by
            })
        }

    def _build_where_conditions(self, entities: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Pair extracted numbers with comparison operators; add date lower bounds."""
        conditions = []
        numbers = entities.get('NUMBERS', [])
        comparison_ops = entities.get('COMPARISON_OPERATORS', [])
        for i, number in enumerate(numbers):
            if i < len(comparison_ops):
                # NOTE(review): _infer_column_for_condition is not defined in
                # this file — confirm.
                conditions.append({
                    'type': 'comparison',
                    'operator': comparison_ops[i]['text'],
                    'value': number['value'],
                    'column': self._infer_column_for_condition(entities, i)
                })
        dates = entities.get('DATE', [])
        for date in dates:
            conditions.append({
                'type': 'temporal',
                'operator': '>=',
                'value': date['text'],
                'column': 'date'  # default date column
            })
        return conditions

    def _build_aggregations(self, entities: Dict[str, Any], intent: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Map extracted aggregation keywords onto target columns."""
        aggregations = []
        agg_functions = entities.get('AGGREGATION_FUNCTIONS', [])
        columns = entities.get('COLUMN_NAMES', [])
        if intent['type'] == 'AGGREGATE_DATA':
            for agg_func in agg_functions:
                target_column = columns[0]['text'] if columns else 'id'
                aggregations.append({
                    'function': agg_func['text'].upper(),
                    'column': target_column,
                    'alias': f"{agg_func['text']}_{target_column}"
                })
        return aggregations


class SQLGenerator:
    """Renders a query-spec dict into a SQL string."""

    def generate(self, query_spec: Dict[str, Any]) -> str:
        """Assemble SELECT/FROM and optional WHERE/GROUP BY/ORDER BY clauses."""
        select_clause = self._build_select_clause(
            query_spec.get('columns', []),
            query_spec.get('aggregations', []))
        from_clause = self._build_from_clause(query_spec.get('tables', []))
        where_clause = self._build_where_clause(query_spec.get('conditions', []))
        group_by_clause = self._build_group_by_clause(query_spec.get('group_by', []))
        order_by_clause = self._build_order_by_clause(query_spec.get('order_by', []))
        sql_parts = [select_clause, from_clause]
        if where_clause:
            sql_parts.append(where_clause)
        if group_by_clause:
            sql_parts.append(group_by_clause)
        if order_by_clause:
            sql_parts.append(order_by_clause)
        return ' '.join(sql_parts)

    def _build_select_clause(self, columns: List[str], aggregations: List[Dict[str, Any]]) -> str:
        """Aggregations take precedence over a plain column list."""
        if aggregations:
            select_items = []
            for agg in aggregations:
                select_items.append(f"{agg['function']}({agg['column']}) AS {agg['alias']}")
            return f"SELECT {', '.join(select_items)}"
        else:
            column_list = ', '.join(columns) if columns else '*'
            return f"SELECT {column_list}"

    def _build_from_clause(self, tables: List[str]) -> str:
        """Single-table FROM clause (simplified: only the first table is used)."""
        if not tables:
            return "FROM default_table"
        return f"FROM {tables[0]}"

    def _build_where_clause(self, conditions: List[Dict[str, Any]]) -> str:
        """AND-join comparison and temporal conditions; empty string when none."""
        if not conditions:
            return ""
        condition_strings = []
        for condition in conditions:
            if condition['type'] == 'comparison':
                condition_strings.append(
                    f"{condition['column']} {self._map_operator(condition['operator'])} {condition['value']}")
            elif condition['type'] == 'temporal':
                condition_strings.append(
                    f"{condition['column']} {condition['operator']} '{condition['value']}'")
        return f"WHERE {' AND '.join(condition_strings)}"

    def _build_group_by_clause(self, group_by: List[str]) -> str:
        """Fix: generate() called this method but it was never defined.

        Renders a comma-separated GROUP BY clause; empty string when no
        grouping columns are given.
        """
        if not group_by:
            return ""
        return f"GROUP BY {', '.join(group_by)}"

    def _build_order_by_clause(self, order_by: List[str]) -> str:
        """Fix: generate() called this method but it was never defined.

        Assumes plain column-name strings — TODO confirm the order_by spec
        format once the planner produces it.
        """
        if not order_by:
            return ""
        return f"ORDER BY {', '.join(order_by)}"

    def _map_operator(self, natural_operator: str) -> str:
        """Map a natural-language comparison phrase to a SQL operator ('=' fallback)."""
        mapping = {
            'greater than': '>',
            'less than': '<',
            'equal to': '=',
            'not equal to': '!=',
            'greater than or equal to': '>=',
            'less than or equal to': '<='
        }
        return mapping.get(natural_operator.lower(), '=')

# ## MCP服务器端实现
# ### 数据连接器MCP服务器
# ```python
import asyncio
import json
from typing import Dict, List, Any, Optional
import sqlite3
import pandas as pd
import pymongo
from sqlalchemy import create_engine
import redis


class DataConnectorMCPServer(MCPServer):
    """MCP server exposing SQL/NoSQL query tools and a data-analysis tool.

    NOTE(review): the abstract methods list_tools/list_resources from
    MCPServer are not implemented here, so this class cannot be
    instantiated as written — confirm.
    """

    def __init__(self):
        super().__init__("data-connector", "1.0.0")
        self.connections = {}  # connection_id -> connection descriptor dict
        # Redis-backed cache for repeated query results.
        self.query_cache = redis.Redis(host='localhost', port=6379, db=0)
        self._register_tools()
        self._register_resources()

    def _register_tools(self):
        """Register the SQL, NoSQL and analysis tools."""
        sql_query_tool = MCPTool(
            name="execute_sql_query",
            description="Execute SQL query on connected database",
            parameters={
                "type": "object",
                "properties": {
                    "connection_id": {"type": "string", "description": "Database connection ID"},
                    "query": {"type": "string", "description": "SQL query to execute"},
                    "parameters": {"type": "array", "description": "Query parameters"}
                },
                "required": ["connection_id", "query"]
            },
            required_permissions=["database:read"],
            category="database")
        self.register_tool(sql_query_tool)

        nosql_query_tool = MCPTool(
            name="execute_nosql_query",
            description="Execute NoSQL query on MongoDB",
            parameters={
                "type": "object",
                "properties": {
                    "connection_id": {"type": "string", "description": "MongoDB connection ID"},
                    "collection": {"type": "string", "description": "Collection name"},
                    "query": {"type": "object", "description": "MongoDB query object"},
                    "projection": {"type": "object", "description": "Projection object"}
                },
                "required": ["connection_id", "collection", "query"]
            },
            required_permissions=["database:read"],
            category="database")
        self.register_tool(nosql_query_tool)

        analyze_data_tool = MCPTool(
            name="analyze_data",
            description="Perform statistical analysis on data",
            parameters={
                "type": "object",
                "properties": {
                    "data": {"type": "array", "description": "Data to analyze"},
                    "analysis_type": {"type": "string", "enum": ["descriptive", "correlation", "trend"]},
                    "columns": {"type": "array", "description": "Columns to analyze"}
                },
                "required": ["data", "analysis_type"]
            },
            required_permissions=["data:analyze"],
            category="analytics")
        self.register_tool(analyze_data_tool)

    def _register_resources(self):
        """Register the connection catalogue and query-history resources."""
        db_resource = MCPResource(
            uri="database://connections",
            type=MCPResourceType.DATABASE,
            name="Database Connections",
            description="Available database connections",
            metadata={"connection_types": ["postgresql", "mysql", "sqlite", "mongodb"]},
            access_permissions=["database:read"])
        self.register_resource(db_resource)

        query_history_resource = MCPResource(
            uri="memory://query_history",
            type=MCPResourceType.MEMORY,
            name="Query History",
            description="Historical query results and metadata",
            metadata={"retention_days": 30},
            access_permissions=["query:history"])
        self.register_resource(query_history_resource)

    async def handle_request(self, message: MCPMessage) -> MCPMessage:
        """Dispatch one MCP request; wrap the outcome in a RESPONSE or ERROR message."""
        import time  # fix: `time` is used below but was never imported in this module
        method = message.method
        params = message.params
        try:
            if method == "tools/list":
                result = await self.list_tools()
            elif method == "tools/call":
                result = await self.call_tool(params["name"], params.get("arguments", {}))
            elif method == "resources/list":
                result = await self.list_resources()
            elif method == "resources/read":
                # NOTE(review): read_resource is not defined in this file — confirm.
                result = await self.read_resource(params["uri"])
            else:
                raise ValueError(f"Unknown method: {method}")
            return MCPMessage(
                id=message.id,
                type=MCPMessageType.RESPONSE,
                method=method,
                params={"result": result},
                timestamp=time.time())
        except Exception as e:
            # Any failure is reported back as a protocol-level error message.
            return MCPMessage(
                id=message.id,
                type=MCPMessageType.ERROR,
                method=method,
                params={"error": str(e)},
                timestamp=time.time())

    async def _execute_tool(self, tool: MCPTool, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Route a validated tool call to its implementation."""
        if tool.name == "execute_sql_query":
            return await self._execute_sql_query(parameters)
        elif tool.name == "execute_nosql_query":
            return await self._execute_nosql_query(parameters)
        elif tool.name == "analyze_data":
            return await self._analyze_data(parameters)
        else:
            raise ValueError(f"Unknown tool: {tool.name}")

    async def _execute_sql_query(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a SQL query with a short-lived Redis result cache."""
        connection_id = params["connection_id"]
        query = params["query"]
        query_params = params.get("parameters", [])
        # Cache lookup keyed by connection + query + parameters.
        cache_key = f"sql:{connection_id}:{hash(query)}:{hash(str(query_params))}"
        cached_result = self.query_cache.get(cache_key)
        if cached_result:
            return json.loads(cached_result)
        if connection_id not in self.connections:
            raise ValueError(f"Connection {connection_id} not found")
        connection = self.connections[connection_id]
        try:
            if connection['type'] == 'sqlite':
                result = await self._execute_sqlite_query(connection, query, query_params)
            elif connection['type'] == 'postgresql':
                # NOTE(review): _execute_postgresql_query is not defined in this file — confirm.
                result = await self._execute_postgresql_query(connection, query, query_params)
            else:
                raise ValueError(f"Unsupported connection type: {connection['type']}")
            # Cache the result for 5 minutes.
            self.query_cache.setex(cache_key, 300, json.dumps(result))
            return result
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "query": query
            }

    async def _execute_sqlite_query(self, connection: Dict[str, Any], query: str,
                                    params: List[Any]) -> Dict[str, Any]:
        """Run one statement on SQLite; SELECTs return rows, writes are committed."""
        conn = sqlite3.connect(connection['database'])
        conn.row_factory = sqlite3.Row  # rows become dict-convertible
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            if query.strip().upper().startswith('SELECT'):
                rows = cursor.fetchall()
                columns = [description[0] for description in cursor.description]
                data = [dict(row) for row in rows]
                return {
                    "success": True,
                    "data": data,
                    "columns": columns,
                    "row_count": len(data),
                    "execution_time": 0  # simplified: timing not measured
                }
            else:
                conn.commit()
                return {
                    "success": True,
                    "affected_rows": cursor.rowcount,
                    "message": "Query executed successfully"
                }
        finally:
            conn.close()

    async def _execute_nosql_query(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run a find() on MongoDB and return JSON-safe documents."""
        connection_id = params["connection_id"]
        collection_name = params["collection"]
        query = params["query"]
        projection = params.get("projection", {})
        if connection_id not in self.connections:
            raise ValueError(f"Connection {connection_id} not found")
        connection = self.connections[connection_id]
        # Fix: `client` was only assigned inside the try block, so the
        # finally clause raised NameError when MongoClient() itself failed.
        client = None
        try:
            client = pymongo.MongoClient(connection['uri'])
            db = client[connection['database']]
            collection = db[collection_name]
            cursor = collection.find(query, projection)
            results = list(cursor)
            # ObjectId is not JSON-serializable; stringify it.
            for result in results:
                if '_id' in result:
                    result['_id'] = str(result['_id'])
            return {
                "success": True,
                "data": results,
                "count": len(results),
                "collection": collection_name
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "query": query
            }
        finally:
            if client is not None:
                client.close()

    async def _analyze_data(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch descriptive/correlation/trend analysis over tabular data."""
        data = params["data"]
        analysis_type = params["analysis_type"]
        columns = params.get("columns", [])
        df = pd.DataFrame(data)
        if columns:
            df = df[columns]
        try:
            if analysis_type == "descriptive":
                result = self._descriptive_analysis(df)
            elif analysis_type == "correlation":
                result = self._correlation_analysis(df)
            elif analysis_type == "trend":
                # NOTE(review): _trend_analysis is not defined in this file — confirm.
                result = self._trend_analysis(df)
            else:
                raise ValueError(f"Unknown analysis type: {analysis_type}")
            return {
                "success": True,
                "analysis_type": analysis_type,
                "result": result,
                "data_shape": df.shape
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "analysis_type": analysis_type
            }

    def _descriptive_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Summary stats for numeric columns, top values for categorical ones."""
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        categorical_columns = df.select_dtypes(include=['object']).columns
        result = {
            "numeric_summary": {},
            "categorical_summary": {},
            "missing_values": df.isnull().sum().to_dict(),
            "data_types": df.dtypes.astype(str).to_dict()
        }
        if len(numeric_columns) > 0:
            result["numeric_summary"] = df[numeric_columns].describe().to_dict()
        for col in categorical_columns:
            result["categorical_summary"][col] = {
                "unique_count": df[col].nunique(),
                "top_values": df[col].value_counts().head(5).to_dict()
            }
        return result

    def _correlation_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Pearson correlation matrix over the numeric columns."""
        numeric_df = df.select_dtypes(include=[np.number])
        if numeric_df.empty:
            return {"error": "No numeric columns found for correlation analysis"}
        correlation_matrix = numeric_df.corr()
        return {
            "correlation_matrix": correlation_matrix.to_dict(),
            "strong_correlations": self._find_strong_correlations(correlation_matrix),
            "columns_analyzed": list(numeric_df.columns)
        }

    def _find_strong_correlations(self, corr_matrix: pd.DataFrame,
                                  threshold: float = 0.7) -> List[Dict[str, Any]]:
        """List column pairs whose |correlation| meets the threshold."""
        strong_corrs = []
        for i in range(len(corr_matrix.columns)):
            for j in range(i + 1, len(corr_matrix.columns)):
                corr_value = corr_matrix.iloc[i, j]
                if abs(corr_value) >= threshold:
                    strong_corrs.append({
                        "column1": corr_matrix.columns[i],
                        "column2": corr_matrix.columns[j],
                        "correlation": corr_value,
                        "strength": "strong" if abs(corr_value) >= 0.8 else "moderate"
                    })
        return strong_corrs

# ### 可视化MCP服务器


class VisualizationMCPServer(MCPServer):
    """MCP server that creates charts and recommends chart types."""

    def __init__(self):
        super().__init__("visualization", "1.0.0")
        # NOTE(review): only BarChartGenerator is defined in this file; the
        # other generator classes are undefined here — confirm where they live.
        self.chart_generators = {
            'bar': BarChartGenerator(),
            'line': LineChartGenerator(),
            'scatter': ScatterPlotGenerator(),
            'pie': PieChartGenerator(),
            'heatmap': HeatmapGenerator()
        }
        self._register_tools()

    def _register_tools(self):
        """Register the chart-creation and chart-analysis tools."""
        create_chart_tool = MCPTool(
            name="create_chart",
            description="Create various types of charts from data",
            parameters={
                "type": "object",
                "properties": {
                    "chart_type": {"type": "string", "enum": ["bar", "line", "scatter", "pie", "heatmap"]},
                    "data": {"type": "array", "description": "Data for the chart"},
                    "x_column": {"type": "string", "description": "X-axis column"},
                    "y_column": {"type": "string", "description": "Y-axis column"},
                    "title": {"type": "string", "description": "Chart title"},
                    "options": {"type": "object", "description": "Additional chart options"}
                },
                "required": ["chart_type", "data"]
            },
            required_permissions=["visualization:create"],
            category="visualization")
        self.register_tool(create_chart_tool)

        analyze_chart_tool = MCPTool(
            name="analyze_chart_data",
            description="Analyze data and suggest appropriate chart types",
            parameters={
                "type": "object",
                "properties": {
                    "data": {"type": "array", "description": "Data to analyze"},
                    "analysis_goal": {"type": "string", "description": "Goal of the analysis"}
                },
                "required": ["data"]
            },
            required_permissions=["data:analyze"],
            category="analytics")
        self.register_tool(analyze_chart_tool)

    async def _execute_tool(self, tool: MCPTool, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Route a validated tool call to its implementation."""
        if tool.name == "create_chart":
            return await self._create_chart(parameters)
        elif tool.name == "analyze_chart_data":
            return await self._analyze_chart_data(parameters)
        else:
            raise ValueError(f"Unknown tool: {tool.name}")

    async def _create_chart(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a chart config via the generator matching chart_type."""
        chart_type = params["chart_type"]
        data = params["data"]
        if chart_type not in self.chart_generators:
            raise ValueError(f"Unsupported chart type: {chart_type}")
        generator = self.chart_generators[chart_type]
        try:
            chart_config = await generator.generate(params)
            return {
                "success": True,
                "chart_type": chart_type,
                "config": chart_config,
                "data_points": len(data),
                # NOTE(review): _get_chart_recommendations is not defined in this file — confirm.
                "recommendations": await self._get_chart_recommendations(params)
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "chart_type": chart_type
            }

    async def _analyze_chart_data(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Profile the data's column types and recommend chart types."""
        data = params["data"]
        analysis_goal = params.get("analysis_goal", "general")
        df = pd.DataFrame(data)
        data_analysis = {
            "row_count": len(df),
            "column_count": len(df.columns),
            "numeric_columns": list(df.select_dtypes(include=[np.number]).columns),
            "categorical_columns": list(df.select_dtypes(include=['object']).columns),
            "datetime_columns": list(df.select_dtypes(include=['datetime64']).columns)
        }
        chart_recommendations = await self._recommend_chart_types(df, analysis_goal)
        return {
            "success": True,
            "data_analysis": data_analysis,
            "chart_recommendations": chart_recommendations,
            "analysis_goal": analysis_goal
        }

    async def _recommend_chart_types(self, df: pd.DataFrame, goal: str) -> List[Dict[str, Any]]:
        """Heuristic chart suggestions based on column types and the stated goal."""
        recommendations = []
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns
        # Type-driven suggestions.
        if len(numeric_cols) >= 2:
            recommendations.append({
                "chart_type": "scatter",
                "reason": "Multiple numeric columns suitable for correlation analysis",
                "suggested_config": {"x_column": numeric_cols[0], "y_column": numeric_cols[1]},
                "confidence": 0.8
            })
        if len(categorical_cols) >= 1 and len(numeric_cols) >= 1:
            recommendations.append({
                "chart_type": "bar",
                "reason": "Categorical and numeric columns suitable for comparison",
                "suggested_config": {"x_column": categorical_cols[0], "y_column": numeric_cols[0]},
                "confidence": 0.9
            })
        # Goal-driven suggestions.
        if goal == "trend":
            if 'date' in df.columns or 'time' in df.columns:
                recommendations.append({
                    "chart_type": "line",
                    "reason": "Time series data suitable for trend analysis",
                    "confidence": 0.95
                })
        return recommendations


class BarChartGenerator:
    """Builds a Chart.js-style bar chart configuration from tabular data."""

    async def generate(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Pick axes (defaulting to the first columns) and emit the chart config."""
        data = params["data"]
        x_column = params.get("x_column")
        y_column = params.get("y_column")
        title = params.get("title", "Bar Chart")
        options = params.get("options", {})
        df = pd.DataFrame(data)
        if not x_column:
            x_column = df.columns[0]
        if not y_column:
            y_column = df.columns[1] if len(df.columns) > 1 else df.columns[0]
        # Aggregate by summing when the x axis is categorical.
        if df[x_column].dtype == 'object':
            chart_data = df.groupby(x_column)[y_column].sum().reset_index()
        else:
            chart_data = df[[x_column, y_column]]
        return {
            "type": "bar",
            "data": {
                "labels": chart_data[x_column].tolist(),
                "datasets": [{
                    "label": y_column,
                    "data": chart_data[y_column].tolist(),
                    "backgroundColor": options.get("color", "rgba(54, 162, 235, 0.6)"),
                    "borderColor": options.get("border_color", "rgba(54, 162, 235, 1)"),
                    "borderWidth": options.get("border_width", 1)
                }]
            },
            "options": {
                "responsive": True,
                "plugins": {
                    "title": {
                        "display": True,
                        "text": title
                    }
                },
                "scales": {
                    "y": {
                        "beginAtZero": True
                    }
                }
            }
        }

# ## 上下文管理与会话状态
# ### 会话管理器
# ```python
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json

# NOTE(review): `redis` is used below but never imported in this section — it is
# presumably imported by an earlier section of this document; confirm.


class SessionManager:
    """Redis-backed conversation-session store.

    Sessions expire after `session_timeout` seconds; each session holds the
    user's context, preferences and a bounded query history.
    """

    def __init__(self, redis_client=None):
        self.redis_client = redis_client or redis.Redis(host='localhost', port=6379, db=1)
        self.session_timeout = 3600     # 1 hour
        self.max_context_length = 50    # max query-history entries kept

    async def create_session(self, user_id: str,
                             initial_context: Optional[Dict[str, Any]] = None) -> str:
        """Create a new session for `user_id` and return its id."""
        session_id = str(uuid.uuid4())
        session_data = {
            'session_id': session_id,
            'user_id': user_id,
            'created_at': datetime.now().isoformat(),
            'last_activity': datetime.now().isoformat(),
            'context': initial_context or {},
            'query_history': [],
            'preferences': await self._load_user_preferences(user_id),
            'active_connections': [],
            'temporary_data': {}
        }

        # Persist the session with a TTL.
        session_key = f"session:{session_id}"
        self.redis_client.setex(session_key, self.session_timeout,
                                json.dumps(session_data, default=str))

        # Track the session under the owning user (same TTL).
        user_sessions_key = f"user_sessions:{user_id}"
        self.redis_client.sadd(user_sessions_key, session_id)
        self.redis_client.expire(user_sessions_key, self.session_timeout)

        return session_id

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a session dict, or None when missing/expired."""
        session_key = f"session:{session_id}"
        session_data = self.redis_client.get(session_key)
        if session_data:
            return json.loads(session_data)
        return None

    async def update_session(self, session_id: str, updates: Dict[str, Any]):
        """Merge `updates` into the session and refresh its TTL.

        Raises ValueError when the session does not exist.
        """
        session_data = await self.get_session(session_id)
        if not session_data:
            raise ValueError(f"Session {session_id} not found")

        session_data.update(updates)
        session_data['last_activity'] = datetime.now().isoformat()

        session_key = f"session:{session_id}"
        self.redis_client.setex(session_key, self.session_timeout,
                                json.dumps(session_data, default=str))

    async def add_query_to_history(self, session_id: str, query: str, result: Dict[str, Any]):
        """Append one query record to the session history (bounded length)."""
        session_data = await self.get_session(session_id)
        if not session_data:
            return

        query_record = {
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'result': result,
            'execution_time': result.get('execution_time', 0)
        }

        session_data['query_history'].append(query_record)

        # Keep only the most recent `max_context_length` records.
        if len(session_data['query_history']) > self.max_context_length:
            session_data['query_history'] = \
                session_data['query_history'][-self.max_context_length:]

        await self.update_session(session_id, session_data)

    async def get_context_for_query(self, session_id: str) -> Dict[str, Any]:
        """Build the context handed to the query pipeline for this session."""
        session_data = await self.get_session(session_id)
        if not session_data:
            return {}

        return {
            'user_id': session_data['user_id'],
            'session_id': session_id,
            'preferences': session_data['preferences'],
            'recent_queries': session_data['query_history'][-5:],  # last 5 queries
            'active_tables': self._extract_active_tables(session_data['query_history']),
            'common_columns': self._extract_common_columns(session_data['query_history']),
            'session_duration': self._calculate_session_duration(session_data)
        }

    def _calculate_session_duration(self, session_data: Dict[str, Any]) -> float:
        """Seconds between session creation and the last recorded activity.

        NOTE(review): called by get_context_for_query but missing from the
        original text; minimal reconstruction.
        """
        created = datetime.fromisoformat(session_data['created_at'])
        last = datetime.fromisoformat(session_data['last_activity'])
        return (last - created).total_seconds()

    def _extract_active_tables(self, query_history: List[Dict[str, Any]]) -> List[str]:
        """Tables referenced by the results of the last 10 queries."""
        tables = set()
        for query_record in query_history[-10:]:
            result = query_record.get('result', {})
            if 'tables' in result:
                tables.update(result['tables'])
        return list(tables)

    def _extract_common_columns(self, query_history: List[Dict[str, Any]]) -> List[str]:
        """Up to 10 column names, most frequently used first (last 10 queries)."""
        column_counts = {}
        for query_record in query_history[-10:]:
            result = query_record.get('result', {})
            if 'columns' in result:
                for column in result['columns']:
                    column_counts[column] = column_counts.get(column, 0) + 1
        return sorted(column_counts.keys(),
                      key=lambda x: column_counts[x], reverse=True)[:10]

    async def _load_user_preferences(self, user_id: str) -> Dict[str, Any]:
        """Load stored preferences for the user, seeding defaults on first use."""
        prefs_key = f"user_preferences:{user_id}"
        prefs_data = self.redis_client.get(prefs_key)
        if prefs_data:
            return json.loads(prefs_data)

        default_prefs = {
            'preferred_chart_types': ['bar', 'line'],
            'default_limit': 100,
            'preferred_aggregations': ['sum', 'count', 'avg'],
            'timezone': 'UTC',
            'date_format': 'YYYY-MM-DD',
            'number_format': 'en-US'
        }
        # Persist the defaults for 30 days.
        self.redis_client.setex(prefs_key, 86400 * 30, json.dumps(default_prefs))
        return default_prefs


class ContextManager:
    """Runs the registered context analyzers over a query and merges the results.

    NOTE(review): only IntentContextAnalyzer is visible in this chunk — confirm
    the entity/temporal/semantic analyzers are defined elsewhere.
    """

    def __init__(self, session_manager: SessionManager):
        self.session_manager = session_manager
        self.context_analyzers = {
            'intent': IntentContextAnalyzer(),
            'entity': EntityContextAnalyzer(),
            'temporal': TemporalContextAnalyzer(),
            'semantic': SemanticContextAnalyzer()
        }

    async def analyze_context(self, query: str, session_id: str) -> Dict[str, Any]:
        """Analyze `query` against the session's accumulated context."""
        session_context = await self.session_manager.get_context_for_query(session_id)

        # Each analyzer runs independently; failures are recorded, not raised.
        context_analysis = {}
        for analyzer_name, analyzer in self.context_analyzers.items():
            try:
                analysis_result = await analyzer.analyze(query, session_context)
                context_analysis[analyzer_name] = analysis_result
            except Exception as e:
                context_analysis[analyzer_name] = {'error': str(e)}

        return await self._integrate_context_analysis(query, session_context, context_analysis)

    async def _integrate_context_analysis(self, query: str, session_context: Dict[str, Any],
                                          context_analysis: Dict[str, Any]) -> Dict[str, Any]:
        """Fold the per-analyzer outputs into a single context-signal dict."""
        integrated = {
            'query': query,
            'session_context': session_context,
            'context_signals': {},
            'recommendations': [],
            'confidence_scores': {}
        }

        if 'intent' in context_analysis:
            intent_analysis = context_analysis['intent']
            integrated['context_signals']['intent_continuity'] = \
                intent_analysis.get('continuity_score', 0)
            integrated['context_signals']['intent_shift'] = \
                intent_analysis.get('shift_detected', False)

        if 'entity' in context_analysis:
            entity_analysis = context_analysis['entity']
            integrated['context_signals']['entity_overlap'] = \
                entity_analysis.get('overlap_score', 0)
            integrated['context_signals']['new_entities'] = \
                entity_analysis.get('new_entities', [])

        if 'temporal' in context_analysis:
            temporal_analysis = context_analysis['temporal']
            integrated['context_signals']['temporal_pattern'] = \
                temporal_analysis.get('pattern', 'none')
            integrated['context_signals']['time_scope_change'] = \
                temporal_analysis.get('scope_change', False)

        integrated['recommendations'] = await self._generate_context_recommendations(
            integrated['context_signals'], session_context)
        return integrated

    async def _generate_context_recommendations(self, context_signals: Dict[str, Any],
                                                session_context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Turn strong context signals into user-facing suggestions."""
        recommendations = []

        # Same intent kept across queries → suggest refining it.
        if context_signals.get('intent_continuity', 0) > 0.8:
            recommendations.append({
                'type': 'query_refinement',
                'message': '基于您之前的查询,建议添加更多筛选条件',
                'confidence': 0.7
            })

        # Heavy entity overlap with history → suggest joint analysis.
        if context_signals.get('entity_overlap', 0) > 0.6:
            recommendations.append({
                'type': 'related_analysis',
                'message': '发现相关数据,建议进行关联分析',
                'confidence': 0.8
            })

        # Newly-seen entities → suggest exploring them.
        new_entities = context_signals.get('new_entities', [])
        if new_entities:
            recommendations.append({
                'type': 'entity_exploration',
                'message': f'发现新的数据实体:{", ".join(new_entities)},建议深入探索',
                'confidence': 0.6
            })

        return recommendations


class IntentContextAnalyzer:
    """Scores how the current query's intent relates to the recent history."""

    async def analyze(self, query: str, session_context: Dict[str, Any]) -> Dict[str, Any]:
        """Classify the query's intent and compare it with recent intents."""
        recent_queries = session_context.get('recent_queries', [])
        if not recent_queries:
            return {'continuity_score': 0, 'shift_detected': False}

        recent_intents = []
        for query_record in recent_queries:
            result = query_record.get('result', {})
            if 'intent' in result:
                recent_intents.append(result['intent'])

        current_intent = await self._classify_intent(query)
        continuity_score = self._calculate_intent_continuity(current_intent, recent_intents)
        shift_detected = self._detect_intent_shift(current_intent, recent_intents)

        return {
            'current_intent': current_intent,
            'recent_intents': recent_intents,
            'continuity_score': continuity_score,
            'shift_detected': shift_detected,
            'intent_pattern': self._identify_intent_pattern(recent_intents + [current_intent])
        }

    async def _classify_intent(self, query: str) -> str:
        """Keyword-based intent classification (simplified)."""
        query_lower = query.lower()
        if any(word in query_lower for word in ['show', 'select', 'get', 'find']):
            return 'SELECT'
        elif any(word in query_lower for word in ['sum', 'count', 'average', 'total']):
            return 'AGGREGATE'
        elif any(word in query_lower for word in ['chart', 'plot', 'graph', 'visualize']):
            return 'VISUALIZE'
        elif any(word in query_lower for word in ['compare', 'vs', 'versus']):
            return 'COMPARE'
        else:
            return 'GENERAL'

    def _calculate_intent_continuity(self, current_intent: str,
                                     recent_intents: List[str]) -> float:
        """Share of recent queries with the same intent as the current one."""
        if not recent_intents:
            return 0.0
        recent_intent_counts = {}
        for intent in recent_intents:
            recent_intent_counts[intent] = recent_intent_counts.get(intent, 0) + 1
        total_recent = len(recent_intents)
        current_intent_frequency = recent_intent_counts.get(current_intent, 0)
        return current_intent_frequency / total_recent

    def _detect_intent_shift(self, current_intent: str, recent_intents: List[str]) -> bool:
        """True when the current intent differs from the immediately previous one.

        NOTE(review): called by analyze() but missing from the original text;
        minimal reconstruction.
        """
        return bool(recent_intents) and current_intent != recent_intents[-1]

    def _identify_intent_pattern(self, intents: List[str]) -> str:
        """Crude pattern label over an intent sequence.

        NOTE(review): called by analyze() but missing from the original text;
        minimal reconstruction.
        """
        if not intents:
            return 'none'
        return 'consistent' if len(set(intents)) == 1 else 'mixed'

# (document continues — fence and headers preserved as comments to keep this
#  reconstructed section importable)
# ```
# ## 安全性与权限控制
# ### 权限管理系统
# ```python
from enum import Enum
import hashlib
import jwt
from datetime import datetime, timedeltaclass Permission(Enum):"""权限枚举"""DATABASE_READ = "database:read"DATABASE_WRITE = "database:write"DATABASE_ADMIN = "database:admin"VISUALIZATION_CREATE = "visualization:create"VISUALIZATION_SHARE = "visualization:share"DATA_EXPORT = "data:export"DATA_ANALYZE = "data:analyze"SYSTEM_ADMIN = "system:admin"USER_MANAGE = "user:manage"QUERY_HISTORY = "query:history"class Role(Enum):"""角色枚举"""VIEWER = "viewer"ANALYST = "analyst"DEVELOPER = "developer"ADMIN = "admin"SUPER_ADMIN = "super_admin"class SecurityManager:"""安全管理器"""def __init__(self, secret_key: str):self.secret_key = secret_keyself.role_permissions = self._define_role_permissions()self.active_tokens = set()self.blocked_tokens = set()def _define_role_permissions(self) -> Dict[Role, List[Permission]]:"""定义角色权限"""return {Role.VIEWER: [Permission.DATABASE_READ,Permission.VISUALIZATION_CREATE,Permission.QUERY_HISTORY],Role.ANALYST: [Permission.DATABASE_READ,Permission.VISUALIZATION_CREATE,Permission.VISUALIZATION_SHARE,Permission.DATA_ANALYZE,Permission.DATA_EXPORT,Permission.QUERY_HISTORY],Role.DEVELOPER: [Permission.DATABASE_READ,Permission.DATABASE_WRITE,Permission.VISUALIZATION_CREATE,Permission.VISUALIZATION_SHARE,Permission.DATA_ANALYZE,Permission.DATA_EXPORT,Permission.QUERY_HISTORY],Role.ADMIN: [Permission.DATABASE_READ,Permission.DATABASE_WRITE,Permission.DATABASE_ADMIN,Permission.VISUALIZATION_CREATE,Permission.VISUALIZATION_SHARE,Permission.DATA_ANALYZE,Permission.DATA_EXPORT,Permission.USER_MANAGE,Permission.QUERY_HISTORY],Role.SUPER_ADMIN: list(Permission) # 所有权限}def generate_token(self, user_id: str, role: Role, expires_in: int = 3600) -> str:"""生成访问令牌"""payload = {'user_id': user_id,'role': role.value,'permissions': [p.value for p in self.role_permissions[role]],'iat': datetime.utcnow(),'exp': datetime.utcnow() + timedelta(seconds=expires_in),'jti': hashlib.md5(f"{user_id}{datetime.utcnow()}".encode()).hexdigest()}token = jwt.encode(payload, self.secret_key, 
algorithm='HS256')self.active_tokens.add(payload['jti'])return tokendef validate_token(self, token: str) -> Optional[Dict[str, Any]]:"""验证访问令牌"""try:payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])# 检查令牌是否被阻止if payload['jti'] in self.blocked_tokens:return None# 检查令牌是否在活跃列表中if payload['jti'] not in self.active_tokens:return Nonereturn payloadexcept jwt.ExpiredSignatureError:return Noneexcept jwt.InvalidTokenError:return Nonedef revoke_token(self, token: str):"""撤销令牌"""try:payload = jwt.decode(token, self.secret_key, algorithms=['HS256'], options={"verify_exp": False})jti = payload['jti']self.blocked_tokens.add(jti)self.active_tokens.discard(jti)except jwt.InvalidTokenError:passdef check_permission(self, token: str, required_permission: Permission) -> bool:"""检查权限"""payload = self.validate_token(token)if not payload:return Falseuser_permissions = payload.get('permissions', [])return required_permission.value in user_permissionsdef get_user_permissions(self, token: str) -> List[str]:"""获取用户权限列表"""payload = self.validate_token(token)if not payload:return []return payload.get('permissions', [])class QuerySecurityValidator:"""查询安全验证器"""def __init__(self, security_manager: SecurityManager):self.security_manager = security_managerself.dangerous_keywords = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 'INSERT', 'UPDATE','EXEC', 'EXECUTE', 'UNION', 'SCRIPT', 'JAVASCRIPT']self.allowed_functions = ['SELECT', 'COUNT', 'SUM', 'AVG', 'MAX', 'MIN', 'GROUP BY', 'ORDER BY','WHERE', 'HAVING', 'LIMIT', 'OFFSET']async def validate_query(self, query: str, token: str, query_context: Dict[str, Any] = None) -> Dict[str, Any]:"""验证查询安全性"""validation_result = {'is_valid': True,'security_issues': [],'recommendations': [],'risk_level': 'LOW'}# 1. 
验证用户权限if not self.security_manager.validate_token(token):validation_result['is_valid'] = Falsevalidation_result['security_issues'].append('Invalid or expired token')validation_result['risk_level'] = 'HIGH'return validation_result# 2. 检查危险关键词dangerous_found = self._check_dangerous_keywords(query)if dangerous_found:# 检查是否有写权限if not self.security_manager.check_permission(token, Permission.DATABASE_WRITE):validation_result['is_valid'] = Falsevalidation_result['security_issues'].append(f'Dangerous keywords found: {dangerous_found}')validation_result['risk_level'] = 'HIGH'# 3. 检查SQL注入风险injection_risk = self._check_sql_injection_risk(query)if injection_risk['risk_detected']:validation_result['security_issues'].append('Potential SQL injection detected')validation_result['risk_level'] = 'HIGH'validation_result['recommendations'].extend(injection_risk['recommendations'])# 4. 检查数据访问权限access_validation = await self._validate_data_access(query, token, query_context)if not access_validation['is_valid']:validation_result['is_valid'] = Falsevalidation_result['security_issues'].extend(access_validation['issues'])# 5. 
检查查询复杂度complexity_check = self._check_query_complexity(query)if complexity_check['is_complex']:validation_result['recommendations'].append('Query is complex, consider optimization')if complexity_check['risk_level'] == 'HIGH':validation_result['risk_level'] = 'MEDIUM'return validation_resultdef _check_dangerous_keywords(self, query: str) -> List[str]:"""检查危险关键词"""query_upper = query.upper()found_keywords = []for keyword in self.dangerous_keywords:if keyword in query_upper:found_keywords.append(keyword)return found_keywordsdef _check_sql_injection_risk(self, query: str) -> Dict[str, Any]:"""检查SQL注入风险"""risk_patterns = [r"'.*OR.*'.*'", # OR注入r"'.*UNION.*SELECT", # UNION注入r"--.*", # 注释注入r"/\*.*\*/", # 块注释注入r"'.*;\s*DROP", # 分号注入]import rerisk_detected = Falsedetected_patterns = []for pattern in risk_patterns:if re.search(pattern, query, re.IGNORECASE):risk_detected = Truedetected_patterns.append(pattern)return {'risk_detected': risk_detected,'detected_patterns': detected_patterns,'recommendations': ['Use parameterized queries','Validate input data','Escape special characters'] if risk_detected else []}async def _validate_data_access(self, query: str, token: str, query_context: Dict[str, Any]) -> Dict[str, Any]:"""验证数据访问权限"""# 提取查询中涉及的表名tables = self._extract_table_names(query)# 获取用户权限user_permissions = self.security_manager.get_user_permissions(token)validation_result = {'is_valid': True,'issues': []}# 检查表访问权限for table in tables:if not self._check_table_access_permission(table, user_permissions):validation_result['is_valid'] = Falsevalidation_result['issues'].append(f'No access permission for table: {table}')return validation_resultdef _extract_table_names(self, query: str) -> List[str]:"""提取查询中的表名(简化实现)"""import re# 简单的表名提取正则表达式from_pattern = r'FROM\s+(\w+)'join_pattern = r'JOIN\s+(\w+)'tables = []# 提取FROM子句中的表名from_matches = re.findall(from_pattern, query, re.IGNORECASE)tables.extend(from_matches)# 提取JOIN子句中的表名join_matches = re.findall(join_pattern, query, 
re.IGNORECASE)tables.extend(join_matches)return list(set(tables)) # 去重def _check_table_access_permission(self, table: str, user_permissions: List[str]) -> bool:"""检查表访问权限"""# 简化实现:检查是否有数据库读权限return Permission.DATABASE_READ.value in user_permissionsdef _check_query_complexity(self, query: str) -> Dict[str, Any]:"""检查查询复杂度"""complexity_indicators = {'subqueries': query.upper().count('SELECT') - 1,'joins': query.upper().count('JOIN'),'unions': query.upper().count('UNION'),'aggregations': sum([query.upper().count(func) for func in ['COUNT', 'SUM', 'AVG', 'MAX', 'MIN']]),'query_length': len(query)}# 计算复杂度分数complexity_score = (complexity_indicators['subqueries'] * 3 +complexity_indicators['joins'] * 2 +complexity_indicators['unions'] * 4 +complexity_indicators['aggregations'] * 1 +complexity_indicators['query_length'] / 100)risk_level = 'LOW'if complexity_score > 20:risk_level = 'HIGH'elif complexity_score > 10:risk_level = 'MEDIUM'return {'is_complex': complexity_score > 5,'complexity_score': complexity_score,'risk_level': risk_level,'indicators': complexity_indicators}## 实际应用案例分析### 企业数据分析平台案例```python
import re
import uuid
from datetime import datetime
from typing import Dict, Any, List

# NOTE(review): imports added for this section; the original relied on earlier
# sections of the document being executed in the same namespace.


class EnterpriseDataAnalyticsPlatform:
    """End-to-end platform wiring MCP servers, query engine, security and sessions.

    NOTE(review): the collaborator classes referenced in __init__ are defined in
    other sections of this document.
    """

    def __init__(self):
        self.mcp_servers = {
            'database': DataConnectorMCPServer(),
            'visualization': VisualizationMCPServer(),
            'analytics': AnalyticsMCPServer()
        }
        self.query_system = IntelligentQuerySystem(self.mcp_servers['database'])
        self.security_manager = SecurityManager("enterprise_secret_key")
        self.session_manager = SessionManager()

    async def process_business_query(self, natural_query: str, user_token: str) -> Dict[str, Any]:
        """Authenticate, open a session, run the query, and record it."""
        # 1. Security check. FIX(review): the original validated the token twice;
        #    validate once and reuse the payload.
        user_payload = self.security_manager.validate_token(user_token)
        if not user_payload:
            return {'error': 'Authentication failed', 'code': 401}

        # 2. Create a session for this interaction.
        user_id = user_payload['user_id']
        session_id = await self.session_manager.create_session(user_id)

        # 3. Run the query with the user's context.
        try:
            result = await self.query_system.process_query(natural_query, {
                'user_id': user_id,
                'session_id': session_id,
                'permissions': user_payload['permissions']
            })

            # 4. Record the query in the session history.
            await self.session_manager.add_query_to_history(session_id, natural_query, result)

            return {
                'success': True,
                'result': result,
                'session_id': session_id
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'session_id': session_id
            }

    async def generate_business_report(self, report_config: Dict[str, Any],
                                       user_token: str) -> Dict[str, Any]:
        """Assemble a multi-section business report from canned analysis queries."""
        if not self.security_manager.check_permission(user_token, Permission.DATA_ANALYZE):
            return {'error': 'Insufficient permissions', 'code': 403}

        report_sections = []
        # FIX(review): auth-failure results carry no 'success' key, so use .get()
        # instead of ['success'] to avoid a KeyError.

        # 1. Sales analysis.
        if 'sales_analysis' in report_config:
            sales_query = "Show me sales trends for the last 6 months by product category"
            sales_result = await self.process_business_query(sales_query, user_token)
            if sales_result.get('success'):
                report_sections.append({
                    'title': 'Sales Trend Analysis',
                    'type': 'chart',
                    'data': sales_result['result'],
                    'insights': await self._generate_sales_insights(sales_result['result'])
                })

        # 2. Customer analysis.
        if 'customer_analysis' in report_config:
            customer_query = "Analyze customer segmentation by purchase behavior"
            customer_result = await self.process_business_query(customer_query, user_token)
            if customer_result.get('success'):
                report_sections.append({
                    'title': 'Customer Segmentation',
                    'type': 'analysis',
                    'data': customer_result['result'],
                    'insights': await self._generate_customer_insights(customer_result['result'])
                })

        # 3. Financial metrics.
        if 'financial_metrics' in report_config:
            financial_query = "Calculate key financial metrics including revenue, profit margin, and growth rate"
            financial_result = await self.process_business_query(financial_query, user_token)
            if financial_result.get('success'):
                report_sections.append({
                    'title': 'Financial Performance',
                    'type': 'metrics',
                    'data': financial_result['result'],
                    'insights': await self._generate_financial_insights(financial_result['result'])
                })

        return {
            'report_id': str(uuid.uuid4()),
            'generated_at': datetime.now().isoformat(),
            'sections': report_sections,
            'summary': await self._generate_report_summary(report_sections),
            'recommendations': await self._generate_business_recommendations(report_sections)
        }

    async def _generate_report_summary(self, report_sections: List[Dict[str, Any]]) -> str:
        """One-line summary of what the report contains.

        NOTE(review): called above but missing from the original text; minimal
        reconstruction.
        """
        titles = ", ".join(s['title'] for s in report_sections)
        return f"Report contains {len(report_sections)} section(s): {titles}"

    async def _generate_business_recommendations(self,
                                                 report_sections: List[Dict[str, Any]]) -> List[str]:
        """Aggregate the per-section insights as the recommendation list.

        NOTE(review): called above but missing from the original text; minimal
        reconstruction.
        """
        return [insight for section in report_sections
                for insight in section.get('insights', [])]

    async def _generate_sales_insights(self, sales_data: Dict[str, Any]) -> List[str]:
        """Derive human-readable insights from the sales analysis result."""
        insights = []

        # Trend direction.
        if 'trend' in sales_data:
            trend = sales_data['trend']
            if trend == 'increasing':
                insights.append("销售呈现上升趋势,业务增长良好")
            elif trend == 'decreasing':
                insights.append("销售呈现下降趋势,需要关注市场变化")

        # Best-performing product category.
        if 'category_performance' in sales_data:
            top_category = sales_data['category_performance'][0]
            insights.append(
                f"表现最佳的产品类别是{top_category['name']},占总销售额的{top_category['percentage']:.1f}%")

        return insights

    async def _generate_customer_insights(self, customer_data: Dict[str, Any]) -> List[str]:
        """Derive human-readable insights from the customer segmentation result."""
        insights = []
        if 'segments' in customer_data:
            segments = customer_data['segments']
            high_value_segment = max(segments, key=lambda x: x['avg_value'])
            insights.append(
                f"高价值客户群体:{high_value_segment['name']},平均消费{high_value_segment['avg_value']:.2f}元")
        return insights

    async def _generate_financial_insights(self, financial_data: Dict[str, Any]) -> List[str]:
        """Derive human-readable insights from the financial metrics result."""
        insights = []
        if 'profit_margin' in financial_data:
            margin = financial_data['profit_margin']
            if margin > 0.2:
                insights.append(f"利润率表现优秀:{margin:.1%}")
            elif margin < 0.1:
                insights.append(f"利润率偏低:{margin:.1%},需要优化成本结构")
        return insights


# ### 智能客服系统案例

class IntelligentCustomerServiceSystem:
    """Intent-driven customer-service pipeline on top of the MCP orchestrator.

    NOTE(review): MCPOrchestrator, CustomerServiceKnowledgeBase and
    TicketManagementSystem are defined in other sections of this document.
    """

    def __init__(self):
        self.mcp_orchestrator = MCPOrchestrator()
        self.nlp_processor = CustomerServiceNLPProcessor()
        self.knowledge_base = CustomerServiceKnowledgeBase()
        self.ticket_system = TicketManagementSystem()

    async def handle_customer_inquiry(self, inquiry: str, customer_id: str) -> Dict[str, Any]:
        """Classify the inquiry, extract entities, and route to a handler."""
        # 1. Understand the customer's intent.
        intent_analysis = await self.nlp_processor.analyze_intent(inquiry)

        # 2. Extract key entities (order number, product, time reference).
        entities = await self.nlp_processor.extract_entities(inquiry)

        # 3. Route by intent.
        if intent_analysis['type'] == 'ORDER_INQUIRY':
            response = await self._handle_order_inquiry(entities, customer_id)
        elif intent_analysis['type'] == 'PRODUCT_QUESTION':
            response = await self._handle_product_question(entities)
        elif intent_analysis['type'] == 'COMPLAINT':
            response = await self._handle_complaint(inquiry, entities, customer_id)
        else:
            response = await self._handle_general_inquiry(inquiry, entities)

        return response

    async def _handle_order_inquiry(self, entities: Dict[str, Any],
                                    customer_id: str) -> Dict[str, Any]:
        """Look up the referenced order (or the customer's latest orders).

        SECURITY(review): the original interpolated raw values straight into
        SQL. The order number is regex-constrained upstream, but it is
        re-validated here before embedding; prefer parameterized queries if the
        orchestrator supports them.
        """
        if 'order_number' in entities:
            order_number = entities['order_number']
            if not re.fullmatch(r'[A-Z0-9]{8,}', order_number):
                return {
                    'response_type': 'no_order_found',
                    'message': "抱歉,没有找到相关订单信息。请检查订单号是否正确。"
                }
            query = f"SELECT * FROM orders WHERE order_number = '{order_number}' AND customer_id = '{customer_id}'"
        else:
            query = f"SELECT * FROM orders WHERE customer_id = '{customer_id}' ORDER BY order_date DESC LIMIT 5"

        order_data = await self.mcp_orchestrator.execute_query(query)

        if order_data['success'] and order_data['data']:
            order = order_data['data'][0]
            return {
                'response_type': 'order_status',
                'message': f"您的订单 {order['order_number']} 当前状态为:{order['status']}",
                'order_details': order,
                'tracking_info': await self._get_tracking_info(order['order_number'])
            }
        else:
            return {
                'response_type': 'no_order_found',
                'message': "抱歉,没有找到相关订单信息。请检查订单号是否正确。"
            }

    async def _get_tracking_info(self, order_number: str):
        """Fetch logistics tracking for an order.

        NOTE(review): called above but missing from the original text; minimal
        placeholder — wire to the logistics backend.
        """
        return None

    async def _handle_product_question(self, entities: Dict[str, Any]) -> Dict[str, Any]:
        """Answer product questions from the knowledge base, or escalate."""
        product_name = entities.get('product_name', '')

        product_info = await self.knowledge_base.search_product_info(product_name)

        if product_info:
            return {
                'response_type': 'product_info',
                'message': f"关于{product_name}的信息:",
                'product_details': product_info,
                'related_products': await self.knowledge_base.get_related_products(product_name)
            }
        else:
            return {
                'response_type': 'product_not_found',
                'message': f"抱歉,没有找到关于{product_name}的详细信息。我将为您转接人工客服。",
                'escalate_to_human': True
            }

    async def _handle_complaint(self, inquiry: str, entities: Dict[str, Any],
                                customer_id: str) -> Dict[str, Any]:
        """File a ticket for the complaint and escalate to a human agent.

        NOTE(review): dispatched to above but missing from the original text;
        minimal reconstruction — confirm the ticket-system API.
        """
        ticket = await self.ticket_system.create_ticket(customer_id, inquiry, entities)
        return {
            'response_type': 'complaint_received',
            'ticket': ticket,
            'escalate_to_human': True
        }

    async def _handle_general_inquiry(self, inquiry: str,
                                      entities: Dict[str, Any]) -> Dict[str, Any]:
        """Best-effort knowledge-base answer for uncategorized inquiries.

        NOTE(review): dispatched to above but missing from the original text;
        minimal reconstruction.
        """
        answer = await self.knowledge_base.search_product_info(inquiry)
        return {
            'response_type': 'general',
            'answer': answer,
            'escalate_to_human': answer is None
        }


class CustomerServiceNLPProcessor:
    """Regex-based intent classification and entity extraction (bilingual)."""

    def __init__(self):
        # Per-intent pattern lists; scores are normalized by list length.
        self.intent_patterns = {
            'ORDER_INQUIRY': [
                r'订单.*状态', r'订单.*查询', r'物流.*信息', r'发货.*时间',
                r'order.*status', r'track.*order', r'shipping.*info'
            ],
            'PRODUCT_QUESTION': [
                r'产品.*介绍', r'商品.*详情', r'价格.*多少', r'规格.*参数',
                r'product.*info', r'item.*details', r'price.*how much'
            ],
            'COMPLAINT': [
                r'投诉', r'问题', r'故障', r'不满意', r'退货', r'退款',
                r'complaint', r'problem', r'issue', r'refund', r'return'
            ],
            'TECHNICAL_SUPPORT': [
                r'技术.*支持', r'使用.*方法', r'操作.*指南', r'故障.*排除',
                r'technical.*support', r'how.*to.*use', r'troubleshoot'
            ]
        }

    async def analyze_intent(self, text: str) -> Dict[str, Any]:
        """Score each intent's patterns against the text; best match wins."""
        text_lower = text.lower()
        intent_scores = {}

        for intent, patterns in self.intent_patterns.items():
            score = 0
            for pattern in patterns:
                if re.search(pattern, text_lower):
                    score += 1
            if score > 0:
                intent_scores[intent] = score / len(patterns)

        if intent_scores:
            best_intent = max(intent_scores, key=intent_scores.get)
            confidence = intent_scores[best_intent]
        else:
            best_intent = 'GENERAL'
            confidence = 0.5

        return {
            'type': best_intent,
            'confidence': confidence,
            'all_scores': intent_scores
        }

    async def extract_entities(self, text: str) -> Dict[str, Any]:
        """Pull order number, product name and time reference out of the text."""
        entities = {}

        # Order number (8+ uppercase alphanumerics after "订单号").
        order_pattern = r'订单号?[::]?\s*([A-Z0-9]{8,})'
        order_match = re.search(order_pattern, text)
        if order_match:
            entities['order_number'] = order_match.group(1)

        # Product name (simplified keyword scan; first hit wins).
        product_keywords = ['手机', '电脑', '平板', '耳机', '音响', 'iPhone', 'iPad', 'MacBook']
        for keyword in product_keywords:
            if keyword in text:
                entities['product_name'] = keyword
                break

        # Time reference (dates or relative day words).
        time_pattern = r'(\d{1,2}月\d{1,2}日|\d{4}-\d{2}-\d{2}|昨天|今天|明天)'
        time_match = re.search(time_pattern, text)
        if time_match:
            entities['time_reference'] = time_match.group(1)

        return entities

# (document continues — fence and headers preserved as comments to keep this
#  reconstructed section importable)
# ```
# ## 最佳实践与设计模式
# ### MCP服务器设计模式
# ```python
from typing import Dict, Any, List

# NOTE(review): import added for this section; the original relied on earlier
# sections of the document.


class MCPServerFactory:
    """Factory for concrete MCP servers.

    NOTE(review): the concrete server classes referenced below are defined in
    other sections of this document; annotations are quoted so this section
    does not depend on them at import time.
    """

    @staticmethod
    def create_server(server_type: str, config: Dict[str, Any]) -> "MCPServer":
        """Instantiate the MCP server matching `server_type`.

        Raises ValueError for unknown types.
        """
        if server_type == 'database':
            return DatabaseMCPServer(config)
        elif server_type == 'visualization':
            return VisualizationMCPServer(config)
        elif server_type == 'analytics':
            return AnalyticsMCPServer(config)
        elif server_type == 'file_system':
            return FileSystemMCPServer(config)
        else:
            raise ValueError(f"Unknown server type: {server_type}")


class MCPServerBuilder:
    """Fluent builder assembling a server with tools, resources and middleware."""

    def __init__(self):
        self.server = None
        self.tools = []
        self.resources = []
        self.middleware = []

    def set_server_type(self, server_type: str, config: Dict[str, Any]):
        """Choose the concrete server; returns self for chaining."""
        self.server = MCPServerFactory.create_server(server_type, config)
        return self

    def add_tool(self, tool: "MCPTool"):
        """Queue a tool for registration; returns self for chaining."""
        self.tools.append(tool)
        return self

    def add_resource(self, resource: "MCPResource"):
        """Queue a resource for registration; returns self for chaining."""
        self.resources.append(resource)
        return self

    def add_middleware(self, middleware):
        """Queue middleware; returns self for chaining."""
        self.middleware.append(middleware)
        return self

    def build(self) -> "MCPServer":
        """Register everything queued and return the configured server.

        Raises ValueError if set_server_type() was never called.
        """
        if not self.server:
            raise ValueError("Server type not set")

        for tool in self.tools:
            self.server.register_tool(tool)
        for resource in self.resources:
            self.server.register_resource(resource)
        for middleware in self.middleware:
            self.server.add_middleware(middleware)

        return self.server


# ### 查询优化最佳实践

class QueryOptimizationBestPractices:
    """Static helpers for cleaning up natural-language queries."""

    @staticmethod
    def optimize_natural_language_query(query: str) -> str:
        """Normalize, strip filler words and map synonyms to canonical terms."""
        # 1. Normalize casing/whitespace.
        normalized = query.strip().lower()

        # 2. Drop filler words.
        stop_words = ['the', 'a', 'an', 'and', 'or', 'but', 'please', 'can', 'you']
        words = normalized.split()
        filtered_words = [word for word in words if word not in stop_words]

        # 3. Map synonyms onto canonical query verbs.
        synonyms = {
            'show': 'select',
            'display': 'select',
            'get': 'select',
            'find': 'select',
            'total': 'sum',
            'calculate': 'sum'
        }
        optimized_words = [synonyms.get(word, word) for word in filtered_words]

        return ' '.join(optimized_words)

    @staticmethod
    def suggest_query_improvements(query_analysis: Dict[str, Any]) -> List[str]:
        """Heuristic improvement tips based on a query-analysis dict."""
        suggestions = []

        # Overly complex query → suggest decomposition.
        if query_analysis.get('complexity_score', 0) > 10:
            suggestions.append("查询较为复杂,建议分解为多个简单查询")

        # No filters → suggest adding some.
        if not query_analysis.get('has_filters', False):
            suggestions.append("建议添加过滤条件以提高查询效率")

        # Aggregation without grouping → suggest GROUP BY.
        if query_analysis.get('has_aggregation', False) and not query_analysis.get('has_group_by', False):
            suggestions.append("使用聚合函数时建议添加GROUP BY子句")

        return suggestions


# ### 错误处理与恢复策略

class ErrorHandlingStrategy:
    """Maps raised exceptions to user-facing error reports and recovery hints."""

    def __init__(self):
        self.retry_config = {
            'max_retries': 3,
            'backoff_factor': 2,
            'retry_exceptions': [ConnectionError, TimeoutError]
        }

    async def handle_query_error(self, error: Exception,
                                 query_context: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch on the exception class name to the matching handler."""
        error_type = type(error).__name__

        if error_type == 'SyntaxError':
            return await self._handle_syntax_error(error, query_context)
        elif error_type == 'PermissionError':
            return await self._handle_permission_error(error, query_context)
        elif error_type == 'ConnectionError':
            return await self._handle_connection_error(error, query_context)
        elif error_type == 'TimeoutError':
            return await self._handle_timeout_error(error, query_context)
        else:
            return await self._handle_generic_error(error, query_context)

    async def _handle_syntax_error(self, error: Exception,
                                   context: Dict[str, Any]) -> Dict[str, Any]:
        """Report a malformed query with correction hints."""
        return {
            'error_type': 'syntax_error',
            'message': '查询语法有误,请检查查询语句',
            'suggestions': [
                '检查SQL语法是否正确',
                '确认表名和列名是否存在',
                '检查引号和括号是否匹配'
            ],
            'recovery_action': 'suggest_correction'
        }

    async def _handle_permission_error(self, error: Exception,
                                       context: Dict[str, Any]) -> Dict[str, Any]:
        """Report insufficient privileges."""
        return {
            'error_type': 'permission_error',
            'message': '权限不足,无法执行此查询',
            'suggestions': [
                '联系管理员获取相应权限',
                '尝试查询其他有权限的数据',
                '使用受限的查询功能'
            ],
            'recovery_action': 'request_permission'
        }

    async def _handle_connection_error(self, error: Exception,
                                       context: Dict[str, Any]) -> Dict[str, Any]:
        """Report a database connectivity failure."""
        return {
            'error_type': 'connection_error',
            'message': '数据库连接失败',
            'suggestions': [
                '检查网络连接',
                '稍后重试',
                '联系技术支持'
            ],
            'recovery_action': 'retry_with_backoff'
        }

    async def _handle_timeout_error(self, error: Exception,
                                    context: Dict[str, Any]) -> Dict[str, Any]:
        """Report a query timeout.

        NOTE(review): dispatched to above but missing from the original text;
        minimal reconstruction in the style of the sibling handlers.
        """
        return {
            'error_type': 'timeout_error',
            'message': '查询执行超时',
            'suggestions': [
                '简化查询或添加过滤条件',
                '稍后重试'
            ],
            'recovery_action': 'retry_with_backoff'
        }

    async def _handle_generic_error(self, error: Exception,
                                    context: Dict[str, Any]) -> Dict[str, Any]:
        """Fallback report for unclassified exceptions.

        NOTE(review): dispatched to above but missing from the original text;
        minimal reconstruction.
        """
        return {
            'error_type': 'generic_error',
            'message': str(error),
            'suggestions': [],
            'recovery_action': 'report'
        }

# (document continues — fence and headers preserved as comments to keep this
#  reconstructed section importable)
# ```
# ## 未来发展趋势与展望
# ### AI驱动的查询优化
# ```python
class AIQueryOptimizer:"""AI驱动的查询优化器"""def __init__(self):self.ml_model = self._load_optimization_model()self.query_history = QueryHistoryAnalyzer()self.performance_predictor = QueryPerformancePredictor()async def optimize_with_ai(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:"""使用AI优化查询"""# 1. 特征提取features = await self._extract_query_features(query, context)# 2. 性能预测predicted_performance = await self.performance_predictor.predict(features)# 3. 优化建议生成optimization_suggestions = await self._generate_ai_suggestions(features, predicted_performance)# 4. 自动优化optimized_query = await self._apply_ai_optimizations(query, optimization_suggestions)return {'original_query': query,'optimized_query': optimized_query,'predicted_improvement': predicted_performance['improvement_ratio'],'optimization_techniques': optimization_suggestions,'confidence': predicted_performance['confidence']}async def _extract_query_features(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:"""提取查询特征"""return {'query_length': len(query),'table_count': query.upper().count('FROM') + query.upper().count('JOIN'),'condition_count': query.upper().count('WHERE') + query.upper().count('AND') + query.upper().count('OR'),'aggregation_count': sum([query.upper().count(func) for func in ['SUM', 'COUNT', 'AVG', 'MAX', 'MIN']]),'subquery_count': query.upper().count('SELECT') - 1,'user_experience_level': context.get('user_level', 'beginner'),'historical_performance': await self.query_history.get_similar_query_performance(query)}class QuantumQueryProcessor:"""量子查询处理器(概念性实现)"""def __init__(self):self.quantum_simulator = QuantumSimulator()self.quantum_algorithms = {'search': GroverSearch(),'optimization': QuantumAnnealing(),'pattern_matching': QuantumPatternMatcher()}async def process_with_quantum_advantage(self, query: str) -> Dict[str, Any]:"""使用量子优势处理查询"""# 检查是否适合量子处理if await self._is_quantum_suitable(query):return await self._quantum_process(query)else:return await 
self._classical_process(query)async def _is_quantum_suitable(self, query: str) -> bool:"""检查是否适合量子处理"""# 量子计算适合的场景:# 1. 大规模搜索问题# 2. 优化问题# 3. 模式匹配quantum_indicators = ['search' in query.lower(),'optimize' in query.lower(),'pattern' in query.lower(),len(query.split()) > 50 # 复杂查询]return sum(quantum_indicators) >= 2### 边缘计算集成class EdgeComputingIntegration:"""边缘计算集成"""def __init__(self):self.edge_nodes = EdgeNodeManager()self.data_synchronizer = EdgeDataSynchronizer()self.query_distributor = EdgeQueryDistributor()async def process_edge_query(self, query: str, location: str) -> Dict[str, Any]:"""处理边缘查询"""# 1. 选择最近的边缘节点nearest_node = await self.edge_nodes.find_nearest_node(location)# 2. 检查数据可用性data_availability = await self._check_edge_data_availability(query, nearest_node)# 3. 决定处理策略if data_availability['complete']:# 在边缘节点处理return await self._process_on_edge(query, nearest_node)else:# 混合处理:边缘+云端return await self._hybrid_processing(query, nearest_node, data_availability)async def _process_on_edge(self, query: str, edge_node: str) -> Dict[str, Any]:"""在边缘节点处理"""return {'processing_location': 'edge','edge_node': edge_node,'latency': 'low','result': await self.query_distributor.execute_on_edge(query, edge_node)}async def _hybrid_processing(self, query: str, edge_node: str, data_availability: Dict[str, Any]) -> Dict[str, Any]:"""混合处理"""# 分解查询edge_query = data_availability['available_query_part']cloud_query = data_availability['missing_query_part']# 并行执行edge_result = await self.query_distributor.execute_on_edge(edge_query, edge_node)cloud_result = await self.query_distributor.execute_on_cloud(cloud_query)# 合并结果merged_result = await self._merge_edge_cloud_results(edge_result, cloud_result)return {'processing_location': 'hybrid','edge_node': edge_node,'latency': 'medium','result': merged_result}## 总结与展望MCP与智能问数技术的结合,正在重新定义人机交互的方式,让数据分析变得更加直观、高效和智能。通过本文的全面介绍,我们可以看到:### 技术成熟度与应用前景**1. 当前技术水平**
- MCP协议提供了标准化的接口规范,为AI系统与外部工具的集成奠定了基础
- 自然语言处理技术已经能够较好地理解用户的查询意图
- 智能查询优化技术能够显著提升查询性能

**2. 应用场景广泛**
- 企业数据分析平台
- 智能客服系统
- 商业智能工具
- 数据科学平台

**3. 技术挑战与解决方案**
- **准确性挑战**:通过多轮对话和上下文管理提升理解准确性
- **性能挑战**:通过智能缓存和查询优化提升响应速度
- **安全性挑战**:通过完善的权限控制和安全验证保障数据安全

### 未来发展方向

**1. 技术演进**
- AI模型将更加智能,能够理解更复杂的业务逻辑
- 量子计算技术将为大规模数据处理提供新的可能性
- 边缘计算将使数据处理更加贴近用户

**2. 应用拓展**
- 多模态交互:支持语音、图像等多种输入方式
- 实时协作:支持多用户实时协作分析
- 自动化决策:基于数据分析结果自动执行业务决策

**3. 生态建设**
- 标准化程度将进一步提高
- 开源社区将推动技术快速发展
- 产业生态将更加完善

### 对开发者的建议

**1. 技术学习路径**
- 掌握MCP协议的核心概念和实现方法
- 学习自然语言处理和机器学习技术
- 了解数据库优化和分布式系统设计

**2. 实践建议**
- 从简单的查询场景开始,逐步扩展功能
- 重视用户体验设计,提供直观的交互界面
- 建立完善的测试和监控体系

**3. 未来准备**
- 关注新兴技术的发展趋势
- 参与开源项目和技术社区
- 培养跨领域的综合能力

MCP与智能问数技术的融合,不仅是技术的进步,更是数据分析方式的革命。随着技术的不断成熟和应用的深入,我们有理由相信,未来的数据分析将变得更加智能、高效和人性化。

---

*本文提供了MCP与智能问数技术的全面指南,涵盖了从基础概念到实际应用的各个方面。希望能够为相关技术的学习和实践提供有价值的参考。*
# MCP与智能问数技术全面指南:从协议设计到智能化数据查询## 引言在人工智能与数据科学快速发展的今天,传统的数据查询方式正面临着前所未有的挑战。用户不再满足于编写复杂的SQL语句或学习特定的查询语法,而是希望能够用自然语言直接与数据对话,获得智能化的分析结果。MCP(Model Context Protocol)作为一种新兴的协议标准,为构建智能化的数据查询系统提供了强大的技术基础。MCP协议通过标准化的接口定义,使得大语言模型能够与各种数据源和工具进行无缝集成,而智能问数技术则利用自然语言处理和机器学习技术,将用户的自然语言查询转换为精确的数据操作。两者的结合,正在重新定义人机交互的方式,让数据分析变得更加直观、高效和智能。### 为什么MCP与智能问数如此重要?**1. 降低数据访问门槛**
传统的数据查询需要用户掌握SQL、Python等技术技能,而基于MCP的智能问数系统允许业务用户用自然语言直接查询数据,大大降低了数据访问的技术门槛。**2. 提升查询效率和准确性**
智能问数系统能够理解用户意图,自动优化查询逻辑,并提供上下文相关的建议,显著提升查询效率和结果准确性。**3. 实现真正的对话式分析**
通过MCP协议的标准化接口,系统能够维护查询上下文,支持多轮对话,实现真正的对话式数据分析体验。**4. 支持复杂的跨系统集成**
MCP协议的标准化特性使得系统能够轻松集成多种数据源、分析工具和可视化组件,构建完整的智能数据分析生态。### 本文的价值与结构本文将从MCP协议的基础概念出发,深入探讨智能问数系统的设计原理和实现技术,并提供大量实际代码示例和最佳实践。无论您是系统架构师、数据工程师,还是AI应用开发者,都能从本文中获得有价值的技术洞察和实用指导。## 目录1. [MCP协议基础与核心概念](#mcp协议基础与核心概念)
2. [智能问数系统架构设计](#智能问数系统架构设计)
3. [自然语言查询解析引擎](#自然语言查询解析引擎)
4. [MCP服务器端实现](#mcp服务器端实现)
5. [智能查询优化与执行](#智能查询优化与执行)
6. [多模态数据交互](#多模态数据交互)
7. [上下文管理与会话状态](#上下文管理与会话状态)
8. [安全性与权限控制](#安全性与权限控制)
9. [性能优化与缓存策略](#性能优化与缓存策略)
10. [可视化与结果呈现](#可视化与结果呈现)
11. [企业级部署与集成](#企业级部署与集成)
12. [实际应用案例分析](#实际应用案例分析)
13. [最佳实践与设计模式](#最佳实践与设计模式)
14. [未来发展趋势与展望](#未来发展趋势与展望)## MCP协议基础与核心概念### MCP协议概述MCP(Model Context Protocol)是一种开放标准协议,旨在为大语言模型与外部工具、数据源之间的交互提供统一的接口规范。该协议定义了标准化的消息格式、工具调用机制和资源访问方式,使得AI系统能够安全、高效地与各种外部系统进行集成。```python
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, asdict
from enum import Enum
import json
import asyncio
from abc import ABC, abstractmethod


class MCPMessageType(Enum):
    """Kinds of messages exchanged over an MCP channel."""

    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    ERROR = "error"


class MCPResourceType(Enum):
    """Categories of resources an MCP server can expose."""

    DATABASE = "database"
    FILE = "file"
    API = "api"
    TOOL = "tool"
    MEMORY = "memory"


@dataclass
class MCPMessage:
    """Basic MCP message envelope.

    Mirrors the wire format: a unique id, the message type, the invoked
    method name, its parameters, a UNIX timestamp, and optional context.
    """

    id: str
    type: MCPMessageType
    method: str
    params: Dict[str, Any]
    timestamp: float
    context: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the message to a plain dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':
        """Build a message object from its dictionary form."""
        return cls(
            id=data['id'],
            type=MCPMessageType(data['type']),
            method=data['method'],
            params=data['params'],
            timestamp=data['timestamp'],
            context=data.get('context')
        )


@dataclass
class MCPTool:
    """Declarative description of a callable MCP tool."""

    name: str
    description: str
    # JSON-schema-like spec; may carry a 'required' list of parameter names.
    parameters: Dict[str, Any]
    required_permissions: List[str]
    category: str
    version: str = "1.0.0"

    def validate_parameters(self, params: Dict[str, Any]) -> bool:
        """Return True when every declared required parameter is present."""
        required_params = self.parameters.get('required', [])
        for param in required_params:
            if param not in params:
                return False
        return True


@dataclass
class MCPResource:
    """Addressable resource exposed by an MCP server."""

    uri: str
    type: MCPResourceType
    name: str
    description: str
    metadata: Dict[str, Any]
    access_permissions: List[str]

    def is_accessible(self, user_permissions: List[str]) -> bool:
        """Check that the user holds every permission the resource demands."""
        return all(perm in user_permissions for perm in self.access_permissions)


class MCPServer(ABC):
    """Abstract base class for MCP servers.

    Holds tool and resource registries; concrete subclasses implement
    request handling, listing, and tool execution.
    """

    def __init__(self, name: str, version: str = "1.0.0"):
        self.name = name
        self.version = version
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        # Capability flags advertised to clients.
        self.capabilities = {
            'tools': True,
            'resources': True,
            'prompts': True,
            'logging': True
        }

    def register_tool(self, tool: MCPTool):
        """Register a tool under its name."""
        self.tools[tool.name] = tool

    def register_resource(self, resource: MCPResource):
        """Register a resource under its URI."""
        self.resources[resource.uri] = resource

    @abstractmethod
    async def handle_request(self, message: MCPMessage) -> MCPMessage:
        """Handle one incoming request message."""
        pass

    @abstractmethod
    async def list_tools(self) -> List[MCPTool]:
        """List the tools this server offers."""
        pass

    @abstractmethod
    async def list_resources(self) -> List[MCPResource]:
        """List the resources this server exposes."""
        pass

    async def call_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Validate parameters and dispatch a tool invocation.

        Raises:
            ValueError: if the tool is unknown or the parameters are invalid.
        """
        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")

        tool = self.tools[tool_name]
        if not tool.validate_parameters(parameters):
            raise ValueError(f"Invalid parameters for tool '{tool_name}'")

        return await self._execute_tool(tool, parameters)

    @abstractmethod
    async def _execute_tool(self, tool: MCPTool, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute an already-validated tool call."""
        pass
智能问数系统的核心组件
智能问数系统基于MCP协议构建,包含以下核心组件:
class IntelligentQuerySystem:"""智能问数系统"""def __init__(self, mcp_server: MCPServer):self.mcp_server = mcp_serverself.query_parser = NaturalLanguageQueryParser()self.query_optimizer = QueryOptimizer()self.execution_engine = QueryExecutionEngine()self.context_manager = ContextManager()self.result_formatter = ResultFormatter()async def process_query(self, natural_query: str, user_context: Dict[str, Any] = None) -> Dict[str, Any]:"""处理自然语言查询"""# 1. 解析自然语言查询parsed_query = await self.query_parser.parse(natural_query, user_context)# 2. 查询优化optimized_query = await self.query_optimizer.optimize(parsed_query)# 3. 执行查询raw_results = await self.execution_engine.execute(optimized_query)# 4. 格式化结果formatted_results = await self.result_formatter.format(raw_results, parsed_query)# 5. 更新上下文await self.context_manager.update_context(natural_query, formatted_results)return formatted_resultsclass NaturalLanguageQueryParser:"""自然语言查询解析器"""def __init__(self):self.intent_classifier = IntentClassifier()self.entity_extractor = EntityExtractor()self.query_builder = QueryBuilder()async def parse(self, query: str, context: Dict[str, Any] = None) -> 'ParsedQuery':"""解析自然语言查询"""# 意图识别intent = await self.intent_classifier.classify(query, context)# 实体提取entities = await self.entity_extractor.extract(query, context)# 构建结构化查询structured_query = await self.query_builder.build(intent, entities, context)return ParsedQuery(original_query=query,intent=intent,entities=entities,structured_query=structured_query,confidence=min(intent.confidence, entities.confidence))@dataclass
class ParsedQuery:"""解析后的查询"""original_query: strintent: 'QueryIntent'entities: 'ExtractedEntities'structured_query: Dict[str, Any]confidence: floatdef is_valid(self) -> bool:"""检查查询是否有效"""return self.confidence > 0.7 and self.intent.is_valid()@dataclass
class QueryIntent:"""查询意图"""type: str # SELECT, INSERT, UPDATE, DELETE, ANALYZE, VISUALIZEaction: str # 具体动作confidence: floatparameters: Dict[str, Any]def is_valid(self) -> bool:"""检查意图是否有效"""return self.confidence > 0.8 and self.type in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'ANALYZE', 'VISUALIZE']@dataclass
class ExtractedEntities:"""提取的实体"""tables: List[str]columns: List[str]values: List[Any]conditions: List[Dict[str, Any]]aggregations: List[str]time_ranges: List[Dict[str, Any]]confidence: float
智能问数系统架构设计
系统整体架构
class IntelligentQueryArchitecture:"""智能问数系统架构"""def __init__(self):self.components = {'frontend': FrontendInterface(),'api_gateway': APIGateway(),'query_processor': QueryProcessor(),'mcp_orchestrator': MCPOrchestrator(),'data_connectors': DataConnectorManager(),'cache_layer': CacheLayer(),'security_manager': SecurityManager(),'monitoring': MonitoringSystem()}def initialize_system(self):"""初始化系统"""# 启动各个组件for name, component in self.components.items():component.initialize()print(f"Component {name} initialized")# 建立组件间连接self._setup_component_connections()def _setup_component_connections(self):"""设置组件间连接"""# API网关连接查询处理器self.components['api_gateway'].connect(self.components['query_processor'])# 查询处理器连接MCP编排器self.components['query_processor'].connect(self.components['mcp_orchestrator'])# MCP编排器连接数据连接器self.components['mcp_orchestrator'].connect(self.components['data_connectors'])class QueryProcessor:"""查询处理器"""def __init__(self):self.nlp_engine = NLPEngine()self.query_planner = QueryPlanner()self.execution_coordinator = ExecutionCoordinator()async def process_natural_query(self, query: str, user_id: str) -> Dict[str, Any]:"""处理自然语言查询"""# 1. 预处理查询preprocessed_query = await self._preprocess_query(query, user_id)# 2. 语义理解semantic_analysis = await self.nlp_engine.analyze(preprocessed_query)# 3. 查询规划execution_plan = await self.query_planner.create_plan(semantic_analysis)# 4. 
执行协调results = await self.execution_coordinator.execute(execution_plan)return resultsasync def _preprocess_query(self, query: str, user_id: str) -> Dict[str, Any]:"""预处理查询"""return {'original_query': query,'user_id': user_id,'timestamp': time.time(),'normalized_query': self._normalize_query(query),'user_context': await self._get_user_context(user_id)}def _normalize_query(self, query: str) -> str:"""规范化查询"""# 去除多余空格、标点符号等import renormalized = re.sub(r'\s+', ' ', query.strip())return normalized.lower()class NLPEngine:"""自然语言处理引擎"""def __init__(self):self.tokenizer = Tokenizer()self.pos_tagger = POSTagger()self.ner_model = NERModel()self.intent_classifier = IntentClassifier()self.dependency_parser = DependencyParser()async def analyze(self, query_data: Dict[str, Any]) -> Dict[str, Any]:"""分析查询语义"""query = query_data['normalized_query']# 分词tokens = self.tokenizer.tokenize(query)# 词性标注pos_tags = self.pos_tagger.tag(tokens)# 命名实体识别entities = self.ner_model.extract(tokens, pos_tags)# 意图分类intent = self.intent_classifier.classify(query, query_data['user_context'])# 依存句法分析dependencies = self.dependency_parser.parse(tokens, pos_tags)return {'tokens': tokens,'pos_tags': pos_tags,'entities': entities,'intent': intent,'dependencies': dependencies,'semantic_structure': self._build_semantic_structure(tokens, pos_tags, entities, intent, dependencies)}def _build_semantic_structure(self, tokens, pos_tags, entities, intent, dependencies):"""构建语义结构"""return {'query_type': intent['type'],'target_entities': [e for e in entities if e['type'] in ['TABLE', 'COLUMN']],'filter_conditions': self._extract_conditions(dependencies, entities),'aggregation_functions': self._extract_aggregations(tokens, pos_tags),'temporal_constraints': self._extract_temporal_info(entities),'grouping_criteria': self._extract_grouping(dependencies, entities)}class QueryPlanner:"""查询规划器"""def __init__(self):self.schema_manager = SchemaManager()self.cost_estimator = CostEstimator()self.plan_optimizer = 
PlanOptimizer()async def create_plan(self, semantic_analysis: Dict[str, Any]) -> 'ExecutionPlan':"""创建执行计划"""semantic_structure = semantic_analysis['semantic_structure']# 1. 模式匹配schema_info = await self.schema_manager.match_schema(semantic_structure)# 2. 生成候选计划candidate_plans = await self._generate_candidate_plans(semantic_structure, schema_info)# 3. 成本估算costed_plans = []for plan in candidate_plans:cost = await self.cost_estimator.estimate(plan)costed_plans.append((plan, cost))# 4. 选择最优计划best_plan = min(costed_plans, key=lambda x: x[1])[0]# 5. 计划优化optimized_plan = await self.plan_optimizer.optimize(best_plan)return ExecutionPlan(plan_id=str(uuid.uuid4()),steps=optimized_plan['steps'],estimated_cost=optimized_plan['cost'],expected_result_schema=optimized_plan['result_schema'],metadata=optimized_plan['metadata'])async def _generate_candidate_plans(self, semantic_structure: Dict[str, Any], schema_info: Dict[str, Any]) -> List[Dict[str, Any]]:"""生成候选执行计划"""plans = []query_type = semantic_structure['query_type']if query_type == 'SELECT':plans.extend(await self._generate_select_plans(semantic_structure, schema_info))elif query_type == 'ANALYZE':plans.extend(await self._generate_analysis_plans(semantic_structure, schema_info))elif query_type == 'VISUALIZE':plans.extend(await self._generate_visualization_plans(semantic_structure, schema_info))return plansasync def _generate_select_plans(self, semantic_structure: Dict[str, Any], schema_info: Dict[str, Any]) -> List[Dict[str, Any]]:"""生成SELECT查询计划"""plans = []# 基础查询计划base_plan = {'type': 'SELECT','steps': [{'operation': 'TABLE_SCAN','table': schema_info['primary_table'],'columns': semantic_structure['target_entities']}]}# 添加过滤条件if semantic_structure['filter_conditions']:base_plan['steps'].append({'operation': 'FILTER','conditions': semantic_structure['filter_conditions']})# 添加聚合操作if semantic_structure['aggregation_functions']:base_plan['steps'].append({'operation': 'AGGREGATE','functions': 
semantic_structure['aggregation_functions'],'group_by': semantic_structure['grouping_criteria']})plans.append(base_plan)# 如果涉及多表,生成JOIN计划if len(schema_info['involved_tables']) > 1:join_plan = await self._generate_join_plan(semantic_structure, schema_info)plans.append(join_plan)return plans@dataclass
class ExecutionPlan:"""执行计划"""plan_id: strsteps: List[Dict[str, Any]]estimated_cost: floatexpected_result_schema: Dict[str, Any]metadata: Dict[str, Any]def get_step(self, index: int) -> Dict[str, Any]:"""获取执行步骤"""if 0 <= index < len(self.steps):return self.steps[index]raise IndexError(f"Step index {index} out of range")def add_step(self, step: Dict[str, Any], position: int = None):"""添加执行步骤"""if position is None:self.steps.append(step)else:self.steps.insert(position, step)
自然语言查询解析引擎
意图识别与实体提取
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
import spacyclass IntentClassifier:"""意图分类器"""def __init__(self, model_name: str = "bert-base-uncased"):self.tokenizer = AutoTokenizer.from_pretrained(model_name)self.model = AutoModel.from_pretrained(model_name)self.classifier = self._build_classifier()self.intent_labels = ['SELECT_DATA', 'AGGREGATE_DATA', 'FILTER_DATA', 'JOIN_TABLES','CREATE_VISUALIZATION', 'ANALYZE_TRENDS', 'COMPARE_VALUES','FIND_ANOMALIES', 'GENERATE_REPORT', 'UPDATE_DATA']def _build_classifier(self) -> nn.Module:"""构建分类器网络"""return nn.Sequential(nn.Linear(768, 256), # BERT hidden sizenn.ReLU(),nn.Dropout(0.3),nn.Linear(256, 128),nn.ReLU(),nn.Dropout(0.2),nn.Linear(128, len(self.intent_labels)))async def classify(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:"""分类查询意图"""# 编码查询文本inputs = self.tokenizer(query, return_tensors="pt", padding=True, truncation=True)with torch.no_grad():# 获取BERT嵌入outputs = self.model(**inputs)embeddings = outputs.last_hidden_state.mean(dim=1) # 平均池化# 意图分类logits = self.classifier(embeddings)probabilities = torch.softmax(logits, dim=-1)# 获取最高概率的意图predicted_idx = torch.argmax(probabilities, dim=-1).item()confidence = probabilities[0][predicted_idx].item()# 结合上下文调整结果if context:confidence = self._adjust_confidence_with_context(query, predicted_idx, confidence, context)return {'type': self.intent_labels[predicted_idx],'confidence': confidence,'all_probabilities': {label: prob.item() for label, prob in zip(self.intent_labels, probabilities[0])},'reasoning': self._explain_classification(query, predicted_idx, confidence)}def _adjust_confidence_with_context(self, query: str, predicted_idx: int, confidence: float, context: Dict[str, Any]) -> float:"""基于上下文调整置信度"""# 检查历史查询模式if 'query_history' in context:recent_intents = [q.get('intent', {}).get('type') for q in context['query_history'][-3:]]current_intent = self.intent_labels[predicted_idx]# 如果与最近的查询意图一致,提高置信度if current_intent in recent_intents:confidence = min(1.0, confidence * 1.1)# 检查用户偏好if 
'user_preferences' in context:preferred_operations = context['user_preferences'].get('common_operations', [])current_intent = self.intent_labels[predicted_idx]if current_intent in preferred_operations:confidence = min(1.0, confidence * 1.05)return confidencedef _explain_classification(self, query: str, predicted_idx: int, confidence: float) -> str:"""解释分类结果"""intent = self.intent_labels[predicted_idx]explanations = {'SELECT_DATA': f"查询包含数据检索关键词,意图是获取特定数据",'AGGREGATE_DATA': f"查询包含聚合函数关键词(如sum、count、average),意图是数据汇总",'FILTER_DATA': f"查询包含条件筛选关键词(如where、filter),意图是数据过滤",'CREATE_VISUALIZATION': f"查询包含可视化关键词(如chart、graph、plot),意图是创建图表",'ANALYZE_TRENDS': f"查询包含趋势分析关键词(如trend、pattern),意图是趋势分析"}return explanations.get(intent, f"基于语义分析,识别为{intent}操作")class EntityExtractor:"""实体提取器"""def __init__(self):self.nlp = spacy.load("en_core_web_sm")self.custom_entities = self._load_custom_entities()def _load_custom_entities(self) -> Dict[str, List[str]]:"""加载自定义实体"""return {'TABLE_NAMES': ['users', 'orders', 'products', 'customers', 'sales', 'inventory'],'COLUMN_NAMES': ['id', 'name', 'email', 'date', 'amount', 'price', 'quantity'],'AGGREGATION_FUNCTIONS': ['sum', 'count', 'average', 'max', 'min', 'median'],'TIME_PERIODS': ['today', 'yesterday', 'last week', 'last month', 'last year'],'COMPARISON_OPERATORS': ['greater than', 'less than', 'equal to', 'between']}async def extract(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:"""提取实体"""# 使用spaCy进行基础实体识别doc = self.nlp(query)# 提取标准实体standard_entities = self._extract_standard_entities(doc)# 提取自定义实体custom_entities = self._extract_custom_entities(query)# 提取数值和时间实体numerical_entities = self._extract_numerical_entities(doc)temporal_entities = self._extract_temporal_entities(doc)# 合并所有实体all_entities = {**standard_entities,**custom_entities,**numerical_entities,**temporal_entities}# 计算整体置信度confidence = self._calculate_entity_confidence(all_entities)return {'entities': all_entities,'confidence': confidence,'entity_count': 
sum(len(v) if isinstance(v, list) else 1 for v in all_entities.values()),'coverage': self._calculate_coverage(query, all_entities)}def _extract_standard_entities(self, doc) -> Dict[str, List[Dict[str, Any]]]:"""提取标准命名实体"""entities = {'PERSON': [],'ORG': [],'GPE': [], # 地理政治实体'MONEY': [],'DATE': [],'TIME': []}for ent in doc.ents:if ent.label_ in entities:entities[ent.label_].append({'text': ent.text,'start': ent.start_char,'end': ent.end_char,'confidence': 0.9 # spaCy实体的默认置信度})return entitiesdef _extract_custom_entities(self, query: str) -> Dict[str, List[Dict[str, Any]]]:"""提取自定义实体"""entities = {}for entity_type, entity_list in self.custom_entities.items():entities[entity_type] = []for entity in entity_list:if entity.lower() in query.lower():start_idx = query.lower().find(entity.lower())entities[entity_type].append({'text': entity,'start': start_idx,'end': start_idx + len(entity),'confidence': 0.8})return entitiesdef _extract_numerical_entities(self, doc) -> Dict[str, List[Dict[str, Any]]]:"""提取数值实体"""numerical_entities = {'NUMBERS': [],'PERCENTAGES': [],'RANGES': []}for token in doc:if token.like_num:numerical_entities['NUMBERS'].append({'text': token.text,'value': self._parse_number(token.text),'start': token.idx,'end': token.idx + len(token.text),'confidence': 0.95})elif '%' in token.text:numerical_entities['PERCENTAGES'].append({'text': token.text,'value': self._parse_percentage(token.text),'start': token.idx,'end': token.idx + len(token.text),'confidence': 0.9})return numerical_entitiesdef _parse_number(self, text: str) -> float:"""解析数字"""try:return float(text.replace(',', ''))except ValueError:return 0.0def _parse_percentage(self, text: str) -> float:"""解析百分比"""try:return float(text.replace('%', '')) / 100except ValueError:return 0.0class QueryBuilder:"""查询构建器"""def __init__(self):self.sql_generator = SQLGenerator()self.nosql_generator = NoSQLGenerator()self.api_generator = APIQueryGenerator()async def build(self, intent: Dict[str, Any], entities: Dict[str, 
Any], context: Dict[str, Any] = None) -> Dict[str, Any]:"""构建结构化查询"""query_type = intent['type']if query_type in ['SELECT_DATA', 'AGGREGATE_DATA', 'FILTER_DATA']:return await self._build_sql_query(intent, entities, context)elif query_type == 'CREATE_VISUALIZATION':return await self._build_visualization_query(intent, entities, context)elif query_type == 'ANALYZE_TRENDS':return await self._build_analysis_query(intent, entities, context)else:return await self._build_generic_query(intent, entities, context)async def _build_sql_query(self, intent: Dict[str, Any], entities: Dict[str, Any],context: Dict[str, Any]) -> Dict[str, Any]:"""构建SQL查询"""# 提取表名tables = entities.get('TABLE_NAMES', [])if not tables and context and 'default_table' in context:tables = [context['default_table']]# 提取列名columns = entities.get('COLUMN_NAMES', [])if not columns:columns = ['*'] # 默认选择所有列# 构建WHERE条件conditions = self._build_where_conditions(entities)# 构建聚合函数aggregations = self._build_aggregations(entities, intent)# 构建GROUP BYgroup_by = self._build_group_by(entities)# 构建ORDER BYorder_by = self._build_order_by(entities)return {'type': 'SQL','operation': intent['type'],'tables': tables,'columns': columns,'conditions': conditions,'aggregations': aggregations,'group_by': group_by,'order_by': order_by,'limit': self._extract_limit(entities),'sql': self.sql_generator.generate({'tables': tables,'columns': columns,'conditions': conditions,'aggregations': aggregations,'group_by': group_by,'order_by': order_by})}def _build_where_conditions(self, entities: Dict[str, Any]) -> List[Dict[str, Any]]:"""构建WHERE条件"""conditions = []# 数值条件numbers = entities.get('NUMBERS', [])comparison_ops = entities.get('COMPARISON_OPERATORS', [])for i, number in enumerate(numbers):if i < len(comparison_ops):conditions.append({'type': 'comparison','operator': comparison_ops[i]['text'],'value': number['value'],'column': self._infer_column_for_condition(entities, i)})# 时间条件dates = entities.get('DATE', [])for date in 
dates:conditions.append({'type': 'temporal','operator': '>=','value': date['text'],'column': 'date' # 默认日期列})return conditionsdef _build_aggregations(self, entities: Dict[str, Any], intent: Dict[str, Any]) -> List[Dict[str, Any]]:"""构建聚合函数"""aggregations = []agg_functions = entities.get('AGGREGATION_FUNCTIONS', [])columns = entities.get('COLUMN_NAMES', [])if intent['type'] == 'AGGREGATE_DATA':for agg_func in agg_functions:target_column = columns[0]['text'] if columns else 'id'aggregations.append({'function': agg_func['text'].upper(),'column': target_column,'alias': f"{agg_func['text']}_{target_column}"})return aggregationsclass SQLGenerator:"""SQL生成器"""def generate(self, query_spec: Dict[str, Any]) -> str:"""生成SQL语句"""# SELECT子句select_clause = self._build_select_clause(query_spec.get('columns', []),query_spec.get('aggregations', []))# FROM子句from_clause = self._build_from_clause(query_spec.get('tables', []))# WHERE子句where_clause = self._build_where_clause(query_spec.get('conditions', []))# GROUP BY子句group_by_clause = self._build_group_by_clause(query_spec.get('group_by', []))# ORDER BY子句order_by_clause = self._build_order_by_clause(query_spec.get('order_by', []))# 组装SQLsql_parts = [select_clause, from_clause]if where_clause:sql_parts.append(where_clause)if group_by_clause:sql_parts.append(group_by_clause)if order_by_clause:sql_parts.append(order_by_clause)return ' '.join(sql_parts)def _build_select_clause(self, columns: List[str], aggregations: List[Dict[str, Any]]) -> str:"""构建SELECT子句"""if aggregations:select_items = []for agg in aggregations:select_items.append(f"{agg['function']}({agg['column']}) AS {agg['alias']}")return f"SELECT {', '.join(select_items)}"else:column_list = ', '.join(columns) if columns else '*'return f"SELECT {column_list}"def _build_from_clause(self, tables: List[str]) -> str:"""构建FROM子句"""if not tables:return "FROM default_table"return f"FROM {tables[0]}" # 简化处理,只处理单表def _build_where_clause(self, conditions: List[Dict[str, Any]]) -> 
str:"""构建WHERE子句"""if not conditions:return ""condition_strings = []for condition in conditions:if condition['type'] == 'comparison':condition_strings.append(f"{condition['column']} {self._map_operator(condition['operator'])} {condition['value']}")elif condition['type'] == 'temporal':condition_strings.append(f"{condition['column']} {condition['operator']} '{condition['value']}'")return f"WHERE {' AND '.join(condition_strings)}"def _map_operator(self, natural_operator: str) -> str:"""映射自然语言操作符到SQL操作符"""mapping = {'greater than': '>','less than': '<','equal to': '=','not equal to': '!=','greater than or equal to': '>=','less than or equal to': '<='}return mapping.get(natural_operator.lower(), '=')## MCP服务器端实现### 数据连接器MCP服务器```python
import asyncio
import json
from typing import Dict, List, Any, Optional
import sqlite3
import pandas as pd
import pymongo
from sqlalchemy import create_engine
import redisclass DataConnectorMCPServer(MCPServer):"""数据连接器MCP服务器"""def __init__(self):super().__init__("data-connector", "1.0.0")self.connections = {}self.query_cache = redis.Redis(host='localhost', port=6379, db=0)self._register_tools()self._register_resources()def _register_tools(self):"""注册工具"""# SQL查询工具sql_query_tool = MCPTool(name="execute_sql_query",description="Execute SQL query on connected database",parameters={"type": "object","properties": {"connection_id": {"type": "string", "description": "Database connection ID"},"query": {"type": "string", "description": "SQL query to execute"},"parameters": {"type": "array", "description": "Query parameters"}},"required": ["connection_id", "query"]},required_permissions=["database:read"],category="database")self.register_tool(sql_query_tool)# NoSQL查询工具nosql_query_tool = MCPTool(name="execute_nosql_query",description="Execute NoSQL query on MongoDB",parameters={"type": "object","properties": {"connection_id": {"type": "string", "description": "MongoDB connection ID"},"collection": {"type": "string", "description": "Collection name"},"query": {"type": "object", "description": "MongoDB query object"},"projection": {"type": "object", "description": "Projection object"}},"required": ["connection_id", "collection", "query"]},required_permissions=["database:read"],category="database")self.register_tool(nosql_query_tool)# 数据分析工具analyze_data_tool = MCPTool(name="analyze_data",description="Perform statistical analysis on data",parameters={"type": "object","properties": {"data": {"type": "array", "description": "Data to analyze"},"analysis_type": {"type": "string", "enum": ["descriptive", "correlation", "trend"]},"columns": {"type": "array", "description": "Columns to analyze"}},"required": ["data", "analysis_type"]},required_permissions=["data:analyze"],category="analytics")self.register_tool(analyze_data_tool)def _register_resources(self):"""注册资源"""# 数据库连接资源db_resource = 
MCPResource(uri="database://connections",type=MCPResourceType.DATABASE,name="Database Connections",description="Available database connections",metadata={"connection_types": ["postgresql", "mysql", "sqlite", "mongodb"]},access_permissions=["database:read"])self.register_resource(db_resource)# 查询历史资源query_history_resource = MCPResource(uri="memory://query_history",type=MCPResourceType.MEMORY,name="Query History",description="Historical query results and metadata",metadata={"retention_days": 30},access_permissions=["query:history"])self.register_resource(query_history_resource)async def handle_request(self, message: MCPMessage) -> MCPMessage:"""处理请求"""method = message.methodparams = message.paramstry:if method == "tools/list":result = await self.list_tools()elif method == "tools/call":result = await self.call_tool(params["name"], params.get("arguments", {}))elif method == "resources/list":result = await self.list_resources()elif method == "resources/read":result = await self.read_resource(params["uri"])else:raise ValueError(f"Unknown method: {method}")return MCPMessage(id=message.id,type=MCPMessageType.RESPONSE,method=method,params={"result": result},timestamp=time.time())except Exception as e:return MCPMessage(id=message.id,type=MCPMessageType.ERROR,method=method,params={"error": str(e)},timestamp=time.time())async def _execute_tool(self, tool: MCPTool, parameters: Dict[str, Any]) -> Dict[str, Any]:"""执行工具"""if tool.name == "execute_sql_query":return await self._execute_sql_query(parameters)elif tool.name == "execute_nosql_query":return await self._execute_nosql_query(parameters)elif tool.name == "analyze_data":return await self._analyze_data(parameters)else:raise ValueError(f"Unknown tool: {tool.name}")async def _execute_sql_query(self, params: Dict[str, Any]) -> Dict[str, Any]:"""执行SQL查询"""connection_id = params["connection_id"]query = params["query"]query_params = params.get("parameters", [])# 检查缓存cache_key = 
f"sql:{connection_id}:{hash(query)}:{hash(str(query_params))}"cached_result = self.query_cache.get(cache_key)if cached_result:return json.loads(cached_result)# 获取数据库连接if connection_id not in self.connections:raise ValueError(f"Connection {connection_id} not found")connection = self.connections[connection_id]try:# 执行查询if connection['type'] == 'sqlite':result = await self._execute_sqlite_query(connection, query, query_params)elif connection['type'] == 'postgresql':result = await self._execute_postgresql_query(connection, query, query_params)else:raise ValueError(f"Unsupported connection type: {connection['type']}")# 缓存结果self.query_cache.setex(cache_key, 300, json.dumps(result)) # 5分钟缓存return resultexcept Exception as e:return {"success": False,"error": str(e),"query": query}async def _execute_sqlite_query(self, connection: Dict[str, Any], query: str, params: List[Any]) -> Dict[str, Any]:"""执行SQLite查询"""conn = sqlite3.connect(connection['database'])conn.row_factory = sqlite3.Row # 返回字典格式的行try:cursor = conn.cursor()cursor.execute(query, params)if query.strip().upper().startswith('SELECT'):rows = cursor.fetchall()columns = [description[0] for description in cursor.description]data = [dict(row) for row in rows]return {"success": True,"data": data,"columns": columns,"row_count": len(data),"execution_time": 0 # 简化处理}else:conn.commit()return {"success": True,"affected_rows": cursor.rowcount,"message": "Query executed successfully"}finally:conn.close()async def _execute_nosql_query(self, params: Dict[str, Any]) -> Dict[str, Any]:"""执行NoSQL查询"""connection_id = params["connection_id"]collection_name = params["collection"]query = params["query"]projection = params.get("projection", {})if connection_id not in self.connections:raise ValueError(f"Connection {connection_id} not found")connection = self.connections[connection_id]try:client = pymongo.MongoClient(connection['uri'])db = client[connection['database']]collection = db[collection_name]# 执行查询cursor = collection.find(query, 
projection)results = list(cursor)# 转换ObjectId为字符串for result in results:if '_id' in result:result['_id'] = str(result['_id'])return {"success": True,"data": results,"count": len(results),"collection": collection_name}except Exception as e:return {"success": False,"error": str(e),"query": query}finally:client.close()async def _analyze_data(self, params: Dict[str, Any]) -> Dict[str, Any]:"""分析数据"""data = params["data"]analysis_type = params["analysis_type"]columns = params.get("columns", [])# 转换为DataFramedf = pd.DataFrame(data)if columns:df = df[columns]try:if analysis_type == "descriptive":result = self._descriptive_analysis(df)elif analysis_type == "correlation":result = self._correlation_analysis(df)elif analysis_type == "trend":result = self._trend_analysis(df)else:raise ValueError(f"Unknown analysis type: {analysis_type}")return {"success": True,"analysis_type": analysis_type,"result": result,"data_shape": df.shape}except Exception as e:return {"success": False,"error": str(e),"analysis_type": analysis_type}def _descriptive_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:"""描述性统计分析"""numeric_columns = df.select_dtypes(include=[np.number]).columnscategorical_columns = df.select_dtypes(include=['object']).columnsresult = {"numeric_summary": {},"categorical_summary": {},"missing_values": df.isnull().sum().to_dict(),"data_types": df.dtypes.astype(str).to_dict()}# 数值列统计if len(numeric_columns) > 0:result["numeric_summary"] = df[numeric_columns].describe().to_dict()# 分类列统计for col in categorical_columns:result["categorical_summary"][col] = {"unique_count": df[col].nunique(),"top_values": df[col].value_counts().head(5).to_dict()}return resultdef _correlation_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:"""相关性分析"""numeric_df = df.select_dtypes(include=[np.number])if numeric_df.empty:return {"error": "No numeric columns found for correlation analysis"}correlation_matrix = numeric_df.corr()return {"correlation_matrix": 
correlation_matrix.to_dict(),"strong_correlations": self._find_strong_correlations(correlation_matrix),"columns_analyzed": list(numeric_df.columns)}def _find_strong_correlations(self, corr_matrix: pd.DataFrame, threshold: float = 0.7) -> List[Dict[str, Any]]:"""找出强相关性"""strong_corrs = []for i in range(len(corr_matrix.columns)):for j in range(i+1, len(corr_matrix.columns)):corr_value = corr_matrix.iloc[i, j]if abs(corr_value) >= threshold:strong_corrs.append({"column1": corr_matrix.columns[i],"column2": corr_matrix.columns[j],"correlation": corr_value,"strength": "strong" if abs(corr_value) >= 0.8 else "moderate"})return strong_corrs### 可视化MCP服务器class VisualizationMCPServer(MCPServer):"""可视化MCP服务器"""def __init__(self):super().__init__("visualization", "1.0.0")self.chart_generators = {'bar': BarChartGenerator(),'line': LineChartGenerator(),'scatter': ScatterPlotGenerator(),'pie': PieChartGenerator(),'heatmap': HeatmapGenerator()}self._register_tools()def _register_tools(self):"""注册可视化工具"""create_chart_tool = MCPTool(name="create_chart",description="Create various types of charts from data",parameters={"type": "object","properties": {"chart_type": {"type": "string", "enum": ["bar", "line", "scatter", "pie", "heatmap"]},"data": {"type": "array", "description": "Data for the chart"},"x_column": {"type": "string", "description": "X-axis column"},"y_column": {"type": "string", "description": "Y-axis column"},"title": {"type": "string", "description": "Chart title"},"options": {"type": "object", "description": "Additional chart options"}},"required": ["chart_type", "data"]},required_permissions=["visualization:create"],category="visualization")self.register_tool(create_chart_tool)analyze_chart_tool = MCPTool(name="analyze_chart_data",description="Analyze data and suggest appropriate chart types",parameters={"type": "object","properties": {"data": {"type": "array", "description": "Data to analyze"},"analysis_goal": {"type": "string", "description": "Goal of the 
analysis"}},"required": ["data"]},required_permissions=["data:analyze"],category="analytics")self.register_tool(analyze_chart_tool)async def _execute_tool(self, tool: MCPTool, parameters: Dict[str, Any]) -> Dict[str, Any]:"""执行可视化工具"""if tool.name == "create_chart":return await self._create_chart(parameters)elif tool.name == "analyze_chart_data":return await self._analyze_chart_data(parameters)else:raise ValueError(f"Unknown tool: {tool.name}")async def _create_chart(self, params: Dict[str, Any]) -> Dict[str, Any]:"""创建图表"""chart_type = params["chart_type"]data = params["data"]if chart_type not in self.chart_generators:raise ValueError(f"Unsupported chart type: {chart_type}")generator = self.chart_generators[chart_type]try:chart_config = await generator.generate(params)return {"success": True,"chart_type": chart_type,"config": chart_config,"data_points": len(data),"recommendations": await self._get_chart_recommendations(params)}except Exception as e:return {"success": False,"error": str(e),"chart_type": chart_type}async def _analyze_chart_data(self, params: Dict[str, Any]) -> Dict[str, Any]:"""分析图表数据"""data = params["data"]analysis_goal = params.get("analysis_goal", "general")df = pd.DataFrame(data)# 分析数据特征data_analysis = {"row_count": len(df),"column_count": len(df.columns),"numeric_columns": list(df.select_dtypes(include=[np.number]).columns),"categorical_columns": list(df.select_dtypes(include=['object']).columns),"datetime_columns": list(df.select_dtypes(include=['datetime64']).columns)}# 推荐图表类型chart_recommendations = await self._recommend_chart_types(df, analysis_goal)return {"success": True,"data_analysis": data_analysis,"chart_recommendations": chart_recommendations,"analysis_goal": analysis_goal}async def _recommend_chart_types(self, df: pd.DataFrame, goal: str) -> List[Dict[str, Any]]:"""推荐图表类型"""recommendations = []numeric_cols = df.select_dtypes(include=[np.number]).columnscategorical_cols = df.select_dtypes(include=['object']).columns# 基于数据类型推荐if 
len(numeric_cols) >= 2:recommendations.append({"chart_type": "scatter","reason": "Multiple numeric columns suitable for correlation analysis","suggested_config": {"x_column": numeric_cols[0],"y_column": numeric_cols[1]},"confidence": 0.8})if len(categorical_cols) >= 1 and len(numeric_cols) >= 1:recommendations.append({"chart_type": "bar","reason": "Categorical and numeric columns suitable for comparison","suggested_config": {"x_column": categorical_cols[0],"y_column": numeric_cols[0]},"confidence": 0.9})# 基于分析目标推荐if goal == "trend":if 'date' in df.columns or 'time' in df.columns:recommendations.append({"chart_type": "line","reason": "Time series data suitable for trend analysis","confidence": 0.95})return recommendationsclass BarChartGenerator:"""柱状图生成器"""async def generate(self, params: Dict[str, Any]) -> Dict[str, Any]:"""生成柱状图配置"""data = params["data"]x_column = params.get("x_column")y_column = params.get("y_column")title = params.get("title", "Bar Chart")options = params.get("options", {})# 处理数据df = pd.DataFrame(data)if not x_column:x_column = df.columns[0]if not y_column:y_column = df.columns[1] if len(df.columns) > 1 else df.columns[0]# 聚合数据(如果需要)if df[x_column].dtype == 'object':chart_data = df.groupby(x_column)[y_column].sum().reset_index()else:chart_data = df[[x_column, y_column]]return {"type": "bar","data": {"labels": chart_data[x_column].tolist(),"datasets": [{"label": y_column,"data": chart_data[y_column].tolist(),"backgroundColor": options.get("color", "rgba(54, 162, 235, 0.6)"),"borderColor": options.get("border_color", "rgba(54, 162, 235, 1)"),"borderWidth": options.get("border_width", 1)}]},"options": {"responsive": True,"plugins": {"title": {"display": True,"text": title}},"scales": {"y": {"beginAtZero": True}}}}## 上下文管理与会话状态### 会话管理器```python
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import jsonclass SessionManager:"""会话管理器"""def __init__(self, redis_client=None):self.redis_client = redis_client or redis.Redis(host='localhost', port=6379, db=1)self.session
# MCP与智能问数技术全面指南:从协议设计到智能化数据查询## 引言在人工智能与数据科学快速发展的今天,传统的数据查询方式正面临着前所未有的挑战。用户不再满足于编写复杂的SQL语句或学习特定的查询语法,而是希望能够用自然语言直接与数据对话,获得智能化的分析结果。MCP(Model Context Protocol)作为一种新兴的协议标准,为构建智能化的数据查询系统提供了强大的技术基础。MCP协议通过标准化的接口定义,使得大语言模型能够与各种数据源和工具进行无缝集成,而智能问数技术则利用自然语言处理和机器学习技术,将用户的自然语言查询转换为精确的数据操作。两者的结合,正在重新定义人机交互的方式,让数据分析变得更加直观、高效和智能。### 为什么MCP与智能问数如此重要?**1. 降低数据访问门槛**
传统的数据查询需要用户掌握SQL、Python等技术技能,而基于MCP的智能问数系统允许业务用户用自然语言直接查询数据,大大降低了数据访问的技术门槛。**2. 提升查询效率和准确性**
智能问数系统能够理解用户意图,自动优化查询逻辑,并提供上下文相关的建议,显著提升查询效率和结果准确性。**3. 实现真正的对话式分析**
通过MCP协议的标准化接口,系统能够维护查询上下文,支持多轮对话,实现真正的对话式数据分析体验。**4. 支持复杂的跨系统集成**
MCP协议的标准化特性使得系统能够轻松集成多种数据源、分析工具和可视化组件,构建完整的智能数据分析生态。### 本文的价值与结构本文将从MCP协议的基础概念出发,深入探讨智能问数系统的设计原理和实现技术,并提供大量实际代码示例和最佳实践。无论您是系统架构师、数据工程师,还是AI应用开发者,都能从本文中获得有价值的技术洞察和实用指导。## 目录1. [MCP协议基础与核心概念](#mcp协议基础与核心概念)
2. [智能问数系统架构设计](#智能问数系统架构设计)
3. [自然语言查询解析引擎](#自然语言查询解析引擎)
4. [MCP服务器端实现](#mcp服务器端实现)
5. [智能查询优化与执行](#智能查询优化与执行)
6. [多模态数据交互](#多模态数据交互)
7. [上下文管理与会话状态](#上下文管理与会话状态)
8. [安全性与权限控制](#安全性与权限控制)
9. [性能优化与缓存策略](#性能优化与缓存策略)
10. [可视化与结果呈现](#可视化与结果呈现)
11. [企业级部署与集成](#企业级部署与集成)
12. [实际应用案例分析](#实际应用案例分析)
13. [最佳实践与设计模式](#最佳实践与设计模式)
14. [未来发展趋势与展望](#未来发展趋势与展望)## MCP协议基础与核心概念### MCP协议概述MCP(Model Context Protocol)是一种开放标准协议,旨在为大语言模型与外部工具、数据源之间的交互提供统一的接口规范。该协议定义了标准化的消息格式、工具调用机制和资源访问方式,使得AI系统能够安全、高效地与各种外部系统进行集成。```python
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, asdict
from enum import Enum
import json
import asyncio
from abc import ABC, abstractmethod


class MCPMessageType(Enum):
    """Kinds of MCP protocol messages exchanged between client and server."""
    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    ERROR = "error"


class MCPResourceType(Enum):
    """Categories of resources an MCP server can expose."""
    DATABASE = "database"
    FILE = "file"
    API = "api"
    TOOL = "tool"
    MEMORY = "memory"


@dataclass
class MCPMessage:
    """Base structure of an MCP message.

    Carries an RPC-style method name plus parameters, and an optional
    conversational context dict that travels with the request.
    """

    id: str  # unique message identifier
    type: MCPMessageType
    method: str  # RPC-style method name, e.g. "tools/call"
    params: Dict[str, Any]
    timestamp: float  # Unix timestamp in seconds
    context: Optional[Dict[str, Any]] = None  # optional conversation context

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the message (and nested fields) to a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':
        """Build a message from a dict received from a peer.

        Raises KeyError if a required field is missing and ValueError if
        data['type'] is not a valid MCPMessageType value.
        """
        return cls(
            id=data['id'],
            type=MCPMessageType(data['type']),
            method=data['method'],
            params=data['params'],
            timestamp=data['timestamp'],
            context=data.get('context')
        )


@dataclass
class MCPTool:
    """Declarative description of one tool an MCP server exposes.

    `parameters` follows JSON-Schema conventions; its optional 'required'
    entry lists the parameter names every call must supply.
    """

    name: str
    description: str
    parameters: Dict[str, Any]
    required_permissions: List[str]
    category: str
    version: str = "1.0.0"

    def validate_parameters(self, params: Dict[str, Any]) -> bool:
        """Return True when every required parameter is present in *params*."""
        required = self.parameters.get('required', [])
        return all(key in params for key in required)


@dataclass
class MCPResource:
    """Addressable resource (database, file, API, ...) on an MCP server."""

    uri: str
    type: MCPResourceType
    name: str
    description: str
    metadata: Dict[str, Any]
    access_permissions: List[str]

    def is_accessible(self, user_permissions: List[str]) -> bool:
        """Return True when the user holds every permission this resource demands."""
        granted = set(user_permissions)
        return all(perm in granted for perm in self.access_permissions)


class MCPServer(ABC):
    """Abstract base class for MCP servers.

    Concrete servers register tools and resources, then implement the
    abstract request-handling and execution hooks.
    """

    def __init__(self, name: str, version: str = "1.0.0"):
        self.name = name
        self.version = version
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        # Advertised protocol capabilities; everything enabled by default.
        self.capabilities = {cap: True for cap in ('tools', 'resources', 'prompts', 'logging')}

    def register_tool(self, tool: MCPTool):
        """Make *tool* callable through this server, keyed by its name."""
        self.tools[tool.name] = tool

    def register_resource(self, resource: MCPResource):
        """Expose *resource* through this server, keyed by its URI."""
        self.resources[resource.uri] = resource

    @abstractmethod
    async def handle_request(self, message: MCPMessage) -> MCPMessage:
        """Dispatch one incoming MCP message and return the response."""

    @abstractmethod
    async def list_tools(self) -> List[MCPTool]:
        """Return the tools this server offers."""

    @abstractmethod
    async def list_resources(self) -> List[MCPResource]:
        """Return the resources this server exposes."""

    async def call_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Validate *parameters* and invoke the named tool.

        Raises ValueError when the tool is unknown or a required
        parameter is missing.
        """
        tool = self.tools.get(tool_name)
        if tool is None:
            raise ValueError(f"Tool '{tool_name}' not found")
        if not tool.validate_parameters(parameters):
            raise ValueError(f"Invalid parameters for tool '{tool_name}'")
        return await self._execute_tool(tool, parameters)

    @abstractmethod
    async def _execute_tool(self, tool: MCPTool, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Run the tool implementation; supplied by concrete servers."""
智能问数系统的核心组件
智能问数系统基于MCP协议构建,包含以下核心组件:
class IntelligentQuerySystem:
    """End-to-end natural-language query pipeline built on an MCP server.

    Wires the parser, optimizer, execution engine, context manager and
    result formatter into a single process_query() entry point.
    """

    def __init__(self, mcp_server: MCPServer):
        self.mcp_server = mcp_server
        self.query_parser = NaturalLanguageQueryParser()
        self.query_optimizer = QueryOptimizer()
        self.execution_engine = QueryExecutionEngine()
        self.context_manager = ContextManager()
        self.result_formatter = ResultFormatter()

    async def process_query(self, natural_query: str, user_context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Process a natural-language query end to end.

        The steps are strictly ordered: each stage consumes the previous
        stage's output, and the conversation context is updated last.
        """
        # 1. Parse the natural-language query into a structured form
        parsed_query = await self.query_parser.parse(natural_query, user_context)
        # 2. Optimize the structured query
        optimized_query = await self.query_optimizer.optimize(parsed_query)
        # 3. Execute the optimized query
        raw_results = await self.execution_engine.execute(optimized_query)
        # 4. Format raw results for presentation
        formatted_results = await self.result_formatter.format(raw_results, parsed_query)
        # 5. Record query + results in the conversation context
        await self.context_manager.update_context(natural_query, formatted_results)
        return formatted_results


class NaturalLanguageQueryParser:
    """Turns free-form text into a ParsedQuery via intent + entity analysis."""

    def __init__(self):
        self.intent_classifier = IntentClassifier()
        self.entity_extractor = EntityExtractor()
        self.query_builder = QueryBuilder()

    async def parse(self, query: str, context: Dict[str, Any] = None) -> 'ParsedQuery':
        """Parse *query* into a ParsedQuery.

        NOTE(review): classify()/extract() are accessed below via
        `.confidence` attributes, but IntentClassifier/EntityExtractor
        elsewhere in this article return plain dicts — confirm the
        intended return type.
        """
        # Intent recognition
        intent = await self.intent_classifier.classify(query, context)
        # Entity extraction
        entities = await self.entity_extractor.extract(query, context)
        # Build the structured query from intent + entities
        structured_query = await self.query_builder.build(intent, entities, context)
        return ParsedQuery(
            original_query=query,
            intent=intent,
            entities=entities,
            structured_query=structured_query,
            # Overall confidence is the weaker of the two sub-analyses.
            confidence=min(intent.confidence, entities.confidence)
        )


@dataclass
class ParsedQuery:
    """Result of parsing one natural-language query."""

    original_query: str
    intent: 'QueryIntent'
    entities: 'ExtractedEntities'
    structured_query: Dict[str, Any]
    confidence: float  # min of intent and entity confidence

    def is_valid(self) -> bool:
        """True when overall confidence clears 0.7 and the intent itself is valid."""
        return self.confidence > 0.7 and self.intent.is_valid()


@dataclass
class QueryIntent:
    """Classified intent of a query."""

    type: str  # SELECT, INSERT, UPDATE, DELETE, ANALYZE, VISUALIZE
    action: str  # concrete action within the intent type
    confidence: float
    parameters: Dict[str, Any]

    def is_valid(self) -> bool:
        """True when confidence clears 0.8 and the type is a known operation.

        NOTE(review): these type values differ from
        IntentClassifier.intent_labels (e.g. 'SELECT_DATA') — confirm the
        mapping between the two vocabularies.
        """
        return self.confidence > 0.8 and self.type in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'ANALYZE', 'VISUALIZE']


@dataclass
class ExtractedEntities:
    """Entities pulled out of a natural-language query."""

    tables: List[str]
    columns: List[str]
    values: List[Any]
    conditions: List[Dict[str, Any]]
    aggregations: List[str]
    time_ranges: List[Dict[str, Any]]
    confidence: float
智能问数系统架构设计
系统整体架构
class IntelligentQueryArchitecture:"""智能问数系统架构"""def __init__(self):self.components = {'frontend': FrontendInterface(),'api_gateway': APIGateway(),'query_processor': QueryProcessor(),'mcp_orchestrator': MCPOrchestrator(),'data_connectors': DataConnectorManager(),'cache_layer': CacheLayer(),'security_manager': SecurityManager(),'monitoring': MonitoringSystem()}def initialize_system(self):"""初始化系统"""# 启动各个组件for name, component in self.components.items():component.initialize()print(f"Component {name} initialized")# 建立组件间连接self._setup_component_connections()def _setup_component_connections(self):"""设置组件间连接"""# API网关连接查询处理器self.components['api_gateway'].connect(self.components['query_processor'])# 查询处理器连接MCP编排器self.components['query_processor'].connect(self.components['mcp_orchestrator'])# MCP编排器连接数据连接器self.components['mcp_orchestrator'].connect(self.components['data_connectors'])class QueryProcessor:"""查询处理器"""def __init__(self):self.nlp_engine = NLPEngine()self.query_planner = QueryPlanner()self.execution_coordinator = ExecutionCoordinator()async def process_natural_query(self, query: str, user_id: str) -> Dict[str, Any]:"""处理自然语言查询"""# 1. 预处理查询preprocessed_query = await self._preprocess_query(query, user_id)# 2. 语义理解semantic_analysis = await self.nlp_engine.analyze(preprocessed_query)# 3. 查询规划execution_plan = await self.query_planner.create_plan(semantic_analysis)# 4. 
执行协调results = await self.execution_coordinator.execute(execution_plan)return resultsasync def _preprocess_query(self, query: str, user_id: str) -> Dict[str, Any]:"""预处理查询"""return {'original_query': query,'user_id': user_id,'timestamp': time.time(),'normalized_query': self._normalize_query(query),'user_context': await self._get_user_context(user_id)}def _normalize_query(self, query: str) -> str:"""规范化查询"""# 去除多余空格、标点符号等import renormalized = re.sub(r'\s+', ' ', query.strip())return normalized.lower()class NLPEngine:"""自然语言处理引擎"""def __init__(self):self.tokenizer = Tokenizer()self.pos_tagger = POSTagger()self.ner_model = NERModel()self.intent_classifier = IntentClassifier()self.dependency_parser = DependencyParser()async def analyze(self, query_data: Dict[str, Any]) -> Dict[str, Any]:"""分析查询语义"""query = query_data['normalized_query']# 分词tokens = self.tokenizer.tokenize(query)# 词性标注pos_tags = self.pos_tagger.tag(tokens)# 命名实体识别entities = self.ner_model.extract(tokens, pos_tags)# 意图分类intent = self.intent_classifier.classify(query, query_data['user_context'])# 依存句法分析dependencies = self.dependency_parser.parse(tokens, pos_tags)return {'tokens': tokens,'pos_tags': pos_tags,'entities': entities,'intent': intent,'dependencies': dependencies,'semantic_structure': self._build_semantic_structure(tokens, pos_tags, entities, intent, dependencies)}def _build_semantic_structure(self, tokens, pos_tags, entities, intent, dependencies):"""构建语义结构"""return {'query_type': intent['type'],'target_entities': [e for e in entities if e['type'] in ['TABLE', 'COLUMN']],'filter_conditions': self._extract_conditions(dependencies, entities),'aggregation_functions': self._extract_aggregations(tokens, pos_tags),'temporal_constraints': self._extract_temporal_info(entities),'grouping_criteria': self._extract_grouping(dependencies, entities)}class QueryPlanner:"""查询规划器"""def __init__(self):self.schema_manager = SchemaManager()self.cost_estimator = CostEstimator()self.plan_optimizer = 
PlanOptimizer()async def create_plan(self, semantic_analysis: Dict[str, Any]) -> 'ExecutionPlan':"""创建执行计划"""semantic_structure = semantic_analysis['semantic_structure']# 1. 模式匹配schema_info = await self.schema_manager.match_schema(semantic_structure)# 2. 生成候选计划candidate_plans = await self._generate_candidate_plans(semantic_structure, schema_info)# 3. 成本估算costed_plans = []for plan in candidate_plans:cost = await self.cost_estimator.estimate(plan)costed_plans.append((plan, cost))# 4. 选择最优计划best_plan = min(costed_plans, key=lambda x: x[1])[0]# 5. 计划优化optimized_plan = await self.plan_optimizer.optimize(best_plan)return ExecutionPlan(plan_id=str(uuid.uuid4()),steps=optimized_plan['steps'],estimated_cost=optimized_plan['cost'],expected_result_schema=optimized_plan['result_schema'],metadata=optimized_plan['metadata'])async def _generate_candidate_plans(self, semantic_structure: Dict[str, Any], schema_info: Dict[str, Any]) -> List[Dict[str, Any]]:"""生成候选执行计划"""plans = []query_type = semantic_structure['query_type']if query_type == 'SELECT':plans.extend(await self._generate_select_plans(semantic_structure, schema_info))elif query_type == 'ANALYZE':plans.extend(await self._generate_analysis_plans(semantic_structure, schema_info))elif query_type == 'VISUALIZE':plans.extend(await self._generate_visualization_plans(semantic_structure, schema_info))return plansasync def _generate_select_plans(self, semantic_structure: Dict[str, Any], schema_info: Dict[str, Any]) -> List[Dict[str, Any]]:"""生成SELECT查询计划"""plans = []# 基础查询计划base_plan = {'type': 'SELECT','steps': [{'operation': 'TABLE_SCAN','table': schema_info['primary_table'],'columns': semantic_structure['target_entities']}]}# 添加过滤条件if semantic_structure['filter_conditions']:base_plan['steps'].append({'operation': 'FILTER','conditions': semantic_structure['filter_conditions']})# 添加聚合操作if semantic_structure['aggregation_functions']:base_plan['steps'].append({'operation': 'AGGREGATE','functions': 
semantic_structure['aggregation_functions'],'group_by': semantic_structure['grouping_criteria']})plans.append(base_plan)# 如果涉及多表,生成JOIN计划if len(schema_info['involved_tables']) > 1:join_plan = await self._generate_join_plan(semantic_structure, schema_info)plans.append(join_plan)return plans@dataclass
class ExecutionPlan:
    """Ordered list of execution steps produced by the query planner."""

    plan_id: str
    steps: List[Dict[str, Any]]
    estimated_cost: float
    expected_result_schema: Dict[str, Any]
    metadata: Dict[str, Any]

    def get_step(self, index: int) -> Dict[str, Any]:
        """Return the step at *index*; raise IndexError when out of range."""
        if not 0 <= index < len(self.steps):
            raise IndexError(f"Step index {index} out of range")
        return self.steps[index]

    def add_step(self, step: Dict[str, Any], position: int = None):
        """Append *step*, or insert it at *position* when one is given."""
        target = len(self.steps) if position is None else position
        self.steps.insert(target, step)
自然语言查询解析引擎
意图识别与实体提取
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
import spacyclass IntentClassifier:"""意图分类器"""def __init__(self, model_name: str = "bert-base-uncased"):self.tokenizer = AutoTokenizer.from_pretrained(model_name)self.model = AutoModel.from_pretrained(model_name)self.classifier = self._build_classifier()self.intent_labels = ['SELECT_DATA', 'AGGREGATE_DATA', 'FILTER_DATA', 'JOIN_TABLES','CREATE_VISUALIZATION', 'ANALYZE_TRENDS', 'COMPARE_VALUES','FIND_ANOMALIES', 'GENERATE_REPORT', 'UPDATE_DATA']def _build_classifier(self) -> nn.Module:"""构建分类器网络"""return nn.Sequential(nn.Linear(768, 256), # BERT hidden sizenn.ReLU(),nn.Dropout(0.3),nn.Linear(256, 128),nn.ReLU(),nn.Dropout(0.2),nn.Linear(128, len(self.intent_labels)))async def classify(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:"""分类查询意图"""# 编码查询文本inputs = self.tokenizer(query, return_tensors="pt", padding=True, truncation=True)with torch.no_grad():# 获取BERT嵌入outputs = self.model(**inputs)embeddings = outputs.last_hidden_state.mean(dim=1) # 平均池化# 意图分类logits = self.classifier(embeddings)probabilities = torch.softmax(logits, dim=-1)# 获取最高概率的意图predicted_idx = torch.argmax(probabilities, dim=-1).item()confidence = probabilities[0][predicted_idx].item()# 结合上下文调整结果if context:confidence = self._adjust_confidence_with_context(query, predicted_idx, confidence, context)return {'type': self.intent_labels[predicted_idx],'confidence': confidence,'all_probabilities': {label: prob.item() for label, prob in zip(self.intent_labels, probabilities[0])},'reasoning': self._explain_classification(query, predicted_idx, confidence)}def _adjust_confidence_with_context(self, query: str, predicted_idx: int, confidence: float, context: Dict[str, Any]) -> float:"""基于上下文调整置信度"""# 检查历史查询模式if 'query_history' in context:recent_intents = [q.get('intent', {}).get('type') for q in context['query_history'][-3:]]current_intent = self.intent_labels[predicted_idx]# 如果与最近的查询意图一致,提高置信度if current_intent in recent_intents:confidence = min(1.0, confidence * 1.1)# 检查用户偏好if 
'user_preferences' in context:preferred_operations = context['user_preferences'].get('common_operations', [])current_intent = self.intent_labels[predicted_idx]if current_intent in preferred_operations:confidence = min(1.0, confidence * 1.05)return confidencedef _explain_classification(self, query: str, predicted_idx: int, confidence: float) -> str:"""解释分类结果"""intent = self.intent_labels[predicted_idx]explanations = {'SELECT_DATA': f"查询包含数据检索关键词,意图是获取特定数据",'AGGREGATE_DATA': f"查询包含聚合函数关键词(如sum、count、average),意图是数据汇总",'FILTER_DATA': f"查询包含条件筛选关键词(如where、filter),意图是数据过滤",'CREATE_VISUALIZATION': f"查询包含可视化关键词(如chart、graph、plot),意图是创建图表",'ANALYZE_TRENDS': f"查询包含趋势分析关键词(如trend、pattern),意图是趋势分析"}return explanations.get(intent, f"基于语义分析,识别为{intent}操作")class EntityExtractor:"""实体提取器"""def __init__(self):self.nlp = spacy.load("en_core_web_sm")self.custom_entities = self._load_custom_entities()def _load_custom_entities(self) -> Dict[str, List[str]]:"""加载自定义实体"""return {'TABLE_NAMES': ['users', 'orders', 'products', 'customers', 'sales', 'inventory'],'COLUMN_NAMES': ['id', 'name', 'email', 'date', 'amount', 'price', 'quantity'],'AGGREGATION_FUNCTIONS': ['sum', 'count', 'average', 'max', 'min', 'median'],'TIME_PERIODS': ['today', 'yesterday', 'last week', 'last month', 'last year'],'COMPARISON_OPERATORS': ['greater than', 'less than', 'equal to', 'between']}async def extract(self, query: str, context: Dict[str, Any] = None) -> Dict[str, Any]:"""提取实体"""# 使用spaCy进行基础实体识别doc = self.nlp(query)# 提取标准实体standard_entities = self._extract_standard_entities(doc)# 提取自定义实体custom_entities = self._extract_custom_entities(query)# 提取数值和时间实体numerical_entities = self._extract_numerical_entities(doc)temporal_entities = self._extract_temporal_entities(doc)# 合并所有实体all_entities = {**standard_entities,**custom_entities,**numerical_entities,**temporal_entities}# 计算整体置信度confidence = self._calculate_entity_confidence(all_entities)return {'entities': all_entities,'confidence': confidence,'entity_count': 
sum(len(v) if isinstance(v, list) else 1 for v in all_entities.values()),'coverage': self._calculate_coverage(query, all_entities)}def _extract_standard_entities(self, doc) -> Dict[str, List[Dict[str, Any]]]:"""提取标准命名实体"""entities = {'PERSON': [],'ORG': [],'GPE': [], # 地理政治实体'MONEY': [],'DATE': [],'TIME': []}for ent in doc.ents:if ent.label_ in entities:entities[ent.label_].append({'text': ent.text,'start': ent.start_char,'end': ent.end_char,'confidence': 0.9 # spaCy实体的默认置信度})return entitiesdef _extract_custom_entities(self, query: str) -> Dict[str, List[Dict[str, Any]]]:"""提取自定义实体"""entities = {}for entity_type, entity_list in self.custom_entities.items():entities[entity_type] = []for entity in entity_list:if entity.lower() in query.lower():start_idx = query.lower().find(entity.lower