[智能客服project] 架构 | 对话记忆 | 通信层
闲鱼智能客服系统是一款为闲鱼平台打造的智能化AI客服解决方案。
- 系统能够自动对接闲鱼实时消息流,管理上下文对话记录,
- 通过配备不同"专家"代理的AI模型判断用户意图,并生成智能回复,
- 包含智能议价等核心功能。项目完整实现了与平台通信的技术细节处理。
- 项目链接:https://github.com/shaxiu/XianyuAutoAgent
架构示意图
章节导航
- 对话记忆库
- 闲鱼通信层
- AI代理系统
- 意图路由
- AI提示词配置
- 主协调器
- 闲鱼协议工具集
第一章:对话记忆系统
欢迎来到闲鱼智能客服系统教程的第一章!我们将从项目的核心组件之一开始:聊天机器人如何记忆对话内容。
为何需要对话记忆?
想象与一个即时遗忘所有对话的人交谈是何等沮丧qwq
。对于二手交易平台的聊天机器人而言,记忆对话历史至关重要:
典型场景示例:
- 用户询问"价格可议吗?" —— 机器人需
记住此问题
才能应答 - 用户追问"最低多少?" —— 需
结合前序对话
理解议价意图 - 多次议价时 —— 需
记录报价次数
以合理响应(如"您已出价两次,这是最低价")
对话记忆系统正是为此设计,负责追踪聊天上下文及关键对话状态。
对话记忆的本质
对话记忆系统如同为每个聊天线程配备专属记事本:
- 记录买卖双方消息内容
- 标记重要对话状态(如议价次数)
- 持久化存储历史记录
项目中的核心实现位于context_manager.py
文件的ChatContextManager
类。
核心功能解析
- 消息存储:保存用户与机器人的每条消息(发送者、内容、时间戳)
- 会话关联:通过
chat_id
唯一标识每个商品对话线程- 状态管理:记录对话特有状态(如议价计数器)
- 历史检索:生成回复前获取最近对话上下文
- 持久存储:采用
SQLite数据库
实现数据持久化
消息存取机制
当新消息到达时调用add_message_by_chat
方法:
# 摘自context_manager.py的简化代码片段
import sqlite3
from datetime import datetimeclass ChatContextManager:def add_message_by_chat(self, chat_id, user_id, item_id, role, content):conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 将消息插入数据库表cursor.execute("INSERT INTO messages (user_id, item_id, role, content, timestamp, chat_id) VALUES (?, ?, ?, ?, ?, ?)",(user_id, item_id, role, content, datetime.now().isoformat(), chat_id))conn.commit()
生成回复前调用get_context_by_chat
获取历史:
def get_context_by_chat(self, chat_id):conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 查询指定chat_id的最近消息cursor.execute("""SELECT role, content FROM messagesWHERE chat_id = ?ORDER BY timestamp ASCLIMIT ?""",(chat_id, self.max_history))# 附加议价次数系统消息messages.append({"role": "system", "content": f"议价次数: {bargain_count}"})
议价计数器实现
通过独立数据表管理议价状态:
def increment_bargain_count_by_chat(self, chat_id):# UPSERT操作:存在则累加,不存在则初始化cursor.execute("""INSERT INTO chat_bargain_counts (chat_id, count, last_updated)VALUES (?, 1, ?)ON CONFLICT(chat_id)DO UPDATE SET count = count + 1, last_updated = ?""",(chat_id, datetime.now(), datetime.now()))def get_bargain_count_by_chat(self, chat_id):# 查询当前议价次数cursor.execute("SELECT count FROM chat_bargain_counts WHERE chat_id = ?", (chat_id,))return result[0] if result else 0
数据库结构设计
SQLite数据库包含核心表:
表名 | 字段说明 |
---|---|
messages | 消息ID/用户ID/商品ID/角色/内容/时间戳/会话ID |
chat_bargain_counts | 会话ID/议价次数/最后更新时间 |
系统交互流程
本章小结
对话记忆系统通过:
- SQLite数据库实现持久化存储
- 会话ID关联多轮对话
- 专用状态表记录议价次数
- 上下文感知的响应生成
为智能客服提供连续对话能力。下一章将探讨系统如何与闲鱼平台进行通信对接。
下一章:闲鱼通信层
第二章:闲鱼通信层
欢迎回来!在第一章:对话记忆中,我们学习了机器人如何记住用户对话,就像为每个聊天提供智能笔记本。
-
但消息最初是如何进入机器人记忆的?机器人又是如何通过
闲鱼平台
将回复发送给用户的? -
这就是**闲鱼通信层**的作用所在。
为什么机器人需要"通信层"?
想象闲鱼平台(网站或APP)就像热闹的集市。我们的机器人需要具备以下能力:
- 监听:感知用户发送给它的专属消息
- 对话:向用户发送回复消息
- 信息查询:获取商品详情(如价格、描述)
- 保持连接:确保持续登录平台以维持通信
就像你需要网络连接和浏览器与网站交互,机器人需要专门工具与闲鱼平台对接
。闲鱼通信层就是这套工具集,负责处理连接闲鱼服务器和数据交换的复杂技术细节。
- 这是项目中能说"闲鱼语言"的模块,管理着机器人到平台的"网络连接"。
闲鱼通信层是什么?
该层负责与闲鱼平台的两种主要通信方式:
- 实时消息(WebSocket):类似持续开通的专线电话。当新消息到达时,闲鱼可以立即呼叫机器人,机器人也通过该专线即时回复。这通过WebSocket技术实现。
- 数据获取(HTTP接口):当需要特定信息(如商品详情或当前价格)时,类似于访问闲鱼网站的特定页面。这通过标准互联网请求HTTP接口实现。
除了收发消息和数据,该层还处理保持登录状态的关键技术细节,如管理网站Cookie和获取安全所需的临时密钥"token"。
在XianyuAutoAgent
项目中,该层主要实现在main.py
(管理WebSocket连接的XianyuLive
类)和XianyuApis.py
(处理HTTP请求的XianyuApis
类)。
工作原理(高层流程图)
当用户发送消息时,通信层的工作流程如下:
如图所示,通信层是闲鱼平台与机器人系统其他部分的关键中介,是唯一直接与闲鱼服务器交互的模块。
核心组件:XianyuLive
和 XianyuApis
通信层的工作由两个主要类实现:
组件名称 | 文件 | 主要职责 | 类比 |
---|---|---|---|
XianyuLive | main.py | 管理WebSocket连接(实时消息) | 与闲鱼保持的专线电话 |
XianyuApis | XianyuApis.py | 处理HTTP请求(商品信息、令牌、登录验证) | 访问闲鱼网站的特定页面 |
XianyuLive
:消息监听
XianyuLive
类负责建立和维护WebSocket连接,核心是持续监听闲鱼的入站数据。
# 摘自 main.py 的简化代码片段
import asyncio
import websockets # WebSocket连接库class XianyuLive:def __init__(self, cookies_str):# ... 初始化(设置cookie等)...self.base_url = 'wss://wss-goofish.dingtalk.com/' # WebSocket地址async def main(self):while True: # 失败时自动重连try:# 建立WebSocket连接async with websockets.connect(self.base_url, extra_headers=self.get_websocket_headers()) as websocket:self.ws = websocket# ... 连接初始化(发送认证)...logger.info('WebSocket已连接并注册')# 开始监听消息!async for message in websocket:# 消息到达时处理await self.handle_message(message, websocket)except Exception as e:logger.error(f"连接错误,正在重试: {e}")await asyncio.sleep(5) # 重连等待# ... handle_message、send_msg等方法...def get_websocket_headers(self):# 简化的WebSocket请求头构造return {"Cookie": self.cookies_str,"User-Agent": "...", # 浏览器UA"Origin": "https://www.goofish.com",# ... 其他必要请求头...}# 实际运行脚本:asyncio.run(xianyuLive.main())
该代码连接闲鱼实时通信的WebSocket URL。
关键行async for message in websocket:
使机器人暂停等待消息到达,随后调用handle_message
处理。
对于这部分感兴趣的,可以看一下我去年写的前文:
[Linux#58][HTTP] 自己构建服务器 | 实现网页分离 | 设计思路
XianyuLive
:发送回复
XianyuLive
也处理消息发送:
# 摘自 main.py 的简化代码片段
import json
import base64class XianyuLive:# ... 初始化和其他方法...async def send_msg(self, ws, cid, toid, text):# 格式化消息为闲鱼专用结构# ... 复杂的数据格式化(使用base64和JSON结构)...msg = {"lwp": "/r/MessageSend/sendByReceiverScope","headers": {"mid": generate_mid() # 消息唯一ID},"body": [{"uuid": generate_uuid(), # 消息UUID"cid": f"{cid}@goofish", # 会话ID"conversationType": 1,"content": {"custom": {"type": 1,"data": base64_encoded_message_text # base64编码的消息}},# ... 其他消息细节...},{"actualReceivers": [f"{toid}@goofish", # 接收者IDf"{self.myid}@goofish" # 自身ID]}]}logger.info(f"通过WebSocket发送消息: {text}")await ws.send(json.dumps(msg)) # 发送格式化消息
send_msg
方法接收聊天ID、接收者ID和消息文本,打包成闲鱼WebSocket要求的JSON结构(使用base64编码),通过await ws.send()
发送。
XianyuApis
:获取商品详情
XianyuApis
类负责通过HTTP请求获取非实时数据,典型场景是商品详情查询:
# 摘自 XianyuApis.py 的简化代码片段
import requests
import timeclass XianyuApis:def __init__(self):# ... 会话设置(包含请求头和cookies)...self.session = requests.Session()# 加载初始cookies到会话def get_item_info(self, item_id, retry_count=0):logger.debug(f"尝试获取商品{item_id}的信息")# ... 重试逻辑处理...# 构造API请求参数params = {'api': 'mtop.taobao.idle.pc.detail', # 指定查询商品详情的API't': str(int(time.time()) * 1000), # 时间戳'sign': '', # 签名占位}data_val = '{"itemId":"' + item_id + '"}'data = {'data': data_val}# 从cookies获取签名所需的tokentoken = self.session.cookies.get('_m_h5_tk', '').split('_')[0]# 生成签名(使用utils.xianyu_utils中的generate_sign)sign = generate_sign(params['t'], token, data_val)params['sign'] = signtry:# 发送HTTP POST请求response = self.session.post('https://h5api.m.goofish.com/h5/mtop.taobao.idle.pc.detail/1.0/',params=params,data=data)res_json = response.json()# ... 响应结果校验...if isinstance(res_json, dict) and any('SUCCESS::调用成功' in ret for ret in res_json.get('ret', [])):logger.debug(f"成功获取商品{item_id}信息")return res_jsonelse:# ... 令牌刷新和重试逻辑...return self.get_item_info(item_id, retry_count + 1)except Exception as e:logger.error(f"获取商品信息错误: {str(e)}")# ... 重试逻辑...return self.get_item_info(item_id, retry_count + 1)
-
该方法接收商品ID,构造闲鱼商品详情API的专用请求,包含时间戳、签名等安全参数
-
使用
requests.Session
发送HTTP POST请求。 -
成功时返回解析后的商品数据。
登录状态管理(Cookies和令牌)
保持登录状态至关重要。
-
XianyuApis
类通过hasLogin
(验证cookies有效性)和get_token
(获取API和WebSocket所需的临时令牌)等方法管理登录状态 -
同时包含
clear_duplicate_cookies
、update_env_cookies
等cookie管理方法。 -
XianyuLive
类在WebSocket连接前会使用get_token
获取必要令牌。 -
机器人需要有效cookies(通常存储在
.env
文件)作为初始登录凭证,用于获取短时效的_m_h5_tk
(API签名用)和accessToken
(WebSocket注册用)。 -
通信层设计为在API调用失败时自动刷新令牌,只要初始cookies有效即可保持连接。
系统连接方式
当XianyuLive
通过WebSocket收到需处理的消息时:
- 提取关键信息:
chat_id
、发送者user_id
、item_id
和消息content
- 通过
XianyuApis
(可能借助context_manager
)获取未入库的item_info
- 使用
ChatContextManager
将消息存入对话记忆 - 检查特殊命令(如切换手动模式)
- 若非命令且未处于手动模式,从对话记忆收集对话
context
- 将
send_message
、item_description
和context
传递给AI代理系统 - AI代理系统生成
bot_reply
后存入对话记忆 - 最后通过
send_msg
方法经WebSocket发送回复
结论
闲鱼通信层(主要由XianyuLive
和XianyuApis
类构成)是机器人与闲鱼平台的接口,主要功能包括:
- 维护实时连接(WebSocket)接收即时消息
- 发起请求(HTTP接口)获取必要数据
- 通过cookies和令牌管理登录状态
- 传递消息和相关数据到机器人系统其他部分
- 发送机器人回复到用户端
该层如同机器人在闲鱼市场的"感官系统"。
现在机器人已具备接收消息和获取信息的能力,接下来的问题是:它如何决定回复内容?这就是AI代理系统的职责,我们将在下一章探讨。
下一章:AI代理系统