Python 自动化测试框架:接口封装的三种方式

发布于:2025-07-25 ⋅ 阅读:(21) ⋅ 点赞:(0)

年薪30W+必备!接口自动化测试框架封装实战(附源码)|Pytest+Allure+Jenkins

第一种:静态方法封装。接口只定义一个入参(默认为 JSON 格式的请求体),调用时直接执行接口请求。

接口封装代码如下:

class OrderTransactionService:
    """Static-method wrappers for the ``orderTransactionService_1.0.0`` interface."""

    @staticmethod
    def getComboProductList(body):
        """Call ``getComboProductList`` and return the decoded JSON reply.

        Args:
            body: Request payload; it is sent as the second element of the
                request list, after the request DTO class name.

        Returns:
            The response text parsed with ``json.loads``.
        """
        endpoint = http_host + '/service?serialize=7'
        content_type = {'Content-Type': 'application/json'}
        # The request list is [DTO class name, payload].
        call_args = ['com.ymm.insurance.request.ProductListRequestDirect', body]
        post_body = CommonUtils.be_post_json(
            methodName='getComboProductList',
            url='http://service.ymm.com/insure-service-apply/orderTransactionService_1.0.0',
            request_list=call_args,
        )
        raw_reply = httpUtil.Post(endpoint, content_type, post_body)
        logger.info("=======getComboProductList的POST方式的入参是=====\n" + str(call_args))
        logger.info("=======getComboProductList的POST方式的返回值是=====\n" + raw_reply)
        return json.loads(raw_reply)

测试用例方法调用如下:

# encoding:utf-8
import pytest
from settings import env
from base.insure.insure_service import OrderTransactionService
from settings import mysql
# Module-level instance used by the test class below to call the service wrapper.
orderTransactionService = OrderTransactionService()
# Handle for the "fis" database; the test class shown below does not reference it.
database = mysql.Database("fis")

@pytest.mark.doubleRead
class Test_getComboProductList:
    """Calls ``getComboProductList`` with an environment-specific request body.

    Two hard-coded request bodies are kept as class attributes; ``json`` is
    bound to the one matching ``env.env`` when the class body is executed.
    """

    # Request body used for every environment other than 'qa'
    # (original note: a record picked at random from the database).
    json_dev = {
        "cargoId": 10171239743346,
        "driverYmmUid": 965006065498560609,
        "driverHcbUid": 900345103,
        "shipperYmmUid": 965006065498913346,
        "shipperHcbUid": 1100297317,
        "appType": 1,
        "source": 1,
        "cargoSource": 1,
        "client": 1,
        "version": "10990000",
        "pluginVersion": "10.99.1.1",
        "payOrderPluginVersion": "10.99.1.284",
        "securityTran": 7,
        "startCode": 310101,
        "endCode": 270213,
        "firstCategoryCode": 10,
        "firstCategoryName": "食品饮料",
        "secondCategoryCode": 91,
        "secondCategoryName": "酱油",
    }

    # Request body used when env.env == 'qa'.
    json_qa = {
        "cargoId": 123659599419839,
        "driverYmmUid": 967933837394139781,
        "driverHcbUid": 215996505,
        "shipperYmmUid": 967933837002049839,
        "shipperHcbUid": 215801452,
        "appType": 1,
        "source": 1,
        "cargoSource": 1,
        "client": 1,
        "version": "8600700",
        "pluginVersion": "7.25.1.1",
        "payOrderPluginVersion": "8.58.141.380",
        "securityTran": 8,
        "startCode": 330382,
        "endCode": 320114,
        "firstCategoryCode": 3,
        "firstCategoryName": "服饰 纺织 皮革",
        "secondCategoryCode": 47,
        "secondCategoryName": "服装",
        "userFlag": None,
        "platFormScene": None,
    }

    # Select the body for the active environment.
    json = json_qa if env.env == 'qa' else json_dev

    def test_01(self):
        """Send the selected body; the call itself is the only check made."""
        orderTransactionService.getComboProductList(self.json)

第二种:非静态方法封装,通过不定长关键字参数(**kwargs)传入请求字段。

class freightCompensationClaimService:
    """Instance-method wrappers for ``freightCompensationClaimService_1.0.0``."""

    def autoReport(self, **kwargs):
        """Call ``autoReport`` with the given fields and return the decoded reply.

        Args:
            **kwargs: Request fields; the resulting dict is sent as the second
                element of the request list, after the request DTO class name.

        Returns:
            The response text parsed with ``json.loads``.
        """
        endpoint = http_host + '/service?serialize=7'
        content_type = {'Content-Type': 'application/json'}
        # The request list is [DTO class name, payload].
        call_args = [
            'com.ymm.insurance.dto.claim.crm.request.AutoReportClaimRequestDto',
            kwargs,
        ]
        post_body = CommonUtils.be_post_json(
            methodName='autoReport',
            url='http://service.ymm.com/insure-service-apply/freightCompensationClaimService_1.0.0',
            request_list=call_args,
        )
        raw_reply = httpUtil.Post(endpoint, content_type, post_body)
        logger.info("=======autoReport的POST方式的入参是=====\n" + str(kwargs))
        print("=======autoReport的POST方式的返回值是=====\n" + raw_reply)
        return json.loads(raw_reply)

测试用例方法调用如下:

from base.insure.insure_service import freightCompensationClaimService
from settings.mysql import Database

# NOTE: this rebinds the imported class name to an instance, so the class
# itself is no longer reachable under this name in the rest of the module.
freightCompensationClaimService=freightCompensationClaimService()
# Database handles; the test class shown below does not reference them.
dataBase = Database("fis")
dataBase_calim = Database("fis_claim")
# Module-level placeholder; the test class assigns its own poliNo in setup_class.
poliNo = 0

# 太平洋运费损失险理赔
class Test_autoReport:
    """Auto-report claim test for the freight-compensation claim service."""

    @classmethod
    def setup_class(cls):
        """Store the claim request fields on the class once, before any test runs."""
        # One signed image URL is reused for all three attachment lists.
        # NOTE(review): the URL carries an `Expires` query parameter, so the
        # link may no longer resolve -- confirm the service does not fetch it.
        attachment_url = "https://dev-image56-conf-oss.ymm56.com/ymmfile/insure-service/1e15310d-f7b7-444d-a5ef-a11708bbe48a?Expires=1654166938&OSSAccessKeyId=LTAIq0WRi8jPwg5y&Signature=7PV0SGlFsqSuMvTPHQlYt2jw3eU%3D"
        cls.poliNo = 'AGUZGDS59724EGGF73E4'
        cls.claimAmount = 6000.01
        cls.conferenceFreightRate = 8000
        cls.receivedFreightRate = 1000
        cls.truckLength = '6.0'
        cls.driverLicenses = [attachment_url]
        cls.drivingLicenses = [attachment_url]
        cls.invoices = [attachment_url]

    def test_01(self):
        """Submit the claim and check that it succeeded and returned data."""
        response = freightCompensationClaimService.autoReport(
            policyNo=self.poliNo,
            claimAmount=self.claimAmount,
            conferenceFreightRate=self.conferenceFreightRate,
            receivedFreightRate=self.receivedFreightRate,
            truckLength=self.truckLength,
            driverLicenses=self.driverLicenses,
            drivingLicenses=self.drivingLicenses,
            invoices=self.invoices,
        )
        result = response["response"]
        print(result)
        # Identity checks instead of ==/!= against singletons (PEP 8).
        assert result["success"] is True
        assert result["data"] is not None

第三种:通过构建对象类封装测试方法(推荐使用,可以解决重复登录、请求头重复等问题)

接口调用方法封装

class TradeDriverCheckOutApp:
    """Driver-side client for the order/checkout endpoints used after a cargo is grabbed.

    The instance logs in once when it is constructed and reuses the
    authorization value from that login response for every later request, so
    the ``client-info`` header sent with business requests is the same one
    that was sent to ``login``.
    """

    def __init__(self, tel_num):
        """Store the phone number, set the base headers and log in.

        Args:
            tel_num: Phone number sent as ``telephone`` in the login request.
        """
        self.tel_num = tel_num
        self.headers = {
            "client-info": "36b7a347643fe4bdad1692fc5bd13da0/driver/8.84.0.0/android/com.xiwei.logistics",
            "rn-info": "global-engine:7.58.0.299/fta-driver-youpin:8.61.0.401/fta-forum:7.34.4.12/enterprisecargodetail:7.53.001.15/fta-im:7.47.0.52/fta-user:7.51.0.32/projectcargodetail:7.53.001.220/carsticker:8.47.230.22/common:6.5.1.3/fta-cargomatch:7.57.1.154/cargodetail:7.62.75.786/cargomatch:7.53.1.38/nav:7.51.1.7/fta-price-checking:8.19.1.10/fta-board-equipment:8.57.280.58/sd-driver-fta:8.53.0.63/entrust:8.53.001.32/payorder:8.58.141.380/ymmverify:7.58.0.146/sd-trade-fta:1.7.250.321/sdcargo-driver:7.55.0.285/driver_cargo_trade:7.25.1.1/rn-region-cargodetail-driver:8.62.101.273/orders:7.59.1.168/user:7.51.0.14/transaction:7.55.10.14",
            "plugin-info": "ymm.maptencent:1.80.1/nav:5.84.0/order:1.82.0/wal:3.61.10/shortdistance:1.82.1/ymm.di:8.74.1/im:7.82.0/shortdistance.im:1.39.0/financeshield:4.24.7/ymm.verify:7.84.0/ymm.cargopublish:7.80.0/ymm.framework.dynamic:1.5.7/ymm.security:8.80.1/cargo:8.84.0/"
        }
        self.login_return = self.login()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _basic_auth(self):
        """Return the authorization value stored in the cached login response."""
        return self.login_return["info"]["profileInfo"]["basicAuthrization"]

    def _auth_headers(self):
        """Write the authorization into ``self.headers`` and return that dict."""
        self.headers["authorization"] = self._basic_auth()
        return self.headers

    def _post_json(self, url, req_json, in_label="入参", out_label="出参"):
        """POST ``req_json`` with the authorized headers and return the decoded reply.

        The request and the decoded response are printed, prefixed with
        ``in_label`` / ``out_label``.
        """
        headers = self._auth_headers()
        print("{0}:{1}".format(in_label, req_json))
        resp = requests.post(url=url, json=req_json, headers=headers)
        resp_json = json.loads(resp.content)
        print("{0}:{1}".format(out_label, resp_json))
        return resp_json

    def _half_pay_page(self, cargoID, cargo_detail_version):
        """POST the half-pay-page request for ``cargoID`` with the given detail version."""
        req_json = {
            "depositAmount": "",
            "cargoId": cargoID,
            "freightAmount": "",
            "pageSource": "0",
            "extraParams": None,
            "walletVersion": "3.58.21",
            "verticalParam": {
                "driverLatitude": 31.209773,
                "driverLongitude": 121.408535,
                "cargoDetailVersion": cargo_detail_version
            },
            "tags": [],
            "transParams": None
        }
        url = 'https://{0}.ymm56.com/ld-order-deal-app/order/half-pay-page'.format(env)
        return self._post_json(url, req_json,
                               "/ld-order-deal-app/order/half-pay-page入参",
                               "/ld-order-deal-app/order/half-pay-page出参")

    def _grab(self, template_name, dd):
        """Load a grab request template, fill it from ``dd`` and POST it to ``order/grab``."""
        req_json = json_utils.JsonUtils(template_name, 'cargo_publish').data
        req_json["deposit"] = dd["deposit"]
        req_json["contractDTO"]["totalFreight"] = dd["totalFreight"]
        req_json["contractDTO"]["netFreight"] = dd["netFreight"]
        req_json["moreCostList"][0]["moreCostAmount"] = dd["moreCostAmount"]
        req_json["insuranceItems"] = dd["insuranceItems"]
        req_json["cargoId"] = dd["cargoId"]
        req_json["transmissionParams"] = dd["transmissionParams"]
        url = 'https://{0}.ymm56.com/ld-order-deal-app/order/grab'.format(env)
        return self._post_json(url, req_json,
                               "===/ld-order-deal-app/order/grab入参",
                               "/ld-order-deal-app/order/grab出参")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def login(self):
        """Log in with ``self.tel_num`` and return the decoded login response."""
        env_url = f'https://{env}.ymm56.com/ymm-userCenter-app/account/login'
        print("env_url", env_url)
        user = {
            "cmToken": "",
            "code": "1234",
            "telephone": self.tel_num,
        }
        login_res = requests.post(url=env_url, json=user, headers=self.headers)
        return json.loads(login_res.content)

    def assign_team_member(self, req_json):
        """Fleet leader assigns a driver (driver scenario-insurance endorsement).

        Args:
            req_json: Request body, sent unchanged.

        Returns:
            The decoded JSON response.
        """
        url = 'https://{0}.ymm56.com//trade-checkout-app/platformCarrierTeam/assignTeamMember'.format(env)
        return self._post_json(url, req_json)

    def payPage(self, cargoID):
        """Driver grab flow: fetch the deposit payment page for ``cargoID``."""
        req_json = {"depositAmount": "0", "cargoId": cargoID, "freightAmount": "0", "pageSource": "0",
                    "extraParams": None, "walletVersion": "3.61.10",
                    "verticalParam": {"todayCargoRetentionCount": 0, "todayRetentionCount": 0,
                                      "cargoDetailVersion": "d8d9d065beae4c1a80368e8be77f3f1a"}, "tags": [],
                    "transParams": None}
        url = 'https://{0}.ymm56.com/ld-order-deal-app/order/payPage'.format(env)
        return self._post_json(url, req_json,
                               "/ld-order-deal-app/order/payPage入参",
                               "/ld-order-deal-app/order/payPage出参")

    def payPagehalfpagecombo(self, cargoID):
        """Deposit page, half-screen popup (getComboProductList variant) for ``cargoID``."""
        return self._half_pay_page(cargoID, "cd0a3da8bfa1416a9bdbfcd76b53c798")

    def payPagehalfpageinfolistforpop(self, cargoID):
        """Deposit page, half-screen popup (getProductInfoListForPop variant) for ``cargoID``."""
        return self._half_pay_page(cargoID, "b8164eed54b84a8c902185620ccb4722")

    def order_grab(self, dd, insure=True):
        """Driver grab flow: start the payment.

        If the service answers "system busy / too many requests", retrying
        with a fresh cargo may help (per the original note).

        Args:
            dd: Dict providing ``deposit``, ``totalFreight``, ``netFreight``,
                ``moreCostAmount``, ``insuranceItems``, ``cargoId`` and
                ``transmissionParams``.
            insure: Truthy to use the ``grapgly.json`` request template
                (driver-side insurance is bought); falsy or ``None`` to use
                ``grab.json``.

        Returns:
            The decoded JSON response.
        """
        template_name = "grapgly.json" if insure else "grab.json"
        return self._grab(template_name, dd)

    def order_grab_half_infolistforpop(self, dd, insure=True):
        """Start the payment from the half-screen popup (combined insurance 1.0/2.0).

        Args:
            dd: Same keys as for ``order_grab``.
            insure: Accepted for signature compatibility; the same
                ``grab_half_infolistforpop.json`` template is used either way.

        Returns:
            The decoded JSON response.
        """
        return self._grab("grab_half_infolistforpop.json", dd)

    def insure_app_trial(self, req_json):
        """Driver side, channel insurance (self purchase): premium trial calculation."""
        url = 'https://{0}.ymm56.com//insure-app/insurance/common/trial'.format(env)
        return self._post_json(url, req_json)

    def payOrderWithTestBank(self, mobile, client_name, bizNo, source):
        """Pay an order through the wallet using the TESTBANK pay way.

        Three wallet calls are made in sequence: third-party login, applyPayV2
        (enter the cashier) and payV2 (pay).

        Args:
            mobile: Passed through to ``getStSid``.
            client_name: Client key used for the wallet token and ``getStSid``.
            bizNo: Sent as ``outerTradeNo`` to applyPayV2.
            source: Sent as ``source`` to applyPayV2.

        Returns:
            The decoded JSON response of payV2.
        """
        # Step 1: wallet login.
        auth = self._basic_auth()
        request_url = (
            "http://pay.qa-bj.56qq.com/mwallet/login/loginByThirdParty.htm?"
            if env == 'qa'
            else "https://pay.dev-ag.56qq.com/mwallet/login/loginByThirdParty.htm"
        )
        headers = {
            'Content-Type': "application/x-www-form-urlencoded",
            'Authorization': auth,
            'x-auth-type': 'basic',
            'x-cipher-type': 'YMM',
            'encryptresponse': '2',
            'x-ag-wallet-v3-de': 'true',
            'walletversion': '3.2.1',
        }
        t = int(round(time.time() * 1000))  # millisecond timestamp sent as _t_
        st_sid = self.getStSid(mobile, client_name)
        st = st_sid['st']
        sid = st_sid['sid']
        token = getAuth.getWalletToken(env, auth, client_name)
        encodeToken = urllib.parse.quote(str(token))
        body = f"st={st}&sid={sid}&third_party_token={encodeToken}&_t_={t}"
        response = httpUtil.Post(request_url, headers, body)
        responseJson = json.loads(response)
        print("钱包登录接口loginByThirdParty返回结果:", responseJson)
        hostUid = responseJson['data']['hostUid']
        NewToken = responseJson['data']['token']
        businessType = 100000

        # Step 2: enter the cashier.
        request_applyPay_url = (
            "https://pay.qa-bj.56qq.com/mwallet/my/applyPayV2.htm?"
            if env == 'qa'
            else "https://pay.dev-ag.56qq.com/mwallet/my/applyPayV2.htm?"
        )
        body_applyPay = f"uid={hostUid}&third_party_token={encodeToken}&outerTradeNo={bizNo}&source={source}&cashierType=interiorH5&token={NewToken}&_sid_={sid}&st={st}&_vn_=8.27.0.0&_t_={t}&_pn_=com.xiwei.logistics&dfp=6c48ed91-4e84-4b50-a4b1-dd0d24f48a43&sid={sid}&_m_=PCT-AL10&_lat_=31.22012644089214&_no_=CHINA_MOBILE&_lng_=121.3134673016471&_ov_=10&_ch_=ymm&_dfp_=6c48ed91-4e84-4b50-a4b1-dd0d24f48a43&_nw_=Wi-Fi&_vc_=8270000"
        responseApplyPay = httpUtil.Post(request_applyPay_url, headers, body_applyPay)
        responseApplyPayJson = json.loads(responseApplyPay)
        print("applyPayV2接口返回", responseApplyPayJson)
        # The `fields` object returned by applyPayV2 is echoed back, URL-encoded, in payV2.
        fields = responseApplyPayJson['data']['submitInfo']['fields']
        fields_encode = quote_plus(json.dumps(fields))

        # Step 3: pay.
        request_payV2_url = (
            "https://pay.qa-bj.56qq.com/mwallet/my/payV2.htm"
            if env == 'qa'
            else "https://pay.dev-ag.56qq.com/mwallet/my/payV2.htm"
        )
        body_payV2 = f"uid={hostUid}&third_party_token={encodeToken}&payWay=%7B%22payWayCode%22%3A%22TESTBANK%22%2C%22properties%22%3A%7B%7D%7D&submitComponents=%7B%22submitInfo%22%3A%7B%22submit%22%3Atrue%2C%22baitiaoGuide%22%3Anull%2C%22frontMode%22%3Afalse%2C%22selectedCouponId%22%3Anull%2C%22selectedCoupon%22%3Anull%2C%22campaignInfo%22%3A%22null%22%2C%22tag%22%3A%22submitInfo%22%2C%22id%22%3A%221%22%2C%22type%22%3A%22biz%22%2C%22fields%22%3A{fields_encode}%2C%22businessType%22%3A%22{businessType}%22%2C%22balanceDeduct%22%3Anull%7D%7D&cashierType=interiorH5&token={NewToken}&_sid_={sid}&st={st}&_vn_=8.27.0.0&_t_={t}&_pn_=com.xiwei.logistics&dfp=6c48ed91-4e84-4b50-a4b1-dd0d24f48a43&sid={sid}&_m_=PCT-AL10&_lat_=31.22025732677263&_no_=CHINA_MOBILE&_lng_=121.31334732464585&_ov_=10&_ch_=ymm&_dfp_=6c48ed91-4e84-4b50-a4b1-dd0d24f48a43&_nw_=Wi-Fi&_vc_=8270000"
        print("========body_payV2入参========")
        print(body_payV2.replace("'", '"'))
        responseV2 = httpUtil.Post(request_payV2_url, headers, body_payV2)
        responseV2Json = json.loads(responseV2)
        print("payV2入参body_payV2:", body_payV2)
        print("payV2返回结果:", responseV2Json)
        return responseV2Json

    def getStSid(self, mobile, client_name):
        """Request a partner token and return its session id and token.

        Args:
            mobile: Not used by this method.
            client_name: Key passed to ``readconfig.get_clientinfo`` to build
                the ``client-info`` header.

        Returns:
            ``{"sid": ..., "st": ...}`` taken from the ``id`` and ``token``
            fields of the JSON string found under ``token`` in the response.
        """
        env_url_c = requestData.dev if env == 'dev' else requestData.qa
        request_url = env_url_c + requestData.partnerToken_request
        headers = {
            'Content-Type': "application/json",
            "client-info": readconfig.get_clientinfo(client_name),
            'Authorization': self._basic_auth(),
        }
        PostJson = {"needWalletToken": False}
        response = httpUtil.Post(request_url, headers, PostJson)
        responseToJson = json.loads(response)
        # The `token` field is itself a JSON document and is decoded a second time.
        result = json.loads(responseToJson["token"])
        return {"sid": result["id"], "st": result["token"]}

 业务逻辑串联,调用封装的接口方法

# -*- coding: utf-8 -*-
"""
@Time :2023/3/29 19:05
@Auth :guoliuyang
@File :create_driver_policy.py
"""
from settings import env
from testcase.common.tools.resend_cargo import cargoPublish
from base.trade_checkout_app import trade_checkout_app
from utils import getAuth
import time
import json
from base import cargo


'''
司机场景险购买-场景险保单
司机抢单,支付定金页,司机场景险,购买保险
'''

class CreateDriverPolicy():
    """Driver scenario-insurance purchase: grab a cargo, pick the insurance, pay."""

    # Insurance type codes that are copied into the grab request.
    # Per the original note: 17 PICC super empty-run, 20 CPIC super empty-run,
    # 16 million third-party liability, 13 Pacific transport, 3 Ping An
    # transport, 14 freight-loss compensation, 18 Ping An driver/passenger
    # accident, 22 CPIC escort insurance (11 is listed without a name).
    _INSURANCE_TYPES = frozenset({3, 11, 13, 14, 16, 17, 18, 20, 22})

    def create_driver_policy_new(self, mobileDriver, cargoID):
        """Run the grab-and-pay flow for one cargo as the given driver.

        Steps: log in as the driver, read amounts and the insurance list from
        the pay page, submit the grab request, then pay with the test bank.

        Args:
            mobileDriver: Driver phone number used to log in and to pay.
            cargoID: Cargo to grab.

        Returns:
            The ``orderId`` from the grab response, or ``None`` when any step
            raised (the traceback is printed; the error is not propagated).
        """
        import traceback

        try:
            trade_new = trade_checkout_app.TradeDriverCheckOutApp(mobileDriver)
            # The pay page lists the cost component and the insurance component.
            respone_paypage = trade_new.payPage(cargoID)

            cost_props = None
            insuranceList = None
            for item in respone_paypage["data"]["body"]["items"]:
                if item["type"] == 'MBCostBizComponent':
                    cost_props = item["props"]
                    deposit = cost_props["deposit"]["amount"]
                    print("定金:{0}".format(deposit))
                    totalFreight = cost_props["freight"]["amount"]
                    print("总运费:{0}".format(totalFreight))
                    # Net freight equals total freight because the deposit is
                    # refunded for these cargos; otherwise subtract the deposit.
                    netFreight = totalFreight
                    print("净得运费:{0}".format(netFreight))
                    moreCostAmount = cost_props["moreCost"][0]["totalFee"]
                    print("技术服务费:{0}".format(moreCostAmount))
                elif item["type"] == "MBInsuranceBizComponent":
                    insuranceList = item["props"]["insuranceList"]

            # Fail with a clear message instead of an unbound-local NameError.
            if cost_props is None:
                raise ValueError("payPage response has no MBCostBizComponent item")
            if insuranceList is None:
                raise ValueError("payPage response has no MBInsuranceBizComponent item")

            insuranceItems = [
                {
                    "itemId": product["itemId"],
                    "amount": product["premium"],
                    "code": product["type"],
                    "continuousCheck": False,
                    "childList": []
                }
                for product in insuranceList
                if product["type"] in self._INSURANCE_TYPES
            ]
            print("保险数据:{0}".format(insuranceItems))
            cargoId = respone_paypage["data"]["props"]["cargoId"]
            print("cargoId:{0}".format(cargoId))
            transmissionParams = respone_paypage["data"]["props"]["transmissionParams"]
            print("transmissionParams:{0}".format(transmissionParams))

            # Everything the grab request needs comes from the pay page.
            dd = {
                "deposit": deposit,
                "totalFreight": totalFreight,
                "netFreight": netFreight,
                "moreCostAmount": moreCostAmount,
                "insuranceItems": insuranceItems,
                "cargoId": cargoId,
                "transmissionParams": transmissionParams
            }
            # Submit the grab request, which yields the payment parameters.
            respone_order_grab = trade_new.order_grab(dd)
            grab_info = respone_order_grab["data"]["info"]
            outOrderNo = grab_info["outOrderNo"]
            print("支付入参outOrderNo:{0}".format(outOrderNo))
            tradeAmount = grab_info["tradeAmount"]
            print("支付入参tradeAmount:{0}".format(tradeAmount))
            source = grab_info["source"]
            print("支付入参source:{0}".format(source))

            # Pay with the test bank.
            getAuth.payWithTestBank(mobileDriver, "ymm_and_driver_clientlist", outOrderNo, tradeAmount, source)
            return grab_info["orderId"]

        except Exception:
            # Best effort: report the failure with its traceback, return None.
            print("出现异常!")
            traceback.print_exc()
            return None
        finally:
            print("司机抢单结束")


if __name__ == '__main__':
    # Manual entry point for running the driver grab-and-insure flow once.
    #
    # Option 1 (disabled): publish a fresh cargo, wait, then grab it.
    # Original notes: combined insurance reports a grab error with this new,
    # hand-written flow, so always use a fresh cargo. Re-publishing changed;
    # resend_cargo.py implements "publish one more". If publishing fails, only
    # cargoPublish needs checking -- it is usually a request-parameter problem.
    # cargo = cargoPublish(mobile='15617681741')
    # cargoId = cargo.cargoPublish()
    # print(cargoId)
    # time.sleep(10)
    # pa = CreateDriverPolicy()
    # pa.create_driver_policy_new('15617681788', cargoId)


    # Option 2 (disabled): only for glass cargo shipped Foshan -> Guangzhou,
    # which can use the fragile-goods transport insurance.
    # cargoResponse = cargo.prepare_and_complet(mobile=15617681743, client_name='ymm_and_shipper_clientlist',start_city='佛山市', end_city='广州市', searchWord='玻璃')
    # cargo_id = cargoResponse['cargo_id']
    # print(cargo_id)


    # Option 3 (active): grab directly by cargo id; always use a fresh cargo.
    pa = CreateDriverPolicy()
    pa.create_driver_policy_new('15617681748', 10189811109884)

准备测试数据,编写场景测试用例,通过 Jenkins 定时执行并生成测试报告

from utils import httpUtil, getAuth, json_utils
from settings.env import env
from data.enum.Product import Product
from base.insure import create_insure_utils
import pytest
from testcase.common.tools.create_driver_policy import CreateDriverPolicy
from settings import env
from testcase.common.tools.resend_cargo import cargoPublish
from base.trade_checkout_app import trade_checkout_app
from utils import getAuth
import time
import json
from base import cargo

class Test_create_driver_policy:
    """End-to-end scenario: a shipper publishes a cargo, a driver grabs and insures it."""

    # Accounts used by the automated scenario: shipper and driver.
    shipper = '15617681741'
    driver = '15617681788'
    # Cargo publish request body loaded from the resend_cargo.json template.
    bodyJson = json_utils.JsonUtils("resend_cargo.json", 'cargo_publish').data

    def test_all_policy(self):
        """Publish a cargo, let the driver grab it with insurance, expect an order id.

        ``create_driver_policy_new`` returns ``None`` when any step of the
        flow raised, so a missing order id is treated as a failure.
        """
        # Step 1: publish the cargo. (Named `publisher` so the imported
        # `cargo` module is not shadowed.)
        publisher = cargoPublish(self.shipper, self.bodyJson)
        cargoId = publisher.cargoPublish()
        print(cargoId)
        # Step 2: the driver grabs the cargo after a fixed wait.
        time.sleep(10)
        pa = CreateDriverPolicy()
        order = pa.create_driver_policy_new(self.driver, cargoId)
        print(order)
        time.sleep(10)
        assert order is not None, "driver grab/insure flow did not return an order id"

年薪30W+必备!接口自动化测试框架封装实战(附源码)|Pytest+Allure+Jenkins

 


网站公告

今日签到

点亮在社区的每一天
去签到