前景:
用手机刷学习强国时要一直盯着手机,总感觉费时费劲,刚好最近学习python写个小工具帮忙自动学习,实现了文章和视频学习,答题类不一定都能正确。上班时电脑连着USB就可以放那,自己可以上班干自己事情。
开发工具:
AirtestIDE:负责获取界面元素,poco远程解析控制手机
pycharm:负责写python逻辑,pyqt5负责界面显示日志
工具界面:
开发条件:
Android手机,打开USB调试,第一次运行允许安装pocoservice
处理逻辑代码:
from PyQt5.QtCore import QThread, pyqtSignal
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
from airtest.core.api import *
import subprocess
import re
def is_in_array(array, element):
for item in array:
if item == element:
return True
return False
# 定义一个线程类
class QCtrlThread(QThread):
# 成员
read_done_tile = [] # 已读文章或视频
log_signal = pyqtSignal(str)
# 初始化
def __init__(self, parent=None):
super(QCtrlThread, self).__init__(parent)
print("QCtrlThread init")
# run函数是子线程中的操作,线程启动后开始执行
def run(self):
self.log_signal.emit("开始学习")
poco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
poco.device.wake()
start_app("cn.xuexi.android")
sleep(2)
self.get_score(poco)
self.log_signal.emit("学习结束。。。")
def connect_android(self):
# 执行命令
print("connect_android")
cur_path = os.path.abspath('.')
print(cur_path)
run_airtest_path = cur_path + "\\airtest"
cmd = r'"{}" devices'.format(run_airtest_path + '\\core\\android\\static\\adb\\windows\\adb.exe')
print(cmd)
if os.path.exists(run_airtest_path):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
else:
p = subprocess.Popen('adb devices', stdout=subprocess.PIPE, shell=True)
# 获取输出结果
output, err = p.communicate()
# 处理输出结果
output_str = output.decode('utf-8')
print(output_str)
device_list = output_str.split("\n")
device_name = ""
for i in range(0, len(device_list)):
device_txt = device_list[i]
if device_txt.find("List of devices attached") >= 0:
continue
print(device_txt)
if device_txt.find("device") > 0:
device_split = device_txt.split(" ")
device_name = device_split[0]
print("cmd result:", device_name)
break
if len(device_name) < 8:
self.log_signal.emit("设备未查找到,请连接设备,确认开发者选项-USB调试已打开")
return False
device_url = "Android://127.0.0.1:5037/" + device_name
self.log_signal.emit("开始连接手机设备名称:" + device_url)
connect_device(device_url)
return True
def operate(self, poco):
cnt_score = 0 # 得分计数
list_view = poco("android.widget.FrameLayout").child("androidx.recyclerview.widget.RecyclerView").child(
"android.widget.FrameLayout")
print("当前界面展示项:", len(list_view))
for i in range(0, len(list_view)):
title_view = list_view[i].offspring(
"cn.xuexi.android:id/general_card_title_id")
if title_view.exists() is True:
title_txt = title_view.get_text()
self.log_signal.emit("标题:" + title_txt)
if is_in_array(self.read_done_tile, title_txt) is False: # 未读
title_view.focus([0.5, 0.5]).click()
sleep(1)
# 开始点击音频|视频新闻
self.play(poco)
cnt_score += 2
self.read_done_tile.append(title_txt)
list_view.swipe([0, -0.5]) # 滑动
return cnt_score
def play(self, poco):
self.log_signal.emit("开始视频|文章学习")
# 电视
if poco(name="cn.xuexi.android:id/EXTRA_INFO_LAYER_VIEW_ID"):
self.log_signal.emit("开始播放视频")
if poco(text="继续播放"):
poco(text="继续播放").focus([0.5, 0.5]).click()
while 1:
if poco(text="重新播放"):
break
else:
sleep(1)
# 返回
poco(name="cn.xuexi.android:id/EXTRA_INFO_LAYER_VIEW_ID").child("android.widget.FrameLayout")[1].child(
"android.widget.FrameLayout")[2].child("android.widget.ImageView").focus([0.5, 0.5]).click()
else:
# 文章
self.log_signal.emit("开始阅读")
if poco("android.widget.Button"):
poco("android.widget.Button").focus([0.5, 0.5]).click()
# 获取播放时间
arr_view = poco("android.widget.Button").parent().child("android.view.View")
# print(len(arr_view))
if len(arr_view) > 2:
str_time = arr_view[2].get_text()
self.log_signal.emit("时长:" + str_time)
split_time = str_time.split(":")
int_time = int(split_time[0]) * 60 + int(split_time[1])
print("时长秒:" + str(int_time))
sleep(int_time)
else:
# self.log_signal.emit("开始播报")
# if poco(name="play-0"):
# poco(name="play-0").focus([0.5, 0.5]).clicked()
# else:
article_body = poco(name="xxqg-article-body")
for t in range(0, 30):
if article_body:
article_body.swipe([0, -0.1])
sleep(1)
self.log_signal.emit("阅读完毕")
# 返回
if poco(name="cn.xuexi.android:id/TOP_LAYER_VIEW_ID"):
poco(name="cn.xuexi.android:id/TOP_LAYER_VIEW_ID").child("android.widget.ImageView").focus(
[0.5, 0.5]).click()
sleep(1)
def answer(self, poco):
if poco("com.uc.webview.export.WebView").exists() is False:
return False
while 1:
if poco(name="返回"):
poco(name="返回").click()
break
# 查看提示
tips_content = ""
if poco(text="查看提示"):
poco(text="查看提示").focus([0.5, 0.5]).click()
sleep(1)
if poco(text="提示"):
tips_view = poco(text="提示").parent().parent().child("android.view.View")
tips_content = tips_view[1].child("android.view.View").get_text()
self.log_signal.emit("提示:\n" + tips_content)
tips_head = poco(text="提示").parent().child("android.view.View")
tips_head[0].focus([0.5, 0.5]).click() # 关闭提示
sleep(0.5)
else:
article_body.swipe([0, -0.5])
sleep(1)
continue
# self.log_signal.emit("开始答题")
if poco("android.widget.EditText"):
self.log_signal.emit("填空题")
if poco("android.widget.EditText"):
new_content = poco("android.widget.EditText").parent().parent().child("android.view.View")
edit_views = poco("android.widget.EditText").parent().child("android.view.View") # 需要填入的项
if edit_views.exists() is True:
self.log_signal.emit("需要填入字数:", len(edit_views))
if tips_content == "请观看视频": # 随便选一个
self.log_signal.emit("请观看视频")
else:
for i in range(0, len(new_content)):
print(new_content[i].get_text())
if len(new_content) > 1: # 最后一段
last_part_txt = new_content[len(new_content) - 1].get_text()
key_txt = last_part_txt[0:4]
index = tips_content.find(key_txt)
if index != -1:
print("填入:", tips_content[index: -1])
print("填入:", tips_content[index: -2])
else:
self.log_signal.emit("解析错误")
if poco("android.widget.ListView"):
self.log_signal.emit("选择题")
items = ["A.", "B.", "C.", "D.", "E."]
# items_value = []
is_select = False
for i in range(0, len(items)): # 获取选题内容
if poco(text=items[i]):
item = poco(text=items[i]).parent().child("android.view.View")
item_txt = item[1].get_text()
self.log_signal.emit(items[i] + ":" + item_txt)
if item_txt.find("正确") == 0:
self.log_signal.emit("主观判断题,都判正确")
poco(text=items[i]).focus([0.5, 0.5]).click()
break
else:
items_value = item[1].get_text()
is_sub = items_value in tips_content
if is_sub is True:
self.log_signal.emit("选中:" + items[i])
poco(text=items[i]).focus([0.5, 0.5]).click()
is_select = True
if poco(text="单选题"):
break
else:
if tips_content == "请观看视频": # 随便选一个
poco(text=items[i]).focus([0.5, 0.5]).click()
break
if poco(text="单选题") and is_select is False:
poco(text=items[0]).focus([0.5, 0.5]).click() # 默认选中第一个
if poco(text="确定"):
poco(text="确定").focus([0.5, 0.5]).click()
sleep(1)
if poco(text="下一题"):
poco(text="下一题").focus([0.5, 0.5]).click()
sleep(1)
def get_score(self, poco):
self.log_signal.emit("开始查找得分")
if poco(name="cn.xuexi.android:id/comm_head_xuexi_score"):
str_score = poco(name="cn.xuexi.android:id/comm_head_xuexi_score").get_text()
self.log_signal.emit("得分:" + str_score)
poco(name="cn.xuexi.android:id/comm_head_xuexi_score").click()
sleep(2)
if poco(text="学习积分"):
list_class = ["我要选读文章", "我要视听学习", "每日答题"]
for i in range(0, len(list_class)):
print("开始学习:", list_class[i])
read_news_label = poco(text=list_class[i])
if read_news_label:
read_news_label.swipe([0, 0.2]) # 滑动一下防止最后一项没显示
sleep(1)
read_news_views = read_news_label.parent().child("android.widget.TextView")
print("read_news_views length:", len(read_news_views))
if read_news_label.parent().child(name="去看看"):
read_news_label.parent().child(name="去看看").click() # 进入得分
sleep(1)
news_score_cnt = 0
while 1:
try:
if list_class[i] == "每日答题":
news_score_cnt += self.answer(poco)
read_news_label.swipe([0, -0.2]) # 滑动一下防止最后一项没显示
else:
news_score_cnt += self.operate(poco)
if news_score_cnt >= 2:
break
except:
self.log_signal.emit("解析异常")
self.get_score(poco)
else:
self.log_signal.emit("界面不对,请回到主页面")
界面代码:
from MainWindow import *
from ctrlThread import QCtrlThread
from PyQt5.QtWidgets import QMainWindow
class XUEXIWindow(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None):
super(XUEXIWindow, self).__init__(parent)
self.setupUi(self)
# 绑定按钮点击事件
self.btn_start.clicked.connect(self.start)
self.btn_connenct.clicked.connect(self.connect_device)
self.thread = QCtrlThread() # 实例化一个线程
self.thread.log_signal.connect(self.change_log)
def start(self):
print('Start clicked')
# 启动线程,执行线程类中run函数
if self.thread.isRunning():
self.textEdit.append("正在学习中...")
return
self.thread.start()
def connect_device(self):
print("开始连接设备")
self.thread.connect_android()
def change_log(self, msg):
print("log:", msg)
self.textEdit.append(str(msg))