基于Docker、Solr和FastAPI的商品搜索微服务架构设计
在Linux上的docker里安装配置Solr,然后全量导入和增量更新Oracle数据库里商品信息数据到Solr,然后用Python和FastAPI开发一个微服务用来根据商品属性、产地、品名和描述信息等搜索符合条件的商品列表,并按匹配度从高到底排序输出,结果为JSON格式,部署在AWS EKS上;再根据配置和统计数据优化Solr数据搜索的性能,Python程序可以使用Tenacity库重试Solr的HTTP Request连接,做成带参数的装饰器@retry方便复用,使用cachetools库缓存结果,也做成带参数的装饰器@cache方便复用,对于核心请求方法,@retry先包装原始的请求逻辑,失败时按配置重试,最外层@cache再把带有重试能力的函数包装成可缓存的版本,即第一次成功后,后续相同参数直接命中缓存,不在真正发请求,直接取缓存,也不会触发重试逻辑;@cache装饰器动态判断同步/异步函数,分别将asyncio.Lock用于异步,threading.Lock用于同步,使用传入的cache,使用自身的对象id+参数纳入key,以确保不同实例不会串缓存;异步封装的逻辑包含先尝试读取缓存,命中直接返回,miss就释放锁执行真实函数,然后再次加锁写入缓存,通过捕获KeyError区分是否命中缓存,使用全局计数器用于统计命中率(达到上限会重置),结合TTL机制过期缓存删除,超过缓存容量时淘汰最近最少使用的记录;数据请求使用统一的POST请求发送,支持两种翻页机制,偏移分页start+rows和游标分页cursorMark,避免深翻页性能问题,字段协调、子区块内容对齐等对记录进行结构修正,出现异常输出空结果。
这个完整的解决方案提供了:
- 容器化的Solr部署,支持数据持久化
- 灵活的数据导入,支持全量和增量更新
- 高性能的搜索微服务,基于FastAPI开发
- 智能的重试和缓存机制,提高系统可靠性
- AWS EKS云原生部署,支持弹性伸缩
- 全面的性能优化,包括Solr配置和应用层优化
- 丰富的监控统计,便于系统维护和优化
系统能够处理大规模商品数据的实时搜索需求,并提供良好的用户体验和系统稳定性。
1. Linux Docker环境下的Solr安装与配置
1.1 Solr容器部署
# 拉取Solr官方镜像
docker pull solr:8.11# 创建Solr数据卷
docker volume create solr_data# 启动Solr容器
docker run -d \--name solr \-p 8983:8983 \-v solr_data:/var/solr \-e SOLR_HEAP=2g \solr:8.11# 创建商品核心
docker exec -it solr solr create_core -c products
1.2 Solr Schema配置
创建schema.xml配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<schema name="products" version="1.6"><fields><!-- 主键 --><field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false"/><!-- 商品基础信息 --><field name="product_name" type="text_general" indexed="true" stored="true"/><field name="product_code" type="string" indexed="true" stored="true"/><field name="category" type="string" indexed="true" stored="true" multiValued="true"/><!-- 商品属性 --><field name="attributes" type="text_general" indexed="true" stored="true" multiValued="true"/><field name="brand" type="string" indexed="true" stored="true"/><field name="origin" type="string" indexed="true" stored="true"/><!-- 商品描述 --><field name="description" type="text_general" indexed="true" stored="true"/><!-- 价格和库存 --><field name="price" type="pdouble" indexed="true" stored="true"/><field name="stock" type="pint" indexed="true" stored="true"/><!-- 时间字段 --><field name="create_time" type="pdate" indexed="true" stored="true"/><field name="update_time" type="pdate" indexed="true" stored="true"/><!-- 文本搜索字段 --><field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/><!-- 复制字段,用于全局搜索 --><field name="_text_" type="text_general" indexed="true" stored="false" multiValued="true"/></fields><copyField source="product_name" dest="_text_"/><copyField source="attributes" dest="_text_"/><copyField source="description" dest="_text_"/><copyField source="brand" dest="_text_"/><copyField source="origin" dest="_text_"/><uniqueKey>id</uniqueKey><types><fieldType name="string" class="solr.StrField" sortMissingLast="true"/><fieldType name="pint" class="solr.IntPointField"/><fieldType name="pdouble" class="solr.DoublePointField"/><fieldType name="pdate" class="solr.DatePointField"/><fieldType name="text_general" class="solr.TextField" positionIncrementGap="100"><analyzer type="index"><tokenizer class="solr.StandardTokenizerFactory"/><filter class="solr.StopFilterFactory" ignoreCase="true" words="stopwords.txt"/><filter class="solr.LowerCaseFilterFactory"/><filter class="solr.SynonymGraphFilterFactory" synonyms="synonyms.txt" ignoreCase="true" expand="true"/></analyzer><analyzer type="query"><tokenizer class="solr.StandardTokenizerFactory"/><filter class="solr.StopFilterFactory" ignoreCase="true" words="stopwords.txt"/><filter class="solr.LowerCaseFilterFactory"/><filter class="solr.SynonymGraphFilterFactory" synonyms="synonyms.txt" ignoreCase="true" expand="true"/></analyzer></fieldType></types>
</schema>
2. Oracle数据库数据导入Solr
2.1 全量数据导入配置
创建data-config.xml:
<dataConfig><dataSource name="oracle-db" driver="oracle.jdbc.driver.OracleDriver"url="jdbc:oracle:thin:@//host:port/servicename"user="username" password="password"/><document name="products"><entity name="product" query="SELECT p.id, p.product_name, p.product_code,p.description,p.brand,p.origin,p.price,p.stock,p.create_time,p.update_time,(SELECT LISTAGG(a.attribute_value, ',') FROM product_attributes a WHERE a.product_id = p.id) as attributes,(SELECT LISTAGG(c.category_name, ',') FROM product_categories c WHERE c.product_id = p.id) as categoryFROM products p"deltaQuery="SELECT id FROM products WHERE update_time > '${dataimporter.last_index_time}'"deltaImportQuery="SELECT ... FROM products WHERE id='${dataimporter.delta.id}'"><field column="id" name="id"/><field column="product_name" name="product_name"/><field column="product_code" name="product_code"/><field column="description" name="description"/><field column="brand" name="brand"/><field column="origin" name="origin"/><field column="price" name="price"/><field column="stock" name="stock"/><field column="create_time" name="create_time"/><field column="update_time" name="update_time"/><field column="attributes" name="attributes"/><field column="category" name="category"/></entity></document>
</dataConfig>
2.2 增量更新配置
在solrconfig.xml中配置DataImportHandler:
<requestHandler name="/dataimport" class="solr.DataImportHandler"><lst name="defaults"><str name="config">data-config.xml</str></lst>
</requestHandler>
3. Python FastAPI微服务开发
3.1 项目结构
product-search-service/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── decorators.py
│ │ └── exceptions.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── schemas.py
│ │ └── responses.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── solr_client.py
│ │ └── search_service.py
│ └── api/
│ ├── __init__.py
│ └── endpoints.py
├── requirements.txt
├── Dockerfile
└── kubernetes/├── deployment.yaml└── service.yaml
3.2 核心装饰器实现
# app/core/decorators.py
import asyncio
import functools
import threading
import time
from typing import Any, Callable, Dict, Optional, Union
from tenacity import (retry as tenacity_retry,stop_after_attempt,wait_exponential,retry_if_exception_type
)
from cachetools import TTLCache, LRUCache
import cachetools
from functools import wraps
import inspect# 全局缓存统计
class CacheStats:def __init__(self):self.hits = 0self.misses = 0self.max_count = 10000 # 统计上限def hit(self):self.hits += 1self._check_reset()def miss(self):self.misses += 1self._check_reset()def _check_reset(self):if self.hits + self.misses >= self.max_count:self.hits = 0self.misses = 0@propertydef hit_rate(self):total = self.hits + self.missesreturn self.hits / total if total > 0 else 0global_cache_stats = CacheStats()def retry(max_attempts: int = 3,exponential_multiplier: float = 1,exponential_max: float = 10,retry_exceptions: tuple = (Exception,)
):"""重试装饰器,支持同步和异步函数"""def decorator(func):# 判断是否为异步函数is_async = inspect.iscoroutinefunction(func)if is_async:@wraps(func)async def async_wrapper(*args, **kwargs):@tenacity_retry(stop=stop_after_attempt(max_attempts),wait=wait_exponential(multiplier=exponential_multiplier, max=exponential_max),retry=retry_if_exception_type(retry_exceptions))async def _async_retry():return await func(*args, **kwargs)return await _async_retry()return async_wrapperelse:@wraps(func)def sync_wrapper(*args, **kwargs):@tenacity_retry(stop=stop_after_attempt(max_attempts),wait=wait_exponential(multiplier=exponential_multiplier, max=exponential_max),retry=retry_if_exception_type(retry_exceptions))def _sync_retry():return func(*args, **kwargs)return _sync_retry()return sync_wrapperreturn decoratordef cache(cache_instance: Union[TTLCache, LRUCache],ttl: Optional[int] = None,maxsize: int = 1000
):"""智能缓存装饰器,自动适配同步/异步函数支持TTL和LRU淘汰策略"""# 如果没有传入缓存实例,根据参数创建if cache_instance is None:if ttl:cache_instance = TTLCache(maxsize=maxsize, ttl=ttl)else:cache_instance = LRUCache(maxsize=maxsize)def decorator(func):is_async = inspect.iscoroutinefunction(func)lock = asyncio.Lock() if is_async else threading.Lock()@wraps(func)async def async_wrapper(self, *args, **kwargs):# 生成缓存键:对象ID + 参数instance_id = id(self) if hasattr(self, '__class__') else 'global'key_parts = [f"instance_{instance_id}"]# 处理位置参数for i, arg in enumerate(args):if i == 0 and hasattr(arg, '__class__'):continue # 跳过self参数key_parts.append(f"arg_{i}_{str(arg)}")# 处理关键字参数for k, v in sorted(kwargs.items()):key_parts.append(f"kw_{k}_{str(v)}")cache_key = "|".join(key_parts)async with lock:try:# 尝试从缓存读取result = cache_instance[cache_key]global_cache_stats.hit()return resultexcept KeyError:global_cache_stats.miss()# 缓存未命中,释放锁执行实际函数pass# 执行实际函数(不在锁内,避免阻塞)result = await func(self, *args, **kwargs)# 再次加锁写入缓存async with lock:cache_instance[cache_key] = resultreturn result@wraps(func)def sync_wrapper(self, *args, **kwargs):instance_id = id(self) if hasattr(self, '__class__') else 'global'key_parts = [f"instance_{instance_id}"]for i, arg in enumerate(args):if i == 0 and hasattr(arg, '__class__'):continuekey_parts.append(f"arg_{i}_{str(arg)}")for k, v in sorted(kwargs.items()):key_parts.append(f"kw_{k}_{str(v)}")cache_key = "|".join(key_parts)with lock:try:result = cache_instance[cache_key]global_cache_stats.hit()return resultexcept KeyError:global_cache_stats.miss()result = func(self, *args, **kwargs)cache_instance[cache_key] = resultreturn resultreturn async_wrapper if is_async else sync_wrapperreturn decorator# 组合装饰器:先重试后缓存
def retry_and_cache(cache_instance: Union[TTLCache, LRUCache] = None,max_attempts: int = 3,ttl: int = 300,maxsize: int = 1000
):"""组合装饰器:先应用重试逻辑,再应用缓存逻辑"""def decorator(func):# 先包装重试retry_decorated = retry(max_attempts=max_attempts,exponential_multiplier=1,exponential_max=10)(func)# 再包装缓存cache_decorated = cache(cache_instance=cache_instance,ttl=ttl,maxsize=maxsize)(retry_decorated)return cache_decoratedreturn decorator
3.3 Solr客户端实现
# app/services/solr_client.py
import json
import logging
from typing import Dict, List, Optional, Any
import httpx
from pydantic import BaseModel
from app.core.decorators import retry_and_cache
from cachetools import TTLCachelogger = logging.getLogger(__name__)class SearchRequest(BaseModel):query: strfilters: Dict[str, Any] = {}fields: List[str] = Nonestart: int = 0rows: int = 10cursor_mark: str = "*"sort: str = "score desc"class SearchResponse(BaseModel):products: List[Dict]total: intstart: introws: intcursor_mark: Optional[str] = Nonenext_cursor_mark: Optional[str] = Noneclass SolrClient:def __init__(self, solr_url: str, core: str = "products"):self.solr_url = f"{solr_url}/solr/{core}"self.search_cache = TTLCache(maxsize=1000, ttl=300) # 5分钟缓存@retry_and_cache(cache_instance=None, # 使用实例自己的缓存max_attempts=3,ttl=300,maxsize=1000)async def search_products(self, request: SearchRequest) -> SearchResponse:"""搜索商品,支持两种分页方式"""search_params = self._build_search_params(request)async with httpx.AsyncClient(timeout=30.0) as client:try:response = await client.post(f"{self.solr_url}/select",json=search_params,headers={"Content-Type": "application/json"})response.raise_for_status()return self._parse_search_response(response.json(), request)except Exception as e:logger.error(f"Solr search error: {str(e)}")# 出现异常时返回空结果return SearchResponse(products=[],total=0,start=request.start,rows=request.rows)def _build_search_params(self, request: SearchRequest) -> Dict:"""构建Solr搜索参数"""params = {"query": request.query,"filter": self._build_filters(request.filters),"offset": request.start,"limit": request.rows,"sort": request.sort,"fields": request.fields or ["id", "product_name", "product_code", "brand", "origin", "price", "stock", "category", "score"]}# 游标分页if request.cursor_mark and request.cursor_mark != "*":params["cursorMark"] = request.cursor_markreturn {"params": {"q": self._build_query(request.query),"fq": self._build_filter_queries(request.filters),"fl": ",".join(params["fields"]),"start": params["offset"],"rows": params["limit"],"sort": params["sort"],"cursorMark": params.get("cursorMark", "*")}}def _build_query(self, query: str) -> str:"""构建查询字符串"""if not query or query.strip() == "*":return "*:*"# 多字段搜索search_fields = ["product_name","attributes", "description","brand","origin","_text_"]query_parts = []for field in search_fields:query_parts.append(f"{field}:({query})")return " OR ".join(query_parts)def _build_filter_queries(self, filters: Dict) -> List[str]:"""构建过滤查询"""filter_queries = []for field, value in filters.items():if isinstance(value, list):# 多值过滤filter_values = [f'"{v}"' for v in value]filter_queries.append(f"{field}:({' OR '.join(filter_values)})")else:# 单值过滤filter_queries.append(f'{field}:"{value}"')return filter_queriesdef _parse_search_response(self, solr_response: Dict, request: SearchRequest) -> SearchResponse:"""解析Solr响应"""response_data = solr_response.get("response", {})products = response_data.get("docs", [])# 字段协调和结构修正processed_products = []for product in products:processed_product = self._normalize_product_fields(product)processed_products.append(processed_product)# 处理分页信息next_cursor_mark = solr_response.get("nextCursorMark")return SearchResponse(products=processed_products,total=response_data.get("numFound", 0),start=request.start,rows=request.rows,cursor_mark=request.cursor_mark,next_cursor_mark=next_cursor_mark)def _normalize_product_fields(self, product: Dict) -> Dict:"""标准化商品字段"""normalized = product.copy()# 处理多值字段multi_value_fields = ["attributes", "category"]for field in multi_value_fields:if field in normalized and isinstance(normalized[field], str):normalized[field] = [item.strip() for item in normalized[field].split(",")]# 确保必要字段存在if "score" not in normalized:normalized["score"] = 1.0return normalizedasync def get_facet_counts(self, field: str, query: str = "*:*") -> Dict:"""获取字段分面统计"""params = {"q": query,"facet": "on","facet.field": field,"rows": 0}async with httpx.AsyncClient() as client:response = await client.get(f"{self.solr_url}/select",params=params)data = response.json()facets = data.get("facet_counts", {}).get("facet_fields", {}).get(field, [])facet_result = {}# 将分面结果转换为字典for i in range(0, len(facets), 2):if i + 1 < len(facets):facet_result[facets[i]] = facets[i + 1]return facet_result
3.4 FastAPI主应用
# app/main.py
from fastapi import FastAPI, HTTPException, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, List, Optional, Any
import uvicornfrom app.services.solr_client import SolrClient, SearchRequest, SearchResponse
from app.models.schemas import ProductSearch, SearchResult
from app.core.config import settingsapp = FastAPI(title="商品搜索微服务",description="基于Solr的商品搜索API",version="1.0.0"
)# CORS中间件
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 依赖注入
def get_solr_client() -> SolrClient:return SolrClient(settings.SOLR_URL, settings.SOLR_CORE)@app.get("/")
async def root():return {"message": "商品搜索微服务", "status": "running"}@app.get("/health")
async def health_check():return {"status": "healthy"}@app.get("/search", response_model=SearchResult)
async def search_products(q: str = Query(..., description="搜索关键词"),brand: Optional[str] = Query(None, description="品牌过滤"),origin: Optional[str] = Query(None, description="产地过滤"),category: Optional[List[str]] = Query(None, description="分类过滤"),price_min: Optional[float] = Query(None, description="最低价格"),price_max: Optional[float] = Query(None, description="最高价格"),page: int = Query(1, ge=1, description="页码"),page_size: int = Query(10, ge=1, le=100, description="每页大小"),cursor: Optional[str] = Query("*", description="游标标记"),use_cursor: bool = Query(False, description="是否使用游标分页"),solr_client: SolrClient = Depends(get_solr_client)
):"""商品搜索接口"""try:# 构建过滤条件filters = {}if brand:filters["brand"] = brandif origin:filters["origin"] = originif category:filters["category"] = categoryif price_min is not None or price_max is not None:price_filter = []if price_min is not None:price_filter.append(f"[{price_min} TO *]")if price_max is not None:price_filter.append(f"[* TO {price_max}]")filters["price"] = " AND ".join(price_filter)# 计算分页参数if use_cursor:start = 0cursor_mark = cursorelse:start = (page - 1) * page_sizecursor_mark = "*"search_request = SearchRequest(query=q,filters=filters,start=start,rows=page_size,cursor_mark=cursor_mark)response = await solr_client.search_products(search_request)return SearchResult(products=response.products,total=response.total,page=page if not use_cursor else None,page_size=page_size,total_pages=(response.total + page_size - 1) // page_size if not use_cursor else None,cursor_mark=response.cursor_mark,next_cursor_mark=response.next_cursor_mark)except Exception as e:raise HTTPException(status_code=500, detail=f"搜索失败: {str(e)}")@app.get("/facets/{field}")
async def get_facets(field: str,q: str = Query("*:*", description="查询条件"),solr_client: SolrClient = Depends(get_solr_client)
):"""获取字段分面统计"""try:facets = await solr_client.get_facet_counts(field, q)return {"field": field, "facets": facets}except Exception as e:raise HTTPException(status_code=500, detail=f"获取分面失败: {str(e)}")@app.get("/cache/stats")
async def get_cache_stats():"""获取缓存统计信息"""from app.core.decorators import global_cache_statsreturn {"hits": global_cache_stats.hits,"misses": global_cache_stats.misses,"hit_rate": global_cache_stats.hit_rate}if __name__ == "__main__":uvicorn.run("app.main:app",host="0.0.0.0",port=8000,reload=settings.DEBUG)
4. AWS EKS部署配置
4.1 Dockerfile
FROM python:3.9-slimWORKDIR /appCOPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txtCOPY . .EXPOSE 8000CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
4.2 Kubernetes部署文件
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: product-search-servicelabels:app: product-search
spec:replicas: 3selector:matchLabels:app: product-searchtemplate:metadata:labels:app: product-searchspec:containers:- name: search-serviceimage: your-registry/product-search:latestports:- containerPort: 8000env:- name: SOLR_URLvalue: "http://solr:8983"- name: SOLR_COREvalue: "products"- name: DEBUGvalue: "false"resources:requests:memory: "256Mi"cpu: "250m"limits:memory: "512Mi"cpu: "500m"livenessProbe:httpGet:path: /healthport: 8000initialDelaySeconds: 30periodSeconds: 10readinessProbe:httpGet:path: /healthport: 8000initialDelaySeconds: 5periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:name: product-search-service
spec:selector:app: product-searchports:- port: 80targetPort: 8000type: LoadBalancer
5. Solr性能优化配置
5.1 Solr配置优化
<!-- solrconfig.xml 优化配置 -->
<config><!-- 查询结果缓存 --><queryResultWindowSize>100</queryResultWindowSize><queryResultMaxDocsCached>1000</queryResultMaxDocsCached><!-- 过滤器缓存 --><filterCache class="solr.CaffeineCache"size="512"initialSize="512"autowarmCount="128"/><!-- 查询结果缓存 --><queryResultCache class="solr.CaffeineCache"size="512"initialSize="512"autowarmCount="128"/><!-- 文档值缓存 --><docValuesCache class="solr.CaffeineCache"size="512"initialSize="512"/><!-- 字段值缓存 --><fieldValueCache class="solr.CaffeineCache"size="512"initialSize="512"/><!-- 启用实时获取 --><realTimeGet><str name="enable">true</str></realTimeGet><!-- 查询配置 --><requestHandler name="/select" class="solr.SearchHandler"><lst name="defaults"><str name="echoParams">explicit</str><str name="wt">json</str><str name="indent">true</str><str name="df">_text_</str><str name="spellcheck">true</str><str name="spellcheck.dictionary">default</str><str name="spellcheck.onlyMorePopular">true</str><str name="spellcheck.extendedResults">true</str><str name="spellcheck.count">5</str></lst></requestHandler>
</config>
5.2 索引优化策略
# 定期优化索引
curl "http://localhost:8983/solr/products/update?optimize=true"# 合并段文件
curl "http://localhost:8983/solr/products/update?optimize=true&maxSegments=10"
6. 监控和统计
6.1 搜索性能监控
# 在Solr客户端中添加统计功能
import time
from datetime import datetimeclass SearchMetrics:def __init__(self):self.search_times = []self.error_count = 0self.success_count = 0def record_search(self, duration: float, success: bool):self.search_times.append(duration)if success:self.success_count += 1else:self.error_count += 1# 保持最近1000次记录if len(self.search_times) > 1000:self.search_times.pop(0)@propertydef avg_search_time(self):return sum(self.search_times) / len(self.search_times) if self.search_times else 0@propertydef success_rate(self):total = self.success_count + self.error_countreturn self.success_count / total if total > 0 else 1.0# 在SolrClient中使用
class InstrumentedSolrClient(SolrClient):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.metrics = SearchMetrics()async def search_products(self, request: SearchRequest) -> SearchResponse:start_time = time.time()try:response = await super().search_products(request)duration = time.time() - start_timeself.metrics.record_search(duration, True)return responseexcept Exception as e:duration = time.time() - start_timeself.metrics.record_search(duration, False)raise e
