亚马逊 API 接口开发:解锁商品详情页实时数据(接入流程解析)
在电商数据分析和竞品监控领域,获取亚马逊商品详情页的实时数据是一项核心需求。本文将详细介绍如何通过亚马逊 API 接口开发来获取这些有价值的数据,包括接入流程、认证机制和代码实现。
亚马逊 API 简介
亚马逊提供了多种 API 接口,其中最常用的是亚马逊产品广告 API(Amazon Product Advertising API)和亚马逊市场卖家 API(Amazon Marketplace Web Service, MWS)。本文主要围绕产品广告 API 展开,该 API 允许开发者访问亚马逊商品信息、价格、评论等数据。
接入准备工作
在开始开发前,需要完成以下准备工作:
- 注册账户(Associates Program)
- 获取 API 密钥(Access Key 和 Secret Key)
- 注册 AWS 账户(如果需要)
- 了解 API 调用限制和费用(部分功能需要付费)
认证机制
亚马逊 API 使用 HMAC-SHA256 算法进行请求签名认证,主要包含以下步骤:
- 构建规范化请求字符串
- 创建待签名字符串
- 计算签名
- 将签名添加到请求参数中
下面是一个完整的 Python 实现,展示如何构建和发送请求到亚马逊 API:
import hashlib
import hmac
import time
import urllib.parse
import requests
from datetime import datetimeclass AmazonAPIClient:def __init__(self, access_key, secret_key, associate_tag, region='US'):"""初始化亚马逊API客户端"""self.access_key = access_keyself.secret_key = secret_keyself.associate_tag = associate_tagself.region = region# 根据不同区域设置端点self.endpoints = {'US': 'webservices.amazon.com','CA': 'webservices.amazon.ca','UK': 'webservices.amazon.co.uk','DE': 'webservices.amazon.de','FR': 'webservices.amazon.fr','IT': 'webservices.amazon.it','ES': 'webservices.amazon.es','JP': 'webservices.amazon.co.jp','CN': 'webservices.amazon.cn','IN': 'webservices.amazon.in'}self.endpoint = self.endpoints.get(region, self.endpoints['US'])def get_timestamp(self):"""获取当前时间戳,格式为ISO 8601"""return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')def sign_request(self, params):"""使用HMAC-SHA256算法对请求进行签名"""# 添加必需的参数params['AWSAccessKeyId'] = self.access_keyparams['AssociateTag'] = self.associate_tagparams['Timestamp'] = self.get_timestamp()params['Version'] = '2013-08-01'# 按照字典序排序参数sorted_params = sorted(params.items(), key=lambda x: x[0])# 构建规范化请求字符串canonical_query_string = '&'.join([f"{k}={urllib.parse.quote_plus(str(v))}" for k, v in sorted_params])# 创建待签名字符串string_to_sign = f"GET\n{self.endpoint}\n/onca/xml\n{canonical_query_string}"# 计算签名signature = hmac.new(self.secret_key.encode('utf-8'),string_to_sign.encode('utf-8'),hashlib.sha256).digest()# 将签名转换为Base64编码signature = urllib.parse.quote_plus(signature.hex())# 添加签名到参数中params['Signature'] = signaturereturn paramsdef get_item_info(self, asin, response_group='ItemAttributes,Offers,Images,Reviews'):"""获取单个商品的详细信息"""params = {'Operation': 'ItemLookup','ItemId': asin,'ResponseGroup': response_group}# 签名请求signed_params = self.sign_request(params)# 构建请求URLquery_string = '&'.join([f"{k}={urllib.parse.quote_plus(str(v))}" for k, v in signed_params.items()])url = f"https://{self.endpoint}/onca/xml?{query_string}"# 发送请求try:response = requests.get(url)response.raise_for_status()return response.textexcept requests.exceptions.RequestException as e:print(f"请求出错: {e}")return Nonedef search_items(self, keywords, search_index='All', response_group='ItemAttributes,Offers,Images'):"""搜索商品"""params = {'Operation': 'ItemSearch','Keywords': keywords,'SearchIndex': search_index,'ResponseGroup': response_group}# 签名请求signed_params = self.sign_request(params)# 构建请求URLquery_string = '&'.join([f"{k}={urllib.parse.quote_plus(str(v))}" for k, v in signed_params.items()])url = f"https://{self.endpoint}/onca/xml?{query_string}"# 发送请求try:response = requests.get(url)response.raise_for_status()return response.textexcept requests.exceptions.RequestException as e:print(f"请求出错: {e}")return None# 使用示例
if __name__ == "__main__":# 替换为你的API凭证ACCESS_KEY = "YOUR_ACCESS_KEY"SECRET_KEY = "YOUR_SECRET_KEY"ASSOCIATE_TAG = "YOUR_ASSOCIATE_TAG"# 创建API客户端client = AmazonAPIClient(ACCESS_KEY, SECRET_KEY, ASSOCIATE_TAG)# 获取单个商品信息asin = "B07HGGYFZ6" # 示例ASINitem_info = client.get_item_info(asin)print(f"商品 {asin} 的信息:\n{item_info}")# 搜索商品search_keywords = "wireless headphones"search_results = client.search_items(search_keywords)print(f"搜索关键词 '{search_keywords}' 的结果:\n{search_results}")
解析 API 响应数据
亚马逊 API 返回的是 XML 格式的数据,我们可以使用 Python 的 ElementTree 库来解析:
import xml.etree.ElementTree as ETclass AmazonResponseParser:def __init__(self):# 定义命名空间self.ns = {'a': 'http://webservices.amazon.com/AWSECommerceService/2013-08-01'}def parse_item_info(self, xml_response):"""解析商品信息响应"""root = ET.fromstring(xml_response)# 检查是否有错误error = root.find('.//a:Error', self.ns)if error is not None:error_code = error.find('a:Code', self.ns).texterror_message = error.find('a:Message', self.ns).textreturn {'error': f"{error_code}: {error_message}"}# 提取商品信息item = root.find('.//a:Item', self.ns)if item is None:return {'error': 'No item found'}result = {'asin': item.find('a:ASIN', self.ns).text,'title': item.find('.//a:Title', self.ns).text,'url': item.find('.//a:DetailPageURL', self.ns).text,'price': None,'currency': None,'image_url': None}# 提取价格信息offer = item.find('.//a:Offer', self.ns)if offer is not None:price_element = offer.find('.//a:FormattedPrice', self.ns)if price_element is not None:result['price'] = price_element.text# 提取货币符号currency_element = offer.find('.//a:CurrencyCode', self.ns)if currency_element is not None:result['currency'] = currency_element.text# 提取主图URLimage = item.find('.//a:LargeImage', self.ns)if image is not None:image_url = image.find('a:URL', self.ns)if image_url is not None:result['image_url'] = image_url.text# 提取评分信息reviews = item.find('.//a:CustomerReviews', self.ns)if reviews is not None:rating = reviews.find('.//a:AverageRating', self.ns)if rating is not None:result['average_rating'] = rating.textreview_count = reviews.find('.//a:TotalReviews', self.ns)if review_count is not None:result['review_count'] = review_count.textreturn resultdef parse_search_results(self, xml_response):"""解析搜索结果响应"""root = ET.fromstring(xml_response)# 检查是否有错误error = root.find('.//a:Error', self.ns)if error is not None:error_code = error.find('a:Code', self.ns).texterror_message = error.find('a:Message', self.ns).textreturn {'error': f"{error_code}: {error_message}"}# 提取搜索结果items = root.findall('.//a:Item', self.ns)if not items:return {'error': 'No items found'}results = []for item in items:result = {'asin': item.find('a:ASIN', self.ns).text,'title': item.find('.//a:Title', self.ns).text,'url': item.find('.//a:DetailPageURL', self.ns).text,'price': None,'currency': None,'image_url': None}# 提取价格信息offer = item.find('.//a:Offer', self.ns)if offer is not None:price_element = offer.find('.//a:FormattedPrice', self.ns)if price_element is not None:result['price'] = price_element.text# 提取货币符号currency_element = offer.find('.//a:CurrencyCode', self.ns)if currency_element is not None:result['currency'] = currency_element.text# 提取主图URLimage = item.find('.//a:MediumImage', self.ns)if image is not None:image_url = image.find('a:URL', self.ns)if image_url is not None:result['image_url'] = image_url.textresults.append(result)# 提取总结果数total_results = root.find('.//a:TotalResults', self.ns)if total_results is not None:return {'total_results': int(total_results.text),'items': results}return {'items': results}# 使用示例
if __name__ == "__main__":# 假设我们已经有了API响应sample_response = """<ItemLookupResponse xmlns="http://webservices.amazon.com/AWSECommerceService/2013-08-01"><OperationRequest><HTTPHeaders><Header Name="UserAgent" Value="python-requests/2.25.1" /></HTTPHeaders><RequestId>12345678-1234-1234-1234-123456789012</RequestId><Arguments><Argument Name="Operation" Value="ItemLookup" /><Argument Name="ResponseGroup" Value="ItemAttributes,Offers,Images,Reviews" /><Argument Name="ItemId" Value="B07HGGYFZ6" /><Argument Name="AWSAccessKeyId" Value="AKIAIOSFODNN7EXAMPLE" /><Argument Name="AssociateTag" Value="yourtag-20" /><Argument Name="Timestamp" Value="2023-01-01T12:00:00Z" /><Argument Name="Version" Value="2013-08-01" /><Argument Name="Signature" Value="EXAMPLE" /></Arguments><RequestProcessingTime>0.0422150000000000</RequestProcessingTime></OperationRequest><Items><Request><IsValid>True</IsValid><ItemLookupRequest><IdType>ASIN</IdType><ItemId>B07HGGYFZ6</ItemId><ResponseGroup>ItemAttributes,Offers,Images,Reviews</ResponseGroup><VariationPage>All</VariationPage></ItemLookupRequest></Request><Item><ASIN>B07HGGYFZ6</ASIN><DetailPageURL>https://www.amazon.com/dp/B07HGGYFZ6</DetailPageURL><ItemAttributes><Binding>Electronics</Binding><Brand>ExampleBrand</Brand><Color>Black</Color><Department>Electronics</Department><IsAdultProduct>false</IsAdultProduct><Label>ExampleLabel</Label><ListPrice><Amount>12999</Amount><CurrencyCode>USD</CurrencyCode><FormattedPrice>$129.99</FormattedPrice></ListPrice><Manufacturer>ExampleManufacturer</Manufacturer><Model>EXAMPLE-MODEL</Model><NumberOfItems>1</NumberOfItems><PackageDimensions><Height Units="hundredths-inches">800</Height><Length Units="hundredths-inches">600</Length><Weight Units="hundredths-pounds">200</Weight><Width Units="hundredths-inches">200</Width></PackageDimensions><PackageQuantity>1</PackageQuantity><PartNumber>EXAMPLE-PN</PartNumber><ProductGroup>Electronics</ProductGroup><ProductTypeName>ELECTRONICS</ProductTypeName><Publisher>ExamplePublisher</Publisher><ReleaseDate>2019-01-01</ReleaseDate><Size>Standard</Size><Studio>ExampleStudio</Studio><Title>Example Product Title</Title><Warranty>1 Year Limited Warranty</Warranty></ItemAttributes><Offers><TotalOffers>2</TotalOffers><TotalOfferPages>1</TotalOfferPages><MoreOffersUrl>https://www.amazon.com/gp/offer-listing/B07HGGYFZ6</MoreOffersUrl><Offer><OfferAttributes><Condition>New</Condition></OfferAttributes><OfferListing><Price><Amount>12999</Amount><CurrencyCode>USD</CurrencyCode><FormattedPrice>$129.99</FormattedPrice></Price><Availability>Usually ships within 24 hours</Availability><AvailabilityAttributes><AvailabilityType>now</AvailabilityType><MinimumHours>0</MinimumHours><MaximumHours>0</MaximumHours></AvailabilityAttributes><IsEligibleForSuperSaverShipping>true</IsEligibleForSuperSaverShipping><IsEligibleForPrime>true</IsEligibleForPrime><OfferListingId>EXAMPLE-OFFER-LISTING-ID</OfferListingId></OfferListing></Offer></Offers><Images><LargeImage><URL>https://m.media-amazon.com/images/I/81abcdefg-h.jpg</URL><Height Units="pixels">500</Height><Width Units="pixels">500</Width></LargeImage><MediumImage><URL>https://m.media-amazon.com/images/I/81abcdefg-h._AC_SX300.jpg</URL><Height Units="pixels">300</Height><Width Units="pixels">300</Width></MediumImage><SmallImage><URL>https://m.media-amazon.com/images/I/81abcdefg-h._AC_SX150.jpg</URL><Height Units="pixels">150</Height><Width Units="pixels">150</Width></SmallImage></Images><CustomerReviews><IFrameURL>https://www.amazon.com/reviews/iframe?akid=AKIAIOSFODNN7EXAMPLE&alinkCode=xm2&asin=B07HGGYFZ6&atag=yourtag-20&encoding=UTF8&collapsed=0&format=embedded&language=en_US&showViewpoints=1&sortBy=recent</IFrameURL><AverageRating>4.8 out of 5 stars</AverageRating><TotalReviews>245</TotalReviews></CustomerReviews></Item></Items></ItemLookupResponse>"""parser = AmazonResponseParser()result = parser.parse_item_info(sample_response)print("解析结果:")for key, value in result.items():print(f"{key}: {value}")
实际应用案例
下面是一个简单的应用示例,展示如何使用上述代码获取商品信息并进行分析:
import time
import csv
import os
from datetime import datetime
from amazon_api_client import AmazonAPIClient
from amazon_response_parser import AmazonResponseParserclass AmazonPriceTracker:def __init__(self, access_key, secret_key, associate_tag, region='US'):"""初始化价格追踪器"""self.client = AmazonAPIClient(access_key, secret_key, associate_tag, region)self.parser = AmazonResponseParser()self.tracked_items = {}self.data_dir = "amazon_data"# 创建数据目录if not os.path.exists(self.data_dir):os.makedirs(self.data_dir)def add_item(self, asin, name=None):"""添加要追踪的商品"""# 获取商品信息xml_response = self.client.get_item_info(asin)if not xml_response:print(f"无法获取商品 {asin} 的信息")return False# 解析响应item_info = self.parser.parse_item_info(xml_response)# 检查是否有错误if 'error' in item_info:print(f"获取商品信息时出错: {item_info['error']}")return False# 使用商品名称或提供的名称item_name = name or item_info['title']# 添加到追踪列表self.tracked_items[asin] = {'name': item_name,'price_history': []}print(f"已添加商品 '{item_name}' (ASIN: {asin}) 到追踪列表")return Truedef track_price(self, asin):"""追踪单个商品的价格"""if asin not in self.tracked_items:print(f"商品 {asin} 不在追踪列表中")return False# 获取商品信息xml_response = self.client.get_item_info(asin)if not xml_response:print(f"无法获取商品 {asin} 的信息")return False# 解析响应item_info = self.parser.parse_item_info(xml_response)# 检查是否有错误if 'error' in item_info:print(f"获取商品信息时出错: {item_info['error']}")return False# 记录价格current_price = item_info.get('price')current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')if current_price:self.tracked_items[asin]['price_history'].append({'time': current_time,'price': current_price})print(f"已记录商品 '{self.tracked_items[asin]['name']}' 的价格: {current_price}")return Trueelse:print(f"无法获取商品 '{self.tracked_items[asin]['name']}' 的价格")return Falsedef track_all(self):"""追踪所有商品的价格"""success_count = 0for asin in self.tracked_items:if self.track_price(asin):success_count += 1return success_countdef export_data(self, asin=None):"""导出价格历史数据到CSV文件"""if asin:if asin not in self.tracked_items:print(f"商品 {asin} 不在追踪列表中")return False# 导出单个商品的数据item = self.tracked_items[asin]filename = f"{self.data_dir}/{asin}_{item['name'][:50].replace(' ', '_')}.csv"with open(filename, 'w', newline='') as csvfile:fieldnames = ['时间', '价格']writer = csv.DictWriter(csvfile, fieldnames=fieldnames)writer.writeheader()for record in item['price_history']:writer.writerow({'时间': record['time'],'价格': record['price']})print(f"已导出商品 '{item['name']}' 的价格历史到 {filename}")return Trueelse:# 导出所有商品的数据for asin, item in self.tracked_items.items():if item['price_history']:filename = f"{self.data_dir}/{asin}_{item['name'][:50].replace(' ', '_')}.csv"with open(filename, 'w', newline='') as csvfile:fieldnames = ['时间', '价格']writer = csv.DictWriter(csvfile, fieldnames=fieldnames)writer.writeheader()for record in item['price_history']:writer.writerow({'时间': record['time'],'价格': record['price']})print(f"已导出商品 '{item['name']}' 的价格历史到 {filename}")return Truedef run_scheduled_tracking(self, interval_seconds=3600, max_iterations=None):"""运行定时追踪任务"""iteration = 0while max_iterations is None or iteration < max_iterations:print(f"\n=== 开始第 {iteration + 1} 轮追踪 ===")success_count = self.track_all()print(f"=== 完成第 {iteration + 1} 轮追踪,成功记录 {success_count} 个商品的价格 ===")# 导出数据self.export_data()iteration += 1# 如果不是最后一轮,则等待指定时间if max_iterations is None or iteration < max_iterations:print(f"\n等待 {interval_seconds} 秒后进行下一轮追踪...")time.sleep(interval_seconds)# 使用示例
if __name__ == "__main__":# 替换为你的API凭证ACCESS_KEY = "YOUR_ACCESS_KEY"SECRET_KEY = "YOUR_SECRET_KEY"ASSOCIATE_TAG = "YOUR_ASSOCIATE_TAG"# 创建价格追踪器tracker = AmazonPriceTracker(ACCESS_KEY, SECRET_KEY, ASSOCIATE_TAG)# 添加要追踪的商品tracker.add_item("B07HGGYFZ6", "Example Product 1")tracker.add_item("B07HGGYFZ7", "Example Product 2")# 运行一次追踪tracker.track_all()# 导出数据tracker.export_data()# 或者,运行定时追踪任务(每小时一次,运行3次)# tracker.run_scheduled_tracking(interval_seconds=3600, max_iterations=3)
最佳实践与注意事项
-
API 调用限制:亚马逊对 API 调用有严格的限制,免费账户通常每分钟最多 1 个请求,每小时最多 60 个请求。商业账户有更高的限制。
-
请求频率控制:实现请求队列和重试机制,避免超出限制导致账户被封禁。
-
数据缓存:对于不经常变化的数据,如商品标题和描述,可以设置合理的缓存策略。
-
错误处理:完善的错误处理机制,包括网络错误、API 错误和超时处理。
-
数据存储:考虑使用数据库存储大量数据,方便后续分析和查询。
-
遵守亚马逊政策:在使用 API 获取的数据时,要遵守亚马逊的服务条款,特别是关于数据使用和展示的限制。
结语
通过亚马逊 API 接口开发,我们可以获取商品详情页的实时数据,这对于电商数据分析、价格监控和竞品分析都非常有价值。本文详细介绍了接入流程、认证机制和代码实现,希望能帮助开发者顺利完成亚马逊 API 的集成开发。
记住,在实际应用中要合理控制 API 调用频率,做好数据处理和存储,同时遵守亚马逊的服务条款,确保应用的稳定性和合规性。