企业微信智能机器人消息监听与回复完整指引

目标:30 分钟内完成「零到可收可发」的企业微信智能机器人,支持文本 / 图片 / 流式消息,并给出可直接落地的 Python 代码、部署流程与 Mermaid 图解。
1 整体交互概览
2 系统结构与数据流
3 账号&后台配置总览
4 机器人四件套(先抄下来)
| 字段 | 示例值 | 来源 |
|---|---|---|
Token | `QDfCNvKdKgD5Jp | 后台「API接收」随机生成 |
EncodingAESKey | YT3mL4VaaHnpv... 43位 | 同上 |
ReceiveId | wwdemo_123456 | 机器人资料页 |
CorpID | wwcorp_abcdef | 企业信息页(仅加解密用,代码留空) |
注:智能机器人 receiveid 固定为空字符串
"",代码已处理。
5 本地开发环境(venv 一键版)
# 克隆/新建项目目录
mkdir wxbot && cd wxbot
python3 -m venv venv
source venv/bin/activate # Win: .\venv\Scripts\activate
python -m pip install -U pip
pip install -r requirements.txt # 文件见第0章
requirements.txt(已锁定版本)
annotated-doc==0.0.3
annotated-types==0.7.0
anyio==4.11.0
certifi==2025.10.5
charset-normalizer==3.4.4
click==8.3.0
fastapi==0.121.0
h11==0.16.0
idna==3.11
pycryptodome==3.23.0
pydantic==2.12.3
pydantic_core==2.41.4
python-dotenv==1.2.1
requests==2.32.5
sniffio==1.3.1
starlette==0.49.3
typing-inspection==0.4.2
typing_extensions==4.15.0
urllib3==2.5.0
uvicorn==0.38.0
6 代码目录结构
wxbot/
├─ main.py # FastAPI 入口(上文完整代码)
├─ WXBizJsonMsgCrypt.py # 官方加解密库
├─ requirements.txt
├─ .env # 本地密钥
└─ venv/ # 虚拟环境
.env 示例
Token=QDfCNvK5Jp....
EncodingAESKey=YT3mL4VaaHnpv...
main.py代码
#!/usr/bin/env python
# coding=utf-8
# 文档:https://developer.work.weixin.qq.com/document/path/101039from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import Response
import uvicorn
import os
import logging
import json
import random
import string
import time
import base64
import hashlib
from WXBizJsonMsgCrypt import WXBizJsonMsgCrypt
from Crypto.Cipher import AES
from dotenv import load_dotenv
import requests# 加载 .env 文件
load_dotenv()app = FastAPI()# 常量定义
CACHE_DIR = "/tmp/llm_demo_cache"
MAX_STEPS = 10# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)def _generate_random_string(length):letters = string.ascii_letters + string.digitsreturn ''.join(random.choice(letters) for _ in range(length))def _process_encrypted_image(image_url, aes_key_base64):"""下载并解密加密图片参数:image_url: 加密图片的URLaes_key_base64: Base64编码的AES密钥(与回调加解密相同)返回:tuple: (status: bool, data: bytes/str) status为True时data是解密后的图片数据,status为False时data是错误信息"""try:# 1. 下载加密图片logger.info("开始下载加密图片: %s", image_url)response = requests.get(image_url, timeout=15)response.raise_for_status()encrypted_data = response.contentlogger.info("图片下载成功,大小: %d 字节", len(encrypted_data))# 2. 准备AES密钥和IVif not aes_key_base64:raise ValueError("AES密钥不能为空")# Base64解码密钥 (自动处理填充)aes_key = base64.b64decode(aes_key_base64 + "=" * (-len(aes_key_base64) % 4))if len(aes_key) != 32:raise ValueError("无效的AES密钥长度: 应为32字节")iv = aes_key[:16] # 初始向量为密钥前16字节# 3. 解密图片数据cipher = AES.new(aes_key, AES.MODE_CBC, iv)decrypted_data = cipher.decrypt(encrypted_data)# 4. 去除PKCS#7填充 (Python 3兼容写法)pad_len = decrypted_data[-1] # 直接获取最后一个字节的整数值if pad_len > 32: # AES-256块大小为32字节raise ValueError("无效的填充长度 (大于32字节)")decrypted_data = decrypted_data[:-pad_len]logger.info("图片解密成功,解密后大小: %d 字节", len(decrypted_data))return True, decrypted_dataexcept requests.exceptions.RequestException as e:error_msg = f"图片下载失败 : {str(e)}"logger.error(error_msg)return False, error_msgexcept ValueError as e:error_msg = f"参数错误 : {str(e)}"logger.error(error_msg)return False, error_msgexcept Exception as e:error_msg = f"图片处理异常 : {str(e)}"logger.error(error_msg)return False, error_msgdef MakeTextStream(stream_id, content, finish):plain = {"msgtype": "stream","stream": {"id": stream_id,"finish": finish, "content" : content}}return json.dumps(plain, ensure_ascii=False)def MakeImageStream(stream_id, image_data, finish):image_md5 = hashlib.md5(image_data).hexdigest()image_base64 = base64.b64encode(image_data).decode('utf-8')plain = {"msgtype": "stream","stream": {"id": stream_id,"finish": finish, "msg_item": [{"msgtype": "image","image": {"base64": image_base64,"md5": image_md5 }}]}}return json.dumps(plain)def EncryptMessage(receiveid, nonce, timestamp, stream):logger.info("开始加密消息,receiveid=%s, nonce=%s, timestamp=%s", receiveid, nonce, timestamp)logger.debug("发送流消息: %s", stream)wxcpt = WXBizJsonMsgCrypt(os.getenv('Token', ''), os.getenv('EncodingAESKey', ''), receiveid)ret, resp = wxcpt.EncryptMsg(stream, nonce, timestamp)if ret != 0:logger.error("加密失败,错误码: %d", ret)returnstream_id = json.loads(stream)['stream']['id']finish = json.loads(stream)['stream']['finish']logger.info("回调处理完成, 返回加密的流消息, stream_id=%s, finish=%s", stream_id, finish)logger.debug("加密后的消息: %s", resp)return resp# TODO 这里模拟一个大模型的行为
class LLMDemo():def __init__(self):self.cache_dir = CACHE_DIRif not os.path.exists(self.cache_dir):os.makedirs(self.cache_dir)def invoke(self, question):stream_id = _generate_random_string(10) # 生成一个随机字符串作为任务ID# 创建任务缓存文件cache_file = os.path.join(self.cache_dir, "%s.json" % stream_id)with open(cache_file, 'w', encoding='utf-8') as f:json.dump({'question': question,'created_time': time.time(),'current_step': 0,'max_steps': MAX_STEPS}, f)return stream_iddef get_answer(self, stream_id):cache_file = os.path.join(self.cache_dir, "%s.json" % stream_id)if not os.path.exists(cache_file):return u"任务不存在或已过期"with open(cache_file, 'r', encoding='utf-8') as f:task_data = json.load(f)# 更新缓存current_step = task_data['current_step'] + 1task_data['current_step'] = current_stepwith open(cache_file, 'w', encoding='utf-8') as f:json.dump(task_data, f)response = u'收到问题:%s\n' % task_data['question']for i in range(current_step):response += u'处理步骤 %d: 已完成\n' % (i)return responsedef is_task_finish(self, stream_id):cache_file = os.path.join(self.cache_dir, "%s.json" % stream_id)if not os.path.exists(cache_file):return Truewith open(cache_file, 'r', encoding='utf-8') as f:task_data = json.load(f)return task_data['current_step'] >= task_data['max_steps']@app.get("/bot/callback")
async def verify_url(request: Request,msg_signature: str,timestamp: str,nonce: str,echostr: str
):# 企业创建的自能机器人的 VerifyUrl 请求, receiveid 是空串receiveid = ''wxcpt = WXBizJsonMsgCrypt(os.getenv('Token', ''), os.getenv('EncodingAESKey', ''), receiveid)ret, echostr = wxcpt.VerifyURL(msg_signature,timestamp,nonce,echostr)if ret != 0:echostr = "verify fail"return Response(content=echostr, media_type="text/plain")@app.post("/bot/callback")
async def handle_message(request: Request,msg_signature: str = None,timestamp: str = None,nonce: str = None
):query_params = dict(request.query_params)if not all([msg_signature, timestamp, nonce]):raise HTTPException(status_code=400, detail="缺少必要参数")logger.info("收到消息, msg_signature=%s, timestamp=%s, nonce=%s", msg_signature, timestamp, nonce)post_data = await request.body()# 智能机器人的 receiveid 是空串receiveid = ''wxcpt = WXBizJsonMsgCrypt(os.getenv('Token', ''), os.getenv('EncodingAESKey', ''), receiveid)ret, msg = wxcpt.DecryptMsg(post_data,msg_signature,timestamp,nonce)if ret != 0:raise HTTPException(status_code=400, detail="解密失败")data = json.loads(msg)logger.debug('Decrypted data: %s', data)if 'msgtype' not in data:logger.info("不认识的事件: %s", data)return Response(content="success", media_type="text/plain")msgtype = data['msgtype']if(msgtype == 'text'):content = data['text']['content']# 询问大模型产生回复llm = LLMDemo()stream_id = llm.invoke(content)answer = llm.get_answer(stream_id)finish = llm.is_task_finish(stream_id)stream = MakeTextStream(stream_id, answer, finish)resp = EncryptMessage(receiveid, nonce, timestamp, stream)return Response(content=resp, media_type="text/plain")elif (msgtype == 'stream'): # case stream# 询问大模型最新的回复stream_id = data['stream']['id']llm = LLMDemo()answer = llm.get_answer(stream_id)finish = llm.is_task_finish(stream_id)stream = MakeTextStream(stream_id, answer, finish)resp = EncryptMessage(receiveid, nonce, timestamp, stream)return Response(content=resp, media_type="text/plain")elif (msgtype == 'image'):# 从环境变量获取AES密钥aes_key = os.getenv('EncodingAESKey', '') # 调用图片处理函数success, result = _process_encrypted_image(data['image']['url'], aes_key)if not success:logger.error("图片处理失败: %s", result)return# 这里简单处理直接原图回复decrypted_data = resultstream_id = _generate_random_string(10)finish = Truestream = MakeImageStream(stream_id, decrypted_data, finish)resp = EncryptMessage(receiveid, nonce, timestamp, stream)return Response(content=resp, media_type="text/plain")elif (msgtype == 'mixed'):# TODO 处理图文混排消息logger.warning("需要支持mixed消息类型")elif (msgtype == 'event'): # TODO 一些事件的处理logger.warning("需要支持event消息类型: %s", data)returnelse:logger.warning("不支持的消息类型: %s", msgtype)returnif __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=3001)
7 部署&外网穿透(3 选 1)
| 方案 | 优点 | 备注 |
|---|---|---|
| 云服务器公网 IP | 最稳 | 安全组放行 TCP 3001 |
| 云函数(SCF/FC) | 免运维 | 入口需改成 async def 兼容 |
| frp / ngrok 内网穿透 | 零成本调试 | 域名会变动,验证完即换 |
最小防火墙规则
Inbound TCP 3001 0.0.0.0/0
8 后台配置步骤(图解)
9 回调验证接口(GET)详解
企业微信会先发 GET 验证可达性:
GET /bot/callback?msg_signature=XXX×tamp=123&nonce=456&echostr=BASE64
返回:
- 成功 → 200 + 纯明文 echostr(不带引号)
- 失败 → 400/500 任意内容
代码已内置,无需改动。
10 消息解密→路由→加密回复(核心时序)
11 流式消息协议(TextStream)
{"msgtype": "stream","stream": {"id": "a1b2c3d4e5","finish": false,"content": "收到问题:今天天气?\n处理步骤 1: 已完成\n"}
}
finish=true表示最后一包,企业微信自动合并展示- 同一
id的多包必须按序到达(企业微信不做排序)
12 图片消息解密&再加密流程
关键代码片段
aes_key = base64.b64decode(aes_key_base64 + "====")
iv = aes_key[:16]
cipher = AES.new(aes_key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(encrypted_data)[:-pad_len]
13 本地调试小技巧
-
Ngrok 一键穿透
ngrok http 3001 # 复制 https://abc123.ngrok.io/bot/callback 到后台 -
打印明文包
在handle_message里加logger.info("明文=%s", json.dumps(data, indent=2, ensure_ascii=False)) -
重放测试
把企业微信的原始 POST 保存为test.json,用curl重复发送:curl -X POST http://localhost:3001/bot/callback \-d @test.json \-H "Content-Type: application/json"
14 常见错误码速查
| 错误码 | 含义 | 解决 |
|---|---|---|
| 40001 | token/aeskey 错 | 检查 .env 与后台是否一致 |
| 40003 | 接收人非法 | receiveid 必须为空字符串 |
| 40007 | URL 验证失败 | 返回内容非纯 echostr |
| 60020 | IP 不在白名单 | 把出口 IP 填到「可信 IP」 |
| 83014 | 机器人被踢出群 | 重新拉群即可 |
15 生产级加固建议
-
异步 + 队列
把LLMDemo.invoke改为扔 Redis Stream / Celery,避免阻塞回调线程。 -
签名防重放
用msg_signature+timestamp+nonce做 5 分钟幂等缓存。 -
图片落盘
解密后先存对象存储,返回 CDN 地址,减少 Base64 流量。 -
日志脱敏
明文日志关闭或打码userid/content敏感字段。
16 一键运行脚本(start.sh)
#!/usr/bin/env bash
set -e
cd "$(dirname "$0")"
[ ! -d venv ] && python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt -q
exec uvicorn main:app --host 0.0.0.0 --port 3001 --reload
赋权:
chmod +x start.sh
./start.sh
17 结语
至此,你已完成:
- 本地 venv 环境搭建
- 机器人四件套配置 & 白名单
- 文本/图片/流式消息收发闭环
- 生产级改造路线图
把代码推到云主机,改 .env 里的密钥,3 分钟即可对外服务。祝调试愉快,机器人早日上线!
企业微信:

后端服务:

示例代码下载:https://gitee.com/ericluo1008/aibot_demo_python3
以我之思,借AI之力!
