亚马逊商品数据实时获取方案:API 接口开发与安全接入实践
在当今数字化电商时代,获取亚马逊商品的实时数据对于市场分析、竞品监控和商业决策至关重要。本文将深入探讨如何通过亚马逊 API 接口开发实现商品数据的实时获取,包括安全接入实践和完整的代码实现。
亚马逊 API 概述与接入准备
亚马逊提供了多种 API 接口供开发者使用,主要包括:
- 亚马逊产品广告 API(Product Advertising API,简称 PA-API) - 用于获取商品信息、价格、评论等数据
- 亚马逊卖家 API(Marketplace Web Service,MWS) - 面向卖家的 API,提供订单管理、库存管理等功能
- 亚马逊广告 API(Advertising API) - 用于管理亚马逊广告活动
本文主要聚焦于产品广告 API,因为它是获取商品详情最直接的方式。
接入亚马逊 API 前,需要完成以下准备工作:
- 注册亚马逊联盟账户(Amazon Associates Program)
- 申请 API 访问权限
- 获取 API 密钥(Access Key 和 Secret Key)
- 注册 AWS 账户(部分地区需要)
安全认证机制详解
亚马逊 API 使用 HMAC-SHA256 算法进行请求签名认证,这是一种安全的认证方式,确保请求的真实性和完整性。认证流程主要包括:
- 构建规范化请求字符串
- 创建待签名字符串
- 使用 Secret Key 计算 HMAC-SHA256 签名,并对结果进行 Base64 编码
- 将签名添加到请求参数中
下面是一个完整的 Python 实现,展示如何构建和发送安全的请求到亚马逊 API:
import hashlib
import hmac
import time
import urllib.parse
import requests
from datetime import datetime
import logging
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
from dotenv import load_dotenv# 加载环境变量
# Load environment variables via python-dotenv before anything reads them.
load_dotenv()

# Configure logging: every record goes both to amazon_api.log and to the console.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_LOG_HANDLERS = [logging.FileHandler("amazon_api.log"), logging.StreamHandler()]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_LOG_HANDLERS)
logger = logging.getLogger("AmazonAPIClient")


class AmazonAPIClient:
    """Signed-request client for the Product Advertising API XML endpoint (``/onca/xml``).

    Requests are authenticated with an HMAC-SHA256 signature over a canonical
    ``GET`` request string. Network failures are logged and reported to the
    caller as ``None`` rather than raised.
    """

    def __init__(self, access_key=None, secret_key=None, associate_tag=None,
                 region='US', timeout=10, max_retries=3, backoff_factor=1):
        """Create a client.

        Args:
            access_key: API access key; falls back to ``AMAZON_ACCESS_KEY``.
            secret_key: API secret key; falls back to ``AMAZON_SECRET_KEY``.
            associate_tag: Associates tag; falls back to ``AMAZON_ASSOCIATE_TAG``.
            region: Marketplace code used to pick the endpoint host. Unknown
                codes fall back to the US endpoint (a warning is logged).
            timeout: Per-request timeout in seconds.
            max_retries: Total automatic retries for retryable HTTP statuses.
            backoff_factor: Backoff factor passed to the retry strategy.

        Raises:
            ValueError: If any of the three credentials is missing.
        """
        self.access_key = access_key or os.getenv("AMAZON_ACCESS_KEY")
        self.secret_key = secret_key or os.getenv("AMAZON_SECRET_KEY")
        self.associate_tag = associate_tag or os.getenv("AMAZON_ASSOCIATE_TAG")

        if not all([self.access_key, self.secret_key, self.associate_tag]):
            raise ValueError("亚马逊API凭证不完整,请提供access_key, secret_key和associate_tag")

        self.region = region
        self.timeout = timeout
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

        # One shared session so connections are pooled and retries are applied
        # transparently to throttling (429) and transient server errors.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

        # Marketplace code -> API host.
        self.endpoints = {
            'US': 'webservices.amazon.com',
            'CA': 'webservices.amazon.ca',
            'UK': 'webservices.amazon.co.uk',
            'DE': 'webservices.amazon.de',
            'FR': 'webservices.amazon.fr',
            'IT': 'webservices.amazon.it',
            'ES': 'webservices.amazon.es',
            'JP': 'webservices.amazon.co.jp',
            'CN': 'webservices.amazon.cn',
            'IN': 'webservices.amazon.in',
        }
        if region not in self.endpoints:
            # Keep the historical fallback, but make the silent substitution visible.
            logger.warning(f"未知区域 {region},回退到 US 端点")
        self.endpoint = self.endpoints.get(region, self.endpoints['US'])

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

        logger.info(f"亚马逊API客户端初始化完成,区域: {region}, 端点: {self.endpoint}")

    @staticmethod
    def _rfc3986(value):
        """Percent-encode *value* per RFC 3986 (only ``A-Za-z0-9-_.~`` stay literal)."""
        return urllib.parse.quote(str(value), safe='-_.~')

    def get_timestamp(self):
        """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
        # time.gmtime() avoids the deprecated naive datetime.utcnow().
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

    def sign_request(self, params):
        """Return a signed copy of *params*.

        The caller's dict is left untouched. Any ``Signature`` already present
        (for example from a previous signing of the same dict) is discarded so
        it never becomes part of the string to sign.

        Args:
            params: Operation-specific request parameters.

        Returns:
            A new dict with the credential, timestamp and version parameters
            added plus ``Signature`` holding the raw (not URL-encoded) Base64
            HMAC-SHA256 signature.
        """
        import base64  # local import: not part of the file-level import list

        signed = dict(params)
        signed.pop('Signature', None)

        signed['AWSAccessKeyId'] = self.access_key
        signed['AssociateTag'] = self.associate_tag
        signed['Timestamp'] = self.get_timestamp()
        signed['Version'] = '2013-08-01'

        # Canonical query string: parameters sorted by name, names and values
        # RFC 3986 encoded (space -> %20, never '+').
        canonical_query_string = '&'.join(
            f"{self._rfc3986(name)}={self._rfc3986(value)}"
            for name, value in sorted(signed.items(), key=lambda pair: pair[0])
        )
        string_to_sign = f"GET\n{self.endpoint}\n/onca/xml\n{canonical_query_string}"

        digest = hmac.new(
            self.secret_key.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            hashlib.sha256,
        ).digest()

        # Base64 (not hex) is the expected signature encoding. It is stored raw
        # here and URL-encoded exactly once when the URL is assembled.
        signed['Signature'] = base64.b64encode(digest).decode('ascii')
        return signed

    def make_request(self, params):
        """Sign *params*, send the GET request and return the response body.

        Returns:
            The response text on success, or ``None`` when the request fails
            (HTTP error status, connection error, timeout or any other
            ``requests`` exception). Failures are logged.
        """
        signed_params = self.sign_request(params)

        # Use the same encoding as the canonical string so the server
        # reconstructs exactly what was signed.
        query_string = '&'.join(
            f"{self._rfc3986(name)}={self._rfc3986(value)}"
            for name, value in signed_params.items()
        )
        url = f"https://{self.endpoint}/onca/xml?{query_string}"

        logger.info(f"发送请求: {url[:100]}..." if len(url) > 100 else f"发送请求: {url}")

        try:
            response = self.session.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP错误: {e}")
            body = e.response.text if e.response is not None else ''
            logger.error(f"响应内容: {body[:500]}..." if len(body) > 500 else f"响应内容: {body}")
            return None
        except requests.exceptions.ConnectionError as e:
            logger.error(f"连接错误: {e}")
            return None
        except requests.exceptions.Timeout as e:
            logger.error(f"请求超时: {e}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"请求异常: {e}")
            return None

        logger.info(f"请求成功,状态码: {response.status_code}")
        return response.text

    def get_item_info(self, asin, response_group='ItemAttributes,Offers,Images,Reviews'):
        """Look up one item (or a comma-separated list of ASINs) via ``ItemLookup``.

        Returns:
            The raw XML response text, or ``None`` if the request failed.
        """
        params = {
            'Operation': 'ItemLookup',
            'ItemId': asin,
            'ResponseGroup': response_group,
        }
        return self.make_request(params)

    def search_items(self, keywords, search_index='All',
                     response_group='ItemAttributes,Offers,Images',
                     sort='relevance', page=1):
        """Search items via ``ItemSearch``.

        Returns:
            The raw XML response text, or ``None`` if the request failed.
        """
        params = {
            'Operation': 'ItemSearch',
            'Keywords': keywords,
            'SearchIndex': search_index,
            'ResponseGroup': response_group,
            'Sort': sort,
            'ItemPage': str(page),
        }
        return self.make_request(params)

    def get_multiple_items(self, asins, response_group='ItemAttributes,Offers,Images,Reviews'):
        """Fetch many items, ten ASINs per request.

        Args:
            asins: Sequence of ASIN strings.
            response_group: Response groups requested for every batch.

        Returns:
            A list with the raw XML text of each successful batch; failed
            batches are skipped. Empty list when *asins* is empty.
        """
        if not asins:
            logger.warning("未提供ASIN列表")
            return []

        batch_size = 10
        total_batches = (len(asins) - 1) // batch_size + 1
        results = []

        for i in range(0, len(asins), batch_size):
            batch_asins = ','.join(asins[i:i + batch_size])

            logger.info(f"批量请求商品信息,批次 {i//batch_size + 1}/{total_batches}")
            # Forward the caller's response_group instead of silently using the default.
            xml_response = self.get_item_info(batch_asins, response_group)

            if xml_response:
                results.append(xml_response)

            # Throttle between batches only; no pointless wait after the last one.
            if i + batch_size < len(asins):
                time.sleep(1)

        return results


# Usage example
if __name__ == "__main__":
    # Read credentials from the environment. They could also be supplied
    # directly in code, which is not recommended for production use.
    ACCESS_KEY = os.getenv("AMAZON_ACCESS_KEY")
    SECRET_KEY = os.getenv("AMAZON_SECRET_KEY")
    ASSOCIATE_TAG = os.getenv("AMAZON_ASSOCIATE_TAG")

    if not all([ACCESS_KEY, SECRET_KEY, ASSOCIATE_TAG]):
        print("请设置环境变量或在代码中提供亚马逊API凭证")
        # SystemExit works everywhere; the bare exit() helper is injected by
        # the `site` module and is absent under `python -S` / frozen builds.
        raise SystemExit(1)

    # Create the API client.
    client = AmazonAPIClient(
        access_key=ACCESS_KEY,
        secret_key=SECRET_KEY,
        associate_tag=ASSOCIATE_TAG,
        region='US',
        max_retries=3,
        backoff_factor=1,
    )

    # Look up a single item.
    asin = "B07HGGYFZ6"  # sample ASIN
    item_info = client.get_item_info(asin)
    if item_info:
        print(f"成功获取商品 {asin} 的信息")
        # XML parsing code can be added here.

    # Search for items.
    search_keywords = "wireless headphones"
    search_results = client.search_items(search_keywords)
    if search_results:
        print(f"成功获取搜索结果 (关键词: {search_keywords})")
        # XML parsing code can be added here.

    # Fetch several items in batches.
    asins = ["B07HGGYFZ6", "B07HGGYFZ7", "B07HGGYFZ8"]
    batch_results = client.get_multiple_items(asins)
    if batch_results:
        print(f"成功批量获取 {len(batch_results)} 个批次的商品信息")
        # XML parsing code can be added here.
数据解析与处理
API 返回的是 XML 格式数据,需要安全地解析才能提取有用信息。下面是一个安全的数据解析器实现:
import xml.etree.ElementTree as ET
import re
import logging
import html# 配置日志
# Configure root logging for the parser script (INFO level, timestamped records).
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger("AmazonDataParser")


class AmazonDataParser:
    """Parse Product Advertising API XML responses into plain dicts.

    Every text value returned by the public methods is HTML-escaped exactly
    once. Free-text fields (title, brand, features, ...) additionally have
    ``<script>``/``<style>`` blocks and inline event-handler attributes removed
    before escaping.
    """

    def __init__(self):
        """Set up the XML namespace map used by all element lookups."""
        self.ns = {'a': 'http://webservices.amazon.com/AWSECommerceService/2013-08-01'}
        logger.info("亚马逊数据解析器初始化完成")

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #
    def parse_item_info(self, xml_response):
        """Parse an ``ItemLookup`` response.

        Args:
            xml_response: Raw XML text (``str`` or ``bytes``).

        Returns:
            A dict describing the first ``Item`` in the response, or
            ``{'error': <message>}`` when the input is empty, malformed,
            contains a DTD, carries an API ``Error`` element, or has no item.
        """
        root, failure = self._load_document(xml_response)
        if failure is not None:
            return failure

        item = root.find('.//a:Item', self.ns)
        if item is None:
            logger.warning("未找到商品信息")
            return {'error': 'No item found'}

        clean = self._clean_text_at
        result = {
            'asin': self._safe_get_text(item, 'a:ASIN'),
            'title': clean(item, './/a:Title'),
            'url': self._safe_get_text(item, './/a:DetailPageURL'),
            'brand': clean(item, './/a:Brand'),
            'model': clean(item, './/a:Model'),
            'manufacturer': clean(item, './/a:Manufacturer'),
            'release_date': self._safe_get_text(item, './/a:ReleaseDate'),
            'product_group': self._safe_get_text(item, './/a:ProductGroup'),
            'price': None,
            'currency': None,
            'availability': None,
            'image_urls': [],
            'rating': None,
            'review_count': None,
            'features': [],
        }

        # Price, currency and availability come from the first offer only.
        offer = item.find('.//a:Offer', self.ns)
        if offer is not None:
            result['price'] = clean(offer, './/a:FormattedPrice')
            result['currency'] = self._safe_get_text(offer, './/a:CurrencyCode')
            result['availability'] = clean(offer, './/a:Availability')

        # Image URLs: large, medium, small of every image set, de-duplicated
        # on the value that is actually stored.
        for image_set in item.findall('.//a:ImageSet', self.ns):
            for image_type in ('LargeImage', 'MediumImage', 'SmallImage'):
                image_url = self._safe_get_text(image_set, f'a:{image_type}/a:URL')
                if image_url and image_url not in result['image_urls']:
                    result['image_urls'].append(image_url)

        result['rating'], result['review_count'] = self._extract_reviews(item)

        for feature in item.findall('.//a:Feature', self.ns):
            feature_text = (feature.text or '').strip()
            if feature_text:
                result['features'].append(self._sanitize_text(feature_text))

        technical_details = {}
        for spec in item.findall('.//a:TechnicalDetail', self.ns):
            name = self._get_raw_text(spec, 'a:Name')
            value = self._get_raw_text(spec, 'a:Value')
            if name and value:
                technical_details[self._sanitize_text(name)] = self._sanitize_text(value)
        if technical_details:
            result['technical_details'] = technical_details

        logger.info(f"成功解析商品 {result['asin']} 的信息")
        return result

    def parse_search_results(self, xml_response):
        """Parse an ``ItemSearch`` response.

        Args:
            xml_response: Raw XML text (``str`` or ``bytes``).

        Returns:
            ``{'total_results': int, 'total_pages': int, 'items': [...]}`` or
            ``{'error': <message>}``. When the totals are absent or not
            numeric, ``total_results`` falls back to the number of parsed
            items and ``total_pages`` to 1.
        """
        root, failure = self._load_document(xml_response)
        if failure is not None:
            return failure

        items = root.findall('.//a:Item', self.ns)
        if not items:
            logger.warning("未找到搜索结果")
            return {'error': 'No items found'}

        results = []
        for item in items:
            rating, review_count = self._extract_reviews(item)
            results.append({
                'asin': self._safe_get_text(item, 'a:ASIN'),
                'title': self._clean_text_at(item, './/a:Title'),
                'url': self._safe_get_text(item, './/a:DetailPageURL'),
                'price': self._clean_text_at(item, './/a:FormattedPrice'),
                'currency': self._safe_get_text(item, './/a:CurrencyCode'),
                'image_url': self._safe_get_text(item, './/a:MediumImage/a:URL'),
                'rating': rating,
                'review_count': review_count,
            })

        total_results = self._to_int(self._get_raw_text(root, './/a:TotalResults'))
        total_pages = self._to_int(self._get_raw_text(root, './/a:TotalPages'))

        result_dict = {
            'total_results': total_results if total_results is not None else len(results),
            'total_pages': total_pages if total_pages is not None else 1,
            'items': results,
        }

        logger.info(f"成功解析搜索结果,共 {result_dict['total_results']} 个商品,{len(results)} 个在当前页面")
        return result_dict

    # ------------------------------------------------------------------ #
    # Helpers
    # ------------------------------------------------------------------ #
    def _load_document(self, xml_response):
        """Parse *xml_response* and return ``(root, None)`` or ``(None, error_dict)``."""
        if not xml_response:
            logger.warning("无XML响应数据")
            return None, {'error': 'No XML response data'}

        # xml.etree does not fetch external entities, but it does expand
        # internal ones (entity-expansion bombs). Entities can only be declared
        # inside a DTD, and API responses never carry one, so refuse any
        # document containing a DOCTYPE. A third-party hardened parser such as
        # defusedxml would be the more thorough option.
        doctype_marker = b'<!DOCTYPE' if isinstance(xml_response, (bytes, bytearray)) else '<!DOCTYPE'
        if doctype_marker in xml_response:
            logger.error("XML解析错误: 拒绝包含DTD声明的响应")
            return None, {'error': 'XML parse error: DTD declarations are not allowed'}

        try:
            parser = ET.XMLParser(encoding='utf-8')
            root = ET.fromstring(xml_response, parser=parser)
        except ET.ParseError as e:
            logger.error(f"XML解析错误: {e}")
            return None, {'error': f'XML parse error: {e}'}

        error = root.find('.//a:Error', self.ns)
        if error is not None:
            error_code = self._safe_get_text(error, 'a:Code')
            error_message = self._safe_get_text(error, 'a:Message')
            logger.error(f"API错误: {error_code} - {error_message}")
            return None, {'error': f"{error_code}: {error_message}"}

        return root, None

    def _extract_reviews(self, item):
        """Return ``(rating, review_count)`` for *item*; either may be ``None``."""
        reviews = item.find('.//a:CustomerReviews', self.ns)
        if reviews is None:
            return None, None

        # Parse the rating from the raw text: escaping first could introduce
        # digits (e.g. "&#x27;") that the number regex would pick up.
        rating = self._parse_rating(self._get_raw_text(reviews, './/a:AverageRating'))

        review_count = None
        count_element = reviews.find('.//a:TotalReviews', self.ns)
        if count_element is not None:
            review_count = self._to_int(count_element.text)
            if review_count is None:
                logger.warning("无法解析评论数量为整数")

        return rating, review_count

    @staticmethod
    def _to_int(text):
        """Return ``int(text)``, or ``None`` when *text* is missing or not an integer."""
        try:
            return int(text)
        except (ValueError, TypeError):
            return None

    def _get_raw_text(self, element, xpath=None):
        """Return the stripped, unescaped text of *element* (or of its *xpath* match).

        Returns ``None`` when the target element is missing or has no text.
        """
        node = element if xpath is None else element.find(xpath, self.ns)
        if node is not None and node.text:
            return node.text.strip()
        return None

    def _clean_text_at(self, element, xpath):
        """Return the sanitized (stripped of markup, escaped once) text at *xpath*."""
        return self._sanitize_text(self._get_raw_text(element, xpath))

    def _safe_get_text(self, element, xpath=None):
        """Return the HTML-escaped text of an element.

        Args:
            element: Element to read from.
            xpath: Optional path (using the ``a:`` prefix) evaluated relative
                to *element*; when omitted, *element* itself is read.

        Returns:
            The stripped, HTML-escaped text, or ``None`` if the element is
            missing or empty.
        """
        raw = self._get_raw_text(element, xpath)
        if raw is None:
            return None
        return html.escape(raw)

    def _parse_rating(self, rating_text):
        """Return the first number in *rating_text* as a float, or ``None``."""
        if not rating_text:
            return None

        match = re.search(r'(\d+\.\d+|\d+)', rating_text)
        if match is None:
            return None
        return float(match.group(1))

    def _sanitize_text(self, text):
        """Strip active content from *text* and HTML-escape the remainder.

        Pass raw (unescaped) text: stripping must run before escaping,
        because once ``<`` and quotes are escaped the patterns below can no
        longer match.

        Returns:
            The sanitized text; falsy input (``None``/``''``) is returned
            unchanged.
        """
        if not text:
            return text

        text = re.sub(r'<script.*?>.*?</script>', '', text, flags=re.IGNORECASE | re.DOTALL)
        text = re.sub(r'<style.*?>.*?</style>', '', text, flags=re.IGNORECASE | re.DOTALL)
        # \b keeps ordinary words that merely contain "on" (e.g. "condition=")
        # from being mistaken for event-handler attributes.
        text = re.sub(r'\bon\w+\s*=\s*["\'][^"\']*["\']', '', text, flags=re.IGNORECASE)

        return html.escape(text)


# Usage example
if __name__ == "__main__":
    # Pretend we already received this API response. Note that '&' inside
    # element text must be written as '&amp;' for the document to be
    # well-formed XML (see the IFrameURL element).
    sample_response = (
        '<ItemLookupResponse xmlns="http://webservices.amazon.com/AWSECommerceService/2013-08-01">'
        '<OperationRequest>'
        '<HTTPHeaders><Header Name="UserAgent" Value="python-requests/2.25.1" /></HTTPHeaders>'
        '<RequestId>12345678-1234-1234-1234-123456789012</RequestId>'
        '<Arguments>'
        '<Argument Name="Operation" Value="ItemLookup" />'
        '<Argument Name="ResponseGroup" Value="ItemAttributes,Offers,Images,Reviews" />'
        '<Argument Name="ItemId" Value="B07HGGYFZ6" />'
        '<Argument Name="AWSAccessKeyId" Value="AKIAIOSFODNN7EXAMPLE" />'
        '<Argument Name="AssociateTag" Value="yourtag-20" />'
        '<Argument Name="Timestamp" Value="2023-01-01T12:00:00Z" />'
        '<Argument Name="Version" Value="2013-08-01" />'
        '<Argument Name="Signature" Value="EXAMPLE" />'
        '</Arguments>'
        '<RequestProcessingTime>0.0422150000000000</RequestProcessingTime>'
        '</OperationRequest>'
        '<Items>'
        '<Request><IsValid>True</IsValid>'
        '<ItemLookupRequest><IdType>ASIN</IdType><ItemId>B07HGGYFZ6</ItemId>'
        '<ResponseGroup>ItemAttributes,Offers,Images,Reviews</ResponseGroup>'
        '<VariationPage>All</VariationPage></ItemLookupRequest></Request>'
        '<Item>'
        '<ASIN>B07HGGYFZ6</ASIN>'
        '<DetailPageURL>https://www.amazon.com/dp/B07HGGYFZ6</DetailPageURL>'
        '<ItemAttributes>'
        '<Binding>Electronics</Binding><Brand>ExampleBrand</Brand><Color>Black</Color>'
        '<Department>Electronics</Department><IsAdultProduct>false</IsAdultProduct>'
        '<Label>ExampleLabel</Label>'
        '<ListPrice><Amount>12999</Amount><CurrencyCode>USD</CurrencyCode>'
        '<FormattedPrice>$129.99</FormattedPrice></ListPrice>'
        '<Manufacturer>ExampleManufacturer</Manufacturer><Model>EXAMPLE-MODEL</Model>'
        '<NumberOfItems>1</NumberOfItems>'
        '<PackageDimensions>'
        '<Height Units="hundredths-inches">800</Height>'
        '<Length Units="hundredths-inches">600</Length>'
        '<Weight Units="hundredths-pounds">200</Weight>'
        '<Width Units="hundredths-inches">200</Width>'
        '</PackageDimensions>'
        '<PackageQuantity>1</PackageQuantity><PartNumber>EXAMPLE-PN</PartNumber>'
        '<ProductGroup>Electronics</ProductGroup><ProductTypeName>ELECTRONICS</ProductTypeName>'
        '<Publisher>ExamplePublisher</Publisher><ReleaseDate>2019-01-01</ReleaseDate>'
        '<Size>Standard</Size><Studio>ExampleStudio</Studio>'
        '<Title>Example Product Title</Title><Warranty>1 Year Limited Warranty</Warranty>'
        '<Feature>Feature 1: High quality sound</Feature>'
        '<Feature>Feature 2: Wireless connectivity</Feature>'
        '<Feature>Feature 3: Long battery life</Feature>'
        '</ItemAttributes>'
        '<Offers><TotalOffers>2</TotalOffers><TotalOfferPages>1</TotalOfferPages>'
        '<MoreOffersUrl>https://www.amazon.com/gp/offer-listing/B07HGGYFZ6</MoreOffersUrl>'
        '<Offer><OfferAttributes><Condition>New</Condition></OfferAttributes>'
        '<OfferListing>'
        '<Price><Amount>12999</Amount><CurrencyCode>USD</CurrencyCode>'
        '<FormattedPrice>$129.99</FormattedPrice></Price>'
        '<Availability>Usually ships within 24 hours</Availability>'
        '<AvailabilityAttributes><AvailabilityType>now</AvailabilityType>'
        '<MinimumHours>0</MinimumHours><MaximumHours>0</MaximumHours></AvailabilityAttributes>'
        '<IsEligibleForSuperSaverShipping>true</IsEligibleForSuperSaverShipping>'
        '<IsEligibleForPrime>true</IsEligibleForPrime>'
        '<OfferListingId>EXAMPLE-OFFER-LISTING-ID</OfferListingId>'
        '</OfferListing></Offer></Offers>'
        '<Images>'
        '<ImageSet Category="primary">'
        '<LargeImage><URL>https://m.media-amazon.com/images/I/81abcdefg-h.jpg</URL>'
        '<Height Units="pixels">500</Height><Width Units="pixels">500</Width></LargeImage>'
        '<MediumImage><URL>https://m.media-amazon.com/images/I/81abcdefg-h._AC_SX300.jpg</URL>'
        '<Height Units="pixels">300</Height><Width Units="pixels">300</Width></MediumImage>'
        '<SmallImage><URL>https://m.media-amazon.com/images/I/81abcdefg-h._AC_SX150.jpg</URL>'
        '<Height Units="pixels">150</Height><Width Units="pixels">150</Width></SmallImage>'
        '</ImageSet>'
        '<ImageSet Category="variant">'
        '<LargeImage><URL>https://m.media-amazon.com/images/I/91ijklmno-p.jpg</URL>'
        '<Height Units="pixels">500</Height><Width Units="pixels">500</Width></LargeImage>'
        '<MediumImage><URL>https://m.media-amazon.com/images/I/91ijklmno-p._AC_SX300.jpg</URL>'
        '<Height Units="pixels">300</Height><Width Units="pixels">300</Width></MediumImage>'
        '<SmallImage><URL>https://m.media-amazon.com/images/I/91ijklmno-p._AC_SX150.jpg</URL>'
        '<Height Units="pixels">150</Height><Width Units="pixels">150</Width></SmallImage>'
        '</ImageSet>'
        '</Images>'
        '<CustomerReviews>'
        '<IFrameURL>https://www.amazon.com/reviews/iframe?akid=AKIAIOSFODNN7EXAMPLE'
        '&amp;alinkCode=xm2&amp;asin=B07HGGYFZ6&amp;atag=yourtag-20&amp;encoding=UTF8'
        '&amp;collapsed=0&amp;format=embedded&amp;language=en_US&amp;showViewpoints=1'
        '&amp;sortBy=recent</IFrameURL>'
        '<AverageRating>4.8 out of 5 stars</AverageRating>'
        '<TotalReviews>245</TotalReviews>'
        '</CustomerReviews>'
        '</Item></Items></ItemLookupResponse>'
    )

    parser = AmazonDataParser()
    result = parser.parse_item_info(sample_response)

    # The parser reports problems as {'error': ...}; such a dict has none of
    # the product keys used below, so stop here instead of raising KeyError.
    if 'error' in result:
        print(f"解析失败: {result['error']}")
        raise SystemExit(1)

    print("\n=== 解析结果 ===")
    for key, value in result.items():
        if key not in ('image_urls', 'features', 'technical_details'):
            print(f"{key}: {value}")

    print("\n=== 图片URL ===")
    for i, url in enumerate(result['image_urls'], 1):
        print(f"图片 {i}: {url}")

    print("\n=== 产品特性 ===")
    for i, feature in enumerate(result['features'], 1):
        print(f"特性 {i}: {feature}")

    if 'technical_details' in result:
        print("\n=== 技术规格 ===")
        for key, value in result['technical_details'].items():
            print(f"{key}: {value}")
数据存储与安全实践
采集到的数据需要安全存储和管理,下面是使用 SQLite 数据库安全存储亚马逊商品信息的实现:
import sqlite3
import os
import logging
from datetime import datetime
import hashlib
import hmac
import secrets
import bcrypt
from dotenv import load_dotenv# 加载环境变量
# Load environment variables via python-dotenv before anything reads them.
load_dotenv()

# Configure root logging for the storage script (INFO level, timestamped records).
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger("AmazonDataStorage")


class AmazonDataStorage:
    """SQLite-backed store for product data, price history, users and API logs.

    All write methods run inside a transaction and report failure through
    their return value (``False``/``None``) after logging the SQLite error.
    The instance can be used as a context manager to guarantee ``close()``.
    """

    # Table definitions, executed in order by create_tables().
    _SCHEMA = (
        '''CREATE TABLE IF NOT EXISTS products (
            asin TEXT PRIMARY KEY,
            title TEXT,
            url TEXT,
            brand TEXT,
            model TEXT,
            manufacturer TEXT,
            release_date TEXT,
            product_group TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            CHECK (asin IS NOT NULL)
        )''',
        '''CREATE TABLE IF NOT EXISTS price_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asin TEXT,
            price TEXT,
            currency TEXT,
            availability TEXT,
            recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (asin) REFERENCES products (asin) ON DELETE CASCADE,
            CHECK (price IS NOT NULL)
        )''',
        '''CREATE TABLE IF NOT EXISTS product_images (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asin TEXT,
            image_url TEXT,
            image_type TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (asin) REFERENCES products (asin) ON DELETE CASCADE,
            CHECK (image_url IS NOT NULL)
        )''',
        '''CREATE TABLE IF NOT EXISTS product_features (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asin TEXT,
            feature TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (asin) REFERENCES products (asin) ON DELETE CASCADE,
            CHECK (feature IS NOT NULL)
        )''',
        '''CREATE TABLE IF NOT EXISTS technical_specs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asin TEXT,
            spec_name TEXT,
            spec_value TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (asin) REFERENCES products (asin) ON DELETE CASCADE,
            CHECK (spec_name IS NOT NULL)
        )''',
        # Users table (used for access control).
        '''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE,
            password_hash TEXT,
            api_key TEXT UNIQUE,
            api_secret TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            CHECK (username IS NOT NULL),
            CHECK (password_hash IS NOT NULL),
            CHECK (api_key IS NOT NULL),
            CHECK (api_secret IS NOT NULL)
        )''',
        '''CREATE TABLE IF NOT EXISTS api_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            api_key TEXT,
            operation TEXT,
            parameters TEXT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            success BOOLEAN,
            response_code INTEGER,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )''',
    )

    def __init__(self, db_path="amazon_data.db", encryption_key=None):
        """Open (creating if necessary) the database at *db_path*.

        Args:
            db_path: SQLite database file path; missing parent directories
                are created.
            encryption_key: Optional key; falls back to ``DB_ENCRYPTION_KEY``.
                NOTE(review): the key is only stored on the instance; nothing
                in this class encrypts data with it.
        """
        self.db_path = db_path
        self.encryption_key = encryption_key or os.getenv("DB_ENCRYPTION_KEY")
        self.conn = None
        self.cursor = None

        db_dir = os.path.dirname(db_path)
        if db_dir:
            # exist_ok avoids the check-then-create race of exists() + makedirs().
            os.makedirs(db_dir, exist_ok=True)

        self.connect()
        self.create_tables()

        logger.info(f"数据存储模块初始化完成,数据库路径: {db_path}")

    def __enter__(self):
        """Return the storage itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def connect(self):
        """Open the connection and enable foreign-key enforcement.

        Returns:
            ``True`` on success, ``False`` if SQLite raised an error.
        """
        try:
            self.conn = sqlite3.connect(self.db_path)
            self.cursor = self.conn.cursor()
            self.cursor.execute("PRAGMA foreign_keys = ON")
            logger.info("成功连接到数据库")
            return True
        except sqlite3.Error as e:
            logger.error(f"数据库连接错误: {e}")
            return False

    def close(self):
        """Close the connection (idempotent)."""
        if self.conn:
            self.conn.close()
            # Drop the handles so later calls fail fast on the guards below
            # instead of raising "Cannot operate on a closed database".
            self.conn = None
            self.cursor = None
            logger.info("数据库连接已关闭")

    def create_tables(self):
        """Create all tables that do not exist yet.

        Returns:
            ``True`` on success, ``False`` if there is no open connection or
            SQLite raised an error.
        """
        if not self.cursor:
            logger.error("数据库游标未初始化")
            return False

        try:
            with self.conn:
                for statement in self._SCHEMA:
                    self.cursor.execute(statement)
        except sqlite3.Error as e:
            logger.error(f"创建表时出错: {e}")
            return False

        logger.info("成功创建所有必要的表")
        return True

    @staticmethod
    def _image_type(url):
        """Classify an image URL by its size marker: medium, small or primary."""
        if '_AC_SX300' in url:
            return 'medium'
        if '_AC_SX150' in url:
            return 'small'
        return 'primary'

    def save_product(self, product_data):
        """Insert or update a product and its related rows in one transaction.

        The product row is updated in place. ``INSERT OR REPLACE`` is avoided
        on purpose: REPLACE deletes the existing row first, which fires
        ``ON DELETE CASCADE`` and would erase the accumulated price history
        and reset ``created_at``. A price row is appended on every call that
        carries a price. Images, features and technical specs are treated as
        snapshots: when the corresponding key is present, the stored rows for
        that ASIN are replaced by the new ones.

        Args:
            product_data: Dict with at least a non-empty ``'asin'``.

        Returns:
            ``True`` if everything was committed, ``False`` otherwise (the
            transaction is rolled back on SQLite errors).
        """
        if not product_data or not product_data.get('asin'):
            logger.warning("无效的商品数据")
            return False
        if not self.cursor:
            logger.error("数据库游标未初始化")
            return False

        asin = product_data['asin']
        cur = self.cursor

        try:
            # The connection context manager commits on success and rolls
            # back if any statement raises.
            with self.conn:
                cur.execute("INSERT OR IGNORE INTO products (asin) VALUES (?)", (asin,))
                cur.execute(
                    '''UPDATE products
                       SET title = ?, url = ?, brand = ?, model = ?, manufacturer = ?,
                           release_date = ?, product_group = ?, updated_at = CURRENT_TIMESTAMP
                       WHERE asin = ?''',
                    (
                        product_data.get('title'),
                        product_data.get('url'),
                        product_data.get('brand'),
                        product_data.get('model'),
                        product_data.get('manufacturer'),
                        product_data.get('release_date'),
                        product_data.get('product_group'),
                        asin,
                    ),
                )

                if product_data.get('price'):
                    cur.execute(
                        '''INSERT INTO price_history (asin, price, currency, availability)
                           VALUES (?, ?, ?, ?)''',
                        (
                            asin,
                            product_data.get('price'),
                            product_data.get('currency'),
                            product_data.get('availability'),
                        ),
                    )

                if 'image_urls' in product_data:
                    # dict.fromkeys de-duplicates while keeping first-seen order.
                    urls = list(dict.fromkeys(product_data['image_urls'] or []))
                    cur.execute("DELETE FROM product_images WHERE asin = ?", (asin,))
                    cur.executemany(
                        '''INSERT INTO product_images (asin, image_url, image_type)
                           VALUES (?, ?, ?)''',
                        [(asin, url, self._image_type(url)) for url in urls],
                    )

                if 'features' in product_data:
                    features = list(dict.fromkeys(product_data['features'] or []))
                    cur.execute("DELETE FROM product_features WHERE asin = ?", (asin,))
                    cur.executemany(
                        '''INSERT INTO product_features (asin, feature)
                           VALUES (?, ?)''',
                        [(asin, feature) for feature in features],
                    )

                if 'technical_details' in product_data:
                    specs = product_data['technical_details'] or {}
                    cur.execute("DELETE FROM technical_specs WHERE asin = ?", (asin,))
                    cur.executemany(
                        '''INSERT INTO technical_specs (asin, spec_name, spec_value)
                           VALUES (?, ?, ?)''',
                        [(asin, name, value) for name, value in specs.items()],
                    )
        except sqlite3.Error as e:
            logger.error(f"保存商品信息时出错: {e}")
            return False

        logger.info(f"成功保存商品信息: {asin}")
        return True

    def get_product(self, asin):
        """Load a product with its latest price, images, features and specs.

        Returns:
            A dict of the product columns plus, when available, ``price``,
            ``currency``, ``availability``, ``recorded_at``, ``image_urls``,
            ``images``, ``features`` and ``technical_details``; ``None`` if
            the ASIN is unknown or SQLite raised an error.
        """
        if not self.cursor:
            logger.error("数据库游标未初始化")
            return None

        cur = self.cursor
        try:
            cur.execute(
                '''SELECT asin, title, url, brand, model, manufacturer, release_date,
                          product_group, created_at, updated_at
                   FROM products
                   WHERE asin = ?''',
                (asin,),
            )
            product = cur.fetchone()
            if not product:
                logger.warning(f"未找到商品: {asin}")
                return None

            columns = [desc[0] for desc in cur.description]
            product_data = dict(zip(columns, product))

            # CURRENT_TIMESTAMP has one-second resolution, so the id breaks
            # ties between prices recorded within the same second.
            cur.execute(
                '''SELECT price, currency, availability, recorded_at
                   FROM price_history
                   WHERE asin = ?
                   ORDER BY recorded_at DESC, id DESC
                   LIMIT 1''',
                (asin,),
            )
            price_info = cur.fetchone()
            if price_info:
                columns = [desc[0] for desc in cur.description]
                product_data.update(dict(zip(columns, price_info)))

            cur.execute(
                '''SELECT image_url, image_type
                   FROM product_images
                   WHERE asin = ?
                   ORDER BY image_type DESC''',
                (asin,),
            )
            images = cur.fetchall()
            if images:
                product_data['image_urls'] = [img[0] for img in images]
                product_data['images'] = [{'url': img[0], 'type': img[1]} for img in images]

            cur.execute(
                '''SELECT feature
                   FROM product_features
                   WHERE asin = ?
                   ORDER BY id''',
                (asin,),
            )
            features = cur.fetchall()
            if features:
                product_data['features'] = [row[0] for row in features]

            cur.execute(
                '''SELECT spec_name, spec_value
                   FROM technical_specs
                   WHERE asin = ?
                   ORDER BY id''',
                (asin,),
            )
            specs = cur.fetchall()
            if specs:
                product_data['technical_details'] = {row[0]: row[1] for row in specs}

            logger.info(f"成功获取商品信息: {asin}")
            return product_data
        except sqlite3.Error as e:
            logger.error(f"获取商品信息时出错: {e}")
            return None

    def create_user(self, username, password):
        """Create a user with a bcrypt password hash and fresh API credentials.

        Returns:
            ``{'id', 'username', 'api_key', 'api_secret'}`` (the password hash
            is never returned), or ``None`` on invalid input or SQLite error
            (for example a duplicate username).
        """
        if not username or not password:
            logger.warning("用户名和密码不能为空")
            return None
        if not self.cursor:
            logger.error("数据库游标未初始化")
            return None

        password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        api_key = secrets.token_urlsafe(16)
        api_secret = secrets.token_urlsafe(32)

        try:
            with self.conn:
                self.cursor.execute(
                    '''INSERT INTO users (username, password_hash, api_key, api_secret)
                       VALUES (?, ?, ?, ?)''',
                    (username, password_hash, api_key, api_secret),
                )
                user_id = self.cursor.lastrowid
        except sqlite3.Error as e:
            logger.error(f"创建用户时出错: {e}")
            return None

        logger.info(f"成功创建用户: {username}")
        return {
            'id': user_id,
            'username': username,
            'api_key': api_key,
            'api_secret': api_secret,
        }

    def authenticate_user(self, username, password):
        """Check *password* against the stored bcrypt hash for *username*.

        Returns:
            ``True`` only when the user exists and the password matches.
        """
        if not username or not password:
            logger.warning("用户名和密码不能为空")
            return False
        if not self.cursor:
            logger.error("数据库游标未初始化")
            return False

        try:
            self.cursor.execute(
                '''SELECT password_hash FROM users WHERE username = ?''',
                (username,),
            )
            result = self.cursor.fetchone()
            if not result:
                logger.warning(f"用户不存在: {username}")
                return False

            stored_hash = result[0]
            # The hash comes back as bytes when written by create_user, but as
            # str if another writer stored it as text; bcrypt needs bytes.
            if isinstance(stored_hash, str):
                stored_hash = stored_hash.encode('utf-8')
            return bcrypt.checkpw(password.encode('utf-8'), stored_hash)
        except (sqlite3.Error, ValueError) as e:
            # ValueError: bcrypt rejects a malformed stored hash.
            logger.error(f"验证用户时出错: {e}")
            return False

    def log_api_access(self, user_id, api_key, operation, parameters, success, response_code):
        """Append one row to the API access log.

        *parameters* is stored as ``str(parameters)``; callers should not pass
        secrets in it.

        Returns:
            ``True`` on success, ``False`` if SQLite raised an error.
        """
        if not self.cursor:
            logger.error("数据库游标未初始化")
            return False

        try:
            with self.conn:
                self.cursor.execute(
                    '''INSERT INTO api_logs (user_id, api_key, operation, parameters, success, response_code)
                       VALUES (?, ?, ?, ?, ?, ?)''',
                    (user_id, api_key, operation, str(parameters), success, response_code),
                )
        except sqlite3.Error as e:
            logger.error(f"记录API访问时出错: {e}")
            return False

        logger.info(f"成功记录API访问: {operation}")
        return True


# Usage example
if __name__ == "__main__":
    # Create the storage instance.
    storage = AmazonDataStorage(db_path="test_amazon_data.db")

    # try/finally guarantees the connection is closed even if a step below raises.
    try:
        # Sample product data.
        sample_product = {
            'asin': 'B07HGGYFZ6',
            'title': 'Example Product Title',
            'url': 'https://www.amazon.com/dp/B07HGGYFZ6',
            'brand': 'ExampleBrand',
            'model': 'EXAMPLE-MODEL',
            'manufacturer': 'ExampleManufacturer',
            'release_date': '2019-01-01',
            'product_group': 'Electronics',
            'price': '$129.99',
            'currency': 'USD',
            'availability': 'Usually ships within 24 hours',
            'image_urls': [
                'https://m.media-amazon.com/images/I/81abcdefg-h.jpg',
                'https://m.media-amazon.com/images/I/81abcdefg-h._AC_SX300.jpg',
                'https://m.media-amazon.com/images/I/81abcdefg-h._AC_SX150.jpg',
            ],
            'rating': 4.8,
            'review_count': 245,
            'features': [
                'Feature 1: High quality sound',
                'Feature 2: Wireless connectivity',
                'Feature 3: Long battery life',
            ],
            'technical_details': {
                'Color': 'Black',
                'Weight': '8.8 ounces',
                'Battery Life': 'Up to 20 hours',
            },
        }

        # Save the product.
        if storage.save_product(sample_product):
            print(f"成功保存商品: {sample_product['asin']}")

        # Read it back.
        retrieved_product = storage.get_product(sample_product['asin'])
        if retrieved_product:
            print("\n=== 从数据库获取的商品信息 ===")
            for key, value in retrieved_product.items():
                if key not in ('image_urls', 'features', 'technical_details', 'images'):
                    print(f"{key}: {value}")

            print("\n=== 图片URL ===")
            if 'image_urls' in retrieved_product:
                for i, url in enumerate(retrieved_product['image_urls'], 1):
                    print(f"图片 {i}: {url}")

            print("\n=== 产品特性 ===")
            if 'features' in retrieved_product:
                for i, feature in enumerate(retrieved_product['features'], 1):
                    print(f"特性 {i}: {feature}")

            print("\n=== 技术规格 ===")
            if 'technical_details' in retrieved_product:
                for key, value in retrieved_product['technical_details'].items():
                    print(f"{key}: {value}")

        # Create a user. create_user returns None on failure, e.g. when the
        # username already exists from a previous run of this demo.
        user = storage.create_user("test_user", "test_password")
        if user:
            print(f"\n成功创建用户: {user['username']}")
            print(f"API Key: {user['api_key']}")
            print(f"API Secret: {user['api_secret']}")

        # Verify the user's password.
        if storage.authenticate_user("test_user", "test_password"):
            print("用户验证成功")
        else:
            print("用户验证失败")
    finally:
        # Close the connection.
        storage.close()
安全最佳实践与注意事项
- 凭证管理:
  - 永远不要在代码中硬编码 API 密钥
  - 使用环境变量或安全的配置文件存储敏感信息
  - 定期轮换 API 密钥
- 请求限制:
  - 遵守亚马逊 API 的请求频率限制
  - 实现请求队列和限流机制
  - 合理使用缓存减少 API 调用
- 数据安全:
  - 对敏感数据进行加密存储
  - 实现严格的访问控制
  - 定期备份数据
- 异常处理:
  - 实现完善的错误处理和重试机制
  - 记录详细的日志以便排查问题
  - 处理 API 限制和服务不可用情况
结论
通过以上完整的实现,我们展示了如何安全地接入亚马逊 API,获取商品数据并进行安全存储。这种方法不仅确保了数据获取的实时性和准确性,还通过多层安全机制保护了 API 凭证和数据的安全。
在实际应用中,你可能需要根据具体需求扩展这些代码,例如添加更多的数据处理功能、实现更复杂的存储方案,或者构建一个完整的 API 服务。但基本的安全原则和实现方法应该保持一致。
记住,遵守亚马逊的服务条款和 API 使用政策是至关重要的,任何违规行为都可能导致 API 访问被限制或终止。