13 接口自动化-框架封装之 csv 数据驱动封装和日志
文章目录
- 一、框架封装规则总结
- 二、框架代码简单实现
- 项目框架预览
- csv_util.py - 处理 csv 数据
- logger_util.py - 处理日志
- requests_util.py - 将请求封装在同一个方法中
- yaml_util.py - 处理 yaml 数据
- get_token.yml 文件-采用了csv数据驱动
- get_token.csv 文件
- get_token_error.csv 文件
- create_tag.yml 文件
- update_tag.yml 文件
- get_tag.yml 文件
- upload_file.yml 文件
- test_user.py - user 模块测试用例执行
- test_tag.py - tag 模块测试用例执行
- test_file.py - file 模块测试用例执行
- all.py - 通过 pytest 执行测试用例
- config.yml - 项目配置数据
- conftest.py - 会话之前清除数据
- debug_talk.py - 热加载函数
- pytest.ini - pytest 配置文件
- 三、接口自动化测试项目文件下载
接着 上一篇文章,继续进行框架封装
一、框架封装规则总结
接口自动化测试框架 yaml 用例编写规则:
1、必须有的四个一级关键字:name,base_url,request,validate
2、在 request 一级关键字下必须包括两个二级关键字:method,url
3、传参方式:在 request 一级关键字下,通过二级关键字传参:若是 get 请求,通过 params 关键字传参若是 post 请求:传 json 格式,通过 json 关键字传参传表单格式,通过 data 关键字传参传文件格式,通过 files 关键字传参
4、如果需要接口关联的话,必须使用一级关键字:extract提取值:如:json 提取方式extract:access_token: access_token如:正则表达式提取方式extract:access_token: '"access_token":"(.*?)"'取值:如:access_token: "{{access_token}}"
5、热加载,当 yaml 需要使用动态参数时,那么可以在 debug_talk.py 文件中写方法调用注意:传参时,需要什么类型的数据,需要做强转。 int(min),int(max)如:# 获取随机数的方法def get_random_number(self,start,stop):name = random.randrange(int(start),int(stop))return name
6、断言,以数组的方式编写每个断言如:validate:- equals: {"status_code": 200}- contains: url
7、数据驱动使用 csv 和一级关键字 parameters 实现:如:yaml 写法:parameters:name-appid-secret-grant_type-assert_strcsv 写法:name,appid,secret,grant_type,assert_str获取统一接口鉴权码 access_token,wx2da648349c234bc1,73ee91547d057dec7b37ccd5a39d40ff,client_credential,access_tokenappid 必填检查,,"73ee91547d057dec7b37ccd5a39d40ff",client_credential,errcodesecret 必填检查,wx2da648349c234bc1,,"client_credential",errcode
8、日志监控,异常处理使用 write_log 方法进行日志打印:如:write_log("-------------------接口请求开始--------------------")
二、框架代码简单实现
项目框架预览
csv_util.py - 处理 csv 数据
# -*- coding: utf-8 -*-
import csv
import json
import os
import traceback
import yaml
from common.logger_util import write_log# 获取项目根路径
def get_object_path():return os.getcwd().split('common')[0]# 读取 csv 数据文件
def read_csv_file(csv_path):with open(get_object_path()+csv_path, 'r', encoding='utf-8-sig') as f:csv_data_list = []csv_data = csv.reader(f)for row in csv_data:csv_data_list.append(row)return csv_data_list# 读取 YAML 测试用例文件
def read_testcase_file(yaml_path,csv_path=None):try:with open(get_object_path() + yaml_path, 'r', encoding='utf-8') as f:case_info = yaml.load(f, Loader=yaml.FullLoader)# 如果 csv_path is None ,那么就不会用 csv 数据驱动if csv_path is None:return case_infoelse:case_info_keys = dict(*case_info).keys()if 'parameters' in case_info_keys:new_case_info = analysis_parameters(*case_info,csv_path)return new_case_infoelse:return case_infoexcept Exception as e:write_log("读取用例文件报错,异常信息:%s"%str(traceback.format_exc()))raise# 分析 csv 文件参数
def analysis_parameters(case_info,csv_path):try:key = case_info['parameters']case_info_str = json.dumps(case_info)key_list = str(key).split("-")# 规范 csv 数据的写法length_flag = Truecsv_data_list = read_csv_file(csv_path)first_row_data = csv_data_list[0]for csv_data in csv_data_list:# 每行数据长度与第一行不一致,则停止if len(csv_data) != len(first_row_data):length_flag = Falsebreak# 解析new_case_info = []if length_flag:for x in range(1, len(csv_data_list)): # x 代表行temp_case_info = case_info_strfor y in range(0, len(csv_data_list[x])): # y 代表列if csv_data_list[0][y] in key_list:temp_case_info = temp_case_info.replace("$csv{" + csv_data_list[0][y] + "}",csv_data_list[x][y])new_case_info.append(temp_case_info)return new_case_infoexcept Exception as e:write_log("分析 parameters 参数异常,异常信息:%s"%str(traceback.format_exc()))raise
logger_util.py - 处理日志
import logging
import timefrom common.yaml_util import get_object_path, read_config_yamlclass LoggerUtil:def create_log(self,logger_name='log'):# 创建一个日志对象self.logger = logging.getLogger(logger_name)# 设置全局的日志级别 DEBUG < INFO < WARNING < ERROR < CRITICALself.logger.setLevel(logging.DEBUG)if not self.logger.handlers:# --------文件日志--------# 获得日志文件的名称self.file_log_path = get_object_path() + '/logs/' + read_config_yaml('log', 'log_name') + str(int(time.time()))# 创建文件日志的控制器self.file_handler = logging.FileHandler(self.file_log_path, encoding='utf-8')# 设置文件日志的级别file_log_level = read_config_yaml('log', 'log_level').lower()if file_log_level == 'debug':self.file_handler.setLevel(logging.DEBUG)elif file_log_level == 'INFO':self.file_handler.setLevel(logging.INFO)elif file_log_level == 'WARNING':self.file_handler.setLevel(logging.WARNING)elif file_log_level == 'ERROR':self.file_handler.setLevel(logging.ERROR)elif file_log_level == 'CRITICAL':self.file_handler.setLevel(logging.CRITICAL)# 设置文件日志的格式self.file_handler.setFormatter(logging.Formatter(read_config_yaml('log', 'log_format')))# 将控制器加入到日志对象self.logger.addHandler(self.file_handler)# --------控制台日志----------# 创建文件日志的控制器self.console_handler = logging.StreamHandler()# 设置控制台日志的级别console_log_level = read_config_yaml('log', 'log_level').lower()if console_log_level == 'debug':self.console_handler.setLevel(logging.DEBUG)elif console_log_level == 'INFO':self.console_handler.setLevel(logging.INFO)elif console_log_level == 'WARNING':self.console_handler.setLevel(logging.WARNING)elif console_log_level == 'ERROR':self.console_handler.setLevel(logging.ERROR)elif console_log_level == 'CRITICAL':self.console_handler.setLevel(logging.CRITICAL)# 设置控制台日志的格式self.console_handler.setFormatter(logging.Formatter(read_config_yaml('log', 'log_format')))# 将控制器加入到日志对象self.logger.addHandler(self.console_handler)return self.loggerelse:return self.logger
# 输出正常日志
def write_log(log_message):LoggerUtil().create_log().info(log_message)
# # 输出 error 日志
# def write_error_log(log_message):
# LoggerUtil().create_log().info(log_message)
requests_util.py - 将请求封装在同一个方法中
# -*- coding: utf-8 -*-
import json
import re
import tracebackimport jsonpath
import requests
from more_itertools.more import raise_from common.logger_util import write_log
from common.yaml_util import read_extract_yaml, write_extract_yaml, get_object_path
from debug_talk import DebugTalkclass RequestsUtil:# 创建 session 会话对象session = requests.Session()# 寻找字典中某个 key 的值def find_key(self,key,data):# 处理字典类型(对象)if isinstance(data, dict):if key in data:return data[key]# 递归遍历所有值for value in data.values():result = self.find_key(key,value)if result is not None:return result# 其他类型(如字符串、数字、数组等)直接跳过return None# 规范功能测试 YAML 测试用例文件的写法def analysis_yaml(self, case_info):try:# 1、必须有的四个一级关键字:name,base_url,request,validateif isinstance(case_info, str):case_info = json.loads(case_info)case_info_keys = dict(case_info).keys()if 'name' in case_info_keys and 'base_url' in case_info_keys and 'request' in case_info_keys and 'validate' in case_info_keys:# 2、在 request 一级关键字下必须包括两个二级关键字:method,urlrequest_keys = dict(case_info['request']).keys()name = case_info['name']if 'method' in request_keys and 'url' in request_keys:method = case_info['request']['method']url = case_info['base_url'] + case_info['request']['url']# 应该把method、url、headers、files四个参数从case_info['request']去掉后,剩下的数据传给kwargscase_info['request'].pop('method')case_info['request'].pop('url')# 参数(params、data、json),请求头,文件上传这些都不能做约束headers = Noneif jsonpath.jsonpath(case_info, '$..headers'):headers = case_info['request']['headers']case_info['request'].pop('headers')files = Noneif jsonpath.jsonpath(case_info, '$..files'):files = case_info['request']['files']for key, value in dict(files).items():files[key] = open(get_object_path()+value, "rb")case_info['request'].pop('files')res = self.send_request(name,method, url, headers=headers, files=files, **case_info['request'])# 提取接口关联的变量,既要支持正则表达式也要支持json提取if 'extract' in case_info_keys:for key, value in dict(case_info['extract']).items():if '(.+?)' in value or '(.*?)' in value: # 正则表达式提取re_value = re.search(value, res.text).group(1)if re_value:extract_data = {key: re_value}write_extract_yaml(extract_data)else: # json 提取# 把中间变量写入 extract.yml 文件extract_value = self.find_key(value, res.json())if extract_value:extract_data = {key: extract_value}write_extract_yaml(extract_data)# 断言的封装调用self.validate_result(case_info['validate'], res.json(), res.status_code)return reselse:write_log('在 request 一级关键字下必须包括两个二级关键字:method,url')else:write_log("必须有的四个一级关键字:name,base_url,request,validate")except Exception as e:write_log("分析 yaml 文件异常,异常信息:%s"%str(traceback.format_exc()))raise# 统一替换的方法,data 可以是 url(string),也可以是参数(字典,字典中包含列表),也可以是请求头(字典)# 比如把 access_token: "{{access_token}}" 中的{{access_token}} 替换成 真实的值def replace_value(self,data_value):# 不管是什么类型统一转换成字符串格式if data_value and isinstance(data_value, dict): # 如果 data 不为空且 data 类型是一个字典str_data = json.dumps(data_value)else:str_data = data_value# 替换值for a in range(1, str_data.count('{{')+1):if '{{' in str_data and '}}' in str_data:start_index = str_data.index('{{')end_index = str_data.index('}}')old_value = str_data[start_index:end_index+2]new_value = read_extract_yaml(old_value[2:-2])str_data = str_data.replace(old_value, str(new_value))# 还原数据类型if data_value and isinstance(data_value, dict): # 如果 data 不为空且 data类型是一个字典data_value = json.loads(str_data)else:data_value = str_datareturn data_value# 热加载替换解析# 比如{"name": "hc${get_random_number(10000,99999)}"} 会执行函数 getattr(self,"get_random_number")(10000,99999)def replace_load(self, data_value):# 不管是什么类型统一转换成字符串格式if data_value and isinstance(data_value, dict): # 如果 data 不为空且 data 类型是一个字典str_data = json.dumps(data_value)else:str_data = data_value# 替换 ${函数调用} 为函数调用返回的值for a in range(1, str_data.count('${') + 1):if '${' in str_data and '}' in str_data:start_index = str_data.index('${')end_index = str_data.index('}')old_value = str_data[start_index:end_index + 1]func_name = old_value[2:old_value.index('(')]args_value = old_value[old_value.index('(')+1:old_value.index(')')]# 反射(通过一个函数的字符串直接去调用这个方法)if args_value:new_value = getattr(DebugTalk(), func_name)(*args_value.split(','))else:new_value = getattr(DebugTalk(), func_name)()str_data = str_data.replace(old_value, str(new_value))# 还原数据类型if data_value and isinstance(data_value, dict): # 如果 data 不为空且 data 类型是一个字典data_value = json.loads(str_data)else:data_value = str_datareturn data_value# 统一发送请求的方法:def send_request(self,name, method, url, headers=None, files=None, **kwargs):try:# 处理 method 统一为小写lower_method = str(method).lower()# 处理基础路径# base_url = read_config_yaml("url", "base")# second_url = self.replace_value(read_config_yaml("url", url))# 处理请求头if headers:headers = self.replace_value(headers)# 最核心的地方:请求数据如何去替换,可能是 params、data、jsonfor k, v in kwargs.items():if k in ['params', 'data', 'json']:value = self.replace_value(v)result_value = self.replace_load(value)kwargs[k] = result_value# 发送请求write_log("-------------------接口请求开始--------------------")write_log("接口请求名称:%s"%name)write_log("接口请求方式:%s"%method)write_log("接口请求路径:%s"%url)write_log("接口请求头:%s"%headers)if 'params' in kwargs.keys():write_log("接口请求参数:%s"%kwargs['params'])elif 'json' in kwargs.keys():write_log("接口请求参数:%s"%kwargs['json'])elif 'data' in kwargs.keys():write_log("接口请求参数:%s"%kwargs['data'])write_log("文件上传:%s"%files)res = RequestsUtil.session.request(method=lower_method, url=url, headers=headers, files=files, **kwargs)return resexcept Exception as e:write_log("统一发送请求异常,异常信息:%s"%str(traceback.format_exc()))raise# 断言封装def validate_result(self,expect_result,real_result,status_code):try:write_log("预期结果:" + str(expect_result))write_log("实际结果:" + str(real_result))# 断言的标记,flag = 0 断言成功,flag 不等于 0 断言失败flag = 0# 解析if expect_result and isinstance(expect_result, list):for expect_value in expect_result:for key, value in dict(expect_value).items():if key == "equals":for assert_key, assert_value in dict(value).items():if assert_key == "status_code":if status_code == assert_value:write_log("状态码断言成功")else:flag = flag + 1write_log("状态码断言失败:" + assert_key + "不等于" + assert_value)else:key_list = jsonpath.jsonpath(real_result, "$..%s" % assert_key)if key_list:if assert_value not in key_list:flag = flag + 1write_log("断言失败:" + assert_key + "不等于" + assert_value)else:write_log(assert_key + "-断言成功")else:flag = flag + 1write_log("断言失败:返回的结果中不存在" + assert_key)elif key == "contains":if value not in json.dumps(real_result):flag = flag + 1write_log("断言失败:返回的结果中不包含" + value)else:write_log(value + "-断言成功")else:write_log("框架不支持断言封装")assert flag == 0, "断言失败!"write_log("接口请求成功!!!!")write_log("-------------------接口请求结束--------------------")except Exception as e:write_log("接口请求失败!!!!")write_log("-------------------接口请求结束--------------------")write_log("断言异常,异常信息:%s"%str(traceback.format_exc()))raise
yaml_util.py - 处理 yaml 数据
import os
import yaml# 获取项目根路径
def get_object_path():return os.getcwd().split('common')[0]# 读取 config.yml 文件
def read_config_yaml(first_node,second_node):with open(get_object_path()+'/config.yml', 'r', encoding='utf-8') as f:yaml_config = yaml.load(f, Loader=yaml.FullLoader)return yaml_config[first_node][second_node]# 读取 extract.yml 文件
def read_extract_yaml(first_node):with open(get_object_path()+'/extract.yml', 'r', encoding='utf-8') as f:yaml_config = yaml.load(f, Loader=yaml.FullLoader)return yaml_config[first_node]# 写入 extract.yml 文件
def write_extract_yaml(data):with open(get_object_path()+'/extract.yml', 'a', encoding='utf-8') as f:yaml.dump(data, f,allow_unicode=True)# 清空 extract.yml 文件
def clear_extract_yaml():with open(get_object_path()+'/extract.yml', 'w', encoding='utf-8') as f:f.truncate()# 读取 YAML 测试用例文件
def read_testcase_yaml(yaml_path):with open(get_object_path()+yaml_path, 'r', encoding='utf-8') as f:yaml_value = yaml.load(f, Loader=yaml.FullLoader)return yaml_value
get_token.yml 文件-采用了csv数据驱动
- name: $csv{name}base_url: https://api.weixin.qq.comparameters: name-appid-secret-grant_type-assert_strrequest:url: /cgi-bin/tokenmethod: GETparams:appid: $csv{appid}secret: $csv{secret}grant_type: $csv{grant_type}validate:- equals: {"status_code": 200}- contains: $csv{assert_str}extract:access_token: access_token
# access_token: '"access_token":"(.*?)"'
get_token.csv 文件
name,appid,secret,grant_type,assert_str
获取 access_token,wxcb292044d4fdfd11,69be902b0619de6bf75af4b0b9992645,client_credential,access_token
get_token_error.csv 文件
name,appid,secret,grant_type,assert_str
appid 必填项检查,"",69be902b0619de6bf75af4b0b9992645,client_credential,errcode
secret 必填项检查,wxcb292044d4fdfd11,"",client_credential,errcode
grant_type 必填项检查,wxcb292044d4fdfd11,69be902b0619de6bf75af4b0b9992645,"",errcode
create_tag.yml 文件
- name: Create the user's tagbase_url: https://api.weixin.qq.comrequest:url: /cgi-bin/tags/createmethod: POSTparams:access_token: "{{access_token}}"json: {"tag": {"name": "hc_create${get_random_number(10000,99999)}"}}validate:- equals: {"status_code": 200}- contains: tagextract:tag_id: id
update_tag.yml 文件
- name: Update the user's tagbase_url: https://api.weixin.qq.comrequest:url: /cgi-bin/tags/updatemethod: POSTparams:access_token: "{{access_token}}"json: {"tag": {"id": "{{tag_id}}","name": "hc_update${get_timestamp_str()}"}}validate:- equals: {"status_code": 200}- equals: {"errcode": 0}- equals: {"errmsg": "ok"}
get_tag.yml 文件
- name: Get user's tagsbase_url: https://api.weixin.qq.comrequest:url: /cgi-bin/tags/getmethod: GETparams:access_token: "{{access_token}}"validate:- equals: {"status_code": 200}- contains: tags
upload_file.yml 文件
- name: Upload image filebase_url: https://api.weixin.qq.comrequest:url: /cgi-bin/media/uploadimgmethod: POSTparams:access_token: "{{access_token}}"files:media: "/screenshots/logo.png"validate:- equals: {"status_code": 200}- contains: url
test_user.py - user 模块测试用例执行
# -*- coding: utf-8 -*-
import allure
import pytest
from common.csv_util import read_testcase_file
from common.requests_util import RequestsUtil
from common.yaml_util import read_testcase_yaml@allure.epic("项目名称:微信公众号接口自动化测试")
@allure.feature("模块名称:用户模块")
class TestUser:@allure.story("接口名称:获取用户 token")@allure.severity(allure.severity_level.BLOCKER)@pytest.mark.user@pytest.mark.smoke@pytest.mark.run(order=1)@pytest.mark.parametrize("case_info",read_testcase_file("/data/user/get_token.yml","/data/user/get_token.csv"))def test_get_token(self,case_info):res = RequestsUtil().analysis_yaml(case_info)allure.attach(body=str(res.status_code) + res.text, name="响应数据:",attachment_type=allure.attachment_type.TEXT)@allure.story("接口名称:获取用户 token")@allure.severity(allure.severity_level.BLOCKER)@pytest.mark.user@pytest.mark.parametrize("case_info",read_testcase_file("/data/user/get_token.yml","/data/user/get_token_error.csv"))def test_get_token_error(self,case_info):res = RequestsUtil().analysis_yaml(case_info)allure.attach(body=str(res.status_code) + res.text, name="响应数据:",attachment_type=allure.attachment_type.TEXT)
test_tag.py - tag 模块测试用例执行
# -*- coding: utf-8 -*-
import allure
import pytest
from common.csv_util import read_testcase_file
from common.requests_util import RequestsUtil
from common.yaml_util import read_testcase_yaml@allure.epic("项目名称:微信公众号接口自动化测试")
@allure.feature("模块名称:标签模块")
class TestTag:@allure.story("接口名称:创建用户标签")@allure.severity(allure.severity_level.NORMAL)@pytest.mark.tag@pytest.mark.smoke@pytest.mark.parametrize("case_info", read_testcase_file("/data/tag/create_tag.yml"))def test_create_tag(self, case_info):res = RequestsUtil().analysis_yaml(case_info)allure.attach(body=str(res.status_code) + res.text, name="响应数据:",attachment_type=allure.attachment_type.TEXT)@allure.story("接口名称:更新用户标签")@allure.severity(allure.severity_level.NORMAL)@pytest.mark.tag@pytest.mark.smoke@pytest.mark.parametrize("case_info", read_testcase_yaml("/data/tag/update_tag.yml"))def test_update_tag(self, case_info):res = RequestsUtil().analysis_yaml(case_info)allure.attach(body=str(res.status_code) + res.text, name="响应数据:",attachment_type=allure.attachment_type.TEXT)@allure.story("接口名称:获取用户标签")@allure.severity(allure.severity_level.NORMAL)@pytest.mark.tag@pytest.mark.smoke@pytest.mark.parametrize("case_info", read_testcase_yaml("/data/tag/get_tag.yml"))def test_get_tag(self, case_info):res = RequestsUtil().analysis_yaml(case_info)allure.attach(body=str(res.status_code) + res.text, name="响应数据:",attachment_type=allure.attachment_type.TEXT)
test_file.py - file 模块测试用例执行
# -*- coding: utf-8 -*-
import allure
import pytest
from common.csv_util import read_testcase_file
from common.requests_util import RequestsUtil
from common.yaml_util import read_testcase_yaml@allure.epic("项目名称:微信公众号接口自动化测试")
@allure.feature("模块名称:文件模块")
class TestFile:@allure.story("接口名称:文件上传")@allure.severity(allure.severity_level.NORMAL)@pytest.mark.file@pytest.mark.smoke@pytest.mark.parametrize("case_info", read_testcase_yaml("/data/file/upload_file.yml"))def test_upload_file(self, case_info):res = RequestsUtil().analysis_yaml(case_info)allure.attach(body=str(res.status_code) + res.text, name="响应数据:",attachment_type=allure.attachment_type.TEXT)
all.py - 通过 pytest 执行测试用例
import os
import time
import pytest
if __name__ == "__main__":pytest.main()time.sleep(1)os.system("allure generate report/temps ‐o report/allure-report --clean")
config.yml - 项目配置数据
url:base: https://api.weixin.qq.com
log:log_name: logs_log_level: infolog_format: '[%(asctime)s] %(filename)s->%(funcName)s line:%(lineno)d [%(levelname)s] %(message)s'
conftest.py - 会话之前清除数据
import pytest
from common.yaml_util import clear_extract_yaml@pytest.fixture(scope="session",autouse=True)
def clear_extract():""" 每次会话之前清除 extract.yml 数据 """clear_extract_yaml()
debug_talk.py - 热加载函数
import random
import timeclass DebugTalk:# 获取随机数的方法def get_random_number(self, start, stop):name = random.randrange(int(start), int(stop))return name# 获取时间戳的方法def get_timestamp_str(self):name = time.strftime("_%Y%m%d_%H%M%S")return name
pytest.ini - pytest 配置文件
[pytest]
addopts = -vs -m 'smoke' --alluredir=report/temps --clean-alluredir
testpaths = testcases/
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers = smoke: smoke testcasesuser: user testcasestag: tag testcasesfile: file testcasesdemo: demo testcases
三、接口自动化测试项目文件下载
TestAPI_ALL.zip