该段代码主要实现从数据库和 Excel 文件中读取数据,并对两者进行字段匹配,最终找出 Excel 中未匹配到的数据库记录。功能如下:
- [sqlSelect()]:连接 MySQL 数据库并查询比价单及其商品信息。
- [BiJiaDaoChu()]:调用外部 API 导出 Excel 文件(注释中未被调用)。
- [read_excel_to_dict()]:将 Excel 文件读取为字典列表。
- [normalize_value()]:统一不同格式的值(如时间、空值、数字转字符串),便于后续比较。
- [match_list_to_list()]:根据字段映射匹配两个字典列表中的条目,允许时间差 2 秒。
- [find_unmatched_in_list_b()]:找出在 Excel 中没有匹配到的数据库记录。
最终输出:打印出数据库中在 Excel 中未找到匹配项的数据。
import json
import pandas as pd
import pymysql
import requests
import datetime
import numpy as np
from pymysql import Timestamp
def sqlSelect():
global result_sku
db = pymysql.Connect(
host='IP',
port=3306,
user='user',
password='password',
db='test',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor
)
cur = db.cursor()
sql = 'select * from aaa a join bbb b on a.id=b.sheet_id where a.status in(1,2)'
cur.execute(sql)
result_sku = cur.fetchall()
cur.close()
db.close()
# print(result_sku)
return result_sku
def BiJiaDaoChu():
data = {
"askSheetCode": None
}
headers = {
"Authorization": "440d9854d7434d1f998081abc6785fab",
"Content-Type": "application/json"
}
url = 'http:test/export'
response = requests.post(url=url, data=json.dumps(data), headers=headers)
# 判断响应类型
content_type = response.headers.get('Content-Type', '')
if 'application/json' in content_type:
try:
print(response.json()) # 尝试解析 JSON
except json.JSONDecodeError:
print("无法解析 JSON 响应")
elif 'application/octet-stream' in content_type or 'application/vnd.ms-excel' in content_type:
with open("../data/exported_data_bijia_test.xls", "wb") as f:
f.write(response.content) # 保存 Excel 文件
print("文件已保存为 exported_data_bijia_test.xls")
else:
print("未知响应类型:", content_type)
print(response.text)
def read_excel_to_dict(file_path):
"""
读取 Excel 文件并将数据以字典的形式返回。
:param file_path: Excel 文件的路径
:return: 包含数据的字典列表
"""
try:
# 读取 Excel 文件
df = pd.read_excel(file_path)
# 将 DataFrame 转换为字典列表
data = df.to_dict(orient='records')
# print("Excel 数据读取成功", data)
return data
except Exception as e:
print(f"读取 Excel 文件时出错: {e}")
return []
def normalize_value(value):
# 处理空值
if value is None or (isinstance(value, float) and np.isnan(value)) or value == '':
return None
# 统一时间格式为 datetime.datetime
if isinstance(value, pd.Timestamp):
return value.to_pydatetime()
elif isinstance(value, datetime.datetime):
return value
elif isinstance(value, datetime.date):
return datetime.datetime.combine(value, datetime.time())
# 统一数字类型为字符串
if isinstance(value, (int, float)):
return str(int(value)) if isinstance(value, float) else str(value)
return value
def match_list_to_list(list_a, list_b, field_mapping):
"""
比较两个字典列表,返回匹配成功的对。
增加字段级调试打印 + 时间字段允许最多相差 2 秒。
"""
matched_pairs = []
for a_item in list_a:
for b_item in list_b:
matched = True
for key_a, key_b in field_mapping.items():
val_a = normalize_value(a_item.get(key_a))
val_b = normalize_value(b_item.get(key_b))
# 如果都是时间类型,允许最多差 2 秒
if isinstance(val_a, datetime.datetime) and isinstance(val_b, datetime.datetime):
diff_seconds = abs((val_a - val_b).total_seconds())
if diff_seconds <= 2:
continue # 允许匹配成功
elif val_a != val_b:
#print(f"[字段不匹配] {key_a}({val_a!r}) vs {key_b}({val_b!r})")
matched = False
break
if matched:
matched_pairs.append((a_item, b_item))
#print("[匹配成功] 找到一对匹配项")
break # 可选:找到第一个就停止
return matched_pairs
def find_unmatched_in_list_b(matched_pairs, list_a, list_b):
"""
找出 list_b 中未匹配到的项。
:param matched_pairs: 已匹配成功的 (a_item, b_item) 列表
:param list_a: Excel 数据列表
:param list_b: DB 数据列表
:return: 未匹配的 list_b 数据
"""
# 提取所有已匹配的 b_item
matched_b_items = [b_item for a_item, b_item in matched_pairs]
# 找出未匹配的
unmatched_b_items = [item for item in list_b if item not in matched_b_items]
return unmatched_b_items
file_path = "../data/exported_data_bijia_test.xls"
excel_data = read_excel_to_dict(file_path)
list_a = excel_data
list_b = sqlSelect()
mapping = {'需求比价单号': 'sheet_sn', '业务分类': 'business_class', '报价开始时间': 'quotation_start_time',
'报价截止时间': 'quotation_end_time', '采购单位': 'purchase_unit', '联系人': 'lixiren', '联系方式': 'mobile',
'商品名称': 'goods_name', '品牌': 'pinpai', '计量单位': 'unit', '采购数量': 'number', '规格描述': 'specifications',
'技术标准': 'standard_code', '备注': 'remark', '其他信息': 'other_info'}
matches = match_list_to_list(list_a, list_b, mapping)
# print("匹配结果:=========", matches)
# for a, b in matches:
# print("匹配成功:")
# print(" list_a 项:", a)
# print(" list_b 项:", b)
# 查找未匹配的 list_b 数据
unmatched_b = find_unmatched_in_list_b(matches, list_a, list_b)
# 打印出来
print("=== list_b 中未在 list_a 匹配到的数据 ===")
for item in unmatched_b:
print(item)