案例一:搜索【美妆】获取每一个卡片的信
代码如下:
# 导入时间等待库 import time # 导入ChromiumPage from DrissionPage import ChromiumPage # 数据写入到excel文件 from DataRecorder import Recorder # 写入到excel表格中 recorder = Recorder("./data.xlsx") recorder.set.show_msg(False) # 不显示日志信息--会显示很多无用的日志,所以这里屏蔽掉无用的日志打印 def handler(page, keyword): # 访问小红书的搜索【美妆】后的界面链接 # https://www.xiaohongshu.com/search_result?keyword=美妆&source=web_explore_feed page.get(f"https://www.xiaohongshu.com/search_result?keyword={keyword}&source=web_explore_feed&type=51") time.sleep(5) # 为了防止网络的影响加载慢,可以等待5秒,让数据加载一下 # 7--【循环进行向下滑动滚轮,不断加载跟多内容】 for i in range(1, 3): # 滚轮向下滑动3次 滚动的操作会做3次 # 为了捕获失败,防止程序报错,加上try...except try: # 通过class="note-item" 定位界面上的每一个卡片数据 # cards = page.eles('@class=note-item') # 该行代码等价于下一行,都是class定位 cards = page.eles('.note-item') # 3--【(1) 启动监听机制 监听卡片详情接口 对指定的数据包进行监听】 page.listen.start("/sns/web/v1/feed") # 遍历的每一个卡片 for card in cards: # print(card) # 通过局部定位定位大图,根据标签名定位img标签,并进行点击操作 card.ele('@tag()=img').click(by_js=True) # by_js=True 参数可以不写,进行默认 # 4--【(2) 等待卡片详情接口数据返回 点击显示的数据是动态请求的】 res = page.listen.wait(count=1, timeout=1, fit_count=True) # (3) 获取数据 data = res.response.body print("data:::", data) time.sleep(2) # 数据提取 nickname = find_first_key_value(data, "nickname") title = find_first_key_value(data, "title") desc = find_first_key_value(data, "desc") comment_count = find_first_key_value(data, "comment_count") liked_count = find_first_key_value(data, "liked_count") # 基于recorder将采集的数据写入excel,recorder需要是字典格式 # 把每个字典作为是一行数据,一行一行写入,所以这里需要组装成字典格式 map = { "博主昵称": nickname, "标题": title, "详情": desc, "评论数": comment_count, "点赞数": liked_count, } recorder.add_data(map) recorder.record() # 5--【关闭卡片并等待 弹窗关闭】 close_btn = page.ele('@class=close close-mask-dark') close_btn.click() time.sleep(2) except Exception as e: print("错误失败了--error::::", e) finally: # 7.1--【滚动滚轮】 try抛不抛异常,这个finally都会在try执行完一次以后被执行一次 page.scroll.up(100) # 点击滑块 time.sleep(1) page.scroll.to_bottom() time.sleep(1) # 6--【数据解析函数】 def find_first_key_value(data, target_key): # (1) 处理数据为字典的递归遍历 if isinstance(data, dict): # 判断是否是字典类型 # 便利字典的key和value for key, val in data.items(): # 如果key等于传递进来的target_key,就把对应的value返回出去 if key == target_key: return val # 递归遍历子元素 ret = find_first_key_value(val, target_key) if ret is not None: return ret # (2) 处理数据为列表的递归遍历 if isinstance(data, list): for item in data: # 遍历列表获取列表中的字典形式的列表元素 # 根据target_key提取对应的value值 ret = find_first_key_value(item, target_key) if ret is not None: return ret return None def main(): with open("关键词.txt", mode="r", encoding="utf-8") as f: # 1--【readlines 逐行进行读取txt文件中的搜索关键词,并以列表的形式进行返回】 keyword_list = f.readlines() # 创建浏览器驱动对象 page = ChromiumPage() # 访问小红书首页 page.get("https://www.xiaohongshu.com/explore") # 2--【小红书的登录很难,如果用逆向的话比较麻烦,所以这里我们进行手动登录】 input("等待登录") for keyword in keyword_list: # 把显示的页面page和关键词keyword传递过去 handler(page, keyword) # 程序的入口 main() """ 对代码中【】括起来的注释进行分析和解释 1、在编写代码前需要创建一个【关键词.txt】文件,里面存放将要搜索的关键词,例如:美妆、生活 2、因为小红书的登录如果使用逆向操作比较麻烦,所以我们可以使用ChromiumPage控制浏览器,进行手动扫码登录 3、因为卡片点击后,每次放大显示的数据都是通过接口直接请求的数据,所以我们需要监听,每次请求后接口返回的数据 因为如果通过逆向发送请求,并且获取接口的返回数据比较麻烦(接口中的参数和返回的数据都进行了加密处理还需要破解) 所以我们可以通过page提供的监听机制,可以动态的获取被监听的接口的请求和返回的数据 3.1、监听不能放到for循环里面,不然每点击一次卡片数据都会开启一次监听,所以需要放到外面,开启一次监听即可 4、获取监听到指定请求响应回来的数据内容 count=1, timeout=1, fit_count=True 参数可写可不写 5、点击放到后,需要先关闭放大后的弹窗,才能继续进行下一个点击,不然就只能获取到一个卡片数据 6、数据解析方法有案例 7、因为每一页只能显示几条数据,所以我们需要滑动滑块加载更多 因为当try...except加载完以后,不管会不会抛异常,都会执行finally中的代码,所以我们把 点击滑块,滑动滑块的动作放到这里 问题1:不能滚轮滚到底在进行加载数据吗? 不能,因为每次滚轮滚到底,加载完以后,滚轮自动就会回到中间 问题2:滚轮可以一下滚到底,在一个一个点吗? 因为滚轮滚到底以后,会在动态在加载数据,滚轮还会回到中间,在往下滚,在回到中间,类似一页一页的 7.1、100就是点击的时候上下晃动100个像素,不要这个也行直接滑倒底部 写50 100 200都可以 为了模拟人的行为 up是向上100个像素再往下滑倒底部,更真实的模拟人的行为 """
准备文件1:创建一个txt文件,用于存放需要搜索的字段,比如需要获取"美妆"和"生活"的数据,就写"美妆"和"生活"
分析一:当代码打印【等待登录】的时候就需要我们手动扫描登录,减少了逆向破解的繁琐操作
分析二:
分析三:
1、通过class="note-item" 定位界面上的每一个卡片数据,
2、通过局部定位定位大图,根据标签名定位img标签,并进行点击操作
分析四:
分析五:
for i in range(1, 3): try: print("开始执行-----try") except Exception as e: print("开始执行-----try", e) finally: print("滚轮滑动了一次") """ 开始执行-----try 滚轮滑动了一次 开始执行-----try 滚轮滑动了一次 """
分析六:
dic = { 'a': { 'b': 'i am b', 'c': 'i am c', 'd1': { 'd1': 'i am d1' } }, 'ab': { 'abb': 'i am abb' }, 'cc': [ {'c1': 'i am c1', 'c2': 'i am c2'} ] } # 数据解析函数 def find_first_key_value(data, target_key): # (1) 处理数据为字典的递归遍历 if isinstance(data, dict): # 判断是否是字典类型 # 便利字典的key和value for key, val in data.items(): # 如果key等于传递进来的target_key,就把对应的value返回出去 if key == target_key: return val # 递归遍历子元素 ret = find_first_key_value(val, target_key) if ret is not None: return ret # (2) 处理数据为列表的递归遍历 if isinstance(data, list): for item in data: # 遍历列表获取列表中的字典形式的列表元素 # 根据target_key提取对应的value值 ret = find_first_key_value(item, target_key) if ret is not None: return ret return None # 把json类型的数据转递作为第一个参数传递进去,把需要获取数据的key作为第二个参数, # 就可返回得到key对应的value ret_a = find_first_key_value(dic, "a") print(ret_a) # {'b': 'i am b', 'c': 'i am c', 'd1': {'d1': 'i am d1'}} ret_d1 = find_first_key_value(dic, "d1") print(ret_d1) # {'d1': 'i am d1'} ret_abb = find_first_key_value(dic, "abb") print(ret_abb) # i am abb ret_c = find_first_key_value(dic, "c") print(ret_c) # i am c ret_c2 = find_first_key_value(dic, "c2") print(ret_c2) # i am c2
分析七:爬取的结果如下
案例二:爬取某东某个商品的【全部评价】数据信息
import json # 导入时间等待库 import time # 导入ChromiumPage from DrissionPage import ChromiumPage # 数据写入到excel文件 from DataRecorder import Recorder recorder = Recorder("JD.xlsx") recorder.set.show_msg(False) def find_key_val(data, target_key, max_count=1): results = [] # (1) json字符串反序列化 先判断传递来的是字符串吗?是的话就反序列化转化为对象 if isinstance(data, str): try: data = json.loads(data) except json.JSONDecodeError: return results def _search(data): # 最大数量限制 if len(results) == max_count: return # 处理数据为字典的递归遍历 if isinstance(data, dict): for key, val in data.items(): if key == target_key: results.append(val) if len(results) == max_count: return # 递归遍历子元素 _search(val) # (2) 处理数据为列表的递归遍历 if isinstance(data, list): for item in data: ret = _search(item) if ret is not None: return ret return None _search(data) return results def handler(page): try: # 监听某个固定的接口 page.listen.start("client.action") # 定位到”全部评价“这个按钮,并点击 page.ele('@text()=全部评价').click() # 因为在滚动滚轮的时候被监听的接口会不断的更新返回的数据, # 所以我们要写死循环不断的接听返回的数据,不然监听的数据只会返回一条评价 while 1: # 获取返回的数据 循环一次,获取一次评价数据 res = page.listen.wait() data = res.response.body print("data:::", data) # 解析数据 commentInfoList = find_key_val(data, "commentInfo", 11) for commentInfo in commentInfoList: map = { "用户名": commentInfo.get("userNickName"), "评论时间": commentInfo.get("commentDate"), "评论内容": commentInfo.get("commentData"), "评分": commentInfo.get("commentScore"), } recorder.add_data(map) recorder.record() # 滚动滚轮并等待 page.ele('@class=_rateListContainer_1ygkr_45').scroll.to_bottom() time.sleep(3) # 防止被封号 except Exception as e: print("错误失败了---error:::", e) def main(): # 创建浏览器驱动对象 page = ChromiumPage() # 访问需要爬取的商品的链接 page.get("https://item.jd.com/100146260385.html") input("等待登录") handler(page) # 程序的入口 main() """ 在Python中,字符串序列化是将数据结构(如字典、列表等)转换为字符串格式的过程, 而反序列化则是将字符串重新转换回原始数据结构。以下是主要实现方式: json模块(最常用): json.dumps()实现序列化,可将Python对象转为JSON格式字符串 json.loads()实现反序列化,将JSON字符串转回Python对象 处理中文时建议添加ensure_ascii=False参数避免Unicode编码 import json data = {"name": "张三", "age": 25} # 序列化 json_str = json.dumps(data, ensure_ascii=False) # 反序列化 restored_data = json.loads(json_str) """
分析一:如果给出的返回数据是字符串格式的,就需要先进行反序列化操作
import json dic = { 'a': { 'b': 'i am b', 'c': 'i am c', 'd1': { 'd1': 'i am d1' } }, 'ab': { 'abb': 'i am abb' }, 'data': [ {'commentInfo': {'name': 'i am name_1', 'userImgURL': 'i am userImgURL_1', 'iconType': 'i am iconType_1'}}, {'commentInfo': {'name': 'i am name_2', 'userImgURL': 'i am userImgURL_2', 'iconType': 'i am iconType_2'}}, {'commentInfo': {'name': 'i am name_3', 'userImgURL': 'i am userImgURL_3', 'iconType': 'i am iconType_3'}} ] } json_str = json.dumps(dic, ensure_ascii=False, indent=4) # print(type(json_str)) # print("json_str:", json_str) def find_key_val(data, target_key, max_count=1): results = [] # (1) json字符串反序列化 先判断传递来的是字符串吗?是的话就反序列化转化为对象 if isinstance(data, str): try: data = json.loads(data) print("进行了反序列化") except json.JSONDecodeError: return results def _search(data): # 最大数量限制 if len(results) == max_count: return # 处理数据为字典的递归遍历 if isinstance(data, dict): for key, val in data.items(): if key == target_key: results.append(val) if len(results) == max_count: return # 递归遍历子元素 _search(val) # (2) 处理数据为列表的递归遍历 if isinstance(data, list): for item in data: ret = _search(item) if ret is not None: return ret return None _search(data) return results ret_c2 = find_key_val(json_str, "commentInfo") print(ret_c2) """ 进行了反序列化 [{'name': 'i am name_1', 'userImgURL': 'i am userImgURL_1', 'iconType': 'i am iconType_1'}] """
分析二:定位到”全部评价“这个按钮