Python+requests+ThreadPoolExecutor接口多线程抓取数据
一 功能: 负责抓取商品数据并写入 Excel 文件。
二 逻辑:
- 获取总页数。
- 遍历每一页,获取商品编码和 SKU。
- 使用多线程获取商品详情。
- 将数据追加到 Excel 文件中。
三 优点
- 多线程处理
  - 使用 ThreadPoolExecutor 提高数据抓取效率,减少等待时间。
- 数据追加
  - 使用 openpyxl 手动追加数据,确保数据不断追加而不覆盖已有内容。
- Base64 编码
  - 将商品详情编码为 Base64 字符串,便于存储复杂数据。
- 调试信息
  - 在关键步骤打印调试信息,便于排查问题。
import os
import base64
import json
import time
import requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from openpyxl import load_workbook, Workbook
def get_total_pages(timeout=30):
    """Return the ``data.total`` value reported by the product list endpoint.

    Posts a first-page query (100 items per page) to the product list API and
    reads ``total`` from the ``data`` object of the JSON response.

    NOTE(review): the original comment calls this "totalPages" and the caller
    uses it as a page count, but the field read is ``total`` -- confirm with
    the API whether it is a page count or a record count.

    :param timeout: seconds to wait for the server before giving up; without
        a timeout ``requests`` may block indefinitely.
    :return: the raw ``total`` value from the response.
    :raises requests.RequestException: on connection failure, timeout, or a
        4xx/5xx HTTP status.
    :raises KeyError: if the response JSON has no ``data`` / ``total`` key.
    """
    data = {
        "name": None,
        "sku": None,
        "current": 1,
        "limit": 100
    }
    # NOTE(review): the credential is hard-coded in source; consider loading
    # it from configuration instead.
    headers = {
        "Authorization": "4399aa4123f24e2c90a272f2dfee79e7",
        "Content-Type": "application/json"
    }
    url = 'http://test.api/product_management/list'
    response = requests.post(
        url=url,
        data=json.dumps(data),
        headers=headers,
        timeout=timeout,
    )
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    response_json = response.json()
    total_pages = response_json["data"]["total"]
    return total_pages
def get_product_codes(page, timeout=30):
    """Fetch one page of the product list and return ``(id, sku)`` pairs.

    Posts a query for page ``page`` (100 items per page) to the product list
    API, reads the product list from ``data.list`` of the JSON response and
    extracts each product's ``id`` and ``sku``. Intermediate results are
    printed for debugging.

    :param page: page number sent as ``current`` in the request body.
    :param timeout: seconds to wait for the server before giving up; without
        a timeout ``requests`` may block indefinitely.
    :return: list of ``(id, sku)`` tuples; empty when ``data`` or ``list`` is
        absent from the response.
    :raises requests.RequestException: on connection failure, timeout, or a
        4xx/5xx HTTP status.
    """
    data = {
        "name": None,
        "sku": None,
        "current": page,
        "limit": 100
    }
    # NOTE(review): the credential is hard-coded in source; consider loading
    # it from configuration instead.
    headers = {
        "Authorization": "4399aa4123f24e2c90a272f2dfee79e7",
        "Content-Type": "application/json"
    }
    url = 'http://test.api/product_management/list'
    response = requests.post(
        url=url,
        data=json.dumps(data),
        headers=headers,
        timeout=timeout,
    )
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    response_json = response.json()
    print("response_json======", response_json)
    # The product list is expected under data.list; each item carries id/sku.
    products = response_json.get('data', {}).get('list', [])
    print("products======", products)
    # Pair up id and sku so the caller can unpack them together.
    product_id_sku_pairs = [
        (product.get('id'), product.get('sku')) for product in products
    ]
    print("product_id_sku_pairs======", product_id_sku_pairs)
    return product_id_sku_pairs
def get_product_details(id, timeout=30):
    """Fetch the SKU list of a single product from the detail endpoint.

    Posts ``{"id": id}`` to the product detail API and returns the value found
    under ``data.skuList`` of the JSON response. The result is printed for
    debugging.

    :param id: product identifier sent in the request body (the name shadows
        the builtin but is kept for caller compatibility).
    :param timeout: seconds to wait for the server before giving up; without
        a timeout ``requests`` may block indefinitely.
    :return: the ``skuList`` value, or ``[]`` when ``data`` or ``skuList`` is
        absent from the response.
    :raises requests.RequestException: on connection failure, timeout, or a
        4xx/5xx HTTP status.
    """
    data = {
        "id": id
    }
    # NOTE(review): the credential is hard-coded in source; consider loading
    # it from configuration instead.
    headers = {
        "Authorization": "4399aa4123f24e2c90a272f2dfee79e7",
        "Content-Type": "application/json"
    }
    url = 'http://test.api/product_management/detail'
    response = requests.post(
        url=url,
        data=json.dumps(data),
        headers=headers,
        timeout=timeout,
    )
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    response_json = response.json().get('data', {}).get('skuList', [])
    print("get_product_details======", response_json)
    return response_json
def fetch_product_details(product_id, product_sku):
    """Build one output record for a product.

    Looks up the product's detail via ``get_product_details``, serialises it
    as indented, non-ASCII-escaped JSON and stores that text Base64-encoded.

    :param product_id: product identifier, stored under ``'code'``.
    :param product_sku: product SKU, stored under ``'sku'``.
    :return: dict with keys ``'code'``, ``'sku'`` and ``'product_detail'``
        (the Base64 text of the JSON-serialised detail).
    """
    detail = get_product_details(product_id)
    # Serialise first, then Base64-encode the UTF-8 bytes of the JSON text.
    pretty_json = json.dumps(detail, ensure_ascii=False, indent=4)
    encoded = base64.b64encode(pretty_json.encode('utf-8'))
    return {
        'code': product_id,
        'sku': product_sku,
        'product_detail': encoded.decode('utf-8'),
    }
def append_to_excel(data_list, output_path):
    """Append records to the "Sheet1" worksheet of an Excel file.

    If ``output_path`` does not exist, a new workbook with a single "Sheet1"
    worksheet is created and a header row is written first. If the file
    exists, rows are appended below the existing content of "Sheet1"; when the
    workbook has no "Sheet1" worksheet, one is created (with a header row)
    instead of failing with ``KeyError``.

    :param data_list: list of dicts to append, e.g.
        ``[{'code': 'A001', 'sku': 'SKU001', ...}]``. The keys of the first
        dict define the column order; keys missing from later dicts are
        written as empty strings. Nothing happens when the list is empty.
    :param output_path: path of the Excel file to create or extend.
    """
    if not data_list:
        return

    # Column order comes from the first record of this batch.
    columns = list(data_list[0].keys())

    if os.path.exists(output_path):
        book = load_workbook(output_path)
        if "Sheet1" in book.sheetnames:
            sheet = book["Sheet1"]
            write_header = False  # existing sheet: keep its header
        else:
            # Existing workbook without the expected sheet: start a fresh one.
            sheet = book.create_sheet("Sheet1")
            write_header = True
    else:
        book = Workbook()
        book.remove(book.active)  # drop the default empty sheet
        sheet = book.create_sheet("Sheet1")
        write_header = True

    if write_header:
        sheet.append(columns)

    for record in data_list:
        sheet.append([record.get(column, "") for column in columns])

    book.save(output_path)
def Product_Detail():
    """Crawl every product page and append the details to an Excel file.

    Reads the page count from ``get_total_pages``, then for each page fetches
    the ``(id, sku)`` pairs, resolves each product's detail on a pool of 10
    worker threads, and appends the page's successful results to
    ``D:\\product_details_1.xlsx``. Failed lookups are printed and skipped.
    """
    output_path = 'D:\\product_details_1.xlsx'
    total_pages = int(get_total_pages())

    with ThreadPoolExecutor(max_workers=10) as pool:
        for page in range(1, total_pages + 1):
            print(f"正在处理第 {page} 页")
            # Fan out one detail lookup per product on this page.
            pending = [
                pool.submit(fetch_product_details, product_id, product_sku)
                for product_id, product_sku in get_product_codes(page)
            ]

            page_rows = []
            # Collect results in completion order; one failure must not
            # abort the rest of the page.
            for finished in as_completed(pending):
                try:
                    product = finished.result()
                    print("成功获取数据:", product)
                    page_rows.append(product)
                except Exception as e:
                    print(f"请求出错: {e}")

            # Persist this page before moving on to the next one.
            append_to_excel(page_rows, output_path)

    print("商品详情已写入 product_details_1.xlsx")
# Script entry point: start the crawl when this file is executed.
# NOTE(review): there is no ``if __name__ == "__main__"`` guard, so importing
# this module also triggers the crawl.
Product_Detail()