python读取excel数据写入mysql

发布于:2024-05-15 ⋅ 阅读:(96) ⋅ 点赞:(0)

概述

业务中有时会需要解析excel中的数据,按照要求处理后,写入到db中;
python处理这个正好简便快捷

demo

没有依赖就 pip install pymysql一下

import pymysql
from pymysql.converters import escape_string
from openpyxl import load_workbook
from Snowflake import Snowflake


def load_excel_data(snowflake):
    # 连接到MySQL数据库
    mydb = pymysql.connect(
        host="xxx.xxx.xxx.xxx",
        port=3306,
        user="xxx",
        passwd="xxx",
        db="xxxx"
    )

    # 打开Excel文件
    wb = load_workbook(filename=r'D:\xx\test.xlsx')
    sheet = wb.active

    # 获取表头
    header = [cell.value for cell in sheet[1]]

    column_header = []
	# 表头转换列名
    for excel_head_name in header:
        if '11' == excel_head_name:
            column_header.append("xx")
        elif '22' == excel_head_name:
            column_header.append("xx")
        elif '33' == excel_head_name:
            column_header.append("xx")
        elif '1122' == excel_head_name:
            column_header.append("xx")


    # 遍历每一行数据,并将其插入到数据库中
    cursor = mydb.cursor()
    count = 0

    defaultUser = "'xxx'"

    for row in sheet.iter_rows(min_row=2, values_only=True):
        cId = snowflake.next_id()

        date = row[0]
        # datetime 转 date
        date = date.date()

        a2 = row[1]
        reason = row[2]
        detail = row[3]
		
		# \'%s\' 将含有特殊内容的字符串整个塞进去
        sql = f"INSERT INTO test_table (id, store_id, num, handler, create_by, update_by, date, a2, reason, detail) VALUES ({cId}, 3, 0, 43, {defaultUser}, {defaultUser}, \'%s\', \'%s\', \'%s\', \'%s\')" % (date, self_escape_string(a2), self_escape_string(reason), self_escape_string(detail))

        print(sql)

        # cursor.execute(sql, row)
        cursor.execute(sql)
        count += 1
        print(f"正在插入{count}条数据")

    # 提交更改并关闭数据库连接
    mydb.commit()
    cursor.close()
    mydb.close()

# 将字符串中的特殊字符转义
# python中没有null只有None
def self_escape_string(data):
    if data is None:
        return ""
    return escape_string(data)



if __name__ == '__main__':
    worker_id = 1
    data_center_id = 1
    snowflake = Snowflake(worker_id, data_center_id)

    load_excel_data(snowflake)

雪花id生成主键

import time
import random


class Snowflake:
    def __init__(self, worker_id, data_center_id):
        ### 机器标识ID
        self.worker_id = worker_id
        ### 数据中心ID
        self.data_center_id = data_center_id
        ### 计数序列号
        self.sequence = 0
        ### 时间戳
        self.last_timestamp = -1

    def next_id(self):
        timestamp = int(time.time() * 1000)
        if timestamp < self.last_timestamp:
            raise Exception(
                "Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp))
        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & 4095
            if self.sequence == 0:
                timestamp = self.wait_for_next_millis(self.last_timestamp)
        else:
            self.sequence = 0
        self.last_timestamp = timestamp
        return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence



    def next_id(self):
        timestamp = int(time.time() * 1000)
        if timestamp < self.last_timestamp:
            raise Exception("Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp))
        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & 4095
            if self.sequence == 0:
                timestamp = self.wait_for_next_millis(self.last_timestamp)
        else:
            self.sequence = 0
        self.last_timestamp = timestamp
        return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence

    def wait_for_next_millis(self, last_timestamp):
        timestamp = int(time.time() * 1000)
        while timestamp <= last_timestamp:
            timestamp = int(time.time() * 1000)
        return timestamp

网站公告

今日签到

点亮在社区的每一天
去签到