当前位置: 首页 > news >正文

亚马逊 MWS 关键字 API 实战:关键字搜索商品列表接口深度解析与优化方案

在亚马逊平台的数据分析与选品工作中,基于关键字的商品搜索是获取市场动态的重要手段。与直接通过 ASIN 查询商品详情不同,关键字搜索能帮助我们发现新兴产品、分析市场竞争格局并挖掘潜在商机。本文将聚焦亚马逊 MWS API 中的商品搜索功能,详细讲解如何通过关键字高效获取商品列表,解决搜索结果分页、多条件筛选、数据去重和性能优化等关键问题,提供一套可直接应用于市场分析和选品决策的完整技术方案。

一、接口基础信息与应用场景

接口核心信息

亚马逊关键字搜索相关接口的关键技术参数:

  • 核心域名:因站点而异(如北美站https://mws.amazonservices.com
  • 认证方式:AWS4-HMAC-SHA256 签名机制
  • 请求格式:HTTP GET,URL 参数
  • 响应格式:XML
  • 编码格式:UTF-8
  • 调用限制:通常 QPS=1-5,每日调用上限与账号类型相关

核心搜索接口列表

接口名称主要功能适用场景
ListMatchingProducts通过关键字搜索商品市场趋势分析、竞品调研
SearchForProducts高级搜索,支持多条件筛选精准商品定位、细分市场分析
GetProductCategoriesForASIN获取商品分类信息类目竞争分析
ListProducts按卖家商品列表搜索店铺竞品分析

典型应用场景

  • 市场趋势分析工具:通过热门关键词追踪商品趋势变化
  • 选品辅助系统:基于关键词搜索结果分析潜在产品机会
  • 竞品监控平台:跟踪特定关键词下的竞品排名变化
  • 关键词优化工具:分析不同关键词对应的商品特征
  • 新品发现系统:通过长尾关键词挖掘新兴商品

接口调用流程

plaintext

开发者认证 → 密钥获取 → 搜索参数构建 → 请求签名生成 → 
接口调用 → 结果解析 → 分页处理 → 数据存储 → 搜索优化

点击获取key和secret

二、搜索接口参数与响应解析

ListMatchingProducts 接口核心参数

参数名类型说明是否必须
AWSAccessKeyIdStringMWS 访问密钥
ActionString接口名称,固定为 ListMatchingProducts
SellerIdString卖家账号 ID
SignatureVersionString签名版本,固定为 2
TimestampString时间戳,格式为 ISO8601
VersionStringAPI 版本,固定为 2011-10-01
MarketplaceIdString市场 ID,如 US 市场为 ATVPDKIKX0DER
QueryString搜索关键词
QueryContextIdString搜索上下文 ID,用于分页
MaxResultsPerPageInteger每页最大结果数,1-10

响应结果结构(简化版)

xml

<ListMatchingProductsResponse xmlns="http://mws.amazonservices.com/schema/Products/2011-10-01"><ListMatchingProductsResult><Products><Product><Identifiers><MarketplaceASIN><MarketplaceId>ATVPDKIKX0DER</MarketplaceId><ASIN>B07XYZ1234</ASIN></MarketplaceASIN></Identifiers><AttributeSets><ItemAttributes xml:lang="en-US"><Title>Wireless Bluetooth Headphones</Title><Brand>SoundMaster</Brand><Price><Amount>79.99</Amount><CurrencyCode>USD</CurrencyCode></Price><ProductGroup>Electronics</ProductGroup></ItemAttributes></AttributeSets><SalesRankings><SalesRank><ProductCategoryId>electronics</ProductCategoryId><Rank>1500</Rank></SalesRank></SalesRankings></Product><!-- 更多商品... --></Products><NextPageToken>eyJTaG93TWFya2VyIjoiMiJ9</NextPageToken></ListMatchingProductsResult><ResponseMetadata><RequestId>abc12345-6789-0123-4567-890abcdef123</RequestId></ResponseMetadata>
</ListMatchingProductsResponse>

分页机制详解

亚马逊搜索接口采用基于令牌的分页机制:

  1. 首次请求不提供NextPageToken,获取第一页结果
  2. 响应中若包含NextPageToken,表示还有更多结果
  3. 后续请求需在参数中携带NextPageToken获取下一页
  4. 当响应中不再包含NextPageToken时,表示已获取所有结果

默认每页返回 10 条结果,最多可设置为 10 条。对于热门关键词,完整获取所有结果可能需要多次调用。

三、核心技术实现

1. 搜索参数构建与签名工具

python

运行

import urllib.parse
import hashlib
import hmac
from datetime import datetimeclass AmazonSearchSigner:"""亚马逊搜索接口签名工具"""@staticmethoddef build_search_params(access_key, seller_id, marketplace_id, query, page_token=None, max_results=10):"""构建搜索请求参数"""params = {'AWSAccessKeyId': access_key,'Action': 'ListMatchingProducts','SellerId': seller_id,'SignatureVersion': '2','Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),'Version': '2011-10-01','MarketplaceId': marketplace_id,'Query': query,'MaxResultsPerPage': min(10, max(1, max_results))  # 限制在1-10之间}# 添加分页令牌if page_token:params['QueryContextId'] = page_tokenreturn params@staticmethoddef sign_parameters(params, secret_key, region, service='mws'):"""为参数签名"""# 按字母顺序排序参数sorted_params = sorted(params.items(), key=lambda x: x[0])# 构建待签名字符串canonical_querystring = '&'.join([f"{AmazonSearchSigner.percent_encode(k)}={AmazonSearchSigner.percent_encode(v)}" for k, v in sorted_params])# 创建签名基础字符串signature_base = f"GET\nmws.amazonservices.com\n/Products/2011-10-01\n{canonical_querystring}"# 计算签名signature = hmac.new(secret_key.encode('utf-8'),signature_base.encode('utf-8'),hashlib.sha256).digest()# 编码签名encoded_signature = urllib.parse.quote_plus(signature)return {**params,'Signature': encoded_signature}@staticmethoddef percent_encode(value):"""百分比编码,符合AWS要求"""if not value:return ''encoded = urllib.parse.quote(str(value), safe='-_.~')return encoded.replace('+', '%20').replace('*', '%2A').replace('%7E', '~')

2. 商品搜索客户端实现

python

运行

import requests
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from threading import Lock
import reclass AmazonProductSearchClient:"""亚马逊商品搜索客户端"""# 亚马逊各站点信息MARKETPLACES = {'US': {'id': 'ATVPDKIKX0DER','endpoint': 'https://mws.amazonservices.com','region': 'us-east-1'},'UK': {'id': 'A1F83G8C2ARO7P','endpoint': 'https://mws-eu.amazonservices.com','region': 'eu-west-1'},'DE': {'id': 'A1PA6795UKMFR9','endpoint': 'https://mws-eu.amazonservices.com','region': 'eu-west-1'},'JP': {'id': 'A1VC38T7YXB528','endpoint': 'https://mws.amazonservices.jp','region': 'us-west-2'},'CA': {'id': 'A2EUQ1WTGCTBG2','endpoint': 'https://mws.amazonservices.ca','region': 'us-east-1'}}def __init__(self, access_key, secret_key, seller_id, marketplace='US'):self.access_key = access_keyself.secret_key = secret_keyself.seller_id = seller_idself.set_marketplace(marketplace)self.timeout = 30  # 超时时间(秒)self.qps_limit = 1  # QPS限制self.last_request_time = 0self.request_lock = Lock()  # 控制并发请求self.namespace = {'ns': 'http://mws.amazonservices.com/schema/Products/2011-10-01'}def set_marketplace(self, marketplace):"""设置市场"""if marketplace not in self.MARKETPLACES:raise ValueError(f"不支持的市场: {marketplace}")self.marketplace = marketplaceself.marketplace_id = self.MARKETPLACES[marketplace]['id']self.endpoint = self.MARKETPLACES[marketplace]['endpoint']self.region = self.MARKETPLACES[marketplace]['region']def _throttle_request(self):"""控制请求频率"""with self.request_lock:current_time = time.time()# 确保请求间隔不小于1/QPSsleep_time = max(0, (1.0 / self.qps_limit) - (current_time - self.last_request_time))if sleep_time > 0:time.sleep(sleep_time)self.last_request_time = time.time()def search_products(self, query, max_results=10, max_pages=None):"""搜索商品:param query: 搜索关键词:param max_results: 最大结果数:param max_pages: 最大页数:return: 商品列表和分页信息"""all_products = []page_token = Nonepage_count = 0# 计算每页请求数和总页数page_size = min(10, max_results)remaining = max_resultswhile True:# 控制最大页数if max_pages and page_count >= max_pages:break# 最后一页可能需要减少请求数量current_page_size = min(page_size, remaining)# 构建请求参数params = AmazonSearchSigner.build_search_params(self.access_key,self.seller_id,self.marketplace_id,query,page_token,current_page_size)# 签名参数signed_params = AmazonSearchSigner.sign_parameters(params,self.secret_key,self.region)# 控制请求频率self._throttle_request()# 发送请求try:response = requests.get(f"{self.endpoint}/Products/2011-10-01",params=signed_params,timeout=self.timeout)response.raise_for_status()except requests.exceptions.RequestException as e:print(f"搜索请求失败: {str(e)}")break# 解析响应page_data = self._parse_search_response(response.text)if not page_data:break# 提取商品数据products = page_data.get('products', [])if not products:breakall_products.extend(products)remaining -= len(products)page_count += 1# 检查是否还有更多页面page_token = page_data.get('next_page_token')if not page_token or remaining <= 0:breakreturn {'query': query,'marketplace': self.marketplace,'products': all_products[:max_results],  # 确保不超过最大结果数'total_count': len(all_products[:max_results]),'page_count': page_count,'has_more': bool(page_token) and len(all_products[:max_results]) < max_results,'search_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}def _parse_search_response(self, xml_content):"""解析搜索响应XML"""try:root = ET.fromstring(xml_content)except ET.ParseError as e:print(f"XML解析错误: {str(e)}")return None# 查找结果节点result_node = root.find('.//ns:ListMatchingProductsResult', self.namespace)if not result_node:print("未找到结果节点")return None# 提取下一页令牌next_page_token = Nonenext_page_node = result_node.find('.//ns:NextPageToken', self.namespace)if next_page_node is not None and next_page_node.text:next_page_token = next_page_node.text# 提取商品列表products = []products_node = result_node.find('.//ns:Products', self.namespace)if products_node is not None:for product_node in products_node.findall('.//ns:Product', self.namespace):product_data = self._parse_product_node(product_node)if product_data:products.append(product_data)return {'products': products,'next_page_token': next_page_token}def _parse_product_node(self, product_node):"""解析单个商品节点"""# 提取ASINasin_node = product_node.find('.//ns:ASIN', self.namespace)if not asin_node or not asin_node.text:return Noneasin = asin_node.textif not re.match(r'^[A-Z0-9]{10}$', asin):return None# 提取标题title_node = product_node.find('.//ns:Title', self.namespace)title = title_node.text if title_node is not None else ''# 提取品牌brand_node = product_node.find('.//ns:Brand', self.namespace)brand = brand_node.text if brand_node is not None else ''# 提取价格price = Noneprice_node = product_node.find('.//ns:Price', self.namespace)if price_node is not None:amount_node = price_node.find('.//ns:Amount', self.namespace)currency_node = price_node.find('.//ns:CurrencyCode', self.namespace)if amount_node is not None and currency_node is not None:try:price = {'amount': float(amount_node.text),'currency': currency_node.text}except (ValueError, TypeError):pass# 提取类别category_node = product_node.find('.//ns:ProductGroup', self.namespace)category = category_node.text if category_node is not None else ''# 提取销售排名sales_rank = Nonerank_node = product_node.find('.//ns:Rank', self.namespace)if rank_node is not None and rank_node.text:try:sales_rank = int(rank_node.text)except ValueError:passreturn {'asin': asin,'title': title,'brand': brand,'price': price,'category': category,'sales_rank': sales_rank,'marketplace': self.marketplace,'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

3. 搜索结果处理与分析工具

python

运行

import os
import json
import sqlite3
from datetime import datetime, timedelta
import time
import pandas as pd
from collections import defaultdict
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import jieba
import reclass AmazonSearchManager:"""亚马逊搜索结果管理与分析工具"""def __init__(self, access_key, secret_key, seller_id, default_marketplace='US', cache_dir="./amazon_search_cache"):self.search_client = AmazonProductSearchClient(access_key, secret_key, seller_id, default_marketplace)self.cache_dir = cache_dirself.db_path = os.path.join(cache_dir, "amazon_searches.db")self._init_cache()self._init_stopwords()def _init_cache(self):"""初始化缓存数据库"""if not os.path.exists(self.cache_dir):os.makedirs(self.cache_dir)# 连接数据库conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 创建搜索结果表cursor.execute('''CREATE TABLE IF NOT EXISTS search_results (search_id TEXT PRIMARY KEY,query TEXT,marketplace TEXT,params TEXT,results TEXT,total_count INTEGER,search_time TEXT,duration REAL)''')# 创建商品缓存表cursor.execute('''CREATE TABLE IF NOT EXISTS product_cache (asin TEXT,marketplace TEXT,data TEXT,fetch_time TEXT,PRIMARY KEY (asin, marketplace))''')conn.commit()conn.close()def _init_stopwords(self):"""初始化停用词表"""self.stopwords = {'the', 'and', 'of', 'to', 'a', 'in', 'is', 'it', 'you', 'that', 'he', 'she', 'this', 'my', 'your', 'for', 'on', 'with', 'at', 'by', 'i', 'best', 'top', 'new', 'high', 'quality', 'premium', 'hot', 'sale'}def search_products(self, query, marketplace=None, max_results=50, max_pages=None, use_cache=True, cache_ttl=3600):"""搜索商品并支持缓存:param query: 搜索关键词:param marketplace: 市场:param max_results: 最大结果数:param max_pages: 最大页数:param use_cache: 是否使用缓存:param cache_ttl: 缓存有效期(秒):return: 搜索结果"""marketplace = marketplace or self.search_client.marketplace# 生成唯一搜索IDsearch_id = self._generate_search_id(query, marketplace, max_results, max_pages)# 尝试从缓存获取if use_cache:cached_data = self._get_cached_search(search_id, cache_ttl)if cached_data:print(f"使用缓存搜索结果: {query} ({marketplace})")return cached_data# 切换市场(如果需要)original_marketplace = self.search_client.marketplaceif marketplace != original_marketplace:self.search_client.set_marketplace(marketplace)# 执行搜索print(f"执行搜索: {query} ({marketplace}),最多{max_results}个结果")start_time = time.time()results = self.search_client.search_products(query,max_results=max_results,max_pages=max_pages)duration = time.time() - start_timeprint(f"搜索完成,耗时{duration:.2f}秒,找到{results['total_count']}个结果")# 恢复原始市场if marketplace != original_marketplace:self.search_client.set_marketplace(original_marketplace)# 缓存结果self._cache_search_results(search_id, query, marketplace, {'max_results': max_results, 'max_pages': max_pages},results, duration)# 缓存商品数据self._cache_products(results['products'])return resultsdef compare_keywords(self, keywords, marketplace=None, max_results=50):"""比较多个关键词的搜索结果"""marketplace = marketplace or self.search_client.marketplacecomparison_results = {}for keyword in keywords:results = self.search_products(keyword, marketplace=marketplace,max_results=max_results,use_cache=True)# 提取关键指标avg_price = self._calculate_average_price(results['products'])avg_rank = self._calculate_average_rank(results['products'])brand_counts = self._count_brands(results['products'])category_counts = self._count_categories(results['products'])comparison_results[keyword] = {'total_products': results['total_count'],'avg_price': avg_price,'avg_sales_rank': avg_rank,'top_brands': self._get_top_items(brand_counts, 5),'top_categories': self._get_top_items(category_counts, 3),'results': results['products']}return {'keywords': keywords,'marketplace': marketplace,'comparison_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),'results': comparison_results}def analyze_search_results(self, search_results):"""分析搜索结果"""if not search_results or 'products' not in search_results:return Noneproducts = search_results['products']if not products:return None# 价格分析price_stats = self._analyze_prices(products)# 品牌分析brand_analysis = self._analyze_brands(products)# 类别分析category_analysis = self._analyze_categories(products)# 标题关键词分析title_keywords = self._analyze_title_keywords(products)return {'query': search_results['query'],'marketplace': search_results['marketplace'],'total_products': len(products),'price_analysis': price_stats,'brand_analysis': brand_analysis,'category_analysis': category_analysis,'title_keywords': title_keywords,'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}def generate_word_cloud(self, search_results, output_path=None):"""生成标题关键词词云"""if not search_results or 'products' not in search_results or not search_results['products']:return None# 提取所有标题titles = [p['title'] for p in search_results['products'] if 'title' in p and p['title']]if not titles:return None# 合并所有标题并预处理text = ' '.join(titles).lower()text = re.sub(r'[^\w\s]', ' ', text)# 分词words = text.split()words = [word for word in words if len(word) > 2 and word not in self.stopwords]# 生成词云wordcloud = WordCloud(width=800, height=400,background_color='white',max_words=50).generate(' '.join(words))# 保存或显示if output_path:wordcloud.to_file(output_path)return output_pathelse:plt.figure(figsize=(12, 6))plt.imshow(wordcloud, interpolation='bilinear')plt.axis('off')plt.title(f"关键词: {search_results['query']}")plt.tight_layout(pad=0)plt.show()return Truedef _analyze_prices(self, products):"""分析价格分布"""prices = [p['price']['amount'] for p in products if 'price' in p and p['price'] and 'amount' in p['price']]if not prices:return Nonedf = pd.DataFrame(prices, columns=['price'])return {'min': df['price'].min(),'max': df['price'].max(),'mean': df['price'].mean(),'median': df['price'].median(),'std': df['price'].std(),'count': len(prices),'distribution': self._calculate_price_distribution(prices)}def _analyze_brands(self, products):"""分析品牌分布"""brand_counts = defaultdict(int)for p in products:brand = p.get('brand', 'Unknown')brand_counts[brand] += 1total = sum(brand_counts.values())brand_stats = {'total_brands': len(brand_counts),'top_brands': self._get_top_items(brand_counts, 5),'brand_diversity': len(brand_counts) / total if total > 0 else 0}return brand_statsdef _analyze_categories(self, products):"""分析类别分布"""category_counts = defaultdict(int)for p in products:category = p.get('category', 'Unknown')category_counts[category] += 1return {'total_categories': len(category_counts),'top_categories': self._get_top_items(category_counts, 5)}def _analyze_title_keywords(self, products):"""分析标题关键词"""titles = [p.get('title', '').lower() for p in products]text = ' '.join(titles)text = re.sub(r'[^\w\s]', ' ', text)words = text.split()words = [word for word in words if len(word) > 2 and word not in self.stopwords]word_counts = defaultdict(int)for word in words:word_counts[word] += 1return {'top_keywords': self._get_top_items(word_counts, 10),'total_unique_words': len(word_counts)}def _calculate_average_price(self, products):"""计算平均价格"""prices = [p['price']['amount'] for p in products if 'price' in p and p['price'] and 'amount' in p['price']]return sum(prices) / len(prices) if prices else Nonedef _calculate_average_rank(self, products):"""计算平均销售排名"""ranks = [p['sales_rank'] for p in products if 'sales_rank' in p and p['sales_rank'] is not None]return sum(ranks) / len(ranks) if ranks else Nonedef _count_brands(self, products):"""统计品牌出现次数"""brand_counts = defaultdict(int)for p in products:brand = p.get('brand', 'Unknown')brand_counts[brand] += 1return brand_countsdef _count_categories(self, products):"""统计类别出现次数"""category_counts = defaultdict(int)for p in products:category = p.get('category', 'Unknown')category_counts[category] += 1return category_countsdef _get_top_items(self, item_counts, top_n):"""获取排名靠前的项目"""sorted_items = sorted(item_counts.items(), key=lambda x: x[1], reverse=True)top_items = sorted_items[:top_n]total = sum(count for _, count in item_counts.items())result = []for item, count in top_items:result.append({'name': item,'count': count,'percentage': (count / total) * 100 if total > 0 else 0})return resultdef _calculate_price_distribution(self, prices, bins=5):"""计算价格分布"""if not prices:return []min_price = min(prices)max_price = max(prices)bin_width = (max_price - min_price) / bins if bins > 0 else 0distribution = []for i in range(bins):bin_start = min_price + i * bin_widthbin_end = min_price + (i + 1) * bin_width if i < bins - 1 else max_price + 1count = sum(1 for p in prices if bin_start <= p < bin_end)distribution.append({'range': f"{bin_start:.2f}-{bin_end:.2f}",'count': count,'percentage': (count / len(prices)) * 100})return distributiondef _generate_search_id(self, query, marketplace, max_results, max_pages):"""生成唯一搜索ID"""import hashlibsearch_str = f"{query}|{marketplace}|{max_results}|{max_pages}"return hashlib.md5(search_str.encode('utf-8')).hexdigest()def _get_cached_search(self, search_id, ttl):"""从缓存获取搜索结果"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute("SELECT results, search_time FROM search_results WHERE search_id = ?",(search_id,))result = cursor.fetchone()conn.close()if result:results_str, search_time = result# 检查缓存是否过期search_time_obj = datetime.strptime(search_time, '%Y-%m-%d %H:%M:%S')if (datetime.now() - search_time_obj).total_seconds() <= ttl:try:return json.loads(results_str)except:return Nonereturn Nonedef _cache_search_results(self, search_id, query, marketplace, params, results, duration):"""缓存搜索结果"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()results_str = json.dumps(results, ensure_ascii=False)search_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')params_str = json.dumps(params, ensure_ascii=False)cursor.execute('''INSERT OR REPLACE INTO search_results (search_id, query, marketplace, params, results, total_count, search_time, duration)VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', (search_id, query, marketplace, params_str, results_str,results['total_count'], search_time, duration))conn.commit()conn.close()def _cache_products(self, products):"""缓存商品数据"""if not products:returnconn = sqlite3.connect(self.db_path)cursor = conn.cursor()fetch_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')for product in products:asin = product.get('asin')marketplace = product.get('marketplace')if asin and marketplace:data_str = json.dumps(product, ensure_ascii=False)cursor.execute('''INSERT OR REPLACE INTO product_cache (asin, marketplace, data, fetch_time)VALUES (?, ?, ?, ?)''', (asin, marketplace, data_str, fetch_time))conn.commit()conn.close()def clean_expired_cache(self, search_ttl=86400, product_ttl=604800):"""清理过期缓存"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 计算过期时间search_expire = (datetime.now() - timedelta(seconds=search_ttl)).strftime('%Y-%m-%d %H:%M:%S')product_expire = (datetime.now() - timedelta(seconds=product_ttl)).strftime('%Y-%m-%d %H:%M:%S')# 清理搜索结果缓存cursor.execute("DELETE FROM search_results WHERE search_time < ?", (search_expire,))deleted_searches = cursor.rowcount# 清理商品缓存cursor.execute("DELETE FROM product_cache WHERE fetch_time < ?", (product_expire,))deleted_products = cursor.rowcountconn.commit()conn.close()print(f"清理过期缓存完成,删除搜索结果 {deleted_searches} 条,商品缓存 {deleted_products} 条")return {"deleted_searches": deleted_searches,"deleted_products": deleted_products}

四、完整使用示例

1. 基础搜索与结果分析示例

python

运行

def basic_search_demo():# 替换为实际的MWS凭证ACCESS_KEY = "your_access_key"SECRET_KEY = "your_secret_key"SELLER_ID = "your_seller_id"# 初始化搜索管理器search_manager = AmazonSearchManager(ACCESS_KEY, SECRET_KEY, SELLER_ID,default_marketplace='US')# 搜索关键词KEYWORD = "wireless bluetooth headphones"try:# 1. 执行搜索print(f"===== 搜索关键词: {KEYWORD} =====")search_results = search_manager.search_products(KEYWORD,max_results=30,  # 获取最多30个结果max_pages=3,     # 最多3页use_cache=True)if not search_results['products']:print("未找到匹配的商品")returnprint(f"找到 {search_results['total_count']} 个商品")# 2. 显示前5个结果print("\n===== 前5个搜索结果 =====")for i, product in enumerate(search_results['products'][:5], 1):print(f"\n{i}. ASIN: {product['asin']}")print(f"   标题: {product['title']}")print(f"   品牌: {product['brand']}")print(f"   价格: {product['price']['amount']} {product['price']['currency']}" if product['price'] else "   价格: 未知")print(f"   销售排名: {product['sales_rank']}" if product['sales_rank'] else "   销售排名: 未知")# 3. 分析搜索结果print("\n===== 搜索结果分析 =====")analysis = search_manager.analyze_search_results(search_results)if analysis:# 价格分析print("\n价格分析:")if analysis['price_analysis']:pa = analysis['price_analysis']print(f"   价格范围: {pa['min']:.2f}-{pa['max']:.2f} USD")print(f"   平均价格: {pa['mean']:.2f} USD")print(f"   中位数价格: {pa['median']:.2f} USD")# 品牌分析print("\n品牌分析:")if analysis['brand_analysis']:ba = analysis['brand_analysis']print(f"   品牌总数: {ba['total_brands']}")print("   顶级品牌:")for brand in ba['top_brands'][:3]:print(f"      {brand['name']}: {brand['count']}个商品 ({brand['percentage']:.1f}%)")# 标题关键词分析print("\n标题关键词分析:")if analysis['title_keywords']:tk = analysis['title_keywords']print("   热门关键词:")for kw in tk['top_keywords'][:5]:print(f"      {kw['name']}: 出现{kw['count']}次")# 4. 生成词云print("\n===== 生成标题关键词词云 =====")wordcloud_path = os.path.join(search_manager.cache_dir, f"{KEYWORD.replace(' ', '_')}_wordcloud.png")search_manager.generate_word_cloud(search_results, wordcloud_path)print(f"词云已保存至: {wordcloud_path}")# 5. 清理过期缓存print("\n===== 清理过期缓存 =====")search_manager.clean_expired_cache()except Exception as e:print(f"操作失败: {str(e)}")if __name__ == "__main__":basic_search_demo()

2. 多关键词对比分析示例

python

运行

def keyword_comparison_demo():# 替换为实际的MWS凭证ACCESS_KEY = "your_access_key"SECRET_KEY = "your_secret_key"SELLER_ID = "your_seller_id"# 初始化搜索管理器search_manager = AmazonSearchManager(ACCESS_KEY, SECRET_KEY, SELLER_ID,default_marketplace='US')# 要比较的关键词列表KEYWORDS = ["wireless headphones","bluetooth headphones","noise cancelling headphones","over ear headphones"]try:# 1. 比较多个关键词print(f"===== 比较关键词: {', '.join(KEYWORDS)} =====")comparison = search_manager.compare_keywords(KEYWORDS,max_results=30)if not comparison['results']:print("未获取到比较数据")return# 2. 显示比较结果摘要print("\n===== 关键词比较摘要 =====")for keyword, data in comparison['results'].items():print(f"\n关键词: {keyword}")print(f"   商品总数: {data['total_products']}")print(f"   平均价格: {data['avg_price']:.2f} USD" if data['avg_price'] else "   平均价格: 未知")print(f"   平均销售排名: {int(data['avg_sales_rank']) if data['avg_sales_rank'] else '未知'}")print("   主要品牌:")for brand in data['top_brands'][:3]:print(f"      {brand['name']}: {brand['percentage']:.1f}%")# 3. 找出竞争度较低的关键词print("\n===== 潜在机会关键词分析 =====")opportunity_keywords = []for keyword, data in comparison['results'].items():# 简单的竞争度评分:商品数量少、平均排名高(数值小)表示竞争度较低if data['total_products'] > 0 and data['avg_sales_rank']:competition_score = (data['total_products'] / 10) + (data['avg_sales_rank'] / 1000)opportunity_keywords.append({'keyword': keyword,'score': competition_score,'total_products': data['total_products'],'avg_rank': data['avg_sales_rank']})# 按竞争度评分排序opportunity_keywords.sort(key=lambda x: x['score'])print("   竞争度从低到高的关键词:")for i, kw in enumerate(opportunity_keywords, 1):print(f"      {i}. {kw['keyword']} (评分: {kw['score']:.2f})")print(f"         商品数: {kw['total_products']}, 平均排名: {int(kw['avg_rank'])}")except Exception as e:print(f"比较分析失败: {str(e)}")if __name__ == "__main__":keyword_comparison_demo()

五、常见问题与优化建议

1. 常见错误及解决方案

错误类型可能原因解决方案
请求被拒签名错误检查签名算法实现,确保参数排序和编码正确
结果为空关键词过于特殊或市场无相关商品尝试更通用的关键词,或切换其他市场
部分结果API 返回结果不完整检查分页实现,确保正确处理 NextPageToken
调用受限超过 QPS 或每日调用限制优化请求频率,实现请求队列和退避策略
响应缓慢网络问题或亚马逊服务器负载高增加超时时间,实现请求重试机制

2. 搜索优化策略

  • 关键词优化:使用长尾关键词提高搜索精准度,减少不相关结果
  • 分页策略:实现智能分页,根据关键词热度动态调整请求页数
  • 缓存分层:热门关键词结果缓存时间短(1-6 小时),长尾关键词缓存时间长(12-24 小时)
  • 批量处理:将多个关键词搜索任务排队,合理分配 API 调用配额
  • 结果去重:通过 ASIN 去重,避免同一商品在不同搜索中重复出现

3. 数据分析高级技巧

  • 市场竞争度评分:结合商品数量、平均排名、价格分布构建综合评分模型
  • 关键词相关性分析:通过共现分析挖掘关键词之间的关联关系
  • 价格区间策略:分析不同价格区间的商品数量和销售表现,找到最优定价区间
  • 品牌集中度分析:计算 CR5/CR10 指标(前 5/10 品牌市场占比)评估市场垄断程度
  • 新兴商品识别:结合上架时间和销售排名变化识别有潜力的新兴商品

通过本文提供的技术方案,开发者可以高效实现亚马逊平台的关键词搜索功能,并对搜索结果进行深度分析。该方案特别针对搜索接口的分页机制、签名认证和结果处理进行了优化,提供了完整的搜索结果缓存、多关键词对比和市场分析功能。在实际应用中,需注意遵守亚马逊 API 的使用规范,合理设置请求频率和缓存策略,以确保系统稳定运行并获取高质量的搜索结果。

http://www.dtcms.com/a/391586.html

相关文章:

  • 博文干货 | Pulsar 平均负载器(AvgShedder)
  • 【硬件】嘉立创专业版layout流程(一)
  • PyQt6之分组框
  • 深度剖析 IM 单聊与群聊架构设计
  • 农业自动化:技术重塑传统农业的新范式
  • Nginx 日志文件在哪?
  • 小程序开发者转多端应用app调整视频播放功能
  • 九、Java-注解
  • Java学习笔记——AI插件、新建模块、算数运算符类型、隐式转换、强制转换、自增自减运算符、赋值运算符、关系运算符、逻辑运算符、三元运算符
  • 【从零开始刷力扣006】leetcode206
  • FreeRTOS——介绍及移植过程
  • Day 07 Physics list-----以B1为例
  • 重读一次IS015765-2,记录对错误和异常处理的方式
  • Edge浏览器CSDN文章编辑时一按shift就乱了(Edge shift键)欧路翻译问题(按Shift翻译鼠标所在段落)
  • SpringIoc 基础练习 验证码
  • 前端项目,CDN预热有什么用?
  • TF卡的存储数据结构—fat32格式
  • led的带宽在模拟太阳光中设备中的影响
  • go资深之路笔记(三) sync.WaitGroup, sync.errgroup和 sync.go-multierror
  • Docker 与数据库环境
  • Node.js 模块系统详解
  • proxy代理应用记录
  • 基于python大数据的汽车数据分析系统设计与实现
  • WebSocket实现原理
  • 从保存到加载Docker镜像文件操作全图解
  • IDEA文件修改后改变文件名和文件夹颜色
  • 【MySQL 】MySQL 入门之旅 · 第十篇:子查询与嵌套查询
  • TM52F1376 SSOP24电子元器件 HITENX海速芯 8位微控制器MCU 芯片 深度解析
  • 基于Matlab图像处理的工件表面缺陷检测系统
  • 业务上云实践MYSQL架构改造