【人工智能-agent】--Dify中MCP工具存数据到MySQL

发布于:2025-05-13 ⋅ 阅读:(9) ⋅ 点赞:(0)

本文记录的工作如下:

  1. 自定义MCP工具,爬取我的钢铁网数据
  2. 爬取的数据插值处理
  3. 自定义MCP工具,把爬取到的数据(str)存入本地excel表格中
  4. 自定义MCP工具,把爬取到的数据(str)存入本地MySQL数据库中
  5. 搭建MCP-server
  6. 使用Dify调用MCP工具,实现自动化爬取和存入

目录

1.工具--爬取数据

2.工具--保存到excel

3.工具--保存到MySQL中

4.搭建MCP-server

5.Dify调用MCP工具


1.工具--爬取数据

async def fetch_website(
    data_type: str,
    Start_Time: str,
    End_Time: str,
)-> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.edge.service import Service
    from selenium.webdriver.edge.options import Options
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.action_chains import ActionChains
    # 设置Edge浏览器选项
    # try:


    # except Exception as e:
    #     print("Edge浏览器启动失败,请检查驱动是否正确安装!")
    #     print(e)
    all_data_dist = {
        "综合": "GANGCAIZONGHE",
        "长材": "CHANGCAI",
        "扁平": "BIANPING",
        "一次材": "YICICAI",
        "华东": "HUADONG",
        "华南": "HUANAN",
        "华北": "HUABEI",
        "中南": "ZHONGNAN",
        "东北": "DONGBEI",
        "西南": "XINAN",
        "西北": "XIBEI",
        "螺纹": "LUOWEN",
        "线材": "XIANCAI",
        "型材": "XINCAI",
        "中厚": "ZHONGHOU",
        "锅炉容器板": "GUOLURONGQIBAN",
        "造船板": "ZAOCHUANBAN",
        "热卷": "REJUAN",
        "窄带": "ZAIDAI",
        "冷板": "LENGBAN",
        "镀锌板卷": "DUXIN",
        "无缝管": "WUFENGGUAN",
        "聊城无缝钢管": "WUFENG_LIAOCHENG",
        "焊管": "HANGUAN",
        "盘扣式钢管脚手架": "PKSJSJ"
    }

    print("开始爬取数据")
    print("普钢的所有数据类型:" + str(all_data_dist.keys()))

    # Start_Time = input("请输入开始日期(格式:2024-01-01):") or "2024-04-01"
    Start_Time_year = Start_Time.split("-")[0]
    Start_Time_month = Start_Time.split("-")[1]
    Start_Time_day = Start_Time.split("-")[2]
    # day格式转换02需要去掉前导0
    Start_Time_day = str(int(Start_Time_day))
    print(f"开始日期:{Start_Time_year}-{Start_Time_month}-{Start_Time_day}")

    # End_Time = input("请输入结束日期(格式:2025-04-01):") or "2025-04-01"
    End_Time_year = End_Time.split("-")[0]
    End_Time_month = End_Time.split("-")[1]
    End_Time_day = End_Time.split("-")[2]
    # day格式转换02需要去掉前导0
    End_Time_day = str(int(End_Time_day))
    print(f"结束日期:{End_Time_year}-{End_Time_month}-{End_Time_day}")
    # data_type = input(
    #     "请输入需要爬取的数据类型:普钢['综合', '长材', '扁平', '一次材', '华东', '华南', '华北', '中南', '东北', '西南', '西北', '螺纹', '线材', '型材', '中厚', '锅炉容器板', '造船板', '热卷', '窄带', '冷板', '镀锌板卷', '无缝管', '聊城无缝钢管', '焊管', '盘扣式钢管脚手架']")
    data_type1 = all_data_dist[data_type]

    # 等待页面加载3s
    # time.sleep(3)
    # # element = driver.find_element(By.CLASS_NAME, "mRightBox")
    # # 等待页面加载3s-
    # time.sleep(3)
    edge_options = Options()
    edge_options.add_argument("--headless")  # 无头模式,不显示浏览器窗口
    edge_options.add_argument("--disable-gpu")
    edge_options.add_argument("--window-size=1920,1080")
    edge_service = Service('D:\桌面文件\edgedriver_win64\msedgedriver.exe')  # 替换为你的Edge驱动路径
    driver = webdriver.Edge(service=edge_service, options=edge_options)
    print("开始打开浏览器")
    url = "https://index.mysteel.com/xpic/detail.html?tabName=pugang"
    driver.get(url)
    time.sleep(3)
    driver.find_element(By.CSS_SELECTOR, "img.addBtn[src*='icon.png']").click()  # 点击展开
    print("点击展开")
    time.sleep(3)

    # print(element.text)

    try:
        # 点击类型
        key1 = driver.find_element(By.ID, data_type1)
        key1.click()
        # 等待页面加载3s
        time.sleep(1)
        # //*[@id="searchTimeLiDiv"]/ul/li[1]/a按日查询
        key2 = driver.find_element(By.XPATH, '//*[@id="searchTimeLiDiv"]/ul/li[1]/a')
        key2.click()
        time.sleep(1)

        # 起始日期//*[@id="startDay"]
        start_date = driver.find_element(By.XPATH, '//*[@id="startDay"]')
        # start_date.clear()
        start_date.click()
        time.sleep(1)

        driver.maximize_window()
        # # 解析年月日
        # target_date = "2021-09-01"
        # year, month, day = target_date.split('-')

        # # 等待日历面板加载
        # WebDriverWait(driver, 10).until(
        #     EC.presence_of_element_located((By.CSS_SELECTOR,  ".daterangepicker.dropdown-menu"))
        # )
        from selenium.webdriver.common.by import By
        from selenium.webdriver.support.ui import Select
        # 选择年份(如果页面有年份下拉框)

        year_dropdown = driver.find_element(By.XPATH, "/html/body/div[3]/div[2]/div/table/thead/tr[1]/th[2]/select[2]")
        year_dropdown.click()
        time.sleep(1)
        print("选择年份下拉框")

        # 选择年份<option value="1975">1975</option>
        # year_dropdown.find_element(By.XPATH,  f"//option[@value='{year}']").click()
        # print(f"选择年份:{year}")
        select = Select(year_dropdown)
        select.select_by_visible_text(Start_Time_year)  # 根据文本选择
        print(f"选择年份:{Start_Time_year}")

        # 输出
        # 选择月份  /html/body/div[3]/div[2]/div/table/thead/tr[1]/th[2]/select[1]
        month_dropdown = driver.find_element(By.XPATH, "/html/body/div[3]/div[2]/div/table/thead/tr[1]/th[2]/select[1]")
        month_dropdown.click()
        time.sleep(1)
        print("选择月份下拉框")

        select1 = Select(month_dropdown)
        select1.select_by_visible_text(Start_Time_month)  # 根据文本选择
        print(f"选择月份:{Start_Time_month}")

        # 选择日期
        date_cell = driver.find_element(
            By.XPATH, f"//td[contains(@class, 'available') and text()='{Start_Time_day}']"
        )
        date_cell.click()
        time.sleep(1)
        print(f"选择日期:{Start_Time_day}")

    except Exception as e:
        print(f"执行出错: {str(e)}")
        driver.save_screenshot('error.png')
    try:
        # //*[@id="searchTimeLiDiv"]/ul/li[1]/a按日查询
        key2 = driver.find_element(By.XPATH, '//*[@id="searchTimeLiDiv"]/ul/li[1]/a')
        key2.click()
        time.sleep(1)
        # 终止日期//*[@id="endDay"]
        end_date = driver.find_element(By.XPATH, '//*[@id="endDay"]')
        # start_date.clear()
        end_date.click()
        time.sleep(1)

        driver.maximize_window()
        # 解析年月日
        target_date = "2021-09-01"
        year, month, day = target_date.split('-')

        # # 等待日历面板加载
        # WebDriverWait(driver, 10).until(
        #     EC.presence_of_element_located((By.CSS_SELECTOR,  ".daterangepicker.dropdown-menu"))
        # )
        from selenium.webdriver.common.by import By
        from selenium.webdriver.support.ui import Select
        # 选择年份(如果页面有年份下拉框)
        year_end_dropdown = driver.find_element(By.XPATH,
                                                "/html/body/div[4]/div[2]/div/table/thead/tr[1]/th[2]/select[2]")
        year_end_dropdown.click()
        print("选择年份下拉框")

        # 选择年份<option value="1975">1975</option>
        # year_dropdown.find_element(By.XPATH,  f"//option[@value='{year}']").click()
        # print(f"选择年份:{year}")
        select_year_end = Select(year_end_dropdown)
        select_year_end.select_by_visible_text(End_Time_year)  # 根据文本选择
        print(f"选择年份:{End_Time_year}")

        # 输出
        # 选择月份  /html/body/div[4]/div[2]/div/table/thead/tr[1]/th[2]/select[1]
        month_end_dropdown = driver.find_element(By.XPATH,
                                                 "/html/body/div[4]/div[2]/div/table/thead/tr[1]/th[2]/select[1]")
        month_end_dropdown.click()
        print("选择月份下拉框")

        select_month_end = Select(month_end_dropdown)
        select1_month_end = select_month_end.select_by_visible_text(End_Time_month)  # 根据文本选择
        print(f"选择月份:{End_Time_month}")

        # /html/body/div[4]/div[2]
        # 找到右侧日历
        left_calendar = driver.find_element(
            By.XPATH,
            '/html/body/div[4]/div[2]'
        )
        # 选择日期
        # left_calendar = driver.find_element(By.CLASS_SELECTOR, "calendar single left")
        # driver.find_element(By.XPATH, "//td[contains(@class, 'available') and text()='2']")
        date_end_cell = left_calendar.find_element(
            By.XPATH,
            f'.//td[text()={End_Time_day}]'
        )
        date_end_cell.click()
        driver.save_screenshot('C:\pythonProject\python爬虫\我的钢铁网\end_date.png')
        print(f"选择日期:{End_Time_day}")
    except Exception as e:
        print(f"执行出错: {str(e)}")
        driver.save_screenshot('error.png')
    # 点击搜索按钮
    search_btn = driver.find_element(By.XPATH, '//*[@id="dome1"]/table/tbody/tr/td[5]/img')
    search_btn.click()
    # driver.save_screenshot('C:\pythonProject\python爬虫\我的钢铁网\搜索之后.png')
    element = driver.find_element(By.CLASS_NAME, "mRightBox")
    # 等待页面加载3s-
    time.sleep(3)
    #保存截图
    driver.save_screenshot('C:\pythonProject\项目五(钢铁价格预测)\我的钢铁网\搜索之后.png')
    print(element.text)
    # data_str = element.text
    # driver.quit()
    return [types.TextContent(type="text", text=element.text)]

    # return data_str

2.工具--保存到excel

async def Save_To_Excel(data_str:str):
    # 把爬取的text数据保存到Excel文档中
    import pandas as pd
    from openpyxl import load_workbook
    import os
    import re
    # 示例数据准备(替换为你的实际数据)

    # 按行拆分数据并转换为列表
    lines = [line.split() for line in data_str.strip().split('\n')]
    # 工作簿名称,去掉空格
    sheet_name = str(lines[0]).strip()
    sheet_name =  sheet_name
    sheet_name = re.sub(r'[^\u4e00-\u9fa5]', '', sheet_name)

    header = lines[1]  # 假设第一行是表头
    rows = lines[2:]  # 数据行
    df = pd.DataFrame(rows, columns=header)

    '''
    数据插值,然后存入表格
    '''
    print(df.columns.tolist())  # 查看所有列名
    df['时间'] = pd.to_datetime(df['时间'], format='%Y/%m/%d')  # 确保日期为datetime类型

    # 2. 创建完整日期范围(从数据最早日期到最晚日期)
    date_range = pd.date_range(
        start=df['时间'].min(),
        end=df['时间'].max(),
        freq='D'
    )
    # 3. 重新索引并保留原始数据
    df = df.set_index('时间').reindex(date_range).rename_axis('时间').reset_index()

    # 4. 定义需要插值的数值列(根据实际Excel列名调整)
    numeric_cols = [
        '本日', '昨日', '日环比', '上周', '周环比',
        '上月度', '与上月比', '去年同期', '与去年比'
    ]

    # 5. 线性插值填充(取前后值的平均值)
    for col in numeric_cols:
        if col in df.columns:
            if col in ['日环比', '周环比', '与上月比', '与去年比']:
                df[col] = df[col].str.rstrip('%').astype(float) / 100
                # 2. 线性插值
                df[col] = df[col].interpolate(method='linear')
                # 3. 还原为百分比字符串
                df[col] = (df[col] * 100).round(2).astype(str) + '%'
            else:
                # 2. 线性插值,保留两位小数
                # df[col] = df[col].astype(float).interpolate(method='linear', limit_direction='both', limit=2)
                df[col] = pd.to_numeric(df[col], errors='coerce')  # 非数值转为NaN
                df[col] = df[col].interpolate(method='linear').round(2)
    # 6. 格式化日期为YYYY/MM/DD
    df['时间'] = df['时间'].dt.strftime('%Y/%m/%d')

    from openpyxl.styles import Alignment
    # 目标文件路径(注意Windows路径要用双反斜杠或原始字符串)
    file_path = r"C:\pythonProject\项目五(钢铁价格预测)\我的钢铁网\gyp4.xlsx"

    # 核心逻辑:追加或新建
    if os.path.exists(file_path):
        # 追加模式(加载现有工作簿)
        with pd.ExcelWriter(file_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
            book = load_workbook(file_path)
            if sheet_name in book.sheetnames:
                # # 读取现有数据并合并
                # existing_df = pd.read_excel(file_path,  sheet_name=sheet_name)
                # combined_df = pd.concat([existing_df,  new_df], ignore_index=True)
                combined_df = df

                # 移除旧表(为了覆盖写入)
                book.remove(book[sheet_name])
            else:
                combined_df = df

            # 写入合并后的数据
            combined_df.to_excel(writer, index=False, sheet_name=sheet_name)
            # 调整列宽
            worksheet = writer.sheets[sheet_name]
            for col in worksheet.columns:
                max_length = max(len(str(cell.value)) for cell in col)
                worksheet.column_dimensions[col[0].column_letter].width = max_length + 5
                for cell in col:
                    cell.alignment = Alignment(horizontal='left')  # 左对
    else:
        # 新建模式
        with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name=sheet_name)

            # 调整列宽
            worksheet = writer.sheets[sheet_name]
            for col in worksheet.columns:
                max_length = max(len(str(cell.value)) for cell in col)
                worksheet.column_dimensions[col[0].column_letter].width = max_length + 5
                for cell in col:
                    cell.alignment = Alignment(horizontal='left')  # 左对

    print(f"数据已保存到:{file_path}")

    return [types.TextContent(type="text", text=f"数据已保存到:{file_path}")]

3.工具--保存到MySQL中

async  def Save_To_MySQL(data_str:str):
    # 把爬取的text数据保存到MySQL数据库中
    import pandas as pd
    from openpyxl import load_workbook
    import os
    import re
    # 示例数据准备(替换为你的实际数据)
    Small_type1 = '综合'
    # 按行拆分数据并转换为列表
    lines = [line.split() for line in data_str.strip().split('\n')]
    # 工作簿名称,去掉空格
    sheet_name = str(lines[0]).strip()
    # sheet_name = Large_type1 + '-' + Small_type1
    # sheet_name = re.sub(r'[^\u4e00-\u9fa5]',  '', sheet_name)
    header = lines[1]  # 假设第一行是表头
    rows = lines[2:]  # 数据行
    df = pd.DataFrame(rows, columns=header)

    '''
    数据插值,然后存入表格
    '''
    print(df.columns.tolist())  # 查看所有列名
    df['时间'] = pd.to_datetime(df['时间'], format='%Y/%m/%d')  # 确保日期为datetime类型

    # 2. 创建完整日期范围(从数据最早日期到最晚日期)
    date_range = pd.date_range(
        start=df['时间'].min(),
        end=df['时间'].max(),
        freq='D'
    )
    # 3. 重新索引并保留原始数据
    df = df.set_index('时间').reindex(date_range).rename_axis('时间').reset_index()

    # 4. 定义需要插值的数值列(根据实际Excel列名调整)
    numeric_cols = [
        '本日', '昨日', '日环比', '上周', '周环比',
        '上月度', '与上月比', '去年同期', '与去年比'
    ]

    # 5. 线性插值填充(取前后值的平均值)
    for col in numeric_cols:
        if col in df.columns:
            if col in ['日环比', '周环比', '与上月比', '与去年比']:
                df[col] = df[col].str.rstrip('%').astype(float) / 100
                # 2. 线性插值
                df[col] = df[col].interpolate(method='linear')
                # 3. 还原为百分比字符串
                df[col] = (df[col] * 100).round(2).astype(str) + '%'
            else:
                # 2. 线性插值,保留两位小数
                # df[col] = df[col].astype(float).interpolate(method='linear', limit_direction='both', limit=2)
                df[col] = pd.to_numeric(df[col], errors='coerce')  # 非数值转为NaN
                df[col] = df[col].interpolate(method='linear').round(2)
    # 6. 格式化日期为YYYY/MM/DD
    df['时间'] = df['时间'].dt.strftime('%Y/%m/%d')
    print("插值成功")
    df.insert(0, '类型', sheet_name)
    print("第一列插入类型成功")

    import pandas as pd
    from sqlalchemy import create_engine, text, Date
    from sqlalchemy.exc import SQLAlchemyError

    # MySQL连接配置(替换为你的实际配置)
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'root111111',
        'database': 'gyp_test',
        'port': 3306
    }

    try:
        # 创建SQLAlchemy引擎
        engine = create_engine(
            f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset=utf8mb4"
        )

        # 检查表是否存在,不存在则创建
        with engine.connect() as conn:
            table_exists = conn.execute(text(
                f"SHOW TABLES LIKE '原材料指数表'"
            )).fetchone()

            if not table_exists:
                create_table_sql = """
                CREATE TABLE `原材料指数表` (
                    `id` INT AUTO_INCREMENT PRIMARY KEY,
                    `类型` VARCHAR(50) NOT NULL,
                    `时间` DATE NOT NULL,
                    `本日` DECIMAL(10,2),
                    `昨日` DECIMAL(10,2),
                    `日环比` VARCHAR(20),
                    `上周` DECIMAL(10,2),
                    `周环比` VARCHAR(20),
                    `上月度` DECIMAL(10,2),
                    `与上月比` VARCHAR(20),
                    `去年同期` DECIMAL(10,2),
                    `与去年比` VARCHAR(20),
                    INDEX (`时间`),
                    INDEX (`类型`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
                conn.execute(text(create_table_sql))
                print("表'原材料指数表'创建成功")

        # 将DataFrame写入数据库
        df.to_sql(
            name='原材料指数表',
            con=engine,
            if_exists='append',  # 追加模式
            index=False,
            dtype={
                '时间': Date  # 确保日期格式正确
            }
        )
        print("数据插入成功")
        return [types.TextContent(type="text", text="数据插入MySQL数据库成功")]

    except SQLAlchemyError as e:
        print(f"数据库操作出错: {e}")
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        engine.dispose()

4.搭建MCP-server

from math import pi
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    app = Server("mcp-website-fetcher")

    @app.call_tool()
    async def call_tool(
        name: str, arguments: dict
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        # if name != "fetch":
        #     raise ValueError(f"Unknown tool: {name}")
        # if "url" not in arguments:
        #     raise ValueError("Missing required argument 'url'")
        if name=="Web_crawling":
            return await fetch_website(arguments["data_type"], arguments["Start_Time"], arguments["End_Time"])
        elif name=="add_gyp":
            A = arguments["A"]
            B = arguments["B"]
            return [types.TextContent(type="text", text=str(A+B+pi))]
        elif name=="save_to_excel":
            return await Save_To_Excel(arguments["data_str"])
        elif name=="save_to_mysql":
            return await Save_To_MySQL(arguments["data_str"])

    #普钢:类型为综合,2025.1.1-2025.5.6的数据
    @app.list_tools()
    async def list_tools() -> list[types.Tool]:
        return [
            types.Tool(
                name="Web_crawling",
                description="抓取网页数据,需要输入抓取钢材的类型,开始时间,结束时间三个参数",
                inputSchema={
                    "type": "object",
                    "required": ["data_type", "Start_Time", "End_Time"],
                    "properties":
                        {
                            "data_type": {"type": "string", "description":
                                "普钢['综合', '长材', '扁平', '一次材', '螺纹', '线材', '型材', '中厚', '锅炉容器板', '造船板', '热卷', '窄带', '冷板', '镀锌板卷', '无缝管', '聊城无缝钢管', '焊管', '盘扣式钢管脚手架'],"
                                "特钢['综合', '特钢'],"
                                "铁矿石['综合','进口矿','国产矿'],"
                                "焦炭['综合']"
                            },
                            "Start_Time": {"type": "string", "format": "date", "description": "开始时间(YYYY-MM-DD)"},
                            "End_Time": {"type": "string", "format": "date", "description": "结束时间(YYYY-MM-DD)"},
                    },
                },
            ),
            types.Tool(
                name="add_gyp",
                description="这是一个计算器工具",
                inputSchema={
                    "type": "object",
                    "required": ["A", "B"],
                    "properties":
                        {
                            "A": {"type": "number", "description": "第一个数"},
                            "B": {"type": "number", "description": "第二个数"},
                        },
                },
            ),
            types.Tool(
                name="save_to_excel",
                description="保存爬取的数据到本地Excel文档当中",
                inputSchema={
                    "type": "object",
                    "required": ["data_str"],
                    "properties":
                        {
                            "data_str": {"type": "string", "description": "爬取的数据"},
                        },
                },
            ),
            types.Tool(
                name="save_to_mysql",
                description="保存爬取的数据到MySQL数据库中",
                inputSchema={
                    "type": "object",
                    "required": ["data_str"],
                    "properties":
                        {
                            "data_str": {"type": "string", "description": "爬取的数据"},
                        },
                },
            )
        ]
    # @app.list_resources()
    # async def list_resources() -> list[types.Resource]:
    #     return [
    #         types.Resource(
    #             uri=FileUrl(r"D:\桌面文件\test-知识库.xlsx"),
    #             name='student_grade',
    #             description="这是一个成绩单",
    #             mimeType="text/plain",
    #         )
    #     ]
    # @app.read_resource()
    # async def read_resource(name: str,uri:FileUrl) -> str | bytes:
    #     if name=="student_grade":
    #         with open(uri.path, "rb") as f:
    #             return f.read()


    if transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.responses import Response
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )
            return Response()

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse, methods=["GET"]),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )

        import uvicorn

        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0

运行:uv run mcp_simple_tool --transport sse --port 8000

 

5.Dify调用MCP工具

你是一个智能助手,可根据用户输入的指令,进行推理并调用工具,完成任务后返回给用户结果。其中
server_name1为地图和天气服务,其中server_name2为搜索服务,server 3是·爬虫、保存数据服务。

注意:1.必须使用mcp_sse_list_tools工具列出可以调用的工具。
           2.必须使用mcp_sse_call_tool调用合适的工具完成用户的需求。

 1.查询:普钢:类型为综合,2025.1.1-2025.5.6的数据

2.保存到本机数据库中,MySQL

完整代码:

import anyio
import click
import httpx
import mcp.types as types
from mcp.server.lowlevel import Server
import pandas as pd
import time
from pydantic import FileUrl

#启动服务端:
# muv run mcp_simple_tool --transport sse --port 8000

# async def fetch_website(
#     url: str,
# ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
#     headers = {
#         "User-Agent": "MCP Test Server (github.com/modelcontextprotocol/python-sdk)"
#     }
#     async with httpx.AsyncClient(follow_redirects=True, headers=headers) as client:
#         response = await client.get(url)
#         response.raise_for_status()
#         return [types.TextContent(type="text", text=response.text)]


async def fetch_website(
    data_type: str,
    Start_Time: str,
    End_Time: str,
)-> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.edge.service import Service
    from selenium.webdriver.edge.options import Options
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.action_chains import ActionChains
    # 设置Edge浏览器选项
    # try:


    # except Exception as e:
    #     print("Edge浏览器启动失败,请检查驱动是否正确安装!")
    #     print(e)
    all_data_dist = {
        "综合": "GANGCAIZONGHE",
        "长材": "CHANGCAI",
        "扁平": "BIANPING",
        "一次材": "YICICAI",
        "华东": "HUADONG",
        "华南": "HUANAN",
        "华北": "HUABEI",
        "中南": "ZHONGNAN",
        "东北": "DONGBEI",
        "西南": "XINAN",
        "西北": "XIBEI",
        "螺纹": "LUOWEN",
        "线材": "XIANCAI",
        "型材": "XINCAI",
        "中厚": "ZHONGHOU",
        "锅炉容器板": "GUOLURONGQIBAN",
        "造船板": "ZAOCHUANBAN",
        "热卷": "REJUAN",
        "窄带": "ZAIDAI",
        "冷板": "LENGBAN",
        "镀锌板卷": "DUXIN",
        "无缝管": "WUFENGGUAN",
        "聊城无缝钢管": "WUFENG_LIAOCHENG",
        "焊管": "HANGUAN",
        "盘扣式钢管脚手架": "PKSJSJ"
    }

    print("开始爬取数据")
    print("普钢的所有数据类型:" + str(all_data_dist.keys()))

    # Start_Time = input("请输入开始日期(格式:2024-01-01):") or "2024-04-01"
    Start_Time_year = Start_Time.split("-")[0]
    Start_Time_month = Start_Time.split("-")[1]
    Start_Time_day = Start_Time.split("-")[2]
    # day格式转换02需要去掉前导0
    Start_Time_day = str(int(Start_Time_day))
    print(f"开始日期:{Start_Time_year}-{Start_Time_month}-{Start_Time_day}")

    # End_Time = input("请输入结束日期(格式:2025-04-01):") or "2025-04-01"
    End_Time_year = End_Time.split("-")[0]
    End_Time_month = End_Time.split("-")[1]
    End_Time_day = End_Time.split("-")[2]
    # day格式转换02需要去掉前导0
    End_Time_day = str(int(End_Time_day))
    print(f"结束日期:{End_Time_year}-{End_Time_month}-{End_Time_day}")
    # data_type = input(
    #     "请输入需要爬取的数据类型:普钢['综合', '长材', '扁平', '一次材', '华东', '华南', '华北', '中南', '东北', '西南', '西北', '螺纹', '线材', '型材', '中厚', '锅炉容器板', '造船板', '热卷', '窄带', '冷板', '镀锌板卷', '无缝管', '聊城无缝钢管', '焊管', '盘扣式钢管脚手架']")
    data_type1 = all_data_dist[data_type]

    # 等待页面加载3s
    # time.sleep(3)
    # # element = driver.find_element(By.CLASS_NAME, "mRightBox")
    # # 等待页面加载3s-
    # time.sleep(3)
    edge_options = Options()
    edge_options.add_argument("--headless")  # 无头模式,不显示浏览器窗口
    edge_options.add_argument("--disable-gpu")
    edge_options.add_argument("--window-size=1920,1080")
    edge_service = Service('D:\桌面文件\edgedriver_win64\msedgedriver.exe')  # 替换为你的Edge驱动路径
    driver = webdriver.Edge(service=edge_service, options=edge_options)
    print("开始打开浏览器")
    url = "https://index.mysteel.com/xpic/detail.html?tabName=pugang"
    driver.get(url)
    time.sleep(3)
    driver.find_element(By.CSS_SELECTOR, "img.addBtn[src*='icon.png']").click()  # 点击展开
    print("点击展开")
    time.sleep(3)

    # print(element.text)

    try:
        # 点击类型
        key1 = driver.find_element(By.ID, data_type1)
        key1.click()
        # 等待页面加载3s
        time.sleep(1)
        # //*[@id="searchTimeLiDiv"]/ul/li[1]/a按日查询
        key2 = driver.find_element(By.XPATH, '//*[@id="searchTimeLiDiv"]/ul/li[1]/a')
        key2.click()
        time.sleep(1)

        # 起始日期//*[@id="startDay"]
        start_date = driver.find_element(By.XPATH, '//*[@id="startDay"]')
        # start_date.clear()
        start_date.click()
        time.sleep(1)

        driver.maximize_window()
        # # 解析年月日
        # target_date = "2021-09-01"
        # year, month, day = target_date.split('-')

        # # 等待日历面板加载
        # WebDriverWait(driver, 10).until(
        #     EC.presence_of_element_located((By.CSS_SELECTOR,  ".daterangepicker.dropdown-menu"))
        # )
        from selenium.webdriver.common.by import By
        from selenium.webdriver.support.ui import Select
        # 选择年份(如果页面有年份下拉框)

        year_dropdown = driver.find_element(By.XPATH, "/html/body/div[3]/div[2]/div/table/thead/tr[1]/th[2]/select[2]")
        year_dropdown.click()
        time.sleep(1)
        print("选择年份下拉框")

        # 选择年份<option value="1975">1975</option>
        # year_dropdown.find_element(By.XPATH,  f"//option[@value='{year}']").click()
        # print(f"选择年份:{year}")
        select = Select(year_dropdown)
        select.select_by_visible_text(Start_Time_year)  # 根据文本选择
        print(f"选择年份:{Start_Time_year}")

        # 输出
        # 选择月份  /html/body/div[3]/div[2]/div/table/thead/tr[1]/th[2]/select[1]
        month_dropdown = driver.find_element(By.XPATH, "/html/body/div[3]/div[2]/div/table/thead/tr[1]/th[2]/select[1]")
        month_dropdown.click()
        time.sleep(1)
        print("选择月份下拉框")

        select1 = Select(month_dropdown)
        select1.select_by_visible_text(Start_Time_month)  # 根据文本选择
        print(f"选择月份:{Start_Time_month}")

        # 选择日期
        date_cell = driver.find_element(
            By.XPATH, f"//td[contains(@class, 'available') and text()='{Start_Time_day}']"
        )
        date_cell.click()
        time.sleep(1)
        print(f"选择日期:{Start_Time_day}")

    except Exception as e:
        print(f"执行出错: {str(e)}")
        driver.save_screenshot('error.png')
    try:
        # //*[@id="searchTimeLiDiv"]/ul/li[1]/a按日查询
        key2 = driver.find_element(By.XPATH, '//*[@id="searchTimeLiDiv"]/ul/li[1]/a')
        key2.click()
        time.sleep(1)
        # 终止日期//*[@id="endDay"]
        end_date = driver.find_element(By.XPATH, '//*[@id="endDay"]')
        # start_date.clear()
        end_date.click()
        time.sleep(1)

        driver.maximize_window()
        # 解析年月日
        target_date = "2021-09-01"
        year, month, day = target_date.split('-')

        # # 等待日历面板加载
        # WebDriverWait(driver, 10).until(
        #     EC.presence_of_element_located((By.CSS_SELECTOR,  ".daterangepicker.dropdown-menu"))
        # )
        from selenium.webdriver.common.by import By
        from selenium.webdriver.support.ui import Select
        # 选择年份(如果页面有年份下拉框)
        year_end_dropdown = driver.find_element(By.XPATH,
                                                "/html/body/div[4]/div[2]/div/table/thead/tr[1]/th[2]/select[2]")
        year_end_dropdown.click()
        print("选择年份下拉框")

        # 选择年份<option value="1975">1975</option>
        # year_dropdown.find_element(By.XPATH,  f"//option[@value='{year}']").click()
        # print(f"选择年份:{year}")
        select_year_end = Select(year_end_dropdown)
        select_year_end.select_by_visible_text(End_Time_year)  # 根据文本选择
        print(f"选择年份:{End_Time_year}")

        # 输出
        # 选择月份  /html/body/div[4]/div[2]/div/table/thead/tr[1]/th[2]/select[1]
        month_end_dropdown = driver.find_element(By.XPATH,
                                                 "/html/body/div[4]/div[2]/div/table/thead/tr[1]/th[2]/select[1]")
        month_end_dropdown.click()
        print("选择月份下拉框")

        select_month_end = Select(month_end_dropdown)
        select1_month_end = select_month_end.select_by_visible_text(End_Time_month)  # 根据文本选择
        print(f"选择月份:{End_Time_month}")

        # /html/body/div[4]/div[2]
        # 找到右侧日历
        left_calendar = driver.find_element(
            By.XPATH,
            '/html/body/div[4]/div[2]'
        )
        # 选择日期
        # left_calendar = driver.find_element(By.CLASS_SELECTOR, "calendar single left")
        # driver.find_element(By.XPATH, "//td[contains(@class, 'available') and text()='2']")
        date_end_cell = left_calendar.find_element(
            By.XPATH,
            f'.//td[text()={End_Time_day}]'
        )
        date_end_cell.click()
        driver.save_screenshot('C:\pythonProject\python爬虫\我的钢铁网\end_date.png')
        print(f"选择日期:{End_Time_day}")
    except Exception as e:
        print(f"执行出错: {str(e)}")
        driver.save_screenshot('error.png')
    # 点击搜索按钮
    search_btn = driver.find_element(By.XPATH, '//*[@id="dome1"]/table/tbody/tr/td[5]/img')
    search_btn.click()
    # driver.save_screenshot('C:\pythonProject\python爬虫\我的钢铁网\搜索之后.png')
    element = driver.find_element(By.CLASS_NAME, "mRightBox")
    # 等待页面加载3s-
    time.sleep(3)
    #保存截图
    driver.save_screenshot('C:\pythonProject\项目五(钢铁价格预测)\我的钢铁网\搜索之后.png')
    print(element.text)
    # data_str = element.text
    # driver.quit()
    return [types.TextContent(type="text", text=element.text)]

    # return data_str


async def Save_To_Excel(data_str:str):
    # 把爬取的text数据保存到Excel文档中
    import pandas as pd
    from openpyxl import load_workbook
    import os
    import re
    # 示例数据准备(替换为你的实际数据)

    # 按行拆分数据并转换为列表
    lines = [line.split() for line in data_str.strip().split('\n')]
    # 工作簿名称,去掉空格
    sheet_name = str(lines[0]).strip()
    sheet_name =  sheet_name
    sheet_name = re.sub(r'[^\u4e00-\u9fa5]', '', sheet_name)

    header = lines[1]  # 假设第一行是表头
    rows = lines[2:]  # 数据行
    df = pd.DataFrame(rows, columns=header)

    '''
    数据插值,然后存入表格
    '''
    print(df.columns.tolist())  # 查看所有列名
    df['时间'] = pd.to_datetime(df['时间'], format='%Y/%m/%d')  # 确保日期为datetime类型

    # 2. 创建完整日期范围(从数据最早日期到最晚日期)
    date_range = pd.date_range(
        start=df['时间'].min(),
        end=df['时间'].max(),
        freq='D'
    )
    # 3. 重新索引并保留原始数据
    df = df.set_index('时间').reindex(date_range).rename_axis('时间').reset_index()

    # 4. 定义需要插值的数值列(根据实际Excel列名调整)
    numeric_cols = [
        '本日', '昨日', '日环比', '上周', '周环比',
        '上月度', '与上月比', '去年同期', '与去年比'
    ]

    # 5. 线性插值填充(取前后值的平均值)
    for col in numeric_cols:
        if col in df.columns:
            if col in ['日环比', '周环比', '与上月比', '与去年比']:
                df[col] = df[col].str.rstrip('%').astype(float) / 100
                # 2. 线性插值
                df[col] = df[col].interpolate(method='linear')
                # 3. 还原为百分比字符串
                df[col] = (df[col] * 100).round(2).astype(str) + '%'
            else:
                # 2. 线性插值,保留两位小数
                # df[col] = df[col].astype(float).interpolate(method='linear', limit_direction='both', limit=2)
                df[col] = pd.to_numeric(df[col], errors='coerce')  # 非数值转为NaN
                df[col] = df[col].interpolate(method='linear').round(2)
    # 6. 格式化日期为YYYY/MM/DD
    df['时间'] = df['时间'].dt.strftime('%Y/%m/%d')

    from openpyxl.styles import Alignment
    # 目标文件路径(注意Windows路径要用双反斜杠或原始字符串)
    file_path = r"C:\pythonProject\项目五(钢铁价格预测)\我的钢铁网\gyp4.xlsx"

    # 核心逻辑:追加或新建
    if os.path.exists(file_path):
        # 追加模式(加载现有工作簿)
        with pd.ExcelWriter(file_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
            book = load_workbook(file_path)
            if sheet_name in book.sheetnames:
                # # 读取现有数据并合并
                # existing_df = pd.read_excel(file_path,  sheet_name=sheet_name)
                # combined_df = pd.concat([existing_df,  new_df], ignore_index=True)
                combined_df = df

                # 移除旧表(为了覆盖写入)
                book.remove(book[sheet_name])
            else:
                combined_df = df

            # 写入合并后的数据
            combined_df.to_excel(writer, index=False, sheet_name=sheet_name)
            # 调整列宽
            worksheet = writer.sheets[sheet_name]
            for col in worksheet.columns:
                max_length = max(len(str(cell.value)) for cell in col)
                worksheet.column_dimensions[col[0].column_letter].width = max_length + 5
                for cell in col:
                    cell.alignment = Alignment(horizontal='left')  # 左对
    else:
        # 新建模式
        with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name=sheet_name)

            # 调整列宽
            worksheet = writer.sheets[sheet_name]
            for col in worksheet.columns:
                max_length = max(len(str(cell.value)) for cell in col)
                worksheet.column_dimensions[col[0].column_letter].width = max_length + 5
                for cell in col:
                    cell.alignment = Alignment(horizontal='left')  # 左对

    print(f"数据已保存到:{file_path}")

    return [types.TextContent(type="text", text=f"数据已保存到:{file_path}")]

async  def Save_To_MySQL(data_str:str):
    # 把爬取的text数据保存到MySQL数据库中
    import pandas as pd
    from openpyxl import load_workbook
    import os
    import re
    # 示例数据准备(替换为你的实际数据)
    Small_type1 = '综合'
    # 按行拆分数据并转换为列表
    lines = [line.split() for line in data_str.strip().split('\n')]
    # 工作簿名称,去掉空格
    sheet_name = str(lines[0]).strip()
    # sheet_name = Large_type1 + '-' + Small_type1
    # sheet_name = re.sub(r'[^\u4e00-\u9fa5]',  '', sheet_name)
    header = lines[1]  # 假设第一行是表头
    rows = lines[2:]  # 数据行
    df = pd.DataFrame(rows, columns=header)

    '''
    数据插值,然后存入表格
    '''
    print(df.columns.tolist())  # 查看所有列名
    df['时间'] = pd.to_datetime(df['时间'], format='%Y/%m/%d')  # 确保日期为datetime类型

    # 2. 创建完整日期范围(从数据最早日期到最晚日期)
    date_range = pd.date_range(
        start=df['时间'].min(),
        end=df['时间'].max(),
        freq='D'
    )
    # 3. 重新索引并保留原始数据
    df = df.set_index('时间').reindex(date_range).rename_axis('时间').reset_index()

    # 4. 定义需要插值的数值列(根据实际Excel列名调整)
    numeric_cols = [
        '本日', '昨日', '日环比', '上周', '周环比',
        '上月度', '与上月比', '去年同期', '与去年比'
    ]

    # 5. 线性插值填充(取前后值的平均值)
    for col in numeric_cols:
        if col in df.columns:
            if col in ['日环比', '周环比', '与上月比', '与去年比']:
                df[col] = df[col].str.rstrip('%').astype(float) / 100
                # 2. 线性插值
                df[col] = df[col].interpolate(method='linear')
                # 3. 还原为百分比字符串
                df[col] = (df[col] * 100).round(2).astype(str) + '%'
            else:
                # 2. 线性插值,保留两位小数
                # df[col] = df[col].astype(float).interpolate(method='linear', limit_direction='both', limit=2)
                df[col] = pd.to_numeric(df[col], errors='coerce')  # 非数值转为NaN
                df[col] = df[col].interpolate(method='linear').round(2)
    # 6. 格式化日期为YYYY/MM/DD
    df['时间'] = df['时间'].dt.strftime('%Y/%m/%d')
    print("插值成功")
    df.insert(0, '类型', sheet_name)
    print("第一列插入类型成功")

    import pandas as pd
    from sqlalchemy import create_engine, text, Date
    from sqlalchemy.exc import SQLAlchemyError

    # MySQL连接配置(替换为你的实际配置)
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'root111111',
        'database': 'gyp_test',
        'port': 3306
    }

    try:
        # 创建SQLAlchemy引擎
        engine = create_engine(
            f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset=utf8mb4"
        )

        # 检查表是否存在,不存在则创建
        with engine.connect() as conn:
            table_exists = conn.execute(text(
                f"SHOW TABLES LIKE '原材料指数表'"
            )).fetchone()

            if not table_exists:
                create_table_sql = """
                CREATE TABLE `原材料指数表` (
                    `id` INT AUTO_INCREMENT PRIMARY KEY,
                    `类型` VARCHAR(50) NOT NULL,
                    `时间` DATE NOT NULL,
                    `本日` DECIMAL(10,2),
                    `昨日` DECIMAL(10,2),
                    `日环比` VARCHAR(20),
                    `上周` DECIMAL(10,2),
                    `周环比` VARCHAR(20),
                    `上月度` DECIMAL(10,2),
                    `与上月比` VARCHAR(20),
                    `去年同期` DECIMAL(10,2),
                    `与去年比` VARCHAR(20),
                    INDEX (`时间`),
                    INDEX (`类型`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
                conn.execute(text(create_table_sql))
                print("表'原材料指数表'创建成功")

        # 将DataFrame写入数据库
        df.to_sql(
            name='原材料指数表',
            con=engine,
            if_exists='append',  # 追加模式
            index=False,
            dtype={
                '时间': Date  # 确保日期格式正确
            }
        )
        print("数据插入成功")
        return [types.TextContent(type="text", text="数据插入MySQL数据库成功")]

    except SQLAlchemyError as e:
        print(f"数据库操作出错: {e}")
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        engine.dispose()



from math import pi
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    app = Server("mcp-website-fetcher")

    @app.call_tool()
    async def call_tool(
        name: str, arguments: dict
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        # if name != "fetch":
        #     raise ValueError(f"Unknown tool: {name}")
        # if "url" not in arguments:
        #     raise ValueError("Missing required argument 'url'")
        if name=="Web_crawling":
            return await fetch_website(arguments["data_type"], arguments["Start_Time"], arguments["End_Time"])
        elif name=="add_gyp":
            A = arguments["A"]
            B = arguments["B"]
            return [types.TextContent(type="text", text=str(A+B+pi))]
        elif name=="save_to_excel":
            return await Save_To_Excel(arguments["data_str"])
        elif name=="save_to_mysql":
            return await Save_To_MySQL(arguments["data_str"])

    #普钢:类型为综合,2025.1.1-2025.5.6的数据
    @app.list_tools()
    async def list_tools() -> list[types.Tool]:
        return [
            types.Tool(
                name="Web_crawling",
                description="抓取网页数据,需要输入抓取钢材的类型,开始时间,结束时间三个参数",
                inputSchema={
                    "type": "object",
                    "required": ["data_type", "Start_Time", "End_Time"],
                    "properties":
                        {
                            "data_type": {"type": "string", "description":
                                "普钢['综合', '长材', '扁平', '一次材', '螺纹', '线材', '型材', '中厚', '锅炉容器板', '造船板', '热卷', '窄带', '冷板', '镀锌板卷', '无缝管', '聊城无缝钢管', '焊管', '盘扣式钢管脚手架'],"
                                "特钢['综合', '特钢'],"
                                "铁矿石['综合','进口矿','国产矿'],"
                                "焦炭['综合']"
                            },
                            "Start_Time": {"type": "string", "format": "date", "description": "开始时间(YYYY-MM-DD)"},
                            "End_Time": {"type": "string", "format": "date", "description": "结束时间(YYYY-MM-DD)"},
                    },
                },
            ),
            types.Tool(
                name="add_gyp",
                description="这是一个计算器工具",
                inputSchema={
                    "type": "object",
                    "required": ["A", "B"],
                    "properties":
                        {
                            "A": {"type": "number", "description": "第一个数"},
                            "B": {"type": "number", "description": "第二个数"},
                        },
                },
            ),
            types.Tool(
                name="save_to_excel",
                description="保存爬取的数据到本地Excel文档当中",
                inputSchema={
                    "type": "object",
                    "required": ["data_str"],
                    "properties":
                        {
                            "data_str": {"type": "string", "description": "爬取的数据"},
                        },
                },
            ),
            types.Tool(
                name="save_to_mysql",
                description="保存爬取的数据到MySQL数据库中",
                inputSchema={
                    "type": "object",
                    "required": ["data_str"],
                    "properties":
                        {
                            "data_str": {"type": "string", "description": "爬取的数据"},
                        },
                },
            )
        ]
    # @app.list_resources()
    # async def list_resources() -> list[types.Resource]:
    #     return [
    #         types.Resource(
    #             uri=FileUrl(r"D:\桌面文件\test-知识库.xlsx"),
    #             name='student_grade',
    #             description="这是一个成绩单",
    #             mimeType="text/plain",
    #         )
    #     ]
    # @app.read_resource()
    # async def read_resource(name: str,uri:FileUrl) -> str | bytes:
    #     if name=="student_grade":
    #         with open(uri.path, "rb") as f:
    #             return f.read()


    if transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.responses import Response
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )
            return Response()

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse, methods=["GET"]),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )

        import uvicorn

        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0


网站公告

今日签到

点亮在社区的每一天
去签到