Learn How to Build a Multi-Agent System with ACP-SDK

Published: 2025-09-09

This article centers on the Agent Communication Protocol (ACP) introduced by IBM in March 2025, covering its origin, purpose, the challenges it addresses, and how it addresses them, as well as the benefits of ACP's merger with Google's Agent-to-Agent (A2A) protocol. It then walks through a concrete project built with the ACP-SDK — an advanced agentic system consisting of a DeepSearch Agent and a BlogPost Generator Agent — detailing its architecture, features, setup steps, usage examples, and code. Finally, it introduces the A2A protocol's characteristics, architecture, workflow, and a corresponding implementation.

Key Highlights

  • Background and significance of ACP: IBM Research introduced ACP to power its BeeAI platform and later donated it to the Linux Foundation. As a universal standard, ACP solves the problem of AI agents collaborating across teams, frameworks, and organizations, turning isolated agents into interoperable systems and improving interoperability, reusability, and scalability. In a multi-agent system, for example, agents built with different frameworks can communicate and collaborate effectively through ACP.
  • Challenges ACP addresses: AI agent capabilities are advancing rapidly, but real-world integration remains a bottleneck: diverse frameworks, painful custom integrations, integration effort that grows quadratically with the number of agents, and complex cross-organization integration. By introducing a common standard, ACP lets agents discover, understand, and collaborate with one another, overcoming these challenges. An organization with many agents built on different frameworks, for instance, can avoid writing a custom connector for every agent-to-agent interaction.
  • ACP merges with A2A: On September 1, 2025, ACP merged with Google's A2A protocol to form a unified standard for AI agent communication. The merger brings several benefits: a single, consolidated protocol that reduces complexity and ensures broader adoption; joint backing from IBM, Google, and the Linux Foundation; and a seamless transition for the BeeAI platform, with a migration guide provided.
  • Projects built on the ACP-SDK: Two projects, a DeepSearch Agent and a BlogPost Generator Agent, demonstrate the ACP-SDK in practice. The DeepSearch Agent performs web scraping and comprehensive research; the BlogPost Generator Agent communicates with the DeepSearch Agent and produces blog posts. The article details the architecture components, workflow, communication protocols, features, setup steps, and usage examples. For example, a user-supplied topic is sent via the ACP-SDK to the DeepSearch Agent for research, and the research results are then sent to the BlogPost Generator Agent to produce a blog post.

Introduction

IBM Research introduced the Agent Communication Protocol (ACP) in March 2025 to power its BeeAI platform, an open-source platform for exploring agent interoperability. Later that month, the BeeAI project, along with ACP, was donated to the Linux Foundation, cementing the community's commitment to advancing agent interoperability in the open.

This project comes from https://github.com/piyushagni5/langgraph-ai/tree/main/multi-agent-system

Overview

The Agent Communication Protocol (ACP), originally proposed by IBM's BeeAI, is a universal standard that enables AI agents to collaborate seamlessly across teams, frameworks, technologies, and organizations. It turns today's fragmented AI landscape into an interconnected, collaborative network, unlocking a higher level of interoperability, reusability, and scalability. As a follow-up to the Model Context Protocol (MCP) — an open standard for tool and data access — ACP defines how AI agents operate and communicate efficiently.

What Is an AI Agent?

An AI agent is an autonomous system or program designed to perform tasks on behalf of a user or another system. It does so by designing workflows and leveraging available tools. In a multi-agent system, several AI agents collaborate on tasks together, improving efficiency and problem-solving capability.

Why ACP Matters

As a governed, open communication standard, ACP lets AI agents interact across different frameworks and technology stacks. From handling natural-language queries to executing complex workflows, ACP ensures smooth communication among tools, agents, and users. The protocol is essential for passing information, making decisions, and completing tasks in multi-agent systems and human-agent interaction.

What Is ACP?

The Agent Communication Protocol (ACP) is an open standard for agent-to-agent communication. With it, we can turn today's siloed agent landscape into interoperable agentic systems, enabling easier integration and collaboration.

Current Challenges

Although agent capabilities are improving rapidly, real-world integration remains a major bottleneck. Without a common communication protocol, organizations face recurring challenges:

  • Framework diversity: Organizations often run hundreds or thousands of agents built with different frameworks such as LangChain, CrewAI, AutoGen, or custom stacks.
  • Custom integrations: Without a standard protocol, developers must write a custom connector for every agent interaction.
  • Quadratic growth: For n agents, up to n(n-1)/2 distinct integration points may be needed — 100 agents, for example, could require 4,950 pairwise integrations — making large agent ecosystems hard to maintain.
  • Cross-organization considerations: Different security models, authentication systems, and data formats complicate integration between enterprises.

Why Do We Need ACP?

As agentic AI continues to evolve, so does the complexity of combining the best capabilities of independent technologies for a given use case without being locked into a single vendor. Every framework, platform, and toolkit has unique strengths, but integrating them into a coherent agentic system remains a significant challenge.

Today, most AI agent systems operate in isolation. They are built on incompatible frameworks, expose custom APIs and endpoints, and lack a common communication protocol. The result is brittle, non-repeatable integrations that are expensive and time-consuming to build.

ACP addresses these challenges by introducing a universal standard that transforms the ecosystem from fragmented, ad-hoc systems into an interconnected network of agents. With ACP, agents can discover, understand, and collaborate with one another regardless of their origin or underlying technology stack. This lets developers tap the collective intelligence of diverse agents and create workflows that are more powerful and scalable than any single system could achieve on its own.

Agent Architectures Supported by ACP

ACP supports a variety of agent architectures, enabling seamless integration and collaboration across systems. By standardizing communication, ACP lets developers design multi-agent systems that combine the strengths of different frameworks, platforms, and toolkits. This flexibility allows organizations to build tailored solutions that realize the full potential of their AI systems without being tied to a specific vendor.

Getting Started

Getting started with ACP is straightforward, because simplicity is a core design principle. With minimal effort, an agent can be wrapped with ACP in just a few lines of code. Using the Python SDK, you define an ACP-compliant agent by decorating a function.

Minimal implementation steps:

  1. Initialize an ACP server to manage communication.
  2. Define the agent's behavior with the @server.agent() decorator.
  3. Use the LangChain framework with an LLM backend, and integrate memory for context persistence to enrich the agent.
  4. Map between ACP's standardized message format and the framework's native format so responses stay structured.
  5. Start the server to make the agent reachable over HTTP.

This streamlined process lets developers integrate ACP into their systems quickly so that agents can communicate and collaborate efficiently. A minimal sketch is shown below.
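To make these steps concrete, here is a minimal sketch of an ACP-wrapped agent, following the same acp_sdk Server / @server.agent() pattern used by deepsearch_server.py later in this article; the echo behavior and port 8000 are illustrative placeholders, not part of the project:

from collections.abc import AsyncGenerator

from acp_sdk.models import Message, MessagePart
from acp_sdk.server import RunYield, RunYieldResume, Server

server = Server()

@server.agent()
async def echo_agent(messages: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:
    """Trivial ACP agent: join the incoming message parts and echo them back."""
    text = " ".join(part.content for m in messages for part in m.parts)
    yield Message(parts=[MessagePart(content=f"You said: {text}")])

if __name__ == "__main__":
    server.run(port=8000)  # port 8000 is an arbitrary choice for this sketch

Once the server is running, the agent can be called with the acp_sdk Client exactly as shown in the usage examples later in this article.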

Latest Development: ACP Merges with A2A

ACP recently merged with Google's Agent-to-Agent (A2A) protocol under the Linux Foundation, an important step toward industry consolidation around open standards. The merger removes the fragmentation caused by competing protocols and establishes a single, well-supported standard for AI agent communication.

As of September 1, 2025, the ACP team decided to join forces with Google's A2A protocol team to develop a unified protocol.

Key benefits of the merger:

  1. Unified standard: Developers now have a single, consolidated protocol backed by major technology companies, which reduces complexity and ensures broader adoption.
  2. Stronger backing: The combined efforts of IBM, Google, and the Linux Foundation provide the standard with substantial support and resources.
  3. Seamless transition: The BeeAI platform has transitioned from ACP to A2A, with a migration guide to ensure continuity for existing implementations.

This convergence strengthens the AI agent communication ecosystem, making it easier for developers to build interoperable, scalable, and future-proof systems.

Protocol Ecosystem Comparison

To understand where ACP sits in the broader ecosystem, it helps to compare it with other emerging standards. The comparison highlights how ACP (now merged with A2A) complements or differs from other protocols and clarifies its role in advancing AI agent communication and interoperability.

Technical Capability Comparison

Architecture Comparison

Adoption of Different Protocols

Agentic Systems with ACP-SDK

This project demonstrates two advanced agentic systems built with the ACP-SDK:

1. DeepSearch Agent — a research specialist powered by CrewAI + Groq + Firecrawl

2. BlogPost Generator Agent — a content creator built with LangGraph that interacts with DeepSearch

Architecture Overview

Architecture components:

Client layer:

  • ACP client (agentic_client.py) — orchestrates the workflow

Agent layer:

  • DeepSearch Agent (port 8003) — research specialist using CrewAI
  • BlogPost Generator (port 8004) — content creator using LangGraph

Tool layer:

  • DeepSearch tools: Firecrawl (web scraping) + Groq LLM (analysis)
  • BlogPost tools: Groq LLM (content generation) + file system (.md output)

🔄 Workflow steps:

  1. Client → DeepSearch: the user's topic is sent via ACP-SDK/REST
  2. DeepSearch → Client: research data is returned via ACP-SDK
  3. Client → BlogPost: the research content is sent via ACP-SDK/REST
  4. BlogPost → Client: the final blog post is returned via ACP-SDK

🔧 Communication protocols:

  • ACP-SDK/REST: between the client and the agents (solid arrows)
  • MCP: between each agent and its tools (dashed arrows)

This follows the reference architecture diagram, where:

  • The client communicates with agents over REST/ACP
  • Agents communicate with one another over ACP
  • Each agent has its own set of tools accessible via MCP

Features

DeepSearch Agent (CrewAI + Groq + Firecrawl)

  • Tech stack: CrewAI, Groq LLM, Firecrawl/LINKUP web scraping
  • Port: 8003
  • Capabilities:

– Web scraping with Firecrawl/LINKUP

– Comprehensive research on any topic

– Data synthesis and analysis

– Source citations

– Real-time information gathering

BlogPost Generator Agent (LangGraph + Groq)

  • Tech stack: LangGraph, Groq LLM, ACP-SDK client
  • Port: 8004
  • Capabilities:

– Communicates with the DeepSearch Agent via ACP-SDK

– Multi-step blog generation workflow

– SEO-optimized content creation

– Automatic Markdown file generation

– Well-formatted, structured content

Setup Instructions

1. Install Dependencies

uv init
# Create a virtual environment
uv venv myvenv --python 3.13
# Install required libraries
uv pip install langchain-groq langgraph crewai crewai-tools acp-sdk asyncio groq
uv pip install firecrawl-py langchain-groq python-dotenv linkup-sdk

2. Environment Setup

Create a .env file containing your API keys:

# Groq API Key - Get from https://console.groq.com/
GROQ_API_KEY=your_groq_api_key_here

# Firecrawl API Key - Get from https://firecrawl.dev/
FIRECRAWL_API_KEY=your_firecrawl_api_key_here

# Linkup API Key
LINKUP_API_KEY=your_linkup_api_key_here

3. Start the Agents

Open a separate terminal for each agent:

Terminal 1 — DeepSearch Agent:

python deepsearch_server.py

Terminal 2 — BlogPost Generator Agent:

python blogpost_server.py

4. Test the System

Terminal 3 — Client:

# Test individual agents
python agentic_client.py deepsearch
python agentic_client.py blogpost

# Run complete demo
python agentic_client.py demo

# Default test
python agentic_client.py

Usage Examples

Direct DeepSearch Query

import asyncio
from acp_sdk.client import Client

async def research_topic():
    async with Client(base_url="http://localhost:8003") as client:
        result = await client.run_sync(
            agent='deepsearch_agent_handler',
            input="Latest AI developments in healthcare"
        )
        print(result.output[0].parts[0].content)

asyncio.run(research_topic())

Blog Generation

import asyncio
from acp_sdk.client import Client

async def generate_blog():
    async with Client(base_url="http://localhost:8004") as client:
        result = await client.run_sync(
            agent='blogpost_generator_agent',
            input="Future of renewable energy"
        )
        print(result.output[0].parts[0].content)

asyncio.run(generate_blog())

File Structure

├── deepsearch_server.py      # DeepSearch agent server
├── blogpost_server.py        # BlogPost generator server
├── agentic_client.py         # Demo client
├── requirements.txt          # Dependencies
├── acp_client.py            # Original client
├── cat_server.py            # Original cat agent
├── dog_server.py            # Original dog agent
└── blog_*.md                # Generated blog posts

Workflow Details

Blog Generation Workflow (LangGraph)

1. **Research phase**: call the DeepSearch agent via ACP-SDK


2. **Title generation**: create an SEO-friendly title with Groq

3. **Content generation**: produce the full blog post

4. **File saving**: save as a timestamped markdown file (a condensed sketch of this workflow follows below)
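The following is a condensed sketch of the LangGraph graph behind this workflow, mirroring the create_blog_workflow() function in blogpost_server_a2a.py shown later in this article; the node bodies here are simplified stubs rather than the real Groq prompts:

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END

class BlogState(TypedDict):
    topic: str
    research_content: str
    blog_title: str
    blog_content: str
    filename: str

def generate_title(state: BlogState) -> BlogState:
    # In the real agent this calls the Groq LLM; here it is stubbed.
    state["blog_title"] = f"A post about {state['topic']}"
    return state

def generate_content(state: BlogState) -> BlogState:
    state["blog_content"] = f"# {state['blog_title']}\n\n{state['research_content']}"
    return state

def save_blog(state: BlogState) -> BlogState:
    state["filename"] = "blog_draft.md"
    with open(state["filename"], "w", encoding="utf-8") as f:
        f.write(state["blog_content"])
    return state

workflow = StateGraph(BlogState)
workflow.add_node("generate_title", generate_title)
workflow.add_node("generate_content", generate_content)
workflow.add_node("save_blog", save_blog)
workflow.set_entry_point("generate_title")
workflow.add_edge("generate_title", "generate_content")
workflow.add_edge("generate_content", "save_blog")
workflow.add_edge("save_blog", END)
blog_workflow = workflow.compile()

# Example invocation (state fields supplied up front):
# blog_workflow.invoke({"topic": "...", "research_content": "...",
#                       "blog_title": "", "blog_content": "", "filename": ""})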

DeepSearch Research Flow (CrewAI)

1. **Topic analysis**: understand the research requirements

2. **Web scraping**: gather web content with Firecrawl

3. **Data synthesis**: analyze and consolidate the information

4. **Report generation**: produce a comprehensive research report

API Endpoints

DeepSearch Agent (Port 8003)

**Agent**: deepsearch_agent_handler

**Input**: research topic/query

**Output**: comprehensive research report

BlogPost Generator (Port 8004)

**Agent**: blogpost_generator_agent

**Input**: blog topic or research content

**Output**: generated blog post + saved markdown file

Extending the System

Adding New Agents

1. Create a new server file following the same pattern

2. Use a unique port number

3. Register the agent with the ACP-SDK server

4. Update the client code to communicate with it (see the sketch below)
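For the client-side update in step 4, a hypothetical new agent (here called summarizer_agent on port 8005, both placeholders) would be called the same way as the existing agents:

import asyncio
from acp_sdk.client import Client

async def call_new_agent():
    # base_url points at the new agent's server; 8005 is a hypothetical unused port
    async with Client(base_url="http://localhost:8005") as client:
        result = await client.run_sync(
            agent='summarizer_agent',  # must match the decorated handler's function name
            input="Text for the new agent to process"
        )
        print(result.output[0].parts[0].content)

asyncio.run(call_new_agent())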

Custom Tools

• Add tools to the CrewAI agent (see the sketch below)

• Integrate them with the LangGraph workflow

• Extend the ACP-SDK communication
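For example, the DeepSearch agent could be handed both of the tools already imported in deepsearch_server.py. This is a minimal sketch, not the project's exact configuration; whether you combine the tools depends on your API quotas:

import os
from crewai import Agent, LLM
from crewai_tools import FirecrawlScrapeWebsiteTool, LinkupSearchTool

groq_llm = LLM(model="groq/llama-3.3-70b-versatile", api_key=os.getenv("GROQ_API_KEY"))

firecrawl_tool = FirecrawlScrapeWebsiteTool(api_key=os.getenv("FIRECRAWL_API_KEY"))
linkup_tool = LinkupSearchTool(api_key=os.getenv("LINKUP_API_KEY"))

# Give the agent both tools so it can search with LINKUP and scrape pages with Firecrawl.
deepsearch_agent = Agent(
    role='Deep Research Specialist',
    goal='Conduct comprehensive research on given topics',
    backstory='Expert researcher combining search and scraping tools.',
    llm=groq_llm,
    tools=[linkup_tool, firecrawl_tool],
    verbose=True,
    allow_delegation=False
)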

Code Implementation

deepsearch_server.py

import os
from collections.abc import AsyncGenerator
from typing import Any

from acp_sdk.models import Message, MessagePart
from acp_sdk.server import RunYield, RunYieldResume, Server
from crewai import Agent, LLM, Task, Crew
from crewai_tools import FirecrawlScrapeWebsiteTool
from crewai_tools import LinkupSearchTool
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

server = Server()

# Initialize Groq LLM
groq_llm = LLM(
    model="groq/llama-3.3-70b-versatile",
    api_key=os.getenv("GROQ_API_KEY")
)

# Initialize Firecrawl tool
firecrawl_tool = FirecrawlScrapeWebsiteTool(
    api_key=os.getenv("FIRECRAWL_API_KEY")
)

# Initialize the LINKUP tool with your API key
linkup_tool = LinkupSearchTool(api_key=os.getenv("LINKUP_API_KEY"))

# Create DeepSearch agent
deepsearch_agent = Agent(
    role='Deep Research Specialist',
    goal='Conduct comprehensive research on given topics by scraping and analyzing web content',
    backstory="""You are an expert researcher with the ability to find, scrape, and analyze
    web content to provide comprehensive insights on any topic. You use advanced web scraping
    tools to gather information from multiple sources and synthesize it into valuable research.""",
    llm=groq_llm,
    tools=[linkup_tool],
    verbose=True,
    allow_delegation=False
)

@server.agent()
async def deepsearch_agent_handler(messages: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:
    # Extract query from messages
    query = " ".join(
        part.content
        for m in messages
        for part in m.parts
    )

    # Create research task
    research_task = Task(
        description=f"""
        Research the topic: "{query}"

        Your task is to:
        1. Identify relevant websites and sources for this topic
        2. Use the Firecrawl tool to scrape content from these sources
        3. Analyze and synthesize the information
        4. Provide a comprehensive research summary with key insights
        5. Include relevant facts, statistics, and recent developments
        6. Cite your sources

        Focus on providing accurate, up-to-date, and comprehensive information.
        """,
        expected_output="""A detailed research report containing:
        - Executive summary of findings
        - Key insights and important facts
        - Recent developments and trends
        - Relevant statistics and data points
        - Source citations
        - Recommendations for further reading""",
        agent=deepsearch_agent
    )

    # Create and execute crew
    research_crew = Crew(
        agents=[deepsearch_agent],
        tasks=[research_task],
        verbose=True
    )

    try:
        result = research_crew.kickoff()
        response_content = result.raw if hasattr(result, 'raw') else str(result)
    except Exception as e:
        response_content = f"Research failed: {str(e)}. Please check your API keys and try again."

    yield Message(parts=[MessagePart(content=response_content)])

if __name__ == "__main__":
    server.run(port=8003)

agentic_client.py ==> ACP client

import asyncio
from acp_sdk.client import Client

async def sequential_workflow():
    """Main workflow: User -> DeepSearch -> BlogPost Generator (like acp_client.py)"""
    print("🚀 Starting Sequential Agentic Workflow")
    print("="*80)

    # Get topic from user
    topic = input("📝 Enter a topic for research and blog generation: ").strip()
    if not topic:
        topic = "The future of sustainable energy technologies"
        print(f"No topic entered. Using default: {topic}")

    print(f"\n🔍 Step 1: Researching topic: '{topic}'")
    print("-" * 60)

    # Step 1: Call DeepSearch Agent
    async with Client(base_url="http://localhost:8003") as deepsearch_client:
        research_result = await deepsearch_client.run_sync(
            agent='deepsearch_agent_handler',
            input=topic
        )
        research_content = research_result.output[0].parts[0].content
        print("✅ Research completed!")
        print(f"📊 Research length: {len(research_content)} characters")
        print(f"📋 Preview: {research_content[:200]}...")

    print(f"\n📝 Step 2: Generating blog post from research data")
    print("-" * 60)

    # Step 2: Pass research results to BlogPost Generator
    async with Client(base_url="http://localhost:8004") as blogpost_client:
        blog_result = await blogpost_client.run_sync(
            agent='blogpost_generator_agent',
            input=research_content  # Pass the research content, not the topic
        )
        final_result = blog_result.output[0].parts[0].content
        print("✅ Blog post generated!")
        print(final_result)

    print("\n" + "="*80)
    print("🎉 Sequential workflow completed!")


async def test_deepsearch_agent():
    """Test the DeepSearch agent independently"""
    print("🔍 Testing DeepSearch Agent...")
    topic = input("Enter a research topic: ").strip() or "Latest trends in artificial intelligence and machine learning in 2024"

    async with Client(base_url="http://localhost:8003") as client:
        result = await client.run_sync(
            agent='deepsearch_agent_handler',
            input=topic
        )
        print("DeepSearch Result:")
        print(result.output[0].parts[0].content[:500] + "...")
        print("\n" + "="*80 + "\n")


async def test_blogpost_agent():
    """Test the BlogPost Generator agent with mock research data"""
    print("📝 Testing BlogPost Generator Agent...")
    print("Note: This test uses mock research data since BlogPost Generator now expects research content as input")

    mock_research = """
Research on: Artificial Intelligence in Healthcare

Executive Summary:
AI in healthcare is revolutionizing patient care through diagnostic imaging, drug discovery, and personalized treatment plans. The market is expected to reach $102 billion by 2025.

Key Insights:
- AI diagnostic tools show 94% accuracy in cancer detection
- Machine learning reduces drug discovery time by 30%
- Telemedicine adoption increased 3800% during pandemic
- AI chatbots handle 67% of basic patient inquiries

Recent Developments:
- FDA approved 130+ AI medical devices in 2023
- Google's Med-PaLM achieves expert-level medical reasoning
- IBM Watson for Oncology deployed in 230+ hospitals globally

Sources: McKinsey Global Institute, Nature Medicine, FDA Medical Device Database
"""

    async with Client(base_url="http://localhost:8004") as client:
        result = await client.run_sync(
            agent='blogpost_generator_agent',
            input=mock_research
        )
        print("BlogPost Generator Result:")
        print(result.output[0].parts[0].content)
        print("\n" + "="*80 + "\n")


async def demo_workflow():
    """Demonstrate multiple topics using sequential workflow"""
    print("🚀 Starting Multi-Topic Demo")
    print("="*80)

    # Ask user for topics
    print("Enter topics for blog generation (press Enter after each, empty line to finish):")
    topics = []
    while True:
        topic = input(f"Topic {len(topics) + 1} (or press Enter to finish): ").strip()
        if not topic:
            break
        topics.append(topic)

    if not topics:
        # Default topics if none provided
        topics = [
            "Latest developments in quantum computing",
            "Impact of climate change on global food security",
            "Emerging cybersecurity threats in 2024"
        ]
        print("No topics entered. Using default topics:")

    print(f"📋 Topics to process: {len(topics)}")
    for i, topic in enumerate(topics, 1):
        print(f"{i}. {topic}")
    print("\n" + "="*80 + "\n")

    for i, topic in enumerate(topics, 1):
        print(f"🔄 Processing Topic {i}/{len(topics)}: {topic}")
        print("-" * 60)

        try:
            # Step 1: Research with DeepSearch
            async with Client(base_url="http://localhost:8003") as deepsearch_client:
                research_result = await deepsearch_client.run_sync(
                    agent='deepsearch_agent_handler',
                    input=topic
                )
                research_content = research_result.output[0].parts[0].content
                print(f"✅ Research completed ({len(research_content)} chars)")

            # Step 2: Generate blog with research data
            async with Client(base_url="http://localhost:8004") as blogpost_client:
                blog_result = await blogpost_client.run_sync(
                    agent='blogpost_generator_agent',
                    input=research_content
                )
                print("✅ Blog generated:")
                print(blog_result.output[0].parts[0].content)

        except Exception as e:
            print(f"❌ Error processing topic '{topic}': {str(e)}")

        print("\n" + "="*80 + "\n")

        # Small delay between requests
        await asyncio.sleep(2)

    print("✅ Demo completed!")


async def main():
    """Main function to run different demo modes"""
    import sys

    if len(sys.argv) > 1:
        mode = sys.argv[1].lower()

        if mode == "deepsearch":
            await test_deepsearch_agent()
        elif mode == "blogpost":
            await test_blogpost_agent()
        elif mode == "demo":
            await demo_workflow()
        elif mode == "sequential" or mode == "main":
            await sequential_workflow()
        else:
            print("Usage: python agentic_client.py [sequential|deepsearch|blogpost|demo]")
            print("  sequential - Main workflow: Topic -> DeepSearch -> BlogPost (RECOMMENDED)")
            print("  deepsearch - Test DeepSearch agent only")
            print("  blogpost   - Test BlogPost generator only")
            print("  demo       - Run multiple topics with sequential workflow")
    else:
        # Default: run the main sequential workflow
        print("🧪 Running main sequential workflow...")
        await sequential_workflow()


if __name__ == "__main__":
    asyncio.run(main())
(ACP) C:\Users\PLNAYAK\Documents\ACP>python agentic_client.py
🧪 Running main sequential workflow...
🚀 Starting Sequential Agentic Workflow
================================================================================
📝 Enter a topic for research and blog generation: ACP and A2A Merger

🔍 Step 1: Researching topic: 'ACP and A2A Merger'
------------------------------------------------------------
✅ Research completed!
📊 Research length: 3214 characters
📋 Preview: **Executive Summary**The ACP and A2A merger is a significant event in the business world, with far-reaching implications for the companies involved, their stakeholders, and the industry as a whole. ...

📝 Step 2: Generating blog post from research data
------------------------------------------------------------
✅ Blog post generated!
Blog post successfully generated and saved!

**Topic:** **Executive Summary**
**Title:** "ACP & A2A Merger"
**File:** blog_20250907_211334_acp__a2a_merger.md
**Content Length:** 4896 characters

**Preview:**
## Introduction
The business world is abuzz with the news of the ACP and A2A merger, a deal that is expected to create a leading player in the industry. This significant event has far-reaching implications for the companies involved, their stakeholders, and the industry as a whole. In this report, w...

The complete blog post has been saved as a markdown file.
================================================================================
🎉 Sequential workflow completed!

Starting the DeepSearch server:

Starting the BlogPost server:

Invoking the ACP client:

The blog post is generated and stored in a .md file:

---
title: ""Agent 2 Agent Protocol""
date: 2025-09-07
topic: "**Executive Summary:**"
generated_by: BlogPost Generator Agent
---

# "Agent 2 Agent Protocol"

## Introduction to Agent 2 Agent Communication Protocol
The world of artificial intelligence (AI) and multi-agent systems is rapidly evolving, with autonomous software agents playing an increasingly crucial role in various technological fields. At the heart of these systems lies the Agent 2 Agent Communication Protocol, a set of rules and standards that govern how agents interact with each other. In this blog post, we will delve into the world of Agent 2 Agent Communication Protocols, exploring their definition, purpose, types of communication, and recent developments. We will also examine the key insights and findings from research in this area, highlighting the importance of these protocols in the development of autonomous systems.

## Definition and Purpose of Agent 2 Agent Communication Protocols
Agent 2 Agent Communication Protocols refer to the standards and methods by which autonomous software agents interact with each other. The primary purpose of these protocols is to facilitate interaction, cooperation, or competition among agents in a multi-agent system. This enables agents to achieve their goals, whether it be solving complex problems, completing tasks, or making decisions.

### Types of Communication
Agents can communicate through various methods, including:
* Direct communication: This involves agents communicating directly with each other, often via a shared memory space.
* Indirect communication: This involves agents communicating through the environment, such as by leaving messages or modifying the environment in some way.

Some of the key benefits of Agent 2 Agent Communication Protocols include:
* Improved cooperation and coordination among agents
* Enhanced decision-making and problem-solving capabilities
* Increased efficiency and productivity in multi-agent systems

## Communication Protocols
Several protocols have been developed for agent communication, including:
1. **Foundation for Intelligent Physical Agents (FIPA) protocol**: This provides a standardized framework for agent communication, enabling agents to interact and cooperate with each other.
2. **Knowledge Query and Manipulation Language (KQML)**: This is a language and protocol for agent communication, allowing agents to query and manipulate knowledge.
3. **Agent Communication Language (ACL)**: This is a language used for agent communication, enabling agents to express and understand each other's messages.

These protocols provide a foundation for agent communication, enabling agents to interact and cooperate with each other in a variety of contexts.

## Recent Developments and Trends
Recent advancements in AI have led to more sophisticated agent systems, requiring more complex and efficient communication protocols. Some of the key trends and developments in this area include:
* **Artificial Intelligence (AI) Advancements**: Recent advancements in AI have led to more sophisticated agent systems, requiring more complex and efficient communication protocols.
* **Internet of Things (IoT)**: The growth of IoT has increased the need for agent communication protocols that can handle a vast number of devices and varied data types.
* **Blockchain**: The use of blockchain technology is becoming increasingly popular in multi-agent systems, providing a secure and transparent way for agents to communicate and interact.

Some of the key statistics and data points in this area include:
* The global AI market is expected to grow significantly, with a substantial portion dedicated to developing more advanced multi-agent systems.
* The number of IoT devices is projected to increase dramatically, further emphasizing the need for efficient agent communication protocols.

## Key Insights and Findings
Some of the key insights and findings from research in this area include:
* The importance of standardized communication protocols in enabling agent interaction and cooperation.
* The need for efficient and scalable communication protocols in large-scale multi-agent systems.
* The potential of Agent 2 Agent Communication Protocols in emerging technologies such as IoT, blockchain, and edge computing.

## Applications and Use Cases
Agent 2 Agent Communication Protocols have a wide range of applications and use cases, including:
* **Smart Homes and Cities**: Agent communication protocols can be used to enable smart homes and cities, where agents can interact and cooperate to manage energy, traffic, and other resources.
* **Industrial Automation**: Agent communication protocols can be used in industrial automation, where agents can interact and cooperate to manage production, logistics, and supply chain management.
* **Healthcare**: Agent communication protocols can be used in healthcare, where agents can interact and cooperate to manage patient care, medical records, and clinical decision-making.

## Recommendations for Further Reading
For a deeper understanding of Agent 2 Agent Communication Protocols, readers are recommended to explore academic papers and books on multi-agent systems, artificial intelligence, and communications protocols. Key areas of focus should include:
* The FIPA protocol and its applications
* Agent coordination mechanisms and their use in multi-agent systems
* The application of agent communication in emerging technologies such as IoT, blockchain, and edge computing

## Key Takeaways
* Agent 2 Agent Communication Protocols are a crucial component of multi-agent systems, enabling agents to interact and cooperate with each other.
* Standardized communication protocols, such as FIPA, are essential for enabling agent interaction and cooperation.
* Recent advancements in AI and IoT have increased the need for efficient and scalable communication protocols in multi-agent systems.
* Agent 2 Agent Communication Protocols have a wide range of applications and use cases, including smart homes and cities, industrial automation, and healthcare.

## Conclusion
In conclusion, Agent 2 Agent Communication Protocols play a vital role in the development of autonomous systems, enabling agents to interact and cooperate with each other. As the field of AI and multi-agent systems continues to evolve, the importance of these protocols will only continue to grow. By understanding the definition, purpose, and types of communication, as well as recent developments and trends, we can unlock the full potential of Agent 2 Agent Communication Protocols and create more sophisticated and efficient multi-agent systems. Whether you are a researcher, developer, or simply interested in the field of AI, this blog post has provided a comprehensive overview of Agent 2 Agent Communication Protocols, highlighting their importance and potential impact on various technological fields.

---
*This blog post was automatically generated based on research conducted by the DeepSearch Agent.*

A2A Protocol Implementation — Advanced Agentic System (FastAPI)

I also experimented with Google's Agent-to-Agent (A2A) protocol, the latest standard for AI agent communication, into which ACP has now merged.

🎯 What Is the A2A Protocol?

The Agent-to-Agent (A2A) protocol is an open standard introduced by Google in April 2025 and developed together with more than 50 technology partners, including Atlassian, PayPal, and Salesforce. A2A enables AI agents to:

• Discover each other's capabilities through standardized agent cards

• Communicate securely over HTTP/JSON-RPC with enterprise-grade authentication

• Coordinate complex tasks through structured workflows

• Stream real-time updates using Server-Sent Events (SSE)

• Handle multimodal data, including text, images, and structured data

Architecture Overview

A2A Features

1. Agent Card (Discovery)

Each agent publishes a JSON agent card at /.well-known/agent.json:

{
  "name": "DeepSearch Research Agent",
  "description": "Advanced research agent with web scraping capabilities",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "skills": [
    {
      "id": "web-research",
      "name": "Web Research & Analysis",
      "inputModes": ["text", "data"],
      "outputModes": ["text", "data"]
    }
  ]
}
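As a quick sanity check, the card can be fetched with a plain HTTP GET. This is a sketch that assumes the DeepSearch A2A server shown later in this article is running on port 8003:

import httpx

# Discover an agent by fetching its card from the well-known path.
card = httpx.get("http://localhost:8003/.well-known/agent.json").json()
print(card["name"])                              # e.g. "DeepSearch Research Agent"
print([skill["id"] for skill in card["skills"]])  # advertised skills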

2. Task Lifecycle Management

Tasks progress through defined states (a status-polling sketch follows the list):

  • submitted → working → completed
  • input-required (for clarification)
  • canceled / failed (error handling)
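For instance, a client can poll the current state through the tasks/get endpoint implemented by both A2A servers below. This is a sketch; the task id is whatever was used when the task was originally sent:

import httpx

poll = {
    "jsonrpc": "2.0",
    "id": "poll-1",
    "method": "tasks/get",
    "params": {"id": "task-123"},  # id of a previously submitted task
}
body = httpx.post("http://localhost:8003/a2a/tasks/get", json=poll).json()
if "error" in body:
    print(body["error"]["message"])   # e.g. "Task not found"
else:
    print(body["result"]["status"])   # submitted / working / completed / failed ...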

3. Multimodal Communication

Supports rich data types:

  • Text parts: plain text content
  • Data parts: structured JSON data
  • File parts: binary files with metadata

4. Real-Time Streaming

Server-Sent Events (SSE) deliver live updates:

async for event in client.send_task_subscribe(task_id, message):
    if event["method"] == "taskStatusUpdate":
        print(f"Status: {event['params']['status']}")

Starting the A2A Agents

Terminal 1 — DeepSearch Agent:

python deepsearch_server_a2a.py

Terminal 2 — BlogPost Generator Agent:

python blogpost_server_a2a.py

Running the A2A Workflow

Sequential workflow (recommended):

python agentic_client_a2a.py sequential

A2A Workflow Examples

Sequential Workflow

async def sequential_workflow_a2a():
    # Step 1: Discover DeepSearch Agent
    async with A2AClient("http://localhost:8003") as deepsearch_client:
        agent_card = await deepsearch_client.discover_agent()

        # Step 2: Send research task
        research_result = await deepsearch_client.send_task(
            task_id="research_123",
            message=Message(role="user", parts=[TextPart(text="AI trends")])
        )
        # (in the full client, research_content is extracted from research_result here)

        # Step 3: Pass results to BlogPost Generator
        async with A2AClient("http://localhost:8004") as blogpost_client:
            blog_result = await blogpost_client.send_task(
                task_id="blog_456",
                message=Message(role="user", parts=[TextPart(text=research_content)])
            )
Streaming Workflow

async def streaming_workflow():
    async with A2AClient("http://localhost:8003") as client:
        async for event in client.send_task_subscribe(task_id, message):
            if event["method"] == "taskArtifactUpdate":
                artifact = event["params"]["artifact"]
                print(f"Progress: {artifact['description']}")

A2A Protocol Details

HTTP Endpoints

Each A2A agent implements:

  • GET /.well-known/agent.json — agent card discovery
  • POST /a2a/tasks/send — synchronous task execution
  • POST /a2a/tasks/sendSubscribe — streaming task execution (SSE)
  • POST /a2a/tasks/get — fetch task status
  • POST /a2a/tasks/cancel — cancel a running task

JSON-RPC 2.0 Format

All requests use JSON-RPC 2.0:

{
  "jsonrpc": "2.0",
  "id": "unique-request-id",
  "method": "tasks/send",
  "params": {
    "id": "task-123",
    "message": {
      "role": "user",
      "parts": [{"type": "text", "text": "Research AI trends"}]
    }
  }
}
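A minimal way to exercise this format against the DeepSearch A2A server shown later is a plain POST of the JSON-RPC envelope (a sketch; the task id, topic, and timeout are arbitrary choices):

import httpx

payload = {
    "jsonrpc": "2.0",
    "id": "req-1",
    "method": "tasks/send",
    "params": {
        "id": "task-123",
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": "Research AI trends"}]
        }
    }
}

# POST the JSON-RPC envelope to the synchronous task endpoint.
resp = httpx.post("http://localhost:8003/a2a/tasks/send", json=payload, timeout=300)
body = resp.json()
if "error" in body:
    print("Error:", body["error"]["message"])
else:
    task = body["result"]
    print("Task status:", task["status"])
    # The research text lands in the first artifact's first part.
    print(task["artifacts"][0]["parts"][0]["text"][:200])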

Error Handling

Standard JSON-RPC error response:

{
  "jsonrpc": "2.0",
  "id": "request-id",
  "error": {
    "code": -32603,
    "message": "Internal error: API key missing"
  }
}

Code Implementation

deepsearch_server_a2a.py

#!/usr/bin/env python3
"""
DeepSearch Agent Server - A2A Protocol Implementation
Based on Google's Agent-to-Agent (A2A) Protocol specification
"""

import os
import json
import uuid
import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from enum import Enum
from dataclasses import dataclass, asdict
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from crewai import Agent, LLM, Task, Crew
from crewai_tools import LinkupSearchTool
import requests
from dotenv import load_dotenv
import uvicorn

# Load environment variables
load_dotenv()

# A2A Protocol Data Models
class TaskStatus(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"
    UNKNOWN = "unknown"class MessageRole(str, Enum):
    USER = "user"
    AGENT = "agent"@dataclass
class TextPart:
    type: str = "text"
    text: str = ""
    metadata: Optional[Dict[str, Any]] = None@dataclass
class DataPart:
    type: str = "data"
    data: Dict[str, Any] = None
    metadata: Optional[Dict[str, Any]] = None@dataclass
class Message:
    role: MessageRole
    parts: List[Dict[str, Any]]
    metadata: Optional[Dict[str, Any]] = None@dataclass
class Artifact:
    name: Optional[str] = None
    description: Optional[str] = None
    parts: List[Dict[str, Any]] = None
    index: Optional[int] = None
    append: bool = False
    lastChunk: bool = True
    metadata: Optional[Dict[str, Any]] = None@dataclass
class A2ATask:
    id: str
    status: TaskStatus
    sessionId: Optional[str] = None
    artifacts: List[Artifact] = None
    metadata: Optional[Dict[str, Any]] = None
    created_at: Optional[str] = None
    updated_at: Optional[str] = None        def __post_init__(self):
        if self.artifacts is None:
            self.artifacts = []
        if self.created_at is None:
            self.created_at = datetime.now(timezone.utc).isoformat()
        if self.updated_at is None:
            self.updated_at = self.created_at# A2A Agent Card
AGENT_CARD = {
    "name": "DeepSearch Research Agent",
    "description": "Advanced research agent specializing in web scraping and comprehensive analysis using AI",
    "version": "2.0.0",
    "provider": {
        "name": "ACP Research Labs",
        "organization": "Advanced AI Systems",
        "url": "https://acp-research.example.com"
    },
    "url": "http://localhost:8003",
    "documentationUrl": "https://docs.acp-research.example.com/deepsearch",
    "capabilities": {
        "streaming": True,
        "pushNotifications": True,
        "stateTransitionHistory": True
    },
    "authentication": {
        "schemes": ["none"]  # For demo purposes
    },
    "defaultInputModes": ["text", "data"],
    "defaultOutputModes": ["text", "data"],
    "skills": [
        {
            "id": "web-research",
            "name": "Web Research & Analysis",
            "description": "Comprehensive web research using LINKUP and AI analysis",
            "tags": ["research", "web-scraping", "analysis"],
            "inputModes": ["text", "data"],
            "outputModes": ["text", "data"],
            "examples": [
                {
                    "input": "Research latest developments in quantum computing",
                    "output": "Comprehensive research report with sources and analysis"
                }
            ]
        },
        {
            "id": "data-synthesis",
            "name": "Data Synthesis",
            "description": "Synthesize and analyze information from multiple sources",
            "tags": ["analysis", "synthesis", "insights"],
            "inputModes": ["text", "data"],
            "outputModes": ["text", "data"]
        }
    ]
}# Global task storage (in production, use a proper database)
tasks_storage: Dict[str, A2ATask] = {}# Initialize FastAPI app
app = FastAPI(
    title="DeepSearch Agent - A2A Protocol",
    description="Advanced research agent implementing Google's A2A protocol",
    version="2.0.0"
)# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)# Initialize Groq LLM and LINKUP tool
groq_llm = LLM(
    model="groq/llama-3.3-70b-versatile",
    api_key=os.getenv("GROQ_API_KEY")
)# Initialize LINKUP tool using the proper CrewAI tool
linkup_tool = LinkupSearchTool(api_key=os.getenv("LINKUP_API_KEY"))# Create DeepSearch agent
deepsearch_agent = Agent(
    role='Deep Research Specialist',
    goal='Conduct comprehensive research on given topics using web scraping and AI analysis',
    backstory="""You are an expert researcher with advanced capabilities in web scraping,
    data analysis, and information synthesis. You use cutting-edge tools to gather
    information from multiple sources and provide comprehensive, well-structured research reports.""",
    llm=groq_llm,
    tools=[linkup_tool],
    verbose=True,
    allow_delegation=False
)# A2A Protocol Routes@app.get("/.well-known/agent.json")
async def get_agent_card():
    """A2A Protocol: Agent Card Discovery Endpoint"""
    return JSONResponse(content=AGENT_CARD)@app.post("/a2a/tasks/send")
async def tasks_send(request: Request):
    """A2A Protocol: Send task (synchronous)"""
    try:
        body = await request.json()                # Extract JSON-RPC parameters
        task_id = body.get("params", {}).get("id")
        message = body.get("params", {}).get("message", {})
        session_id = body.get("params", {}).get("sessionId")
        metadata = body.get("params", {}).get("metadata", {})                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )                # Create or update task
        if task_id in tasks_storage:
            task = tasks_storage[task_id]
            task.status = TaskStatus.WORKING
            task.updated_at = datetime.now(timezone.utc).isoformat()
        else:
            task = A2ATask(
                id=task_id,
                status=TaskStatus.SUBMITTED,
                sessionId=session_id,
                metadata=metadata
            )
            tasks_storage[task_id] = task                # Process the research task
        result = await process_research_task(task, message)                return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "result": asdict(result)
        })            except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )@app.post("/a2a/tasks/sendSubscribe")
async def tasks_send_subscribe(request: Request):
    """A2A Protocol: Send task with streaming updates (SSE)"""
    try:
        body = await request.json()                task_id = body.get("params", {}).get("id")
        message = body.get("params", {}).get("message", {})
        session_id = body.get("params", {}).get("sessionId")
        metadata = body.get("params", {}).get("metadata", {})                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )                # Create task
        task = A2ATask(
            id=task_id,
            status=TaskStatus.SUBMITTED,
            sessionId=session_id,
            metadata=metadata
        )
        tasks_storage[task_id] = task                async def event_generator():
            try:
                # Send initial status
                yield f"data: {json.dumps({'jsonrpc': '2.0', 'method': 'taskStatusUpdate', 'params': {'taskId': task_id, 'status': 'submitted'}})}\n\n"                                # Process task with streaming updates
                async for update in process_research_task_streaming(task, message):
                    yield f"data: {json.dumps(update)}\n\n"                                except Exception as e:
                error_event = {
                    "jsonrpc": "2.0",
                    "method": "error",
                    "params": {
                        "taskId": task_id,
                        "error": {
                            "code": -32603,
                            "message": f"Processing error: {str(e)}"
                        }
                    }
                }
                yield f"data: {json.dumps(error_event)}\n\n"                return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Access-Control-Allow-Origin": "*",
            }
        )            except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )@app.post("/a2a/tasks/get")
async def tasks_get(request: Request):
    """A2A Protocol: Get task status"""
    try:
        body = await request.json()
        task_id = body.get("params", {}).get("id")                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )                if task_id not in tasks_storage:
            return JSONResponse(
                status_code=404,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32001,
                        "message": "Task not found"
                    }
                }
            )                task = tasks_storage[task_id]
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "result": asdict(task)
        })            except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )@app.post("/a2a/tasks/cancel")
async def tasks_cancel(request: Request):
    """A2A Protocol: Cancel task"""
    try:
        body = await request.json()
        task_id = body.get("params", {}).get("id")                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )                if task_id not in tasks_storage:
            return JSONResponse(
                status_code=404,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32001,
                        "message": "Task not found"
                    }
                }
            )                task = tasks_storage[task_id]
        task.status = TaskStatus.CANCELED
        task.updated_at = datetime.now(timezone.utc).isoformat()                return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "result": asdict(task)
        })            except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )# Research Processing Functionsasync def process_research_task(task: A2ATask, message: Dict[str, Any]) -> A2ATask:
    """Process research task synchronously"""
    try:
        task.status = TaskStatus.WORKING
        task.updated_at = datetime.now(timezone.utc).isoformat()                # Extract research topic from message
        topic = extract_topic_from_message(message)                # Create research task for CrewAI
        research_task = Task(
            description=f"""
            Research the topic: "{topic}"                        Your task is to:
            1. Identify relevant websites and sources for this topic
            2. Use the LINKUP tool to search and scrape content from relevant sources
            3. Analyze and synthesize the information
            4. Provide a comprehensive research summary with key insights
            5. Include relevant facts, statistics, and recent developments
            6. Cite your sources                        Focus on providing accurate, up-to-date, and comprehensive information.
            """,
            expected_output="""A detailed research report containing:
            - Executive summary of findings
            - Key insights and important facts
            - Recent developments and trends
            - Relevant statistics and data points
            - Source citations
            - Recommendations for further reading""",
            agent=deepsearch_agent
        )                # Create and execute crew
        research_crew = Crew(
            agents=[deepsearch_agent],
            tasks=[research_task],
            verbose=True
        )                result = research_crew.kickoff()
        research_content = result.raw if hasattr(result, 'raw') else str(result)                # Create artifact with research results
        artifact = Artifact(
            name="research_report",
            description=f"Comprehensive research report on: {topic}",
            parts=[
                asdict(TextPart(text=research_content)),
                asdict(DataPart(data={
                    "topic": topic,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "agent": "DeepSearch Research Agent",
                    "sources_scraped": True,
                    "analysis_completed": True
                }))
            ]
        )                task.artifacts = [artifact]
        task.status = TaskStatus.COMPLETED
        task.updated_at = datetime.now(timezone.utc).isoformat()                return task            except Exception as e:
        task.status = TaskStatus.FAILED
        task.updated_at = datetime.now(timezone.utc).isoformat()                # Create error artifact
        error_artifact = Artifact(
            name="error_report",
            description="Research task failed",
            parts=[
                asdict(TextPart(text=f"Research failed: {str(e)}. Please check your API keys and try again.")),
                asdict(DataPart(data={
                    "error": str(e),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "status": "failed"
                }))
            ]
        )                task.artifacts = [error_artifact]
        return taskasync def process_research_task_streaming(task: A2ATask, message: Dict[str, Any]):
    """Process research task with streaming updates"""
    try:
        # Update to working status
        task.status = TaskStatus.WORKING
        task.updated_at = datetime.now(timezone.utc).isoformat()                yield {
            "jsonrpc": "2.0",
            "method": "taskStatusUpdate",
            "params": {
                "taskId": task.id,
                "status": "working"
            }
        }                topic = extract_topic_from_message(message)                # Simulate streaming updates
        yield {
            "jsonrpc": "2.0",
            "method": "taskArtifactUpdate",
            "params": {
                "taskId": task.id,
                "artifact": asdict(Artifact(
                    name="progress_update",
                    description="Research progress",
                    parts=[asdict(TextPart(text=f"🔍 Starting research on: {topic}"))]
                ))
            }
        }                await asyncio.sleep(1)                yield {
            "jsonrpc": "2.0",
            "method": "taskArtifactUpdate",
            "params": {
                "taskId": task.id,
                "artifact": asdict(Artifact(
                    name="progress_update",
                    description="Research progress",
                    parts=[asdict(TextPart(text="🌐 Scraping web sources..."))]
                ))
            }
        }                # Process the actual research
        processed_task = await process_research_task(task, message)                # Send final result
        yield {
            "jsonrpc": "2.0",
            "method": "taskStatusUpdate",
            "params": {
                "taskId": task.id,
                "status": processed_task.status.value
            }
        }                if processed_task.artifacts:
            yield {
                "jsonrpc": "2.0",
                "method": "taskArtifactUpdate",
                "params": {
                    "taskId": task.id,
                    "artifact": asdict(processed_task.artifacts[0])
                }
            }                except Exception as e:
        task.status = TaskStatus.FAILED
        yield {
            "jsonrpc": "2.0",
            "method": "taskStatusUpdate",
            "params": {
                "taskId": task.id,
                "status": "failed"
            }
        }                yield {
            "jsonrpc": "2.0",
            "method": "error",
            "params": {
                "taskId": task.id,
                "error": {
                    "code": -32603,
                    "message": f"Research failed: {str(e)}"
                }
            }
        }def extract_topic_from_message(message: Dict[str, Any]) -> str:
    """Extract research topic from A2A message"""
    parts = message.get("parts", [])        for part in parts:
        if part.get("type") == "text":
            return part.get("text", "Unknown topic")
        elif part.get("type") == "data":
            data = part.get("data", {})
            if "topic" in data:
                return data["topic"]
            elif "query" in data:
                return data["query"]        return "Unknown research topic"# Health check endpoint
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "protocol": "A2A", "version": "2.0.0"}if __name__ == "__main__":
    print("🚀 Starting DeepSearch Agent - A2A Protocol Server")
    print("📋 Agent Card available at: http://localhost:8003/.well-known/agent.json")
    print("🔍 Research capabilities: Web scraping, AI analysis, data synthesis")        uvicorn.run(
        "deepsearch_server_a2a:app",
        host="0.0.0.0",
        port=8003,
        reload=True,
        log_level="info"
    )

blogpost_server_a2a.py

#!/usr/bin/env python3
"""
BlogPost Generator Agent Server - A2A Protocol Implementation
Based on Google's Agent-to-Agent (A2A) Protocol specification
"""

import os
import json
import uuid
import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from enum import Enum
from dataclasses import dataclass, asdict
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from langchain_groq import ChatGroq
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
from dotenv import load_dotenv
import uvicorn
import httpx
from langchain_mistralai import ChatMistralAI

# Load environment variables
load_dotenv()

# A2A Protocol Data Models
class TaskStatus(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"
    UNKNOWN = "unknown"class MessageRole(str, Enum):
    USER = "user"
    AGENT = "agent"@dataclass
class TextPart:
    type: str = "text"
    text: str = ""
    metadata: Optional[Dict[str, Any]] = None@dataclass
class DataPart:
    type: str = "data"
    data: Dict[str, Any] = None
    metadata: Optional[Dict[str, Any]] = None@dataclass
class FilePart:
    type: str = "file"
    file: Dict[str, Any] = None
    metadata: Optional[Dict[str, Any]] = None@dataclass
class Message:
    role: MessageRole
    parts: List[Dict[str, Any]]
    metadata: Optional[Dict[str, Any]] = None@dataclass
class Artifact:
    name: Optional[str] = None
    description: Optional[str] = None
    parts: List[Dict[str, Any]] = None
    index: Optional[int] = None
    append: bool = False
    lastChunk: bool = True
    metadata: Optional[Dict[str, Any]] = None@dataclass
class A2ATask:
    id: str
    status: TaskStatus
    sessionId: Optional[str] = None
    artifacts: List[Artifact] = None
    metadata: Optional[Dict[str, Any]] = None
    created_at: Optional[str] = None
    updated_at: Optional[str] = None        def __post_init__(self):
        if self.artifacts is None:
            self.artifacts = []
        if self.created_at is None:
            self.created_at = datetime.now(timezone.utc).isoformat()
        if self.updated_at is None:
            self.updated_at = self.created_at# A2A Agent Card
AGENT_CARD = {
    "name": "BlogPost Generator Agent",
    "description": "Advanced content creation agent specializing in generating high-quality blog posts from research data",
    "version": "2.0.0",
    "provider": {
        "name": "ACP Content Labs",
        "organization": "Advanced AI Systems",
        "url": "https://acp-content.example.com"
    },
    "url": "http://localhost:8004",
    "documentationUrl": "https://docs.acp-content.example.com/blogpost",
    "capabilities": {
        "streaming": True,
        "pushNotifications": True,
        "stateTransitionHistory": True
    },
    "authentication": {
        "schemes": ["none"]  # For demo purposes
    },
    "defaultInputModes": ["text", "data"],
    "defaultOutputModes": ["text", "data", "file"],
    "skills": [
        {
            "id": "blog-generation",
            "name": "Blog Post Generation",
            "description": "Generate SEO-optimized blog posts from research content using LangGraph workflow",
            "tags": ["content-creation", "blog", "seo", "writing"],
            "inputModes": ["text", "data"],
            "outputModes": ["text", "data", "file"],
            "examples": [
                {
                    "input": "Research data about AI trends",
                    "output": "Complete blog post with title, content, and markdown file"
                }
            ]
        },
        {
            "id": "content-optimization",
            "name": "Content Optimization",
            "description": "Optimize content for SEO and readability",
            "tags": ["seo", "optimization", "readability"],
            "inputModes": ["text", "data"],
            "outputModes": ["text", "data"]
        },
        {
            "id": "markdown-generation",
            "name": "Markdown File Generation",
            "description": "Generate and save blog posts as markdown files",
            "tags": ["markdown", "file-generation", "export"],
            "inputModes": ["text", "data"],
            "outputModes": ["file", "data"]
        }
    ]
}# Global task storage
tasks_storage: Dict[str, A2ATask] = {}# Initialize FastAPI app
app = FastAPI(
    title="BlogPost Generator Agent - A2A Protocol",
    description="Advanced content creation agent implementing Google's A2A protocol",
    version="2.0.0"
)# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)# Initialize Groq LLM
blog_llm = ChatGroq(
    model="llama-3.3-70b-versatile",
    api_key=os.getenv("GROQ_API_KEY"),
    temperature=0.7
)# blog_llm = ChatMistralAI(
#     model="mistral-large-latest",
#     api_key=os.getenv("MISTRAL_API_KEY"),
#     temperature=0.7
# )
# LangGraph State Definition
class BlogState(TypedDict):
    topic: str
    research_content: str
    blog_title: str
    blog_content: str
    filename: str
    task_id: str# LangGraph Workflow Nodes
def generate_title_node(state: BlogState) -> BlogState:
    """Generate an engaging blog title"""
    prompt = f"""
    Based on the following research content about "{state['topic']}",
    create an engaging, SEO-friendly blog post title.        Research content: {state['research_content'][:500]}...        Requirements:
    - Make it catchy and engaging
    - Keep it under 60 characters for SEO
    - Make it informative and clear        Return only the title, nothing else.
    """        response = blog_llm.invoke([{"role": "user", "content": prompt}])
    state["blog_title"] = response.content.strip()
    return statedef generate_blog_content_node(state: BlogState) -> BlogState:
    """Generate the full blog post content"""
    prompt = f"""
    Create a comprehensive, well-structured blog post based on the following research:        Topic: {state['topic']}
    Title: {state['blog_title']}
    Research Content: {state['research_content']}        Requirements:
    - Write in an engaging, professional tone
    - Use markdown formatting
    - Include proper headings (##, ###)
    - Add bullet points and numbered lists where appropriate
    - Include a compelling introduction and conclusion
    - Make it SEO-friendly with natural keyword usage
    - Aim for 800-1500 words
    - Include relevant insights from the research
    - Add a "Key Takeaways" section at the end        Structure:
    1. Introduction
    2. Main content sections with subheadings
    3. Key insights and findings
    4. Key Takeaways
    5. Conclusion        Return the complete blog post in markdown format.
    """        response = blog_llm.invoke([{"role": "user", "content": prompt}])
    state["blog_content"] = response.content.strip()
    return statedef save_blog_node(state: BlogState) -> BlogState:
    """Save the blog post to a markdown file"""
    try:
        # Create filename from title
        safe_title = "".join(c for c in state["blog_title"] if c.isalnum() or c in (' ', '-', '_')).rstrip()
        safe_title = safe_title.replace(' ', '_').lower()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"blog_{timestamp}_{safe_title[:50]}.md"                # Create the complete blog post with metadata
        blog_post = f"""---
title: "{state['blog_title']}"
date: {datetime.now().strftime("%Y-%m-%d")}
topic: "{state['topic']}"
generated_by: BlogPost Generator Agent A2A
task_id: "{state['task_id']}"
protocol: A2A
---

# {state['blog_title']}

{state['blog_content']}

---
*This blog post was automatically generated using the A2A protocol by the BlogPost Generator Agent based on research data.*
"""                with open(filename, 'w', encoding='utf-8') as f:
            f.write(blog_post)
        state["filename"] = filename            except Exception as e:
        state["filename"] = f"Error saving file: {str(e)}"        return state# Create the blog generation workflow
def create_blog_workflow():
    workflow = StateGraph(BlogState)

    # Add nodes
    workflow.add_node("generate_title", generate_title_node)
    workflow.add_node("generate_content", generate_blog_content_node)
    workflow.add_node("save_blog", save_blog_node)        # Add edges
    workflow.set_entry_point("generate_title")
    workflow.add_edge("generate_title", "generate_content")
    workflow.add_edge("generate_content", "save_blog")
    workflow.add_edge("save_blog", END)        return workflow.compile()blog_workflow = create_blog_workflow()# A2A Protocol Routes@app.get("/.well-known/agent.json")
async def get_agent_card():
    """A2A Protocol: Agent Card Discovery Endpoint"""
    return JSONResponse(content=AGENT_CARD)

@app.post("/a2a/tasks/send")
async def tasks_send(request: Request):
    """A2A Protocol: Send task (synchronous)"""
    try:
        body = await request.json()

        task_id = body.get("params", {}).get("id")
        message = body.get("params", {}).get("message", {})
        session_id = body.get("params", {}).get("sessionId")
        metadata = body.get("params", {}).get("metadata", {})                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )

        # Create or update task
        if task_id in tasks_storage:
            task = tasks_storage[task_id]
            task.status = TaskStatus.WORKING
            task.updated_at = datetime.now(timezone.utc).isoformat()
        else:
            task = A2ATask(
                id=task_id,
                status=TaskStatus.SUBMITTED,
                sessionId=session_id,
                metadata=metadata
            )
            tasks_storage[task_id] = task

        # Process the blog generation task
        result = await process_blog_task(task, message)

        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "result": asdict(result)
        })

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )@app.post("/a2a/tasks/sendSubscribe")
async def tasks_send_subscribe(request: Request):
    """A2A Protocol: Send task with streaming updates (SSE)"""
    try:
        body = await request.json()

        task_id = body.get("params", {}).get("id")
        message = body.get("params", {}).get("message", {})
        session_id = body.get("params", {}).get("sessionId")
        metadata = body.get("params", {}).get("metadata", {})                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )

        # Create task
        task = A2ATask(
            id=task_id,
            status=TaskStatus.SUBMITTED,
            sessionId=session_id,
            metadata=metadata
        )
        tasks_storage[task_id] = task

        async def event_generator():
            try:
                # Send initial status
                yield f"data: {json.dumps({'jsonrpc': '2.0', 'method': 'taskStatusUpdate', 'params': {'taskId': task_id, 'status': 'submitted'}})}\n\n"                                # Process task with streaming updates
                async for update in process_blog_task_streaming(task, message):
                    yield f"data: {json.dumps(update)}\n\n"                                except Exception as e:
                error_event = {
                    "jsonrpc": "2.0",
                    "method": "error",
                    "params": {
                        "taskId": task_id,
                        "error": {
                            "code": -32603,
                            "message": f"Processing error: {str(e)}"
                        }
                    }
                }
                yield f"data: {json.dumps(error_event)}\n\n"                return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Access-Control-Allow-Origin": "*",
            }
        )

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )@app.post("/a2a/tasks/get")
async def tasks_get(request: Request):
    """A2A Protocol: Get task status"""
    try:
        body = await request.json()
        task_id = body.get("params", {}).get("id")                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )

        if task_id not in tasks_storage:
            return JSONResponse(
                status_code=404,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32001,
                        "message": "Task not found"
                    }
                }
            )

        task = tasks_storage[task_id]
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "result": asdict(task)
        })

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )@app.post("/a2a/tasks/cancel")
async def tasks_cancel(request: Request):
    """A2A Protocol: Cancel task"""
    try:
        body = await request.json()
        task_id = body.get("params", {}).get("id")                if not task_id:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32602,
                        "message": "Invalid params: 'id' is required"
                    }
                }
            )

        if task_id not in tasks_storage:
            return JSONResponse(
                status_code=404,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32001,
                        "message": "Task not found"
                    }
                }
            )

        task = tasks_storage[task_id]
        task.status = TaskStatus.CANCELED
        task.updated_at = datetime.now(timezone.utc).isoformat()

        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "result": asdict(task)
        })

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": body.get("id", None),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }
        )

# Blog Processing Functions
async def process_blog_task(task: A2ATask, message: Dict[str, Any]) -> A2ATask:
    """Process blog generation task synchronously"""
    try:
        task.status = TaskStatus.WORKING
        task.updated_at = datetime.now(timezone.utc).isoformat()

        # Extract research content and topic from message
        research_content, topic = extract_content_from_message(message)

        # Generate blog post using LangGraph workflow
        initial_state = BlogState(
            topic=topic,
            research_content=research_content,
            blog_title="",
            blog_content="",
            filename="",
            task_id=task.id
        )

        final_state = blog_workflow.invoke(initial_state)

        # Create artifacts
        artifacts = []

        # Text content artifact
        text_artifact = Artifact(
            name="blog_post_content",
            description=f"Generated blog post: {final_state['blog_title']}",
            parts=[
                asdict(TextPart(text=final_state['blog_content'])),
                asdict(DataPart(data={
                    "title": final_state['blog_title'],
                    "topic": final_state['topic'],
                    "word_count": len(final_state['blog_content'].split()),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "agent": "BlogPost Generator Agent A2A"
                }))
            ]
        )
        artifacts.append(text_artifact)

        # File artifact (if saved successfully)
        if not final_state['filename'].startswith("Error"):
            try:
                with open(final_state['filename'], 'r', encoding='utf-8') as f:
                    file_content = f.read()

                file_artifact = Artifact(
                    name="markdown_file",
                    description=f"Blog post markdown file: {final_state['filename']}",
                    parts=[
                        asdict(FilePart(file={
                            "name": final_state['filename'],
                            "mimeType": "text/markdown",
                            "bytes": file_content  # In production, use base64 encoding
                        })),
                        asdict(DataPart(data={
                            "filename": final_state['filename'],
                            "file_size": len(file_content),
                            "saved_at": datetime.now(timezone.utc).isoformat()
                        }))
                    ]
                )
                artifacts.append(file_artifact)
            except Exception as file_error:
                print(f"Warning: Could not read saved file: {file_error}")                task.artifacts = artifacts
        task.status = TaskStatus.COMPLETED
        task.updated_at = datetime.now(timezone.utc).isoformat()

        return task

    except Exception as e:
        task.status = TaskStatus.FAILED
        task.updated_at = datetime.now(timezone.utc).isoformat()

        # Create error artifact
        error_artifact = Artifact(
            name="error_report",
            description="Blog generation task failed",
            parts=[
                asdict(TextPart(text=f"Blog generation failed: {str(e)}. Please check your API keys and input data.")),
                asdict(DataPart(data={
                    "error": str(e),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "status": "failed"
                }))
            ]
        )

        task.artifacts = [error_artifact]
        return task

async def process_blog_task_streaming(task: A2ATask, message: Dict[str, Any]):
    """Process blog generation task with streaming updates"""
    try:
        # Update to working status
        task.status = TaskStatus.WORKING
        task.updated_at = datetime.now(timezone.utc).isoformat()

        yield {
            "jsonrpc": "2.0",
            "method": "taskStatusUpdate",
            "params": {
                "taskId": task.id,
                "status": "working"
            }
        }

        research_content, topic = extract_content_from_message(message)

        # Stream progress updates
        yield {
            "jsonrpc": "2.0",
            "method": "taskArtifactUpdate",
            "params": {
                "taskId": task.id,
                "artifact": asdict(Artifact(
                    name="progress_update",
                    description="Blog generation progress",
                    parts=[asdict(TextPart(text=f"📝 Starting blog generation for: {topic}"))]
                ))
            }
        }

        await asyncio.sleep(1)

        yield {
            "jsonrpc": "2.0",
            "method": "taskArtifactUpdate",
            "params": {
                "taskId": task.id,
                "artifact": asdict(Artifact(
                    name="progress_update",
                    description="Blog generation progress",
                    parts=[asdict(TextPart(text="🎯 Generating SEO-optimized title..."))]
                ))
            }
        }

        await asyncio.sleep(1)

        yield {
            "jsonrpc": "2.0",
            "method": "taskArtifactUpdate",
            "params": {
                "taskId": task.id,
                "artifact": asdict(Artifact(
                    name="progress_update",
                    description="Blog generation progress",
                    parts=[asdict(TextPart(text="✍️ Creating comprehensive blog content..."))]
                ))
            }
        }

        await asyncio.sleep(1)

        yield {
            "jsonrpc": "2.0",
            "method": "taskArtifactUpdate",
            "params": {
                "taskId": task.id,
                "artifact": asdict(Artifact(
                    name="progress_update",
                    description="Blog generation progress",
                    parts=[asdict(TextPart(text="💾 Saving markdown file..."))]
                ))
            }
        }

        # Process the actual blog generation
        processed_task = await process_blog_task(task, message)

        # Send final result
        yield {
            "jsonrpc": "2.0",
            "method": "taskStatusUpdate",
            "params": {
                "taskId": task.id,
                "status": processed_task.status.value
            }
        }

        if processed_task.artifacts:
            for artifact in processed_task.artifacts:
                yield {
                    "jsonrpc": "2.0",
                    "method": "taskArtifactUpdate",
                    "params": {
                        "taskId": task.id,
                        "artifact": asdict(artifact)
                    }
                }
                await asyncio.sleep(0.5)

    except Exception as e:
        task.status = TaskStatus.FAILED
        yield {
            "jsonrpc": "2.0",
            "method": "taskStatusUpdate",
            "params": {
                "taskId": task.id,
                "status": "failed"
            }
        }

        yield {
            "jsonrpc": "2.0",
            "method": "error",
            "params": {
                "taskId": task.id,
                "error": {
                    "code": -32603,
                    "message": f"Blog generation failed: {str(e)}"
                }
            }
        }

def extract_content_from_message(message: Dict[str, Any]) -> tuple[str, str]:
    """Extract research content and topic from A2A message"""
    parts = message.get("parts", [])
    research_content = ""
    topic = "Unknown Topic"        for part in parts:
        if part.get("type") == "text":
            content = part.get("text", "")
            if not research_content:  # First text part is main content
                research_content = content
            # Try to extract topic from content
            if "topic:" in content.lower():
                lines = content.split('\n')
                for line in lines:
                    if "topic:" in line.lower():
                        topic = line.split(':', 1)[-1].strip()
                        break
            elif len(content.strip()) > 10 and len(content.strip()) < 200:
                # If short content, might be the topic itself
                if not research_content or len(research_content) < 100:
                    topic = content.strip()[:100]

        elif part.get("type") == "data":
            data = part.get("data", {})
            if "research_content" in data:
                research_content = data["research_content"]
            if "topic" in data:
                topic = data["topic"]
            elif "query" in data:
                topic = data["query"]        # If we still don't have research content, use the first text part
    if not research_content:
        for part in parts:
            if part.get("type") == "text":
                research_content = part.get("text", "No research content provided")
                break

    # Extract topic from research content if not found
    if topic == "Unknown Topic" and research_content:
        lines = research_content.split('\n')
        for line in lines[:5]:  # Check first 5 lines
            line = line.strip()
            if line and len(line) > 10 and len(line) < 150:
                topic = line
                break

    return research_content, topic

# Health check endpoint
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "protocol": "A2A", "version": "2.0.0"}if __name__ == "__main__":
    print("🚀 Starting BlogPost Generator Agent - A2A Protocol Server")
    print("📋 Agent Card available at: http://localhost:8004/.well-known/agent.json")
    print("✍️ Content capabilities: Blog generation, SEO optimization, markdown files")        uvicorn.run(
        "blogpost_server_a2a:app",
        host="0.0.0.0",
        port=8004,
        reload=True,
        log_level="info"
    )
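One detail worth calling out in the server above: the file artifact's `FilePart` puts the raw markdown text straight into its `bytes` field, and the in-code comment notes that production deployments should base64-encode it instead. A minimal sketch of that encoding, assuming the same `FilePart` dataclass used in this server file (the helper name `build_markdown_file_part` is purely illustrative):

```python
import base64

def build_markdown_file_part(filename: str, content: str) -> FilePart:
    """Illustrative helper: base64-encode file content before embedding it in a FilePart.

    Assumes the FilePart dataclass defined in this server file, whose `file` dict
    carries name/mimeType/bytes, as shown in the artifact construction above.
    """
    encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
    return FilePart(file={
        "name": filename,
        "mimeType": "text/markdown",
        "bytes": encoded,  # base64 string rather than raw text, per the in-code comment
    })
```

A receiving client would then decode the `bytes` field with `base64.b64decode` before writing the file to disk.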

agentic_client_a2a.py

#!/usr/bin/env python3
"""
Agentic Client - A2A Protocol Implementation
Based on Google's Agent-to-Agent (A2A) Protocol specification
"""import asyncio
import json
import uuid
import httpx
import sys
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict

# A2A Protocol Data Models
@dataclass
class TextPart:
    type: str = "text"
    text: str = ""
    metadata: Optional[Dict[str, Any]] = None

@dataclass
class DataPart:
    type: str = "data"
    data: Dict[str, Any] = None
    metadata: Optional[Dict[str, Any]] = None

@dataclass
class Message:
    role: str  # "user" or "agent"
    parts: List[Dict[str, Any]]
    metadata: Optional[Dict[str, Any]] = None

class A2AClient:
    """A2A Protocol Client for communicating with agents"""

    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip('/')
        self.client = httpx.AsyncClient(timeout=300.0)  # 5 minute timeout

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()

    async def discover_agent(self) -> Dict[str, Any]:
        """Discover agent capabilities via Agent Card"""
        try:
            response = await self.client.get(f"{self.base_url}/.well-known/agent.json")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"❌ Failed to discover agent at {self.base_url}: {e}")
            return {}

    async def send_task(self, task_id: str, message: Message, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Send task using A2A protocol (synchronous)"""
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/send",
            "params": {
                "id": task_id,
                "message": asdict(message),
                "sessionId": session_id,
                "metadata": {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "client": "A2A Agentic Client"
                }
            }
        }

        try:
            response = await self.client.post(f"{self.base_url}/a2a/tasks/send", json=payload)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": payload["id"],
                "error": {
                    "code": -32603,
                    "message": f"Request failed: {str(e)}"
                }
            }

    async def send_task_subscribe(self, task_id: str, message: Message, session_id: Optional[str] = None):
        """Send task with streaming updates using A2A protocol (SSE)"""
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/sendSubscribe",
            "params": {
                "id": task_id,
                "message": asdict(message),
                "sessionId": session_id,
                "metadata": {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "client": "A2A Agentic Client"
                }
            }
        }

        try:
            async with self.client.stream(
                "POST",
                f"{self.base_url}/a2a/tasks/sendSubscribe",
                json=payload,
                headers={"Accept": "text/event-stream"}
            ) as response:
                response.raise_for_status()

                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        try:
                            event_data = json.loads(line[6:])  # Remove "data: " prefix
                            yield event_data
                        except json.JSONDecodeError:
                            continue

        except Exception as e:
            yield {
                "jsonrpc": "2.0",
                "method": "error",
                "params": {
                    "taskId": task_id,
                    "error": {
                        "code": -32603,
                        "message": f"Streaming failed: {str(e)}"
                    }
                }
            }

    async def get_task(self, task_id: str) -> Dict[str, Any]:
        """Get task status using A2A protocol"""
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/get",
            "params": {
                "id": task_id
            }
        }

        try:
            response = await self.client.post(f"{self.base_url}/a2a/tasks/get", json=payload)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": payload["id"],
                "error": {
                    "code": -32603,
                    "message": f"Request failed: {str(e)}"
                }
            }

    async def cancel_task(self, task_id: str) -> Dict[str, Any]:
        """Cancel task using A2A protocol"""
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/cancel",
            "params": {
                "id": task_id
            }
        }

        try:
            response = await self.client.post(f"{self.base_url}/a2a/tasks/cancel", json=payload)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": payload["id"],
                "error": {
                    "code": -32603,
                    "message": f"Request failed: {str(e)}"
                }
            }

# Workflow Functions
async def sequential_workflow_a2a():
    """Main A2A workflow: User -> DeepSearch -> BlogPost Generator"""
    print("🚀 Starting A2A Sequential Workflow")
    print("=" * 80)        # Get topic from user
    topic = input("📝 Enter a topic for research and blog generation: ").strip()
    if not topic:
        topic = "The future of sustainable energy technologies"
        print(f"No topic entered. Using default: {topic}")        session_id = str(uuid.uuid4())        print(f"\n🔍 Step 1: Discovering and researching topic: '{topic}'")
    print("-" * 60)        # Step 1: Communicate with DeepSearch Agent
    async with A2AClient("http://localhost:8003") as deepsearch_client:
        # Discover agent capabilities
        agent_card = await deepsearch_client.discover_agent()
        if agent_card:
            print(f"✅ Discovered DeepSearch Agent: {agent_card.get('name', 'Unknown')}")
            print(f"📋 Skills: {[skill['name'] for skill in agent_card.get('skills', [])]}")                # Create research message
        research_message = Message(
            role="user",
            parts=[
                asdict(TextPart(text=topic)),
                asdict(DataPart(data={
                    "task_type": "research",
                    "topic": topic,
                    "requirements": [
                        "comprehensive_analysis",
                        "web_scraping",
                        "source_citation",
                        "data_synthesis"
                    ]
                }))
            ]
        )

        # Send research task
        research_task_id = f"research_{uuid.uuid4()}"
        research_result = await deepsearch_client.send_task(
            task_id=research_task_id,
            message=research_message,
            session_id=session_id
        )

        if "error" in research_result:
            print(f"❌ Research failed: {research_result['error']['message']}")
            return

        # Extract research content
        task_data = research_result.get("result", {})
        artifacts = task_data.get("artifacts", [])                if not artifacts:
            print("❌ No research results received")
            return                research_content = ""
        for artifact in artifacts:
            for part in artifact.get("parts", []):
                if part.get("type") == "text":
                    research_content += part.get("text", "")                print("✅ Research completed!")
        print(f"📊 Research length: {len(research_content)} characters")
        print(f"📋 Preview: {research_content[:200]}...")        print(f"\n📝 Step 2: Generating blog post from research data")
    print("-" * 60)        # Step 2: Communicate with BlogPost Generator Agent
    async with A2AClient("http://localhost:8004") as blogpost_client:
        # Discover agent capabilities
        agent_card = await blogpost_client.discover_agent()
        if agent_card:
            print(f"✅ Discovered BlogPost Agent: {agent_card.get('name', 'Unknown')}")
            print(f"📋 Skills: {[skill['name'] for skill in agent_card.get('skills', [])]}")                # Create blog generation message
        blog_message = Message(
            role="user",
            parts=[
                asdict(TextPart(text=research_content)),
                asdict(DataPart(data={
                    "task_type": "blog_generation",
                    "topic": topic,
                    "research_content": research_content,
                    "requirements": [
                        "seo_optimization",
                        "markdown_format",
                        "professional_tone",
                        "comprehensive_structure"
                    ]
                }))
            ]
        )

        # Send blog generation task
        blog_task_id = f"blog_{uuid.uuid4()}"
        blog_result = await blogpost_client.send_task(
            task_id=blog_task_id,
            message=blog_message,
            session_id=session_id
        )

        if "error" in blog_result:
            print(f"❌ Blog generation failed: {blog_result['error']['message']}")
            return

        # Process blog results
        task_data = blog_result.get("result", {})
        artifacts = task_data.get("artifacts", [])                print("✅ Blog post generated!")                # Display results
        for artifact in artifacts:
            artifact_name = artifact.get("name", "Unknown")
            artifact_desc = artifact.get("description", "")                        print(f"\n📄 Artifact: {artifact_name}")
            print(f"📝 Description: {artifact_desc}")                        for part in artifact.get("parts", []):
                if part.get("type") == "text":
                    content = part.get("text", "")
                    print(f"📋 Preview: {content[:300]}...")
                elif part.get("type") == "data":
                    data = part.get("data", {})
                    if "title" in data:
                        print(f"🎯 Title: {data['title']}")
                    if "word_count" in data:
                        print(f"📊 Word Count: {data['word_count']}")
                    if "filename" in data:
                        print(f"💾 File: {data['filename']}")
                elif part.get("type") == "file":
                    file_info = part.get("file", {})
                    print(f"📁 File: {file_info.get('name', 'Unknown')}")
                    print(f"📄 Type: {file_info.get('mimeType', 'Unknown')}")        print("\n" + "=" * 80)
    print("🎉 A2A Sequential workflow completed successfully!")async def streaming_workflow_a2a():
    """Streaming A2A workflow with real-time updates"""
    print("🚀 Starting A2A Streaming Workflow")
    print("=" * 80)        topic = input("📝 Enter a topic for research and blog generation: ").strip()
    if not topic:
        topic = "Latest developments in quantum computing"
        print(f"No topic entered. Using default: {topic}")        session_id = str(uuid.uuid4())        print(f"\n🔍 Step 1: Streaming research for: '{topic}'")
    print("-" * 60)        # Step 1: Stream research from DeepSearch Agent
    research_content = ""
    async with A2AClient("http://localhost:8003") as deepsearch_client:
        research_message = Message(
            role="user",
            parts=[asdict(TextPart(text=topic))]
        )

        research_task_id = f"research_stream_{uuid.uuid4()}"

        async for event in deepsearch_client.send_task_subscribe(
            task_id=research_task_id,
            message=research_message,
            session_id=session_id
        ):
            method = event.get("method")
            params = event.get("params", {})                        if method == "taskStatusUpdate":
                status = params.get("status")
                print(f"🔄 Research Status: {status}")
            elif method == "taskArtifactUpdate":
                artifact = params.get("artifact", {})
                artifact_name = artifact.get("name", "")                                if artifact_name == "progress_update":
                    for part in artifact.get("parts", []):
                        if part.get("type") == "text":
                            print(f"📋 {part.get('text', '')}")
                elif artifact_name == "research_report":
                    for part in artifact.get("parts", []):
                        if part.get("type") == "text":
                            research_content = part.get("text", "")
                            print(f"✅ Research completed! ({len(research_content)} chars)")
            elif method == "error":
                error = params.get("error", {})
                print(f"❌ Research Error: {error.get('message', 'Unknown error')}")
                return

    if not research_content:
        print("❌ No research content received")
        return

    print(f"\n📝 Step 2: Streaming blog generation")
    print("-" * 60)

    # Step 2: Stream blog generation from BlogPost Agent
    async with A2AClient("http://localhost:8004") as blogpost_client:
        blog_message = Message(
            role="user",
            parts=[
                asdict(TextPart(text=research_content)),
                asdict(DataPart(data={"topic": topic}))
            ]
        )

        blog_task_id = f"blog_stream_{uuid.uuid4()}"

        async for event in blogpost_client.send_task_subscribe(
            task_id=blog_task_id,
            message=blog_message,
            session_id=session_id
        ):
            method = event.get("method")
            params = event.get("params", {})                        if method == "taskStatusUpdate":
                status = params.get("status")
                print(f"🔄 Blog Status: {status}")
            elif method == "taskArtifactUpdate":
                artifact = params.get("artifact", {})
                artifact_name = artifact.get("name", "")                                if artifact_name == "progress_update":
                    for part in artifact.get("parts", []):
                        if part.get("type") == "text":
                            print(f"📋 {part.get('text', '')}")
                elif artifact_name in ["blog_post_content", "markdown_file"]:
                    print(f"✅ {artifact.get('description', 'Content generated')}")
                    for part in artifact.get("parts", []):
                        if part.get("type") == "data":
                            data = part.get("data", {})
                            if "title" in data:
                                print(f"🎯 Title: {data['title']}")
                            if "filename" in data:
                                print(f"💾 File: {data['filename']}")
            elif method == "error":
                error = params.get("error", {})
                print(f"❌ Blog Error: {error.get('message', 'Unknown error')}")
                return

    print("\n" + "=" * 80)
    print("🎉 A2A Streaming workflow completed!")

async def test_deepsearch_a2a():
    """Test DeepSearch agent independently using A2A protocol"""
    print("🔍 Testing DeepSearch Agent (A2A Protocol)")
    print("-" * 60)        topic = input("Enter a research topic: ").strip() or "Latest trends in artificial intelligence"        async with A2AClient("http://localhost:8003") as client:
        # Discover agent
        agent_card = await client.discover_agent()
        if agent_card:
            print(f"✅ Agent: {agent_card.get('name')}")
            print(f"📝 Description: {agent_card.get('description')}")
            print(f"🔧 Capabilities: {list(agent_card.get('capabilities', {}).keys())}")                # Send research task
        message = Message(
            role="user",
            parts=[asdict(TextPart(text=topic))]
        )

        task_id = f"test_research_{uuid.uuid4()}"
        result = await client.send_task(task_id, message)

        if "error" in result:
            print(f"❌ Error: {result['error']['message']}")
        else:
            task_data = result.get("result", {})
            print(f"✅ Status: {task_data.get('status')}")                        artifacts = task_data.get("artifacts", [])
            for artifact in artifacts:
                print(f"📄 {artifact.get('name', 'Result')}: {artifact.get('description', '')}")async def test_blogpost_a2a():
    """Test BlogPost agent independently using A2A protocol"""
    print("📝 Testing BlogPost Generator Agent (A2A Protocol)")
    print("-" * 60)        # Mock research data
    mock_research = """
    Research Topic: Artificial Intelligence in Healthcare

    Executive Summary:
    AI in healthcare is revolutionizing patient care through diagnostic imaging,
    drug discovery, and personalized treatment plans. The market is expected to
    reach $102 billion by 2025.

    Key Insights:
    - AI diagnostic tools show 94% accuracy in cancer detection
    - Machine learning reduces drug discovery time by 30%
    - Telemedicine adoption increased 3800% during pandemic

    Recent Developments:
    - FDA approved 130+ AI medical devices in 2023
    - Google's Med-PaLM achieves expert-level medical reasoning

    Sources: McKinsey Global Institute, Nature Medicine, FDA Medical Device Database
    """

    async with A2AClient("http://localhost:8004") as client:
        # Discover agent
        agent_card = await client.discover_agent()
        if agent_card:
            print(f"✅ Agent: {agent_card.get('name')}")
            print(f"📝 Description: {agent_card.get('description')}")
            print(f"🔧 Capabilities: {list(agent_card.get('capabilities', {}).keys())}")                # Send blog generation task
        message = Message(
            role="user",
            parts=[
                asdict(TextPart(text=mock_research)),
                asdict(DataPart(data={"topic": "AI in Healthcare"}))
            ]
        )

        task_id = f"test_blog_{uuid.uuid4()}"
        result = await client.send_task(task_id, message)

        if "error" in result:
            print(f"❌ Error: {result['error']['message']}")
        else:
            task_data = result.get("result", {})
            print(f"✅ Status: {task_data.get('status')}")                        artifacts = task_data.get("artifacts", [])
            for artifact in artifacts:
                print(f"📄 {artifact.get('name', 'Result')}: {artifact.get('description', '')}")async def main():
    """Main function with different A2A demo modes"""
    print("🤖 A2A Protocol Agentic Client")
    print("=" * 50)        if len(sys.argv) > 1:
        mode = sys.argv[1].lower()                if mode == "sequential":
            await sequential_workflow_a2a()
        elif mode == "streaming":
            await streaming_workflow_a2a()
        elif mode == "deepsearch":
            await test_deepsearch_a2a()
        elif mode == "blogpost":
            await test_blogpost_a2a()
        else:
            print("Usage: python agentic_client_a2a.py [sequential|streaming|deepsearch|blogpost]")
            print("  sequential - Main workflow: Topic -> DeepSearch -> BlogPost (RECOMMENDED)")
            print("  streaming  - Same workflow with real-time streaming updates")
            print("  deepsearch - Test DeepSearch agent only")
            print("  blogpost   - Test BlogPost generator only")
    else:
        # Default: run the main sequential workflow
        print("🧪 Running A2A sequential workflow...")
        await sequential_workflow_a2a()

if __name__ == "__main__":
    asyncio.run(main())
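Both agent servers must be running before the client is started: the DeepSearch server on port 8003 and the BlogPost server on port 8004, as hard-coded in the workflows above. As a quick sanity check, the `A2AClient` class can also be driven directly; the following sketch (assuming `agentic_client_a2a.py` is importable from the current working directory) discovers the BlogPost agent's card and sends it one synchronous `tasks/send` request:

```python
import asyncio
import uuid
from dataclasses import asdict

# Assumes agentic_client_a2a.py (shown above) is on the import path.
from agentic_client_a2a import A2AClient, Message, TextPart, DataPart

async def smoke_test():
    # Discover the BlogPost agent and send a single task with mock research text.
    async with A2AClient("http://localhost:8004") as client:
        card = await client.discover_agent()
        print("Agent:", card.get("name", "unknown"))

        message = Message(
            role="user",
            parts=[
                asdict(TextPart(text="Short research notes about the A2A protocol.")),
                asdict(DataPart(data={"topic": "A2A protocol"})),
            ],
        )
        result = await client.send_task(f"smoke_{uuid.uuid4()}", message)
        print("Status:", result.get("result", {}).get("status"))

if __name__ == "__main__":
    asyncio.run(smoke_test())
```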

Response log

(ACP) C:\Users\PLNAYAK\Documents\ACP>python agentic_client_a2a.py
🤖 A2A Protocol Agentic Client
==================================================
🧪 Running A2A sequential workflow...
🚀 Starting A2A Sequential Workflow
================================================================================
📝 Enter a topic for research and blog generation: Model Context Protocol by Anthropic

🔍 Step 1: Discovering and researching topic: 'Model Context Protocol by Anthropic'
------------------------------------------------------------
✅ Discovered DeepSearch Agent: DeepSearch Research Agent
📋 Skills: ['Web Research & Analysis', 'Data Synthesis']
✅ Research completed!
📊 Research length: 2900 characters
📋 Preview: The Model Context Protocol (MCP) is an open-standard protocol developed by Anthropic that enables seamless integration between Large Language Models (LLMs) and external data sources. The protocol prov...

📝 Step 2: Generating blog post from research data
------------------------------------------------------------
✅ Discovered BlogPost Agent: BlogPost Generator Agent
📋 Skills: ['Blog Post Generation', 'Content Optimization', 'Markdown File Generation']
✅ Blog post generated!

📄 Artifact: blog_post_content
📝 Description: Generated blog post: "Meet MCP: Unlocking LLMs"
📋 Preview: ## Meet MCP: Unlocking LLMs
The world of Artificial Intelligence (AI) is rapidly evolving, with Large Language Models (LLMs) playing a crucial role in shaping the future of human-computer interaction. However, the integration of LLMs with external data sources has been a significant challenge, limit...
🎯 Title: "Meet MCP: Unlocking LLMs"
📊 Word Count: 708

📄 Artifact: markdown_file
📝 Description: Blog post markdown file: blog_20250907_232553_meet_mcp_unlocking_llms.md
📁 File: blog_20250907_232553_meet_mcp_unlocking_llms.md
📄 Type: text/markdown
💾 File: blog_20250907_232553_meet_mcp_unlocking_llms.md
================================================================================
🎉 A2A Sequential workflow completed successfully!
