使用FastAPI微服务在AWS EKS上实现AI会话历史的管理
架构概述
本文介绍如何使用FastAPI构建微服务架构,在AWS EKS上部署两个微服务:
- 服务A:接收用户提示
- 服务B:处理对话逻辑,与Redis缓存和MongoDB数据库交互
该架构利用AWS ElastiCache(Redis)实现快速响应,并通过MongoDB RDS持久化存储会话数据。
该架构提供了:
- 通过Kubernetes实现的可扩展性
- 通过Redis缓存实现的快速响应
- 通过MongoDB实现的持久化存储
- 通过微服务实现的模块化设计
这种架构非常适合需要处理大量会话数据并保证快速响应的对话式AI应用场景。
组件说明
AWS EKS (Elastic Kubernetes Service)
作为容器编排平台,托管所有微服务。
微服务A (Prompt Receiver)
- 基于FastAPI构建
- 通过REST API接收用户提示
- 将请求转发给微服务B
微服务B (Conversational Logic)
- 基于FastAPI构建
- 访问ElastiCache(Redis)缓存最近对话
- 使用MongoDB RDS持久化存储会话数据
AWS ElastiCache (Redis)
- 提供内存数据库服务
- 加速实时交互响应
- 缓存最近对话内容
MongoDB on RDS
- 关系型数据库服务
- 持久化存储完整聊天记录
- 支持元数据存储和长期检索
Kubernetes部署配置
Docker镜像构建
两个服务共享相同的Docker基础镜像:
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
依赖文件requirements.txt:
fastapi uvicorn httpx redis pymongo
微服务A部署配置(service-a-deployment.yaml)
apiVersion: apps/v1
kind: Deployment
metadata:name: service-a-deployment
spec:replicas: 2selector:matchLabels:app: service-atemplate:metadata:labels:app: service-aspec:containers:- name: service-aimage: your-docker-imageports:- containerPort: 8000env:- name: SERVICE_B_URLvalue: "http://service-b-service:8000/process"- name: REDIS_HOSTvalue: "redis-service"- name: REDIS_PORTvalue: "6379"- name: MONGODB_URIvalue: "mongodb://mongodb-service:27017"
---
apiVersion: v1
kind: Service
metadata:name: service-a-service
spec:ports:- port: 8000selector:app: service-a
微服务B部署配置(service-b-deployment.yaml)
apiVersion: apps/v1
kind: Deployment
metadata:name: service-b-deployment
spec:replicas: 2selector:matchLabels:app: service-btemplate:metadata:labels:app: service-bspec:containers:- name: service-bimage: your-docker-imageports:- containerPort: 8000env:- name: REDIS_HOSTvalue: "redis-service"- name: REDIS_PORTvalue: "6379"- name: MONGODB_URIvalue: "mongodb://mongodb-service:27017"
---
apiVersion: v1
kind: Service
metadata:name: service-b-service
spec:ports:- port: 8000selector:app: service-b
Redis部署配置(redis-deployment.yaml)
apiVersion: apps/v1
kind: Deployment
metadata:name: redis-deployment
spec:replicas: 1selector:matchLabels:app: redistemplate:metadata:labels:app: redisspec:containers:- name: redisimage: redis:6.2.5-alpineports:- containerPort: 6379env:- name: REDIS_PASSWORDvalueFrom:secretKeyRef:name: redis-secretkey: REDIS_PASSWORD
---
apiVersion: v1
kind: Service
metadata:name: redis-service
spec:ports:- port: 6379selector:app: redis
MongoDB部署配置(mongodb-deployment.yaml)
apiVersion: apps/v1
kind: Deployment
metadata:name: mongodb-deployment
spec:replicas: 1selector:matchLabels:app: mongodbtemplate:metadata:labels:app: mongodbspec:containers:- name: mongodbimage: mongo:5.0ports:- containerPort: 27017env:- name: MONGO_INITDB_ROOT_USERNAMEvalueFrom:secretKeyRef:name: mongodb-secretkey: MONGO_INITDB_ROOT_USERNAME- name: MONGO_INITDB_ROOT_PASSWORDvalueFrom:secretKeyRef:name: mongodb-secretkey: MONGO_INITDB_ROOT_PASSWORD
---
apiVersion: v1
kind: Service
metadata:name: mongodb-service
spec:ports:- port: 27017selector:app: mongodb
微服务实现细节
微服务A实现(service_a/main.py)
from fastapi import FastAPI, Request
import httpxapp = FastAPI()
SERVICE_B_URL = "http://service-b.default.svc.cluster.local/process"@app.post("/prompt")
async def receive_prompt(request: Request):data = await request.json()async with httpx.AsyncClient() as client:response = await client.post(SERVICE_B_URL, json=data)return response.json()
微服务B实现(service_b/main.py)
from fastapi import FastAPI, Request
from redis import Redis
from pymongo import MongoClient
import osapp = FastAPI()# 初始化Redis连接
redis_client = Redis(host=os.getenv("REDIS_HOST"),port=int(os.getenv("REDIS_PORT")),decode_responses=True
)# 初始化MongoDB连接
mongo_client = MongoClient(os.getenv("MONGODB_URI"))
db = mongo_client["chatbot"]
conversations = db["conversations"]@app.post("/process")
async def process_prompt(request: Request):data = await request.json()session_id = data["session_id"]prompt = data["prompt"]# 检查Redis缓存cached_response = redis_client.get(f"{session_id}:{prompt}")if cached_response:return {"response": cached_response, "cached": True}# 模拟AI处理(占位符)ai_response = f"Processed: {prompt}"# 缓存响应redis_client.set(f"{session_id}:{prompt}", ai_response, ex=3600)# 存储到MongoDBconversations.update_one({"session_id": session_id},{"$push": {"messages": {"prompt": prompt, "response": ai_response}}},upsert=True)return {"response": ai_response, "cached": False}
安全注意事项
-
环境变量管理
- 敏感信息如数据库凭证应存储在Kubernetes Secrets中
-
IAM角色配置
- 使用IAM角色服务账户(IRSA)为EKS Pod提供安全访问AWS服务的权限
-
网络配置
- 确保正确的VPC、子网和安全组规则
- 允许EKS与ElastiCache和RDS实例之间的通信
部署选项
-
Redis部署
- 推荐使用Bitnami Helm Chart快速部署Redis集群
helm repo add bitnami https://charts.bitnami.com/bitnami helm repo update helm install redis bitnami/redis
-
MongoDB部署
- AWS不提供原生MongoDB服务,可选择:
- Amazon DocumentDB(兼容MongoDB)
- 在EC2上手动部署MongoDB
- AWS不提供原生MongoDB服务,可选择: