【爬虫】爬取股票历史K线数据写入数据库(三)

发布于:2024-05-10 ⋅ 阅读:(18) ⋅ 点赞:(0)

在这里插入图片描述
前几天有写过两篇:
【爬虫】爬取A股数据写入数据库(二)
【爬虫】爬取A股数据写入数据库(一)

现在继续完善,分析及爬取股票的历史K线数据通过ORM形式批量写入数据库。

2024/05,本文主要内容如下:

  1. 对东方财富官网进行分析,并作数据爬取,使用python,使用pip install requests 模拟http数据请求,获取数据。
  2. 将爬取的数据写入通过 sqlalchemy ORM 写入 sqlite数据库。
  3. 记录爬取股票的基本信息,如果库中已存在某个股票代码,则进行更新。
  4. 后续计划:会不断完善,最终目标是做出一个简单的股票查看客户端。
  5. 本系列所有源码均无偿分享,仅作交流无其他,供大家参考。
    python依赖环境如下:
pip install requests==2.31.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pandas==2.2.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install jsonpath==0.8.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install sqlalchemy==2.0.30 -i https://pypi.tuna.tsinghua.edu.cn/simple

1. 对东方财富官网历史K线数据分析

网页地址:https://quote.eastmoney.com/sz002224.html?jump_to_web=true#fullScreenChart
通过分析网页,发现https://push2his.eastmoney.com/api/qt/stock/kline/get?请求后面带着一些参数即可以获取到相应数据,我们不断调试,模拟这类请求即可。分析过程如下图所示,F12调出调试框,不断尝试:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2. 爬取数据代码逻辑

如下即爬取数据的可运行代码,复制后直接能跑:

import pandas as pd
from typing import List
import requests
from jsonpath import jsonpath

class CustomedSession(requests.Session):
    def request(self, *args, **kwargs):
        kwargs.setdefault('timeout', 60)
        return super(CustomedSession, self).request(*args, **kwargs)
session = CustomedSession()
adapter = requests.adapters.HTTPAdapter(pool_connections = 50, pool_maxsize = 50, max_retries = 5)
session.mount('http://', adapter)
session.mount('https://', adapter)

# 请求地址
QEURY_URL = 'http://push2his.eastmoney.com/api/qt/stock/kline/get'
# HTTP 请求头
EASTMONEY_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; Touch; rv:11.0) like Gecko',
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
}

"""
获取单只股票的历史K线数据
"""
def get_k_history_data(
    stock_codes: str, # 股票代码
    beg: str = '19000101', # 开始日期,19000101,表示 1900年1月1日
    end: str = '20500101', # 结束日期
    klt: int = 101,  # 行情之间的时间间隔 1、5、15、30、60分钟; 101:日; 102:周; 103:月
    fqt: int = 1, # 复权方式,0 不复权 1 前复权 2 后复权
):
    try:
        # 生成东方财富专用的secid
        if stock_codes[:3] == '000':   # 沪市指数
            secid = f'1.{stock_codes}'
        elif stock_codes[:3] == '399': # 深证指数
            secid = f'0.{stock_codes}'

        if stock_codes[0] != '6':  # 沪市股票
            secid = f'0.{stock_codes}'
        else:
            secid = f'1.{stock_codes}' # 深市股票
            
        EASTMONEY_KLINE_FIELDS = {'f51': '日期', 'f52': '开盘', 'f53': '收盘', 'f54': '最高', 'f55': '最低',
                                'f56': '成交量', 'f57': '成交额', 'f58': '振幅', 'f59': '涨跌幅', 'f60': '涨跌额', 'f61': '换手率',}
        fields = list(EASTMONEY_KLINE_FIELDS.keys())
        # columns = list(EASTMONEY_KLINE_FIELDS.values())
        fields2 = ",".join(fields)
        params = (
            ('fields1', 'f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13'),
            ('fields2', fields2),
            ('beg', beg),
            ('end', end),
            ('rtntype', '6'),
            ('secid', secid),
            ('klt', f'{klt}'),
            ('fqt', f'{fqt}'),
        )
        code = secid.split('.')[-1]
        json_response = session.get(QEURY_URL, headers=EASTMONEY_REQUEST_HEADERS, params=params, verify=False).json()
        data_list = []
        klines: List[str] = jsonpath(json_response, '$..klines[:]')
        if not klines:
            return data_list
        
        name = json_response['data']['name']
        rows = [kline.split(',') for kline in klines]
        # 0           1      2     3      4      5        6           7        8        9       10
        # 日期,       开盘,   收盘, 最高,  最低,   成交量,  成交额,      振幅,    涨跌幅,   涨跌额, 换手率
        # 2024-05-08, 4.89,  4.82, 4.91,  4.80,  61811,  29955564.00,  2.25,  -1.23,    -0.06,  0.98
        # data_list = [{'code': '002224', 'name': '三力士', 'time': '2024-05-08', 'info': '0,1,2,3,4,5,6,7,8,9,10'}]
        for row in rows:
            time, open, close, high, low, vol, quota, mm, change, range, tun = row
            line_str = f'{open},{close},{high},{low},{vol},{quota},{mm},{change},{range},{tun}'
            data_list.append({'id': None,'code': code, 'name': name, 'time': time, 'info': line_str})
        
        return data_list
    except Exception as e:
        print('get_k_history_data error-----------------------', str(e))
        return data_list

if __name__ == "__main__":
    data = get_k_history_data(stock_codes='002224', beg='20240507', end='20500101')
    print('----', data)

3. 将爬取的数据通过ORM形式写入数据库

数据库表设计:

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Index, Table
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemy.schema import UniqueConstraint
from datetime import datetime

# 声明一个基类,所有的ORM类都将继承自这个基类
DBBase = declarative_base()

# 创建引擎
engine = create_engine('sqlite:///a.db',  echo=False)
# 绑定引擎
Session = sessionmaker(bind=engine)
# 创建数据库链接池,直接使用session即可为当前线程拿出一个链接对象conn
db_session = scoped_session(Session)

'''
股票K线信息表
0           1      2     3      4      5        6           7        8        9       10
日期,       开盘,   收盘, 最高,  最低,   成交量,  成交额,      振幅,    涨跌幅,   涨跌额, 换手率
2024-05-08, 4.89,  4.82, 4.91,  4.80,  61811,  29955564.00,  2.25,  -1.23,    -0.06,  0.98
data_list = [{'code': '002224', 'name': '三力士', 'time': '2024-05-08', 'info': '1,2,3,4,5,6,7,8,9,10'}]
'''
class tb_k(DBBase):
    __tablename__ = 'tb_k'
    id = Column(Integer, primary_key=True, autoincrement=True)
    code = Column(String, nullable=False, comment="股票代码")
    name = Column(String, comment="股票名称")
    time = Column(String, comment="时间")
    info = Column(String, comment="开盘,收盘,最高,最低,成交量,成交额,振幅,涨跌幅,涨跌额,换手率")
    __table_args__ = (
        Index('unique_index', 'code', 'time', unique=True),
    )
# 创建表, 创建所有class xx(DBBase)
DBBase.metadata.create_all(engine)

写入数据库的逻辑:

# 查询某个股票最近更新K线的日期
def query_latast_K_data(code):
    result = db_session.query(tb_k).filter(tb_k.code==code).order_by(desc(tb_k.time)).first()
    if result is None:
        return '19000101'
    return str(result.time).replace('-','')

# 批量插入或更新某只股票的历史K线数据
def insert_or_update_stock_k(data_list):
    if len(data_list) <= 0:
        return
    try:
        db_session.bulk_insert_mappings(tb_k, data_list)
        db_session.commit()
    except Exception as e:
        print('insert_or_update_stock_k error=', str(e))

4. 整体逻辑流程

步骤:

  1. 输入某个股票代码爬取该股票的历史K线数据
  2. 将返回结果组成数组,批量写入数据库
  3. 每次写入前,会根据该股票代码,查询最新的同步日期,从该日期开始进行追加同步
# 更新某个股票的最新日K线数据到数据库
def update_k_info_db(code='002224'):
    # 根据 code 查询库中已存在的某个股票日K线数据的最近日期,作为开始日期,向后获取
    beg_time = db_orm.query_latast_K_data(code)
    data_list = stock.get_k_history_data(stock_codes=code, beg=beg_time, end='20500101')
    
    if len(data_list) > 0:
        db_orm.insert_or_update_stock_k(data_list)
        
        
if __name__ == "__main__":
    update_base_info_db()

最终结果保存在 a.db中,例如:
在这里插入图片描述
更多内容可关注我,后续源码包均在上面回复下载:
【爬虫】爬取A股数据系列工具