学习强国手机助手

发布于:2024-05-07 ⋅ 阅读:(34) ⋅ 点赞:(0)

前景:

        用手机刷学习强国时要一直盯着手机,总感觉费时费劲,刚好最近学习python写个小工具帮忙自动学习,实现了文章和视频学习,答题类不一定都能正确。上班时电脑连着USB就可以放那,自己可以上班干自己事情。

开发工具:

AirtestIDE:负责获取界面元素,poco远程解析控制手机

pycharm:负责写python逻辑,pyqt5负责界面显示日志

工具界面:

开发条件:

        Android手机,打开USB调试,第一次运行允许安装pocoservice

处理逻辑代码:

from PyQt5.QtCore import QThread, pyqtSignal
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
from airtest.core.api import *
import subprocess
import re


def is_in_array(array, element):
    for item in array:
        if item == element:
            return True
    return False


# 定义一个线程类
class QCtrlThread(QThread):
    # 成员
    read_done_tile = []  # 已读文章或视频
    log_signal = pyqtSignal(str)

    # 初始化
    def __init__(self, parent=None):
        super(QCtrlThread, self).__init__(parent)
        print("QCtrlThread init")

    # run函数是子线程中的操作,线程启动后开始执行
    def run(self):
        self.log_signal.emit("开始学习")
        poco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
        poco.device.wake()
        start_app("cn.xuexi.android")
        sleep(2)
        self.get_score(poco)
        self.log_signal.emit("学习结束。。。")

    def connect_android(self):
        # 执行命令
        print("connect_android")
        cur_path = os.path.abspath('.')
        print(cur_path)
        run_airtest_path = cur_path + "\\airtest"
        cmd = r'"{}" devices'.format(run_airtest_path + '\\core\\android\\static\\adb\\windows\\adb.exe')
        print(cmd)
        if os.path.exists(run_airtest_path):
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
        else:
            p = subprocess.Popen('adb devices', stdout=subprocess.PIPE, shell=True)
        # 获取输出结果
        output, err = p.communicate()
        # 处理输出结果
        output_str = output.decode('utf-8')
        print(output_str)
        device_list = output_str.split("\n")
        device_name = ""
        for i in range(0, len(device_list)):
            device_txt = device_list[i]
            if device_txt.find("List of devices attached") >= 0:
                continue
            print(device_txt)
            if device_txt.find("device") > 0:
                device_split = device_txt.split("	")
                device_name = device_split[0]
                print("cmd result:", device_name)
                break
        if len(device_name) < 8:
            self.log_signal.emit("设备未查找到,请连接设备,确认开发者选项-USB调试已打开")
            return False
        device_url = "Android://127.0.0.1:5037/" + device_name
        self.log_signal.emit("开始连接手机设备名称:" + device_url)
        connect_device(device_url)
        return True

    def operate(self, poco):
        cnt_score = 0  # 得分计数
        list_view = poco("android.widget.FrameLayout").child("androidx.recyclerview.widget.RecyclerView").child(
            "android.widget.FrameLayout")
        print("当前界面展示项:", len(list_view))
        for i in range(0, len(list_view)):
            title_view = list_view[i].offspring(
                "cn.xuexi.android:id/general_card_title_id")
            if title_view.exists() is True:
                title_txt = title_view.get_text()
                self.log_signal.emit("标题:" + title_txt)
                if is_in_array(self.read_done_tile, title_txt) is False:  # 未读
                    title_view.focus([0.5, 0.5]).click()
                    sleep(1)
                    #  开始点击音频|视频新闻
                    self.play(poco)
                    cnt_score += 2
                    self.read_done_tile.append(title_txt)

        list_view.swipe([0, -0.5])  # 滑动
        return cnt_score

    def play(self, poco):
        self.log_signal.emit("开始视频|文章学习")
        #  电视
        if poco(name="cn.xuexi.android:id/EXTRA_INFO_LAYER_VIEW_ID"):
            self.log_signal.emit("开始播放视频")
            if poco(text="继续播放"):
                poco(text="继续播放").focus([0.5, 0.5]).click()
            while 1:
                if poco(text="重新播放"):
                    break
                else:
                    sleep(1)
            #  返回
            poco(name="cn.xuexi.android:id/EXTRA_INFO_LAYER_VIEW_ID").child("android.widget.FrameLayout")[1].child(
                "android.widget.FrameLayout")[2].child("android.widget.ImageView").focus([0.5, 0.5]).click()
        else:
            #  文章
            self.log_signal.emit("开始阅读")
            if poco("android.widget.Button"):
                poco("android.widget.Button").focus([0.5, 0.5]).click()
                # 获取播放时间
                arr_view = poco("android.widget.Button").parent().child("android.view.View")
                #  print(len(arr_view))
                if len(arr_view) > 2:
                    str_time = arr_view[2].get_text()
                    self.log_signal.emit("时长:" + str_time)
                    split_time = str_time.split(":")
                    int_time = int(split_time[0]) * 60 + int(split_time[1])
                    print("时长秒:" + str(int_time))
                    sleep(int_time)
            else:
                # self.log_signal.emit("开始播报")
                # if poco(name="play-0"):
                #     poco(name="play-0").focus([0.5, 0.5]).clicked()
                # else:
                article_body = poco(name="xxqg-article-body")
                for t in range(0, 30):
                    if article_body:
                        article_body.swipe([0, -0.1])
                        sleep(1)
                self.log_signal.emit("阅读完毕")
            #  返回
            if poco(name="cn.xuexi.android:id/TOP_LAYER_VIEW_ID"):
                poco(name="cn.xuexi.android:id/TOP_LAYER_VIEW_ID").child("android.widget.ImageView").focus(
                    [0.5, 0.5]).click()
                sleep(1)

    def answer(self, poco):
        if poco("com.uc.webview.export.WebView").exists() is False:
            return False
        while 1:
            if poco(name="返回"):
                poco(name="返回").click()
                break
            # 查看提示
            tips_content = ""
            if poco(text="查看提示"):
                poco(text="查看提示").focus([0.5, 0.5]).click()
                sleep(1)
                if poco(text="提示"):
                    tips_view = poco(text="提示").parent().parent().child("android.view.View")
                    tips_content = tips_view[1].child("android.view.View").get_text()
                    self.log_signal.emit("提示:\n" + tips_content)
                    tips_head = poco(text="提示").parent().child("android.view.View")
                    tips_head[0].focus([0.5, 0.5]).click()  # 关闭提示
                    sleep(0.5)
            else:
                article_body.swipe([0, -0.5])
                sleep(1)
                continue

            #  self.log_signal.emit("开始答题")
            if poco("android.widget.EditText"):
                self.log_signal.emit("填空题")
                if poco("android.widget.EditText"):
                    new_content = poco("android.widget.EditText").parent().parent().child("android.view.View")
                    edit_views = poco("android.widget.EditText").parent().child("android.view.View")  # 需要填入的项
                    if edit_views.exists() is True:
                        self.log_signal.emit("需要填入字数:", len(edit_views))
                    if tips_content == "请观看视频":  # 随便选一个
                        self.log_signal.emit("请观看视频")
                    else:
                        for i in range(0, len(new_content)):
                            print(new_content[i].get_text())
                        if len(new_content) > 1:  # 最后一段
                            last_part_txt = new_content[len(new_content) - 1].get_text()
                            key_txt = last_part_txt[0:4]
                            index = tips_content.find(key_txt)
                            if index != -1:
                                print("填入:", tips_content[index: -1])
                                print("填入:", tips_content[index: -2])

                else:
                    self.log_signal.emit("解析错误")

            if poco("android.widget.ListView"):
                self.log_signal.emit("选择题")
                items = ["A.", "B.", "C.", "D.", "E."]
                # items_value = []
                is_select = False
                for i in range(0, len(items)):  # 获取选题内容
                    if poco(text=items[i]):
                        item = poco(text=items[i]).parent().child("android.view.View")
                        item_txt = item[1].get_text()
                        self.log_signal.emit(items[i] + ":" + item_txt)
                        if item_txt.find("正确") == 0:
                            self.log_signal.emit("主观判断题,都判正确")
                            poco(text=items[i]).focus([0.5, 0.5]).click()
                            break
                        else:
                            items_value = item[1].get_text()
                            is_sub = items_value in tips_content
                            if is_sub is True:
                                self.log_signal.emit("选中:" + items[i])
                                poco(text=items[i]).focus([0.5, 0.5]).click()
                                is_select = True
                                if poco(text="单选题"):
                                    break
                            else:
                                if tips_content == "请观看视频":  # 随便选一个
                                    poco(text=items[i]).focus([0.5, 0.5]).click()
                                    break
                if poco(text="单选题") and is_select is False:
                    poco(text=items[0]).focus([0.5, 0.5]).click()  # 默认选中第一个
            if poco(text="确定"):
                poco(text="确定").focus([0.5, 0.5]).click()
                sleep(1)
            if poco(text="下一题"):
                poco(text="下一题").focus([0.5, 0.5]).click()
                sleep(1)

    def get_score(self, poco):
        self.log_signal.emit("开始查找得分")

        if poco(name="cn.xuexi.android:id/comm_head_xuexi_score"):
            str_score = poco(name="cn.xuexi.android:id/comm_head_xuexi_score").get_text()
            self.log_signal.emit("得分:" + str_score)
            poco(name="cn.xuexi.android:id/comm_head_xuexi_score").click()
            sleep(2)
        if poco(text="学习积分"):
            list_class = ["我要选读文章", "我要视听学习", "每日答题"]
            for i in range(0, len(list_class)):
                print("开始学习:", list_class[i])
                read_news_label = poco(text=list_class[i])
                if read_news_label:
                    read_news_label.swipe([0, 0.2])  # 滑动一下防止最后一项没显示
                    sleep(1)
                    read_news_views = read_news_label.parent().child("android.widget.TextView")
                    print("read_news_views length:", len(read_news_views))
                    if read_news_label.parent().child(name="去看看"):
                        read_news_label.parent().child(name="去看看").click()  # 进入得分
                        sleep(1)
                        news_score_cnt = 0
                        while 1:
                            try:
                                if list_class[i] == "每日答题":
                                    news_score_cnt += self.answer(poco)
                                    read_news_label.swipe([0, -0.2])  # 滑动一下防止最后一项没显示
                                else:
                                    news_score_cnt += self.operate(poco)
                                if news_score_cnt >= 2:
                                    break
                            except:
                                self.log_signal.emit("解析异常")
                        self.get_score(poco)

        else:
            self.log_signal.emit("界面不对,请回到主页面")

界面代码:

from MainWindow import *
from ctrlThread import QCtrlThread
from PyQt5.QtWidgets import QMainWindow


class XUEXIWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        super(XUEXIWindow, self).__init__(parent)
        self.setupUi(self)

        # 绑定按钮点击事件
        self.btn_start.clicked.connect(self.start)
        self.btn_connenct.clicked.connect(self.connect_device)
        self.thread = QCtrlThread()  # 实例化一个线程

        self.thread.log_signal.connect(self.change_log)

    def start(self):
        print('Start clicked')
        # 启动线程,执行线程类中run函数
        if self.thread.isRunning():
            self.textEdit.append("正在学习中...")
            return
        self.thread.start()

    def connect_device(self):
        print("开始连接设备")
        self.thread.connect_android()

    def change_log(self, msg):
        print("log:", msg)
        self.textEdit.append(str(msg))