【爬虫】爬取A股数据写入数据库(二)

发布于:2024-05-09 ⋅ 阅读:(21) ⋅ 点赞:(0)

在这里插入图片描述
前几天有写过一篇 【爬虫】爬取A股数据写入数据库(一),现在继续完善下,将已有数据通过ORM形式批量写入数据库。
2024/05,本文主要内容如下:

  1. 对东方财富官网进行分析,并作数据爬取,使用python,使用pip install requests 模拟http数据请求,获取数据。
  2. 将爬取的数据写入通过 sqlalchemy ORM 写入 sqlite数据库。
  3. 记录爬取股票的基本信息,如果库中已存在某个股票代码,则进行更新。
  4. 后续计划:会不断完善,最终目标是做出一个简单的股票查看客户端。
  5. 本系列所有源码均无偿分享,仅作交流无其他,供大家参考。
    python依赖环境如下:
conda create --name xuan_gu python=3.9 --channel https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda remove --name xuan_gu --all
conda activate xuan_gu

#pip install PyQt5==5.15.10 -i https://pypi.tuna.tsinghua.edu.cn/simple
#pip install pyqtgraph==0.13.6 -i https://pypi.tuna.tsinghua.edu.cn/simple
#python -m pyqtgraph.examples 查看图形化的pyqtgraph示例

pip install requests==2.31.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pandas==2.2.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install jsonpath==0.8.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install sqlalchemy==2.0.30 -i https://pypi.tuna.tsinghua.edu.cn/simple

1. 对东方财富官网的分析

东方财富网页地址:https://data.eastmoney.com/gdhs/
通过分析网页,发现https://datacenter-web.eastmoney.com/api/data/v1/get?请求后面带着一些参数即可以获取到相应数据,我们不断调试,模拟这类请求即可。分析过程如下图所示,F12调出调试框,不断尝试:
| ![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/dcd5a9558b7a49e29f834d9fa0cebad4.png
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2. 爬取数据代码逻辑

如下即爬取数据的可运行代码,复制后直接能跑:

import pandas as pd
from typing import List
import requests

class CustomedSession(requests.Session):
    def request(self, *args, **kwargs):
        kwargs.setdefault('timeout', 60)
        return super(CustomedSession, self).request(*args, **kwargs)
session = CustomedSession()
adapter = requests.adapters.HTTPAdapter(pool_connections = 50, pool_maxsize = 50, max_retries = 5)
session.mount('http://', adapter)
session.mount('https://', adapter)

# 请求地址
QEURY_URL = 'http://datacenter-web.eastmoney.com/api/data/v1/get'
# HTTP 请求头
EASTMONEY_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; Touch; rv:11.0) like Gecko',
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    # 'Referer': 'http://quote.eastmoney.com/center/gridlist.html',
}

# 请求返回值过滤
RESULT_FIELDS = {
    'SECURITY_CODE': '股票代码',
    'SECURITY_NAME_ABBR': '股票名称',
    'END_DATE': '本次股东户数统计截止日',
    'PRE_END_DATE': '上次股东户数统计截止日',
    'INTERVAL_CHRATE': '区间涨跌幅',
    'AVG_MARKET_CAP': '户均持股市值',
    'AVG_HOLD_NUM': '户均持股数量',
    'TOTAL_MARKET_CAP': '总市值',
    'TOTAL_A_SHARES': '总股本',
    'HOLD_NOTICE_DATE': '公告日期',
    'HOLDER_NUM': '本次股东户数',
    'PRE_HOLDER_NUM': '上次股东户数',
    'HOLDER_NUM_CHANGE': '股东户数增减',
    'HOLDER_NUM_RATIO': '股东户数较上期变化百分比',  
    'f2': '最新价',
    'f3': '涨跌幅百分比',
}

"""
获取沪深A股市场最新公开的股东数目变化情况: 当作获取所有股票
"""
def get_latest_holder_number() -> pd.DataFrame:
    # 请求页码
    QEURY_PAGE = 1
    PAGE_COUNT = 100
    dfs: List[pd.DataFrame] = []
    while 1:
        if QEURY_PAGE > PAGE_COUNT:
            break
        # 请求参数
        QUERY_PARAM = [
            ('sortColumns', 'HOLD_NOTICE_DATE,SECURITY_CODE'),
            ('sortTypes', '-1,-1'),
            ('pageSize', 500),
            ('pageNumber', QEURY_PAGE),
            ('columns', 'SECURITY_CODE,SECURITY_NAME_ABBR,END_DATE,INTERVAL_CHRATE,AVG_MARKET_CAP,AVG_HOLD_NUM,TOTAL_MARKET_CAP,TOTAL_A_SHARES,HOLD_NOTICE_DATE,HOLDER_NUM,PRE_HOLDER_NUM,HOLDER_NUM_CHANGE,HOLDER_NUM_RATIO,END_DATE,PRE_END_DATE',),
            ('quoteColumns', 'f2,f3'),
            ('source', 'WEB'),
            ('client', 'WEB'),
            ('reportName', 'RPT_HOLDERNUMLATEST'),
        ]
        params = tuple(QUERY_PARAM)
        response = session.get(QEURY_URL, headers=EASTMONEY_REQUEST_HEADERS, params=params)
        resultJson = response.json()
        PAGE_COUNT = resultJson.get('result').get('pages')
        print('json len=', len(str(resultJson)), 'page count=', PAGE_COUNT, 'page number=', QEURY_PAGE)
        if PAGE_COUNT is None:
            break
        data = resultJson.get('result').get('data')
        if data is None:
            break
        df = pd.DataFrame(data)
        df = df.rename(columns=RESULT_FIELDS)[RESULT_FIELDS.values()]
        dfs.append(df)
        QEURY_PAGE += 1
    if len(dfs) == 0:
        df = pd.DataFrame(columns=RESULT_FIELDS.values())
        return df
    df = pd.concat(dfs, ignore_index=True)
    return df

if __name__ == "__main__":
    data = get_latest_holder_number()
    print(data)

调用如上函数即可:
在这里插入图片描述

3. 将爬取的数据通过ORM形式写入数据库

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Index
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.orm import scoped_session
from datetime import datetime

# 声明一个基类,所有的ORM类都将继承自这个基类
Base = declarative_base()

# 创建引擎
engine = create_engine('sqlite:///a.db',  echo=False)
# 绑定引擎
Session = sessionmaker(bind=engine)
# 创建数据库链接池,直接使用session即可为当前线程拿出一个链接对象conn
session = scoped_session(Session)

# 股票基础信息表
class stock_base_info(Base):
    __tablename__ = 'stock_base_info'
    SECURITY_CODE = Column(String, primary_key=True, index=True, nullable=False, comment="股票代码")
    SECURITY_NAME_ABBR = Column(String, nullable=False, comment="股票名称")
    TOTAL_MARKET_CAP = Column(Float, comment="总市值")
    TOTAL_A_SHARES = Column(Float, comment="总股本")
    HOLD_NOTICE_DATE = Column(String, comment="公告日期")
    HOLDER_NUM = Column(Integer, comment="本次股东户数")
    HOLDER_NUM_RATIO = Column(String, comment="股东户数较上期变化百分比")
    PRE_HOLDER_NUM = Column(Integer, comment="上次股东户数")
    f2 = Column(String, comment="股票价格")
    last_updated = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment="最后更新时间")
    
    __table_args__ = (
        Index('idx_SECURITY_CODE_index', SECURITY_CODE, unique=True),
    )
    
# 创建表, 创建所有class xx(Base)
Base.metadata.create_all(engine)  


def insert_or_update_stock_info(net_list):
    all_instances = session.query(stock_base_info).all()
    db_list = []
    for v in all_instances:
        db_list.append({
            'SECURITY_CODE': v.SECURITY_CODE,
            'SECURITY_NAME_ABBR': v.SECURITY_NAME_ABBR,
            'TOTAL_MARKET_CAP': v.TOTAL_MARKET_CAP,
            'TOTAL_A_SHARES': v.TOTAL_A_SHARES,
            'HOLD_NOTICE_DATE': v.HOLD_NOTICE_DATE,
            'HOLDER_NUM': v.HOLDER_NUM,
            'HOLDER_NUM_RATIO': v.HOLDER_NUM_RATIO,
            'PRE_HOLDER_NUM': v.PRE_HOLDER_NUM,
            'f2': v.f2,
        })

    # 查询出库中所有的数据 db_list; 从爬取的数据 net_list 中找到库中已有的数据进行更新 形成 db_map 并批量更新
    # 将 net_list 中不在 db_list 中的数据,形成 net_map 并批量插入
    db_map, not_exist_map = {}, {}
    for v in db_list:
        db_map[v['SECURITY_CODE']] = v
        
    for item in net_list:
        code = item['SECURITY_CODE']
        if code in db_map:
            db_map[code].update(item)
        else:     
            not_exist_map[code] = item
    update_result = list(db_map.values())
    insert_result = list(not_exist_map.values())
    if len(update_result) > 0:
        session.bulk_update_mappings(stock_base_info, update_result)
    if len(insert_result) > 0:
        session.bulk_insert_mappings(stock_base_info, insert_result)
    
    session.commit()
    
if __name__ == "__main__":
    pass

4. 整体逻辑流程

步骤:

  1. 爬取数据得到返回结果
  2. 将返回结果组成数组,并写入数据库
  3. 对于库中已存在的信息根据 股票代码 进行批量更新,对于不存在的进行批量插入
import stock
import db_orm

def update_base_info_db():
    data_df = stock.get_latest_holder_number()
    print('获取的股票数量=', data_df.shape)
    net_list = []
    for index, row in data_df.iterrows():
        code = row['股票代码']
        name = row['股票名称']
        cap = row['总市值']
        shares = row['总股本']
        data = row['公告日期']
        num = row['本次股东户数']
        pre_num = row['上次股东户数']
        ratio = row['股东户数较上期变化百分比']
        f2 = row['最新价'] # float类型
        net_list.append({
            'SECURITY_CODE': code,
            'SECURITY_NAME_ABBR': name,
            'TOTAL_MARKET_CAP': cap,
            'TOTAL_A_SHARES': shares,
            'HOLD_NOTICE_DATE': data,
            'HOLDER_NUM': num,
            'HOLDER_NUM_RATIO': ratio,
            'PRE_HOLDER_NUM': pre_num,
            'f2': str(f2),
        })
    if len(net_list) > 0:
        db_orm.insert_or_update_stock_info(net_list=net_list)

if __name__ == "__main__":
    update_base_info_db()

最终结果保存在 a.db中,例如:
在这里插入图片描述
更多内容可关注我,后续源码包均在上面回复下载:
【爬虫】爬取A股数据系列工具