Python全栈项目:结合Puppeteer和AI模型操作浏览器
引言
在现代Web自动化领域,将浏览器自动化工具与AI模型相结合,正在开启全新的可能性。本文将介绍如何构建一个Python全栈项目,利用Puppeteer(通过Pyppeteer)和AI模型实现智能浏览器操作。
项目概述
技术栈
- 后端: Python 3.8+
- 浏览器自动化: Pyppeteer (Puppeteer的Python移植版)
- AI模型: Claude API / OpenAI GPT-4
- Web框架: FastAPI
- 前端: React + TypeScript + Tailwind CSS
- 实时通信: WebSocket
核心功能
- 聊天式交互界面 - 通过对话窗口控制浏览器
- 智能网页导航和交互
- 基于AI的元素识别和操作
- 自然语言指令执行
- 网页内容智能提取
- 实时浏览器预览 - 同步显示浏览器操作过程
- 自动化测试生成
环境搭建
安装依赖
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install the core packages
pip install pyppeteer
pip install anthropic  # or: openai
pip install fastapi uvicorn
pip install python-dotenv
pip install websockets  # WebSocket support
pip install python-multipart  # file-upload support
前端依赖安装
# Create the React project
npm create vite@latest frontend -- --template react-ts
cd frontend

# Install dependencies
npm install
npm install axios
npm install lucide-react  # icon library
项目结构
project/
├── backend/
│ ├── app/
│ │ ├── __init__.py
│ │ ├── main.py # FastAPI应用入口
│ │ ├── browser_controller.py # 浏览器控制器
│ │ ├── ai_agent.py # AI代理
│ │ └── utils.py # 工具函数
│ ├── .env # 环境变量
│ └── requirements.txt
├── frontend/
│ ├── src/
│ │ ├── components/
│ │ │ ├── ChatWindow.tsx # 聊天窗口
│ │ │ ├── BrowserPreview.tsx # 浏览器预览
│ │ │ └── MessageList.tsx # 消息列表
│ │ ├── App.tsx
│ │ └── main.tsx
│ ├── package.json
│ └── tsconfig.json
└── README.md
核心实现
1. 浏览器控制器
# browser_controller.py
import asyncio
from pyppeteer import launch
from typing import Optional, Dict, Any


class BrowserController:
    """Async wrapper around one pyppeteer browser and one page.

    Call ``initialize()`` before any page operation and ``close()`` when
    done. Page operations raise ``RuntimeError`` if no page is open.
    """

    def __init__(self):
        # Both stay None until initialize() has launched the browser.
        self.browser = None
        self.page = None

    def _require_page(self):
        """Return the open page, or raise if the browser was never started."""
        if self.page is None:
            raise RuntimeError(
                "Browser is not initialized; call initialize() first"
            )
        return self.page

    async def initialize(self, headless: bool = False):
        """Launch the browser and open a 1920x1080 page.

        Args:
            headless: Passed through to ``pyppeteer.launch``.
        """
        self.browser = await launch(
            headless=headless,
            args=['--no-sandbox', '--disable-setuid-sandbox'],
        )
        self.page = await self.browser.newPage()
        await self.page.setViewport({'width': 1920, 'height': 1080})

    async def navigate(self, url: str):
        """Load ``url`` and wait for the ``networkidle2`` condition."""
        await self._require_page().goto(url, {'waitUntil': 'networkidle2'})

    async def get_page_content(self) -> str:
        """Return the current page's HTML as a string."""
        return await self._require_page().content()

    async def get_screenshot(self, path: str = 'screenshot.png'):
        """Save a screenshot to ``path`` and return that path."""
        await self._require_page().screenshot({'path': path})
        return path

    async def click_element(self, selector: str):
        """Click the element matched by the CSS ``selector``."""
        await self._require_page().click(selector)

    async def input_text(self, selector: str, text: str):
        """Type ``text`` into the element matched by ``selector``."""
        await self._require_page().type(selector, text)

    async def extract_elements(self, selector: str) -> list:
        """Return the ``textContent`` of every element matching ``selector``."""
        page = self._require_page()
        elements = await page.querySelectorAll(selector)
        results = []
        for element in elements:
            text = await page.evaluate(
                '(element) => element.textContent', element
            )
            results.append(text)
        return results

    async def close(self):
        """Close the browser if one is open; safe to call repeatedly."""
        # Clear the handles first so a failing or repeated close() never
        # leaves stale references behind.
        browser, self.browser, self.page = self.browser, None, None
        if browser:
            await browser.close()
2. AI代理
# ai_agent.py
import anthropic
from typing import Dict, Any, List
import json


class AIAgent:
    """Turns natural-language requests into JSON via the Anthropic API."""

    # Model identifier used for every request.
    MODEL = "claude-sonnet-4-20250514"

    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.conversation_history = []

    def _complete(self, prompt: str, max_tokens: int) -> str:
        """Send a single-turn prompt and return the reply text."""
        response = self.client.messages.create(
            model=self.MODEL,
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        )
        # Join every text-bearing block so an empty or multi-block reply
        # cannot raise IndexError / AttributeError here.
        return "".join(
            getattr(block, "text", "") or "" for block in response.content
        )

    @staticmethod
    def _extract_json(text: str) -> Any:
        """Parse the JSON payload out of a model reply.

        Markdown code fences are stripped first. If the remainder is still
        not valid JSON, the span from the first ``{`` to the last ``}`` is
        tried, which tolerates prose around the object.

        Raises:
            json.JSONDecodeError: If no JSON can be recovered.
        """
        cleaned = text.replace('```json', '').replace('```', '').strip()
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            start, end = cleaned.find('{'), cleaned.rfind('}')
            if start == -1 or end <= start:
                raise
            return json.loads(cleaned[start:end + 1])

    def parse_command(self, user_input: str, page_context: str) -> Dict[str, Any]:
        """Translate a user instruction into a browser action.

        Args:
            user_input: The instruction in natural language.
            page_context: Page HTML; only the first 2000 characters are sent.

        Returns:
            The action dict produced by the model, or
            ``{"action": "error", ...}`` when the reply is not a JSON object.
        """
        prompt = f"""
你是一个浏览器自动化助手。用户给出了一个操作指令,你需要将其转换为具体的浏览器操作。

当前页面上下文:
{page_context[:2000]}

用户指令: {user_input}

请分析用户意图,返回JSON格式的操作指令。格式如下:
{{
    "action": "navigate|click|input|extract|scroll",
    "selector": "CSS选择器(如需要)",
    "value": "操作值(如需要)",
    "description": "操作描述"
}}

只返回JSON,不要其他内容。
"""
        failure = {"action": "error", "description": "无法解析AI响应"}
        try:
            action = self._extract_json(self._complete(prompt, 1000))
        except json.JSONDecodeError:
            return failure
        # Callers use action.get(...), so anything but an object is unusable.
        return action if isinstance(action, dict) else failure

    def analyze_page(self, html_content: str) -> Dict[str, Any]:
        """Ask the model for a structured summary of a page.

        Args:
            html_content: Page HTML; only the first 3000 characters are sent.

        Returns:
            The parsed JSON analysis, or ``{"error": "分析失败"}`` when the
            reply cannot be parsed.
        """
        prompt = f"""
请分析以下HTML页面内容,提取关键信息:

{html_content[:3000]}

请返回JSON格式的分析结果,包括:
1. 页面标题和主要内容
2. 可交互元素(按钮、链接、表单等)
3. 页面结构建议

只返回JSON格式。
"""
        try:
            return self._extract_json(self._complete(prompt, 2000))
        except json.JSONDecodeError:
            return {"error": "分析失败"}
3. FastAPI应用(增强WebSocket支持)
# main.py
import asyncio
import base64
import json
import os
from typing import Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel

# The app is started as ``uvicorn app.main:app`` (package import), but can
# also be run directly from inside the ``app`` directory; support both.
if __package__:
    from .browser_controller import BrowserController
    from .ai_agent import AIAgent
else:
    from browser_controller import BrowserController
    from ai_agent import AIAgent

load_dotenv()

app = FastAPI(title="AI Browser Automation")

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],  # Vite dev-server default port
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Process-wide singletons shared by every request and socket
browser = BrowserController()
ai_agent = AIAgent(api_key=os.getenv("ANTHROPIC_API_KEY"))


def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean environment variable ("1", "true", "yes", "on")."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes", "on")


async def _run_blocking(func, *args):
    """Run a synchronous call in the default executor.

    The AI client is synchronous; awaiting it through the executor keeps the
    event loop free to serve other sockets while the model responds.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


class ConnectionManager:
    """Tracks the open WebSocket connections for broadcasting."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the socket and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking the socket; a no-op if it is already gone."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        """Send ``message`` to every tracked socket, dropping dead ones."""
        # Iterate over a copy because dead sockets are removed on the fly.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except (WebSocketDisconnect, RuntimeError):
                self.disconnect(connection)


manager = ConnectionManager()


class CommandRequest(BaseModel):
    """Body of ``POST /execute``."""

    command: str
    url: Optional[str] = None


@app.on_event("startup")
async def startup_event():
    """Launch the browser when the application starts."""
    # Honour HEADLESS from the environment; unset keeps the visible browser.
    await browser.initialize(headless=_env_flag("HEADLESS"))


@app.on_event("shutdown")
async def shutdown_event():
    """Release the browser when the application stops."""
    await browser.close()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Handle one chat client: read commands, run them, stream results."""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()

            # Acknowledge receipt before the slow work starts
            await websocket.send_json({
                "type": "status",
                "message": "正在处理您的指令...",
            })

            try:
                # Parsed inside the try so malformed input yields an error
                # message instead of killing the connection loop.
                message = json.loads(data)
                command = message.get("command")
                url = message.get("url")
                if not command:
                    raise ValueError("缺少 command 字段")

                # Navigate first when the client supplied a URL
                if url:
                    await browser.navigate(url)
                    await websocket.send_json({
                        "type": "navigation",
                        "message": f"已导航到: {url}",
                    })

                # Current page HTML is the context for the model
                page_content = await browser.get_page_content()

                action = await _run_blocking(
                    ai_agent.parse_command, command, page_content
                )
                await websocket.send_json({
                    "type": "ai_response",
                    "action": action,
                })

                result = await execute_action(action)

                screenshot_path = await browser.get_screenshot()
                with open(screenshot_path, "rb") as f:
                    screenshot_base64 = base64.b64encode(f.read()).decode()

                await websocket.send_json({
                    "type": "result",
                    "success": True,
                    "action": action,
                    "result": result,
                    "screenshot": f"data:image/png;base64,{screenshot_base64}",
                })
            except WebSocketDisconnect:
                # Let the outer handler deal with a vanished client.
                raise
            except Exception as e:
                await websocket.send_json({
                    "type": "error",
                    "message": str(e),
                })
    except WebSocketDisconnect:
        pass
    finally:
        # Always untrack the socket, whatever ended the loop.
        manager.disconnect(websocket)


@app.post("/execute")
async def execute_command(request: CommandRequest):
    """Run one user command (REST variant) and broadcast the outcome."""
    try:
        if request.url:
            await browser.navigate(request.url)

        page_content = await browser.get_page_content()
        action = await _run_blocking(
            ai_agent.parse_command, request.command, page_content
        )
        result = await execute_action(action)

        # Keep every connected chat client in sync
        await manager.broadcast({
            "type": "update",
            "action": action,
            "result": result,
        })

        return JSONResponse({
            "success": True,
            "action": action,
            "result": result,
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


async def execute_action(action: dict) -> dict:
    """Dispatch a parsed action dict to the browser controller.

    Returns:
        ``{"message": ...}`` for most actions, ``{"data": [...]}`` for
        ``extract``.

    Raises:
        ValueError: If the action is the parser's ``error`` marker.
    """
    action_type = action.get("action")
    selector = action.get("selector")
    value = action.get("value")

    if action_type == "navigate":
        await browser.navigate(value)
        return {"message": "导航成功"}
    elif action_type == "click":
        await browser.click_element(selector)
        return {"message": "点击成功"}
    elif action_type == "input":
        await browser.input_text(selector, value)
        return {"message": "输入成功"}
    elif action_type == "extract":
        data = await browser.extract_elements(selector)
        return {"data": data}
    elif action_type == "scroll":
        # "scroll" is one of the actions the AI prompt offers; move one
        # viewport at a time, upwards only when the value says so.
        direction = -1 if str(value or "").strip().lower() in ("up", "向上", "上") else 1
        await browser.page.evaluate(
            "(dy) => window.scrollBy(0, dy * window.innerHeight)", direction
        )
        return {"message": "滚动成功"}
    elif action_type == "error":
        # A failed AI parse must surface as an error, not as a success.
        raise ValueError(action.get("description") or "无法解析AI响应")
    else:
        return {"message": "未知操作"}


@app.get("/screenshot")
async def take_screenshot():
    """Capture the current page and return it as a PNG file."""
    try:
        path = await browser.get_screenshot()
        return FileResponse(path, media_type="image/png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/analyze")
async def analyze_page():
    """Return the AI analysis of the current page."""
    try:
        content = await browser.get_page_content()
        analysis = await _run_blocking(ai_agent.analyze_page, content)
        return {"success": True, "analysis": analysis}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
前端实现
1. 聊天窗口组件
// ChatWindow.tsx
import React, { useState, useEffect, useRef } from 'react';
import { Send, Loader2 } from 'lucide-react';

interface Message {
  id: string;
  type: 'user' | 'assistant' | 'system';
  content: string;
  timestamp: Date;
  screenshot?: string;
  action?: any;
}

interface ChatWindowProps {
  onScreenshotUpdate?: (screenshot: string) => void;
}

/**
 * Chat panel: sends natural-language commands to the backend over a
 * WebSocket and renders the status, AI and result messages it streams back.
 */
export const ChatWindow: React.FC<ChatWindowProps> = ({ onScreenshotUpdate }) => {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isLoading, setIsLoading] = useState(false);
  const [ws, setWs] = useState<WebSocket | null>(null);
  const messagesEndRef = useRef<HTMLDivElement>(null);
  // Sequence number appended to ids: Date.now() alone collides when two
  // messages arrive within the same millisecond, producing duplicate keys.
  const messageSeq = useRef(0);

  // Open the WebSocket once on mount
  useEffect(() => {
    const websocket = new WebSocket('ws://localhost:8000/ws');

    websocket.onopen = () => {
      console.log('WebSocket连接已建立');
      addMessage('system', '已连接到AI浏览器助手');
    };

    websocket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      handleWebSocketMessage(data);
    };

    websocket.onerror = (error) => {
      console.error('WebSocket错误:', error);
      // No result will arrive any more, so re-enable the input.
      setIsLoading(false);
      addMessage('system', '连接错误,请刷新页面重试');
    };

    websocket.onclose = () => {
      console.log('WebSocket连接已关闭');
      setIsLoading(false);
      addMessage('system', '连接已断开');
    };

    setWs(websocket);

    return () => {
      // Detach handlers so closing on unmount does not update state.
      websocket.onopen = null;
      websocket.onmessage = null;
      websocket.onerror = null;
      websocket.onclose = null;
      websocket.close();
    };
  }, []);

  // Keep the newest message in view
  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  const handleWebSocketMessage = (data: any) => {
    if (data.type === 'status') {
      // Interim acknowledgement: the command is still running, so the
      // loading indicator stays on.
      addMessage('system', data.message);
    } else if (data.type === 'navigation') {
      addMessage('assistant', data.message);
    } else if (data.type === 'ai_response') {
      addMessage('assistant', `AI解析: ${data.action.description}`, data.action);
    } else if (data.type === 'result') {
      setIsLoading(false);
      if (data.success) {
        addMessage(
          'assistant',
          `执行成功: ${data.result.message || JSON.stringify(data.result)}`,
          data.action,
          data.screenshot
        );
        if (data.screenshot && onScreenshotUpdate) {
          onScreenshotUpdate(data.screenshot);
        }
      }
    } else if (data.type === 'error') {
      setIsLoading(false);
      addMessage('system', `错误: ${data.message}`);
    }
  };

  const addMessage = (
    type: Message['type'],
    content: string,
    action?: any,
    screenshot?: string
  ) => {
    const newMessage: Message = {
      id: `${Date.now()}-${messageSeq.current++}`,
      type,
      content,
      timestamp: new Date(),
      action,
      screenshot
    };
    setMessages(prev => [...prev, newMessage]);
  };

  const handleSend = () => {
    if (!input.trim() || !ws || ws.readyState !== WebSocket.OPEN) return;

    // Show the user's own message
    addMessage('user', input);

    // Send it to the server
    ws.send(JSON.stringify({
      command: input,
      url: input.startsWith('http') ? input : null
    }));

    setInput('');
    setIsLoading(true);
  };

  const handleKeyPress = (e: React.KeyboardEvent) => {
    // Ignore the Enter that confirms an IME composition (e.g. Chinese input).
    if (e.key === 'Enter' && !e.shiftKey && !e.nativeEvent.isComposing) {
      e.preventDefault();
      handleSend();
    }
  };

  return (
    <div className="flex flex-col h-full bg-white rounded-lg shadow-lg">
      {/* Header */}
      <div className="px-6 py-4 border-b border-gray-200">
        <h2 className="text-xl font-semibold text-gray-800">AI 浏览器助手</h2>
        <p className="text-sm text-gray-500">用自然语言控制浏览器</p>
      </div>

      {/* Message list */}
      <div className="flex-1 overflow-y-auto px-6 py-4 space-y-4">
        {messages.map((message) => (
          <div
            key={message.id}
            className={`flex ${message.type === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-[80%] rounded-lg px-4 py-2 ${message.type === 'user'? 'bg-blue-600 text-white': message.type === 'system'? 'bg-gray-200 text-gray-700': 'bg-gray-100 text-gray-800'}`}
            >
              <p className="text-sm whitespace-pre-wrap">{message.content}</p>
              {message.action && (
                <div className="mt-2 text-xs opacity-75">
                  <code>{JSON.stringify(message.action, null, 2)}</code>
                </div>
              )}
              <span className="text-xs opacity-75 mt-1 block">
                {message.timestamp.toLocaleTimeString()}
              </span>
            </div>
          </div>
        ))}
        {isLoading && (
          <div className="flex justify-start">
            <div className="bg-gray-100 rounded-lg px-4 py-2">
              <Loader2 className="w-5 h-5 animate-spin text-gray-600" />
            </div>
          </div>
        )}
        <div ref={messagesEndRef} />
      </div>

      {/* Input area */}
      <div className="px-6 py-4 border-t border-gray-200">
        <div className="flex gap-2">
          <input
            type="text"
            value={input}
            onChange={(e) => setInput(e.target.value)}
            onKeyDown={handleKeyPress}
            placeholder="输入指令,例如:打开 google.com 并搜索 Python"
            className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
            disabled={isLoading}
          />
          <button
            onClick={handleSend}
            disabled={isLoading || !input.trim()}
            className="px-6 py-2 bg-blue-600 text-white rounded-lg hover:bg-blue-700 disabled:bg-gray-300 disabled:cursor-not-allowed transition-colors"
          >
            <Send className="w-5 h-5" />
          </button>
        </div>
        <div className="mt-2 text-xs text-gray-500">
          <p>💡 提示:试试说 "打开百度"、"点击搜索按钮"、"截图" 等</p>
        </div>
      </div>
    </div>
  );
};
2. 浏览器预览组件
// BrowserPreview.tsx
import React from 'react';
import { Monitor, RefreshCw } from 'lucide-react';

interface BrowserPreviewProps {
  screenshot?: string;
  onRefresh?: () => void;
}

/**
 * Shows the latest browser screenshot, or a placeholder until one arrives,
 * with a toolbar button that asks the parent for a fresh capture.
 */
export const BrowserPreview: React.FC<BrowserPreviewProps> = ({
  screenshot,
  onRefresh
}) => {
  // Recomputed only when the screenshot changes, so the status bar shows
  // when the image was last replaced rather than when this last rendered.
  const lastUpdated = React.useMemo(
    () => (screenshot ? new Date().toLocaleTimeString() : '-'),
    [screenshot]
  );

  return (
    <div className="flex flex-col h-full bg-gray-900 rounded-lg shadow-lg overflow-hidden">
      {/* Browser toolbar */}
      <div className="flex items-center justify-between px-4 py-3 bg-gray-800">
        <div className="flex items-center gap-2">
          <Monitor className="w-5 h-5 text-gray-400" />
          <span className="text-sm text-gray-300">浏览器预览</span>
        </div>
        <button
          onClick={onRefresh}
          className="p-2 hover:bg-gray-700 rounded transition-colors"
          title="刷新截图"
        >
          <RefreshCw className="w-4 h-4 text-gray-400" />
        </button>
      </div>

      {/* Preview area */}
      <div className="flex-1 flex items-center justify-center bg-gray-800 p-4">
        {screenshot ? (
          <img
            src={screenshot}
            alt="浏览器截图"
            className="max-w-full max-h-full object-contain rounded shadow-2xl"
          />
        ) : (
          <div className="text-center text-gray-500">
            <Monitor className="w-16 h-16 mx-auto mb-4 opacity-50" />
            <p>等待浏览器操作...</p>
            <p className="text-sm mt-2">发送指令后将显示浏览器截图</p>
          </div>
        )}
      </div>

      {/* Status bar */}
      <div className="px-4 py-2 bg-gray-800 border-t border-gray-700">
        <p className="text-xs text-gray-400">
          实时同步 • 最后更新: {lastUpdated}
        </p>
      </div>
    </div>
  );
};
3. 主应用组件
// App.tsx
import React, { useState } from 'react';
import { ChatWindow } from './components/ChatWindow';
import { BrowserPreview } from './components/BrowserPreview';

/**
 * Root layout: chat window on the left, browser preview on the right,
 * sharing the most recent screenshot.
 */
function App() {
  const [screenshot, setScreenshot] = useState<string>();
  // Object URL currently on display, if any, so it can be released later.
  const blobUrlRef = React.useRef<string | null>(null);

  // Single entry point for replacing the screenshot: object URLs created by
  // handleRefresh are revoked once superseded, otherwise each refresh leaks
  // the previous image blob.
  const showScreenshot = React.useCallback((next: string) => {
    const previous = blobUrlRef.current;
    blobUrlRef.current = next.startsWith('blob:') ? next : null;
    setScreenshot(next);
    if (previous && previous !== next) {
      URL.revokeObjectURL(previous);
    }
  }, []);

  const handleRefresh = async () => {
    try {
      const response = await fetch('http://localhost:8000/screenshot');
      // An error response carries JSON, not a PNG; do not display it.
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      const blob = await response.blob();
      showScreenshot(URL.createObjectURL(blob));
    } catch (error) {
      console.error('刷新截图失败:', error);
    }
  };

  return (
    <div className="h-screen bg-gradient-to-br from-blue-50 to-indigo-100 p-4">
      <div className="max-w-7xl mx-auto h-full">
        {/* Title */}
        <div className="text-center mb-6">
          <h1 className="text-4xl font-bold text-gray-800 mb-2">🤖 AI 浏览器自动化平台</h1>
          <p className="text-gray-600">使用自然语言控制浏览器 • Powered by Claude & Puppeteer</p>
        </div>

        {/* Main area */}
        <div className="grid grid-cols-1 lg:grid-cols-2 gap-4 h-[calc(100%-100px)]">
          {/* Left: chat window */}
          <div className="h-full">
            <ChatWindow onScreenshotUpdate={showScreenshot} />
          </div>

          {/* Right: browser preview */}
          <div className="h-full">
            <BrowserPreview screenshot={screenshot} onRefresh={handleRefresh} />
          </div>
        </div>
      </div>
    </div>
  );
}

export default App;
运行项目
启动后端
cd backend
python -m uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
启动前端
cd frontend
npm run dev
访问 http://localhost:5173 即可使用应用。
使用示例
对话示例
用户: "打开 google.com" AI助手: "已导航到: https://google.com" + [显示截图]
用户: "在搜索框输入 Python教程" AI助手: "AI解析: 在搜索框输入文本" → "执行成功" + [显示截图]
用户: "点击第一个搜索结果" AI助手: "AI解析: 点击指定元素" → "执行成功" + [显示截图]
支持的指令类型
- 导航指令: "打开百度"、"访问 github.com"
- 点击操作: "点击登录按钮"、"点击第一个链接"
- 输入操作: "在搜索框输入 AI"、"填写用户名为 admin"
- 数据提取: "提取所有标题"、"获取页面上的链接"
- 页面操作: "向下滚动"、"刷新页面"
- 截图: "截图"、"保存当前页面"
高级应用场景
1. 智能表单填充
async def smart_form_fill(form_data: Dict[str, str]):
    """Sketch: AI-assisted form filling.

    Builds the prompt that asks the model to map ``form_data`` onto the
    form fields of the current page. Sending the prompt and filling the
    fields are left unimplemented.

    Args:
        form_data: Values to place into the page's form.
    """
    page_content = await browser.get_page_content()

    # Ask the model which value belongs in which form field
    prompt = f"""分析页面中的表单,匹配以下数据到对应字段:
{json.dumps(form_data, ensure_ascii=False)}

页面内容:
{page_content[:2000]}

返回字段映射JSON。"""

    # TODO: send `prompt` to the model and fill each mapped field
2. 自动化测试生成
async def generate_test_cases(url: str):
    """Sketch: derive test cases from a page.

    Loads ``url`` and obtains the AI analysis of its HTML. Turning that
    analysis into pytest code is left unimplemented.

    Args:
        url: Page to analyse.
    """
    await browser.navigate(url)
    content = await browser.get_page_content()

    # Let the model analyse the page as the basis for the test cases
    analysis = ai_agent.analyze_page(content)

    # TODO: generate pytest code from `analysis`
3. 智能数据采集
async def intelligent_scraping(target_url: str, requirements: str):
    """Sketch: scrape data described in natural language.

    Loads ``target_url`` and reads its HTML. Deriving selectors from
    ``requirements`` and extracting the data are left unimplemented.

    Args:
        target_url: Page to scrape.
        requirements: Natural-language description of the data wanted.
    """
    await browser.navigate(target_url)

    # Page HTML is the context the model needs to understand the request
    page_content = await browser.get_page_content()

    # TODO: generate selectors from `requirements` and extract the data
最佳实践
1. 错误处理
class BrowserError(Exception):
    """Raised when a browser operation still fails after every retry."""


async def safe_execute(func, *args, **kwargs):
    """Await ``func(*args, **kwargs)``, retrying on failure.

    Up to three attempts are made, one second apart.

    Returns:
        Whatever ``func`` returns on the first successful attempt.

    Raises:
        BrowserError: If the final attempt fails; the underlying exception
            is attached as ``__cause__``.
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                # Chain the cause so the original traceback is not lost.
                raise BrowserError(f"操作失败: {str(e)}") from e
            await asyncio.sleep(1)
2. 性能优化
- 使用页面缓存减少重复加载
- 实现选择器缓存机制
- 合理使用headless模式
- 控制AI API调用频率
3. 安全考虑
- API密钥安全存储
- 请求频率限制
- 输入验证和清理
- 防止XSS和注入攻击
部署建议
Docker部署
# backend/Dockerfile
FROM python:3.9-slim

# Install Chrome dependencies
RUN apt-get update && apt-get install -y \
    chromium \
    chromium-driver \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# frontend/Dockerfile
# Stage 1: build the static bundle
FROM node:18-alpine AS build

WORKDIR /app
COPY package*.json ./
RUN npm install

COPY . .
RUN npm run build

# Stage 2: serve the bundle with nginx
FROM nginx:alpine
COPY --from=build /app/dist /usr/share/nginx/html
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
# docker-compose.yml
version: '3.8'

services:
  backend:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - HEADLESS=true
    volumes:
      - ./backend:/app

  frontend:
    build: ./frontend
    ports:
      - "80:80"
    depends_on:
      - backend
环境变量
# .env
ANTHROPIC_API_KEY=your_api_key
HEADLESS=true
BROWSER_TIMEOUT=30000
界面预览
应用截图
┌─────────────────────────────────────────────────────────────────┐
│ 🤖 AI 浏览器自动化平台 │
│ 使用自然语言控制浏览器 • Powered by Claude & Puppeteer │
├──────────────────────────┬──────────────────────────────────────┤
│ │ │
│ AI 浏览器助手 │ 浏览器预览 │
│ 用自然语言控制浏览器 │ [实时截图显示区域] │
│ │ │
│ 💬 消息列表 │ │
│ ┌─────────────────────┐ │ [浏览器当前状态的截图] │
│ │ 用户: 打开google │ │ │
│ └─────────────────────┘ │ │
│ ┌─────────────────────┐ │ │
│ │ AI: 已导航到google │ │ 实时同步 • 最后更新: 14:32:15 │
│ └─────────────────────┘ │ │
│ │ │
│ 输入指令框 [发送] │ │
│ 💡 提示:试试说"打开百度" │ │
└──────────────────────────┴──────────────────────────────────────┘
交互流程
- 用户在左侧聊天窗口输入自然语言指令
- WebSocket实时传输指令到后端
- AI解析指令并生成操作步骤
- Pyppeteer执行浏览器操作
- 实时截图返回并在右侧预览区显示
- 聊天窗口显示操作结果和状态
总结
将Puppeteer与AI模型结合,并配合现代化的聊天界面,可以创建强大且易用的智能浏览器自动化系统。这种方法的优势在于:
- 自然语言交互: 用户可以用自然语言描述操作意图,无需学习复杂API
- 实时可视化: 通过WebSocket和截图功能实时查看浏览器状态
- 智能适应: AI能够理解页面结构并自适应不同网站
- 减少维护: 不需要硬编码选择器,降低维护成本
- 扩展性强: 容易添加新功能和支持更多场景
- 用户友好: 类ChatGPT的对话界面,降低使用门槛
技术亮点
- WebSocket实时通信: 实现即时的双向数据传输
- React + TypeScript: 类型安全的现代前端开发
- FastAPI异步支持: 高性能的异步请求处理
- AI驱动: 智能理解用户意图并生成操作
- 截图反馈: 可视化展示每一步操作结果
这个项目可以应用于测试自动化、数据采集、RPA(机器人流程自动化)、用户行为录制等多个领域,为Web自动化带来新的可能性。
下一步优化方向
- 添加更多浏览器操作支持(滚动、拖拽、表单验证等)
- 实现会话管理和操作历史记录
- 集成计算机视觉进行页面元素识别
- 支持多浏览器实例并发操作
- 添加操作录制和回放功能
- 实现自动化脚本导出(如Selenium代码生成)
- 添加语音输入支持
- 构建Chrome扩展版本
项目代码
下载链接
