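Log Monitoring and Exception Alerting for Taobao API Data Collection
This article lays out a technical approach to log monitoring and exception alerting for Taobao API data collection, covering log design, monitoring strategy, alert implementation, and code examples.
I. Log Monitoring Design
1. Log levels and content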
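- Levels:
  - DEBUG: debugging information (e.g. API request parameters, the signature generation process)
  - INFO: normal collection records (e.g. a successfully fetched item ID, the collection time)
  - WARNING: potential problems (e.g. an invalid proxy IP, slow responses)
  - ERROR: collection failures (e.g. API rate limiting, network errors, data parsing errors)
  - CRITICAL: system-level failures (e.g. a dropped database connection, a service crash)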
- Log content (request_id is a unique per-request ID used for tracing):

    {
        "timestamp": "2025-03-20 14:30:45",
        "level": "ERROR",
        "module": "taobao_api_collector",
        "message": "API request failed: {'code': 40003, 'msg': 'Invalid signature'}",
        "request_id": "abc123",
        "stack_trace": "Traceback (most recent call last):\n..."
    }
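A record of this shape can be assembled with the standard library alone. The sketch below is a minimal illustration rather than the article's implementation: `build_log_record` and its parameters are hypothetical names, and using `uuid4` to generate `request_id` is an assumption.

```python
import json
import traceback
import uuid
from datetime import datetime


def build_log_record(level, module, message, exc=None, request_id=None):
    """Return one log entry as a JSON string (hypothetical helper)."""
    record = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "level": level,
        "module": module,
        "message": message,
        # Assumption: a random UUID hex string is unique enough for tracing.
        "request_id": request_id or uuid.uuid4().hex,
    }
    if exc is not None:
        # Render the exception and its traceback as a single string.
        record["stack_trace"] = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    return json.dumps(record, ensure_ascii=False)


try:
    raise ValueError("Invalid signature")
except ValueError as err:
    line = build_log_record("ERROR", "taobao_api_collector",
                            "API request failed", exc=err, request_id="abc123")
    print(line)
```

Passing the same `request_id` to every log call made while handling one request is what lets the entries be correlated later.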
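2. Log storage
- File storage: rotate log files by date (e.g. /logs/taobao_20250320.log)
- Structured storage: write logs as JSON for easier downstream analysis (the loguru library is recommended)

    from loguru import logger

    logger.add(
        "logs/taobao_{time:YYYYMMDD}.log",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
        serialize=True,      # write each record as one JSON line
        rotation="00:00",    # rotate daily at midnight
        retention="7 days",  # keep 7 days of logs
    )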
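II. Exception Monitoring Strategy
1. Real-time metrics
- API error rate: alert when ERROR entries exceed 5% of all log entries in a given minute
- Response time: alert when P99 latency exceeds 3 seconds
- Data completeness: alert when 5 consecutive collections fail to return the expected fields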
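The three thresholds above can be checked against one minute of collected data in a few lines. This is a sketch under stated assumptions: the function names, the alarm names, and the nearest-rank definition of P99 are choices made here for illustration, not something the article specifies.

```python
ERROR_RATE_LIMIT = 0.05    # share of ERROR entries per minute
P99_LIMIT_SECONDS = 3.0    # P99 latency threshold
MAX_INCOMPLETE_RUNS = 5    # consecutive collections missing expected fields


def p99(latencies):
    """Nearest-rank 99th percentile of a non-empty list of latencies."""
    ordered = sorted(latencies)
    rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n), 1-based
    return ordered[rank - 1]


def evaluate_window(levels, latencies, incomplete_streak):
    """Return the names of all rules that fire for one minute of data.

    levels: log levels seen in the window, e.g. ["INFO", "ERROR", ...]
    latencies: request durations in seconds
    incomplete_streak: current run of collections missing expected fields
    """
    alarms = []
    if levels:
        error_rate = sum(1 for lv in levels if lv == "ERROR") / len(levels)
        if error_rate > ERROR_RATE_LIMIT:
            alarms.append("HIGH_ERROR_RATE")
    if latencies and p99(latencies) > P99_LIMIT_SECONDS:
        alarms.append("SLOW_P99")
    if incomplete_streak >= MAX_INCOMPLETE_RUNS:
        alarms.append("INCOMPLETE_DATA")
    return alarms
```

Empty windows are skipped rather than treated as healthy or failing; a separate "no traffic" rule would be needed to catch a collector that has stopped entirely.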
2. Implementation options
- Prometheus + Grafana (recommended):
  - Expose metrics with prometheus_client:

    from prometheus_client import start_http_server, Counter, Histogram

    API_ERRORS = Counter("taobao_api_errors", "Total API errors")
    REQUEST_LATENCY = Histogram("taobao_api_latency_seconds", "Request latency")

    @REQUEST_LATENCY.time()  # record the duration of every call
    def fetch_taobao_data():
        try:
            # API call logic goes here
            pass
        except Exception:
            API_ERRORS.inc()  # count the failure, then re-raise
            raise

    start_http_server(8000)  # expose the metrics endpoint on port 8000
- Simple polling check (lightweight option):

    import time
    from collections import deque

    class Monitor:
        def __init__(self):
            self.error_queue = deque(maxlen=10)  # outcomes of the last 10 requests
            self.last_success_time = time.time()

        def record_status(self, is_success):
            self.error_queue.append(bool(is_success))
            if is_success:
                self.last_success_time = time.time()

        def check_alarm(self):
            if self.error_queue:  # avoid dividing by zero before any request
                failures = sum(1 for ok in self.error_queue if not ok)
                if failures / len(self.error_queue) > 0.5:  # error rate > 50%
                    return "HIGH_ERROR_RATE"
            if time.time() - self.last_success_time > 300:  # no success for 5 minutes
                return "NO_SUCCESS_RESPONSE"
            return None
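III. Alert Implementation
1. Alert channels
WeCom (Enterprise WeChat) or DingTalk group bot:

    import requests

    def send_wechat_alert(message, webhook_url):
        data = {
            "msgtype": "text",
            "text": {"content": f"[Taobao API collection error]\n{message}"}
        }
        # A timeout keeps a slow webhook from blocking the collector.
        requests.post(webhook_url, json=data, timeout=5)

    # Example call
    send_wechat_alert(
        "Collection failed due to an API signature error\nError code: 40003\nTime: 2025-03-20 14:30:45",
        "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
    )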
Email alerts (suited to non-urgent notifications):

    import smtplib
    from email.mime.text import MIMEText

    def send_email_alert(subject, content):
        msg = MIMEText(content, "plain", "utf-8")
        msg["Subject"] = subject
        msg["From"] = "monitor@example.com"
        msg["To"] = "ops@example.com"
        with smtplib.SMTP("smtp.example.com", 587) as server:
            server.starttls()  # upgrade to an encrypted connection before login
            server.login("username", "password")
            server.send_message(msg)
2. Alert escalation policy
- Tiered alerts:
  - LEVEL1 (WARNING): email notification
  - LEVEL2 (ERROR): WeCom plus SMS
  - LEVEL3 (CRITICAL): phone call
- Silence window control:

    import time

    class AlarmSilencer:
        def __init__(self):
            self.last_alarm_time = 0
            self.silence_window = 300  # 5-minute silence window

        def can_send(self):
            # Allow an alert only if the silence window has elapsed.
            if time.time() - self.last_alarm_time > self.silence_window:
                self.last_alarm_time = time.time()
                return True
            return False
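The tiered policy above maps naturally onto a lookup table from log level to channels. The following is a minimal sketch, assuming each channel is represented by a callable that takes the message; `ESCALATION_POLICY`, `dispatch_alert`, and the channel names are hypothetical, and the senders here only print instead of calling a real gateway.

```python
# Channels per level, following the tiered-alert list above.
ESCALATION_POLICY = {
    "WARNING": ["email"],
    "ERROR": ["wechat", "sms"],
    "CRITICAL": ["phone"],
}


def dispatch_alert(level, message, senders):
    """Send `message` through every channel configured for `level`.

    `senders` maps a channel name to a callable taking the message.
    Returns the list of channels that were actually invoked.
    """
    used = []
    for channel in ESCALATION_POLICY.get(level, []):
        send = senders.get(channel)
        if send is None:
            continue  # channel not wired up in this deployment
        send(message)
        used.append(channel)
    return used


# Stand-in senders that only print; real ones would call the webhook,
# SMTP server, SMS gateway, or phone service.
senders = {name: (lambda msg, n=name: print(f"[{n}] {msg}"))
           for name in ("email", "wechat", "sms", "phone")}
dispatch_alert("ERROR", "API signature error", senders)
```

Keeping the policy in data rather than in branching code makes it easy to change which channels a level uses without touching the dispatch logic.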
IV. Complete Implementation Example
1. Collector with monitoring

    import requests
    import time
    from loguru import logger
    from prometheus_client import Counter, Histogram, start_http_server

    # Monitor, AlarmSilencer, and send_wechat_alert are the definitions
    # from the earlier sections.

    # Initialize monitoring metrics
    API_CALLS = Counter("taobao_api_calls_total", "Total API calls")
    API_ERRORS = Counter("taobao_api_errors_total", "Total API errors")
    REQUEST_LATENCY = Histogram("taobao_api_latency_seconds", "Request latency")

    class TaobaoCollector:
        def __init__(self, webhook_url):
            self.webhook_url = webhook_url
            self.monitor = Monitor()
            # One shared silencer; creating a new one per alert would
            # reset the silence window every time.
            self.silencer = AlarmSilencer()

        @REQUEST_LATENCY.time()
        def fetch_item(self, item_id):
            API_CALLS.inc()
            url = "https://gw.api.taobao.com/router/rest"
            params = {
                "method": "taobao.item.get",
                "num_iid": item_id,
                "app_key": "YOUR_APP_KEY",
                "sign": "GENERATED_SIGN",
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
            }
            try:
                response = requests.get(url, params=params, timeout=10)
                # raise_for_status() only catches HTTP-level failures; errors
                # reported inside the response body need a separate check.
                response.raise_for_status()
                self.monitor.record_status(True)
                return response.json()
            except Exception as e:
                error_msg = f"API request failed: {e}"
                logger.error(error_msg)
                API_ERRORS.inc()
                self.monitor.record_status(False)
                self.check_alarm(error_msg)
                raise

        def check_alarm(self, error_msg):
            alarm_type = self.monitor.check_alarm()
            if alarm_type and self.silencer.can_send():
                alert_msg = f"Anomaly detected: {alarm_type}\nDetails: {error_msg}"
                send_wechat_alert(alert_msg, self.webhook_url)

    # Start the metrics server
    start_http_server(8000)

    # Usage example
    collector = TaobaoCollector("YOUR_WECHAT_WEBHOOK_URL")
    try:
        data = collector.fetch_item("123456789")
        logger.info("Data collected successfully: {}", data)
    except Exception as e:
        logger.error("Collection failed: {}", e)
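2. Alert message template

    [Taobao API Collection Alert]
    Level: ERROR
    Time: 2025-03-20 14:30:45
    Module: taobao_api_collector
    Error: API signature verification failed (error code: 40003)
    Impact: collection failed for item ID 123456789
    Suggested action: check the App Secret configuration or contact open platform support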
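A template like this can be filled in from structured fields so that every alert has the same layout. The sketch below is one possible way to do it; `ALERT_TEMPLATE`, `render_alert`, and the field names are hypothetical and simply mirror the lines of the template above.

```python
ALERT_TEMPLATE = (
    "[Taobao API Collection Alert]\n"
    "Level: {level}\n"
    "Time: {time}\n"
    "Module: {module}\n"
    "Error: {error}\n"
    "Impact: {impact}\n"
    "Suggested action: {action}"
)


def render_alert(**fields):
    """Fill the template; raises KeyError if a required field is missing."""
    return ALERT_TEMPLATE.format(**fields)


text = render_alert(
    level="ERROR",
    time="2025-03-20 14:30:45",
    module="taobao_api_collector",
    error="API signature verification failed (error code: 40003)",
    impact="collection failed for item ID 123456789",
    action="check the App Secret configuration",
)
print(text)
```

Failing loudly on a missing field is deliberate: an alert that silently omits its impact or suggested action is harder to act on than one that never renders.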
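V. Best Practices
- Log compression: gzip historical logs to save storage space
- Alert convergence: send at most one alert per issue within 5 minutes
- On-call linkage: include the on-call engineer's contact details in the alert message
- Drills: run a failure simulation once a month to verify the alerting chain end to end
With this approach, the Taobao API collection system can be monitored across the full pipeline: operations staff are notified promptly when data anomalies occur, and structured logs provide enough context for troubleshooting.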
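Alert convergence per issue, as recommended in the list above, needs the silence window to be tracked separately for each kind of problem. The sketch below is one way to do that, under assumptions: `KeyedSilencer` is a hypothetical name, the issue key is whatever string identifies the problem (for example an alarm type), and the injectable `clock` exists only to make the class testable.

```python
import time


class KeyedSilencer:
    """Allow one alert per issue key within `window` seconds."""

    def __init__(self, window=300, clock=time.monotonic):
        self.window = window
        self.clock = clock        # injectable so tests can control time
        self._last_sent = {}      # issue key -> time of the last alert

    def can_send(self, key):
        now = self.clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.window:
            return False          # same issue alerted too recently
        self._last_sent[key] = now
        return True


silencer = KeyedSilencer(window=300)
for alarm in ("HIGH_ERROR_RATE", "HIGH_ERROR_RATE", "NO_SUCCESS_RESPONSE"):
    if silencer.can_send(alarm):
        print(f"sending alert for {alarm}")
```

With a single shared window, a burst of one alarm type would suppress a different, unrelated alarm; keying by issue avoids that.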