8. 测试用例执行
预期效果如下:
用例执行逻辑如下:
- 前端提交用例 id 列表到后台,后台获取每一条用例的信息;
- 后台获取域名信息、用例 id 列表;
- 对用例的请求数据进行变量的参数化、函数化等预处理操作;
- 根据先后顺序进行接口请求,并对响应数据进行断言;
- 根据用例中的提取变量表达式,从断言成功的响应数据中提取关联变量值用于后续用例使用。
8.1 修改测试用例页模板文件:前端提交用例信息
templates/test_case.html:
1 {% extends 'base.html' %} 2 {% load static %} 3 {% block title %}测试用例{% endblock %} 4 5 {% block content %} 6 <script type="text/javascript"> 7 //页面加载的时候,所有的复选框都是未选中的状态 8 function checkOrCancelAll() { 9 var all_check = document.getElementById("all_check"); //1.获取all的元素对象 10 var all_check = all_check.checked; //2.获取选中状态 11 //3.若checked=true,将所有的复选框选中;checked=false,将所有的复选框取消 12 var allCheck = document.getElementsByName("test_cases_list"); 13 //4.循环遍历取出每一个复选框中的元素 14 if (all_check)//全选 15 { 16 for (var i = 0; i < allCheck.length; i++) { 17 //设置复选框的选中状态 18 allCheck[i].checked = true; 19 } 20 } else//取消全选 21 { 22 for (var i = 0; i < allCheck.length; i++) { 23 allCheck[i].checked = false; 24 } 25 } 26 } 27 28 function ischecked() { 29 //3.若checked=true,将所有的复选框选中,checked=false,将所有的复选框取消 30 var allCheck = document.getElementsByName("test_cases_list"); 31 for (var i = 0; i < allCheck.length; i++) { 32 if (allCheck[i].checked == true) { 33 alert("所需执行的测试用例提交成功!"); 34 return true 35 } 36 } 37 alert("请选择要执行的测试用例!") 38 return false 39 } 40 41 </script> 42 43 <form action="" method="POST"> 44 {% csrf_token %} 45 <input style="margin-left: 5px;" type="submit" value='执行测试用例' onclick="return ischecked()"/> 46 <span style="margin-left: 5px;">运行环境:</span> 47 <select name="env"> 48 <option selected value="dev">dev</option> 49 <option value="prod">prod</option> 50 </select> 51 <div class="table-responsive"> 52 <table class="table table-striped"> 53 <thead> 54 <tr> 55 <th><input type="checkbox" id="all_check" onclick="checkOrCancelAll();"/>全选</th> 56 <th>用例名称</th> 57 <th>所属项目</th> 58 <th>所属模块</th> 59 <th>接口地址</th> 60 <th>请求方式</th> 61 <th>请求数据</th> 62 <th>断言key</th> 63 <th>提取变量表达式</th> 64 </tr> 65 </thead> 66 <tbody> 67 68 {% for test_case in test_cases %} 69 <tr> 70 <td><input type="checkbox" value="{{ test_case.id }}" name="test_cases_list"> {{ test_case.id }}</td> 71 <td><a href="{% url 'test_case_detail' test_case.id%}">{{ test_case.case_name }}</a></td> 72 <td>{{ test_case.belong_project.name }}</td> 73 <td>{{ test_case.belong_module.name }}</td> 74 <td>{{ test_case.uri }}</td> 75 <td>{{ test_case.request_method }}</td> 76 <td>{{ test_case.request_data }}</td> 77 <td>{{ test_case.assert_key }}</td> 78 <td>{{ test_case.extract_var }}</td> 79 </tr> 80 {% endfor %} 81 </tbody> 82 </table> 83 84 </div> 85 </form> 86 {# 实现分页标签的代码 #} 87 {# 这里使用 bootstrap 渲染页面 #} 88 <div id="pages" class="text-center"> 89 <nav> 90 <ul class="pagination"> 91 <li class="step-links"> 92 {% if test_cases.has_previous %} 93 <a class='active' href="?page={{ test_cases.previous_page_number }}">上一页</a> 94 {% endif %} 95 96 <span class="current"> 97 第 {{ test_cases.number }} 页 / 共 {{ test_cases.paginator.num_pages }} 页</span> 98 99 {% if test_cases.has_next %} 100 <a class='active' href="?page={{ test_cases.next_page_number }}">下一页</a> 101 {% endif %} 102 </li> 103 </ul> 104 </nav> 105 </div> 106 {% endblock %}
8.2 定义接口地址模型类
models.py:
1 from django.db import models 2 from smart_selects.db_fields import GroupedForeignKey # pip install django-smart-selects:后台级联选择 3 from django.contrib.auth.models import User 4 5 6 class Project(models.Model): 7 id = models.AutoField(primary_key=True) 8 name = models.CharField('项目名称', max_length=50, unique=True, null=False) 9 proj_owner = models.CharField('项目负责人', max_length=20, null=False) 10 test_owner = models.CharField('测试负责人', max_length=20, null=False) 11 dev_owner = models.CharField('开发负责人', max_length=20, null=False) 12 desc = models.CharField('项目描述', max_length=100, null=True) 13 create_time = models.DateTimeField('项目创建时间', auto_now_add=True) 14 update_time = models.DateTimeField('项目更新时间', auto_now=True, null=True) 15 16 def __str__(self): 17 return self.name 18 19 class Meta: 20 verbose_name = '项目信息表' 21 verbose_name_plural = '项目信息表' 22 23 24 class Module(models.Model): 25 id = models.AutoField(primary_key=True) 26 name = models.CharField('模块名称', max_length=50, null=False) 27 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE) 28 test_owner = models.CharField('测试负责人', max_length=50, null=False) 29 desc = models.CharField('简要描述', max_length=100, null=True) 30 create_time = models.DateTimeField('创建时间', auto_now_add=True) 31 update_time = models.DateTimeField('更新时间', auto_now=True, null=True) 32 33 def __str__(self): 34 return self.name 35 36 class Meta: 37 verbose_name = '模块信息表' 38 verbose_name_plural = '模块信息表' 39 40 41 class TestCase(models.Model): 42 id = models.AutoField(primary_key=True) 43 case_name = models.CharField('用例名称', max_length=50, null=False) # 如 register 44 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE, verbose_name='所属项目') 45 belong_module = GroupedForeignKey(Module, "belong_project", on_delete=models.CASCADE, verbose_name='所属模块') 46 request_data = models.CharField('请求数据', max_length=1024, null=False, default='') 47 uri = models.CharField('接口地址', max_length=1024, null=False, default='') 48 assert_key = models.CharField('断言内容', max_length=1024, null=True) 49 maintainer = models.CharField('编写人员', max_length=1024, null=False, default='') 50 extract_var = models.CharField('提取变量表达式', max_length=1024, null=True) # 示例:userid||userid": (\d+) 51 request_method = models.CharField('请求方式', max_length=1024, null=True) 52 status = models.IntegerField(null=True, help_text="0:表示有效,1:表示无效,用于软删除") 53 created_time = models.DateTimeField('创建时间', auto_now_add=True) 54 updated_time = models.DateTimeField('更新时间', auto_now=True, null=True) 55 user = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='责任人', null=True) 56 57 def __str__(self): 58 return self.case_name 59 60 class Meta: 61 verbose_name = '测试用例表' 62 verbose_name_plural = '测试用例表' 63 64 65 class CaseSuite(models.Model): 66 id = models.AutoField(primary_key=True) 67 suite_desc = models.CharField('用例集合描述', max_length=100, blank=True, null=True) 68 if_execute = models.IntegerField(verbose_name='是否执行', null=False, default=0, help_text='0:执行;1:不执行') 69 test_case_model = models.CharField('测试执行模式', max_length=100, blank=True, null=True, help_text='data/keyword') 70 creator = models.CharField(max_length=50, blank=True, null=True) 71 create_time = models.DateTimeField('创建时间', auto_now=True) # 创建时间-自动获取当前时间 72 73 class Meta: 74 verbose_name = "用例集合表" 75 verbose_name_plural = '用例集合表' 76 77 78 class SuiteCase(models.Model): 79 id = models.AutoField(primary_key=True) 80 case_suite = models.ForeignKey(CaseSuite, on_delete=models.CASCADE, verbose_name='用例集合') 81 test_case = models.ForeignKey(TestCase, on_delete=models.CASCADE, verbose_name='测试用例') 82 status = models.IntegerField(verbose_name='是否有效', null=False, default=1, help_text='0:有效,1:无效') 83 create_time = models.DateTimeField('创建时间', auto_now=True) # 创建时间-自动获取当前时间 84 85 86 class InterfaceServer(models.Model): 87 id = models.AutoField(primary_key=True) 88 env = models.CharField('环境', max_length=50, null=False, default='') 89 ip = models.CharField('ip', max_length=50, null=False, default='') 90 port = models.CharField('端口', max_length=100, null=False, default='') 91 remark = models.CharField('备注', max_length=100, null=True) 92 create_time = models.DateTimeField('创建时间', auto_now_add=True) 93 update_time = models.DateTimeField('更新时间', auto_now=True, null=True) 94 95 def __str__(self): 96 return self.env 97 98 class Meta: 99 verbose_name = '接口地址配置表' 100 verbose_name_plural = '接口地址配置表'
执行数据迁移:
python manage.py makemigrations python manage.py migrate
admin.py:
1 from django.contrib import admin 2 from .import models 3 4 5 class ProjectAdmin(admin.ModelAdmin): 6 list_display = ("id", "name", "proj_owner", "test_owner", "dev_owner", "desc", "create_time", "update_time") 7 8 admin.site.register(models.Project, ProjectAdmin) 9 10 11 class ModuleAdmin(admin.ModelAdmin): 12 list_display = ("id", "name", "belong_project", "test_owner", "desc", "create_time", "update_time") 13 14 admin.site.register(models.Module, ModuleAdmin) 15 16 17 class TestCaseAdmin(admin.ModelAdmin): 18 list_display = ( 19 "id", "case_name", "belong_project", "belong_module", "request_data", "uri", "assert_key", "maintainer", 20 "extract_var", "request_method", "status", "created_time", "updated_time", "user") 21 22 admin.site.register(models.TestCase, TestCaseAdmin) 23 24 25 class CaseSuiteAdmin(admin.ModelAdmin): 26 list_display = ("id", "suite_desc", "creator", "create_time") 27 28 admin.site.register(models.CaseSuite, CaseSuiteAdmin) 29 30 31 class InterfaceServerAdmin(admin.ModelAdmin): 32 list_display = ("id", "env", "ip", "port", "remark", "create_time") 33 34 admin.site.register(models.InterfaceServer, InterfaceServerAdmin)
登录 admin 系统,添加地址配置数据:
8.3 修改测试用例视图函数,后台执行用例
1)Redis 持久化递增唯一数
在本工程中,我们使用 Redis 来维护一个每次调用函数来就会递增的数值,供注册接口的注册用户名拼接使用,避免注册接口请求数据重复使用问题。
1.1)Redis 持久化配置
修改 redis.windows.conf:
appendonly yes # 每次更新操作后进行日志记录 appendfsync everysec # 每秒同步一次(默认值)
1.2)启动 Redis 服务端:
redis-server.exe redis.windows.conf
2)请求/响应数据处理
在应用目录下新建 utils 包,用于封装接口请求的相关函数。
data_process.py
该模块实现了对接口请求的所需工具函数,如获取递增唯一数(供注册用户名使用)、md5 加密(用于登录密码加密)、请求数据预处理、响应数据断言等功能。
- get_unique_num_value():用于获取每次递增的唯一数
- 该函数的目标是解决注册用户名重复的问题。
- 虽然可以在赋值注册用户名变量时,采用前缀字符串拼接随机数的方式,但是用随机数的方式仍然是有可能出现用户名重复的情况。因此,可以在单独的一个文件中维护一个数字,每次请求注册接口之前,先读取该文件中的数字,拼接用户名前缀字符串。读取完之后,再把这个数字进行加一的操作并保存,即每读取一次这个数字之后,就做一次修改,进而保证每次拼接的用户名都是唯一的,避免出现因为用户名重复导致用例执行失败的情况。
- data_preprocess():对请求数据进行预处理:参数化及函数化。
- data_postprocess():将响应数据需要关联的参数保存进全局变量,供后续接口使用。
- assert_result():对响应数据进行关键字断言。
1 import re 2 import hashlib 3 import os 4 import json 5 import traceback 6 import redis 7 from InterfaceAutoTest.settings import redis_port 8 9 10 # 连接redis 11 pool = redis.ConnectionPool(host='localhost', port=redis_port, decode_responses=True) 12 redis_obj = redis.Redis(connection_pool=pool) 13 14 15 # 初始化框架工程中的全局变量,存储在测试数据中的唯一值数据 16 # 框架工程中若要使用字典中的任意一个变量,则每次使用后,均需要将字典中的value值进行加1操作。 17 def get_unique_number_value(unique_number): 18 data = None 19 try: 20 redis_value = redis_obj.get(unique_number) # {"unique_number": 666} 21 if redis_value: 22 data = redis_value 23 print("全局唯一数当前生成的值是:%s" % data) 24 # 把redis中key为unique_number的值进行加一操作,以便下提取时保持唯一 25 redis_obj.set(unique_number, int(redis_value) + 1) 26 else: 27 data = 1000 # 初始化递增数值 28 redis_obj.set(unique_number, data) 29 except Exception as e: 30 print("获取全局唯一数变量值失败,请求的全局唯一数变量是%s,异常原因如下:%s" % (unique_number, traceback.format_exc())) 31 data = None 32 finally: 33 return data 34 35 36 def md5(s): 37 m5 = hashlib.md5() 38 m5.update(s.encode("utf-8")) 39 md5_value = m5.hexdigest() 40 return md5_value 41 42 43 # 请求数据预处理:参数化、函数化 44 # 将请求数据中包含的${变量名}的字符串部分,替换为唯一数或者全局变量字典中对应的全局变量 45 def data_preprocess(global_key, requestData): 46 try: 47 # 匹配注册用户名参数,即"${unique_num...}"的格式,并取出本次请求的随机数供后续接口的用户名参数使用 48 if re.search(r"\$\{unique_num\d+\}", requestData): 49 var_name = re.search(r"\$\{(unique_num\d+)\}", requestData).group(1) # 获取用户名参数 50 print("用户名变量:%s" % var_name) 51 var_value = get_unique_number_value(var_name) 52 print("用户名变量值: %s" % var_value) 53 requestData = re.sub(r"\$\{unique_num\d+\}", str(var_value), requestData) 54 var_name = var_name.split("_")[1] 55 print("关联的用户名变量: %s" % var_name) 56 # "xxxkey" : "{'var_name': var_value}" 57 global_var = json.loads(os.environ[global_key]) 58 global_var[var_name] = var_value 59 os.environ[global_key] = json.dumps(global_var) 60 print("用户名唯一数参数化后的全局变量【os.environ[global_key]】: {}".format(os.environ[global_key])) 61 # 函数化,如密码加密"${md5(...)}"的格式 62 if re.search(r"\$\{\w+\(.+\)\}", requestData): 63 var_pass = re.search(r"\$\{(\w+\(.+\))\}", requestData).group(1) # 获取密码参数 64 print("需要函数化的变量: %s" % var_pass) 65 print("函数化后的结果: %s" % eval(var_pass)) 66 requestData = re.sub(r"\$\{\w+\(.+\)\}", eval(var_pass), requestData) # 将requestBody里面的参数内容通过eval修改为实际变量值 67 print("函数化后的请求数据: %s" % requestData) # requestBody是拿到的请求时发送的数据 68 # 其余变量参数化 69 if re.search(r"\$\{(\w+)\}", requestData): 70 print("需要参数化的变量: %s" % (re.findall(r"\$\{(\w+)\}", requestData))) 71 for var_name in re.findall(r"\$\{(\w+)\}", requestData): 72 requestData = re.sub(r"\$\{%s\}" % var_name, str(json.loads(os.environ[global_key])[var_name]), requestData) 73 print("变量参数化后的最终请求数据: %s" % requestData) 74 print("数据参数后的最终全局变量【os.environ[global_key]】: {}".format(os.environ[global_key])) 75 return 0, requestData, "" 76 except Exception as e: 77 print("请求数据预处理发生异常,error:{}".format(traceback.format_exc())) 78 return 1, {}, traceback.format_exc() 79 80 81 # 响应数据提取关联参数 82 def data_postprocess(global_key, response_data, extract_var): 83 print("需提取的关联变量:%s" % extract_var) 84 var_name = extract_var.split("||")[0] 85 print("关联变量名:%s" % var_name) 86 regx_exp = extract_var.split("||")[1] 87 print("关联变量正则:%s" % regx_exp) 88 if re.search(regx_exp, response_data): 89 global_vars = json.loads(os.environ[global_key]) 90 print("关联前的全局变量:{}".format(global_vars)) 91 global_vars[var_name] = re.search(regx_exp, response_data).group(1) 92 os.environ[global_key] = json.dumps(global_vars) 93 print("关联前的全局变量:{}".format(os.environ[global_key])) 94 return 95 96 97 # 响应数据 断言处理 98 def assert_result(response_obj, key_word): 99 try: 100 # 多个断言关键字 101 if '&&' in key_word: 102 key_word_list = key_word.split('&&') 103 print("断言关键字列表:%s" % key_word_list) 104 # 断言结果标识符 105 flag = True 106 exception_info = '' 107 # 遍历分隔出来的断言关键词列表 108 for key_word in key_word_list: 109 # 如果断言词非空,则进行断言 110 if key_word: 111 # 没查到断言词则认为是断言失败 112 if not (key_word in json.dumps(response_obj.json(), ensure_ascii=False)): 113 print("断言关键字【{}】匹配失败".format(key_word)) 114 flag = False # 只要有一个断言词匹配失败,则整个接口断言失败 115 exception_info = "keyword: {} not matched from response, assert failed".format(key_word) 116 else: 117 print("断言关键字【{}】匹配成功".format(key_word)) 118 if flag: 119 print("接口断言成功!") 120 else: 121 print("接口断言失败!") 122 return flag, exception_info 123 # 单个断言关键字 124 else: 125 if key_word in json.dumps(response_obj.json(), ensure_ascii=False): 126 print("接口断言【{}】匹配成功!".format(key_word)) 127 return True, '' 128 else: 129 print("接口断言【{}】匹配失败!".format(key_word)) 130 return False, '' 131 except Exception as e: 132 return False, traceback.format_exc() 133 134 135 # 测试代码 136 if __name__ == "__main__": 137 print(get_unique_number_value("unique_num1"))
request_process.py
该模块实现了对接口请求的封装。
1 import requests 2 import json 3 # from Util.Log import logger 4 5 6 # 此函数封装了get请求、post和put请求的方法 7 def request_process(url, request_method, request_content): 8 print("-------- 开始调用接口 --------") 9 if request_method == "get": 10 try: 11 if isinstance(request_content, dict): 12 print("接口地址:%s" % url) 13 print("请求数据:%s" % request_content) 14 r = requests.get(url, params=json.dumps(request_content)) 15 else: 16 r = requests.get(url+str(request_content)) 17 print("接口地址:%s" % r.url) 18 print("请求数据:%s" % request_content) 19 20 except Exception as e: 21 print("get方法请求发生异常:请求的url是%s, 请求的内容是%s\n发生的异常信息如下:%s" % (url, request_content, e)) 22 r = None 23 return r 24 elif request_method == "post": 25 try: 26 if isinstance(request_content, dict): 27 print("接口地址:%s" % url) 28 print("请求数据:%s" % json.dumps(request_content)) 29 r = requests.post(url, data=json.dumps(request_content)) 30 else: 31 raise ValueError 32 except ValueError as e: 33 print("post方法请求发生异常:请求的url是%s, 请求的内容是%s\n发生的异常信息如下:%s" % (url, request_content, "请求参数不是字典类型")) 34 r = None 35 except Exception as e: 36 print("post方法请求发生异常:请求的url是%s, 请求的内容是%s\n发生的异常信息如下:%s" % (url, request_content, e)) 37 r = None 38 return r 39 elif request_method == "put": 40 try: 41 if isinstance(request_content, dict): 42 print("接口地址:%s" % url) 43 print("请求数据:%s" % json.dumps(request_content)) 44 r = requests.put(url, data=json.dumps(request_content)) 45 else: 46 raise ValueError 47 except ValueError as e: 48 print("put方法请求发生异常:请求的url是%s, 请求的内容是%s\n发生的异常信息如下:%s" % (url, request_content, "请求参数不是字典类型")) 49 r = None 50 except Exception as e: 51 print("put方法请求发生异常:请求的url是%s, 请求的内容是%s\n发生的异常信息如下:%s" % (url, request_content, e)) 52 r = None 53 return r
3)封装接口用例执行方法
在应用目录下新建 task.py:
1 import time 2 import os 3 import traceback 4 import json 5 from . import models 6 from .utils.data_process import data_preprocess, assert_result, data_postprocess 7 from .utils.request_process import request_process 8 9 10 def case_task(test_case_id_list, server_address): 11 global_key = 'case'+ str(int(time.time() * 100000)) 12 os.environ[global_key] = '{}' 13 print() 14 print("全局变量标识符【global_key】: {}".format(global_key)) 15 print("全局变量内容【os.environ[global_key]】: {}".format(os.environ[global_key])) 16 for test_case_id in test_case_id_list: 17 print() 18 test_case = models.TestCase.objects.filter(id=int(test_case_id))[0] 19 print("######### 开始执行用例【{}】 #########".format(test_case)) 20 execute_start_time = time.time() # 记录时间戳,便于计算总耗时(毫秒) 21 request_data = test_case.request_data 22 extract_var = test_case.extract_var 23 assert_key = test_case.assert_key 24 interface_name = test_case.uri 25 belong_project = test_case.belong_project 26 belong_module = test_case.belong_module 27 maintainer = test_case.maintainer 28 request_method = test_case.request_method 29 print("初始请求数据: {}".format(request_data)) 30 print("关联参数: {}".format(extract_var)) 31 print("断言关键字: {}".format(assert_key)) 32 print("接口名称: {}".format(interface_name)) 33 print("所属项目: {}".format(belong_project)) 34 print("所属模块: {}".format(belong_module)) 35 print("用例维护人: {}".format(maintainer)) 36 print("请求方法: {}".format(request_method)) 37 url = "{}{}".format(server_address, interface_name) 38 print("接口地址: {}".format(url)) 39 code, request_data, error_msg = data_preprocess(global_key, str(request_data)) 40 try: 41 res_data = request_process(url, request_method, json.loads(request_data)) 42 print("响应数据: {}".format(json.dumps(res_data.json(), ensure_ascii=False))) # ensure_ascii:兼容中文 43 result_flag, exception_info = assert_result(res_data, assert_key) 44 if result_flag: 45 print("用例【%s】执行成功!" % test_case) 46 if extract_var.strip() != "None": 47 data_postprocess(global_key, json.dumps(res_data.json(), ensure_ascii=False), extract_var) 48 else: 49 print("用例【%s】执行失败!" % test_case) 50 except Exception as e: 51 print("接口请求异常,error: {}".format(traceback.format_exc()))
4)修改测试用例视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer 8 from .task import case_task 9 10 11 # 封装分页处理 12 def get_paginator(request, data): 13 paginator = Paginator(data, 10) # 默认每页展示10条数据 14 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 15 page = request.GET.get('page') 16 try: 17 paginator_pages = paginator.page(page) 18 except PageNotAnInteger: 19 # 如果请求的页数不是整数, 返回第一页。 20 paginator_pages = paginator.page(1) 21 except InvalidPage: 22 # 如果请求的页数不存在, 重定向页面 23 return HttpResponse('找不到页面的内容') 24 return paginator_pages 25 26 27 # 项目菜单项 28 @login_required 29 def project(request): 30 print("request.user.is_authenticated: ", request.user.is_authenticated) 31 projects = Project.objects.filter().order_by('-id') 32 print("projects:", projects) 33 return render(request, 'project.html', {'projects': get_paginator(request, projects)}) 34 35 36 # 模块菜单项 37 @login_required 38 def module(request): 39 if request.method == "GET": # 请求get时候,id倒序查询所有的模块数据 40 modules = Module.objects.filter().order_by('-id') 41 return render(request, 'module.html', {'modules': get_paginator(request, modules)}) 42 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 43 proj_name = request.POST['proj_name'] 44 projects = Project.objects.filter(name__contains=proj_name.strip()) 45 projs = [proj.id for proj in projects] 46 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 47 return render(request, 'module.html', {'modules': get_paginator(request, modules), 'proj_name': proj_name}) 48 49 50 # 获取测试用例执行的接口地址 51 def get_server_address(env): 52 if env: # 环境处理 53 env_data = InterfaceServer.objects.filter(env=env[0]) 54 print("env_data: {}".format(env_data)) 55 if env_data: 56 ip = env_data[0].ip 57 port = env_data[0].port 58 print("ip: {}, port: {}".format(ip, port)) 59 server_address = "http://{}:{}".format(ip, port) 60 print("server_address: {}".format(server_address)) 61 return server_address 62 else: 63 return "" 64 else: 65 return "" 66 67 68 # 测试用例菜单项 69 @login_required 70 def test_case(request): 71 print("request.session['is_login']: {}".format(request.session['is_login'])) 72 test_cases = "" 73 if request.method == "GET": 74 test_cases = TestCase.objects.filter().order_by('id') 75 print("testcases: {}".format(test_cases)) 76 elif request.method == "POST": 77 print("request.POST: {}".format(request.POST)) 78 test_case_id_list = request.POST.getlist('test_cases_list') 79 env = request.POST.getlist('env') 80 print("env: {}".format(env)) 81 server_address = get_server_address(env) 82 if not server_address: 83 return HttpResponse("提交的运行环境为空,请选择环境后再提交!") 84 if test_case_id_list: 85 test_case_id_list.sort() 86 print("test_case_id_list: {}".format(test_case_id_list)) 87 print("获取到用例,开始用例执行") 88 case_task(test_case_id_list, server_address) 89 else: 90 print("运行测试用例失败") 91 return HttpResponse("提交的运行测试用例为空,请选择用例后在提交!") 92 test_cases = TestCase.objects.filter().order_by('id') 93 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 94 95 96 # 用例详情页 97 @login_required 98 def test_case_detail(request, test_case_id): 99 test_case_id = int(test_case_id) 100 test_case = TestCase.objects.get(id=test_case_id) 101 print("test_case: {}".format(test_case)) 102 print("test_case.id: {}".format(test_case.id)) 103 print("test_case.belong_project: {}".format(test_case.belong_project)) 104 105 return render(request, 'test_case_detail.html', {'test_case': test_case}) 106 107 108 # 模块页展示测试用例 109 @login_required 110 def module_test_cases(request, module_id): 111 module = "" 112 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现 113 module = Module.objects.get(id=int(module_id)) 114 test_cases = TestCase.objects.filter(belong_module=module).order_by('-id') 115 print("test_case in module_test_cases: {}".format(test_cases)) 116 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 117 118 119 # 用例集合菜单项 120 @login_required 121 def case_suite(request): 122 case_suites = CaseSuite.objects.filter() 123 return render(request, 'case_suite.html', {'case_suites': get_paginator(request, case_suites)}) 124 125 126 # 用例集合-添加测试用例页 127 @login_required 128 def add_case_in_suite(request, suite_id): 129 # 查询指定的用例集合 130 case_suite = CaseSuite.objects.get(id=suite_id) 131 # 根据id号查询所有的用例 132 test_cases = TestCase.objects.filter().order_by('id') 133 if request.method == "GET": 134 print("test cases:", test_cases) 135 elif request.method == "POST": 136 test_cases_list = request.POST.getlist('testcases_list') 137 # 如果页面勾选了用例 138 if test_cases_list: 139 print("勾选用例id:", test_cases_list) 140 # 根据页面勾选的用例与查询出的所有用例一一比较 141 for test_case in test_cases_list: 142 test_case = TestCase.objects.get(id=int(test_case)) 143 # 匹配成功则添加用例 144 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case) 145 # 未勾选用例 146 else: 147 print("添加测试用例失败") 148 return HttpResponse("添加的测试用例为空,请选择用例后再添加!") 149 return render(request, 'add_case_in_suite.html', 150 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 151 152 153 # 用例集合页-查看/删除用例 154 @login_required 155 def show_and_delete_case_in_suite(request, suite_id): 156 case_suite = CaseSuite.objects.get(id=suite_id) 157 test_cases = SuiteCase.objects.filter(case_suite=case_suite) 158 if request.method == "POST": 159 test_cases_list = request.POST.getlist('test_cases_list') 160 if test_cases_list: 161 print("勾选用例:", test_cases_list) 162 for test_case in test_cases_list: 163 test_case = TestCase.objects.get(id=int(test_case)) 164 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete() 165 else: 166 print("测试用例删除失败") 167 return HttpResponse("所选测试用例为空,请选择用例后再进行删除!") 168 case_suite = CaseSuite.objects.get(id=suite_id) 169 return render(request, 'show_and_delete_case_in_suite.html', 170 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 171 172 173 # 默认页的视图函数 174 @login_required 175 def index(request): 176 return render(request, 'index.html') 177 178 179 # 登录页的视图函数 180 def login(request): 181 print("request.session.items(): {}".format(request.session.items())) # 打印session信息 182 if request.session.get('is_login', None): 183 return redirect('/') 184 # 如果是表单提交行为,则进行登录校验 185 if request.method == "POST": 186 login_form = UserForm(request.POST) 187 message = "请检查填写的内容!" 188 if login_form.is_valid(): 189 username = login_form.cleaned_data['username'] 190 password = login_form.cleaned_data['password'] 191 try: 192 # 使用django提供的身份验证功能 193 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象 194 if user is not None: 195 print("用户【%s】登录成功" % username) 196 auth.login(request, user) 197 request.session['is_login'] = True 198 # 登录成功,跳转主页 199 return redirect('/') 200 else: 201 message = "用户名不存在或者密码不正确!" 202 except: 203 traceback.print_exc() 204 message = "登录程序出现异常" 205 # 用户名或密码为空,返回登录页和错误提示信息 206 else: 207 return render(request, 'login.html', locals()) 208 # 不是表单提交,代表只是访问登录页 209 else: 210 login_form = UserForm() 211 return render(request, 'login.html', locals()) 212 213 214 # 注册页的视图函数 215 def register(request): 216 return render(request, 'register.html') 217 218 219 # 登出的视图函数:重定向至login视图函数 220 @login_required 221 def logout(request): 222 auth.logout(request) 223 request.session.flush() 224 return redirect("/login/")
9. 用例执行结果展示
9.1 定义模型类
1)models.py 中增加 TestCaseExecuteResult 模型类,用于记录用例执行结果。
1 from django.db import models 2 from smart_selects.db_fields import GroupedForeignKey # pip install django-smart-selects:后台级联选择 3 from django.contrib.auth.models import User 4 5 6 class Project(models.Model): 7 id = models.AutoField(primary_key=True) 8 name = models.CharField('项目名称', max_length=50, unique=True, null=False) 9 proj_owner = models.CharField('项目负责人', max_length=20, null=False) 10 test_owner = models.CharField('测试负责人', max_length=20, null=False) 11 dev_owner = models.CharField('开发负责人', max_length=20, null=False) 12 desc = models.CharField('项目描述', max_length=100, null=True) 13 create_time = models.DateTimeField('项目创建时间', auto_now_add=True) 14 update_time = models.DateTimeField('项目更新时间', auto_now=True, null=True) 15 16 def __str__(self): 17 return self.name 18 19 class Meta: 20 verbose_name = '项目信息表' 21 verbose_name_plural = '项目信息表' 22 23 24 class Module(models.Model): 25 id = models.AutoField(primary_key=True) 26 name = models.CharField('模块名称', max_length=50, null=False) 27 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE) 28 test_owner = models.CharField('测试负责人', max_length=50, null=False) 29 desc = models.CharField('简要描述', max_length=100, null=True) 30 create_time = models.DateTimeField('创建时间', auto_now_add=True) 31 update_time = models.DateTimeField('更新时间', auto_now=True, null=True) 32 33 def __str__(self): 34 return self.name 35 36 class Meta: 37 verbose_name = '模块信息表' 38 verbose_name_plural = '模块信息表' 39 40 41 class TestCase(models.Model): 42 id = models.AutoField(primary_key=True) 43 case_name = models.CharField('用例名称', max_length=50, null=False) # 如 register 44 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE, verbose_name='所属项目') 45 belong_module = GroupedForeignKey(Module, "belong_project", on_delete=models.CASCADE, verbose_name='所属模块') 46 request_data = models.CharField('请求数据', max_length=1024, null=False, default='') 47 uri = models.CharField('接口地址', max_length=1024, null=False, default='') 48 assert_key = models.CharField('断言内容', max_length=1024, null=True) 49 maintainer = models.CharField('编写人员', max_length=1024, null=False, default='') 50 extract_var = models.CharField('提取变量表达式', max_length=1024, null=True) # 示例:userid||userid": (\d+) 51 request_method = models.CharField('请求方式', max_length=1024, null=True) 52 status = models.IntegerField(null=True, help_text="0:表示有效,1:表示无效,用于软删除") 53 created_time = models.DateTimeField('创建时间', auto_now_add=True) 54 updated_time = models.DateTimeField('更新时间', auto_now=True, null=True) 55 user = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='责任人', null=True) 56 57 def __str__(self): 58 return self.case_name 59 60 class Meta: 61 verbose_name = '测试用例表' 62 verbose_name_plural = '测试用例表' 63 64 65 class CaseSuite(models.Model): 66 id = models.AutoField(primary_key=True) 67 suite_desc = models.CharField('用例集合描述', max_length=100, blank=True, null=True) 68 if_execute = models.IntegerField(verbose_name='是否执行', null=False, default=0, help_text='0:执行;1:不执行') 69 test_case_model = models.CharField('测试执行模式', max_length=100, blank=True, null=True, help_text='data/keyword') 70 creator = models.CharField(max_length=50, blank=True, null=True) 71 create_time = models.DateTimeField('创建时间', auto_now=True) # 创建时间-自动获取当前时间 72 73 class Meta: 74 verbose_name = "用例集合表" 75 verbose_name_plural = '用例集合表' 76 77 78 class SuiteCase(models.Model): 79 id = models.AutoField(primary_key=True) 80 case_suite = models.ForeignKey(CaseSuite, on_delete=models.CASCADE, verbose_name='用例集合') 81 test_case = models.ForeignKey(TestCase, on_delete=models.CASCADE, verbose_name='测试用例') 82 status = models.IntegerField(verbose_name='是否有效', null=False, default=1, help_text='0:有效,1:无效') 83 create_time = models.DateTimeField('创建时间', auto_now=True) # 创建时间-自动获取当前时间 84 85 86 class InterfaceServer(models.Model): 87 id = models.AutoField(primary_key=True) 88 env = models.CharField('环境', max_length=50, null=False, default='') 89 ip = models.CharField('ip', max_length=50, null=False, default='') 90 port = models.CharField('端口', max_length=100, null=False, default='') 91 remark = models.CharField('备注', max_length=100, null=True) 92 create_time = models.DateTimeField('创建时间', auto_now_add=True) 93 update_time = models.DateTimeField('更新时间', auto_now=True, null=True) 94 95 def __str__(self): 96 return self.env 97 98 class Meta: 99 verbose_name = '接口地址配置表' 100 verbose_name_plural = '接口地址配置表' 101 102 103 class TestCaseExecuteResult(models.Model): 104 id = models.AutoField(primary_key=True) 105 belong_test_case = GroupedForeignKey(TestCase, "belong_test_case", on_delete=models.CASCADE, verbose_name='所属用例') 106 status = models.IntegerField(null=True, help_text="0:表示未执行,1:表示已执行") 107 exception_info = models.CharField(max_length=2048, blank=True, null=True) 108 request_data = models.CharField('请求体', max_length=1024, null=True) # {"code": "00", "userid": 22889} 109 response_data = models.CharField('响应字符串', max_length=1024, null=True) # {"code": "00", "userid": 22889} 110 execute_result = models.CharField('执行结果', max_length=1024, null=True) # 成功/失败 111 extract_var = models.CharField('关联参数', max_length=1024, null=True) # 响应成功后提取变量 112 last_time_response_data = models.CharField('上一次响应字符串', max_length=1024, null=True) # {"code": "00", "userid": 22889} 113 execute_total_time = models.CharField('执行耗时', max_length=1024, null=True) 114 execute_start_time = models.CharField('执行开始时间', max_length=300, blank=True, null=True) 115 execute_end_time = models.CharField('执行结束时间', max_length=300, blank=True, null=True) 116 created_time = models.DateTimeField('创建时间', auto_now_add=True) 117 updated_time = models.DateTimeField('更新时间', auto_now=True, null=True) 118 119 def __str__(self): 120 return str(self.id) 121 122 class Meta: 123 verbose_name = '用例执行结果记录表' 124 verbose_name_plural = '用例执行结果记录表'
2)数据迁移
python manage.py makemigrations python manage.py migrate
9.2 修改用例执行封装函数,增加执行结果记录
修改应用目录下 task.py:
1 import time 2 import os 3 import traceback 4 import json 5 from . import models 6 from .utils.data_process import data_preprocess, assert_result, data_postprocess 7 from .utils.request_process import request_process 8 9 10 def case_task(test_case_id_list, server_address): 11 global_key = 'case'+ str(int(time.time() * 100000)) 12 os.environ[global_key] = '{}' 13 print() 14 print("全局变量标识符【global_key】: {}".format(global_key)) 15 print("全局变量内容【os.environ[global_key]】: {}".format(os.environ[global_key])) 16 for test_case_id in test_case_id_list: 17 18 test_case = models.TestCase.objects.filter(id=int(test_case_id))[0] 19 last_execute_record_data = models.TestCaseExecuteResult.objects.filter( 20 belong_test_case_id=test_case_id).order_by('-id') 21 if last_execute_record_data: 22 last_time_execute_response_data = last_execute_record_data[0].response_data 23 else: 24 last_time_execute_response_data = '' 25 print("上一次响应结果: {}".format(last_execute_record_data)) 26 print("上一次响应时间: {}".format(last_time_execute_response_data)) 27 execute_record = models.TestCaseExecuteResult.objects.create(belong_test_case=test_case) 28 execute_record.last_time_response_data = last_time_execute_response_data 29 # 获取当前用例上一次执行结果 30 execute_record.save() 31 32 test_case = models.TestCase.objects.filter(id=int(test_case_id))[0] 33 print("\n######### 开始执行用例【{}】 #########".format(test_case)) 34 execute_start_time = time.time() # 记录时间戳,便于计算总耗时(毫秒) 35 execute_record.execute_start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(execute_start_time)) 36 37 request_data = test_case.request_data 38 extract_var = test_case.extract_var 39 assert_key = test_case.assert_key 40 interface_name = test_case.uri 41 belong_project = test_case.belong_project 42 belong_module = test_case.belong_module 43 maintainer = test_case.maintainer 44 request_method = test_case.request_method 45 print("初始请求数据: {}".format(request_data)) 46 print("关联参数: {}".format(extract_var)) 47 print("断言关键字: {}".format(assert_key)) 48 print("接口名称: {}".format(interface_name)) 49 print("所属项目: {}".format(belong_project)) 50 print("所属模块: {}".format(belong_module)) 51 print("用例维护人: {}".format(maintainer)) 52 print("请求方法: {}".format(request_method)) 53 url = "{}{}".format(server_address, interface_name) 54 print("接口地址: {}".format(url)) 55 code, request_data, error_msg = data_preprocess(global_key, str(request_data)) 56 # 请求数据预处理异常,结束用例执行 57 if code != 0: 58 print("数据处理异常,error: {}".format(error_msg)) 59 execute_record.execute_result = "失败" 60 execute_record.status = 1 61 execute_record.exception_info = error_msg 62 execute_end_time = time.time() 63 execute_record.execute_end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(execute_end_time)) 64 execute_record.execute_total_time = int(execute_end_time - execute_start_time) * 1000 65 execute_record.save() 66 return 67 # 记录请求预处理结果 68 else: 69 execute_record.request_data = request_data 70 # 调用接口 71 try: 72 res_data = request_process(url, request_method, json.loads(request_data)) 73 print("响应数据: {}".format(json.dumps(res_data.json(), ensure_ascii=False))) # ensure_ascii:兼容中文 74 result_flag, exception_info = assert_result(res_data, assert_key) 75 # 结果记录保存 76 if result_flag: 77 print("用例【%s】执行成功!" % test_case) 78 execute_record.execute_result = "成功" 79 if extract_var.strip() != "None": 80 var_value = data_postprocess(global_key, json.dumps(res_data.json(), ensure_ascii=False), extract_var) 81 execute_record.extract_var = var_value 82 else: 83 print("用例【%s】执行失败!" % test_case) 84 execute_record.execute_result = "失败" 85 execute_record.exception_info = exception_info 86 execute_record.response_data = json.dumps(res_data.json(), ensure_ascii=False) 87 execute_record.status = 1 88 execute_end_time = time.time() 89 execute_record.execute_end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(execute_end_time)) 90 print("执行结果结束时间: {}".format(execute_record.execute_end_time)) 91 execute_record.execute_total_time = int((execute_end_time - execute_start_time) * 1000) 92 print("用例执行耗时: {}".format(execute_record.execute_total_time)) 93 execute_record.save() 94 except Exception as e: 95 print("接口请求异常,error: {}".format(traceback.format_exc())) 96 execute_record.execute_result = "失败" 97 execute_record.exception_info = traceback.format_exc() 98 execute_record.status = 1 99 execute_end_time = time.time() 100 execute_record.execute_end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(execute_end_time)) 101 print("执行结果结束时间: {}".format(execute_record.execute_end_time)) 102 execute_record.execute_total_time = int(execute_end_time - execute_start_time) * 1000 103 print("用例执行耗时: {} 毫秒".format(execute_record.execute_total_time)) 104 execute_record.save()
前端执行测试用例,查看用例执行结果表数据:
9.3 定义路由
在前面已经获取到用例结果数据并保存,下面处理一下用例结果展示。
from django.urls import path, re_path from . import views urlpatterns = [ path('', views.index), path('login/', views.login), path('logout/', views.logout), path('project/', views.project, name='project'), path('module/', views.module, name='module'), path('test_case/', views.test_case, name="test_case"), re_path('test_case_detail/(?P<test_case_id>[0-9]+)', views.test_case_detail, name="test_case_detail"), re_path('module_test_cases/(?P<module_id>[0-9]+)/$', views.module_test_cases, name="module_test_cases"), path('case_suite/', views.case_suite, name="case_suite"), re_path('add_case_in_suite/(?P<suite_id>[0-9]+)', views.add_case_in_suite, name="add_case_in_suite"), re_path('show_and_delete_case_in_suite/(?P<suite_id>[0-9]+)', views.show_and_delete_case_in_suite, name="show_and_delete_case_in_suite"), path('test_case_execute_record/', views.test_case_execute_record, name="test_case_execute_record"), ]
9.4 定义视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer, TestCaseExecuteResult 8 from .task import case_task 9 10 11 # 封装分页处理 12 def get_paginator(request, data): 13 paginator = Paginator(data, 10) # 默认每页展示10条数据 14 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 15 page = request.GET.get('page') 16 try: 17 paginator_pages = paginator.page(page) 18 except PageNotAnInteger: 19 # 如果请求的页数不是整数, 返回第一页。 20 paginator_pages = paginator.page(1) 21 except InvalidPage: 22 # 如果请求的页数不存在, 重定向页面 23 return HttpResponse('找不到页面的内容') 24 return paginator_pages 25 26 27 # 项目菜单项 28 @login_required 29 def project(request): 30 print("request.user.is_authenticated: ", request.user.is_authenticated) 31 projects = Project.objects.filter().order_by('-id') 32 print("projects:", projects) 33 return render(request, 'project.html', {'projects': get_paginator(request, projects)}) 34 35 36 # 模块菜单项 37 @login_required 38 def module(request): 39 if request.method == "GET": # 请求get时候,id倒序查询所有的模块数据 40 modules = Module.objects.filter().order_by('-id') 41 return render(request, 'module.html', {'modules': get_paginator(request, modules)}) 42 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 43 proj_name = request.POST['proj_name'] 44 projects = Project.objects.filter(name__contains=proj_name.strip()) 45 projs = [proj.id for proj in projects] 46 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 47 return render(request, 'module.html', {'modules': get_paginator(request, modules), 'proj_name': proj_name}) 48 49 50 # 获取测试用例执行的接口地址 51 def get_server_address(env): 52 if env: # 环境处理 53 env_data = InterfaceServer.objects.filter(env=env[0]) 54 print("env_data: {}".format(env_data)) 55 if env_data: 56 ip = env_data[0].ip 57 port = env_data[0].port 58 print("ip: {}, port: {}".format(ip, port)) 59 server_address = "http://{}:{}".format(ip, port) 60 print("server_address: {}".format(server_address)) 61 return server_address 62 else: 63 return "" 64 else: 65 return "" 66 67 68 # 测试用例菜单项 69 @login_required 70 def test_case(request): 71 print("request.session['is_login']: {}".format(request.session['is_login'])) 72 test_cases = "" 73 if request.method == "GET": 74 test_cases = TestCase.objects.filter().order_by('id') 75 print("testcases: {}".format(test_cases)) 76 elif request.method == "POST": 77 print("request.POST: {}".format(request.POST)) 78 test_case_id_list = request.POST.getlist('test_cases_list') 79 env = request.POST.getlist('env') 80 print("env: {}".format(env)) 81 server_address = get_server_address(env) 82 if not server_address: 83 return HttpResponse("提交的运行环境为空,请选择环境后再提交!") 84 if test_case_id_list: 85 test_case_id_list.sort() 86 print("test_case_id_list: {}".format(test_case_id_list)) 87 print("获取到用例,开始用例执行") 88 case_task(test_case_id_list, server_address) 89 else: 90 print("运行测试用例失败") 91 return HttpResponse("提交的运行测试用例为空,请选择用例后在提交!") 92 test_cases = TestCase.objects.filter().order_by('id') 93 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 94 95 96 # 用例详情页 97 @login_required 98 def test_case_detail(request, test_case_id): 99 test_case_id = int(test_case_id) 100 test_case = TestCase.objects.get(id=test_case_id) 101 print("test_case: {}".format(test_case)) 102 print("test_case.id: {}".format(test_case.id)) 103 print("test_case.belong_project: {}".format(test_case.belong_project)) 104 105 return render(request, 'test_case_detail.html', {'test_case': test_case}) 106 107 108 # 模块页展示测试用例 109 @login_required 110 def module_test_cases(request, module_id): 111 module = "" 112 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现 113 module = Module.objects.get(id=int(module_id)) 114 test_cases = TestCase.objects.filter(belong_module=module).order_by('-id') 115 print("test_case in module_test_cases: {}".format(test_cases)) 116 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 117 118 119 # 用例集合菜单项 120 @login_required 121 def case_suite(request): 122 case_suites = CaseSuite.objects.filter() 123 return render(request, 'case_suite.html', {'case_suites': get_paginator(request, case_suites)}) 124 125 126 # 用例集合-添加测试用例页 127 @login_required 128 def add_case_in_suite(request, suite_id): 129 # 查询指定的用例集合 130 case_suite = CaseSuite.objects.get(id=suite_id) 131 # 根据id号查询所有的用例 132 test_cases = TestCase.objects.filter().order_by('id') 133 if request.method == "GET": 134 print("test cases:", test_cases) 135 elif request.method == "POST": 136 test_cases_list = request.POST.getlist('testcases_list') 137 # 如果页面勾选了用例 138 if test_cases_list: 139 print("勾选用例id:", test_cases_list) 140 # 根据页面勾选的用例与查询出的所有用例一一比较 141 for test_case in test_cases_list: 142 test_case = TestCase.objects.get(id=int(test_case)) 143 # 匹配成功则添加用例 144 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case) 145 # 未勾选用例 146 else: 147 print("添加测试用例失败") 148 return HttpResponse("添加的测试用例为空,请选择用例后再添加!") 149 return render(request, 'add_case_in_suite.html', 150 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 151 152 153 # 用例集合页-查看/删除用例 154 @login_required 155 def show_and_delete_case_in_suite(request, suite_id): 156 case_suite = CaseSuite.objects.get(id=suite_id) 157 test_cases = SuiteCase.objects.filter(case_suite=case_suite) 158 if request.method == "POST": 159 test_cases_list = request.POST.getlist('test_cases_list') 160 if test_cases_list: 161 print("勾选用例:", test_cases_list) 162 for test_case in test_cases_list: 163 test_case = TestCase.objects.get(id=int(test_case)) 164 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete() 165 else: 166 print("测试用例删除失败") 167 return HttpResponse("所选测试用例为空,请选择用例后再进行删除!") 168 case_suite = CaseSuite.objects.get(id=suite_id) 169 return render(request, 'show_and_delete_case_in_suite.html', 170 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 171 172 173 @login_required 174 def test_case_execute_record(request): 175 test_case_execute_records = TestCaseExecuteResult.objects.filter().order_by('-id') 176 return render(request, 'test_case_execute_records.html', {'test_case_execute_records': get_paginator(request, test_case_execute_records)}) 177 178 179 # 默认页的视图函数 180 @login_required 181 def index(request): 182 return render(request, 'index.html') 183 184 185 # 登录页的视图函数 186 def login(request): 187 print("request.session.items(): {}".format(request.session.items())) # 打印session信息 188 if request.session.get('is_login', None): 189 return redirect('/') 190 # 如果是表单提交行为,则进行登录校验 191 if request.method == "POST": 192 login_form = UserForm(request.POST) 193 message = "请检查填写的内容!" 194 if login_form.is_valid(): 195 username = login_form.cleaned_data['username'] 196 password = login_form.cleaned_data['password'] 197 try: 198 # 使用django提供的身份验证功能 199 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象 200 if user is not None: 201 print("用户【%s】登录成功" % username) 202 auth.login(request, user) 203 request.session['is_login'] = True 204 # 登录成功,跳转主页 205 return redirect('/') 206 else: 207 message = "用户名不存在或者密码不正确!" 208 except: 209 traceback.print_exc() 210 message = "登录程序出现异常" 211 # 用户名或密码为空,返回登录页和错误提示信息 212 else: 213 return render(request, 'login.html', locals()) 214 # 不是表单提交,代表只是访问登录页 215 else: 216 login_form = UserForm() 217 return render(request, 'login.html', locals()) 218 219 220 # 注册页的视图函数 221 def register(request): 222 return render(request, 'register.html') 223 224 225 # 登出的视图函数:重定向至login视图函数 226 @login_required 227 def logout(request): 228 auth.logout(request) 229 request.session.flush() 230 return redirect("/login/")
9.5 定义模板
1)新增”测试执行记录“模板文件:templates/test_case_execute_records.html
1 {% extends 'base.html' %} 2 {% load static %} 3 {% block title %}用例执行记录{% endblock %} 4 {% block content %} 5 6 <div class="table-responsive"> 7 <table class="table table-striped"> 8 <thead> 9 <tr> 10 <th width="4%">id</th> 11 <th width="4%">名称</th> 12 <th width="20%">请求数据</th> 13 <th width="20%">执行返回结果</th> 14 <th width="5%">操作</th> 15 <th>断言内容</th> 16 <th width="5%">执行结果</th> 17 <th width="5%">异常信息</th> 18 <th width="10%">请求后提取变量</th> 19 <th width="8%">开始时间</th> 20 <th width="8%">执行耗时(ms)</th> 21 </tr> 22 </thead> 23 <tbody> 24 25 {% for testrecord in test_case_execute_records %} 26 <tr> 27 <td>{{ testrecord.id }}</td> 28 <td><a href="{% url 'test_case_detail' testrecord.belong_test_case.id%}" target="_blank">{{ testrecord.belong_test_case.case_name }}</a></td> 29 <td>{{ testrecord.request_data }}</td> 30 <td>{{ testrecord.response_data }}</td> 31 <td><a href="" target="_blank">对比差异</a></td> 32 <td>{{ testrecord.belong_test_case.assert_key }}</td> 33 <td>{{ testrecord.execute_result|default_if_none:"" }}</td> 34 {% if testrecord.exception_info %} 35 <td><a href="" target="_blank">显示异常信息</a></td> 36 {% else %} 37 <td>无</td> 38 {% endif %} 39 40 <td>{{ testrecord.extract_var }}</td> 41 <td>{{ testrecord.execute_start_time }}</td> 42 <td>{{ testrecord.execute_total_time }}</td> 43 </tr> 44 {% endfor %} 45 46 </tbody> 47 </table> 48 49 {# 实现分页标签的代码 #} 50 {# 这里使用 bootstrap 渲染页面 #} 51 <div id="pages" class="text-center"> 52 <nav> 53 <ul class="pagination"> 54 <li class="step-links"> 55 {% if test_case_execute_records.has_previous %} 56 <a class='active' href="?page={{ test_case_execute_records.previous_page_number }}">上一页</a> 57 {% endif %} 58 59 <span class="current"> 60 第 {{ test_case_execute_records.number }} 页 / 共 {{ test_case_execute_records.paginator.num_pages }} 页</span> 61 62 {% if test_case_execute_records.has_next %} 63 <a class='active' href="?page={{ test_case_execute_records.next_page_number }}">下一页</a> 64 {% endif %} 65 </li> 66 </ul> 67 </nav> 68 </div> 69 </div> 70 {% endblock %}
2)修改 base.html:新增“用例执行结果”菜单项
1 <!DOCTYPE html> 2 <html lang="zh-CN"> 3 {% load static %} 4 <head> 5 <meta charset="utf-8"> 6 <meta http-equiv="X-UA-Compatible" content="IE=edge"> 7 <meta name="viewport" content="width=device-width, initial-scale=1"> 8 <!-- 上述3个meta标签*必须*放在最前面,任何其他内容都*必须*跟随其后! --> 9 <title>{% block title %}base{% endblock %}</title> 10 11 <!-- Bootstrap --> 12 <link href="{% static 'bootstrap/css/bootstrap.min.css' %}" rel="stylesheet"> 13 14 15 <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> 16 <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> 17 <!--[if lt IE 9]> 18 <script src="https://cdn.bootcss.com/html5shiv/3.7.3/html5shiv.min.js"></script> 19 <script src="https://cdn.bootcss.com/respond.js/1.4.2/respond.min.js"></script> 20 <![endif]--> 21 {% block css %}{% endblock %} 22 </head> 23 <body> 24 <nav class="navbar navbar-default"> 25 <div class="container-fluid"> 26 <!-- Brand and toggle get grouped for better mobile display --> 27 <div class="navbar-header"> 28 <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#my-nav" 29 aria-expanded="false"> 30 <span class="sr-only">切换导航条</span> 31 <span class="icon-bar"></span> 32 <span class="icon-bar"></span> 33 <span class="icon-bar"></span> 34 </button> 35 <a class="navbar-brand" href="/">自动化测试平台</a> 36 </div> 37 38 <div class="collapse navbar-collapse" id="my-nav"> 39 <ul class="nav navbar-nav"> 40 <li class="active"><a href="/project/">项目</a></li> 41 <li class="active"><a href="/module/">模块</a></li> 42 <li class="active"><a href="/test_case/">测试用例</a></li> 43 <li class="active"><a href="/case_suite/">用例集合</a></li> 44 <li class="active"><a href="/test_case_execute_record/">用例执行结果</a></li> 45 </ul> 46 <ul class="nav navbar-nav navbar-right"> 47 {% if request.user.is_authenticated %} 48 <li><a href="#">当前在线:{{ request.user.username }}</a></li> 49 <li><a href="/logout">登出</a></li> 50 {% else %} 51 <li><a href="/login">登录</a></li> 52 53 {% endif %} 54 </ul> 55 </div><!-- /.navbar-collapse --> 56 </div><!-- /.container-fluid --> 57 </nav> 58 59 {% block content %}{% endblock %} 60 61 62 <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> 63 <script src="{% static 'js/jquery-3.4.1.js' %}"></script> 64 <!-- Include all compiled plugins (below), or include individual files as needed --> 65 <script src="{% static 'bootstrap/js/bootstrap.min.js' %}"></script> 66 </body> 67 </html>
页面效果如下:
9.6 结果对比差异
在用例执行结果页面,可以看到在“操作”列,有“对比差异”链接,该功能用于对比当前用例上一次的执行结果与当前的执行结果,便于查看结果的差异。
由于在前面用例执行时,已经在结果记录环节获取到当前用例上一次的结果并记录到当前用例记录数据中,下面来处理一下这个页面的展示。
1) 定义路由
from django.urls import path, re_path from . import views urlpatterns = [ path('', views.index), path('login/', views.login), path('logout/', views.logout), path('project/', views.project, name='project'), path('module/', views.module, name='module'), path('test_case/', views.test_case, name="test_case"), re_path('test_case_detail/(?P<test_case_id>[0-9]+)', views.test_case_detail, name="test_case_detail"), re_path('module_test_cases/(?P<module_id>[0-9]+)/$', views.module_test_cases, name="module_test_cases"), path('case_suite/', views.case_suite, name="case_suite"), re_path('add_case_in_suite/(?P<suite_id>[0-9]+)', views.add_case_in_suite, name="add_case_in_suite"), re_path('show_and_delete_case_in_suite/(?P<suite_id>[0-9]+)', views.show_and_delete_case_in_suite, name="show_and_delete_case_in_suite"), path('test_case_execute_record/', views.test_case_execute_record, name="test_case_execute_record"), re_path('case_result_diff/(?P<test_record_id>[0-9]+)', views.case_result_diff, name="case_result_diff"), ]
2)定义视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 import json 8 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer, TestCaseExecuteResult 9 from .task import case_task 10 11 12 # 封装分页处理 13 def get_paginator(request, data): 14 paginator = Paginator(data, 10) # 默认每页展示10条数据 15 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 16 page = request.GET.get('page') 17 try: 18 paginator_pages = paginator.page(page) 19 except PageNotAnInteger: 20 # 如果请求的页数不是整数, 返回第一页。 21 paginator_pages = paginator.page(1) 22 except InvalidPage: 23 # 如果请求的页数不存在, 重定向页面 24 return HttpResponse('找不到页面的内容') 25 return paginator_pages 26 27 28 # 项目菜单项 29 @login_required 30 def project(request): 31 print("request.user.is_authenticated: ", request.user.is_authenticated) 32 projects = Project.objects.filter().order_by('-id') 33 print("projects:", projects) 34 return render(request, 'project.html', {'projects': get_paginator(request, projects)}) 35 36 37 # 模块菜单项 38 @login_required 39 def module(request): 40 if request.method == "GET": # 请求get时候,id倒序查询所有的模块数据 41 modules = Module.objects.filter().order_by('-id') 42 return render(request, 'module.html', {'modules': get_paginator(request, modules)}) 43 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 44 proj_name = request.POST['proj_name'] 45 projects = Project.objects.filter(name__contains=proj_name.strip()) 46 projs = [proj.id for proj in projects] 47 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 48 return render(request, 'module.html', {'modules': get_paginator(request, modules), 'proj_name': proj_name}) 49 50 51 # 获取测试用例执行的接口地址 52 def get_server_address(env): 53 if env: # 环境处理 54 env_data = InterfaceServer.objects.filter(env=env[0]) 55 print("env_data: {}".format(env_data)) 56 if env_data: 57 ip = env_data[0].ip 58 port = env_data[0].port 59 print("ip: {}, port: {}".format(ip, port)) 60 server_address = "http://{}:{}".format(ip, port) 61 print("server_address: {}".format(server_address)) 62 return server_address 63 else: 64 return "" 65 else: 66 return "" 67 68 69 # 测试用例菜单项 70 @login_required 71 def test_case(request): 72 print("request.session['is_login']: {}".format(request.session['is_login'])) 73 test_cases = "" 74 if request.method == "GET": 75 test_cases = TestCase.objects.filter().order_by('id') 76 print("testcases: {}".format(test_cases)) 77 elif request.method == "POST": 78 print("request.POST: {}".format(request.POST)) 79 test_case_id_list = request.POST.getlist('test_cases_list') 80 env = request.POST.getlist('env') 81 print("env: {}".format(env)) 82 server_address = get_server_address(env) 83 if not server_address: 84 return HttpResponse("提交的运行环境为空,请选择环境后再提交!") 85 if test_case_id_list: 86 test_case_id_list.sort() 87 print("test_case_id_list: {}".format(test_case_id_list)) 88 print("获取到用例,开始用例执行") 89 case_task(test_case_id_list, server_address) 90 else: 91 print("运行测试用例失败") 92 return HttpResponse("提交的运行测试用例为空,请选择用例后在提交!") 93 test_cases = TestCase.objects.filter().order_by('id') 94 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 95 96 97 # 用例详情页 98 @login_required 99 def test_case_detail(request, test_case_id): 100 test_case_id = int(test_case_id) 101 test_case = TestCase.objects.get(id=test_case_id) 102 print("test_case: {}".format(test_case)) 103 print("test_case.id: {}".format(test_case.id)) 104 print("test_case.belong_project: {}".format(test_case.belong_project)) 105 106 return render(request, 'test_case_detail.html', {'test_case': test_case}) 107 108 109 # 模块页展示测试用例 110 @login_required 111 def module_test_cases(request, module_id): 112 module = "" 113 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现 114 module = Module.objects.get(id=int(module_id)) 115 test_cases = TestCase.objects.filter(belong_module=module).order_by('-id') 116 print("test_case in module_test_cases: {}".format(test_cases)) 117 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 118 119 120 # 用例集合菜单项 121 @login_required 122 def case_suite(request): 123 case_suites = CaseSuite.objects.filter() 124 return render(request, 'case_suite.html', {'case_suites': get_paginator(request, case_suites)}) 125 126 127 # 用例集合-添加测试用例页 128 @login_required 129 def add_case_in_suite(request, suite_id): 130 # 查询指定的用例集合 131 case_suite = CaseSuite.objects.get(id=suite_id) 132 # 根据id号查询所有的用例 133 test_cases = TestCase.objects.filter().order_by('id') 134 if request.method == "GET": 135 print("test cases:", test_cases) 136 elif request.method == "POST": 137 test_cases_list = request.POST.getlist('testcases_list') 138 # 如果页面勾选了用例 139 if test_cases_list: 140 print("勾选用例id:", test_cases_list) 141 # 根据页面勾选的用例与查询出的所有用例一一比较 142 for test_case in test_cases_list: 143 test_case = TestCase.objects.get(id=int(test_case)) 144 # 匹配成功则添加用例 145 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case) 146 # 未勾选用例 147 else: 148 print("添加测试用例失败") 149 return HttpResponse("添加的测试用例为空,请选择用例后再添加!") 150 return render(request, 'add_case_in_suite.html', 151 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 152 153 154 # 用例集合页-查看/删除用例 155 @login_required 156 def show_and_delete_case_in_suite(request, suite_id): 157 case_suite = CaseSuite.objects.get(id=suite_id) 158 test_cases = SuiteCase.objects.filter(case_suite=case_suite) 159 if request.method == "POST": 160 test_cases_list = request.POST.getlist('test_cases_list') 161 if test_cases_list: 162 print("勾选用例:", test_cases_list) 163 for test_case in test_cases_list: 164 test_case = TestCase.objects.get(id=int(test_case)) 165 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete() 166 else: 167 print("测试用例删除失败") 168 return HttpResponse("所选测试用例为空,请选择用例后再进行删除!") 169 case_suite = CaseSuite.objects.get(id=suite_id) 170 return render(request, 'show_and_delete_case_in_suite.html', 171 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 172 173 174 # 用例执行结果菜单项 175 @login_required 176 def test_case_execute_record(request): 177 test_case_execute_records = TestCaseExecuteResult.objects.filter().order_by('-id') 178 return render(request, 'test_case_execute_records.html', {'test_case_execute_records': get_paginator(request, test_case_execute_records)}) 179 180 181 # 用例执行结果-对比差异 182 @login_required 183 def diffCaseResponse(request, test_record_id): 184 test_record_data = TestCaseExecuteResult.objects.get(id=test_record_id) 185 print("用例执行结果记录: {}".format(test_record_data)) 186 present_response = test_record_data.response_data 187 if present_response: 188 present_response = json.dumps(json.loads(present_response), sort_keys=True, indent=4, 189 ensure_ascii=False) # 中文字符不转ascii编码 190 print("当前响应结果: {}".format(present_response)) 191 last_time_execute_response = test_record_data.last_time_response_data 192 if last_time_execute_response: 193 last_time_execute_response = json.dumps(json.loads(last_time_execute_response), sort_keys=True, indent=4, 194 ensure_ascii=False) 195 print("上一次响应结果: {}".format(last_time_execute_response)) 196 return render(request, 'case_result_diff.html', locals()) 197 198 199 # 默认页的视图函数 200 @login_required 201 def index(request): 202 return render(request, 'index.html') 203 204 205 # 登录页的视图函数 206 def login(request): 207 print("request.session.items(): {}".format(request.session.items())) # 打印session信息 208 if request.session.get('is_login', None): 209 return redirect('/') 210 # 如果是表单提交行为,则进行登录校验 211 if request.method == "POST": 212 login_form = UserForm(request.POST) 213 message = "请检查填写的内容!" 214 if login_form.is_valid(): 215 username = login_form.cleaned_data['username'] 216 password = login_form.cleaned_data['password'] 217 try: 218 # 使用django提供的身份验证功能 219 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象 220 if user is not None: 221 print("用户【%s】登录成功" % username) 222 auth.login(request, user) 223 request.session['is_login'] = True 224 # 登录成功,跳转主页 225 return redirect('/') 226 else: 227 message = "用户名不存在或者密码不正确!" 228 except: 229 traceback.print_exc() 230 message = "登录程序出现异常" 231 # 用户名或密码为空,返回登录页和错误提示信息 232 else: 233 return render(request, 'login.html', locals()) 234 # 不是表单提交,代表只是访问登录页 235 else: 236 login_form = UserForm() 237 return render(request, 'login.html', locals()) 238 239 240 # 注册页的视图函数 241 def register(request): 242 return render(request, 'register.html') 243 244 245 # 登出的视图函数:重定向至login视图函数 246 @login_required 247 def logout(request): 248 auth.logout(request) 249 request.session.flush() 250 return redirect("/login/")
3)定义模板
新增 case_result_diff.html:
1 {% extends 'base.html' %} 2 {% load static %} 3 {% block title %}结果对比差异{% endblock %} 4 5 {% block content %} 6 <table class="table table-striped"> 7 <thead> 8 <tr> 9 <th width="50%">上次执行结果</th> 10 <th width="50%">本次执行结果</th> 11 </tr> 12 </thead> 13 <tbody> 14 <tr> 15 <td> 16 <div> 17 <pre style="height: 400px;">{{ last_time_execute_response | safe }}</pre> 18 </div> 19 </td> 20 <td> 21 <div><pre style="height: 400px;">{{ present_response | safe }}</pre></div> 22 </td> 23 </tr> 24 </tbody> 25 </table> 26 27 {% endblock %}
修改 test_case_execute_records.html:增加“对比差异”链接
{% extends 'base.html' %} {% load static %} {% block title %}用例执行记录{% endblock %} {% block content %} <div class="table-responsive"> <table class="table table-striped"> <thead> <tr> <th width="4%">id</th> <th width="4%">名称</th> <th width="20%">请求数据</th> <th width="20%">执行返回结果</th> <th width="5%">操作</th> <th>断言内容</th> <th width="5%">执行结果</th> <th width="5%">异常信息</th> <th width="10%">请求后提取变量</th> <th width="8%">开始时间</th> <th width="8%">执行耗时(ms)</th> </tr> </thead> <tbody> {% for testrecord in test_case_execute_records %} <tr> <td>{{ testrecord.id }}</td> <td><a href="{% url 'test_case_detail' testrecord.belong_test_case.id%}" target="_blank">{{ testrecord.belong_test_case.case_name }}</a></td> <td>{{ testrecord.request_data }}</td> <td>{{ testrecord.response_data }}</td> <td><a href="{% url 'case_result_diff' testrecord.id %}" target="_blank">对比差异</a></td> <td>{{ testrecord.belong_test_case.assert_key }}</td> <td>{{ testrecord.execute_result|default_if_none:"" }}</td> {% if testrecord.exception_info %} <td><a href="" target="_blank">显示异常信息</a></td> {% else %} <td>无</td> {% endif %} <td>{{ testrecord.extract_var }}</td> <td>{{ testrecord.execute_start_time }}</td> <td>{{ testrecord.execute_total_time }}</td> </tr> {% endfor %} </tbody> </table> {# 实现分页标签的代码 #} {# 这里使用 bootstrap 渲染页面 #} <div id="pages" class="text-center"> <nav> <ul class="pagination"> <li class="step-links"> {% if test_case_execute_records.has_previous %} <a class='active' href="?page={{ test_case_execute_records.previous_page_number }}">上一页</a> {% endif %} <span class="current"> 第 {{ test_case_execute_records.number }} 页 / 共 {{ test_case_execute_records.paginator.num_pages }} 页</span> {% if test_case_execute_records.has_next %} <a class='active' href="?page={{ test_case_execute_records.next_page_number }}">下一页</a> {% endif %} </li> </ul> </nav> </div> </div> {% endblock %}
9.7 异常信息展示
1)定义路由
from django.urls import path, re_path from . import views urlpatterns = [ path('', views.index), path('login/', views.login), path('logout/', views.logout), path('project/', views.project, name='project'), path('module/', views.module, name='module'), path('test_case/', views.test_case, name="test_case"), re_path('test_case_detail/(?P<test_case_id>[0-9]+)', views.test_case_detail, name="test_case_detail"), re_path('module_test_cases/(?P<module_id>[0-9]+)/$', views.module_test_cases, name="module_test_cases"), path('case_suite/', views.case_suite, name="case_suite"), re_path('add_case_in_suite/(?P<suite_id>[0-9]+)', views.add_case_in_suite, name="add_case_in_suite"), re_path('show_and_delete_case_in_suite/(?P<suite_id>[0-9]+)', views.show_and_delete_case_in_suite, name="show_and_delete_case_in_suite"), path('test_case_execute_record/', views.test_case_execute_record, name="test_case_execute_record"), re_path('case_result_diff/(?P<test_record_id>[0-9]+)', views.case_result_diff, name="case_result_diff"), re_path('show_exception/(?P<execute_id>[0-9]+)$', views.show_exception, name="show_exception"), ]
2)定义视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 import json 8 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer, TestCaseExecuteResult 9 from .task import case_task 10 11 12 # 封装分页处理 13 def get_paginator(request, data): 14 paginator = Paginator(data, 10) # 默认每页展示10条数据 15 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 16 page = request.GET.get('page') 17 try: 18 paginator_pages = paginator.page(page) 19 except PageNotAnInteger: 20 # 如果请求的页数不是整数, 返回第一页。 21 paginator_pages = paginator.page(1) 22 except InvalidPage: 23 # 如果请求的页数不存在, 重定向页面 24 return HttpResponse('找不到页面的内容') 25 return paginator_pages 26 27 28 # 项目菜单项 29 @login_required 30 def project(request): 31 print("request.user.is_authenticated: ", request.user.is_authenticated) 32 projects = Project.objects.filter().order_by('-id') 33 print("projects:", projects) 34 return render(request, 'project.html', {'projects': get_paginator(request, projects)}) 35 36 37 # 模块菜单项 38 @login_required 39 def module(request): 40 if request.method == "GET": # 请求get时候,id倒序查询所有的模块数据 41 modules = Module.objects.filter().order_by('-id') 42 return render(request, 'module.html', {'modules': get_paginator(request, modules)}) 43 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 44 proj_name = request.POST['proj_name'] 45 projects = Project.objects.filter(name__contains=proj_name.strip()) 46 projs = [proj.id for proj in projects] 47 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 48 return render(request, 'module.html', {'modules': get_paginator(request, modules), 'proj_name': proj_name}) 49 50 51 # 获取测试用例执行的接口地址 52 def get_server_address(env): 53 if env: # 环境处理 54 env_data = InterfaceServer.objects.filter(env=env[0]) 55 print("env_data: {}".format(env_data)) 56 if env_data: 57 ip = env_data[0].ip 58 port = env_data[0].port 59 print("ip: {}, port: {}".format(ip, port)) 60 server_address = "http://{}:{}".format(ip, port) 61 print("server_address: {}".format(server_address)) 62 return server_address 63 else: 64 return "" 65 else: 66 return "" 67 68 69 # 测试用例菜单项 70 @login_required 71 def test_case(request): 72 print("request.session['is_login']: {}".format(request.session['is_login'])) 73 test_cases = "" 74 if request.method == "GET": 75 test_cases = TestCase.objects.filter().order_by('id') 76 print("testcases: {}".format(test_cases)) 77 elif request.method == "POST": 78 print("request.POST: {}".format(request.POST)) 79 test_case_id_list = request.POST.getlist('test_cases_list') 80 env = request.POST.getlist('env') 81 print("env: {}".format(env)) 82 server_address = get_server_address(env) 83 if not server_address: 84 return HttpResponse("提交的运行环境为空,请选择环境后再提交!") 85 if test_case_id_list: 86 test_case_id_list.sort() 87 print("test_case_id_list: {}".format(test_case_id_list)) 88 print("获取到用例,开始用例执行") 89 case_task(test_case_id_list, server_address) 90 else: 91 print("运行测试用例失败") 92 return HttpResponse("提交的运行测试用例为空,请选择用例后在提交!") 93 test_cases = TestCase.objects.filter().order_by('id') 94 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 95 96 97 # 用例详情页 98 @login_required 99 def test_case_detail(request, test_case_id): 100 test_case_id = int(test_case_id) 101 test_case = TestCase.objects.get(id=test_case_id) 102 print("test_case: {}".format(test_case)) 103 print("test_case.id: {}".format(test_case.id)) 104 print("test_case.belong_project: {}".format(test_case.belong_project)) 105 106 return render(request, 'test_case_detail.html', {'test_case': test_case}) 107 108 109 # 模块页展示测试用例 110 @login_required 111 def module_test_cases(request, module_id): 112 module = "" 113 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现 114 module = Module.objects.get(id=int(module_id)) 115 test_cases = TestCase.objects.filter(belong_module=module).order_by('-id') 116 print("test_case in module_test_cases: {}".format(test_cases)) 117 return render(request, 'test_case.html', {'test_cases': get_paginator(request, test_cases)}) 118 119 120 # 用例集合菜单项 121 @login_required 122 def case_suite(request): 123 case_suites = CaseSuite.objects.filter() 124 return render(request, 'case_suite.html', {'case_suites': get_paginator(request, case_suites)}) 125 126 127 # 用例集合-添加测试用例页 128 @login_required 129 def add_case_in_suite(request, suite_id): 130 # 查询指定的用例集合 131 case_suite = CaseSuite.objects.get(id=suite_id) 132 # 根据id号查询所有的用例 133 test_cases = TestCase.objects.filter().order_by('id') 134 if request.method == "GET": 135 print("test cases:", test_cases) 136 elif request.method == "POST": 137 test_cases_list = request.POST.getlist('testcases_list') 138 # 如果页面勾选了用例 139 if test_cases_list: 140 print("勾选用例id:", test_cases_list) 141 # 根据页面勾选的用例与查询出的所有用例一一比较 142 for test_case in test_cases_list: 143 test_case = TestCase.objects.get(id=int(test_case)) 144 # 匹配成功则添加用例 145 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case) 146 # 未勾选用例 147 else: 148 print("添加测试用例失败") 149 return HttpResponse("添加的测试用例为空,请选择用例后再添加!") 150 return render(request, 'add_case_in_suite.html', 151 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 152 153 154 # 用例集合页-查看/删除用例 155 @login_required 156 def show_and_delete_case_in_suite(request, suite_id): 157 case_suite = CaseSuite.objects.get(id=suite_id) 158 test_cases = SuiteCase.objects.filter(case_suite=case_suite) 159 if request.method == "POST": 160 test_cases_list = request.POST.getlist('test_cases_list') 161 if test_cases_list: 162 print("勾选用例:", test_cases_list) 163 for test_case in test_cases_list: 164 test_case = TestCase.objects.get(id=int(test_case)) 165 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete() 166 else: 167 print("测试用例删除失败") 168 return HttpResponse("所选测试用例为空,请选择用例后再进行删除!") 169 case_suite = CaseSuite.objects.get(id=suite_id) 170 return render(request, 'show_and_delete_case_in_suite.html', 171 {'test_cases': get_paginator(request, test_cases), 'case_suite': case_suite}) 172 173 174 # 用例执行结果菜单项 175 @login_required 176 def test_case_execute_record(request): 177 test_case_execute_records = TestCaseExecuteResult.objects.filter().order_by('-id') 178 return render(request, 'test_case_execute_records.html', {'test_case_execute_records': get_paginator(request, test_case_execute_records)}) 179 180 181 # 用例执行结果-对比差异 182 @login_required 183 def case_result_diff(request, test_record_id): 184 test_record_data = TestCaseExecuteResult.objects.get(id=test_record_id) 185 print("用例执行结果记录: {}".format(test_record_data)) 186 present_response = test_record_data.response_data 187 if present_response: 188 present_response = json.dumps(json.loads(present_response), sort_keys=True, indent=4, 189 ensure_ascii=False) # 中文字符不转ascii编码 190 print("当前响应结果: {}".format(present_response)) 191 last_time_execute_response = test_record_data.last_time_response_data 192 if last_time_execute_response: 193 last_time_execute_response = json.dumps(json.loads(last_time_execute_response), sort_keys=True, indent=4, 194 ensure_ascii=False) 195 print("上一次响应结果: {}".format(last_time_execute_response)) 196 return render(request, 'case_result_diff.html', locals()) 197 198 199 # 用例执行结果-异常信息展示 200 @login_required 201 def show_exception(request, execute_id): 202 test_record = TestCaseExecuteResult.objects.get(id=execute_id) 203 return render(request, 'show_exception.html', {'exception_info': test_record.exception_info}) 204 205 206 # 默认页的视图函数 207 @login_required 208 def index(request): 209 return render(request, 'index.html') 210 211 212 # 登录页的视图函数 213 def login(request): 214 print("request.session.items(): {}".format(request.session.items())) # 打印session信息 215 if request.session.get('is_login', None): 216 return redirect('/') 217 # 如果是表单提交行为,则进行登录校验 218 if request.method == "POST": 219 login_form = UserForm(request.POST) 220 message = "请检查填写的内容!" 221 if login_form.is_valid(): 222 username = login_form.cleaned_data['username'] 223 password = login_form.cleaned_data['password'] 224 try: 225 # 使用django提供的身份验证功能 226 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象 227 if user is not None: 228 print("用户【%s】登录成功" % username) 229 auth.login(request, user) 230 request.session['is_login'] = True 231 # 登录成功,跳转主页 232 return redirect('/') 233 else: 234 message = "用户名不存在或者密码不正确!" 235 except: 236 traceback.print_exc() 237 message = "登录程序出现异常" 238 # 用户名或密码为空,返回登录页和错误提示信息 239 else: 240 return render(request, 'login.html', locals()) 241 # 不是表单提交,代表只是访问登录页 242 else: 243 login_form = UserForm() 244 return render(request, 'login.html', locals()) 245 246 247 # 注册页的视图函数 248 def register(request): 249 return render(request, 'register.html') 250 251 252 # 登出的视图函数:重定向至login视图函数 253 @login_required 254 def logout(request): 255 auth.logout(request) 256 request.session.flush() 257 return redirect("/login/")
3)定义模板
新增异常信息展示模板:show_exception.html
1 {% extends 'base.html' %} 2 {% load static %} 3 {% block title %}异常信息{% endblock %} 4 {% block content %} 5 6 <p style="margin-left: 10px;">异常信息如下:</p> 7 <p style="margin-left: 10px; width: 90%">{{ exception_info|default_if_none:"" }}</p> 8 9 {% endblock %}
修改用例执行记录模板 test_case_execute_records.html:增加异常信息展示链接
1 {% extends 'base.html' %} 2 {% load static %} 3 {% block title %}用例执行记录{% endblock %} 4 {% block content %} 5 6 <div class="table-responsive"> 7 <table class="table table-striped"> 8 <thead> 9 <tr> 10 <th width="4%">id</th> 11 <th width="4%">名称</th> 12 <th width="20%">请求数据</th> 13 <th width="20%">执行返回结果</th> 14 <th width="5%">操作</th> 15 <th>断言内容</th> 16 <th width="5%">执行结果</th> 17 <th width="5%">异常信息</th> 18 <th width="10%">请求后提取变量</th> 19 <th width="8%">开始时间</th> 20 <th width="8%">执行耗时(ms)</th> 21 </tr> 22 </thead> 23 <tbody> 24 25 {% for testrecord in test_case_execute_records %} 26 <tr> 27 <td>{{ testrecord.id }}</td> 28 <td><a href="{% url 'test_case_detail' testrecord.belong_test_case.id%}" target="_blank">{{ testrecord.belong_test_case.case_name }}</a></td> 29 <td>{{ testrecord.request_data }}</td> 30 <td>{{ testrecord.response_data }}</td> 31 <td><a href="{% url 'case_result_diff' testrecord.id %}" target="_blank">对比差异</a></td> 32 <td>{{ testrecord.belong_test_case.assert_key }}</td> 33 34 {% ifequal testrecord.execute_result '成功' %} 35 <td bgcolor='green'>{{ testrecord.execute_result}}</td> 36 {% else %} 37 <td bgcolor='red'>{{ testrecord.execute_result}}</td> 38 {% endifequal %} 39 40 {% if testrecord.exception_info %} 41 <td><a href="{% url 'show_exception' testrecord.id %}" target="_blank">显示异常信息</a></td> 42 {% else %} 43 <td>无</td> 44 {% endif %} 45 46 <td>{{ testrecord.extract_var }}</td> 47 <td>{{ testrecord.execute_start_time }}</td> 48 <td>{{ testrecord.execute_total_time }}</td> 49 </tr> 50 {% endfor %} 51 52 </tbody> 53 </table> 54 55 {# 实现分页标签的代码 #} 56 {# 这里使用 bootstrap 渲染页面 #} 57 <div id="pages" class="text-center"> 58 <nav> 59 <ul class="pagination"> 60 <li class="step-links"> 61 {% if test_case_execute_records.has_previous %} 62 <a class='active' href="?page={{ test_case_execute_records.previous_page_number }}">上一页</a> 63 {% endif %} 64 65 <span class="current"> 66 第 {{ test_case_execute_records.number }} 页 / 共 {{ test_case_execute_records.paginator.num_pages }} 页</span> 67 68 {% if test_case_execute_records.has_next %} 69 <a class='active' href="?page={{ test_case_execute_records.next_page_number }}">下一页</a> 70 {% endif %} 71 </li> 72 </ul> 73 </nav> 74 </div> 75 </div> 76 {% endblock %}