电商数据中台基石:通过 API 构建淘宝商品实时数据源
在电商业务中,实时、准确的商品数据是数据中台的核心资产。本文将详细介绍如何通过淘宝平台 API 构建商品实时数据源,为电商数据中台提供基础支撑,并提供完整的实现代码示例。
一、淘宝平台 API 概述
淘宝平台(Taobao Platform,简称 TOP)提供了丰富的 API 接口,涵盖商品、订单、用户等多个维度。对于商品数据获取,核心 API 包括:
taobao.item.get:获取单个商品详情taobao.items.search:搜索商品列表taobao.itemcats.get:获取商品分类taobao.item.sku.get:获取商品 SKU 信息
这些 API 构成了商品数据采集的基础,通过合理调用可以构建完整的商品数据链路。
二、系统架构设计
构建淘宝商品实时数据源的系统架构主要包含以下组件:
- API 接入层:负责与淘宝平台交互,处理认证、签名和请求发送
- 数据转换层:将 API 返回的原始数据转换为标准化格式
- 存储层:存储实时采集的商品数据(可选用 MySQL、MongoDB 等)
- 监控层:监控 API 调用状态、数据完整性和系统健康度
三、核心实现代码
1. 环境准备
首先需要安装必要的依赖库:
pip install top-api-sdk-python requests pymysql python-dotenv2. 配置文件
创建.env配置文件存储关键信息:
APP_KEY=你的淘宝开放平台APP_KEY
APP_SECRET=你的淘宝开放平台APP_SECRET
ACCESS_TOKEN=你的访问令牌
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=password
MYSQL_DB=taobao_product3. API 调用工具类
import os
import time
import hmac
import hashlib
import base64
import requests
from urllib.parse import urlencode
from dotenv import load_dotenv# 加载环境变量
load_dotenv()class TaobaoAPI:def __init__(self):self.app_key = os.getenv('APP_KEY')self.app_secret = os.getenv('APP_SECRET')self.access_token = os.getenv('ACCESS_TOKEN')self.gateway_url = "http://gw.api.taobao.com/router/rest"def _sign(self, params):"""生成签名"""sorted_params = sorted(params.items(), key=lambda x: x[0])query_string = urlencode(sorted_params)sign_str = self.app_secret + query_string + self.app_secretsign = hmac.new(sign_str.encode('utf-8'), digestmod=hashlib.sha1).digest()return base64.b64encode(sign).decode('utf-8')def call(self, method, params=None):"""调用API"""if params is None:params = {}# 公共参数public_params = {"app_key": self.app_key,"format": "json","method": method,"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),"v": "2.0","sign_method": "hmac-sha1","session": self.access_token}# 合并参数all_params = {**public_params,** params}# 生成签名all_params["sign"] = self._sign(all_params)try:response = requests.get(self.gateway_url, params=all_params, timeout=10)response.raise_for_status()result = response.json()# 检查是否有错误if "error_response" in result:error = result["error_response"]raise Exception(f"API Error: {error['msg']} (code: {error['code']})")return resultexcept Exception as e:print(f"API调用失败: {str(e)}")return None4. 数据存储工具类
import pymysql
from pymysql.cursors import DictCursor
import osclass DBHelper:def __init__(self):self.connection = pymysql.connect(host=os.getenv('MYSQL_HOST'),port=int(os.getenv('MYSQL_PORT')),user=os.getenv('MYSQL_USER'),password=os.getenv('MYSQL_PASSWORD'),db=os.getenv('MYSQL_DB'),charset='utf8mb4',cursorclass=DictCursor)def create_table(self):"""创建商品表"""with self.connection.cursor() as cursor:sql = """CREATE TABLE IF NOT EXISTS taobao_products (num_iid BIGINT PRIMARY KEY,title VARCHAR(255) NOT NULL,nick VARCHAR(100),price DECIMAL(10,2),sales INT,stock INT,category_id BIGINT,created_time DATETIME,modified_time DATETIME,raw_data TEXT,last_sync_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"""cursor.execute(sql)self.connection.commit()def save_product(self, product_data):"""保存商品数据"""try:with self.connection.cursor() as cursor:# 检查商品是否已存在cursor.execute("SELECT num_iid FROM taobao_products WHERE num_iid = %s", (product_data['num_iid'],))exists = cursor.fetchone()if exists:# 更新商品sql = """UPDATE taobao_products SET title=%s, nick=%s, price=%s, sales=%s, stock=%s, category_id=%s, modified_time=%s, raw_data=%sWHERE num_iid=%s"""else:# 插入新商品sql = """INSERT INTO taobao_products (num_iid, title, nick, price, sales, stock, category_id, created_time, modified_time, raw_data)VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""# 提取需要的字段params = (product_data.get('title', ''),product_data.get('nick', ''),product_data.get('price', 0),product_data.get('sale_count', 0),product_data.get('stock', 0),product_data.get('cid', 0),product_data.get('modified', None),str(product_data), # 存储原始数据product_data.get('num_iid'))if not exists:params = (product_data.get('num_iid'),) + params[:-1]cursor.execute(sql, params)self.connection.commit()return Trueexcept Exception as e:print(f"保存商品失败: {str(e)}")self.connection.rollback()return Falsedef close(self):"""关闭数据库连接"""self.connection.close()5. 商品数据同步服务
import json
import time
from taobao_api import TaobaoAPI
from db_helper import DBHelperclass ProductSyncService:def __init__(self):self.api = TaobaoAPI()self.db = DBHelper()# 初始化数据库表self.db.create_table()def sync_product(self, num_iid):"""同步单个商品数据"""print(f"开始同步商品: {num_iid}")# 调用淘宝API获取商品详情result = self.api.call("taobao.item.get",{"num_iid": num_iid,"fields": "num_iid,title,nick,price,sale_count,stock,cid,created,modified"})if result and "item_get_response" in result:product_data = result["item_get_response"]["item"]# 保存商品数据到数据库success = self.db.save_product(product_data)if success:print(f"商品 {num_iid} 同步成功")return Trueprint(f"商品 {num_iid} 同步失败")return Falsedef batch_sync_products(self, num_iids, interval=2):"""批量同步商品数据"""success_count = 0for num_iid in num_iids:if self.sync_product(num_iid):success_count += 1# 控制API调用频率,避免触发限流time.sleep(interval)print(f"批量同步完成,成功 {success_count}/{len(num_iids)}")return success_countdef search_and_sync(self, keyword, page=1, page_size=20):"""搜索商品并同步"""print(f"搜索商品: {keyword}, 第 {page} 页")result = self.api.call("taobao.items.search",{"q": keyword,"page_no": page,"page_size": page_size,"fields": "num_iid,title,nick,price,sale_count,stock,cid,created,modified"})if result and "items_search_response" in result:items = result["items_search_response"]["items"]["item"]num_iids = [item["num_iid"] for item in items]return self.batch_sync_products(num_iids)return 0def close(self):"""关闭资源"""self.db.close()if __name__ == "__main__":sync_service = ProductSyncService()try:# 示例1:同步单个商品sync_service.sync_product(572060726845)# 示例2:批量同步商品# sync_service.batch_sync_products([572060726845, 614662268993, 634862718573])# 示例3:搜索并同步商品# sync_service.search_and_sync("手机", page=1, page_size=10)finally:sync_service.close()四、数据中台集成建议
- 增量同步策略:通过维护商品最后更新时间,实现增量同步,减少 API 调用量
- 缓存机制:对频繁访问的商品数据进行缓存,提高查询性能
- 限流控制:严格遵守淘宝 API 的调用频率限制,避免账号被封
- 异常重试:实现失败重试机制,保证数据完整性
- 监控告警:建立 API 调用成功率、数据同步延迟等指标的监控告警
五、总结
通过淘宝平台 API 构建商品实时数据源,是电商数据中台建设的重要基础。本文提供的方案实现了从 API 调用、数据转换到存储的完整链路,可根据实际业务需求进行扩展。
在实际应用中,还需要考虑数据安全、API 权限管理、数据清洗等问题,以确保数据源的稳定、可靠和高质量,为后续的数据分析、业务决策提供有力支持。
