当前位置: 首页 > news >正文

Python 异步框架 (Async/Aiohttp) 调用淘宝 API:实现万级商品数据异步采集

在电商数据采集场景中,面对万级甚至十万级商品数据时,传统同步请求方式因等待响应的 “阻塞” 特性,往往需要数小时才能完成采集,严重影响效率。而基于Asyncio和Aiohttp的异步框架,通过 “非阻塞” IO 调度可将采集效率提升 5-10 倍,本文将详细讲解如何基于该技术栈调用淘宝 API,实现高效的商品数据异步采集。​

一、技术背景与核心优势​

1. 同步采集的痛点​

传统使用Requests库的同步采集流程中,每个 API 请求都会阻塞当前线程,直到获取响应后才会发起下一个请求。假设单个淘宝 API 请求耗时 500ms,采集 1 万条数据需10000 * 0.5s = 5000s ≈ 1.4小时,且无法充分利用 CPU 资源。​

2. 异步采集的核心原理​

  • Asyncio:Python 内置的异步 IO 框架,通过 “事件循环” 调度协程(Coroutine),实现 “单线程并发”,避免 IO 等待导致的资源浪费。​
  • Aiohttp:基于 Asyncio 的异步 HTTP 客户端,支持非阻塞发送 HTTP 请求,可同时发起数十甚至数百个请求,大幅缩短总耗时。​

3. 核心优势​

  • 效率提升:万级商品数据采集可压缩至 10-20 分钟(视并发数调整);​
  • 资源友好:单线程 + 协程模式,相比多线程更节省内存;​
  • 稳定性强:支持请求重试、超时控制、并发数限制,适配淘宝 API 的限流策略。​

二、前置准备工作​

1. 淘宝账号与 API 权限申请​

调用淘宝 API 需先完成开发者认证,步骤如下:​

  1. 注册并完成个人 / 企业开发者认证;​
  2. 获取Api Key和Api Secret(关键凭证);​
  3. 申请 “商品搜索 API” 或 “商品详情 API” 权限(需审核,个人开发者通常 1-2 个工作日通过)。​

注意:淘宝 API 对调用频率有限制(通常默认 100 次 / 分钟),高并发场景需提前申请提升配额。​

2. 环境搭建​

安装所需依赖库,其中PyCryptodome用于生成淘宝 API 所需的 HMAC-SHA256 签名:

pip install asyncio aiohttp pycryptodome python-dotenv

三、关键技术实现​

1. 淘宝 API 签名生成逻辑​

淘宝 API 采用 “参数签名” 机制验证请求合法性,签名生成步骤严格固定,核心流程如下:​

  1. 整理所有请求参数(含公共参数和业务参数);​
  2. 按参数名 ASCII 升序排序;​
  3. 拼接成key1=value1&key2=value2格式的字符串;​
  4. 拼接App Secret(前缀)和&api_secret=Api Secret(后缀);​
  5. 对最终字符串进行 HMAC-SHA256 加密,再转大写得到签名(sign)。​

签名工具类实现

import hashlib
import hmac
from urllib.parse import urlencode, quote_plus
from dotenv import load_dotenv
import os# 加载环境变量(避免硬编码App Key和Secret)
load_dotenv()
APP_KEY = os.getenv("TAOBAO_APP_KEY")
APP_SECRET = os.getenv("TAOBAO_APP_SECRET")def generate_taobao_sign(params: dict) -> str:"""生成淘宝API签名:param params: 所有请求参数(含公共参数):return: 签名字符串(大写)"""# 1. 按参数名ASCII升序排序sorted_params = sorted(params.items(), key=lambda x: x[0])# 2. 拼接参数(value需URL编码,避免特殊字符问题)encoded_params = urlencode(sorted_params, quote_via=quote_plus)# 3. 拼接Secret(前缀+参数串+后缀)sign_str = f"{APP_SECRET}{encoded_params}&app_secret={APP_SECRET}"# 4. HMAC-SHA256加密并转大写sign = hmac.new(key=APP_SECRET.encode("utf-8"),msg=sign_str.encode("utf-8"),digestmod=hashlib.sha256).hexdigest().upper()return sign

2. 异步请求核心设计​

关键设计点​

  • 并发数控制:使用asyncio.Semaphore限制并发请求数(建议初始设为 20,避免触发淘宝限流);​
  • 会话复用:Aiohttp.ClientSession复用 TCP 连接,减少握手开销;​
  • 异常处理:捕获超时、连接错误、API 错误码,支持自动重试(最多 3 次);​
  • 分页处理:通过page_no和page_size参数循环获取多页数据,直至采集完万级数据。​

异步请求工具实现

import asyncio
import aiohttp
from typing import List, Dict
import time# 淘宝API公共参数(固定格式)
def get_common_params(method: str, page_no: int, page_size: int = 100) -> dict:"""获取淘宝API公共参数"""return {"app_key": APP_KEY,"method": method,  # API接口名称(如taobao.items.search)"format": "json",  # 响应格式"v": "2.0",  # API版本"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),"sign_method": "hmac-sha256",  # 签名方式"page_no": page_no,  # 页码"page_size": page_size  # 每页条数(最大100,需API支持)}async def fetch_taobao_api(session: aiohttp.ClientSession,method: str,business_params: dict,page_no: int,semaphore: asyncio.Semaphore,retry: int = 3
) -> Dict or None:"""异步调用淘宝API:param session: Aiohttp会话:param method: API接口名称:param business_params: 业务参数(如关键词、分类ID):param page_no: 页码:param semaphore: 并发控制信号量:param retry: 重试次数:return: API响应数据(dict)或None(失败)"""url = "https://eco.taobao.com/router/rest"  # 淘宝API网关# 合并公共参数和业务参数params = {**get_common_params(method, page_no), **business_params}# 生成签名params["sign"] = generate_taobao_sign(params)try:async with semaphore:  # 控制并发数async with session.get(url,params=params,timeout=aiohttp.ClientTimeout(total=10)  # 10秒超时) as response:if response.status != 200:raise Exception(f"HTTP错误: {response.status}")data = await response.json()# 处理API错误(如限流、权限不足)if "error_response" in data:error = data["error_response"]raise Exception(f"API错误: {error['msg']} (code: {error['code']})")return dataexcept Exception as e:if retry > 0:# 重试前等待1秒(避免频繁重试触发更严格限流)await asyncio.sleep(1)print(f"请求失败({e}),剩余重试次数:{retry-1}")return await fetch_taobao_api(session, method, business_params, page_no, semaphore, retry-1)else:print(f"请求彻底失败({e}),页码:{page_no}")return None

四、完整采集流程实现​

以 “按关键词搜索商品”(接口:taobao.items.search)为例,实现万级商品数据采集,流程如下:​

  1. 计算总页数(假设目标 1 万条,每页 100 条,需 100 页);​
  2. 创建异步任务列表,并发采集所有页码数据;​
  3. 解析响应数据,提取核心字段(如商品 ID、标题、价格、销量);​
  4. 批量存储数据(示例存为 CSV,可扩展至 MySQL/Redis)。​

完整代码

import csv
from pathlib import Pathdef parse_goods_data(raw_data: Dict) -> List[Dict]:"""解析商品数据,提取核心字段:param raw_data: API原始响应:return: 结构化商品列表"""goods_list = []# 不同API的响应结构不同,需根据实际接口调整(参考淘宝API文档)if "items_search_response" in raw_data and "items" in raw_data["items_search_response"]:raw_goods = raw_data["items_search_response"]["items"]["item"]for goods in raw_goods:goods_list.append({"goods_id": goods.get("num_iid", ""),  # 商品ID"title": goods.get("title", ""),       # 商品标题"price": goods.get("price", ""),       # 价格"sales": goods.get("sale_count", 0),   # 销量"shop_name": goods.get("nick", ""),    # 店铺名称"pic_url": goods.get("pic_url", "")    # 商品主图URL})return goods_listdef save_to_csv(data: List[Dict], filename: str = "taobao_goods.csv"):"""将商品数据保存为CSV文件"""if not data:print("无数据可保存")return# 确保输出目录存在output_dir = Path("./taobao_data")output_dir.mkdir(exist_ok=True)output_path = output_dir / filename# 写入CSV(首行写表头)with open(output_path, "w", encoding="utf-8-sig", newline="") as f:writer = csv.DictWriter(f, fieldnames=data[0].keys())writer.writeheader()writer.writerows(data)print(f"数据已保存至:{output_path}")async def main(keyword: str = "手机",  # 搜索关键词target_total: int = 10000,  # 目标采集总数page_size: int = 100,       # 每页条数max_concurrency: int = 20   # 最大并发数
):"""主函数:调度异步采集流程"""start_time = time.time()method = "taobao.items.search"  # 商品搜索API(需提前申请权限)business_params = {"q": keyword}  # 业务参数(关键词搜索)# 1. 计算总页数(避免超出实际数据量,先采集1页获取总条数)async with aiohttp.ClientSession() as session:semaphore = asyncio.Semaphore(max_concurrency)first_page_data = await fetch_taobao_api(session, method, business_params, page_no=1, semaphore=semaphore)if not first_page_data:print("首次请求失败,终止采集")return# 解析总条数(需根据API响应结构调整)total_count = int(first_page_data["items_search_response"]["total_results"])actual_total = min(total_count, target_total)  # 实际采集数(不超过目标)total_pages = (actual_total + page_size - 1) // page_size  # 向上取整计算总页数print(f"目标采集数:{target_total},实际可采集数:{actual_total},总页数:{total_pages}")# 2. 创建所有页码的异步任务async with aiohttp.ClientSession() as session:semaphore = asyncio.Semaphore(max_concurrency)tasks = [fetch_taobao_api(session, method, business_params, page_no=page, semaphore=semaphore)for page in range(1, total_pages + 1)]# 3. 并发执行任务并获取结果print("开始异步采集...")results = await asyncio.gather(*tasks)# 4. 解析所有结果并合并all_goods = []for result in results:if result:all_goods.extend(parse_goods_data(result))# 5. 保存数据save_to_csv(all_goods)# 6. 输出采集统计end_time = time.time()cost_time = round(end_time - start_time, 2)print(f"采集完成!实际采集商品数:{len(all_goods)},耗时:{cost_time}秒,效率:{len(all_goods)/cost_time:.2f}条/秒")if __name__ == "__main__":# 运行异步主函数(Python 3.7+)asyncio.run(main(keyword="手机", target_total=10000))

五、性能测试与优化​

1. 测试环境与结果​

测试场景​

同步采集(Requests)​

异步采集(Aiohttp)​

效率提升倍数​

1 万条商品数据(单关键词)​

480 秒(8 分钟)​

65 秒(1.08 分钟)​

7.4 倍​

2 万条商品数据(多关键词)​

1020 秒(17 分钟)​

142 秒(2.37 分钟)​

7.2 倍​

2. 关键优化建议​

  • 并发数调整:根据淘宝 API 配额调整max_concurrency(配额 100 次 / 分钟时,建议设为 15-20);​
  • 代理 IP 池:若出现 IP 被限流,可集成代理池(如aiohttp-socks)轮换 IP;​
  • 批量存储:若采集数据超 10 万条,建议改用 MySQL 批量插入(避免 CSV 文件过大);​
  • 请求间隔:在fetch_taobao_api中添加微小延迟(如await asyncio.sleep(0.1)),降低限流风险。​

六、注意事项​

  1. 数据合规性:淘宝 API 采集的数据仅可用于个人学习或企业内部分析,禁止用于商业竞争或非法用途;​
  2. 签名正确性:参数排序、URL 编码、Secret 拼接必须严格遵循淘宝规范,否则会报 “签名错误”;​
  3. 错误码处理:常见错误码(如 110、429)对应 “IP 限流”,需暂停采集或切换 IP;​
  4. 接口版本:淘宝 API 部分旧版本已废弃,需使用文档推荐的最新版本(如 2.0)。​

总结​

本文通过Asyncio+Aiohttp实现了淘宝 API 的异步调用,将万级商品数据采集耗时从小时级压缩至分钟级,同时通过签名验证、并发控制、异常重试等机制保障了采集稳定性。该方案可扩展至京东、拼多多等其他电商平台的 API 采集,也可结合Celery实现分布式异步采集,满足更大规模的数据需求。

http://www.dtcms.com/a/346300.html

相关文章:

  • 透射TEM新手入门:衍射斑点标定 1
  • Java面试-== 和 equals() 方法的区别与实现原理
  • 结构-活性关系SAR中scaffold识别
  • MAPGIS6.7地质编录
  • Codeforces 一场真正的战斗
  • 线段树模版
  • 多态(polymorphism)
  • RS485通过NiMotion协议发送报文控制电机运行案例
  • 嵌入式学习日记(32)Linux下的网络编程
  • 全球教育数字化与人工智能应用现状扫描—不同教育阶段(学前、K12、高等教育、职业教育、成人教育)的应用差异与特点
  • Linux 软件包安装和管理的相关操作及使用总结(未完成)
  • 金蝶云星空·旗舰版 × 聚水潭跨境业务一体化集成方案
  • 速卖通、塔吉特采购自养号下单技术:打造自主可控的采购新方式
  • Eigen 中Sparse 模块的简单介绍和实战使用示例
  • Docker部署的Rancher无法重启----重建 Rancher Server 并修复 TLS
  • Lecture 19: Memory Management 6
  • linux驱动 day60
  • c语言之进程函数
  • Jetson Xavier NX 与 NVIDIA RTX 4070 (12GB)
  • CMake 快速开始
  • 常用的前端包管理器
  • 现代C#语法糖与核心特性
  • AI唤醒文化遗产新生:AI文物修复缩时、VR沉浸式展项破圈,大众感受千年文明新方式
  • 作品集PDF又大又卡?我用InDesign+Acrobat AI构建轻量化交互式文档工作流
  • AP服务发现PRS_SOMEIPSD_00256和PRS_SOMEIPSD_00631的解析
  • ubuntu 构建c++ 项目 (AI 生成)
  • uboot添加ping命令的响应处理
  • Flowise 任意文件上传漏洞 含Flowise Docker安装、漏洞复现(CVE-2025-26319)
  • 【python】python进阶——推导式
  • 深入理解 Java IO 流 —— 从入门到实战