利用 ECB 加密 json并压测接口,输出测试报告
利用 ECB 加密 json并压测接口,输出测试报告
- 需求:
- 代码:
- 运行结果:
需求:
- 调用 /api/GDPostFeiYouTemu/NotMailTemuOrderPush
- 生成一个 随机数 赋值给orderCode和referenceNumber(且需要唯一)
- ECB 加密 Json 参数
- 可配置的 持续调用时间和调用次数
- 生成测试报告,并保存在文件夹中
代码:
import json
import random
import string
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from base64 import b64encode
import requests
from concurrent.futures import ThreadPoolExecutor
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('request_log.log'),
logging.StreamHandler()
]
)
# 加密函数(保持不变)
def encrypt_data(data, key):
key = key.encode('utf-8')[:16] # AES-128
cipher = AES.new(key, AES.MODE_ECB)
ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))
return b64encode(ct_bytes).decode('utf-8')
# 生成唯一随机字符串(添加时间戳增强唯一性)
def generate_unique_random_string(length=10):
timestamp = datetime.now().strftime("%f") # 微秒级时间戳
chars = ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))
return f"{timestamp}_{chars}"
# 构造请求数据(优化唯一性)
def prepare_request_data():
base_data = {
"head": {
"method": "batch_create_order",
"version": "1.0.0",
"requestTime": "2022-07-20 10:06:31"
},
"uaccount": "xxxxx",
"body": [{
"mailCode": "TEST201072000MAILCODE",
"orderCode": "TE_" + generate_unique_random_string(8),
"length": 20.3,
"width": 18.1,
"height": 6.3,
"weight": 0.53,
"receiverCompName": "AMZ-tt",
"receiverName": "Kaila Gilam 123456",
"receiverProv": "Atlantico",
"receiverCity": "Barranquilla",
"receiverArea": "Fayetteville",
"receiverHouseNo": "15",
"receiverAddress": "138 Lowery Dr 138 Lowery Dr",
"receiverAddress2": "apto 2B",
"receiverMobile": "",
"receiverTel": "3013357358",
"receiverEmail": "yh267lps4r1207d@us.shipping.temuemail.com",
"receiverPostCode": "110121",
"senderCompName": "CAN109",
"senderName": "CAN101",
"senderEngProv": "guangdong",
"senderEngCity": "guangzhou",
"senderAddress": "baiyun helong",
"senderPostCode": "80000",
"senderTel": "123456456",
"senderMobile": "147258369",
"countryCode": "CO",
"countryName": "哥伦比亚",
"productCode": "COL-X",
"tariffType": "ddp",
"vatNumber": "5236523",
"taxNo": "3923982135",
"remark": "",
"referenceNumber": "BG-" + generate_unique_random_string(8),
"declValue": "50",
"declCurrency": "RMB",
"cardType": "2",
"cardNo": "48452153625425361252",
"inner": [{
"gName": "加厚压缩袋",
"gEngName": "compression bag",
"innerContext": "",
"innerQty": 5,
"innerWeight": 0.22,
"innerPrice": 3.5,
"currencyCode": "USD",
"madePlace": "CN",
"hscode": "3923290000",
"model": "无",
"gBrand": "无",
"originCountry": "",
"remark": "",
"sku": "100010",
"material": "塑料",
"purpose": "物品",
"taxNo": "3923",
"salesAddress": "https://suijimimashengcheng.bmcx.com/",
"distribution": "compression",
"electricityFlag": "",
"unitCode": "11",
"secUnitCode": "",
"innerDeclValue": "25",
"innerDeclCurrency": "RMB"
}
]
}
]
}
return base_data
# 发送请求(添加重试机制和日志)
def send_request(url, key):
try:
data = prepare_request_data()
encrypted = encrypt_data(json.dumps(data), key)
start_time = time.time()
response = requests.post(url, json={"data": encrypted}, timeout=10)
duration = time.time() - start_time
log_msg = f"Status: {response.status_code} | Duration: {duration:.2f}s | Order: {data['body'][0]['orderCode']}"
if response.status_code != 200:
logging.error(log_msg)
return (False, response.status_code)
else:
logging.info(log_msg)
return (True, response.status_code)
except Exception as e:
logging.error(f"Request failed: {str(e)}")
return (False, str(e))
# 压力测试主函数
def stress_test():
# 配置参数
config = {
"url": "http://txxxxxx.xxxx.com/api/GDPostFeiYouTemu/NotPush",
"key": "bJtR4ZSNK4p",
"threads_per_second": 1,
"duration_seconds": 1
}
success_count = 0
failure_count = 0
total_requests = config["threads_per_second"] * config["duration_seconds"]
logging.info(f"Starting stress test: {total_requests} total requests")
with ThreadPoolExecutor(max_workers=50) as executor: # 稍微多些工作线程防止堆积
futures = []
start_time = time.time()
# 创建每分钟任务
for _ in range(config["duration_seconds"]):
# 提交每秒的任务批
for _ in range(config["threads_per_second"]):
future = executor.submit(
send_request,
config["url"],
config["key"]
)
futures.append(future)
# 精确控制时间间隔
time.sleep(1 - (time.time() - start_time) % 1)
# 统计结果
for future in futures:
result = future.result()
if result[0]:
success_count += 1
else:
failure_count += 1
# 生成测试报告
end_time = time.time()
test_duration = end_time - start_time
report = f"""
======== Stress Test Report ========
Total Requests: {total_requests}
Successful: {success_count}
Failed: {failure_count}
Success Rate: {(success_count / total_requests) * 100:.2f}%
Test Duration: {test_duration:.2f} seconds
Avg Throughput: {total_requests / test_duration:.2f} req/s
====================================
"""
logging.info(report)
if __name__ == "__main__":
stress_test()
运行结果:
D:\PYTHON-学习\邮政接口压力测试脚本\pythonProject1\.venv\Scripts\python.exe D:\PYTHON-学习\邮政接口压力测试脚本\pythonProject1\优化非邮下单接口.py
2025-03-11 16:27:35,452 - INFO - Starting stress test: 1 total requests
2025-03-11 16:27:37,474 - INFO - Status: 200 | Duration: 2.02s | Order: TE_452975_72S1AYET
2025-03-11 16:27:37,475 - INFO -
======== Stress Test Report ========
Total Requests: 1
Successful: 1
Failed: 0
Success Rate: 100.00%
Test Duration: 2.02 seconds
Avg Throughput: 0.49 req/s
====================================
进程已结束,退出代码为 0
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.dtcms.com/a/63134.html
如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!