250810 - Integrating a Dify App into Open-WebUI

Published: 2025-08-11

A. Final Result

(screenshot)

B. Environment Setup

Install and start Open-WebUI

pip install open-webui
open-webui serve

  • Then open http://localhost:8080 in a browser

Install and start Pipelines

  • Pipelines listens on port 9099 by default
  • Compared with launching via Docker, running from source lets you watch the pipelines logs in the same terminal
git clone https://github.com/open-webui/pipelines.git
cd pipelines
pip install -r requirements.txt
sh ./start.sh

Install and start Dify

  • dify/docker/docker-compose.yaml references many images; by default only a few of the services are started
cd dify
cd docker
cp .env.example .env
docker compose up -d
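Dify publishes its web and API entry points through nginx on the host's port 80 by default. If that port is already taken, the published ports can be changed in .env before starting; the variable names below come from the stock .env.example and should be double-checked against your copy:

```shell
# dify/docker/.env — host ports published by the nginx container (defaults shown)
EXPOSE_NGINX_PORT=80
EXPOSE_NGINX_SSL_PORT=443
```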
  • A ChatFlow app can be configured in about 3 minutes
  • Click the LLM node and customize the prompt in its System field
  • Click Preview to test the conversation
  • Click Publish before the app is first used externally, and click Publish again after every update

(screenshot)

  • Test code for calling the API:
import requests
import json

API_KEY = "app-w1pVOdGHpJ81OmqsZ2YIXyT8"  # your real API Key

url = "http://localhost/v1/chat-messages"
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

payload = {
    "inputs": {},
    "query": "你的名字是什么?",
    "response_mode": "streaming",
    "conversation_id": "",
    "user": "abc-123",
    "files": [
        {
            "type": "image",
            "transfer_method": "remote_url",
            "url": "https://cloud.dify.ai/logo/logo-site.png"
        }
    ]
}

# Stream the SSE response and print answer chunks as they arrive
with requests.post(url, headers=headers, json=payload, stream=True, timeout=(10, 120)) as r:
    r.raise_for_status()
    for raw in r.iter_lines():
        if not raw:
            continue
        if not raw.startswith(b"data:"):
            continue

        data_str = raw[len(b"data:"):].strip().decode("utf-8", errors="ignore")
        if data_str in ("[DONE]", ""):
            continue

        try:
            event = json.loads(data_str)
        except json.JSONDecodeError:
            continue

        etype = event.get("event")

        if etype == "message":
            chunk = event.get("answer", "")
            if chunk:
                print(chunk, end="", flush=True)

        if etype in ("message_end", "workflow_finished"):
            print()
            break
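The SSE handling in the loop above can be factored into a small pure helper that is easier to unit-test; a minimal sketch (the function name is mine, not part of any Dify SDK):

```python
import json

def parse_sse_line(raw: bytes):
    """Extract (event_type, answer_chunk) from one raw SSE line,
    or return None for blanks, non-data lines, and non-JSON payloads."""
    if not raw or not raw.startswith(b"data:"):
        return None
    data_str = raw[len(b"data:"):].strip().decode("utf-8", errors="ignore")
    if data_str in ("", "[DONE]"):
        return None
    try:
        event = json.loads(data_str)
    except json.JSONDecodeError:
        return None
    return event.get("event"), event.get("answer", "")

# Example: a typical Dify "message" event
line = b'data: {"event": "message", "answer": "Hi"}'
print(parse_sse_line(line))  # → ('message', 'Hi')
```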

C. 测试案例

  • Case 1: the official pipelines example code
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
import subprocess


class Pipeline:
    def __init__(self):
        # Optionally, you can set the id and name of the pipeline.
        # Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
        # The identifier must be unique across all pipelines.
        # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
        # self.id = "python_code_pipeline"
        self.name = "Python Code Pipeline"
        pass

    async def on_startup(self):
        # This function is called when the server is started.
        print(">>>" * 80)
        print(f"on_startup:{__name__}")
        pass

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        print("<<<" * 80)
        print(f"on_shutdown:{__name__}")
        pass

    def execute_python_code(self, code):
        try:
            result = subprocess.run(
                ["python", "-c", code], capture_output=True, text=True, check=True
            )
            stdout = result.stdout.strip()
            return stdout, result.returncode
        except subprocess.CalledProcessError as e:
            return e.output.strip(), e.returncode

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        # This is where you can add your custom pipelines like RAG.
        print(f"pipe:{__name__}")

        print(messages)
        print(user_message)

        if body.get("title", False):
            print("Title Generation")
            return "Python Code Pipeline"
        else:
            # stdout, return_code = self.execute_python_code(user_message)
            stdout = "This is a test"
            return stdout

  • Case 2: a custom Dify chatflow
# pipelines/python_code_pipeline/__init__.py
from typing import List, Union, Generator, Iterator
import os
import json
import requests
from requests.exceptions import RequestException, Timeout

# Optional: Open-WebUI's message type (not required)
# from schemas import OpenAIChatMessage

class Pipeline:
    def __init__(self):
        self.name = "Dify_ChatFlow"

        # Prefer environment variables, to make it easy to switch between Docker and local setups
        # When running ./start.sh directly on the host, Dify is usually reachable at http://localhost
        # If pipelines runs in Docker and Dify is also in Docker, use http://host.docker.internal instead
        self.DIFY_BASE_URL = os.getenv("DIFY_BASE_URL", "http://localhost")
        self.DIFY_API_KEY  = os.getenv("DIFY_API_KEY", "app-w1pVOdGHpJ81OmqsZ2YIXyT8")   # !!! replace with your app key
        # Endpoint path for Dify chat apps (the default is fine)
        self.DIFY_CHAT_MESSAGES_PATH = os.getenv("DIFY_CHAT_MESSAGES_PATH", "/v1/chat-messages")

        # Timeouts (seconds)
        self.CONNECT_TIMEOUT = float(os.getenv("DIFY_CONNECT_TIMEOUT", "10"))
        self.READ_TIMEOUT    = float(os.getenv("DIFY_READ_TIMEOUT", "120"))

    async def on_startup(self):
        print(">>> Dify bridge ready:", self.DIFY_BASE_URL)

    async def on_shutdown(self):
        print("Dify bridge shutdown")

    # ------ Internal: post the request to Dify and stream the reply back as a generator ------
    def _dify_stream(self, query: str, conversation_id: str | None, user_id: str = "openwebui-user") -> Iterator[str]:
        url = f"{self.DIFY_BASE_URL.rstrip('/')}{self.DIFY_CHAT_MESSAGES_PATH}"
        headers = {
            "Authorization": f"Bearer {self.DIFY_API_KEY}",
            "Content-Type": "application/json"
        }
        payload = {
            "inputs": {},
            "query": query,
            "response_mode": "streaming",
            "conversation_id": conversation_id or "",
            "user": user_id,
            # To attach an image, pass files as needed
            # "files": [{"type": "image","transfer_method": "remote_url","url": "https://cloud.dify.ai/logo/logo-site.png"}]
        }

        try:
            with requests.post(
                url,
                headers=headers,
                data=json.dumps(payload),
                stream=True,
                timeout=(self.CONNECT_TIMEOUT, self.READ_TIMEOUT),
            ) as r:
                r.raise_for_status()
                for raw in r.iter_lines():
                    if not raw:
                        continue
                    if not raw.startswith(b"data:"):
                        continue
                    data_str = raw[len(b"data:"):].strip().decode("utf-8", errors="ignore")
                    if data_str in ("[DONE]", ""):
                        continue

                    try:
                        event = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue

                    etype = event.get("event")
                    if etype == "message":
                        chunk = event.get("answer", "")
                        if chunk:
                            yield chunk
                    elif etype in ("message_end", "workflow_finished"):
                        break
        except Timeout:
            yield "\n[Pipeline] Dify request timed out."
        except RequestException as e:
            yield f"\n[Pipeline] Dify request error: {e}"

    # ------ Main entry point called by Open-WebUI ------
    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        print("pipe:python_code_pipeline")
        # Skip Open-WebUI's internal title/tag-generation probes (they carry a long "### Task:" prompt)
        if body.get("title", False) or user_message.strip().startswith("### Task:"):
            # Return a short title or an empty string; never forward the system prompt to Dify
            return "" if not body.get("title") else "Python Code Pipeline"

        # You can also read a conversation id from body (if the frontend passed one)
        conversation_id = body.get("conversation_id") or ""

        # Forward the user's question to Dify and stream the reply back
        return self._dify_stream(query=user_message, conversation_id=conversation_id)
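The guard at the top of pipe() is worth isolating, since getting it wrong sends Open-WebUI's internal prompts to Dify; a standalone sketch of the same check (the function name is mine):

```python
def is_meta_task(user_message: str, body: dict) -> bool:
    # Open-WebUI sends internal title/tag-generation probes whose prompt
    # starts with "### Task:"; these should not be forwarded to Dify.
    return bool(body.get("title")) or user_message.strip().startswith("### Task:")

print(is_meta_task("### Task:\nGenerate a title", {}))      # → True
print(is_meta_task("What is your name?", {"title": True}))  # → True
print(is_meta_task("What is your name?", {}))               # → False
```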

D. UI Configuration

Configure it as shown in the screenshot below.

(screenshot)

E. Using It from the Main UI

  • The conversation history on the left shows how many times I had to test this; I was unfamiliar with the stack, and there were plenty of pitfalls
  • Likes, comments, and shares are appreciated: thank you

(screenshot)
