Building an AI Agent Server with AG-UI and Microsoft Agent Framework

In this post, I want to talk about the Python backend I built for an AG-UI demo project. It is part of a larger project that also includes a frontend that uses CopilotKit:

This post discusses the Python AG-UI server that is built with Microsoft Agent Framework.

All code is on GitHub: https://github.com/gbaeke/agui. Most of the code for this demo was written with GitHub Copilot with the help of Microsoft Docs MCP and Context7. 🤷

What is AG-UI?

Before we dive into the code, let’s talk about AG-UI. AG-UI is a standardized protocol for building AI agent interfaces. Think of it as a common language that lets your frontend talk to any backend agent that supports it, no matter what technology you use.

The protocol gives you some nice features out of the box:

  • Remote Agent Hosting: deploy your agents as web services (e.g. FastAPI)
  • Real-time Streaming: stream responses using Server-Sent Events (SSE)
  • Standardized Communication: consistent message format for reliable interactions (e.g. tool started, tool arguments, tool end, …)
  • Thread Management: keep conversation context across multiple requests

Why does this matter? Well, without a standard like AG-UI, every frontend needs custom code to talk to different backends. With AG-UI, you build your frontend once and it works with any AG-UI compatible backend. The same goes for backends – build it once and any AG-UI client can use it.

Under the hood, AG-UI uses simple HTTP POST requests for sending messages and Server-Sent Events (SSE) for streaming responses back. It’s not complicated, but it’s standardized. And that’s the point.

AG-UI has many more features than the ones discussed in this post. Check https://docs.ag-ui.com/introduction for the full picture.

Microsoft Agent Framework

Now, you could implement AG-UI from scratch but that’s a lot of work. This is where Microsoft Agent Framework comes in. It’s a Python (and C#) framework that makes building AI agents really easy.

The framework handles the heavy lifting when it comes to agent building:

  • Managing chat with LLMs like Azure OpenAI
  • Function calling (tools)
  • Streaming responses
  • Multi-turn conversations
  • And a lot more

The key concept is the ChatAgent. You give it:

  1. chat client (like Azure OpenAI)
  2. Instructions (the system prompt)
  3. Tools (functions the agent can call)

And you’re done. The agent knows how to talk to the LLM, when to call tools, and how to stream responses back.

What’s nice about Agent Framework is that it integrates with AG-UI out of the box, similar to other frameworks like LangGraph, Google ADK and others. You write your agent code and expose it via AG-UI with basically one line of code. The framework translates everything automatically – your agent’s responses become AG-UI events, tool calls get streamed correctly, etc…

The integration with Microsoft Agent Framework was announced on the blog of CopilotKit, the team behind AG-UI. The blog included the diagram below to illustrate the capabilities:

From https://www.copilotkit.ai/blog/microsoft-agent-framework-is-now-ag-ui-compatible

The Code

Let’s look at how this actually works in practice. The code is pretty simple. Most of the code is Microsoft Agent Framework code. AG-UI gets exposed with one line of code.

The Server (server.py)

The main server file is really short:

import uvicorn
from api import app
from config import SERVER_HOST, SERVER_PORT

def main():
    print(f"🚀 Starting AG-UI server at http://{SERVER_HOST}:{SERVER_PORT}")
    uvicorn.run(app, host=SERVER_HOST, port=SERVER_PORT)

if __name__ == "__main__":
    main()

That’s it. We run a FastAPI server on port 8888. The interesting part is in api/app.py:

from fastapi import FastAPI
from agent_framework.ag_ui.fastapi import add_agent_framework_fastapi_endpoint
from agents.main_agent import agent

app = FastAPI(title="AG-UI Demo Server")

# This single line exposes your agent via AG-UI protocol
add_agent_framework_fastapi_endpoint(app, agent, "/")

See that add_agent_framework_fastapi_endpoint() call? That’s all you need. This function from Agent Framework takes your agent and exposes it as an AG-UI endpoint. It handles HTTP requests, SSE streaming, protocol translation – everything.

You just pass in your FastAPI app, your agent, and the route path. Done.

The Main Agent (agents/main_agent.py)

Here’s where we define the actual agent with standard Microsoft Agent Framework abstractions:

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import DefaultAzureCredential
from config import AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME
from tools import get_weather, get_current_time, calculate, bedtime_story_tool

# Create Azure OpenAI chat client
chat_client = AzureOpenAIChatClient(
    credential=DefaultAzureCredential(),
    endpoint=AZURE_OPENAI_ENDPOINT,
    deployment_name=AZURE_OPENAI_DEPLOYMENT_NAME,
)

# Create the AI agent with tools
agent = ChatAgent(
    name="AGUIAssistant",
    instructions="You are a helpful assistant with access to tools...",
    chat_client=chat_client,
    tools=[get_weather, get_current_time, calculate, bedtime_story_tool],
)

This is the heart of the backend. We create a ChatAgent with:

  1. A name: “AGUIAssistant”
  2. Instructions: the system prompt that tells the agent how to behave
  3. A chat clientAzureOpenAIChatClient that handles communication with Azure OpenAI
  4. Tools: a list of functions the agent can call

The code implements a few toy tools and a sub-agent to illustrate how AG-UI handels tool calls. The tools are discussed below:

The Tools (tools/)

In Agent Framework, tools can be Python functions with a decorator:

from agent_framework import ai_function
import httpx
import json

@ai_function(description="Get the current weather for a location")
def get_weather(location: str) -> str:
    """Get real weather information for a location using Open-Meteo API."""
    # Step 1: Geocode the location
    geocode_url = "https://geocoding-api.open-meteo.com/v1/search"
    # ... make HTTP request ...
    
    # Step 2: Get weather data
    weather_url = "https://api.open-meteo.com/v1/forecast"
    # ... make HTTP request ...
    
    # Return JSON string
    return json.dumps({
        "location": resolved_name,
        "temperature": current["temperature_2m"],
        "condition": condition,
        # ...
    })

The @ai_function decorator tells Agent Framework “this is a tool the LLM can use”. The framework automatically:

  • Generates a schema from the function signature
  • Makes it available to the LLM
  • Handles calling the function when needed
  • Passes the result back to the LLM

You just write normal Python code. The function takes typed parameters (location: str) and returns a string. Agent Framework does the rest.

The weather tool calls the Open-Meteo API to get real weather data. In an AG-UI compatible client, you can intercept the tool result and visualize it any way you want before the LLM generates an answer from the tool result:

React client with CopilotKit

Above, when the user asks for weather information, AG-UI events inform the client that a tool call has started and ended. It also streams the tool result back to the client which uses a custom component to render the information. This happens before the chat client generates the answer based on the tool result.

The Subagent (tools/storyteller.py)

This is where it gets interesting. In Agent Framework, a ChatAgent can become a tool with .as_tool():

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient

# Create a specialized agent for bedtime stories
bedtime_story_agent = ChatAgent(
    name="BedTimeStoryTeller",
    description="A creative storyteller that writes engaging bedtime stories",
    instructions="""You are a gentle and creative bedtime story teller.
When given a topic, create a short, soothing bedtime story for children.
Your stories should be 3-5 paragraphs long, calming, and end peacefully.""",
    chat_client=chat_client,
)

# Convert the agent to a tool
bedtime_story_tool = bedtime_story_agent.as_tool(
    name="tell_bedtime_story",
    description="Generate a calming bedtime story based on a theme",
    arg_name="theme",
    arg_description="The theme for the story (e.g., 'a brave rabbit')",
)

This creates a subagent – another ChatAgent with different instructions. When the main agent needs to tell a bedtime story, it calls tell_bedtime_story which delegates to the subagent.

Why is this useful? Because you can give each agent specialized instructions. The main agent handles general questions and decides which tool to use. The storyteller agent focuses only on creating good stories. Clean separation of concerns.

The subagent has its own chat client and can have its own tools too if you want. It’s a full agent, just exposed as a tool.

And because it is a tool, you can render it with the standard AG-UI tool events:

Testing with a client

In src/backend there is a Python client client_raw.py. When you run that client against the server and invoke a tool, you will see something like below:

AG-UI client in Python

This client simply uses httpx to talk the AG-UI server and inspects and renders the AG-UI events as they come in.

Why This Works

Let me tell you what I like about this setup:

Separation of concerns: The frontend doesn’t know about Python, Azure OpenAI, or any backend details. It just speaks AG-UI. You could swap the backend for a C# implementation or something else entirely – the frontend wouldn’t care. Besides of course the handling of specific tool calls.

Standard protocol: Because we use AG-UI, any AG-UI client can talk to this backend. We use CopilotKit in the frontend but you could use anything that speaks AG-UI. Take the Python client as an example.

Framework handles complexity: Streaming, tool calls, conversation history, protocol translation – Agent Framework does all of this. You just write business logic.

Easy to extend: Want a new tool? Write a function with @ai_function. Want a specialized agent? Create a ChatAgent and call .as_tool(). That’s it.

The AG-UI documentation explains that the protocol supports 7 different features including human-in-the-loop, generative UI, and shared state. Our simple backend gets all of these capabilities because Agent Framework implements the protocol.

Note that there are many more capabilities. Check the AG-UI interactive Dojo to find out: https://dojo.ag-ui.com/microsoft-agent-framework-python

Wrap Up

This is a simple but powerful pattern for building AI agent backends. You write minimal code and get a lot of functionality. AG-UI gives you a standard way to expose your agent, and Microsoft Agent Framework handles the implementation details.

If you want to try this yourself, the code is in the repo. You’ll need an Azure OpenAI deployment and follow the OAuth setup. After that, just run the code as instructed in the repo README!

The beauty is in the simplicity. Sometimes the best code is the code you don’t have to write.

Creating presentations with Microsoft Agent Framework and GAMMA API

Introduction

Imagine automating the creation of professional presentations through a coordinated series of intelligent agents. In this blog post, we’ll explore how to build a workflow that combines research, outline generation, content creation, and design, all orchestrated through the Microsoft Agent Framework.

We’ll build a practical example that generates presentations using:

  • Azure OpenAI Responses API for agent creation
  • Microsoft Agent Framework Workflows for orchestration
  • Tavily Search API for concurrent web research
  • GAMMA API for stunning presentation generation (In October 2025, this API was in beta)

By the end, you’ll understand how to leverage these tools to create end-to-end workflows that tackle complex, multi-step tasks.


What is Microsoft Agent Framework?

The Microsoft Agent Framework is a new, modern framework for building intelligent automation systems powered by AI agents. It enables you to create agents and workflows that blend AI reasoning with structured business processes.

Agents vs. Workflows: Understanding the Difference

Before diving into workflows, it’s important to understand how agents and workflows complement each other:

  • AI Agents are LLM-powered systems that make dynamic decisions based on context. They have access to tools and can reason through problems, but the steps they take are determined by the model at runtime.
Agent (from Microsoft Learn)
  • Workflows, on the other hand, are predefined sequences of operations with explicit control flow. They define exactly how data flows from one step to the next. Workflows can contain agents as components, orchestrating them alongside other executors and services.
Workflow (from Microsoft Learn)

Think of it this way:

  • An agent is like an expert consultant who decides how to solve a problem
  • workflow is like a project manager who coordinates multiple specialists in a structured process

The framework provides the tools to do both and to combine them in interesting ways.


Creating an Agent with Azure OpenAI Responses API

Let’s start by understanding how to create individual agents before we orchestrate them into workflows.

The Azure OpenAI Responses API is Azure’s version of the OpenAI Responses API . It’s the successor to the chat completion APIs and supports structured outputs, tool calling, stateful interactions and hosted tools.

Note: Azure OpenAI Responses API does not support the native web search tool. We will use Tavily to support web searches.

Here’s how to create a simple agent in Agent Framework, based on the Responses API:

import asyncio
import os
from dotenv import load_dotenv
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

# Load environment variables from .env
load_dotenv()

async def main():
    # Create a client
    client = AzureOpenAIResponsesClient(
        endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        deployment_name=os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o")
    )
    
    # Create an agent
    agent = client.create_agent(
        name="OutlineAgent",
        instructions="Based on the user's request, you create an outline for a presentation.",
    )
    
    # Run the agent
    result = await agent.run("Create an outline for a presentation about Python")
    print(result.text)

if __name__ == "__main__":
    asyncio.run(main())

Note: this is just one type of agent you can create with Agent Framework. Some of the other supported types are chat completion agents and Azure AI Foundry Agents. See Microsoft Learn for more information. We could have used these agents as well.

Agents with Function Tools

Agents become truly useful when equipped with function tools—custom Python functions that agents can call to accomplish tasks. In Agent Framework, you can use the optional @ai_function decorator to provide a meaningful description to the LLM using the function. An alternative is to use a docstring. The framework will automatically convert the Python functions to tools the LLM understands.

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function(
    name="search_web",
    description="Search the web for information"
)
async def search_web(
    queries: Annotated[list[str], Field(description="List of search queries")]
) -> str:
    # Implementation here
    pass

Once defined, you pass these tools to your agent:

agent = client.create_agent(
    name="ResearchAgent",
    instructions="You research and create content for slides.",
    tools=[search_web]  # Register the tool
)

Now the agent can autonomously decide to call search_web when it needs information! In the agent instructions, you can tell the agent how and when to use the tool if needed.

In the example discussed further in this post, both the outline agent and research agent use the search function to build the slide content.

Structured Output with Pydantic

For predictable, structured responses, the framework supports structured output with Pydantic models:

from pydantic import BaseModel, Field
from typing import List, Annotated

class OutlineResponse(BaseModel):
    title: str
    number_of_slides: Annotated[int, Field(default=5)]
    slide_titles: List[str]
    audience: Annotated[str, Field(default="general")]

agent = client.create_agent(
    name="OutlineAgent",
    instructions="Create a presentation outline.",
    response_format=OutlineResponse  # Enforce this structure
)

The agent’s response will automatically be validated and parsed as an OutlineResponse object: type-safe and predictable. This feature is supported by the underlying Responses API with its support for structured outputs.

We use structured outputs extensively in the example code discussed below.


Understanding Workflows: Orchestrating Agents at Scale

Now that we understand agents, let’s see how to compose them into workflows.

A workflow is built from four key components:

  1. Executors – The individual processing nodes (agents, custom logic, integrations)
  2. Edges – Connections defining data flow between executors
  3. Workflows – The orchestrator that manages execution and routing
  4. Events – Real-time observability into execution

Agent Framework can visualize workflows in different ways. One option is to use the DevUI:

DevUI with a sequential workflow ready to run

To run the workflow, simply click Configure & Run and provide your input. The steps in the workflow will light up when they are doing work.

For more information about Dev UI see the Agent Framework repository on GitHub.

Building a Workflow with WorkflowBuilder

After defining the executors, you use WorkflowBuilder to build a workflow:

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder()
    .set_start_executor(outline_agent)          # First step
    .add_edge(outline_agent, research_agent)    # Data flows here
    .add_edge(research_agent, gamma_executor)   # Final step
    .build()
)

# serve workflow with Dev UI
serve(entities=[workflow], port=8093, auto_open=True)

This creates a linear, sequential flow where:

  1. Input → outline_agent produces an outline
  2. Outline → research_agent researches and creates slide content
  3. Slide content → gamma_executor generates the presentation

Note that the first two steps are simply Azure Responses API agents (AgentExecutors). The third step is a custom executor inheriting from the Executor class.

The workflow is served with Dev UI, which results in a browser opening the Dev UI interface as shown in the screenshot above.

Input and Output Between Nodes

Understanding data flow in the workflow is crucial. Each executor (agents or custom) receives input and must send output to the next executor. Below is a custom executor created from a Python function. You can also create executors from a Python class:

@executor(id="some executor id")
async def handle_input(self, message: str, ctx: WorkflowContext) -> None:
    # Process the input
    result = do_some_work(message)
    
    # Send output to next executor(s)
    ctx.send_message(result)

For agents specifically, they receive messages and produce AgentExecutorResponse objects automatically. You do not have to write code that sends the output of an agent to the next node.

Custom Executors for Complex Logic

Sometimes you need logic beyond agent reasoning. Custom executors handle this. Below is a class-based executor versus a fuction-based executor above:

from agent_framework import Executor, handler, WorkflowContext

class GammaAPIExecutor(Executor):
    def __init__(self, id: str = "gamma_api"):
        super().__init__(id=id)

    @handler
    async def call_gamma_api(
        self, response: AgentExecutorResponse, ctx: WorkflowContext
    ) -> None:
        # Extract data from agent response
        slide_content = response.agent_run_response.text
        
        # Call external API
        api_response = requests.post(
            "https://api.gamma.app/generations",
            json=payload
        )
        
        # Yield final output
        await ctx.yield_output({"status": "success", "pdf_url": pdf_url})

Custom executors are perfect for:

  • Calling external APIs
  • Data transformation and validation
  • Conditional routing logic
  • Long-running operations like polling

We will use a custom executor to call the GAMMA API to create a presentation based on the output of the previous agent nodes.

Using Sequential Orchestration

The simplest types of workflow process tasks one after another, each building on the previous result. This is ideal for pipelines where each step depends on prior outputs.

Input → Agent A → Agent B → Agent C → Output

Use sequential when:

  • Steps have dependencies
  • Later steps need data from earlier ones
  • You want controlled, predictable execution flow

Other types of workflow orchestration are discussed on Microsoft Learn. We will use a sequential workflow in the example below.


The Presentation Workflow in Action

Now let’s see how all these concepts come together in our actual implementation.

Architecture Overview

Our workflow follows this flow:

Step 1: Outline Agent

This is an agent with a web search tool. The agent generates a search query relevant to the user input and decides on a presentation title and a list of slide titles:

outline_agent = AzureOpenAIResponsesClient(
    endpoint=endpoint,
    api_key=api_key,
    deployment_name=deployment_name
).create_agent(
    name="OutlineAgent",
    instructions="""
        Based on the user's request, you create an outline for a presentation.
        Before you generate the outline, use the 'search_web' tool to research 
        the topic thoroughly with ONE query.
        Base the outline on the research findings.
    """,
    tools=[search_web],
    response_format=OutlineResponse,
    middleware=[logging_function_middleware]
)

  • Input: User’s presentation request sent to the agent as a user message

Processing:

  • Calls search_web to research the topic
  • Generates a structured outline with title and slide titles

OutputOutlineResponse object with titlenumber_of_slidesslide_titles, and audience

If the user specifies the number of slides and audience, this will be reflected in the reponses.

Note: this agent uses middleware to log the use of the search_web tool.

Step 2: Research Agent

The research agent takes the presentation title and slides from the previous step and does research on the web for each slide:

research_agent = AzureOpenAIResponsesClient(
    endpoint=endpoint,
    api_key=api_key,
    deployment_name=deployment_name
).create_agent(
    name="ResearchAgent",
    instructions="""
        You research and create content for slides.
        Generate one web search query for each slide title.
        Keep in mind the target audience when generating queries.
        Next, use the 'search_web' tool ONCE passing in the queries all at once.

        Use the result from the queries to generate content for each slide.
        Content for slides should be limited to 100 words max with three main bullet points.
    """,
    tools=[search_web],
    response_format=ResearchResponse,
    middleware=[logging_function_middleware]
)

Input: The OutlineResponse from the outline agent is automatically sent as a user message. This message will contain the JSON output from the previous step.

Processing:

  • Generates one search queries per slide
  • Calls search_web with all queries concurrently (crucial for performance!)
  • Creates 100-word slide content with bullet points. It’s best to keep slide content concise for best results.

OutputResearchResponse object with slide content for each slide

Why Concurrency Matters: The Search Tool

To speed up research, the search_web function uses AsyncTavilyClient for concurrent searching:

from tavily import AsyncTavilyClient

tavily_client = AsyncTavilyClient(TAVILY_API_KEY)

@ai_function(
    name="search_web",
    description="Search the web for multiple topics using Tavily"
)
async def search_web(
    queries: Annotated[list[str], Field(description="List of search queries")]
) -> str:
    logger.info(f"SEARCH - Performing web searches for {len(queries)} queries")
    
    async def _search_single_query(query: str) -> dict:
        try:
            logger.info(f'SEARCH - "{query}" - Searching')
            # AsyncTavilyClient.search() is awaitable
            response = await tavily_client.search(
                query=query,
                search_depth="advanced",
                max_results=5,
            )
            return {"query": query, "results": response.get("results", [])}
        except Exception as e:
            logger.error(f'SEARCH - "{query}" - {str(e)}')
            return {"query": query, "results": [], "error": str(e)}
    
    # Run all searches concurrently!
    tasks = [_search_single_query(query) for query in queries]
    search_results = await asyncio.gather(*tasks)
    
    # Aggregate and return
    aggregated = {r["query"]: r["results"] for r in search_results}
    return json.dumps({"results": aggregated})

Key insight: By using AsyncTavilyClient instead of the synchronous TavilyClient, we enable concurrent execution. All queries run in parallel, dramatically reducing total execution time.

To learn more about Tavily, check their website. They have a generous free tier which I almost exhausted creating this example! 😊

Step 3: Custom Gamma Executor

Step three does not need an agent. It uses the output of the agents to call the GAMMA API:

class GammaAPIExecutor(Executor):
    @handler
    async def call_gamma_api(
        self, response: AgentExecutorResponse, ctx: WorkflowContext
    ) -> None:
        # Extract slide content
        slide_content = response.agent_run_response.text
        response_json = json.loads(slide_content)
        
        logger.info(f'GAMMA - "{title}" - Slides: {number_of_slides}')
        
        # Call Gamma API
        api_response = requests.post(
            f"{GAMMA_API_BASE_URL}/generations",
            headers=headers,
            json=payload,
            timeout=30,
        )
        
        generation_id = data.get("id") or data.get("generationId")
        logger.info(f'GAMMA - POST /generations {api_response.status_code}')
        
        # Poll for completion
        completed_data = await self._poll_for_completion(generation_id)
        
        # Download PDF
        pdf_path = self._download_pdf(completed_data.get("exportUrl"))
        
        logger.info(f'GAMMA - Presentation completed')
        await ctx.yield_output(f"PDF saved to: {pdf_path}")

Input: The ResearchResponse from the research agent (slide content)

Processing:

  • Calls GAMMA API to generate presentation
  • Polls for completion (generation is asynchronous)
  • Downloads resulting PDF
  • The presentation is also available in Gamma for easy editing:
Presentation available in Gamma after creation with the workflow

Output: Success message with PDF path and presentation in Gamma.


Running the example

The workflow code is on GitHub in the gamma folder: https://github.com/gbaeke/maf/tree/main/gamma

In that folder, check https://github.com/gbaeke/maf/blob/main/gamma/SETUP_GUIDE.md for setup instructions.

Durable Execution for AI workflows

If you follow me on LinkedIn, you know I have talked about agentic workflows versus agents. Although everybody talks about agents, workflows are often better suited to many tasks. A workflow is more deterministic, is easier to reason about and to troubleshoot. Anthropic also talked about this a while ago in Building Effective Agents.

In fact, I often see people create tool-calling agents (e.g., in Copilot Studio) with instructions to call the tools in a specific order. For example: check if a mail is a complaint and if so, draft an e-mail response. Although this simple task will probably work with a non-deterministic agent, a workflow is better suited.

The question then becomes, how do you write the workflow? Simple workflows can easily be coded yourself. Instead of writing code, you might use a visual builder like Power Automate, Agent Flows in Copilot Studio, Logic Apps, n8n, make and many others.

But what if you need to write robust workflows that are more complex, need to scale, are long-running and need to pick up where they left off in case of failure? In that case, a durable execution engine might be a good fit. There are many such solutions on the market: restate, Temporal and many others, including several options in Azure.

Before we dive into how we can write such a workflow in Azure, let’s look at how one vendor, restate, defines durable execution:

Durable Execution is the practice of making code execution persistent, so that services recover automatically from crashes and restore the results of already completed operations and code blocks without re-executing them.
(from https://restate.dev/what-is-durable-execution/)

Ok, enough talk. Let’s see who we can write a workflow that uses durable execution. The example we use is kept straightforward to not get lost in the details:

  • I have a list of articles
  • Each article needs to be summarized. The summaries will be used in a later step to create a newsletter. We will use gpt-4.1-mini to create the summaries in parallel.
  • When the summaries are ready, we create a newsletter in HTML.
  • When the newsletter is ready, we will e-mail it to subscribers. There’s only one subscriber here, me! 🤷‍♂️

All code is here: https://github.com/gbaeke/dts_ai

Azure Durable Task Scheduler

There are several options for durable execution in Azure. One option is to use the newer Durable Task Scheduler in combination with the Durable Task SDKs. Another option is Durable Functions. These functions can also use the Durable Task Scheduler as the back-end.

The Durable Task Scheduler is the ❤️ heart of the solution as it keeps track of the different tasks in an orchestration (or workflow). On failure, it retains state, marking completed tasks and queuing pending ones to complete the orchestration. Note that the scheduler does not execute the tasks. That’s the job of a worker. Workers connect to the scheduler in order to complete the orchestration. The code in your worker does the actual processing like making LLM calls or talking to other systems.

The code you write (e.g., for the worker) uses the Durable Task SDK in your language of choice. I will use Python in this example.

For local development, you can use the Durable Task Scheduler emulator by running the following Docker container:

docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest

Your code will connect to the emulator on port 8080. Port 8082 presents an administrative UI to check and interact with orchestrations:

A view on a completed orchestration with summaries in parallel and newletter creation at the end

Later, you can deploy the Durable Task Scheduler in Azure and use that instead of the local emulator.

Let’s build locally

As discussed about, we will create a workflow that takes articles as input, generates summaries, creates a newsletter and sends it via e-mail. With three articles, the following would happen:

Processing three articles to create and email a newsletter

The final newsletter looks something like this:

HTML newsletter generated from one or more articles

Don’t worry, I won’t send actual AI slop newsletters to you! 😊

To get started, we need to start the Durable Task Scheduler locally using this command (requires Docker):

docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest

Next, we need a worker. The worker defines one or more activities in addition to one or more orchestrations that use these activities. Our worker has three activities:

  1. process_article: takes in an article and returns a summary generated by gpt-4.1-mini; no need for a fancy agent framework, we simply use the OpenAI SDK
  2. aggregate_results: takes in a list of summaries and generates the HTML newsletter using gpt-4.1-mini.
  3. send_email: e-mails the newsletter to myself with Resend

These activities are just Python functions. The process_article function is shown below:

def process_article(ctx, text: str) -> str:
    """
    Activity function summarizes the given text.    
    """
    logger.info(f"Summarizing text: {text}")
    wrapper = AgentWrapper()
    summary = wrapper.summarize(text)
    return summary

Easy does it I guess!

Next, the worker defines an orchestration function. The orchestration takes in the list of articles and contains code to run the activities as demanded by your workflow:

def fan_out_fan_in_orchestrator(ctx, articles: list) -> Any:

    # Fan out: Create a task for each article
    parallel_tasks = []
    for article in articles:
        parallel_tasks.append(ctx.call_activity("process_article", input=article))

    # Wait for all tasks to complete
    results = yield task.when_all(parallel_tasks)
    
    # Fan in: Aggregate all the results
    html_mail = yield ctx.call_activity("aggregate_results", input=results)

    # Send email
    email_request = EmailRequest(
        to_email=to_address,
        subject="Newsletter",
        html_content=html_mail,
    )
    email = yield ctx.call_activity("send_email", input=asdict(email_request))

    return "Newsletter sent"

When an orchestration runs an activity with call_activity, a task is returned. For each article, such a task is started with all tasks stored in the parallel_tasks list. The when_all helper yields the results of these tasks to the results lists until all tasks are finished. After that, we can pass results (a list of strings) to the aggregate_results activity and send a mail with the send_email activity.

⚠️ I use a dataclass to provide the send_email activity the required parameters. Check the full source code for more details.

The worker is responsible for connecting to the Durable Task Scheduler and registering activities and orchestrators. The snippet below illustrates this:

with DurableTaskSchedulerWorker(
    host_address=endpoint, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:
    
    # Register activities and orchestrators
    worker.add_activity(process_article)
    worker.add_activity(aggregate_results)
    worker.add_activity(send_email)
    worker.add_orchestrator(fan_out_fan_in_orchestrator)
    
    # Start the worker (without awaiting)
    worker.start()

When you use the emulator, the default address is http://localhost:8080 with credential set to None. In Azure, you will use the provided endpoint and RBAC to authenticate (e.g., managed identity of a container app).

Full code of the worker is here.

Now we just need a client that starts an orchestration with the input it requires. In this example we use a Python script that reads articles from a JSON file. The client then connects to the Durable Task Scheduler to kick off the orchestration. Here is the core snippet that does the job:

client = DurableTaskSchedulerClient(
    host_address=endpoint, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
)

instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=articles # simply a list of strings
)

result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=120
)

# check runtime_status of result, grab serialized_result, etc...

Above, the client waits for 120 seconds before it times out.

⚠️ There are many ways to follow up on the results of an orchestration. This script uses a simple approach with a timeout. When there is a timeout, the script stops but the orchestration continues.

Full code of the client is here.

Checking if it works

Before we can run a test, ensure you started the Durable Task Scheduler emulator. Next, clone this repository:

From the root of the cloned folder, create a Python virtual environment:

python3 -m venv .venv

Activate the environment:

source .venv/bin/activate

cd into the src folder and install from requirements:

pip install -r requirements.txt

Ensure you have a .env in the root:

OPENAI_API_KEY=Your OpenAI API key 
RESEND_API_KEY=Your resend API key
FROM_ADDRESS=Your from address (e.g. NoReply <noreply@domain.com>)
TO_ADDRESS=Your to address

Now run the worker with python worker.py. You should see the following:

INFO:__main__:Starting Fan Out/Fan In pattern worker...
Using taskhub: default
Using endpoint: http://localhost:8080
2025-08-14 22:11:10.851 durabletask-worker INFO: Starting gRPC worker that connects to http://localhost:8080
2025-08-14 22:11:10.882 durabletask-worker INFO: Created fresh connection to http://localhost:8080
2025-08-14 22:11:10.882 durabletask-worker INFO: Successfully connected to http://localhost:8080. Waiting for work items...

The worker is waiting for work items. We will submit them from our client.

From the same folder, in another terminal, run the client with python client.py. It will use the sample articles_short.json file.

INFO:__main__:Starting Fan Out/Fan In pattern client...
Using taskhub: default
Using endpoint: http://localhost:8080
INFO:__main__:Loaded 11 articles from articles.json
INFO:__main__:Starting new fan out/fan in orchestration with 11 articles
2025-08-14 22:25:02.946 durabletask-client INFO: Starting new 'fan_out_fan_in_orchestrator' instance with ID = '3a1bb4d9e3f240dda33daf982a5a3882'.
INFO:__main__:Started orchestration with ID = 3a1bb4d9e3f240dda33daf982a5a3882
INFO:__main__:Waiting for orchestration to complete...
2025-08-14 22:25:02.969 durabletask-client INFO: Waiting 120s for instance '3a1bb4d9e3f240dda33daf982a5a3882' to complete.

When the orchestration completes, the client outputs the result. That is simply Newsletter sent. 🤷‍♂️

You can see the orchestration in action by going to http://localhost:8082. Select the task hub default and click your orchestration at the top. In the screenshot below, the orchestration was still running and busy aggregating results.🤷‍♂️

Orchestration in progress (I used a longer list of articles in articles.json. Check the repo!

An interesting thing to try is to kill the worker while it’s busy processing. When you do that, the client will eventually timeout with a TimeoutError (if you wait long enough). If you check the portal, the orchestration will stay in a running state. However, when you start the worker again, it will restart where it left off:

INFO:__main__:From address: baeke.info <noreply@baeke.info>
INFO:__main__:To address: Geert Baeke <geert@baeke.info>
INFO:__main__:Starting Fan Out/Fan In pattern worker...
Using taskhub: default
Using endpoint: http://localhost:8080
2025-08-14 23:14:22.979 durabletask-worker INFO: Starting gRPC worker that connects to http://localhost:8080
2025-08-14 23:14:23.003 durabletask-worker INFO: Created fresh connection to http://localhost:8080
2025-08-14 23:14:23.004 durabletask-worker INFO: Successfully connected to http://localhost:8080. Waiting for work items...
INFO:__main__:Aggregating 4 summaries

I killed my worker when it was busy aggregating the summaries. When the worker got restarted, the aggregation started again and used the previously saved state to get to work again. Cool! 😎

Wrapping Up!

In this post we created a durable workflow with the Durable Task Scheduler emulator running on a local machine. We used the Durable Task SDK for Python to create a worker that is able to run an orchestration that aggregates multiple summaries into a newsletter. We demonstrated that such a workflow survives worker crashes and that it can pick up where it left off.

However, we have only scratched the surface here. Stay tuned for another post that uses Durable Task Scheduler in Azure together with workers and clients in Azure Container Apps.

End-to-end authorization with Entra ID and MCP

Building an MCP server is really easy. Almost every language and framework has MCP support these days, both from a client and server perspective. For example: FastMCP (Python, Typescript), csharp-sdk and many more!

However, in an enterprise scenario where users use a web-based conversational agent that uses tools on an MCP server, those tools might need to connect to back-end systems using the identity of the user. Take a look at the following example:

A couple of things are important here:

  • The user logs on to the app and requests an access token that is valid for the MCP Server; you need app registrations in Entra ID to make this work
  • The MCP Server verifies that this token is from Entra ID and contains the correct audience; we will use FastMCP in Python which has some built-in functionality for token validation
  • When the user asks the agent a question that requires Azure AI Search, the agent decides to make a tool call; the tool on the MCP server does the actual work
  • The tool needs access to the token (there’s a helper in FastMCP for that); next it converts the token to a token valid for Azure AI Search
  • The tool can now perform a search in Azure AI Search using new functionality discussed here

⚠️ MCP is actually not that important here. This technique which uses OAuth 2.0 and OBO flows in Entra ID is a well established pattern that’s been in use for ages!

🧑‍💻 Full source code here: https://github.com/gbaeke/mcp-obo

Let’s get started and get this working!

Entra ID App Registrations

In this case, we will create two registrations: one for the front-end and one for the back-end, the MCP Server. Note that the front-end here will be a command-line app that uses a device flow to authenticate. It uses a simple token cache to prevent having to log on time and again.

Front-end app registration

We will create this in the Azure Portal. I assume you have some knowledge of this so I will not provide detailed step-by-step instructions:

  • Go to App Registrations
  • Create a new registration, FastMCP Auth Web
  • In Authentication, ensure you enable Allow public client flows

You will need the client ID of this app registration in the MCP client we will build later.

Back-end app registration

This is for the MCP server and needs more settings:

  • Create a new registration, FastMCP Auth API
  • In Certificates and secrets, add a secret. We will need this to implement the on-behalf-of flow to obtain the Azure AI Search token
  • In Expose an API, set the Application ID URI to https://CLIENTIDOFAPP. I also added a scope, execute. In addition, add the front-end app client Id to the list of Authorized client applications:
Expose API of MCP app registration

In order to exchange a token for this service for Azure AI Search, we also need to add API permissions:

Permissions for Azure Cognitive, ehhm, AI Search

When you add the above permission, find it in APIs my organization uses:

MCP Client

We can now write an MCP client that calls the MCP server with an access token for the API we created above. As noted before, this will be a command-line app written in Python.

The code simply uses MSAL to initiate a device flow. It also uses a token cache to avoid repeated logins. The code can be found here.

Once we have a token, we can construct an MCP client (with FastMCP) as follows:

token = get_jwt_token()

headers["Authorization"] = f"Bearer {token}"

transport_url = "http://localhost:8000/mcp/"

transport = StreamableHttpTransport(
        url=transport_url,
        headers=headers
    )

client = MCPClient(transport=transport)

This code ensures that requests to the MCP server have an Authorization header that contains the bearer token acquired by get_jwt_token(). The MCP server will validate this token strictly.

The code to connect to the MCP server looks like this:

try:
    logger.info("Connecting to the MCP server...")
    
    # Use the client as an async context manager
    async with client:
        # List available tools on the server
        tools = await client.list_tools()
        logger.info(f"Found {len(tools)} tools on the server")
        
        # Call search tool
        logger.info("Calling search tool...")
        search_result = await client.call_tool("get_documents", {"query": "*"})
        logger.info(f"Search Result: {search_result.structured_content}")
        documents = search_result.structured_content.get("documents", [])
        if documents:
            logger.info("Documents found:")
            for doc in documents:
                name = doc.get("name", "Unnamed Document")
                logger.info(f"  - {name}")
        else:
            logger.info("No documents found.")
    
except Exception as e:
    logger.error(f"Error connecting to MCP server: {e}")

Note that there is no AI Agent involved here. We simply call the tool directly. In my case the document search only lists three documents out of five in total because I only have access to those three. We will look at the MCP server code to see how this is implemented next.

⚠️ Example code of the client is here: https://github.com/gbaeke/mcp-obo/blob/main/mcp_client.py

MCP Server

We will write an MCP server that uses the streamable-http transport on port 8000. The URL to connect to from the client then becomes http://localhost:8000/mcp. That was the URL used by the MCP client above.

The server has one tool: get_documents that takes a query (string) as parameter. By default, the query is set to * which returns all documents. The tool does the following:

  • Obtains the access token with the get_access_token() helper function from FastMCP
  • Exchanges the token for a token with scope https://search.azure.com/.default
  • Creates a SearchClient for Azure AI Search that includes the AI Search endpoint, index name and credential. Note that that credential has nothing to do with the token obtained above. It’s simply a key provided by Azure AI Search to perform searches. The token is used in the actual search request to filter the results.
  • Performs the search, passing in the token via the x_ms_query_source_authorization parameter. You need to use this version of the Azure AI Search Python library: azure-search-documents==11.6.0b12

Here is the code:

# Get the access token from the context
access_token: AccessToken = get_access_token()
original_token = access_token.token

# Exchange token for Microsoft Search token
logger.info("Exchanging token for Microsoft Search access")
search_result = await exchange_token(original_token, scope="https://search.azure.com/.default")
if not search_result["success"]:
    return {"error": "Could not retrieve documents due to token exchange failure."}
else:
    logger.info("Search token exchange successful")
    search_token = search_result["access_token"]
    search_client = SearchClient(endpoint="https://srch-geba.search.windows.net", index_name="document-permissions-push-idx", credential=AzureKeyCredential(os.getenv("AZURE_SEARCH_KEY")))
    results = search_client.search(search_text="*", x_ms_query_source_authorization=search_token, select="name,oid,group", order_by="id asc")
    documents = [
        {
        "name": result.get("name"),
        "oid": result.get("oid"),
        "group": result.get("group")
        }
        for result in results
    ]
    return {"documents": documents}

The most important work is done by the exchange_token() function. It obtains an access token for Azure AI Search that contains the oid (object id) of the user.

Here’s that function:

async def exchange_token(original_token: str, scope: str = "https://graph.microsoft.com/.default") -> dict:
    
    obo_url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
    
    data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "assertion": original_token,
        "scope": scope,
        "requested_token_use": "on_behalf_of"
    }
    
    try:
        response = requests.post(obo_url, data=data)
        
        if response.status_code == 200:
            token_data = response.json()
            return {
                "success": True,
                "access_token": token_data["access_token"],
                "expires_in": token_data.get("expires_in"),
                "token_type": token_data.get("token_type"),
                "scope_used": scope,
                "method": "OBO"
            }
        else:
            return {
                "success": False,
                "error": response.text,
                "status_code": response.status_code,
                "scope_attempted": scope,
                "method": "OBO"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "method": "OBO"
        }

Above, the core is in the call to the obo_url which presents the original token to obtain a new one. This will only work if the API permissions are correct on the FastMCP Auth API app registration. When the call is successful, we return a dictionary that contains the access token in the access_token key.

Full code of the server: https://github.com/gbaeke/mcp-obo/blob/main/mcp/main.py

You have now seen the entire flow from client login to calling a method (tool) on the MCP server to connecting to Azure AI Search downstream via an on-behalf-of flow.

But wait! How do we create the Azure AI Search index with support for the permission filter? Let’s take look…

Azure AI Search Configuration

When you want to use permission filtering with the x_ms_query_source_authorization parameter, do the following:

  • Create the index with support for permission filtering
  • Your index needs fields like oid (object Ids) and group (group object Ids) with the correct permission filter option
  • When you add documents to an index, for instance with the push APIs, you need to populate the oid and group fields with identifiers of users and groups that have access
  • Perform a search with the x_ms_query_source_authorization like show below:
results = search_client.search(
search_text="*",
x_ms_query_source_authorization=token_to_use,
select="name,oid,group",
order_by="id asc"
)

Above, token_to_use is the access token for Azure AI Search!

On GitHub, check this notebook from Microsoft to create and populate the index with your own user’s oid. You will need an Azure Subscription with an Azure AI Search instance. The free tier will do. If you use VS Code, use the Jupyter extension to run this notebook.

At the time of this writing, this feature was in preview. Ensure you use the correct version of the AI Search library for Python: azure-search-documents==11.6.0b12.

Wrapping up

I hope this post gives you some ideas on how to build agents that use MCP tools with end-to-end user authentication and authorization. This is just one possible approach. Authorization in the MCP specification has evolved significantly in early 2025 and works somewhat differently.

For most enterprise scenarios where you control both code and configuration (such as with Entra ID), bearer authentication with OBO is often sufficient.

Also consider whether you need MCP at all. If you aren’t sharing tools across multiple agents or projects, a simple API might be enough. For even less overhead, you can embed the tool code directly in your agent and run everything in the same process. Simple and effective.

If you spot any errors or have questions, feel free to reach out!

Deploying a multi-agent solution with MCP and A2A to Azure Container Apps

In previous posts, we discussed multi-agent scenarios, how A2A servers work (here and here) and how to deploy the infrastructure to host a multi-agent application on Azure with Azure Container Apps and AI Foundry.

In this post, we will take a look at deploying the different components of the solution as containers in Azure Container Apps. This is what we will build:

Multi-agent solution with MCP and A2A

There are four main components:

ComponentDescription
Conversation AgentPresents a chat interface to the user. Built with Chainlit and Semantic Kernel. Uses an OpenAI model. This could be switched to an Azure OpenAI model easily.

The agent uses two tools, rag and web, hosted by the MCP server.
MCP Tools ServerMCP server built with Python FastMCP. It exposes two tools, web and rag. The tools use an A2A client to interact with the A2A servers for the web and rag agents.

Not exposed to the Internet. Used to demonstrate MCP and A2A together. We could have called the A2A servers directly from the conversation agent without MCP.
A2A Server for Foundry Agent (does RAG)This agent uses an Azure AI Foundry Agent with a hosted file-based RAG tool to provide answers about Contoso products.

Not exposed to the Internet. Communicates privately with the Azure AI Foundry project.
A2A Server for OpenAI Agent (does web searches)This agent uses an OpenAI Agent SDK agent with the hosted web search tool.

Not exposed to the Internet. Communicates over the Internet with the OpenAI backend. This could easily be replaced with an Azure AI Foundry Agent that uses Bing Search. As this is an example about A2A, using a different technology makes more sense. 😊

Before delving into the four different components, it is important to know that the mcp, web and rag containers do not use their internal ingresses to communicate over TLS. That means that the mcp container for example, will talk to the web container using http://ca-web instead of something like https://ca-web.internal.ACA_environment_default_domain.

There is something to be said for using messaging to facilitate communication between agents. They are a form of microservices after all. In this example however, all communication is synchronous and uses HTTP.

This is a technical example that could be implemented in a single in-process agent with two tools. However, the emphasis is on multi-agent communication across process boundaries with Google’s Agent2Agent protocol.

Let’s gets started with the Conversation Agent!

Conversation Agent

The conversation agent maintains a conversation with the end user and keeps track of chat history. The agent, written in Semantic Kernel, has two tools:

  • web-search: uses the OpenAI Agent A2A server to search the web via OpenAI’s hosted web search tool
  • rag-search: uses the Azure AI Foundry A2A server to search for Contoso projects via a hosted RAG tool

The user interface to the agent is provided by Chainlit:

Chainlit UI

Above, I asked for information about a project. The agent is configured to use the rag-search tool to find project information. Under the hood, an A2A Server that wraps an Azure AI Foundry Agent is used to obtain this information. Via a filter, Chainlit supports visualizing when tools are called as can be seen at the top of the screen. It basically has hooks into the kernel object that gets created by Semantic Kernel.

The code for this Chainlit-hosted agent is on GitHub. The code in main.py uses an environment variable, MCP_SERVER_URL, that contains the address of the MCP server. As discussed above this will be http://containername/mcp (e.g., http://ca-mcp/mcp).

Following the typical Semantic Kernel approach, a kernel is created . Here is a snippet of code:

# Create the Semantic Kernel
        kernel = Kernel()
        
        # Add AI service to kernel
        ai_service = OpenAIChatCompletion(ai_model_id="gpt-4o")
        kernel.add_service(ai_service)
        logger.debug("Kernel and AI service initialized successfully")
        
        # Add MCP tools plugin to kernel (uses global client)
        tools_plugin = MCPToolsPlugin()
        kernel.add_plugin(tools_plugin, plugin_name="mcp_tools")
        logger.debug("MCP tools plugin added to kernel")

Note that we are not using Semantic Kernel’s built-in support for remote MCP servers that use streamable HTTP. Instead, we create a plugin via the MCPToolsPlugin class. That class defines two kernel functions, rag_search and web_search. In such a function, you can do what you want. I did not have to use MCP and could have called the A2A servers directly using the A2A client.

In our functions, we do use the MCP client from FastMCP to call the appropriate tool on the MCP server. The call to the A2A servers is implemented in the MCP server’s tools.

⚠️ This approach was chosen to illustrate that even if your framework does not natively support MCP, under the hood this is always LLM function calling. Kernel functions in Semantic Kernel are simply an abstraction on top of function calling. If you use Semantic Kernel’s native support for MCP, the tools on the MCP server would automatically be created as kernel functions. This native support requires much less code.

Now that we have the conversation agent up and running with Chainlit and Semantic Kernel, let’s look at the MCP server.

MCP Server

The conversation agent uses an MCP client (from the FastMCP library) to call tools hosted by the MCP server. This illustrates the separation of tool implementation from agent implementation.

The MCP server is implemented in main.py. In its most basic form, an MCP server with a few tools is really simple. This MCP server just defines two tools: a web tool and a rag tool.

The web tool looks like this:

@mcp.tool()
async def web_tool(query: str) -> str:
    """
    Perform a web search for the given query.
    
    Args:
        query: The search query to perform
        
    Returns:
        Search results as a string
    """
    logger.info(f"Web tool called with query: {query}")
    logger.info(f"Using web A2A agent at: {WEB_A2A_BASE_URL}")
    
    try:
        return await _send_a2a_message(query, WEB_A2A_BASE_URL)
    except Exception as e:
        logger.error(f"Error performing web search: {e}")
        return f"Error performing web search: {str(e)}"

This tool only does one thing: send a message to the A2A server on the address in WEB_A2A_BASE_URL. In Azure Container Apps, this URL is http://ca-web. The rag tool is implemented in a similar way. You can check the code of the _send_a2a_message function on GitHub.

⚠️ The addresses of the A2A servers are supplied to the mcp container app via environment variables WEB_A2A_BASE_URL and RAG_A2A_BASE_URL.

We now have the following implemented:

conversation --tool call--> MCP Server --run tool--> A2A Server

All traffic is synchronous and over http (not https)! Everything depends on the correct tool call being made by the conversation agent and the agents in the A2A servers. The rest is just plumbing! No magic! 😊

A2A Servers

You can check my earlier posts about A2A servers for background information:

It is important to note that the A2A server (rag) uses Azure AI Foundry. To authenticate to AI Foundry, we need to use a managed identity.

The rag container needs the following environment variables:

  • RAG_A2A_BASE_URL: required to set the correct url in the agent card
  • INTERNAL_PORT: port to run on (e.g., 80)
  • FOUNDRY_PROJECT: url to the Foundry project (e.g., https://FOUNDRY-RESOURCE.services.ai.azure.com/api/projects/FOUNDRY-PROJECT
  • ASSISTANT_ID: id of the agent you want to use; needs to exist in Foundry project
  • CLIENT_ID: the client id of the user assigned managed identity; this identity is created in the Bicep script; a role is assigned as well

During deployment of the container apps, a managed identity (that has the client id above) is assigned to the container. In the A2A server code that contains the code to talk to Foundry, this identity is used as follows:

if client_id:
            logger.info(f"Using ManagedIdentityCredential with client ID: {client_id}")
            credential = ManagedIdentityCredential(client_id=client_id)
        else:
            logger.info("Using DefaultAzureCredential")
            credential = DefaultAzureCredential()

This allows for the use of the Azure CLI identity when the rag agent is running on you local machine. Full code is in Agent_Executor.py.

⚠️ If you run the rag A2A server on your local machine, ensure you allow your IP address in the firewall settings of the Azure AI Foundry resource.

Full code for the A2A servers:

Deployment

To make it easy to deploy the containers to the Azure Container Apps environment (discussed in previous post), use the following script: https://github.com/gbaeke/multi_agent_aca/blob/main/deploy_containers.sh

At the top of the script, change the variables to match your environment:

ACR_NAME="SHORT_ACR_NAME"
ACR_URL="SHORT_ACR_NAME.azurecr.io"
RESOURCE_GROUP="RESOURCE_GROUP"
CONTAINER_APP_ENV="CONTAINER_APP_ENV_NAME"
MANAGED_IDENTITY="MANAGED_IDENTITY_NAME"

To deploy, simply run deploy_containers.sh --to-build conversation,mcp,web,rag. This does the following:

  • Builds and pushes the four containers using an ACR Task (no local Docker required)
  • Deploys the four containers with appropriate secrets and environment variables; serets are read from a .env file

Ensure that you have this .env in the same folder with the following values:

OPENAI_API_KEY="your_openai_api_key_here"
# Replace with your actual OpenAI API key

FOUNDRY_PROJECT="your_foundry_project_url"
# The URL of the Foundry project endpoint you're connecting to
# Find it in the properties of the AI Foundry project

ASSISTANT_ID="your_assistant_id_here"
# The unique ID of the agent you're referencing

This should deploy the four containers as shown below:

conversation, mcp, web and rag containers

Now grab the ingress URL (aka Application Url) of the conversation container:

Application URL (ingress URL) to the conversation app

Paste that URL in your browser. Hopefully the Chainlit UI is shown. If not, check the following:

  • Chainlit container has the MCP_SERVER_URL set to http://ca-mcp/mcp and also has you OpenAI key in OPENAI_API_KEY
  • MCP container has the WEB_A2A_BASE_URL and RAG_A2A_BASE_URL url set to http://ca-web and http://ca-rag
  • Web container has WEB_A2A_BASE_URL set to http://ca-web and also has an OPENAI_API_KEY
  • Rag container has RAG_A2A_BASE_URL set to http://ca-rag and has environment variables set to use the Azure AI Foundry agent; also check the managed identity of the container has access rights to AI Foundry

Normally these should all be set by both the Bicep and the container deployment script.

Wrapping Up

If you’ve made it this far and tried to implement this yourself, you’ve likely realized how much effort it takes to get everything up and running. About 99% of the work is infrastructure and plumbing; only 1% is actual agent code. In more complex agentic applications, the ratio may shift slightly, but infrastructure will still dominate the effort.

We have not even touched on things like logging, metrics, tracing the end-to-end communication path, load balancing, saving agent state and much, much more.

This brings me back to a key point from an earlier post:


If you can build your multi-agent solution in-process, or use an agent PaaS like Azure AI Foundry, do it.


Only choose the approach I described above when no other viable option exists or when you’re building larger solutions where multiple teams develop agents that must coexist within the same system.

Deploying AI Foundry Agents and Azure Container Apps to support an Agent2Agent solution

In previous posts, I discussed multi-agent solutions and the potential use of Google’s Agent2Agent protocol (A2A). In this post, we will deploy the infrastructure for an end-to-end solution like follows:

Multi-agent solution in Azure

Here’s a short description of the components.

ComponentDescription
Foundry ProjectBasic Foundry project with a private endpoint. The private endpoint ensures private communication between the RAG Agent container and the Azure Foundry agent.
Virtual NetworkProvides subnet to integrate Azure Container Apps Environment in a private network. This allows container apps to connect to Azure AI Foundry privately.
Container Apps EnvironmentIntegrated in our private network. Hosts Container Apps.
Container AppsContainer apps for conversation agent, MCP server, RAG agent and web agent. Only the conversation agent is publicly available.
Main components of the deployment

In what follows, we will first provide more information about Azure AI Foundry and then proceed to deploy all components except the Azure Container Apps themselves. We will deploy the actual app components in a follow-up post.

Azure AI Foundry Project

Azure AI Foundry is Microsoft’s enterprise platform for building, deploying, and managing AI applications—especially those using large language models (LLMs) and generative AI. It brings together everything you need: production-ready infrastructure, access to powerful models from providers like OpenAI, Mistral, and Meta, and tools for customization, monitoring, and scaling—all in one unified environment.

It’s designed to support the full AI development lifecycle:

  • Explore and test models and services
  • Build and customize applications or agents
  • Deploy to production
  • Monitor, evaluate, and improve performance

You can work either through the Azure AI Foundry portal or directly via SDKs in your preferred development environment.

You will do your work in a project. When you create a project in Azure AI Foundry, you’ll choose between two types:

Foundry Project

This type is recommended for most cases and is what we will use to define our RAG agent. Agents in projects are generally available (GA). You deploy models like gpt-4o directly to the project. There is no need to create a connection to an Azure OpenAI resource. It can be configured with a private endpoint to ensure private communication.

This matches exactly with our needs. Note that we will deploy a basic Foundry environment with a private endpoint and not a standard environment. For more information about basic versus standard, check the Foundry documentation.

Later, when we create the resources via Bicep, two resources will be created:

  • The Azure AI Foundry resource: with private endpoint
  • The Azure AI Foundry Project: used to create our RAG agent

Hub-based Project

This type has some additional options like Prompt Flow. However, agents in hub-based projects are not generally available at the time of writing. A hub-based project is not the best match for our needs here.

⚠️ In general, always use an Foundry Project versus a Hub-based Project unless you need a specific feature that, at the time of creation, is not yet available in Foundry projects.

As explained above, a Foundry project is part of an AI Foundry resource. Here is the resource in the portal (hub-based projects are under AI Hubs):

AI Foundry resource

Inside the resource, you can create a project. The above resource has one project:

Projects in the Foundry resource: your Foundry Project

To work with your project, you can click Go to Azure AI Foundry portal in the Overview tab:

In the Foundry Portal, you can proceed to create agents. However, if you have enabled a private endpoint, ensure you can access your Azure virtual network via a jump host or VPN. If that is not possible, allow your IP to access the Foundry resource in the Networking section of the resource. When you do not have access, you will see the following error:

No access to manage agents in the project

⚠️ Even after giving access, it will take a while for the change to propagate.

If you have access, you will see the following screen to add and configure agents:

Creating and debugging agents in your AI Foundry Project

Deployment with Bicep

You can check https://github.com/gbaeke/multi_agent_aca/tree/main/bicep to find Bicep files together with a shell script to deploy the resources. Also check the README for more information.

In Bicep, you first create an account (type is Microsoft.CognitiveServices/accounts). This matches the fndry-a2a resource in one of the screenshots above. In a later step, you add the project. The snippet below shows how the account gets created:

resource account 'Microsoft.CognitiveServices/accounts@2025-04-01-preview' = {
  name: aiFoundryName
  location: location
  identity: {
    type: 'SystemAssigned'
  }
  kind: 'AIServices'
  sku: {
    name: 'S0'
  }
  properties: {
    // Networking
    publicNetworkAccess: 'Enabled'
    
    networkAcls: {
      bypass: 'AzureServices'
      defaultAction: 'Deny'
      ipRules: [
        {
          value: 'IP address'
        }
      ]
    }

    // Specifies whether this resource support project management as child resources, used as containers for access management, data isolation, and cost in AI Foundry.
    allowProjectManagement: true

    // Defines developer API endpoint subdomain
    customSubDomainName: aiFoundryName

    // Auth
    disableLocalAuth: false
  }
}

It’s at this level you block public network access. The private endpoint and related network resources are created in other sections of the Bicep file.

Once you have this account, you can create the project. This matches with the fndry-a2a-proj project in one of the screenshots above. Here is the Bicep snippet:

resource project 'Microsoft.CognitiveServices/accounts/projects@2025-04-01-preview' = {
  name: defaultProjectName
  parent: account
  location: location
  identity: {
    type: 'SystemAssigned'
  }
  properties: {}
}

Later, we will create agents in this project. However, an agent needs a supported model. In this case, we will use gpt-4o-mini so we need to deploy it:

resource modelDeployment 'Microsoft.CognitiveServices/accounts/deployments@2024-10-01'= {
  parent: account
  name: 'gpt-4o-mini'
  sku : {
    capacity: 1
    name: 'GlobalStandard'
  }
  properties: {
    model:{
      name: 'gpt-4o-mini'
      format: 'OpenAI'
      version: '2024-07-18'
    }
  }
}

⚠️ Above, a capacity of 1 only allows for 1000 tokens per minute. You will probably want to increase that. If not, you run into issues when you test your agents because you will quickly hit the limit.

In the Foundry Portal, the model is shown as follows:

gpt-4o-mini deployment (next to manually deployed gpt-4o)

I will not go into the rest of the Bicep code. Most of it is network related (network, subnets, private endpoint, private DNS, DNS network links, etc..).

Creating the RAG Agent

Although we can create the agent using the Foundry SDK, we will create and test it via the Foundry Portal. As a first step, create or modify an agent. You might get a question first about the model you want to use with your agents.

In your agent, do the following:

  • give the agent a name
  • select a model from the list of deployed models
  • set instructions

I used the following instructions:

You retrieve information about Contoso projects using your knowledge tools. Always use your knowledge tools to answer the user. If you cannot find the answer via tools, respond you do not know.

Name, model and instructions

Next, scroll down and click + Add next to Knowledge. You will see the following screen:

List of agent knowledge tool

Select the Files tool and upload the files from https://github.com/gbaeke/multi_agent_aca/tree/main/project_descriptions. Use git clone https://github.com/gbaeke/multi_agent_aca.git to grab those files.

After selecting the local files, click Upload and Save to upload these files so the agent can search them. Behind the scenes, the files are chunked, chunks are vectorized and stored in a vector database. However, this is all hidden from you. Your agent configuration should now show the knowledge tool:

Knowledge tool added to agent

You should now test your agent. At the top of the configuration section, there is a Try in Playground link.

When I ask about EduForge, I get the following:

Asking about EduForge with a reponse from the files tool (+ annotation)

When you click View Run Info (at the end of the response), the use of the tool should be shown in the trace:

Tracing shows the tool calls and the file_search tool

If this works, you have a simple agent in Foundry that has access to a file_search tool to perform RAG (retrieval-augmented generation).

Wrapping up

We have now deployed the RAG agent with Azure AI Foundry. We created a Foundry resource in Azure with a private endpoint. The Foundry resource has one project within it. The project contains our RAG agent.

But remember, we want to wrap this agent with Google’s Agent2Agent. To achieve that, we will deploy the A2A server that uses the Foundry agent as a container in the Container Apps Environment.

We will take a look at how that works in a next post. In that post, we will use these agents as tools via MCP and provide the MCP tools to our conversation agent. The conversation agent will use Semantic Kernel.

Stay tuned! 😊

Using tasks with streaming in Google Agent2Agent (A2A)

In a previous post we created a simple A2A agent that uses synchronous message exchange. An A2A client sends a message and the A2A server, via the Agent Executor, responds with a message.

But what if you have a longer running task to perform and you want to inform the client that the task in ongoing? In that case, you can enable streaming on the A2A server and use a task that streams updates and the final result to the client.

The sequence diagram illustrates the flow of messages. It is based on the streaming example in the A2A specification.

A2A tasks with streaming updates

In this case, the A2A client needs to perform a streaming request which is sent to the /message/stream endpoint of the A2A server. The code in the AgentExecutor will need to create a task and provide updates to the client at regular intervals.

⚠️ If you want to skip directly to the code, check out the example on GitHub.

Let’s get into the details in the following order:

  • Writing an agent that provides updates while it is doing work: I will use the OpenAI Agents SDK with its support for agent hooks
  • Writing an AgentExecutor that accepts a message, creates a task and provides updates to the client
  • Updating the A2A Server to support streaming
  • Updating the A2A Client to support streaming

AI Agent that provides updates

Although streaming updates is an integral part of A2A, the agent that does the actual work needs to provide feedback about its progress. That work is up to you, the developer.

In my example, I use an agent created with the OpenAI Agents SDK. This SDK supports AgentHooks that execute at certain events:

  • Agent started/finished
  • Tool call started/finished

The agent class in agent.py on GitHub uses an asyncio queue to emit both the hook events and the agent’s reponse to the caller. The A2A AgentExecutor uses the invoke_stream() method which returns an AsyncGenerator.

You can run python agent.py independently. This should result in the following:

The agent has a tool that returns the current date. The hooks emit the events as shown above followed by the final result.

We can now use this agent from the AgentExecutor and stream the events and final result from the agent to the A2A Client.

AgentExecutor Tasks and Streaming

Instead of simply returning a message to the A2A client, we now need to initiate a long-running task that sends intermediate updates to the client.. Under the hood this uses SSE (Server Sent Events) between the A2A Client and A2A Server.

The file agent_executor.py on GitHub contains the code that makes this happen. Let’s step through it:

message_text = context.get_user_input()  # helper method to extract the user input from the context
        logger.info(f"Message text: {message_text}")

        task = context.current_task
        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)

Above, we extract the user’s input from the incoming message and we check if the context already contains a task. If not, we create the task and we queue it. This informs the client a task was created and that sse can be used to obtain intermediate results.

Now that we have a task (a new or existing one), the following code is used:

updater = TaskUpdater(event_queue, task.id, task.contextId)
async for event in self.agent.invoke_stream(message_text):
    if event.event_type == StreamEventType.RESPONSE:
        # send the result as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=event.data['response']))],
            name='calculator_result',
        )

        await updater.complete()
            
    else:
        await updater.update_status(
        TaskState.working,
        new_agent_text_message(
            event.data.get('message', ''),
            task.contextId,
            task.id,
        ),
    )

We first create a TaskUpdater instance that has the event queue, current task Id and contextId. The task updater is used to provide status updates, complete or even cancel a task.

We then call invoke_stream(query) on our agent and grab the events it emits. If we get a event type of RESPONSE, we create an artifact with the agent response as text and mask the task as complete. In all other cases, we send a status event with updater.update_status(). A status update contains a task state (working in this case) and a message about the state. The message we send is part of the event that is emitted from invoke_stream() and includes things like agent started, tool started, etc…

So in short, to send streaming updates:

  • Ensure your agents emit events of some sort
  • Use those events in the AgentExecutor and create a task that sends intermediate updates until the agent has finished

However, our work is not finished. The A2A Server needs to be updated to support streaming.

A2A Server streaming support

The A2A server code in is main.py on GitHub. To support streaming, we need to update the capabilities of the server:

capabilities = AgentCapabilities(streaming=True, pushNotifications=True)

⚠️ pushNotifications=True is not required for streaming. I include it here to show that sending a push notification to a web hook is also an option.

That’s it! The A2A Server now supports streaming. Easy! 😊

Streaming with the A2A Client

Instead of sending a message to the non-streaming endpoint, the client should now use the streaming endpoint. Here is the code to do that (check test_client.py for the full code):

message_payload = Message(
            role=Role.user,
            messageId=str(uuid.uuid4()),
            parts=[Part(root=TextPart(text=question))],
        )
        streaming_request = SendStreamingMessageRequest(
            id=str(uuid.uuid4()),
            params=MessageSendParams(
                message=message_payload,
            ),
        )
        print("Sending message")

        stream_response = client.send_message_streaming(streaming_request)

To send to the streaming endpoint, the SendStreamingMessageRequest() function is your friend, together with client.send_message_streaming()

We can now grab the responses as they come in:

async for chunk in stream_response:
            # Only print status updates and text responses
            chunk_dict = chunk.model_dump(mode='json', exclude_none=True)
            
            if 'result' in chunk_dict:
                result = chunk_dict['result']
                
                # Handle status updates
                if result.get('kind') == 'status-update':
                    status = result.get('status', {})
                    state = status.get('state', 'unknown')
                    
                    if 'message' in status:
                        message = status['message']
                        if 'parts' in message:
                            for part in message['parts']:
                                if part.get('kind') == 'text':
                                    print(f"[{state.upper()}] {part.get('text', '')}")
                    else:
                        print(f"[{state.upper()}]")
                
                # Handle artifact updates (contain actual responses)
                elif result.get('kind') == 'artifact-update':
                    artifact = result.get('artifact', {})
                    if 'parts' in artifact:
                        for part in artifact['parts']:
                            if part.get('kind') == 'text':
                                print(f"[RESPONSE] {part.get('text', '')}")
                
                # Handle initial task submission
                elif result.get('kind') == 'task':
                    print(f"[TASK SUBMITTED] ID: {result.get('id', 'unknown')}")
                    
                # Handle final completion
                elif result.get('final') is True:
                    print("[TASK COMPLETED]")

This code checks the the type of content coming in:

  • status-update: when AgentExecutor sends a status update
  • artifact-update: when AgentExecutor sends an artifact with the agent’s response
  • task: when tasks are submitted and completed

Running the client and asking what today’s date is, results in the following response:

Streaming is working as intended! But what if you use a client that does not support streaming? That actually works and results in a full response with the agent’s answer in the result field. You would also get a history field that contains the initial user question, including all the task updates.

Here’s a snippet of that result:

{
  "id": "...",
  "jsonrpc": "2.0",
  "result": {
    "artifacts": [
      {
        "artifactId": "...",
        "name": "calculator_result",
        "parts": [
          {
            "kind": "text",
            "text": "Today's date is July 13, 2025."
          }
        ]
      }
    ],
    "contextId": "...",
    "history": [
      {
        "role": "user",
        "parts": [
          {
            "kind": "text",
            "text": "What is today's date?"
          }
        ]
      },
      {
        "role": "agent",
        "parts": [
          {
            "kind": "text",
            "text": "Agent 'CalculatorAgent' is starting..."
          }
        ]
      }
    ],
    "id": "...",
    "kind": "task",
    "status": {
      "state": "completed"
    }
  }
}

Wrapping up

You have now seen how to run longer running tasks and provide updates along the way via streaming. As long as your agent code provides status updates, the AgentExecutor can create a task and provide task updates and the task result to the A2A Server which uses SSE to send them to the A2A Client.

In an upcoming post, we will take a look at running a multi-agent solution in the Azure cloud.

Google’s A2A: taking a closer look

In the previous post, I talked about options to build multi-agent solutions. The last option used Google’s A2A. A2A provides a wrapper around your agent, basically a JSON-RPC API, that standardizes how you talk to your agent. In this post we take a closer look at the basics of A2A with simple synchronous message exchange.

⚠️ A2A is still in development. We do not use it in production yet!

The idea is to build solutions that look like this (just one of the many possibilities):

The conversation agent is an agent that uses tools to get the job done. It wouldn’t be much of an agent without tools right? The tools are custom tools created by the developer that call other agents to do work. The other agents can be written in any framework and use any development language. How the agent works internally is irrelevant. When the conversation agent detects (via standard function calling) that the RAG tool needs to be executed, that tool will call the RAG agent over A2A and return the results.

A2A does not dictate how you build your agent. In the example below, an Azure AI Foundry Agent sits at the core. That agent can use any of its hosted tools or custom tools to get the job done. Because this is a RAG Agent, it might use the built-in Azure AI Search or SharePoint knowledge source. As a developer, you use the Azure AI Foundry SDK or Semantic Kernel to interact with your agent as you see fit. Although you do not have to, it is common to wrap your agent in a class and provide one or more methods to interact with it. For example, an invoke() method and an invoke_streaming() method.

Here is a minimal example for the AI Foundry Agent (the yellow box):

class RAGAgent:
    def __init__(self):
        # INITIALIZATION CODE NOT SHOWN
        self.project = AIProjectClient(
            credential=DefaultAzureCredential(),
            endpoint=endpoint)
        self.agent = self.project.agents.get_agent(agent_id)

    async def invoke(self, question: str) -> str:
        thread = self.project.agents.threads.create()

        message = self.project.agents.messages.create(
            thread_id=thread.id,
            role="user",
            content=question
        )
        run = self.project.agents.runs.create_and_process(
            thread_id=thread.id,
            agent_id=self.agent.id)
        messages = list(self.project.agents.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING))

        # ...

This code has nothing to do with Google A2A and could be implemented in many other ways. This is about to change because we will now call the above agent from A2A’s AgentExecutor. The AgentExecutor is a key server‑side interface: when a client sends a message, the A2A server calls execute() on your AgentExecutor instance, and your implementation handles the logic and sends updates via an event queue. Here’s how your agent is used by A2A. When a client sends a message it works its way down to your agent via several A2A components:

It’s important to understand the different types of message exchange in A2A. This post will not look at all of them. You can find more information in the A2A documentation. This post uses synchronous messaging via message/send where the response is a simple message and not a, potentially longer running, task.

Let’s dive into the AgentExecutor (it processes the message we send) and work our way up to the A2A client.

AgentExecutor

Let’s take a look at a bare bones implementation of AgentExecutor that works with plain/text input and output messages and without streaming:

Client --message--> A2A Server --> Agent Executor --> Agent

and

Agent --> Agent Executor --> A2A Server --message--> Client
class RAGAgentExecutor(AgentExecutor):

    def __init__(self):
        self.agent = RAGAgent()

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        message_text = context.get_user_input()
        
        result = await self.agent.invoke(message_text)

        await event_queue.enqueue_event(new_agent_text_message(result))
        
    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        raise Exception("Cancel not supported")

When a message is sent to the A2A server via JSON-RPC, the execute() method of the RAGAgentExecutor is called. At server startup, __init__ creates our AI Foundry RAGAgent which does the actual work.

Inside the execute() method, we assume the context contains a message. We use the get_user_input() helper to extract the message text (user query). We then simply call our agent’s invoke() method with that query and return the result via the event_queue. The A2A server uses an event_queue to provide responses back to the caller. In this case, the response will be a simple plain/text message.

This is probably as simple as it gets and is useful to understand A2A’s basic operation. In many cases though, you might want to return a longer running task instead of a message and provide updates to the client via streaming. That would require creating the task and streaming the task updates to the client. The client would need to be modified to handle this.

But wait, we still need to create the server that uses this AgentExecutor. Let’s take a look!

A2A Server

The A2A Python SDK uses starlette and uvicorn to create the JSON-RPC server. You don’t really need to know anything about this because A2A does this under the covers for you. The server needs to do a couple of things:

  • Create one or more skills: skills represent a specific capability or function your agent offers—for instance, “currency conversion,” “document summary” or “meeting scheduling”.
  • Create an agent card: an agent card is like a business card for your agent; it tells others what the agent can do; the above skills are part of the agent card; the agent card is published at /.well-known/agent.json on the agents domain (e.g., localhost:9999 on your local machine)
  • Create a request handler: the request handler ties the server to the AgentExecutor you created earlier
  • Create the A2AStarletteApplication: it ties the agent card and the request handler together
  • Serve the A2AStarletteApplication with uvicorn on an address and port of your choosing

This is what it looks like in code:

import logging
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from agent_executor import RagAgentExecutor

def main():
    skill = AgentSkill(
        id="rag_skill",
        name="RAG Skill",
        description="Search knowledge base for project information",
        tags=["rag", "agent", "information"],
        examples=["What is project Astro and what tech is used in it?"],
    )
    agent_card = AgentCard(
        name="RAG Agent",
        description="A simple agent that searches the knowledge base for information",
        url="http://localhost:9998/",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[skill],
        version="1.0.0",
        capabilities=AgentCapabilities(),
    )
    request_handler = DefaultRequestHandler(
        agent_executor=RagAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )
    server = A2AStarletteApplication(
        http_handler=request_handler,
        agent_card=agent_card,
    )
    uvicorn.run(server.build(), host="0.0.0.0", port=9998)
if __name__ == "__main__":
    main()

Validating the agent card

When you run the A2A server on your local machine and expose it to the public with ngrok or other tools, you can use https://a2aprotocol.ai/a2a-protocol-validator to validate it. When I do this for the RAG Agent, I get the following:

In JSON, the agent card is as follows:

{
  "capabilities": {},
  "defaultInputModes": [
    "text"
  ],
  "defaultOutputModes": [
    "text"
  ],
  "description": "A simple agent that searches the knowledge base for information",
  "name": "RAG Agent",
  "protocolVersion": "0.2.5",
  "skills": [
    {
      "description": "Search knowledge base for project information",
      "examples": [
        "What is project Astro and what tech is used in it?"
      ],
      "id": "rag_agent",
      "name": "RAG Agent",
      "tags": [
        "rag",
        "agent",
        "information"
      ]
    }
  ],
  "url": "http://Geerts-MacBook-Air-2.local:9998/",
  "version": "1.0.0"
}

Now it is time to actually start talking to the agent.

Using the A2A client to talk to the agent

With the server up and running and the Agent Card verified, how do we exchange messages with the server?

In our case, where the server supports only text and there is no streaming, the client can be quite simple:

  • Create an httpx client and set timeout higher depending on how long it takes to get a response; this client is used by the A2ACardResolver and A2AClient
  • Retrieve the agent card with the A2ACardResolver
  • Create a client with A2AClient. It needs the agent card as input and will use the url in the agent card to connect to the A2A server
  • Create a Message, include it in a MessageRequest and send the MessageRequest with the client. We use the non-streaming message_send() method.
  • Handle the response from the client

The code below shows what this might look like:

import uuid

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    AgentCard,
    Message,
    MessageSendParams,
    Part,
    Role,
    SendMessageRequest,
    TextPart,
)

PUBLIC_AGENT_CARD_PATH = "/.well-known/agent.json"
BASE_URL = "http://localhost:9998"


async def main() -> None:
    timeout = httpx.Timeout(200.0, read=200.0, write=30.0, connect=10.0)
    async with httpx.AsyncClient(timeout=timeout) as httpx_client:
        # Initialize A2ACardResolver
        resolver = A2ACardResolver(
            httpx_client=httpx_client,
            base_url=BASE_URL,
        )

        final_agent_card_to_use: AgentCard | None = None

        try:
            print(
                f"Fetching public agent card from: {BASE_URL}{PUBLIC_AGENT_CARD_PATH}"
            )
            _public_card = await resolver.get_agent_card()
            print("Fetched public agent card")
            print(_public_card.model_dump_json(indent=2))

            final_agent_card_to_use = _public_card

        except Exception as e:
            print(f"Error fetching public agent card: {e}")
            raise RuntimeError("Failed to fetch public agent card")

        client = A2AClient(
            httpx_client=httpx_client, agent_card=final_agent_card_to_use
        )
        print("A2AClient initialized")

        message_payload = Message(
            role=Role.user,
            messageId=str(uuid.uuid4()),
            parts=[Part(root=TextPart(text="Is there a project with the word Astro? If so, describe it."))],
        )
        request = SendMessageRequest(
            id=str(uuid.uuid4()),
            params=MessageSendParams(
                message=message_payload,
            ),
        )
        print("Sending message")

        response = await client.send_message(request)
        print("Response:")
        print(response.model_dump_json(indent=2))


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Above, the entire response is printed as JSON. That is useful to learn what the responses look like. This is part of the response:

{
  "id": "6cc795d8-fa84-4734-8b5a-dccd3a22142d",
  "jsonrpc": "2.0",
  "result": {
    "contextId": null,
    "extensions": null,
    "kind": "message",
    "messageId": "fead200d-0ea4-4ccb-bf1c-ed507b38d79d",
    "metadata": null,
    "parts": [
      {
        "kind": "text",
        "metadata": null,
        "text": "RESPONSE FROM RAG AGENT"
      }
    ],
    "referenceTaskIds": null,
    "role": "agent",
    "taskId": null
  }
}

Simply sending the response as a string on the event queue results in a message with one text part. The result from the RAG agent is in the text property. For a longer running task with streaming updates, the response would be quite different.

You can now easily interact with your agent using this client. For example:

  • use the client in any application (need not be an agent)
  • use the client in a workflow engine like LangGraph
  • use the client in an agent tool; the agent can be written in any framework; when the agent identifies a tool call is needed, the tool is run which contains A2AClient code to interact with the A2A Agent

The entire flow

The diagram below shows the end-to-end flow:

Try it yourself

On GitHub, check https://github.com/gbaeke/multi_agent_aca/tree/main/a2a_simple for a skeleton implementation of a calculator agent. The CalculatorAgent class’s invoke() method always returns “I did not do anything!” It’s up to you to change that!

You can run this A2A server as-is and connect to it with test_client.py. To use an actual agent, update the CalculatorAgent class’s invoke() method with a real agent written in your preferred framework.

Check the README.md for more instructions.

That’s it for this post! In a next one, we will look at a more complex example that streams messages to the client. Stay tuned!

Building multi-agent solutions: what are your options?

When we meet with customers, the topic of a “multi-agent solution” often comes up. This isn’t surprising. There’s a lot of excitement around their potential to transform business processes, strengthen customer relationships, and more.

The first question you have to ask yourself though is this: “Do I really need a multi-agent solution?”. Often, we find that a single agent with a range of tools or a workflow is sufficient. If that’s the case, always go for that option!

On the other hand, if you do need a multi-agent solution, there are several things to think about. Suppose you want to build something like this:

Generic multi-agent setup

Users interact with a main agent that maintains the conversation with the user. When the user asks about a project, a RAG agent retrieves project information. If the user also asks to research or explain the technologies used in the project, the web agent is used to retrieve information from the Internet.

⚠️ If I were to follow my own advice, this would be a single agent with tools. There is no need for multiple agents here. However, let’s use this as an example because it’s easy to reason about.

What are some of your options to build this? The list below is not exhaustive but contains common patterns:

  • Choose a framework (or use the lower-level SDKs) and run everything in the same process
  • Choose an Agent PaaS like Azure AI Foundry Agents: the agents can be defined in the platform; they run independently and can be linked together using the connected agents feature
  • Create the agents in your framework of choice, run them as independent processes and establish a method of communication between these agents; in this post, we will use Google’s A2A (Agent-to-Agent) as an example. Other options are ACP (Agent Communication Protocol, IBM) or “roll your own”

Let’s look at these three in a bit more detail.

In-Process Agents

Running multiple agents in the same process and have them work together is relatively easy. Let’s look at how to do this with OpenAI Agents SDK. Other frameworks use similar approaches.

Multi-agent in-process using the OpenAI Agents SDK

Above, all agents are written using the OpenAI Agents SDK. In code, you first define the RAG and Web Agent as agents with their own tools. In the OpenAI Agents SDK, both the RAG tool and the web search tool are hosted tools provided by OpenAI. See https://openai.github.io/openai-agents-python/tools/ for more information about the FileSearchTool and the WebSearchTool.

Next, the Conversation Agent gets created using the same approach. This time however, two tools are addedd: the RAG Agent Tool and the Web Agent Tool. These tools get called by the Conversation Agent based on their description. This simply is tool calling in action where each tool calls another agent and returns the agent result. The way these agents interact with each other is hidden from you. The SDK simply takes care of it for you.

You can find an example of this in my agent_config GitHub repo. The sample code below shows how this works:

rag_agent = create_agent_from_config("rag")
web_agent = create_agent_from_config("web")

agent_as_tools = {
    "rag": {
        "agent": rag_agent,
        "name": "rag",
        "description": "Provides information about projects"
    },
    "web": {
        "agent": web_agent,
        "name": "web",
        "description": "Gets information about technologies"
    }
}

conversation_agent = create_agent_from_config("conversation", agent_as_tools)

result = await Runner.run(conversation_agent, user_question)

Note that I am using a helper function here that creates an agent from a configuration file that contains the agent instructions, model and tools. Check my previous post for more information. The repo used in this post uses slightly different agents but the concept is the same.

Creating a multi-agent solution in a single process, using a framework that supports calling other agents as tools, is relatively straightforward. However, what if you want to use the RAG Agent in other agents or workflows? In other words, you want reusability! Let’s see how to do this with the other approaches.

Using a Agent PaaS: Azure AI Foundry Agents

Azure AI Foundry Agents is a PaaS solution to create and run agents with enterprise-level features such as isolated networking. After creating an Azure AI Foundry resource and project, you can define agents in the portal:

Agents defined in Azure AI Foundry

⚠️ You can also create these agents from code (e.g., Foundry SDK or Semantic Kernel) which gives you extra flexibility in agent design.

The web and rag agents have their own tools, including hosted tools provided by Foundry, and can run on their own. This is already an improvement compared to the previous approach: agents can be reused from other agents, workflows or any other application.

Azure AI Foundry allows you to connect agents to each other. This uses the same approach as in the OpenAI Agents SDK: agents as tools. Below, the Conversation Agent is connected to the other two agents:

Connected Agents for the Conversation Agent

The configuration of a connected agent is shown below and has a name and description:

It all fits together like in the diagram below:

Multi-agent with Azure AI Foundry

As discussed above, each agent is a standalone entity. You can interact with these agents using the AI Foundry Agents protocol, which is an evolution of the OpenAI Assistant’s protocol. You can read more about it here. In short, to talk to an agent you do the following:

  • Create the agent in code or reference an existing agent (e.g., our conversation agent)
  • Create a thread
  • Put a message on the thread (e.g., the user’s question or a question from another agent via the connected agents principle)
  • Run the thread on the agent and grab the response

Below is an example in Python:

from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.agents.models import ListSortOrder

project = AIProjectClient(
    credential=DefaultAzureCredential(),
    endpoint="https://YOUR_FOUNDRY_ENDPOINT")

agent = project.agents.get_agent("YOUR_ASSISTANT_ID")

thread = project.agents.threads.create()
print(f"Created thread, ID: {thread.id}")

message = project.agents.messages.create(
    thread_id=thread.id,
    role="user",
    content="What tech is used in some of Contoso's projects?"
)

run = project.agents.runs.create_and_process(
    thread_id=thread.id,
    agent_id=agent.id)

if run.status == "failed":
    print(f"Run failed: {run.last_error}")
else:
    messages = project.agents.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)

    for message in messages:
        if message.text_messages:
            print(f"{message.role}: {message.text_messages[-1].text.value}")

The connected agents feature uses the same protocol under the hood. Like in the OpenAI Agents SDK, this is hidden from you.

When you mainly use Azure AI Foundry agents, there is no direct need for agent-to-agent protocols like A2A or ACP. In fact, even when you have an agent that is not created in Azure AI Foundry, you can simply create a tool in that agent. The tool can then use the thread/message/run approach to get a response from the agent hosted in Foundry. This can all run isolated in your own network if you wish.

You could argue that the protocol used by Azure AI Foundry is not an industry standard. You cannot simply use this protocol in combination with other frameworks. Unless you use something like https://pypi.org/project/llamphouse/, a project written by colleagues of mine which is protocol compatible with the OpenAI Assistants API.

Let’s take a look at the third approach which uses a protocol that aspires to be a standard and can be used together with any agent framework: Google’s A2A.

Using Google’s A2A in a multi-agent solution

The basic idea of Google’s A2A is the creation of a standard protocol for agent-to-agent communication. Without going into the details of A2A, that’s for another post, the solution looks like this:

A multi-agent solution with A2A

A2A allows you to wrap any agent, written in any framework, in a standard JSON-RPC API. With an A2A client, you can send messages to the API which uses an Agent Executor around your actual agent. Your agent provides the response and a message is sent back to the client.

Above, there are two A2A-based agents:

  • The RAG Agent uses Azure AI Foundry and its built-in vector store tool
  • The Web Agent uses OpenAI Agent SDK and its hosted web search tool

The conversation agent can be written in any framework as long as you define tools for that agent that use the A2A protocol (via an A2A client) to send messages to the other agents. This again is agents as tools in action.

To illustrate this standards-based approach, let’s use the A2A Inspector to send a message to the RAG Agent. As long as your agent has an A2A wrapper, this inspector will be able to talk to it. First, we connect to the agent to get its agent card:

Connecting to the RAG Agent with A2A

The agent card is defined in code and contains information about what the agent can do via skills. Once connected, I can send a message to the agent using the A2A protocol:

Sending a message which results in a task

The message that got sent was the following (JSON-RPC):

{
  "id": "msg-1752245905034-georiakp8",
  "jsonrpc": "2.0",
  "method": "message/send",
  "params": {
    "configuration": {
      "acceptedOutputModes": [
        "text/plain",
        "video/mp4"
      ]
    },
    "message": {
      "contextId": "27effaaa-98af-44c4-b15f-10d682fd6496",
      "kind": "message",
      "messageId": "60f95a30-535a-454f-8a8d-31f52d7957b5",
      "parts": [
        {
          "kind": "text",
          "text": "What is project Astro (I might have the name wrong though)"
        }
      ],
      "role": "user"
    }
  }
}

This was the response:

{
  "artifacts": [
    {
      "artifactId": "d912666b-f9ff-4fa6-8899-b656adf9f09c",
      "parts": [
        {
          "kind": "text",
          "text": "Project \"Astro\" appears to refer to \"Astro Events,\" which is a web platform designed for users to discover, share, and RSVP to astronomy-related events worldwide. The platform includes features such as interactive sky maps, event notifications, and a community forum for both amateur and professional astronomers. If you were thinking about astronomy or space-related projects, this may be the correct project you had in mind【4:0†astro_events.md】. If you're thinking of something else, let me know!"
        }
      ]
    }
  ],
  "contextId": "27effaaa-98af-44c4-b15f-10d682fd6496",
  "history": [
    HISTORY HERE
  ],
  "id": "d5af08b3-93a0-40ec-8236-4269c1ed866d",
  "kind": "task",
  "status": {
    "state": "completed",
    "timestamp": "2025-07-11T14:58:38.029960+00:00"
  },
  "validation_errors": []
}

If you are building complex multi-agent solutions, where multiple teams write their agents in different frameworks and development languages, establishing communication standards pays off in the long run.

However, this approach is much more complex than the other two approaches. We have only scratched the surface of A2A here and have not touched on the following aspects:

  • How to handle authentication?
  • How to handle long running tasks?
  • How to scale your agents to multiple instances and how to preserve state?
  • How to handle logging and tracing across agent boundaries?

⚠️ Most of the above is simply software engineering and has not much to do with LLM-based agents!

Conclusion

In this article, we discussed three approaches to building a multi-agent solution

ApproachComplexityReusabilityStandardizationBest For
In-processLowLimitedNoSimple, single-team use cases
Agent PaaSMediumGoodNo (vendor-specific)Org-wide, moderate complexity
A2A ProtocolHighExcellentYesCross-team, cross-platform needs

When you really need a multi-agent solution, I strongly believe that the first two approaches should cover 90% of use cases.

In complex cases, the last option can be considered although it should not be underestimated. To make this option a bit more clear, a follow-up article will discuss how to create and connect agents with A2A in more detail.

Building Configurable AI Agents with the OpenAI Agents SDK

In this post, I will demonstrate how to build an AI agent system where agents can collaborate in different ways. The goal is to create agents that can either work independently with their own tools or collaborate with other agents through two distinct patterns: using agents as tools or handing off control to other agents.

In this post, I work directly with OpenAI models. You can also use Azure OpenAI if you want, but there are some caveats. Check this guide on using Azure OpenAI and potentially APIM with the OpenAI Agents SDK for more details.

All code can be found in this repo: https://github.com/gbaeke/agent_config. Not all code is shown in this post so be sure to check the repo.

Agent Factory and Configuration System

The core of this system is an agent factory that creates agents from JSON configurations stored either on the filesystem or in Redis. The factory reads configuration files that define:

  • Agent name and instructions
  • Which AI model to use (e.g., gpt-4o-mini)
  • Available tools from a centralized tool registry
  • Validation against a JSON schema

For example, a weather agent configuration looks like:

{
    "name": "Weather Agent",
    "instructions": "You are a helpful assistant for weather questions...",
    "model": "gpt-4o-mini",
    "tools": ["get_current_weather", "get_current_temperature", "get_seven_day_forecast"]
}

The agent factory validates each configuration against a schema and can load configurations from either JSON files in the configs/ directory or from Redis when USE_REDIS=True (env var). This flexibility allows for dynamic configuration management in a potential production setting. Besides the configuration above, other configuration could be useful such as MCP configuration, settings like temperature, guardrails and much more.

⚠️ Note that this is example code to explore ideas around agent configuration, agent factories, agents-as-tools versus handoffs, etc…

Tool Registry

All available tools are maintained in a centralized tools.py file that exports an all_tools dictionary. This registry includes:

  • Function-based tools decorated with @function_tool
  • External API integrations (like web search): the built-in web search tool from OpenAI is used as an example here
  • Remote service calls: example tool that uses a calculator agent exposes via an API (FastAPI); this is the same as agent-as-tool discussed below but the agent is remote and served as an API.

In a production environment, tool management would likely be handled differently – for example, through a dedicated tool registry service implementing the Model Context Protocol (MCP). This would allow tools to be dynamically registered, versioned, and accessed across multiple services while maintaining consistent interfaces and behaviors. The registry service could handle authentication, rate limiting, and monitoring of tool usage across all agents in the system.

Agent-as-Tool vs Handoff Patterns

The system supports two distinct collaboration patterns:

Agent-as-Tool

With this pattern, one agent uses another agent as if it were a regular tool. The main agent remains in control of the conversation flow. For example:

agent_as_tools = {
    "weather": {
        "agent": weather_agent,
        "name": "weather", 
        "description": "Get weather information based on the user's question"
    }
}
conversation_agent = create_agent_from_config("conversation", agent_as_tools)

When the conversation agent needs weather information, it calls the weather agent as a tool, gets the result, and continues processing the conversation. The main agent simply passes what is deems necessary to the agent used as a tool and uses the agent response to form an output.

The way you describe the tool is important here. It influences what the conversation agents sends to the weather agent as a parameter (the user’s question).

Handoff Pattern

With handoffs, control is transferred to another agent entirely. The receiving agent takes over the conversation until it’s complete or hands control back. This is implemented by passing agents to the handoffs parameter:

agent_handoffs = [simulator_agent]
conversation_agent = create_agent_from_config("conversation", {}, agent_handoffs)

The key difference is control: agent-as-tool keeps the original agent in charge, while handoffs transfer complete control to the receiving agent.

To implement the handoff pattern and to allow transfer back to the original agent, support from the UI is needed. In the code, which uses a simple text-based UI, this is done by using a current_agent variable that refers to the agent currently in charge and by falling back to the base conversation agent when the user types ‘exit`. Note that this pattern is quite tricky to implement correctly. Often, the main agent thinks it can do the simulation on its own. When the user does not type exit but asks to go back to the conversation agent, the simulator agent might seem to comply but in reality, you are still in the simulator. This can be solved by prompting both agents properly but do not expect it to be automatic.

A look at the code

If you look at agent_from_config.py (the main script), you will notice that it is very simple. Most of the agent creation logic is in agent_factory.py which creates the agent from a config file or a config stored in Redis.

# Create specialized agents
weather_agent = create_agent_from_config("weather")
news_agent = create_agent_from_config("news")
simulator_agent = create_agent_from_config("simulator")

# Configure agents as tools
agent_as_tools = {
    "weather": {
        "agent": weather_agent,
        "name": "weather",
        "description": "Get weather information based on the user's full question"
    },
    "news": {
        "agent": news_agent,
        "name": "news", 
        "description": "Get news information based on the user's full question"
    }
}

# Configure handoff agents
agent_handoffs = [simulator_agent]

# Create main agent with both patterns
conversation_agent = create_agent_from_config("conversation", agent_as_tools, agent_handoffs)

Above, we create three agents: weather, news (with OpenAI built-in web search) and simulator. These agents are used by the conversation agent created at the end. To provide the conversation agent with two agents as tools and one agent handoff, the create_agent_from_config function that returns a value of type Agent has two optional parameters:

  • a dictionary that with references to agents and their tool descriptions (used by the main agent to know when to call the agent)
  • a list with agents to handoff to

In this code, you need to build these arrays in code. This could also be done via the configuration system but that was not implemented.

To simulate a chat session, the following code is used:

async def chat():
    current_agent = conversation_agent
    convo: list[TResponseInputItem] = []
    
    while True:
        user_input = input("You: ")
        
        if user_input == "exit":
            if current_agent != conversation_agent:
                current_agent = conversation_agent  # Return to main agent
            else:
                break
        
        convo.append({"content": user_input, "role": "user"})
        result = await Runner.run(current_agent, convo)
        
        convo = result.to_input_list()
        current_agent = result.last_agent  # Track agent changes

We always start with the conversation agent. When the conversation agent decides to do a handoff, the last_agent property of the result of the last run will be the simulation agent. The current agent is then set to that agent so the conversations stays within the simulation agent. Note that the code also implements callbacks to tell you which agent is answering and what tools are called. Those callbacks are defined in agent_factory.py.

Built-in Tracing

The OpenAI Agents SDK includes tracing capabilities that are enabled by default. Every agent interaction, tool call, and handoff is automatically traced and can be viewed in the OpenAI dashboard. This provides visibility into:

  • Which agent handled each part of a conversation
  • What tools were called and when
  • Performance metrics for each interaction
  • The full conversation flow across multiple agents

Tracing can be customized or disabled if needed, but the default implementation provides comprehensive observability out of the box.

This is what the traces look like:

These traces provide detailed insights into a conversation’s flow. Track down issues and adjust agent configs, especially instructions, when things go awry.

Conclusion

In this post, we looked at a simple approach to build multi-agent systems using the OpenAI Agents SDK. The combination of configurable agents, centralized tool management, and flexible collaboration patterns creates a foundation for more complex AI workflows. The agent factory pattern allows for easy deployment and management of different agent configurations, while the built-in tracing provides the observability needed for production systems.

However, much more effort is required to implement this in production with more complex agents. As always keep things as simple as possible and implement the minimum amount of agents possible. You should also ask yourself if you even need multi-agent because state management, chat history, tracing, testing etc… become increasingly complex in a multi-agent world.