由浅入深使用LangGraph创建一个Agent工作流

发布于:2025-08-04 ⋅ 阅读:(11) ⋅ 点赞:(0)

创建一个简单的工作流:Start ——> 节点1(固定输入输出) ——> End

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Annotated

history_messages = [
    SystemMessage(content="你是一个聊天助理"),
    HumanMessage(content="你好,你能介绍下你自己吗?"),
    AIMessage(content="我是一个小模型聊天助理"),
]
messages = history_messages + [HumanMessage(content="今天天气怎么样?")]
class MyState(TypedDict):
    messages:Annotated[list,""]

def node1(state:MyState):
    return {"messages": AIMessage(content="今天天气很好")}

graph_builder = StateGraph(MyState)
graph_builder.add_node("node1", node1)
graph_builder.add_edge(START, "node1")
graph_builder.add_edge("node1", END)
graph = graph_builder.compile()
response = graph.stream({"messages": messages})
for event in response:
    print(event)

创建一个简单的工作流:Start ——> 节点1(固定输入输出) ——> 节点2(固定输入输出) ——> End

同时数据存到Mastate的数组里传递给下一个节点

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph.message import add_messages

history_messages = [
    SystemMessage(content="你是一个聊天助理"),
    HumanMessage(content="你好,你能介绍下你自己吗?"),
    AIMessage(content="我是一个小模型聊天助理"),
]
messages = history_messages + [HumanMessage(content="今天天气怎么样?")]
class MyState(TypedDict):
    messages:Annotated[list,add_messages]

def node1(state:MyState):
    return {"messages": AIMessage(content="今天天气很好")}

def node2(state:MyState):
    return {"messages": AIMessage(content="明天天气也很好")}

graph_builder = StateGraph(MyState)
graph_builder.add_node("node1", node1)
graph_builder.add_node("node2", node2)
graph_builder.add_edge("node1", "node2")
graph_builder.add_edge(START, "node1")
graph_builder.add_edge("node2", END)
graph = graph_builder.compile()
response = graph.stream({"messages": messages})
for event in response:
    print(event)

创建一个简单的会话工作流:Start ——> 会话节点1 ——> End

from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph.message import add_messages


class MyState(TypedDict):
    messages: Annotated[list, add_messages]


llm = init_chat_model(
    "deepseek-r1:7b",
    model_provider="ollama")


def chatbot(state: MyState):
    return {"messages": llm.invoke(state["messages"])}


graph_builder = StateGraph(MyState)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
response = graph.stream({"messages": "你好"})
for event in response:
    if "chatbot" in event:
        value = event["chatbot"]
        if "messages" in value and isinstance(value["messages"], AIMessage):
            print(event["chatbot"]["messages"].content)

用langChain创建一个简单的Rag :

需要再安装两个包:

pip install pypdf

pip install dashscope

from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 加载文档
loader = PyPDFLoader("./data/你的PDF文档.pdf")
pages = loader.load_and_split()

# 文档切片
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=200,
    length_function=len,
    add_start_index=True,
)

texts = text_splitter.create_documents(
    [page.page_content for page in pages[:2]],
)

# 灌库
embeddings = DashScopeEmbeddings(model="text-embedding-v1",dashscope_api_key="你的阿里云百炼平台APIkey")
db = FAISS.from_documents(texts, embeddings)

# 检索 top-5 结果
retriever = db.as_retriever(search_kwargs={"k": 5})

docs=retriever.invoke("文档相关的问题")
print(docs)

 作为一个条件节点放到会话工作流会话节点前:Start——> 条件Rag节点 ——> 会话节点1 ——> End

 

from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

llm = init_chat_model(
    "deepseek-r1:7b",
    model_provider="ollama")


def chatbot(state: MessagesState):
    return {"messages": llm.invoke(state["messages"])}


def retriever(state: MessagesState):
    question = state["messages"][-1].content
    messages = "请参照一下上下:"
    # 加载文档
    loader = PyPDFLoader("./data/你的PDF文档.pdf")
    pages = loader.load_and_split()

    # 文档切片
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=200,
        length_function=len,
        add_start_index=True,
    )

    texts = text_splitter.create_documents(
        [page.page_content for page in pages[:2]],
    )

    # 灌库
    embeddings = DashScopeEmbeddings(model="text-embedding-v1", dashscope_api_key="你的apikey")
    db = FAISS.from_documents(texts, embeddings)

    # 检索 top-5 结果
    dbRetriever = db.as_retriever(search_kwargs={"k": 5})
    docs = dbRetriever.invoke(question)
    for doc in docs:
        messages = messages + doc.page_content
    messages = messages + "回答问题:" + question
    return {"messages": HumanMessage(content=messages)}


graph_builder = StateGraph(MessagesState)
graph_builder.add_node("retriever", retriever)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "retriever")
graph_builder.add_edge("retriever", "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
response = graph.stream({"messages": "你的问题?"})
for event in response:
    if "chatbot" in event:
        value = event["chatbot"]
        if "messages" in value and isinstance(value["messages"], AIMessage):
            print(event["chatbot"]["messages"].content)