项目六:ChatBI智能分析与可视化(制造业经营决策ChatBI系统)
项目原型
📊 ChatBI 智能数据分析平台
==================================================
[对话界面]
用户: 帮我分析上月各产品线的销售额趋势
🤖 BI助手:
已为您生成分析结果:

[可视化图表 - 上月各产品线销售额趋势]

      ▲ 销售额
      │ (万元)
      │  ██████████
  500 │  ██      ██
      │  ██      ██
  400 │  ██      ██
      │  ██████████
  300 │  ██      ██
      │  ██      ██
  200 │  ██      ██
      │  ██████████
  100 │  ██      ██
      └──────────────────────────▶
        1    5    10   15   20   25   31   日期

[数据摘要]
• 分析期间: 2024年1月1日 - 2024年1月31日
• 总销售额: 1,245万元
• 产品线数量: 8个
• 日均销售额: 40.2万元

[详细数据表]
日期 | 产品线A | 产品线B | 产品线C | 产品线D | 总计
2024-01-01 | 45.2 | 32.1 | 28.7 | 15.6 | 121.6
2024-01-02 | 42.8 | 34.5 | 31.2 | 16.8 | 125.3
... | ... | ... | ... | ... | ...
2024-01-31 | 51.3 | 38.9 | 35.6 | 19.2 | 145.0

[生成的SQL查询]
SELECT sales_date,product_line,SUM(sales_amount) as daily_sales
FROM sales_data
WHERE sales_date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY sales_date, product_line
ORDER BY sales_date;

[推荐后续分析]
1. 📈 查看利润趋势
2. 🏆 分析销售冠军产品
3. 📊 比较各渠道表现
4. 🔍 深入分析异常日期

[操作选项]
1. 💾 导出报告 2. 📧 分享结果 3. 🔄 重新分析
4. ⚙️ 调整参数 5. ❓ 帮助 6. 🗑️ 清除对话
请输入选择 [1-6]:
配置参数
# config/chatbi_config.yaml
database:
  host: "bi-db.company.com"
  port: 5432
  database: "business_intelligence"
  username: "chatbi_user"
  # NOTE(review): placeholder value; supply the real secret from a secret
  # store / environment variable at deploy time instead of committing it.
  password: "encrypted_password"
  pool_size: 10
  max_overflow: 20

nl2sql:
  max_query_length: 1000
  timeout: 30
  allowed_tables:
    - "sales_data"
    - "products"
    - "customers"
    - "regions"
  prohibited_operations:
    - "DELETE"
    - "UPDATE"
    - "INSERT"
    - "DROP"

visualization:
  default_chart_size: [800, 500]
  color_palette: "husl"
  supported_charts:
    - "line"
    - "bar"
    - "pie"
    - "scatter"
    - "heatmap"

performance:
  cache_size: 1000
  query_timeout: 60
  max_results: 10000

security:
  query_whitelist: true
  max_query_complexity: 5  # maximum number of joined tables
核心代码
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Any, Tuple
import re
import json
from sqlalchemy import create_engine, text
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
warnings.filterwarnings('ignore')


class ChatBIAnalyst:
    """Rule-based ChatBI engine: natural-language question -> SQL -> chart.

    Questions are classified with regular expressions; metrics, dimensions
    and time ranges are extracted with keyword/regex matching; a SQL
    template is filled in and executed through SQLAlchemy; the result is
    rendered as a Plotly HTML chart.
    """

    # Metric keyword -> column aggregated with SUM().
    # NOTE(review): the column names are assumptions about the `sales_data`
    # schema -- confirm them against `self.schema_info`.
    METRIC_MAPPING = {
        '销售': 'sales_amount',
        '收入': 'revenue',
        '利润': 'profit',
        '数量': 'quantity',
    }

    # Dimension keyword -> grouping column (same caveat as METRIC_MAPPING).
    DIMENSION_MAPPING = {
        '地区': 'region',
        '产品': 'product_line',
        '类别': 'category',
        '部门': 'department',
        '渠道': 'channel',
        '时间': 'sales_date',
        '月份': 'sales_month',
    }

    # Only plain ASCII identifiers are ever spliced into generated SQL.
    _IDENTIFIER = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

    def __init__(self, db_config: Dict):
        """Create the database engine, cache the schema and set up plotting.

        Args:
            db_config: connection settings with ``host``, ``port``,
                ``database``, ``password`` and the login under ``user``.
                ``username`` (the key used in chatbi_config.yaml) is accepted
                as a fallback.

        Raises:
            KeyError: if a required connection setting is missing.
        """
        from urllib.parse import quote_plus

        # The original assigned `db.config`, which raised NameError.
        self.db_config = db_config
        user = db_config['user'] if 'user' in db_config else db_config['username']
        # Percent-encode credentials so characters such as '@', ':' or '/' in
        # the password cannot corrupt the connection URL.
        self.engine = create_engine(
            f"postgresql://{quote_plus(str(user))}:{quote_plus(str(db_config['password']))}"
            f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
        )
        # Cached table -> column metadata.
        self.schema_info = self._load_schema_info()
        # matplotlib / seaborn styles and Plotly toolbar configuration.
        self.setup_plotting()

    def setup_plotting(self):
        """Configure matplotlib/seaborn styles and the Plotly toolbar options."""
        plt.style.use('seaborn-v0_8')
        sns.set_palette("husl")
        # Passed to `fig.write_html(config=...)` in generate_visualization.
        self.plotly_config = {
            'displayModeBar': True,
            'displaylogo': False,
            'modeBarButtonsToRemove': ['lasso2d', 'select2d'],
        }

    def _load_schema_info(self) -> Dict:
        """Load column metadata for every table in the ``public`` schema.

        Returns:
            Mapping of table name -> list of ``{column_name, data_type,
            is_nullable}`` records, or an empty dict if the lookup fails.
        """
        schema = {}
        try:
            tables_query = text(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = 'public'"
            )
            tables = pd.read_sql(tables_query, self.engine)['table_name'].tolist()

            # Bound parameter instead of f-string interpolation; also restrict
            # to the same schema so same-named tables elsewhere don't mix in.
            columns_query = text(
                "SELECT column_name, data_type, is_nullable "
                "FROM information_schema.columns "
                "WHERE table_schema = 'public' AND table_name = :table_name "
                "ORDER BY ordinal_position"
            )
            for table in tables:
                columns_df = pd.read_sql(columns_query, self.engine,
                                         params={'table_name': table})
                schema[table] = columns_df.to_dict('records')
            return schema
        except Exception as e:
            print(f"加载schema失败: {e}")
            return {}

    def parse_natural_language(self, question: str) -> Dict:
        """Classify *question* and extract the pieces needed to build SQL.

        Returns:
            ``{'question_type', 'extracted_info', 'original_question'}``.
        """
        question_type = self._classify_question(question)
        extracted_info = self._extract_query_info(question, question_type)
        return {
            'question_type': question_type,
            'extracted_info': extracted_info,
            'original_question': question,
        }

    def _classify_question(self, question: str) -> str:
        """Classify *question* into one of the supported analysis types.

        Pattern groups are checked in dictionary order and the first hit
        wins; ``'general_query'`` is returned when nothing matches.
        """
        question_lower = question.lower()
        patterns = {
            'trend_analysis': [r'.*趋势.*', r'.*变化.*', r'.*增长.*', r'.*下降.*'],
            'comparison': [r'.*比较.*', r'.*对比.*', r'.*哪个.*更.*', r'.*vs.*'],
            'distribution': [r'.*分布.*', r'.*占比.*', r'.*比例.*', r'.*组成.*'],
            # '最近' ("recent") is a time expression handled by
            # _extract_query_info, not a superlative, so it is excluded here.
            'ranking': [r'.*排名.*', r'.*前.*', r'.*最(?!近).*', r'.*top.*'],
            'correlation': [r'.*相关.*', r'.*关系.*', r'.*影响.*'],
        }
        for q_type, pattern_list in patterns.items():
            for pattern in pattern_list:
                if re.search(pattern, question_lower):
                    return q_type
        return 'general_query'

    def _extract_query_info(self, question: str, question_type: str) -> Dict:
        """Extract metrics, dimensions and a time range from *question*.

        Returns:
            A dict with:
            - ``metrics``: raw regex captures preceding a metric suffix
              (额/量/数/率/成本/收入/利润);
            - ``dimensions``: dimension keywords found in the question;
            - ``time_range``: ``{'type', 'value'}`` for the first matching
              time expression, or ``None``;
            - ``filters``: currently always empty;
            - ``metric_columns``: METRIC_MAPPING columns whose keyword occurs
              in the question.
        """
        info = {'metrics': [], 'dimensions': [], 'time_range': None, 'filters': []}

        # Raw metric phrases. `\w` also matches CJK characters, so each
        # capture is the whole run of text before the suffix (e.g.
        # "...的销售" for "...的销售额").
        metric_patterns = [r'(\w+)额', r'(\w+)量', r'(\w+)数', r'(\w+)率',
                           r'(\w+)成本', r'(\w+)收入', r'(\w+)利润']
        for pattern in metric_patterns:
            info['metrics'].extend(re.findall(pattern, question))

        # Map dimension keywords that appear verbatim in the question.
        dimension_keywords = ['地区', '产品', '类别', '部门', '渠道', '时间', '月份']
        info['dimensions'] = [kw for kw in dimension_keywords if kw in question]

        # First matching time expression wins.
        time_patterns = {
            r'(\d+)月': 'month',
            r'(\d+)季度': 'quarter',
            r'(\d+)年': 'year',
            r'最近(\d+)天': 'recent_days',
            r'上周': 'last_week',
            r'上月': 'last_month',
        }
        for pattern, time_type in time_patterns.items():
            match = re.search(pattern, question)
            if match:
                info['time_range'] = {
                    'type': time_type,
                    'value': match.group(1) if match.groups() else None,
                }
                break

        # Resolve metric columns from keywords directly: for 成本/收入/利润 the
        # captures above exclude the keyword itself, so they cannot be mapped.
        info['metric_columns'] = [
            column for keyword, column in self.METRIC_MAPPING.items()
            if keyword in question
        ]
        return info

    def generate_sql_query(self, parsed_question: Dict) -> str:
        """Build the SQL for a result of :meth:`parse_natural_language`."""
        question_type = parsed_question['question_type']
        extracted_info = parsed_question['extracted_info']
        sql_template = self._get_sql_template(question_type, extracted_info)
        return self._fill_sql_template(sql_template, extracted_info)

    def _get_sql_template(self, question_type: str, extracted_info: Dict) -> str:
        """Return the SQL template for *question_type* (general_query fallback).

        Placeholders are filled by :meth:`_fill_sql_template`. Ordering uses
        the metric alias (``{order_metric}``) because ``ORDER BY SUM(x) AS y``
        is not valid SQL.
        """
        templates = {
            'trend_analysis': (
                "SELECT {time_dimension}, {metrics}\n"
                "FROM {table}\n"
                "WHERE {where}\n"
                "GROUP BY {time_dimension}\n"
                "ORDER BY {time_dimension}"
            ),
            'comparison': (
                "SELECT {dimension}, {metrics}\n"
                "FROM {table}\n"
                "WHERE {where}\n"
                "GROUP BY {dimension}\n"
                "ORDER BY {order_metric} DESC"
            ),
            'distribution': (
                "SELECT {dimension}, COUNT(*) AS count, "
                "ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM {table} WHERE {where}), 2) AS percentage\n"
                "FROM {table}\n"
                "WHERE {where}\n"
                "GROUP BY {dimension}\n"
                "ORDER BY count DESC"
            ),
            'ranking': (
                "SELECT {dimension}, {metrics}\n"
                "FROM {table}\n"
                "WHERE {where}\n"
                "GROUP BY {dimension}\n"
                "ORDER BY {order_metric} DESC\n"
                "LIMIT {limit}"
            ),
            # Dimensions/GROUP BY are optional here; both placeholders expand
            # to '' when no dimension was found, keeping the SQL valid.
            'general_query': (
                "SELECT {dimensions_prefix}{metrics}\n"
                "FROM {table}\n"
                "WHERE {where}"
                "{group_by}"
            ),
        }
        return templates.get(question_type, templates['general_query'])

    def _resolve_metric_columns(self, extracted_info: Dict) -> List[str]:
        """Map extracted metrics to de-duplicated, identifier-safe column names."""
        candidates = list(extracted_info.get('metric_columns', []))
        for phrase in extracted_info.get('metrics', []):
            candidates.extend(column for keyword, column in self.METRIC_MAPPING.items()
                              if keyword in phrase)
        columns: List[str] = []
        for column in candidates:
            if column not in columns and self._IDENTIFIER.fullmatch(column):
                columns.append(column)
        return columns

    def _resolve_dimension_columns(self, extracted_info: Dict) -> List[str]:
        """Map dimension keywords to column names, dropping non-identifiers."""
        columns: List[str] = []
        for keyword in extracted_info.get('dimensions', []):
            column = self.DIMENSION_MAPPING.get(keyword, keyword)
            if column not in columns and self._IDENTIFIER.fullmatch(column):
                columns.append(column)
        return columns

    def _fill_sql_template(self, template: str, extracted_info: Dict) -> str:
        """Replace the placeholders in *template* with concrete SQL fragments.

        Metric/dimension keywords are mapped through METRIC_MAPPING and
        DIMENSION_MAPPING; anything that does not resolve to a plain SQL
        identifier is dropped rather than spliced into the query. The time
        range, when present, becomes the WHERE condition of every template.
        """
        main_table = 'sales_data'  # simplified: every template targets the fact table

        metric_columns = self._resolve_metric_columns(extracted_info)
        if metric_columns:
            metrics_sql = ', '.join(f"SUM({col}) AS total_{col}" for col in metric_columns)
            order_metric = f"total_{metric_columns[0]}"
        else:
            metrics_sql = 'COUNT(*) AS total_count'
            order_metric = 'total_count'

        dimensions = self._resolve_dimension_columns(extracted_info)[:2]  # at most two
        if dimensions:
            dimensions_prefix = ', '.join(dimensions) + ', '
            group_by = '\nGROUP BY ' + ', '.join(dimensions)
        else:
            dimensions_prefix = ''
            group_by = ''

        time_range = extracted_info.get('time_range')
        where = self._build_time_filter(time_range) if time_range else '1=1'

        replacements = {
            '{table}': main_table,
            '{metrics}': metrics_sql,
            '{order_metric}': order_metric,
            '{dimensions_prefix}': dimensions_prefix,
            '{group_by}': group_by,
            '{dimension}': dimensions[0] if dimensions else 'category',  # default dimension
            '{time_dimension}': 'sales_month',
            '{where}': where,
            '{limit}': '10',
        }
        filled = template
        for placeholder, value in replacements.items():
            filled = filled.replace(placeholder, value)
        return filled

    def _build_time_filter(self, time_range: Dict) -> str:
        """Translate an extracted time range into a SQL condition on sales_date.

        Relative ranges (last week / last month) are bounded on both sides so
        the current period is not included. Numeric values are converted with
        int(), which also normalises full-width digits; anything that is not
        an integer yields the neutral condition ``'1=1'``.
        """
        time_type = time_range['type']
        value = time_range.get('value')

        relative_filters = {
            'last_week': ("sales_date >= DATE_TRUNC('week', CURRENT_DATE) - INTERVAL '1 week' "
                          "AND sales_date < DATE_TRUNC('week', CURRENT_DATE)"),
            'last_month': ("sales_date >= DATE_TRUNC('month', CURRENT_DATE) - INTERVAL '1 month' "
                           "AND sales_date < DATE_TRUNC('month', CURRENT_DATE)"),
        }
        if time_type in relative_filters:
            return relative_filters[time_type]

        numeric_filters = {
            'month': "EXTRACT(MONTH FROM sales_date) = {n}",
            'quarter': "EXTRACT(QUARTER FROM sales_date) = {n}",
            'year': "EXTRACT(YEAR FROM sales_date) = {n}",
            'recent_days': "sales_date >= CURRENT_DATE - INTERVAL '{n} days'",
        }
        template = numeric_filters.get(time_type)
        if template is None:
            return '1=1'
        try:
            number = int(value)
        except (TypeError, ValueError):
            return '1=1'
        return template.format(n=number)

    def execute_query(self, sql_query: str) -> pd.DataFrame:
        """Run *sql_query*; on failure print the error and return an empty frame."""
        try:
            return pd.read_sql(sql_query, self.engine)
        except Exception as e:
            print(f"查询执行失败: {e}")
            return pd.DataFrame()

    def generate_visualization(self, df: pd.DataFrame, question_type: str, question: str) -> Dict:
        """Render *df* as a Plotly HTML chart and summarise it.

        Returns:
            ``{'error': ...}`` for an empty frame, otherwise a dict with
            ``chart_type``, ``chart_path``, ``data_summary`` and ``raw_data``.
        """
        import hashlib
        import os
        import tempfile

        if df.empty:
            return {'error': '无数据可可视化'}

        chart_type = self._recommend_chart_type(question_type, df)
        builders = {
            'line': self._create_line_chart,
            'bar': self._create_bar_chart,
            'pie': self._create_pie_chart,
            'scatter': self._create_scatter_plot,
        }
        fig = builders.get(chart_type, self._create_bar_chart)(df, question)  # bar is the default

        # Stable file name: the built-in hash() of a str is salted per process.
        digest = hashlib.sha256(question.encode('utf-8')).hexdigest()[:16]
        chart_path = os.path.join(tempfile.gettempdir(), f"chart_{digest}.html")
        fig.write_html(chart_path, config=self.plotly_config)

        return {
            'chart_type': chart_type,
            'chart_path': chart_path,
            'data_summary': self._generate_data_summary(df),
            'raw_data': df.to_dict('records'),
        }

    def _recommend_chart_type(self, question_type: str, df: pd.DataFrame) -> str:
        """Pick a chart type for *question_type*; short trends fall back to bars."""
        chart_mapping = {
            'trend_analysis': 'line',
            'comparison': 'bar',
            'distribution': 'pie',
            'ranking': 'bar',
            'correlation': 'scatter',
        }
        if question_type == 'trend_analysis' and len(df) < 5:
            return 'bar'  # too few points for a meaningful line
        return chart_mapping.get(question_type, 'bar')

    @staticmethod
    def _value_column(df: pd.DataFrame, exclude=()) -> Tuple[pd.DataFrame, str]:
        """Pick the first numeric column not in *exclude* as the plotted value.

        When there is none, a *copy* of df gets a synthetic 1..n ``count``
        column, so the caller's frame is never mutated.
        """
        numeric_cols = [c for c in df.select_dtypes(include=[np.number]).columns
                        if c not in exclude]
        if numeric_cols:
            return df, numeric_cols[0]
        return df.assign(count=range(1, len(df) + 1)), 'count'

    def _create_line_chart(self, df: pd.DataFrame, title: str) -> "go.Figure":
        """Line chart over the first datetime column (else first text column, else the index)."""
        datetime_cols = df.select_dtypes(include=['datetime']).columns
        object_cols = df.select_dtypes(include=['object']).columns
        if len(datetime_cols):
            time_col = datetime_cols[0]
        elif len(object_cols):
            # e.g. DATE columns read from PostgreSQL arrive as object dtype
            time_col = object_cols[0]
        else:
            time_col = df.index.name
        plot_df, value_col = self._value_column(df, exclude=(time_col,))
        return px.line(plot_df, x=time_col, y=value_col, title=title)

    def _create_bar_chart(self, df: pd.DataFrame, title: str) -> "go.Figure":
        """Bar chart: first text column (or the index) against the first numeric column."""
        object_cols = df.select_dtypes(include=['object']).columns
        category_col = object_cols[0] if len(object_cols) else df.index.name
        plot_df, value_col = self._value_column(df)
        return px.bar(plot_df, x=category_col, y=value_col, title=title)

    def _create_pie_chart(self, df: pd.DataFrame, title: str) -> "go.Figure":
        """Pie chart: first text column (or the index) as slices, first numeric column as sizes."""
        object_cols = df.select_dtypes(include=['object']).columns
        category_col = object_cols[0] if len(object_cols) else df.index.name
        plot_df, value_col = self._value_column(df)
        return px.pie(plot_df, names=category_col, values=value_col, title=title)

    def _create_scatter_plot(self, df: pd.DataFrame, title: str) -> "go.Figure":
        """Scatter of the first two numeric columns, coloured by the first text column.

        Falls back to a bar chart when fewer than two numeric columns exist.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:
            return self._create_bar_chart(df, title)
        x_col, y_col = numeric_cols[0], numeric_cols[1]
        object_cols = df.select_dtypes(include=['object']).columns
        color_col = object_cols[0] if len(object_cols) else None
        return px.scatter(df, x=x_col, y=y_col, color=color_col, title=title)

    def _generate_data_summary(self, df: pd.DataFrame) -> str:
        """Text summary: shape, numeric stats and up to three text columns."""
        summary = [f"数据概览: {len(df)} 行 × {len(df.columns)} 列"]
        for col in df.select_dtypes(include=[np.number]).columns:
            summary.append(f"- {col}: 均值 {df[col].mean():.2f}, 范围 [{df[col].min():.2f}, {df[col].max():.2f}]")
        for col in df.select_dtypes(include=['object']).columns[:3]:  # at most three
            unique_count = df[col].nunique()
            modes = df[col].mode()
            top_value = modes.iloc[0] if len(modes) > 0 else 'N/A'
            summary.append(f"- {col}: {unique_count} 个唯一值, 最常见: {top_value}")
        return "\n".join(summary)

    def full_analysis_pipeline(self, question: str) -> Dict:
        """Parse -> generate SQL -> execute -> visualise, returning every stage."""
        parsed_question = self.parse_natural_language(question)
        sql_query = self.generate_sql_query(parsed_question)
        df = self.execute_query(sql_query)
        visualization = self.generate_visualization(df, parsed_question['question_type'], question)
        return {
            'question': question,
            'parsed_question': parsed_question,
            'sql_query': sql_query,
            'visualization': visualization,
            'status': 'success' if not df.empty else 'no_data',
        }


class ChatBIWebInterface:
    """Conversational front end around a ChatBIAnalyst."""

    def __init__(self, analyst: ChatBIAnalyst):
        self.analyst = analyst
        # One entry per handled query: user_query, analysis_result, timestamp.
        self.conversation_history = []

    def handle_user_query(self, user_input: str) -> Dict:
        """Run the full analysis for *user_input* and record it in the history."""
        analysis_result = self.analyst.full_analysis_pipeline(user_input)
        self.conversation_history.append({
            'user_query': user_input,
            'analysis_result': analysis_result,
            'timestamp': pd.Timestamp.now().isoformat(),
        })
        return analysis_result

    def get_suggested_questions(self) -> List[str]:
        """Return canned example questions to offer the user."""
        return [
            "上月销售额趋势如何?",
            "各产品类别的销售占比是多少?",
            "哪个地区的利润最高?",
            "最近三个月客户数量的变化趋势?",
            "销售额和广告投入有关系吗?",
        ]
==============================================
Java架构
ChatBI在零售运营决策中的实战应用
- 完整的业务架构 - 从自然语言解析到可视化展示的全流程
- 多数据源支持 - MySQL + ClickHouse的混合架构
- 智能NLP处理 - 规则引擎 + AI模型的混合方案
- 高性能查询 - 查询缓存 + 异步处理 + 连接池优化
- 企业级安全 - 权限控制 + 数据脱敏 + 访问审计
- 完整的监控 - 性能指标 + 业务日志 + 异常追踪
- 生产就绪 - 健康检查 + 配置管理 + 部署方案
业务场景:区域运营总监通过自然语言快速获取经营分析数据
┌─ 零售运营智能分析平台 ───────────────────────────────────┐
│ 用户: 张总监(华东区) | 时间: 2024年Q1 | [保存为看板] │
├─────────────────────────────────────────────────────────┤
│ 语音输入: "显示上海区域三月份销售额前十的门店,并对比 │
│ 去年同期增长率" [🎤] │
│ │
│ 【智能解析】 │
│ ✓ 区域: 上海 ✓ 时间: 2024-03 ✓ 指标: 销售额 │
│ ✓ 排名: Top10 ✓ 对比: 同比增长率 │
├─────────────────────────────────────────────────────────┤
│ 【自动生成可视化】 │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 门店销售额TOP10 │ │ 同比增长分析 │ │
│ │ ████ 南京东路店│ │ 📈 南京东路 +25%│ │
│ │ ████ 陆家嘴店 │ │ 📈 徐家汇 +18%│ │
│ │ ████ 徐家汇店 │ │ 📉 虹桥店 -5%│ │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ 【深度洞察】 │
│ 💡 南京东路店增长主要来自新品首发活动 │
│ ⚠️ 虹桥店客流同比下降15%,建议加强社区营销 │
│ 📅 三月第二周为销售高峰,与促销活动周期一致 │
│ │
│ 进一步分析: "虹桥店客单价和来客数具体变化?" [分析] │
│ [生成PPT] [发送邮件] [下钻分析] [设置预警] │
└─────────────────────────────────────────────────────────┘
retail-chatbi-system/
├── src/main/java/com/company/retailbi/
│ ├── controller/
│ │ ├── ChatBIController.java
│ │ ├── DataSourceController.java
│ │ └── ReportController.java
│ ├── service/
│ │ ├── impl/
│ │ │ ├── ChatBIServiceImpl.java
│ │ │ ├── NLPServiceImpl.java
│ │ │ ├── DataQueryServiceImpl.java
│ │ │ └── VisualizationServiceImpl.java
│ │ ├── ChatBIService.java
│ │ ├── NLPService.java
│ │ ├── DataQueryService.java
│ │ └── VisualizationService.java
│ ├── repository/
│ │ ├── SalesDataRepository.java
│ │ ├── QueryLogRepository.java
│ │ └── custom/
│ │ └── SalesDataRepositoryCustom.java
│ ├── entity/
│ │ ├── SalesDataEntity.java
│ │ ├── QueryLogEntity.java
│ │ └── DataSourceConfigEntity.java
│ ├── dto/
│ │ ├── request/
│ │ │ ├── ChatQueryRequest.java
│ │ │ └── DataQueryRequest.java
│ │ ├── response/
│ │ │ ├── ChatResponse.java
│ │ │ ├── QueryResult.java
│ │ │ └── VisualizationResult.java
│ │ └── internal/
│ │ ├── ParsedQuery.java
│ │ └── QueryContext.java
│ ├── config/
│ │ ├── DataSourceConfig.java
│ │ ├── CacheConfig.java
│ │ ├── AsyncConfig.java
│ │ └── SecurityConfig.java
│ ├── nlp/
│ │ ├── parser/
│ │ │ ├── QueryParser.java
│ │ │ ├── IntentRecognizer.java
│ │ │ └── EntityExtractor.java
│ │ ├── model/
│ │ │ ├── QueryIntent.java
│ │ │ ├── BusinessEntity.java
│ │ │ └── TimeRange.java
│ │ └── processor/
│ │ ├── NaturalLanguageProcessor.java
│ │ └── SQLGenerator.java
│ ├── data/
│ │ ├── query/
│ │ │ ├── QueryExecutor.java
│ │ │ ├── ClickHouseExecutor.java
│ │ │ └── MySQLExecutor.java
│ │ ├── model/
│ │ │ ├── SalesMetric.java
│ │ │ └── BusinessDimension.java
│ │ └── cache/
│ │ ├── QueryCacheManager.java
│ │ └── DataCacheService.java
│ ├── visualization/
│ │ ├── chart/
│ │ │ ├── ChartGenerator.java
│ │ │ ├── BarChartGenerator.java
│ │ │ ├── LineChartGenerator.java
│ │ │ └── PieChartGenerator.java
│ │ ├── template/
│ │ │ ├── ChartTemplate.java
│ │ │ └── DashboardTemplate.java
│ │ └── render/
│ │ ├── ChartRenderer.java
│ │ └── ImageRenderer.java
│ ├── ai/
│ │ ├── client/
│ │ │ ├── OpenAIClient.java
│ │ │ └── LocalModelClient.java
│ │ ├── service/
│ │ │ ├── InsightService.java
│ │ │ └── RecommendationService.java
│ │ └── prompt/
│ │ ├── QueryPrompt.java
│ │ └── InsightPrompt.java
│ ├── security/
│ │ ├── JwtTokenProvider.java
│ │ ├── UserPrincipal.java
│ │ └── SecurityUtils.java
│ └── async/
│ ├── QueryAsyncProcessor.java
│ └── ReportGenerator.java
├── src/test/java/
│ ├── unit/
│ ├── integration/
│ └── performance/
├── src/main/resources/
│ ├── application.yml
│ ├── application-prod.yml
│ ├── db/
│ │ ├── migration/
│ │ └── init/
│ ├── nlp/
│ │ ├── patterns/
│ │ └── rules/
│ └── templates/
│ ├── chart/
│ └── report/
└── docker/
    ├── docker-compose.yml
    └── prometheus.yml
后端框架: Spring Boot 3.1 + Spring Data JPA + QueryDSL
数据库: MySQL 8.0 (OLTP) + ClickHouse (OLAP)
缓存: Redis Cluster
消息队列: Apache Kafka
搜索引擎: Elasticsearch 8.x
BI引擎: Apache Superset集成
AI组件: OpenAI API + 本地微调模型
安全: Spring Security + JWT
监控: Micrometer + Prometheus + Grafana
# application-prod.yml
spring:
  datasource:
    url: jdbc:mysql://mysql-cluster:3306/retail_bi?useSSL=true&requireSSL=true
    username: ${MYSQL_USERNAME}
    password: ${MYSQL_PASSWORD}
    hikari:
      maximum-pool-size: 50
      minimum-idle: 10
      connection-timeout: 30000
      max-lifetime: 1800000
  data:
    # NOTE(review): `spring.data.clickhouse` is not a Spring Boot
    # auto-configuration namespace; presumably bound by the project's own
    # DataSourceConfig -- confirm.
    clickhouse:
      url: jdbc:clickhouse://clickhouse-cluster:8123/retail_bi
      username: ${CLICKHOUSE_USERNAME}
      password: ${CLICKHOUSE_PASSWORD}
    redis:
      cluster:
        nodes: ${REDIS_CLUSTER_NODES}
      password: ${REDIS_PASSWORD}
      lettuce:
        pool:
          max-active: 100
          max-idle: 50
          min-idle: 20
  kafka:
    bootstrap-servers: ${KAFKA_BROKERS}
    producer:
      acks: all
      retries: 3
    consumer:
      group-id: chatbi-consumer
      auto-offset-reset: latest
  elasticsearch:
    uris: ${ELASTICSEARCH_NODES}
    username: ${ES_USERNAME}
    password: ${ES_PASSWORD}
  security:
    oauth2:
      resourceserver:
        jwt:
          issuer-uri: ${AUTH_SERVER_URL}

chatbi:
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      timeout: 30000
      max-tokens: 2000
  cache:
    query:
      enabled: true
      ttl: 600
    insight:
      enabled: true
      ttl: 1800
  security:
    rate-limit:
      enabled: true
      requests-per-minute: 60
  monitoring:
    enabled: true
    prometheus:
      enabled: true

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  endpoint:
    health:
      # NOTE(review): `always` shows component details (datasource, redis,
      # elasticsearch status) to anyone who can reach /actuator/health;
      # consider `when-authorized` for production.
      show-details: always
    prometheus:
      enabled: true

logging:
  level:
    com.company.retailbi: INFO
  file:
    name: /app/logs/chatbi.log
  logback:
    rollingpolicy:
      max-file-size: 100MB
      max-history: 30

server:
  port: 8080
  tomcat:
    # The old flat keys `max-threads` / `min-spare-threads` were replaced by
    # `threads.*` in Spring Boot 2.3 and are not bound by Spring Boot 3.x.
    threads:
      max: 200
      min-spare: 20
核心实体设计
// Sales fact entity - MySQL (table bi_sales_data).
@Entity
@Table(name = "bi_sales_data", indexes = {
        // NOTE(review): JPA's columnList expects column names, but these are
        // Java property names; confirm the generated DDL matches the physical
        // (naming-strategy) column names such as sale_date.
        @Index(name = "idx_date_region", columnList = "saleDate,region"),
        @Index(name = "idx_product_date", columnList = "productId,saleDate")
})
// Field declaration order below is also the parameter order of the
// Lombok-generated all-args constructor.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SalesDataEntity {
    // Surrogate primary key generated by the database (IDENTITY strategy).
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // Business identifier. NOTE(review): described as unique, but no unique
    // constraint is declared on this column.
    @Column(nullable = false)
    private String dataId;

    @Column(nullable = false)
    private LocalDate saleDate;

    // Region name (max 50 chars).
    @Column(nullable = false, length = 50)
    private String region;

    // Store name (max 100 chars).
    @Column(nullable = false, length = 100)
    private String storeName;

    @Column(nullable = false, length = 50)
    private String productId;

    @Column(nullable = false, length = 200)
    private String productName;

    @Column(nullable = false, length = 50)
    private String productCategory;

    @Column(nullable = false)
    private Integer saleQuantity;

    // Monetary amounts stored as DECIMAL(15,2).
    @Column(nullable = false, precision = 15, scale = 2)
    private BigDecimal saleAmount;

    @Column(nullable = false, precision = 15, scale = 2)
    private BigDecimal costAmount;

    // Optional; NOTE(review): scale 2 suggests a ratio or percentage -- confirm the unit.
    @Column(precision = 10, scale = 2)
    private BigDecimal discountRate;

    // Sales channel code (max 20 chars).
    @Column(nullable = false, length = 20)
    private String saleChannel;

    // Optional promotion/campaign id.
    @Column(length = 50)
    private String promotionId;

    // Set by Hibernate on insert.
    @CreationTimestamp
    private LocalDateTime createTime;

    // Refreshed by Hibernate on every update.
    @UpdateTimestamp
    private LocalDateTime updateTime;
}
// ClickHouse distributed-table entity
// Row mapping for the ClickHouse distributed table `sales_data_distributed`.
// NOTE(review): the class is not annotated @Entity and @Table takes a bare
// value, so it is presumably mapped by Spring Data Relational or a custom
// ClickHouse executor rather than JPA -- confirm which @Table/@Column types
// are imported.
@Table("sales_data_distributed")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SalesDataClickHouse {
    @Column(name = "data_id")
    private String dataId;

    @Column(name = "sale_date")
    private LocalDate saleDate;

    @Column(name = "region")
    private String region;

    @Column(name = "store_name")
    private String storeName;

    @Column(name = "product_id")
    private String productId;

    @Column(name = "sale_amount")
    private BigDecimal saleAmount;

    @Column(name = "sale_quantity")
    private Integer saleQuantity;

    // Aggregated fields
    @Column(name = "daily_total")
    private BigDecimal dailyTotal;

    // NOTE(review): unit (ratio vs. percent) is not visible here -- confirm.
    @Column(name = "monthly_growth")
    private BigDecimal monthlyGrowth;
}
// Query log entity
// Audit record of one natural-language query (table bi_query_log).
@Entity
@Table(name = "bi_query_log")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class QueryLogEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // Query identifier; presumably the "Q_<millis>_<uuid8>" id produced by
    // QueryParser -- confirm with the writer of this log.
    @Column(nullable = false)
    private String queryId;

    // Original natural-language query text.
    @Column(nullable = false, columnDefinition = "TEXT")
    private String originalQuery;

    // Parsed representation of the query.
    @Column(columnDefinition = "TEXT")
    private String parsedQuery;

    // SQL generated from the query.
    @Column(columnDefinition = "TEXT")
    private String generatedSql;

    // Recognised query intent.
    @Column(nullable = false, length = 50)
    private String intent;

    // Extracted entities serialised as JSON (MySQL JSON column type).
    @Column(columnDefinition = "JSON")
    private String entities;

    @Column(nullable = false, length = 50)
    private String userId;

    @Column(nullable = false, length = 100)
    private String userRole;

    // Response time in milliseconds.
    @Column(nullable = false)
    private Long responseTime;

    @Column(nullable = false)
    private Boolean success;

    @Column(columnDefinition = "TEXT")
    private String errorMessage;

    // Summary of the query result, serialised as JSON.
    @Column(columnDefinition = "JSON")
    private String queryResult;

    // Set by Hibernate on insert.
    @CreationTimestamp
    private LocalDateTime createTime;
}
// Data-source configuration entity
@Entity
@Table(name = "bi_data_source")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DataSourceConfigEntity {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;@Column(nullable = false, unique = true, length = 100)private String sourceName;@Column(nullable = false, length = 20)private String sourceType; // MYSQL, CLICKHOUSE, ES@Column(nullable = false, length = 500)private String jdbcUrl;@Column(nullable = false, length = 100)private String username;@Column(nullable = false, length = 500)private String password; // 加密存储@Column(nullable = false, length = 100)private String databaseName;@Column(length = 1000)private String description;@Column(nullable = false)private Boolean enabled;@Column(nullable = false)private Integer timeoutSeconds;@Column(nullable = false)private Integer maxConnections;@CreationTimestampprivate LocalDateTime createTime;@UpdateTimestampprivate LocalDateTime updateTime;
}
REST API控制器
- ChatBIService:提供 processNaturalLanguageQuery() 与 getQueryHistory() 方法
- QueryParser:通过构造器注入控制器
- ChatQueryRequest:请求体参数,request.getQuery() 返回收到的自然语言查询
- UserPrincipal:通过 @AuthenticationPrincipal 注入的当前登录用户
// Main ChatBI REST controller.
@RestController
@RequestMapping("/api/v1/chatbi")
@Validated
@Slf4j
public class ChatBIController {

    /** Maximum number of queries accepted in one batch request. */
    private static final int MAX_BATCH_SIZE = 10;

    /** Largest page size served by the history endpoint. */
    private static final int MAX_PAGE_SIZE = 100;

    private final ChatBIService chatBIService;
    // Injected but not used by any endpoint in this controller.
    private final QueryParser queryParser;

    public ChatBIController(ChatBIService chatBIService, QueryParser queryParser) {
        this.chatBIService = chatBIService;
        this.queryParser = queryParser;
    }

    /**
     * Natural-language query entry point.
     *
     * QueryParseException maps to 400, AccessDeniedException to 403,
     * QueryExecutionException and any other exception to 500 (the latter
     * with a generic message).
     */
    @PostMapping("/query")
    public ResponseEntity<ApiResponse<ChatResponse>> processQuery(
            @Valid @RequestBody ChatQueryRequest request,
            @AuthenticationPrincipal UserPrincipal user) {
        log.info("收到自然语言查询: user={}, query={}", user.getUsername(), request.getQuery());
        try {
            ChatResponse response = chatBIService.processNaturalLanguageQuery(request, user);
            return ResponseEntity.ok(ApiResponse.success("查询成功", response));
        } catch (QueryParseException e) {
            log.warn("查询解析失败: {}", e.getMessage());
            return ResponseEntity.badRequest()
                    .body(ApiResponse.error(ErrorCode.QUERY_PARSE_ERROR, e.getMessage()));
        } catch (AccessDeniedException e) {
            log.warn("权限验证失败: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.FORBIDDEN)
                    .body(ApiResponse.error(ErrorCode.ACCESS_DENIED, e.getMessage()));
        } catch (QueryExecutionException e) {
            log.error("查询执行失败: {}", e.getMessage(), e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(ApiResponse.error(ErrorCode.QUERY_EXECUTION_ERROR, e.getMessage()));
        } catch (Exception e) {
            log.error("系统异常: {}", e.getMessage(), e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(ApiResponse.error(ErrorCode.SYSTEM_ERROR, "系统繁忙,请稍后重试"));
        }
    }

    /**
     * Batch query processing (at most MAX_BATCH_SIZE queries).
     * A failure in one query becomes an error entry instead of failing the batch.
     */
    @PostMapping("/queries/batch")
    public ResponseEntity<ApiResponse<List<ChatResponse>>> processBatchQueries(
            // The type-use @Valid lets the class-level @Validated cascade into
            // each element; @Valid on the List alone does not validate them.
            @Valid @RequestBody List<@Valid ChatQueryRequest> requests,
            @AuthenticationPrincipal UserPrincipal user) {
        log.info("收到批量查询请求: user={}, count={}", user.getUsername(), requests.size());
        if (requests.size() > MAX_BATCH_SIZE) {
            return ResponseEntity.badRequest()
                    .body(ApiResponse.error(ErrorCode.REQUEST_LIMIT_EXCEEDED, "单次批量查询不能超过10个"));
        }
        List<ChatResponse> responses = requests.stream()
                .map(request -> {
                    try {
                        return chatBIService.processNaturalLanguageQuery(request, user);
                    } catch (Exception e) {
                        log.warn("批量查询处理失败: query={}, error={}", request.getQuery(), e.getMessage());
                        return ChatResponse.error(request.getQueryId(), e.getMessage());
                    }
                })
                .collect(Collectors.toList());
        return ResponseEntity.ok(ApiResponse.success("批量查询完成", responses));
    }

    /**
     * Paged query history of the current user, newest first.
     * Paging input is clamped: PageRequest.of rejects negative pages and
     * sizes below 1, and an unbounded size would load the whole log.
     */
    @GetMapping("/queries/history")
    public ResponseEntity<ApiResponse<PageResponse<QueryLogEntity>>> getQueryHistory(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String intent,
            @RequestParam(required = false) String dateRange,
            @AuthenticationPrincipal UserPrincipal user) {
        int safePage = Math.max(page, 0);
        int safeSize = Math.min(Math.max(size, 1), MAX_PAGE_SIZE);
        Pageable pageable = PageRequest.of(safePage, safeSize, Sort.by("createTime").descending());
        Page<QueryLogEntity> history = chatBIService.getQueryHistory(user.getUsername(), intent, dateRange, pageable);
        return ResponseEntity.ok(ApiResponse.success("查询成功", PageResponse.from(history)));
    }
}
// 数据源管理控制器
@RestController
@RequestMapping("/api/v1/chatbi/datasources")
@Validated
@Slf4j
@PreAuthorize("hasRole('ADMIN')")
public class DataSourceController {private final DataSourceService dataSourceService;/*** 测试数据源连接*/@PostMapping("/{id}/test")public ResponseEntity<ApiResponse<Boolean>> testDataSource(@PathVariable Long id) {boolean connected = dataSourceService.testConnection(id);if (connected) {return ResponseEntity.ok(ApiResponse.success("数据源连接正常", true));} else {return ResponseEntity.ok(ApiResponse.error(ErrorCode.DATASOURCE_CONNECTION_FAILED, "数据源连接失败", false));}}/*** 获取数据源元数据*/@GetMapping("/{id}/metadata")public ResponseEntity<ApiResponse<DataSourceMetadata>> getDataSourceMetadata(@PathVariable Long id) {DataSourceMetadata metadata = dataSourceService.getDataSourceMetadata(id);return ResponseEntity.ok(ApiResponse.success("元数据获取成功", metadata));}
}
NLP自然语言处理模块
// 查询解析器
@Component
@Slf4j
public class QueryParser {private final IntentRecognizer intentRecognizer;private final EntityExtractor entityExtractor;private final SQLGenerator sqlGenerator;public QueryParser(IntentRecognizer intentRecognizer, EntityExtractor entityExtractor,SQLGenerator sqlGenerator) {this.intentRecognizer = intentRecognizer;this.entityExtractor = entityExtractor;this.sqlGenerator = sqlGenerator;}/*** 解析自然语言查询*/public ParsedQuery parseQuery(String naturalLanguageQuery, String userId) {long startTime = System.currentTimeMillis();try {// 1. 意图识别QueryIntent intent = intentRecognizer.recognizeIntent(naturalLanguageQuery);// 2. 实体提取List<BusinessEntity> entities = entityExtractor.extractEntities(naturalLanguageQuery);// 3. 时间范围解析TimeRange timeRange = entityExtractor.extractTimeRange(naturalLanguageQuery);// 4. 生成SQL查询String generatedSql = sqlGenerator.generateSQL(intent, entities, timeRange);// 5. 构建解析结果ParsedQuery parsedQuery = ParsedQuery.builder().queryId(generateQueryId()).originalQuery(naturalLanguageQuery).intent(intent).entities(entities).timeRange(timeRange).generatedSql(generatedSql).userId(userId).parseTime(System.currentTimeMillis() - startTime).build();log.info("查询解析完成: queryId={}, intent={}, entities={}", parsedQuery.getQueryId(), intent, entities.size());return parsedQuery;} catch (Exception e) {log.error("查询解析失败: query={}, error={}", naturalLanguageQuery, e.getMessage(), e);throw new QueryParseException("自然语言查询解析失败: " + e.getMessage());}}/*** 验证查询权限*/public void validateQueryPermission(ParsedQuery parsedQuery, UserPrincipal user) {// 检查数据权限if (!hasDataAccessPermission(parsedQuery, user)) {throw new AccessDeniedException("用户没有访问该数据的权限");}// 检查查询复杂度if (isQueryTooComplex(parsedQuery)) {throw new QueryComplexityException("查询过于复杂,请简化查询条件");}}private boolean hasDataAccessPermission(ParsedQuery parsedQuery, UserPrincipal user) {// 实现基于用户角色和数据的权限验证return user.getAuthorities().stream().anyMatch(auth -> auth.getAuthority().equals("DATA_ACCESS"));}private boolean 
isQueryTooComplex(ParsedQuery parsedQuery) {// 基于实体数量、时间范围等判断查询复杂度return parsedQuery.getEntities().size() > 10;}private String generateQueryId() {return "Q_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8);}
}// 意图识别器
@Component
@Slf4j
public class IntentRecognizer {private final PatternBasedIntentDetector patternDetector;private final MLBasedIntentDetector mlDetector;/*** 识别查询意图*/public QueryIntent recognizeIntent(String query) {// 优先使用模式匹配(速度快)QueryIntent intent = patternDetector.detectIntent(query);// 如果模式匹配失败,使用机器学习模型if (intent == QueryIntent.UNKNOWN) {intent = mlDetector.detectIntent(query);}log.debug("意图识别结果: query={}, intent={}", query, intent);return intent;}
}// 模式匹配意图识别器
@Component
@Slf4j
public class PatternBasedIntentDetector {private final Map<String, QueryIntent> intentPatterns;public PatternBasedIntentDetector() {this.intentPatterns = loadIntentPatterns();}/*** 加载意图匹配模式*/private Map<String, QueryIntent> loadIntentPatterns() {Map<String, QueryIntent> patterns = new HashMap<>();// 销售相关意图patterns.put(".*(销售|营业额|收入).*(多少|多少|如何).*", QueryIntent.SALES_QUERY);patterns.put(".*(TOP|前十|排名).*销售.*", QueryIntent.TOP_N_QUERY);patterns.put(".*(同比|环比|增长|下降).*", QueryIntent.GROWTH_QUERY);patterns.put(".*(趋势|变化).*", QueryIntent.TREND_QUERY);// 产品相关意图patterns.put(".*(产品|商品).*(销售|表现).*", QueryIntent.PRODUCT_PERFORMANCE);patterns.put(".*(品类|类别).*分布.*", QueryIntent.CATEGORY_DISTRIBUTION);// 门店相关意图patterns.put(".*(门店|店铺).*(业绩|表现).*", QueryIntent.STORE_PERFORMANCE);patterns.put(".*(区域|地区).*对比.*", QueryIntent.REGION_COMPARISON);// 客户相关意图patterns.put(".*(客户|顾客).*(消费|购买).*", QueryIntent.CUSTOMER_BEHAVIOR);patterns.put(".*(复购|回头客).*", QueryIntent.CUSTOMER_RETENTION);return patterns;}public QueryIntent detectIntent(String query) {for (Map.Entry<String, QueryIntent> entry : intentPatterns.entrySet()) {if (query.matches(entry.getKey())) {return entry.getValue();}}return QueryIntent.UNKNOWN;}
}// SQL生成器
@Component
@Slf4j
public class SQLGenerator {private final Map<QueryIntent, String> sqlTemplates;public SQLGenerator() {this.sqlTemplates = loadSQLTemplates();}/*** 生成SQL查询*/public String generateSQL(QueryIntent intent, List<BusinessEntity> entities, TimeRange timeRange) {String template = sqlTemplates.get(intent);if (template == null) {throw new UnsupportedOperationException("不支持的查询意图: " + intent);}// 替换模板变量String sql = replaceTemplateVariables(template, entities, timeRange);// 添加数据权限过滤条件sql = addDataSecurityFilters(sql);log.debug("生成的SQL: {}", sql);return sql;}private String replaceTemplateVariables(String template, List<BusinessEntity> entities, TimeRange timeRange) {String sql = template;// 替换时间范围if (timeRange != null) {sql = sql.replace("${startDate}", timeRange.getStartDate().toString()).replace("${endDate}", timeRange.getEndDate().toString());}// 替换实体条件for (BusinessEntity entity : entities) {switch (entity.getType()) {case REGION:sql = sql.replace("${region}", "'" + entity.getValue() + "'");break;case PRODUCT:sql = sql.replace("${product}", "'" + entity.getValue() + "'");break;case STORE:sql = sql.replace("${store}", "'" + entity.getValue() + "'");break;}}return sql;}private String addDataSecurityFilters(String sql) {// 添加基于用户的数据权限过滤// 例如:AND region IN ('上海', '北京') return sql;}private Map<QueryIntent, String> loadSQLTemplates() {Map<QueryIntent, String> templates = new HashMap<>();// 销售查询模板templates.put(QueryIntent.SALES_QUERY, "SELECT region, store_name, SUM(sale_amount) as total_sales " +"FROM sales_data " +"WHERE sale_date BETWEEN '${startDate}' AND '${endDate}' " +"${region} ${store} " +"GROUP BY region, store_name " +"ORDER BY total_sales DESC");templates.put(QueryIntent.TOP_N_QUERY,"SELECT product_name, SUM(sale_amount) as total_sales " +"FROM sales_data " +"WHERE sale_date BETWEEN '${startDate}' AND '${endDate}' " +"${region} ${store} " +"GROUP BY product_name " +"ORDER BY total_sales DESC " +"LIMIT 10");templates.put(QueryIntent.GROWTH_QUERY,"SELECT t1.region, 
t1.month_sales, t2.month_sales as last_month_sales, " +" (t1.month_sales - t2.month_sales) / t2.month_sales as growth_rate " +"FROM (" +" SELECT region, SUM(sale_amount) as month_sales " +" FROM sales_data " +" WHERE sale_date BETWEEN '${startDate}' AND '${endDate}' " +" GROUP BY region" +") t1 " +"LEFT JOIN (" +" SELECT region, SUM(sale_amount) as month_sales " +" FROM sales_data " +" WHERE sale_date BETWEEN DATE_SUB('${startDate}', INTERVAL 1 MONTH) " +" AND DATE_SUB('${endDate}', INTERVAL 1 MONTH) " +" GROUP BY region" +") t2 ON t1.region = t2.region");return templates;}
}
数据查询服务
// 数据查询服务
@Service
@Slf4j
@Transactional
public class DataQueryServiceImpl implements DataQueryService {private final QueryExecutor mysqlExecutor;private final QueryExecutor clickHouseExecutor;private final QueryCacheManager cacheManager;private final QueryLogRepository queryLogRepository;/*** 执行数据查询*/@Overridepublic QueryResult executeQuery(ParsedQuery parsedQuery) {long startTime = System.currentTimeMillis();try {// 1. 检查缓存String cacheKey = generateCacheKey(parsedQuery);QueryResult cachedResult = cacheManager.get(cacheKey);if (cachedResult != null) {log.info("缓存命中: queryId={}", parsedQuery.getQueryId());return cachedResult;}// 2. 选择执行器QueryExecutor executor = selectExecutor(parsedQuery);// 3. 执行查询List<Map<String, Object>> data = executor.executeQuery(parsedQuery.getGeneratedSql());// 4. 处理结果QueryResult result = processQueryResult(data, parsedQuery);// 5. 缓存结果cacheManager.put(cacheKey, result, getCacheTTL(parsedQuery));// 6. 记录查询日志logQuery(parsedQuery, System.currentTimeMillis() - startTime, true, null);return result;} catch (Exception e) {log.error("查询执行失败: queryId={}, error={}", parsedQuery.getQueryId(), e.getMessage(), e);logQuery(parsedQuery, System.currentTimeMillis() - startTime, false, e.getMessage());throw new QueryExecutionException("数据查询执行失败: " + e.getMessage());}}/*** 选择查询执行器*/private QueryExecutor selectExecutor(ParsedQuery parsedQuery) {// 根据查询复杂度选择执行器if (isComplexQuery(parsedQuery)) {return clickHouseExecutor;}return mysqlExecutor;}private boolean isComplexQuery(ParsedQuery parsedQuery) {// 判断查询复杂度的逻辑return parsedQuery.getIntent() == QueryIntent.GROWTH_QUERY ||parsedQuery.getIntent() == QueryIntent.TREND_QUERY ||parsedQuery.getGeneratedSql().toLowerCase().contains("join");}/*** 处理查询结果*/private QueryResult processQueryResult(List<Map<String, Object>> data, ParsedQuery parsedQuery) {QueryResult result = new QueryResult();result.setQueryId(parsedQuery.getQueryId());result.setData(data);result.setTotalCount(data.size());result.setExecutionTime(System.currentTimeMillis());// 
计算统计指标calculateStatistics(result, data);// 生成数据摘要generateDataSummary(result, data, parsedQuery.getIntent());return result;}private void calculateStatistics(QueryResult result, List<Map<String, Object>> data) {if (data.isEmpty()) {return;}// 计算数值型字段的统计信息Map<String, Double> stats = new HashMap<>();List<String> numericFields = detectNumericFields(data.get(0));for (String field : numericFields) {List<Double> values = data.stream().map(row -> convertToDouble(row.get(field))).filter(Objects::nonNull).collect(Collectors.toList());if (!values.isEmpty()) {stats.put(field + "_sum", values.stream().mapToDouble(Double::doubleValue).sum());stats.put(field + "_avg", values.stream().mapToDouble(Double::doubleValue).average().orElse(0));stats.put(field + "_max", values.stream().mapToDouble(Double::doubleValue).max().orElse(0));stats.put(field + "_min", values.stream().mapToDouble(Double::doubleValue).min().orElse(0));}}result.setStatistics(stats);}private void generateDataSummary(QueryResult result, List<Map<String, Object>> data, QueryIntent intent) {DataSummary summary = new DataSummary();summary.setIntent(intent);summary.setRecordCount(data.size());if (!data.isEmpty()) {// 根据意图生成不同的摘要switch (intent) {case SALES_QUERY:generateSalesSummary(summary, data);break;case TOP_N_QUERY:generateTopNSummary(summary, data);break;case GROWTH_QUERY:generateGrowthSummary(summary, data);break;default:generateDefaultSummary(summary, data);}}result.setSummary(summary);}private void generateSalesSummary(DataSummary summary, List<Map<String, Object>> data) {double totalSales = data.stream().mapToDouble(row -> convertToDouble(row.get("total_sales"))).sum();summary.setMainMetric(totalSales);summary.setSummaryText(String.format("总销售额: ¥%.2f", totalSales));}private void logQuery(ParsedQuery parsedQuery, long responseTime, boolean success, String errorMessage) {QueryLogEntity logEntity = 
QueryLogEntity.builder().queryId(parsedQuery.getQueryId()).originalQuery(parsedQuery.getOriginalQuery()).parsedQuery(parsedQuery.toString()).generatedSql(parsedQuery.getGeneratedSql()).intent(parsedQuery.getIntent().name()).entities(JsonUtils.toJson(parsedQuery.getEntities())).userId(parsedQuery.getUserId()).userRole("OPERATION_DIRECTOR") // 从用户信息获取.responseTime(responseTime).success(success).errorMessage(errorMessage).queryResult(success ? "查询成功,返回" + parsedQuery.getEntities().size() + "条记录" : null).build();queryLogRepository.save(logEntity);}private String generateCacheKey(ParsedQuery parsedQuery) {return "query:" + parsedQuery.getQueryId() + ":" + DigestUtils.md5DigestAsHex(parsedQuery.getGeneratedSql().getBytes());}private long getCacheTTL(ParsedQuery parsedQuery) {// 根据查询类型设置不同的缓存时间switch (parsedQuery.getIntent()) {case SALES_QUERY:return 300; // 5分钟case GROWTH_QUERY:return 1800; // 30分钟default:return 600; // 10分钟}}
}// ClickHouse查询执行器
@Component
@Slf4j
public class ClickHouseExecutor implements QueryExecutor {private final JdbcTemplate clickHouseJdbcTemplate;@Overridepublic List<Map<String, Object>> executeQuery(String sql) {try {long startTime = System.currentTimeMillis();List<Map<String, Object>> result = clickHouseJdbcTemplate.queryForList(sql);long executionTime = System.currentTimeMillis() - startTime;log.info("ClickHouse查询执行完成: rows={}, time={}ms", result.size(), executionTime);return result;} catch (DataAccessException e) {log.error("ClickHouse查询执行失败: sql={}, error={}", sql, e.getMessage(), e);throw new QueryExecutionException("ClickHouse查询执行失败: " + e.getMessage());}}@Overridepublic boolean supports(String sql) {// 检查SQL是否适合在ClickHouse执行return sql.toLowerCase().contains("sum(") || sql.toLowerCase().contains("avg(") ||sql.toLowerCase().contains("group by");}
}
数据查询服务
// 数据查询服务
@Service
@Slf4j
@Transactional
public class DataQueryServiceImpl implements DataQueryService {private final QueryExecutor mysqlExecutor;private final QueryExecutor clickHouseExecutor;private final QueryCacheManager cacheManager;private final QueryLogRepository queryLogRepository;/*** 执行数据查询*/@Overridepublic QueryResult executeQuery(ParsedQuery parsedQuery) {long startTime = System.currentTimeMillis();try {// 1. 检查缓存String cacheKey = generateCacheKey(parsedQuery);QueryResult cachedResult = cacheManager.get(cacheKey);if (cachedResult != null) {log.info("缓存命中: queryId={}", parsedQuery.getQueryId());return cachedResult;}// 2. 选择执行器QueryExecutor executor = selectExecutor(parsedQuery);// 3. 执行查询List<Map<String, Object>> data = executor.executeQuery(parsedQuery.getGeneratedSql());// 4. 处理结果QueryResult result = processQueryResult(data, parsedQuery);// 5. 缓存结果cacheManager.put(cacheKey, result, getCacheTTL(parsedQuery));// 6. 记录查询日志logQuery(parsedQuery, System.currentTimeMillis() - startTime, true, null);return result;} catch (Exception e) {log.error("查询执行失败: queryId={}, error={}", parsedQuery.getQueryId(), e.getMessage(), e);logQuery(parsedQuery, System.currentTimeMillis() - startTime, false, e.getMessage());throw new QueryExecutionException("数据查询执行失败: " + e.getMessage());}}/*** 选择查询执行器*/private QueryExecutor selectExecutor(ParsedQuery parsedQuery) {// 根据查询复杂度选择执行器if (isComplexQuery(parsedQuery)) {return clickHouseExecutor;}return mysqlExecutor;}private boolean isComplexQuery(ParsedQuery parsedQuery) {// 判断查询复杂度的逻辑return parsedQuery.getIntent() == QueryIntent.GROWTH_QUERY ||parsedQuery.getIntent() == QueryIntent.TREND_QUERY ||parsedQuery.getGeneratedSql().toLowerCase().contains("join");}/*** 处理查询结果*/private QueryResult processQueryResult(List<Map<String, Object>> data, ParsedQuery parsedQuery) {QueryResult result = new QueryResult();result.setQueryId(parsedQuery.getQueryId());result.setData(data);result.setTotalCount(data.size());result.setExecutionTime(System.currentTimeMillis());// 
计算统计指标calculateStatistics(result, data);// 生成数据摘要generateDataSummary(result, data, parsedQuery.getIntent());return result;}private void calculateStatistics(QueryResult result, List<Map<String, Object>> data) {if (data.isEmpty()) {return;}// 计算数值型字段的统计信息Map<String, Double> stats = new HashMap<>();List<String> numericFields = detectNumericFields(data.get(0));for (String field : numericFields) {List<Double> values = data.stream().map(row -> convertToDouble(row.get(field))).filter(Objects::nonNull).collect(Collectors.toList());if (!values.isEmpty()) {stats.put(field + "_sum", values.stream().mapToDouble(Double::doubleValue).sum());stats.put(field + "_avg", values.stream().mapToDouble(Double::doubleValue).average().orElse(0));stats.put(field + "_max", values.stream().mapToDouble(Double::doubleValue).max().orElse(0));stats.put(field + "_min", values.stream().mapToDouble(Double::doubleValue).min().orElse(0));}}result.setStatistics(stats);}private void generateDataSummary(QueryResult result, List<Map<String, Object>> data, QueryIntent intent) {DataSummary summary = new DataSummary();summary.setIntent(intent);summary.setRecordCount(data.size());if (!data.isEmpty()) {// 根据意图生成不同的摘要switch (intent) {case SALES_QUERY:generateSalesSummary(summary, data);break;case TOP_N_QUERY:generateTopNSummary(summary, data);break;case GROWTH_QUERY:generateGrowthSummary(summary, data);break;default:generateDefaultSummary(summary, data);}}result.setSummary(summary);}private void generateSalesSummary(DataSummary summary, List<Map<String, Object>> data) {double totalSales = data.stream().mapToDouble(row -> convertToDouble(row.get("total_sales"))).sum();summary.setMainMetric(totalSales);summary.setSummaryText(String.format("总销售额: ¥%.2f", totalSales));}private void logQuery(ParsedQuery parsedQuery, long responseTime, boolean success, String errorMessage) {QueryLogEntity logEntity = 
QueryLogEntity.builder().queryId(parsedQuery.getQueryId()).originalQuery(parsedQuery.getOriginalQuery()).parsedQuery(parsedQuery.toString()).generatedSql(parsedQuery.getGeneratedSql()).intent(parsedQuery.getIntent().name()).entities(JsonUtils.toJson(parsedQuery.getEntities())).userId(parsedQuery.getUserId()).userRole("OPERATION_DIRECTOR") // 从用户信息获取.responseTime(responseTime).success(success).errorMessage(errorMessage).queryResult(success ? "查询成功,返回" + parsedQuery.getEntities().size() + "条记录" : null).build();queryLogRepository.save(logEntity);}private String generateCacheKey(ParsedQuery parsedQuery) {return "query:" + parsedQuery.getQueryId() + ":" + DigestUtils.md5DigestAsHex(parsedQuery.getGeneratedSql().getBytes());}private long getCacheTTL(ParsedQuery parsedQuery) {// 根据查询类型设置不同的缓存时间switch (parsedQuery.getIntent()) {case SALES_QUERY:return 300; // 5分钟case GROWTH_QUERY:return 1800; // 30分钟default:return 600; // 10分钟}}
}// ClickHouse查询执行器
@Component
@Slf4j
public class ClickHouseExecutor implements QueryExecutor {private final JdbcTemplate clickHouseJdbcTemplate;@Overridepublic List<Map<String, Object>> executeQuery(String sql) {try {long startTime = System.currentTimeMillis();List<Map<String, Object>> result = clickHouseJdbcTemplate.queryForList(sql);long executionTime = System.currentTimeMillis() - startTime;log.info("ClickHouse查询执行完成: rows={}, time={}ms", result.size(), executionTime);return result;} catch (DataAccessException e) {log.error("ClickHouse查询执行失败: sql={}, error={}", sql, e.getMessage(), e);throw new QueryExecutionException("ClickHouse查询执行失败: " + e.getMessage());}}@Overridepublic boolean supports(String sql) {// 检查SQL是否适合在ClickHouse执行return sql.toLowerCase().contains("sum(") || sql.toLowerCase().contains("avg(") ||sql.toLowerCase().contains("group by");}
}
可视化生成服务
// 可视化服务
@Service
@Slf4j
public class VisualizationServiceImpl implements VisualizationService {private final ChartGenerator chartGenerator;private final InsightService insightService;private final ImageRenderer imageRenderer;/*** 生成可视化结果*/@Overridepublic VisualizationResult generateVisualization(QueryResult queryResult, ParsedQuery parsedQuery) {long startTime = System.currentTimeMillis();try {VisualizationResult result = new VisualizationResult();result.setQueryId(parsedQuery.getQueryId());// 1. 生成图表List<ChartDefinition> charts = generateCharts(queryResult, parsedQuery.getIntent());result.setCharts(charts);// 2. 生成图表图片List<String> chartImages = renderChartImages(charts);result.setChartImages(chartImages);// 3. 生成数据洞察List<DataInsight> insights = insightService.generateInsights(queryResult, parsedQuery);result.setInsights(insights);// 4. 生成推荐建议List<Recommendation> recommendations = generateRecommendations(insights, parsedQuery);result.setRecommendations(recommendations);result.setGenerationTime(System.currentTimeMillis() - startTime);log.info("可视化生成完成: queryId={}, charts={}, insights={}", parsedQuery.getQueryId(), charts.size(), insights.size());return result;} catch (Exception e) {log.error("可视化生成失败: queryId={}, error={}", parsedQuery.getQueryId(), e.getMessage(), e);throw new VisualizationException("可视化生成失败: " + e.getMessage());}}/*** 根据查询意图生成图表*/private List<ChartDefinition> generateCharts(QueryResult queryResult, QueryIntent intent) {List<ChartDefinition> charts = new ArrayList<>();switch (intent) {case SALES_QUERY:case TOP_N_QUERY:charts.add(createBarChart(queryResult, "销售排名"));charts.add(createPieChart(queryResult, "销售分布"));break;case GROWTH_QUERY:case TREND_QUERY:charts.add(createLineChart(queryResult, "增长趋势"));charts.add(createBarChart(queryResult, "同比对比"));break;case CATEGORY_DISTRIBUTION:charts.add(createPieChart(queryResult, "品类分布"));charts.add(createTreemapChart(queryResult, "品类层级"));break;default:charts.add(createTableChart(queryResult, "数据明细"));}return charts;}/*** 
创建柱状图*/private ChartDefinition createBarChart(QueryResult queryResult, String title) {ChartDefinition chart = new ChartDefinition();chart.setType(ChartType.BAR);chart.setTitle(title);chart.setData(queryResult.getData());// 配置图表选项ChartOptions options = new ChartOptions();options.setXAxisField(detectCategoryField(queryResult.getData()));options.setYAxisField(detectValueField(queryResult.getData()));options.setShowLegend(true);options.setStacked(false);chart.setOptions(options);return chart;}/*** 创建折线图*/private ChartDefinition createLineChart(QueryResult queryResult, String title) {ChartDefinition chart = new ChartDefinition();chart.setType(ChartType.LINE);chart.setTitle(title);chart.setData(queryResult.getData());ChartOptions options = new ChartOptions();options.setXAxisField(detectTimeField(queryResult.getData()));options.setYAxisField(detectValueField(queryResult.getData()));options.setShowLegend(true);options.setSmooth(true);chart.setOptions(options);return chart;}/*** 渲染图表图片*/private List<String> renderChartImages(List<ChartDefinition> charts) {return charts.stream().map(chart -> imageRenderer.renderToBase64(chart)).collect(Collectors.toList());}/*** 生成推荐建议*/private List<Recommendation> generateRecommendations(List<DataInsight> insights, ParsedQuery parsedQuery) {List<Recommendation> recommendations = new ArrayList<>();for (DataInsight insight : insights) {if (insight.getType() == InsightType.NEGATIVE_TREND) {recommendations.add(createImprovementRecommendation(insight));} else if (insight.getType() == InsightType.POSITIVE_OPPORTUNITY) {recommendations.add(createOpportunityRecommendation(insight));}}return recommendations;}private Recommendation createImprovementRecommendation(DataInsight insight) {return Recommendation.builder().type(RecommendationType.IMPROVEMENT).title("改进建议").description(insight.getDescription() + ",建议采取以下措施:").actions(Arrays.asList("优化产品陈列", "加强促销活动", "培训销售人员")).priority(RecommendationPriority.HIGH).build();}private String 
detectCategoryField(List<Map<String, Object>> data) {// 自动检测分类字段if (data.isEmpty()) return "region";Map<String, Object> firstRow = data.get(0);for (String key : firstRow.keySet()) {if (key.contains("name") || key.contains("region") || key.contains("product")) {return key;}}return firstRow.keySet().iterator().next();}private String detectValueField(List<Map<String, Object>> data) {// 自动检测数值字段if (data.isEmpty()) return "total_sales";Map<String, Object> firstRow = data.get(0);for (String key : firstRow.keySet()) {Object value = firstRow.get(key);if (value instanceof Number) {return key;}}return "total_sales";}
}
AI洞察服务
// AI洞察服务
@Service
@Slf4j
public class InsightServiceImpl implements InsightService {private final OpenAIClient openAIClient;private final LocalModelClient localModelClient;/*** 生成数据洞察*/@Overridepublic List<DataInsight> generateInsights(QueryResult queryResult, ParsedQuery parsedQuery) {if (queryResult.getData().isEmpty()) {return Collections.emptyList();}try {// 1. 使用规则引擎生成基础洞察List<DataInsight> ruleBasedInsights = generateRuleBasedInsights(queryResult, parsedQuery);// 2. 使用AI生成深度洞察List<DataInsight> aiInsights = generateAIInsights(queryResult, parsedQuery);// 3. 合并和去重List<DataInsight> allInsights = mergeInsights(ruleBasedInsights, aiInsights);// 4. 排序和过滤return filterAndSortInsights(allInsights);} catch (Exception e) {log.warn("AI洞察生成失败,使用规则引擎: {}", e.getMessage());return generateRuleBasedInsights(queryResult, parsedQuery);}}/*** 规则引擎生成洞察*/private List<DataInsight> generateRuleBasedInsights(QueryResult queryResult, ParsedQuery parsedQuery) {List<DataInsight> insights = new ArrayList<>();// 销售下降检测if (parsedQuery.getIntent() == QueryIntent.GROWTH_QUERY) {insights.addAll(detectSalesDecline(queryResult));}// 异常值检测insights.addAll(detectAnomalies(queryResult));// 趋势检测if (parsedQuery.getIntent() == QueryIntent.TREND_QUERY) {insights.addAll(detectTrends(queryResult));}return insights;}/*** 检测销售下降*/private List<DataInsight> detectSalesDecline(QueryResult queryResult) {List<DataInsight> insights = new ArrayList<>();for (Map<String, Object> row : queryResult.getData()) {Object growthRateObj = row.get("growth_rate");if (growthRateObj instanceof Number) {double growthRate = ((Number) growthRateObj).doubleValue();if (growthRate < -0.1) { // 下降超过10%DataInsight insight = DataInsight.builder().type(InsightType.NEGATIVE_TREND).title("销售下降预警").description(String.format("%s区域销售同比下降%.2f%%", row.get("region"), growthRate * 100)).confidence(0.85).impact(InsightImpact.HIGH).build();insights.add(insight);}}}return insights;}/*** AI生成深度洞察*/private List<DataInsight> generateAIInsights(QueryResult queryResult, 
ParsedQuery parsedQuery) {try {// 构建AI提示String prompt = buildInsightPrompt(queryResult, parsedQuery);// 调用AI服务String aiResponse = openAIClient.generateCompletion(prompt);// 解析AI响应return parseAIResponse(aiResponse);} catch (Exception e) {log.error("AI洞察生成失败: {}", e.getMessage(), e);return Collections.emptyList();}}private String buildInsightPrompt(QueryResult queryResult, ParsedQuery parsedQuery) {StringBuilder prompt = new StringBuilder();prompt.append("你是一个资深零售数据分析师。请基于以下数据生成3-5个关键洞察:\n\n");prompt.append("查询意图: ").append(parsedQuery.getIntent().getDescription()).append("\n");prompt.append("原始问题: ").append(parsedQuery.getOriginalQuery()).append("\n\n");prompt.append("数据摘要:\n");prompt.append(queryResult.getSummary().getSummaryText()).append("\n\n");prompt.append("详细数据 (前10行):\n");queryResult.getData().stream().limit(10).forEach(row -> {prompt.append(row.toString()).append("\n");});prompt.append("\n请以JSON格式返回洞察,包含以下字段:type, title, description, confidence, impact");return prompt.toString();}
}
测试用例
// 集成测试
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
class ChatBIServiceIntegrationTest {@Autowiredprivate ChatBIService chatBIService;@Autowiredprivate SalesDataRepository salesDataRepository;@Autowiredprivate TestDataBuilder testDataBuilder;@BeforeAllvoid setUp() {// 初始化测试数据testDataBuilder.buildSalesTestData();}@Test@DisplayName("自然语言查询 - 销售排名查询")void testNaturalLanguageQuery_SalesRanking() {// 准备测试数据ChatQueryRequest request = ChatQueryRequest.builder().query("显示上海区域三月份销售额前十的门店,并对比去年同期增长率").userId("test_user").userRole("OPERATION_DIRECTOR").build();UserPrincipal user = UserPrincipal.builder().username("test_user").authorities(Set.of(new SimpleGrantedAuthority("DATA_ACCESS"))).build();// 执行测试ChatResponse response = chatBIService.processNaturalLanguageQuery(request, user);// 验证结果assertNotNull(response);assertTrue(response.isSuccess());assertNotNull(response.getQueryId());assertNotNull(response.getData());assertFalse(response.getData().isEmpty());assertNotNull(response.getVisualization());assertFalse(response.getInsights().isEmpty());assertFalse(response.getRecommendations().isEmpty());// 验证查询意图识别assertEquals(QueryIntent.TOP_N_QUERY, response.getParsedQuery().getIntent());// 验证数据准确性assertTrue(response.getData().size() <= 10); // TOP 10}@Test@DisplayName("自然语言查询 - 增长趋势分析")void testNaturalLanguageQuery_GrowthTrend() {ChatQueryRequest request = ChatQueryRequest.builder().query("分析北京区域Q1销售额同比增长趋势").userId("test_user").build();UserPrincipal user = UserPrincipal.builder().username("test_user").authorities(Set.of(new SimpleGrantedAuthority("DATA_ACCESS"))).build();ChatResponse response = chatBIService.processNaturalLanguageQuery(request, user);assertNotNull(response);assertTrue(response.isSuccess());assertEquals(QueryIntent.GROWTH_QUERY, response.getParsedQuery().getIntent());// 验证可视化结果VisualizationResult visualization = response.getVisualization();assertNotNull(visualization);assertFalse(visualization.getCharts().isEmpty());// 应该包含趋势图表boolean hasTrendChart = visualization.getCharts().stream().anyMatch(chart -> 
chart.getType() == ChartType.LINE);assertTrue(hasTrendChart);}@Test@DisplayName("自然语言查询 - 权限验证失败")void testNaturalLanguageQuery_AccessDenied() {ChatQueryRequest request = ChatQueryRequest.builder().query("显示所有门店销售数据").userId("no_access_user").build();UserPrincipal user = UserPrincipal.builder().username("no_access_user").authorities(Set.of()) // 无数据访问权限.build();// 验证异常抛出assertThrows(AccessDeniedException.class, () -> {chatBIService.processNaturalLanguageQuery(request, user);});}@Test@DisplayName("自然语言查询 - 复杂查询性能测试")void testNaturalLanguageQuery_Performance() {ChatQueryRequest request = ChatQueryRequest.builder().query("显示全国各区域近三年每月销售额趋势,并按品类分析增长情况").userId("test_user").build();UserPrincipal user = UserPrincipal.builder().username("test_user").authorities(Set.of(new SimpleGrantedAuthority("DATA_ACCESS"))).build();long startTime = System.currentTimeMillis();ChatResponse response = chatBIService.processNaturalLanguageQuery(request, user);long executionTime = System.currentTimeMillis() - startTime;assertNotNull(response);assertTrue(response.isSuccess());// 性能断言:复杂查询应在10秒内完成assertTrue(executionTime < 10000, "复杂查询执行时间应小于10秒,实际: " + executionTime + "ms");log.info("复杂查询性能测试 - 执行时间: {}ms", executionTime);}
}// 性能压测
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
class ChatBIPerformanceTest {@Autowiredprivate ChatBIService chatBIService;private List<String> testQueries;@BeforeAllvoid setUp() {// 准备测试查询集testQueries = Arrays.asList("显示上海区域销售前十的门店","分析北京区域Q1销售增长趋势", "对比华东和华南区域销售表现","查看电子产品品类销售分布","分析促销活动对销售额的影响");}@Test@DisplayName("并发性能测试")void testConcurrentPerformance() throws InterruptedException {int threadCount = 20;int queriesPerThread = 10;ExecutorService executor = Executors.newFixedThreadPool(threadCount);CountDownLatch latch = new CountDownLatch(threadCount);AtomicInteger successCount = new AtomicInteger(0);AtomicInteger failureCount = new AtomicInteger(0);List<Long> responseTimes = Collections.synchronizedList(new ArrayList<>());UserPrincipal user = UserPrincipal.builder().username("load_test_user").authorities(Set.of(new SimpleGrantedAuthority("DATA_ACCESS"))).build();long startTime = System.currentTimeMillis();for (int i = 0; i < threadCount; i++) {executor.submit(() -> {try {for (int j = 0; j < queriesPerThread; j++) {String query = testQueries.get(j % testQueries.size());ChatQueryRequest request = ChatQueryRequest.builder().query(query).userId("load_test_user").build();long queryStart = System.currentTimeMillis();try {ChatResponse response = chatBIService.processNaturalLanguageQuery(request, user);if (response.isSuccess()) {successCount.incrementAndGet();responseTimes.add(System.currentTimeMillis() - queryStart);} else {failureCount.incrementAndGet();}} catch (Exception e) {failureCount.incrementAndGet();}}} finally {latch.countDown();}});}latch.await(2, TimeUnit.MINUTES);executor.shutdown();long totalTime = System.currentTimeMillis() - startTime;long totalQueries = threadCount * queriesPerThread;// 计算性能指标double successRate = (double) successCount.get() / totalQueries * 100;double avgResponseTime = responseTimes.stream().mapToLong(Long::longValue).average().orElse(0);double qps = (double) totalQueries / (totalTime / 1000.0);log.info("性能测试结果:");log.info("总查询数: {}", totalQueries);log.info("成功数: {}", 
successCount.get());log.info("失败数: {}", failureCount.get());log.info("成功率: {:.2f}%", successRate);log.info("平均响应时间: {:.2f}ms", avgResponseTime);log.info("QPS: {:.2f}", qps);log.info("总耗时: {}ms", totalTime);// 性能断言assertTrue(successRate >= 95, "成功率应大于95%,实际: " + successRate + "%");assertTrue(avgResponseTime < 3000, "平均响应时间应小于3秒,实际: " + avgResponseTime + "ms");assertTrue(qps >= 5, "QPS应大于5,实际: " + qps);}
}// 数据一致性测试
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class DataConsistencyTest {@Autowiredprivate SalesDataRepository salesDataRepository;@Autowiredprivate DataQueryService dataQueryService;@Test@DisplayName("数据一致性验证 - MySQL vs ClickHouse")void testDataConsistency() {// 在MySQL和ClickHouse中执行相同的聚合查询String sql = "SELECT region, SUM(sale_amount) as total_sales " +"FROM sales_data " +"WHERE sale_date BETWEEN '2024-01-01' AND '2024-03-31' " +"GROUP BY region " +"ORDER BY total_sales DESC";// MySQL查询结果List<Map<String, Object>> mysqlResult = executeMySQLQuery(sql);// ClickHouse查询结果 List<Map<String, Object>> clickhouseResult = executeClickHouseQuery(sql);// 验证结果一致性assertEquals(mysqlResult.size(), clickhouseResult.size(), "结果集大小不一致");for (int i = 0; i < mysqlResult.size(); i++) {Map<String, Object> mysqlRow = mysqlResult.get(i);Map<String, Object> chRow = clickhouseResult.get(i);assertEquals(mysqlRow.get("region"), chRow.get("region"), "区域不一致");BigDecimal mysqlSales = (BigDecimal) mysqlRow.get("total_sales");BigDecimal chSales = (BigDecimal) chRow.get("total_sales");// 允许0.1%的误差BigDecimal difference = mysqlSales.subtract(chSales).abs();BigDecimal tolerance = mysqlSales.multiply(new BigDecimal("0.001"));assertTrue(difference.compareTo(tolerance) <= 0, String.format("销售额不一致: MySQL=%s, ClickHouse=%s", mysqlSales, chSales));}}
}
