[Web Scraping] 03 - Basic Data Storage for Scrapers

Published: 2025-07-19

I: CSV Data Storage

1: Introduction

CSV is a lightweight, cross-platform file format, widely used for data exchange, logging, and small-to-medium-scale data storage.

  • No extra dependencies: handled directly by Python's standard-library csv module.
  • Human-readable: plain text that can be opened in Excel or any text editor.
  • Efficient and flexible: good for quickly exporting and importing tabular data.

The CSV format:

  • Each line is one record; fields are separated by a delimiter (comma by default).
  • Fields containing special characters can be wrapped in quotes (e.g. double quotes).
  • The first line may define column names (a header row).
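
For example (the values are made up), a small file with a header row and one field that needs quoting looks like this:

id,name,email
1,"Smith, Alice",alice@example.com
2,Bob,bob@test.org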
Typical use cases:
  • Data export and backup: bulk-export structured data from a database or an API
  • Preprocessing for data analysis: feed into Pandas for statistics and visualization
  • Cross-system data exchange: compatible with Excel, R, MATLAB, and other tools

2: Basic Usage

Python ships with the csv module; no extra installation is required.

Basic CSV read/write example:

import csv

headers = ["id", "name", "email"]
data = [
    [1, "张三", "zhangsan@example.com"],
    [2, "李四", "lisi@test.org"],
    [3, "王五", "wangwu@demo.net"]
]

# open() takes the file name, the mode (r -> read, w -> write), newline handling, and the encoding (utf-8 here)
with open("output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(headers)  # write the header row
    writer.writerows(data)    # write all data rows at once
# Reading the file back:
import csv

with open("output.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

Dictionary-based read/write example:

# Write dictionary rows
with open("dict_output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "name", "email"])
    writer.writeheader()
    writer.writerow({"id": 1, "name": "张三", "email": "zhangsan@example.com"})

# Read rows back as dictionaries
with open("dict_output.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row)
        print(row["id"], row["name"], row["email"])

Custom delimiter and quoting rules:

# Use semicolons as the delimiter and wrap every field in double quotes
with open("custom.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, delimiter=";", quoting=csv.QUOTE_ALL)
    writer.writerow(["id", "name"])
    writer.writerow([1, "张三"])

3: Advanced Usage

Fields containing special characters

When a field contains a comma, newline, or another character that would break the CSV structure, wrap the field in quotes and set csv.QUOTE_MINIMAL or csv.QUOTE_NONNUMERIC.

data = [
    [4, "Alice, Smith", "alice@example.com"],
    [5, "Bob\nJohnson", "bob@test.org"]
]

with open("special_chars.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
    writer.writerows(data)

Nested data

import json

data = [
    {"id": 1, "info": '{"age": 30, "city": "北京"}'},
    {"id": 2, "info": '{"age": 25, "city": "上海"}'}
]

# Write rows whose info field is a JSON string
with open("nested_data.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "info"])
    writer.writeheader()
    writer.writerows(data)

# Read back and parse the nested JSON
with open("nested_data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        info = json.loads(row["info"])
        print(f"ID: {row['id']}, 城市: {info['city']}")

Handling more complex cases

Large files -> read line by line or in batches

Complex data processing -> pandas (see the sketch below)
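
As a minimal sketch of both points (assuming pandas is installed and output.csv was written as above), large CSVs can be read in fixed-size chunks:

import pandas as pd

# Read the CSV in chunks of 10,000 rows so memory use stays flat even for huge files
for chunk in pd.read_csv("output.csv", chunksize=10_000):
    # Each chunk is an ordinary DataFrame; filter or aggregate it here
    print(len(chunk))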

4: Worked Example

# Scrape the Douban Books Top 250 page and save the results to a CSV file for later analysis with pandas
import csv
import requests
from bs4 import BeautifulSoup

url = "https://book.douban.com/top250"
headers = {"User-Agent": "Mozilla/5.0"}

response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")

books = []
for item in soup.select("tr.item"):
    title = item.select_one(".pl2 a")["title"]
    score = item.select_one(".rating_nums").text
    books.append({"title": title, "score": score})

# Write to CSV
with open("douban_books.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["title", "score"])
    writer.writeheader()
    writer.writerows(books)

print("数据已存储至 douban_books.csv")

II: JSON Data Storage

Typical use cases:
  • Configuration files: program parameters, paths, and so on (e.g. config.json)
  • API data exchange: frontend and backend pass requests and responses as JSON
  • Structured logging: operation logs with metadata that are easy to analyze later

1: Basic JSON Read/Write

dump -> write an object to a JSON file, load -> read an object from a JSON file

import json

data = {
    "project": "数据分析平台",
    "version": 2.1,
    "authors": ["张三", "李四"],
    "tags": {"python": 5, "database": 3}
}

with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)  # keep non-ASCII characters as-is, indent with 2 spaces


with open("data.json", "r", encoding="utf-8") as f:
    loaded_data = json.load(f)
print(loaded_data["tags"]["python"])  # prints: 5

2: Converting Between Objects and Strings

dumps -> serialize an object to a JSON string, loads -> parse a JSON string into an object

data_str = json.dumps(data, ensure_ascii=False)
print(type(data_str))  # <class 'str'>

json_str = '{"name": "王五", "age": 30}'
obj = json.loads(json_str)
print(obj["age"])  # 输出:30

3: Special Handling for Dates and Times

JSON has no native support for Python's datetime objects, so a custom conversion is needed.

from datetime import datetime

def datetime_encoder(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()  # convert to an ISO 8601 string
    raise TypeError("Type is not serializable")

data = {"event": "发布会", "time": datetime.now()}

# Pass the custom encoder via default= when serializing
json_str = json.dumps(data, default=datetime_encoder, ensure_ascii=False)
print(json_str)  # {"event": "发布会", "time": "2024-07-20T15:30:45.123456"}

4: Custom Serialization

class User:
    def __init__(self, name, level):
        self.name = name
        self.level = level

user = User("赵六", 3)

# Option 1: build a dict by hand
user_dict = {"name": user.name, "level": user.level}
json.dumps(user_dict)

# Option 2: use __dict__ (all attributes must already be JSON-serializable)
json.dumps(user.__dict__)
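
Deserializing back into a User object is not shown above; here is a minimal sketch using the object_hook parameter of json.loads (the hook function name and the key check are just one possible convention):

def user_decoder(d):
    # object_hook is called for every decoded JSON object;
    # rebuild a User when the expected keys are present
    if "name" in d and "level" in d:
        return User(d["name"], d["level"])
    return d

restored = json.loads('{"name": "赵六", "level": 3}', object_hook=user_decoder)
print(restored.name, restored.level)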

5: Advanced Usage

Skipping specific fields

Note that the default hook of json.dumps is only called for objects the encoder cannot serialize on its own, so it never runs for a plain dict. Filter the dictionary before dumping instead:

user_data = {"name": "张三", "password": "123456"}
safe_data = {k: v for k, v in user_data.items() if k != "password"}  # drop sensitive keys
json.dumps(safe_data, ensure_ascii=False)  # {"name": "张三"}

Compact output (no indentation or extra spaces)

json.dumps(data, separators=(",", ":"))  # most compact output

Using ujson instead (a fast C implementation whose API is largely compatible with the standard library)

import ujson as json  # drop-in replacement for the standard library
json.dumps(data)      # roughly 3-10x faster in typical cases

Reading large files

# Read a JSON Lines style file line by line (each line is a standalone JSON object)
with open("large_data.json", "r", encoding="utf-8") as f:
    for line in f:
        item = json.loads(line)
        process(item)  # process() stands in for your own handling logic
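
To produce such a file in the first place, one simple option (a sketch; any record layout works as long as each line is a complete JSON object) is:

# Write one compact JSON object per line (JSON Lines style)
records = [{"id": 1, "name": "张三"}, {"id": 2, "name": "李四"}]
with open("large_data.json", "w", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")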

III: Database Data Storage

1: The Basics

For the basics of data storage with the individual databases, see here.

2: Additional Notes

DBUtils can be used to manage a MySQL connection pool:

import pymysql
from dbutils.pooled_db import PooledDB

pool = PooledDB(
    creator=pymysql,  # module used to create the underlying connections
    maxconnections=5,  # maximum connections in the pool; 0 or None means unlimited
    mincached=1,  # idle connections created at startup; 0 means none
    maxcached=2,  # maximum idle connections kept in the pool; 0 or None means unlimited
    maxshared=3,  # maximum shared connections; 0 or None means all are shared
    blocking=True,  # block and wait when no connection is available
    ping=0,  # when to ping the MySQL server to check availability (0 = never)
    host='127.0.0.1',  # database server IP address
    port=3306,  # database server port
    user='root',  # database user
    password='314159',  # database password
    database='test',  # database name
    charset='utf8'  # connection character set
)


# Take a connection from the pool and create a cursor
conn = pool.connection()
cursor = conn.cursor()

# With a cursor you can run statements: cursor.<method>(sql, args)
sql = "select * from user where id = %s"
cursor.execute(sql, [1])  # executes: select * from user where id = 1
# fetch the result
print(cursor.fetchone())

cursor.close()
conn.close()  # returns the connection to the pool
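
A scraper usually also needs to write rows back. As a sketch built on the same pool (the user table and its name/email columns are assumptions for illustration), a batched, parameterized insert looks like this:

# Take a fresh connection from the pool for a batched insert
conn = pool.connection()
cursor = conn.cursor()

# Assumes a user(name, email) table already exists
rows = [("张三", "zhangsan@example.com"), ("李四", "lisi@test.org")]
cursor.executemany("insert into user (name, email) values (%s, %s)", rows)
conn.commit()  # pymysql connections do not autocommit by default

cursor.close()
conn.close()  # returns the connection to the pool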

MongoDB pipeline (aggregation) operations:

from datetime import datetime

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# Connect (default connection pool size is 100)
client = MongoClient(
    host="localhost",
    port=27017,
)

try:
    # heartbeat check
    client.admin.command('ping')
    print("Successfully connected to MongoDB!")
except ConnectionFailure:
    print("Server not available")

# Select the database and collection (created lazily on first write)
db = client["ecommerce"]
products_col = db["products"]

# Insert a document (_id is generated automatically)
product_data = {
    "name": "Wireless Mouse",
    "price": 49.99,
    "tags": ["electronics", "computer"],
    "stock": {"warehouse_A": 100, "warehouse_B": 50},
    "last_modified": datetime.now()
}
insert_result = products_col.insert_one(product_data)
print(f"Inserted ID: {insert_result.inserted_id}")

# Query documents (nested conditions supported)
query = {"price": {"$lt": 60}, "tags": "electronics"}  # price < 60 AND tags contains "electronics"
projection = {"name": 1, "price": 1}  # like a SQL SELECT list: return only name and price
cursor = products_col.find(query, projection).limit(5)
for doc in cursor:
    print(doc)

# Update a document (atomic operation)
update_filter = {"name": "Wireless Mouse"}
update_data = {"$inc": {"stock.warehouse_A": -10}, "$set": {"last_modified": datetime.now()}}
update_result = products_col.update_one(update_filter, update_data)
print(f"Modified count: {update_result.modified_count}")

# Delete documents
delete_result = products_col.delete_many({"price": {"$gt": 200}})
print(f"Deleted count: {delete_result.deleted_count}")

# Aggregation pipeline
# Total stock per warehouse.
# Note: this pipeline assumes stock is stored as an array of subdocuments,
# e.g. "stock": [{"warehouse": "A", "quantity": 100}, ...],
# rather than the embedded dict used in the insert example above.
pipeline = [
    {"$unwind": "$stock"},  # stage 1: unwind the stock array
    {"$group": {  # stage 2: group and aggregate
        # the group key (_id) is stock.warehouse;
        # total_stock sums stock.quantity within each group
        "_id": "$stock.warehouse",
        "total_stock": {"$sum": "$stock.quantity"}
    }},
    # stage 3: sort by total stock, descending
    {"$sort": {"total_stock": -1}}
]
results = products_col.aggregate(pipeline)
for res in results:
    print(f"Warehouse {res['_id']}: {res['total_stock']} units")

Handling large data volumes in MongoDB:

from pymongo import MongoClient
from faker import Faker
import time

client = MongoClient('mongodb://localhost:27017/')
db = client['bigdata']
collection = db['user_profiles']

fake = Faker()
batch_size = 5000  # insert in batches to reduce memory pressure

def generate_batch(batch_size):
    return [{
        "name": fake.name(),
        "email": fake.email(),
        "last_login": fake.date_time_this_year()
    } for _ in range(batch_size)]

start_time = time.time()
for i in range(200):  # 200 batches x 5,000 = 1,000,000 documents in total
    batch_data = generate_batch(batch_size)
    collection.insert_many(batch_data, ordered=False)  # unordered insert for higher throughput
    print(f"Inserted {(i + 1) * batch_size} documents so far")

print(f"总耗时: {time.time()-start_time:.2f}秒") 


# Analyze e-commerce order data (with a nested items array)
pipeline = [
    {"$unwind": "$items"},  # 展开订单中的商品数组
    {"$match": {"status": "completed"}},  # 筛选已完成订单
    {"$group": {
        "_id": "$items.category",
        "total_sales": {"$sum": "$items.price"},
        "avg_quantity": {"$avg": "$items.quantity"},
        "top_product": {"$max": "$items.name"}
    }},
    {"$sort": {"total_sales": -1}},
    {"$limit": 10}
]

orders_col = db["orders"]
results = orders_col.aggregate(pipeline)

for res in results:
    print(f"品类 {res['_id']}: 销售额{res['total_sales']}元")

Key performance optimizations -> add indexes & use bulk operations

import pymongo

# Create indexes (speeds up queries)
products_col.create_index([("name", pymongo.ASCENDING)], unique=True)
products_col.create_index([("price", pymongo.ASCENDING), ("tags", pymongo.ASCENDING)])

# Bulk writes improve throughput
bulk_ops = [
    pymongo.InsertOne({"name": "Keyboard", "price": 89.99}),
    pymongo.UpdateOne({"name": "Mouse"}, {"$set": {"price": 59.99}}),
    pymongo.DeleteOne({"name": "Earphones"})
]
results = products_col.bulk_write(bulk_ops)

High-availability (replica set) configuration

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# MongoDB replica set connection string
# replicaSet=rs0 is the name of the replica set
uri = "mongodb://192.127.1.1:27017,192.127.1.2:27017,192.127.1.3:27017/mydb?replicaSet=rs0"

# Create the MongoClient instance
client = MongoClient(uri)

# Test the connection
try:
    # run a simple command to verify the connection works
    client.admin.command('ping')
    print("Successfully connected to the MongoDB replica set!")
except ConnectionFailure as e:
    print(f"Could not connect to the MongoDB replica set: {e}")
