Building an AI Agent Server with AG-UI and Microsoft Agent Framework

In this post, I want to talk about the Python backend I built for an AG-UI demo project. It is part of a larger project that also includes a frontend that uses CopilotKit:

This post discusses the Python AG-UI server that is built with Microsoft Agent Framework.

All code is on GitHub: https://github.com/gbaeke/agui. Most of the code for this demo was written with GitHub Copilot with the help of Microsoft Docs MCP and Context7. 🤷

What is AG-UI?

Before we dive into the code, let’s talk about AG-UI. AG-UI is a standardized protocol for building AI agent interfaces. Think of it as a common language that lets your frontend talk to any backend agent that supports it, no matter what technology you use.

The protocol gives you some nice features out of the box:

  • Remote Agent Hosting: deploy your agents as web services (e.g. FastAPI)
  • Real-time Streaming: stream responses using Server-Sent Events (SSE)
  • Standardized Communication: consistent message format for reliable interactions (e.g. tool started, tool arguments, tool end, …)
  • Thread Management: keep conversation context across multiple requests

Why does this matter? Well, without a standard like AG-UI, every frontend needs custom code to talk to different backends. With AG-UI, you build your frontend once and it works with any AG-UI compatible backend. The same goes for backends – build it once and any AG-UI client can use it.

Under the hood, AG-UI uses simple HTTP POST requests for sending messages and Server-Sent Events (SSE) for streaming responses back. It’s not complicated, but it’s standardized. And that’s the point.

AG-UI has many more features than the ones discussed in this post. Check https://docs.ag-ui.com/introduction for the full picture.

Microsoft Agent Framework

Now, you could implement AG-UI from scratch but that’s a lot of work. This is where Microsoft Agent Framework comes in. It’s a Python (and C#) framework that makes building AI agents really easy.

The framework handles the heavy lifting when it comes to agent building:

  • Managing chat with LLMs like Azure OpenAI
  • Function calling (tools)
  • Streaming responses
  • Multi-turn conversations
  • And a lot more

The key concept is the ChatAgent. You give it:

  1. A chat client (like Azure OpenAI)
  2. Instructions (the system prompt)
  3. Tools (functions the agent can call)

And you’re done. The agent knows how to talk to the LLM, when to call tools, and how to stream responses back.

What’s nice about Agent Framework is that it integrates with AG-UI out of the box, similar to other frameworks like LangGraph, Google ADK and others. You write your agent code and expose it via AG-UI with basically one line of code. The framework translates everything automatically – your agent’s responses become AG-UI events, tool calls get streamed correctly, etc…

The integration with Microsoft Agent Framework was announced on the blog of CopilotKit, the team behind AG-UI. The blog included the diagram below to illustrate the capabilities:

From https://www.copilotkit.ai/blog/microsoft-agent-framework-is-now-ag-ui-compatible

The Code

Let’s look at how this actually works in practice. The code is pretty simple. Most of the code is Microsoft Agent Framework code. AG-UI gets exposed with one line of code.

The Server (server.py)

The main server file is really short:

import uvicorn
from api import app
from config import SERVER_HOST, SERVER_PORT

def main():
    print(f"🚀 Starting AG-UI server at http://{SERVER_HOST}:{SERVER_PORT}")
    uvicorn.run(app, host=SERVER_HOST, port=SERVER_PORT)

if __name__ == "__main__":
    main()

That’s it. We run a FastAPI server on port 8888. The interesting part is in api/app.py:

from fastapi import FastAPI
from agent_framework.ag_ui.fastapi import add_agent_framework_fastapi_endpoint
from agents.main_agent import agent

app = FastAPI(title="AG-UI Demo Server")

# This single line exposes your agent via AG-UI protocol
add_agent_framework_fastapi_endpoint(app, agent, "/")

See that add_agent_framework_fastapi_endpoint() call? That’s all you need. This function from Agent Framework takes your agent and exposes it as an AG-UI endpoint. It handles HTTP requests, SSE streaming, protocol translation – everything.

You just pass in your FastAPI app, your agent, and the route path. Done.

The Main Agent (agents/main_agent.py)

Here’s where we define the actual agent with standard Microsoft Agent Framework abstractions:

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import DefaultAzureCredential
from config import AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME
from tools import get_weather, get_current_time, calculate, bedtime_story_tool

# Create Azure OpenAI chat client
chat_client = AzureOpenAIChatClient(
    credential=DefaultAzureCredential(),
    endpoint=AZURE_OPENAI_ENDPOINT,
    deployment_name=AZURE_OPENAI_DEPLOYMENT_NAME,
)

# Create the AI agent with tools
agent = ChatAgent(
    name="AGUIAssistant",
    instructions="You are a helpful assistant with access to tools...",
    chat_client=chat_client,
    tools=[get_weather, get_current_time, calculate, bedtime_story_tool],
)

This is the heart of the backend. We create a ChatAgent with:

  1. A name: “AGUIAssistant”
  2. Instructions: the system prompt that tells the agent how to behave
  3. A chat client: AzureOpenAIChatClient, which handles communication with Azure OpenAI
  4. Tools: a list of functions the agent can call

The code implements a few toy tools and a sub-agent to illustrate how AG-UI handles tool calls. The tools are discussed below:

The Tools (tools/)

In Agent Framework, tools can be Python functions with a decorator:

from agent_framework import ai_function
import httpx
import json

@ai_function(description="Get the current weather for a location")
def get_weather(location: str) -> str:
    """Get real weather information for a location using Open-Meteo API."""
    # Step 1: Geocode the location
    geocode_url = "https://geocoding-api.open-meteo.com/v1/search"
    # ... make HTTP request ...
    
    # Step 2: Get weather data
    weather_url = "https://api.open-meteo.com/v1/forecast"
    # ... make HTTP request ...
    
    # Return JSON string
    return json.dumps({
        "location": resolved_name,
        "temperature": current["temperature_2m"],
        "condition": condition,
        # ...
    })

The @ai_function decorator tells Agent Framework “this is a tool the LLM can use”. The framework automatically:

  • Generates a schema from the function signature
  • Makes it available to the LLM
  • Handles calling the function when needed
  • Passes the result back to the LLM

You just write normal Python code. The function takes typed parameters (location: str) and returns a string. Agent Framework does the rest.
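For comparison, a tool that needs no external services can be even shorter. The repo has its own get_current_time implementation; a minimal sketch could look like this:

from datetime import datetime, timezone

from agent_framework import ai_function

@ai_function(description="Get the current date and time in UTC")
def get_current_time() -> str:
    """Return the current UTC date and time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()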

The weather tool calls the Open-Meteo API to get real weather data. In an AG-UI compatible client, you can intercept the tool result and visualize it any way you want before the LLM generates an answer from the tool result:

React client with CopilotKit

Above, when the user asks for weather information, AG-UI events inform the client that a tool call has started and ended. It also streams the tool result back to the client which uses a custom component to render the information. This happens before the chat client generates the answer based on the tool result.

The Subagent (tools/storyteller.py)

This is where it gets interesting. In Agent Framework, a ChatAgent can become a tool with .as_tool():

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient

# Create a specialized agent for bedtime stories
bedtime_story_agent = ChatAgent(
    name="BedTimeStoryTeller",
    description="A creative storyteller that writes engaging bedtime stories",
    instructions="""You are a gentle and creative bedtime story teller.
When given a topic, create a short, soothing bedtime story for children.
Your stories should be 3-5 paragraphs long, calming, and end peacefully.""",
    chat_client=chat_client,
)

# Convert the agent to a tool
bedtime_story_tool = bedtime_story_agent.as_tool(
    name="tell_bedtime_story",
    description="Generate a calming bedtime story based on a theme",
    arg_name="theme",
    arg_description="The theme for the story (e.g., 'a brave rabbit')",
)

This creates a subagent – another ChatAgent with different instructions. When the main agent needs to tell a bedtime story, it calls tell_bedtime_story which delegates to the subagent.

Why is this useful? Because you can give each agent specialized instructions. The main agent handles general questions and decides which tool to use. The storyteller agent focuses only on creating good stories. Clean separation of concerns.

The subagent has its own chat client and can have its own tools too if you want. It’s a full agent, just exposed as a tool.

And because it is a tool, you can render it with the standard AG-UI tool events:

Testing with a client

In src/backend there is a Python client client_raw.py. When you run that client against the server and invoke a tool, you will see something like below:

AG-UI client in Python

This client simply uses httpx to talk to the AG-UI server and inspects and renders the AG-UI events as they come in.
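If you want to roll your own raw client, a minimal sketch looks something like the code below. The payload fields follow the AG-UI RunAgentInput schema; check client_raw.py in the repo and the AG-UI docs for the exact shape the demo uses.

import json
import uuid

import httpx

# Minimal sketch of a raw AG-UI client, assuming the server from this post runs on localhost:8888
payload = {
    "threadId": str(uuid.uuid4()),
    "runId": str(uuid.uuid4()),
    "messages": [
        {"id": str(uuid.uuid4()), "role": "user", "content": "What's the weather in Brussels?"}
    ],
    "tools": [],
    "context": [],
    "state": {},
    "forwardedProps": {},
}

with httpx.stream("POST", "http://localhost:8888/", json=payload,
                  headers={"Accept": "text/event-stream"}, timeout=None) as response:
    for line in response.iter_lines():
        # SSE events arrive as "data: {...}" lines; print the type of each event
        if line.startswith("data: "):
            event = json.loads(line[len("data: "):])
            print(event.get("type"))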

Why This Works

Let me tell you what I like about this setup:

Separation of concerns: The frontend doesn’t know about Python, Azure OpenAI, or any backend details. It just speaks AG-UI. You could swap the backend for a C# implementation or something else entirely – the frontend wouldn’t care. Except, of course, for the handling of specific tool calls.

Standard protocol: Because we use AG-UI, any AG-UI client can talk to this backend. We use CopilotKit in the frontend but you could use anything that speaks AG-UI. Take the Python client as an example.

Framework handles complexity: Streaming, tool calls, conversation history, protocol translation – Agent Framework does all of this. You just write business logic.

Easy to extend: Want a new tool? Write a function with @ai_function. Want a specialized agent? Create a ChatAgent and call .as_tool(). That’s it.

The AG-UI documentation explains that the protocol supports 7 different features including human-in-the-loop, generative UI, and shared state. Our simple backend gets all of these capabilities because Agent Framework implements the protocol.

Note that there are many more capabilities. Check the AG-UI interactive Dojo to find out: https://dojo.ag-ui.com/microsoft-agent-framework-python

Wrap Up

This is a simple but powerful pattern for building AI agent backends. You write minimal code and get a lot of functionality. AG-UI gives you a standard way to expose your agent, and Microsoft Agent Framework handles the implementation details.

If you want to try this yourself, the code is in the repo. You’ll need an Azure OpenAI deployment and you’ll have to follow the OAuth setup. After that, just run the code as instructed in the repo README!

The beauty is in the simplicity. Sometimes the best code is the code you don’t have to write.

Creating presentations with Microsoft Agent Framework and GAMMA API

Introduction

Imagine automating the creation of professional presentations through a coordinated series of intelligent agents. In this blog post, we’ll explore how to build a workflow that combines research, outline generation, content creation, and design, all orchestrated through the Microsoft Agent Framework.

We’ll build a practical example that generates presentations using:

  • Azure OpenAI Responses API for agent creation
  • Microsoft Agent Framework Workflows for orchestration
  • Tavily Search API for concurrent web research
  • GAMMA API for stunning presentation generation (In October 2025, this API was in beta)

By the end, you’ll understand how to leverage these tools to create end-to-end workflows that tackle complex, multi-step tasks.


What is Microsoft Agent Framework?

The Microsoft Agent Framework is a new, modern framework for building intelligent automation systems powered by AI agents. It enables you to create agents and workflows that blend AI reasoning with structured business processes.

Agents vs. Workflows: Understanding the Difference

Before diving into workflows, it’s important to understand how agents and workflows complement each other:

  • AI Agents are LLM-powered systems that make dynamic decisions based on context. They have access to tools and can reason through problems, but the steps they take are determined by the model at runtime.
Agent (from Microsoft Learn)
  • Workflows, on the other hand, are predefined sequences of operations with explicit control flow. They define exactly how data flows from one step to the next. Workflows can contain agents as components, orchestrating them alongside other executors and services.
Workflow (from Microsoft Learn)

Think of it this way:

  • An agent is like an expert consultant who decides how to solve a problem
  • A workflow is like a project manager who coordinates multiple specialists in a structured process

The framework provides the tools to do both and to combine them in interesting ways.


Creating an Agent with Azure OpenAI Responses API

Let’s start by understanding how to create individual agents before we orchestrate them into workflows.

The Azure OpenAI Responses API is Azure’s version of the OpenAI Responses API. It’s the successor to the chat completion APIs and supports structured outputs, tool calling, stateful interactions and hosted tools.

Note: Azure OpenAI Responses API does not support the native web search tool. We will use Tavily to support web searches.

Here’s how to create a simple agent in Agent Framework, based on the Responses API:

import asyncio
import os
from dotenv import load_dotenv
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

# Load environment variables from .env
load_dotenv()

async def main():
    # Create a client
    client = AzureOpenAIResponsesClient(
        endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        deployment_name=os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o")
    )
    
    # Create an agent
    agent = client.create_agent(
        name="OutlineAgent",
        instructions="Based on the user's request, you create an outline for a presentation.",
    )
    
    # Run the agent
    result = await agent.run("Create an outline for a presentation about Python")
    print(result.text)

if __name__ == "__main__":
    asyncio.run(main())

Note: this is just one type of agent you can create with Agent Framework. Some of the other supported types are chat completion agents and Azure AI Foundry Agents. See Microsoft Learn for more information. We could have used these agents as well.

Agents with Function Tools

Agents become truly useful when equipped with function tools: custom Python functions that agents can call to accomplish tasks. In Agent Framework, you can use the optional @ai_function decorator to provide a meaningful description to the LLM that uses the function. An alternative is to use a docstring. The framework automatically converts the Python functions to tools the LLM understands.

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function(
    name="search_web",
    description="Search the web for information"
)
async def search_web(
    queries: Annotated[list[str], Field(description="List of search queries")]
) -> str:
    # Implementation here
    pass

Once defined, you pass these tools to your agent:

agent = client.create_agent(
    name="ResearchAgent",
    instructions="You research and create content for slides.",
    tools=[search_web]  # Register the tool
)

Now the agent can autonomously decide to call search_web when it needs information! In the agent instructions, you can tell the agent how and when to use the tool if needed.

In the example discussed further in this post, both the outline agent and research agent use the search function to build the slide content.
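Putting it together, a quick test run could look like the sketch below. It reuses the client from the earlier snippet and assumes search_web has a real implementation rather than the placeholder shown above.

import asyncio

async def main():
    agent = client.create_agent(
        name="ResearchAgent",
        instructions="You research topics for presentations. Use 'search_web' when you need facts.",
        tools=[search_web],
    )
    # The model decides at runtime whether it needs the tool for this request
    result = await agent.run("What changed in Python packaging over the last year?")
    print(result.text)

asyncio.run(main())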

Structured Output with Pydantic

For predictable, structured responses, the framework supports structured output with Pydantic models:

from pydantic import BaseModel, Field
from typing import List, Annotated

class OutlineResponse(BaseModel):
    title: str
    number_of_slides: Annotated[int, Field(default=5)]
    slide_titles: List[str]
    audience: Annotated[str, Field(default="general")]

agent = client.create_agent(
    name="OutlineAgent",
    instructions="Create a presentation outline.",
    response_format=OutlineResponse  # Enforce this structure
)

The agent’s response will automatically be validated and parsed as an OutlineResponse object: type-safe and predictable. This feature is supported by the underlying Responses API with its support for structured outputs.

We use structured outputs extensively in the example code discussed below.
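Consuming the structured output is straightforward. A safe, version-independent approach is to validate the returned JSON text with the Pydantic model yourself:

result = await agent.run("Create an outline for a presentation about Python for beginners")

# The response text is JSON that conforms to the OutlineResponse schema
outline = OutlineResponse.model_validate_json(result.text)
print(outline.title)
print(outline.slide_titles)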


Understanding Workflows: Orchestrating Agents at Scale

Now that we understand agents, let’s see how to compose them into workflows.

A workflow is built from four key components:

  1. Executors – The individual processing nodes (agents, custom logic, integrations)
  2. Edges – Connections defining data flow between executors
  3. Workflows – The orchestrator that manages execution and routing
  4. Events – Real-time observability into execution

Agent Framework can visualize workflows in different ways. One option is to use the DevUI:

DevUI with a sequential workflow ready to run

To run the workflow, simply click Configure & Run and provide your input. The steps in the workflow will light up when they are doing work.

For more information about Dev UI see the Agent Framework repository on GitHub.

Building a Workflow with WorkflowBuilder

After defining the executors, you use WorkflowBuilder to build a workflow:

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder()
    .set_start_executor(outline_agent)          # First step
    .add_edge(outline_agent, research_agent)    # Data flows here
    .add_edge(research_agent, gamma_executor)   # Final step
    .build()
)

# serve workflow with Dev UI
serve(entities=[workflow], port=8093, auto_open=True)

This creates a linear, sequential flow where:

  1. Input → outline_agent produces an outline
  2. Outline → research_agent researches and creates slide content
  3. Slide content → gamma_executor generates the presentation

Note that the first two steps are simply Azure Responses API agents (AgentExecutors). The third step is a custom executor inheriting from the Executor class.

The workflow is served with Dev UI, which results in a browser opening the Dev UI interface as shown in the screenshot above.

Input and Output Between Nodes

Understanding data flow in the workflow is crucial. Each executor (agents or custom) receives input and must send output to the next executor. Below is a custom executor created from a Python function. You can also create executors from a Python class:

from agent_framework import WorkflowContext, executor

@executor(id="some_executor_id")
async def handle_input(message: str, ctx: WorkflowContext) -> None:
    # Process the input
    result = do_some_work(message)

    # Send output to the next executor(s)
    await ctx.send_message(result)

For agents specifically, they receive messages and produce AgentExecutorResponse objects automatically. You do not have to write code that sends the output of an agent to the next node.

Custom Executors for Complex Logic

Sometimes you need logic beyond agent reasoning. Custom executors handle this. Below is a class-based executor, in contrast to the function-based executor above:

import requests

from agent_framework import AgentExecutorResponse, Executor, WorkflowContext, handler

class GammaAPIExecutor(Executor):
    def __init__(self, id: str = "gamma_api"):
        super().__init__(id=id)

    @handler
    async def call_gamma_api(
        self, response: AgentExecutorResponse, ctx: WorkflowContext
    ) -> None:
        # Extract data from agent response
        slide_content = response.agent_run_response.text
        
        # Call external API
        api_response = requests.post(
            "https://api.gamma.app/generations",
            json=payload
        )
        
        # Yield final output
        await ctx.yield_output({"status": "success", "pdf_url": pdf_url})

Custom executors are perfect for:

  • Calling external APIs
  • Data transformation and validation
  • Conditional routing logic
  • Long-running operations like polling

We will use a custom executor to call the GAMMA API to create a presentation based on the output of the previous agent nodes.

Using Sequential Orchestration

The simplest type of workflow processes tasks one after another, each step building on the previous result. This is ideal for pipelines where each step depends on prior outputs.

Input → Agent A → Agent B → Agent C → Output

Use sequential when:

  • Steps have dependencies
  • Later steps need data from earlier ones
  • You want controlled, predictable execution flow

Other types of workflow orchestration are discussed on Microsoft Learn. We will use a sequential workflow in the example below.


The Presentation Workflow in Action

Now let’s see how all these concepts come together in our actual implementation.

Architecture Overview

Our workflow follows this flow:

Step 1: Outline Agent

This is an agent with a web search tool. The agent generates a search query relevant to the user input and decides on a presentation title and a list of slide titles:

outline_agent = AzureOpenAIResponsesClient(
    endpoint=endpoint,
    api_key=api_key,
    deployment_name=deployment_name
).create_agent(
    name="OutlineAgent",
    instructions="""
        Based on the user's request, you create an outline for a presentation.
        Before you generate the outline, use the 'search_web' tool to research 
        the topic thoroughly with ONE query.
        Base the outline on the research findings.
    """,
    tools=[search_web],
    response_format=OutlineResponse,
    middleware=[logging_function_middleware]
)

  • Input: User’s presentation request sent to the agent as a user message

Processing:

  • Calls search_web to research the topic
  • Generates a structured outline with title and slide titles

Output: an OutlineResponse object with title, number_of_slides, slide_titles, and audience

If the user specifies the number of slides and the audience, this will be reflected in the response.

Note: this agent uses middleware to log the use of the search_web tool.
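The middleware itself is just a small async function. A sketch of what logging_function_middleware might look like follows; the exact context attributes can differ between Agent Framework releases, so treat this as an assumption:

import logging

from agent_framework import FunctionInvocationContext

logger = logging.getLogger(__name__)

async def logging_function_middleware(context: FunctionInvocationContext, next):
    # Log before and after every tool invocation made by the agent
    logger.info(f"TOOL - Calling {context.function.name}")
    await next(context)
    logger.info(f"TOOL - {context.function.name} finished")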

Step 2: Research Agent

The research agent takes the presentation title and slides from the previous step and does research on the web for each slide:

research_agent = AzureOpenAIResponsesClient(
    endpoint=endpoint,
    api_key=api_key,
    deployment_name=deployment_name
).create_agent(
    name="ResearchAgent",
    instructions="""
        You research and create content for slides.
        Generate one web search query for each slide title.
        Keep in mind the target audience when generating queries.
        Next, use the 'search_web' tool ONCE passing in the queries all at once.

        Use the result from the queries to generate content for each slide.
        Content for slides should be limited to 100 words max with three main bullet points.
    """,
    tools=[search_web],
    response_format=ResearchResponse,
    middleware=[logging_function_middleware]
)

Input: The OutlineResponse from the outline agent is automatically sent as a user message. This message will contain the JSON output from the previous step.

Processing:

  • Generates one search query per slide
  • Calls search_web with all queries concurrently (crucial for performance!)
  • Creates slide content of at most 100 words with three main bullet points; keeping slide content concise gives the best results

Output: a ResearchResponse object with slide content for each slide
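The actual ResearchResponse model is defined in the repo; a hypothetical shape that matches the instructions above could be:

from typing import List

from pydantic import BaseModel

class SlideContent(BaseModel):
    title: str
    bullets: List[str]   # the three main bullet points
    content: str         # roughly 100 words of slide text

class ResearchResponse(BaseModel):
    title: str
    audience: str
    slides: List[SlideContent]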

Why Concurrency Matters: The Search Tool

To speed up research, the search_web function uses AsyncTavilyClient for concurrent searching:

from tavily import AsyncTavilyClient

tavily_client = AsyncTavilyClient(TAVILY_API_KEY)

@ai_function(
    name="search_web",
    description="Search the web for multiple topics using Tavily"
)
async def search_web(
    queries: Annotated[list[str], Field(description="List of search queries")]
) -> str:
    logger.info(f"SEARCH - Performing web searches for {len(queries)} queries")
    
    async def _search_single_query(query: str) -> dict:
        try:
            logger.info(f'SEARCH - "{query}" - Searching')
            # AsyncTavilyClient.search() is awaitable
            response = await tavily_client.search(
                query=query,
                search_depth="advanced",
                max_results=5,
            )
            return {"query": query, "results": response.get("results", [])}
        except Exception as e:
            logger.error(f'SEARCH - "{query}" - {str(e)}')
            return {"query": query, "results": [], "error": str(e)}
    
    # Run all searches concurrently!
    tasks = [_search_single_query(query) for query in queries]
    search_results = await asyncio.gather(*tasks)
    
    # Aggregate and return
    aggregated = {r["query"]: r["results"] for r in search_results}
    return json.dumps({"results": aggregated})

Key insight: By using AsyncTavilyClient instead of the synchronous TavilyClient, we enable concurrent execution. All queries run in parallel, dramatically reducing total execution time.

To learn more about Tavily, check their website. They have a generous free tier which I almost exhausted creating this example! 😊

Step 3: Custom Gamma Executor

Step three does not need an agent. It uses the output of the agents to call the GAMMA API:

class GammaAPIExecutor(Executor):
    @handler
    async def call_gamma_api(
        self, response: AgentExecutorResponse, ctx: WorkflowContext
    ) -> None:
        # Extract slide content
        slide_content = response.agent_run_response.text
        response_json = json.loads(slide_content)
        
        logger.info(f'GAMMA - "{title}" - Slides: {number_of_slides}')
        
        # Call Gamma API
        api_response = requests.post(
            f"{GAMMA_API_BASE_URL}/generations",
            headers=headers,
            json=payload,
            timeout=30,
        )
        
        data = api_response.json()
        generation_id = data.get("id") or data.get("generationId")
        logger.info(f'GAMMA - POST /generations {api_response.status_code}')
        
        # Poll for completion
        completed_data = await self._poll_for_completion(generation_id)
        
        # Download PDF
        pdf_path = self._download_pdf(completed_data.get("exportUrl"))
        
        logger.info(f'GAMMA - Presentation completed')
        await ctx.yield_output(f"PDF saved to: {pdf_path}")

Input: The ResearchResponse from the research agent (slide content)

Processing:

  • Calls GAMMA API to generate presentation
  • Polls for completion (generation is asynchronous)
  • Downloads resulting PDF
  • The presentation is also available in Gamma for easy editing:
Presentation available in Gamma after creation with the workflow

Output: Success message with PDF path and presentation in Gamma.
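The polling helper simply checks the generation status until it is done. Below is a sketch, assuming a GET {GAMMA_API_BASE_URL}/generations/{id} status endpoint and a status field in the response; the API was in beta, so verify the exact path and field names against the Gamma docs.

import asyncio

import requests

async def _poll_for_completion(self, generation_id: str, interval: int = 5, max_attempts: int = 60) -> dict:
    """Poll the Gamma API until the generation finishes or we give up."""
    for _ in range(max_attempts):
        resp = requests.get(
            f"{GAMMA_API_BASE_URL}/generations/{generation_id}",
            headers=headers,  # same API key headers used for the POST call
            timeout=30,
        )
        data = resp.json()
        if data.get("status") == "completed":
            return data
        await asyncio.sleep(interval)
    raise TimeoutError(f"Gamma generation {generation_id} did not complete in time")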


Running the example

The workflow code is on GitHub in the gamma folder: https://github.com/gbaeke/maf/tree/main/gamma

In that folder, check https://github.com/gbaeke/maf/blob/main/gamma/SETUP_GUIDE.md for setup instructions.

End-to-end authorization with Entra ID and MCP

Building an MCP server is really easy. Almost every language and framework has MCP support these days, both from a client and server perspective. For example: FastMCP (Python, Typescript), csharp-sdk and many more!

However, in an enterprise scenario where users use a web-based conversational agent that uses tools on an MCP server, those tools might need to connect to back-end systems using the identity of the user. Take a look at the following example:

A couple of things are important here:

  • The user logs on to the app and requests an access token that is valid for the MCP Server; you need app registrations in Entra ID to make this work
  • The MCP Server verifies that this token is from Entra ID and contains the correct audience; we will use FastMCP in Python which has some built-in functionality for token validation
  • When the user asks the agent a question that requires Azure AI Search, the agent decides to make a tool call; the tool on the MCP server does the actual work
  • The tool needs access to the token (there’s a helper in FastMCP for that); next it converts the token to a token valid for Azure AI Search
  • The tool can now perform a search in Azure AI Search using new functionality discussed here

⚠️ MCP is actually not that important here. This technique which uses OAuth 2.0 and OBO flows in Entra ID is a well established pattern that’s been in use for ages!

🧑‍💻 Full source code here: https://github.com/gbaeke/mcp-obo

Let’s get started and get this working!

Entra ID App Registrations

In this case, we will create two registrations: one for the front-end and one for the back-end, the MCP Server. Note that the front-end here will be a command-line app that uses a device flow to authenticate. It uses a simple token cache to prevent having to log on time and again.

Front-end app registration

We will create this in the Azure Portal. I assume you have some knowledge of this so I will not provide detailed step-by-step instructions:

  • Go to App Registrations
  • Create a new registration, FastMCP Auth Web
  • In Authentication, ensure you enable Allow public client flows

You will need the client ID of this app registration in the MCP client we will build later.

Back-end app registration

This is for the MCP server and needs more settings:

  • Create a new registration, FastMCP Auth API
  • In Certificates and secrets, add a secret. We will need this to implement the on-behalf-of flow to obtain the Azure AI Search token
  • In Expose an API, set the Application ID URI to https://CLIENTIDOFAPP. I also added a scope, execute. In addition, add the front-end app client Id to the list of Authorized client applications:
Expose API of MCP app registration

In order to exchange a token for this service for Azure AI Search, we also need to add API permissions:

Permissions for Azure Cognitive, ehhm, AI Search

When you add the above permission, find it in APIs my organization uses:

MCP Client

We can now write an MCP client that calls the MCP server with an access token for the API we created above. As noted before, this will be a command-line app written in Python.

The code simply uses MSAL to initiate a device flow. It also uses a token cache to avoid repeated logins. The code can be found here.
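For reference, the core of such a device flow with MSAL looks roughly like the sketch below; the repo adds a persistent token cache on top. CLIENT_ID, TENANT_ID and the API’s Application ID URI come from the app registrations created earlier.

import msal

def get_jwt_token() -> str:
    app = msal.PublicClientApplication(
        client_id=CLIENT_ID,  # client ID of the FastMCP Auth Web registration
        authority=f"https://login.microsoftonline.com/{TENANT_ID}",
    )
    # Request the 'execute' scope exposed by the FastMCP Auth API registration
    scopes = [f"{API_APP_ID_URI}/execute"]

    # Reuse a cached account if possible to avoid logging in every time
    accounts = app.get_accounts()
    if accounts:
        result = app.acquire_token_silent(scopes, account=accounts[0])
        if result and "access_token" in result:
            return result["access_token"]

    flow = app.initiate_device_flow(scopes=scopes)
    print(flow["message"])  # instructs the user to visit the device login page
    result = app.acquire_token_by_device_flow(flow)
    return result["access_token"]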

Once we have a token, we can construct an MCP client (with FastMCP) as follows:

token = get_jwt_token()

headers = {"Authorization": f"Bearer {token}"}

transport_url = "http://localhost:8000/mcp/"

transport = StreamableHttpTransport(
    url=transport_url,
    headers=headers,
)

client = MCPClient(transport=transport)

This code ensures that requests to the MCP server have an Authorization header that contains the bearer token acquired by get_jwt_token(). The MCP server will validate this token strictly.

The code to connect to the MCP server looks like this:

try:
    logger.info("Connecting to the MCP server...")
    
    # Use the client as an async context manager
    async with client:
        # List available tools on the server
        tools = await client.list_tools()
        logger.info(f"Found {len(tools)} tools on the server")
        
        # Call search tool
        logger.info("Calling search tool...")
        search_result = await client.call_tool("get_documents", {"query": "*"})
        logger.info(f"Search Result: {search_result.structured_content}")
        documents = search_result.structured_content.get("documents", [])
        if documents:
            logger.info("Documents found:")
            for doc in documents:
                name = doc.get("name", "Unnamed Document")
                logger.info(f"  - {name}")
        else:
            logger.info("No documents found.")
    
except Exception as e:
    logger.error(f"Error connecting to MCP server: {e}")

Note that there is no AI Agent involved here. We simply call the tool directly. In my case the document search only lists three documents out of five in total because I only have access to those three. We will look at the MCP server code to see how this is implemented next.

⚠️ Example code of the client is here: https://github.com/gbaeke/mcp-obo/blob/main/mcp_client.py

MCP Server

We will write an MCP server that uses the streamable-http transport on port 8000. The URL to connect to from the client then becomes http://localhost:8000/mcp. That was the URL used by the MCP client above.

The server has one tool: get_documents that takes a query (string) as parameter. By default, the query is set to * which returns all documents. The tool does the following:

  • Obtains the access token with the get_access_token() helper function from FastMCP
  • Exchanges the token for a token with scope https://search.azure.com/.default
  • Creates a SearchClient for Azure AI Search that includes the AI Search endpoint, index name and credential. Note that that credential has nothing to do with the token obtained above. It’s simply a key provided by Azure AI Search to perform searches. The token is used in the actual search request to filter the results.
  • Performs the search, passing in the token via the x_ms_query_source_authorization parameter. You need to use this version of the Azure AI Search Python library: azure-search-documents==11.6.0b12

Here is the code:

# Get the access token from the context
access_token: AccessToken = get_access_token()
original_token = access_token.token

# Exchange token for Microsoft Search token
logger.info("Exchanging token for Microsoft Search access")
search_result = await exchange_token(original_token, scope="https://search.azure.com/.default")
if not search_result["success"]:
    return {"error": "Could not retrieve documents due to token exchange failure."}
else:
    logger.info("Search token exchange successful")
    search_token = search_result["access_token"]
    search_client = SearchClient(
        endpoint="https://srch-geba.search.windows.net",
        index_name="document-permissions-push-idx",
        credential=AzureKeyCredential(os.getenv("AZURE_SEARCH_KEY")),
    )
    results = search_client.search(
        search_text="*",
        x_ms_query_source_authorization=search_token,
        select="name,oid,group",
        order_by="id asc",
    )
    documents = [
        {
            "name": result.get("name"),
            "oid": result.get("oid"),
            "group": result.get("group"),
        }
        for result in results
    ]
    return {"documents": documents}

The most important work is done by the exchange_token() function. It obtains an access token for Azure AI Search that contains the oid (object id) of the user.

Here’s that function:

async def exchange_token(original_token: str, scope: str = "https://graph.microsoft.com/.default") -> dict:
    
    obo_url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
    
    data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "assertion": original_token,
        "scope": scope,
        "requested_token_use": "on_behalf_of"
    }
    
    try:
        response = requests.post(obo_url, data=data)
        
        if response.status_code == 200:
            token_data = response.json()
            return {
                "success": True,
                "access_token": token_data["access_token"],
                "expires_in": token_data.get("expires_in"),
                "token_type": token_data.get("token_type"),
                "scope_used": scope,
                "method": "OBO"
            }
        else:
            return {
                "success": False,
                "error": response.text,
                "status_code": response.status_code,
                "scope_attempted": scope,
                "method": "OBO"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "method": "OBO"
        }

Above, the core is in the call to the obo_url which presents the original token to obtain a new one. This will only work if the API permissions are correct on the FastMCP Auth API app registration. When the call is successful, we return a dictionary that contains the access token in the access_token key.

Full code of the server: https://github.com/gbaeke/mcp-obo/blob/main/mcp/main.py

You have now seen the entire flow from client login to calling a method (tool) on the MCP server to connecting to Azure AI Search downstream via an on-behalf-of flow.

But wait! How do we create the Azure AI Search index with support for the permission filter? Let’s take a look…

Azure AI Search Configuration

When you want to use permission filtering with the x_ms_query_source_authorization parameter, do the following:

  • Create the index with support for permission filtering
  • Your index needs fields like oid (object Ids) and group (group object Ids) with the correct permission filter option
  • When you add documents to an index, for instance with the push APIs, you need to populate the oid and group fields with identifiers of users and groups that have access
  • Perform a search with the x_ms_query_source_authorization parameter as shown below:

results = search_client.search(
    search_text="*",
    x_ms_query_source_authorization=token_to_use,
    select="name,oid,group",
    order_by="id asc",
)

Above, token_to_use is the access token for Azure AI Search!

On GitHub, check this notebook from Microsoft to create and populate the index with your own user’s oid. You will need an Azure Subscription with an Azure AI Search instance. The free tier will do. If you use VS Code, use the Jupyter extension to run this notebook.

At the time of this writing, this feature was in preview. Ensure you use the correct version of the AI Search library for Python: azure-search-documents==11.6.0b12.

Wrapping up

I hope this post gives you some ideas on how to build agents that use MCP tools with end-to-end user authentication and authorization. This is just one possible approach. Authorization in the MCP specification has evolved significantly in early 2025 and works somewhat differently.

For most enterprise scenarios where you control both code and configuration (such as with Entra ID), bearer authentication with OBO is often sufficient.

Also consider whether you need MCP at all. If you aren’t sharing tools across multiple agents or projects, a simple API might be enough. For even less overhead, you can embed the tool code directly in your agent and run everything in the same process. Simple and effective.

If you spot any errors or have questions, feel free to reach out!

Deploying a multi-agent solution with MCP and A2A to Azure Container Apps

In previous posts, we discussed multi-agent scenarios, how A2A servers work (here and here) and how to deploy the infrastructure to host a multi-agent application on Azure with Azure Container Apps and AI Foundry.

In this post, we will take a look at deploying the different components of the solution as containers in Azure Container Apps. This is what we will build:

Multi-agent solution with MCP and A2A

There are four main components:

  • Conversation Agent: presents a chat interface to the user. Built with Chainlit and Semantic Kernel. Uses an OpenAI model; this could easily be switched to an Azure OpenAI model. The agent uses two tools, rag and web, hosted by the MCP server.
  • MCP Tools Server: MCP server built with Python FastMCP. It exposes two tools, web and rag. The tools use an A2A client to interact with the A2A servers for the web and rag agents. Not exposed to the Internet. Used to demonstrate MCP and A2A together; we could have called the A2A servers directly from the conversation agent without MCP.
  • A2A Server for Foundry Agent (does RAG): uses an Azure AI Foundry Agent with a hosted file-based RAG tool to provide answers about Contoso products. Not exposed to the Internet. Communicates privately with the Azure AI Foundry project.
  • A2A Server for OpenAI Agent (does web searches): uses an OpenAI Agent SDK agent with the hosted web search tool. Not exposed to the Internet. Communicates over the Internet with the OpenAI backend. This could easily be replaced with an Azure AI Foundry Agent that uses Bing Search, but as this is an example about A2A, using a different technology makes more sense. 😊

Before delving into the four different components, it is important to know that the mcp, web and rag containers do not use their internal ingresses to communicate over TLS. That means that the mcp container for example, will talk to the web container using http://ca-web instead of something like https://ca-web.internal.ACA_environment_default_domain.

There is something to be said for using messaging to facilitate communication between agents. They are a form of microservices after all. In this example however, all communication is synchronous and uses HTTP.

This is a technical example that could be implemented in a single in-process agent with two tools. However, the emphasis is on multi-agent communication across process boundaries with Google’s Agent2Agent protocol.

Let’s get started with the Conversation Agent!

Conversation Agent

The conversation agent maintains a conversation with the end user and keeps track of chat history. The agent, written in Semantic Kernel, has two tools:

  • web-search: uses the OpenAI Agent A2A server to search the web via OpenAI’s hosted web search tool
  • rag-search: uses the Azure AI Foundry A2A server to search for Contoso projects via a hosted RAG tool

The user interface to the agent is provided by Chainlit:

Chainlit UI

Above, I asked for information about a project. The agent is configured to use the rag-search tool to find project information. Under the hood, an A2A Server that wraps an Azure AI Foundry Agent is used to obtain this information. Via a filter, Chainlit supports visualizing when tools are called as can be seen at the top of the screen. It basically has hooks into the kernel object that gets created by Semantic Kernel.

The code for this Chainlit-hosted agent is on GitHub. The code in main.py uses an environment variable, MCP_SERVER_URL, that contains the address of the MCP server. As discussed above this will be http://containername/mcp (e.g., http://ca-mcp/mcp).

Following the typical Semantic Kernel approach, a kernel is created. Here is a snippet of code:

# Create the Semantic Kernel
kernel = Kernel()

# Add AI service to kernel
ai_service = OpenAIChatCompletion(ai_model_id="gpt-4o")
kernel.add_service(ai_service)
logger.debug("Kernel and AI service initialized successfully")

# Add MCP tools plugin to kernel (uses global client)
tools_plugin = MCPToolsPlugin()
kernel.add_plugin(tools_plugin, plugin_name="mcp_tools")
logger.debug("MCP tools plugin added to kernel")

Note that we are not using Semantic Kernel’s built-in support for remote MCP servers that use streamable HTTP. Instead, we create a plugin via the MCPToolsPlugin class. That class defines two kernel functions, rag_search and web_search. In such a function, you can do what you want. I did not have to use MCP and could have called the A2A servers directly using the A2A client.

In our functions, we do use the MCP client from FastMCP to call the appropriate tool on the MCP server. The call to the A2A servers is implemented in the MCP server’s tools.
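To give you an idea, a trimmed-down version of such a plugin could look like the sketch below. The tool names and the way the result is unpacked are illustrative; the real implementation is in the repo.

from fastmcp import Client
from semantic_kernel.functions import kernel_function

class MCPToolsPlugin:
    """Each kernel function forwards the call to a tool on the MCP server."""

    def __init__(self, mcp_server_url: str):
        self.mcp_server_url = mcp_server_url

    @kernel_function(name="rag_search", description="Search Contoso project information")
    async def rag_search(self, query: str) -> str:
        async with Client(self.mcp_server_url) as client:
            result = await client.call_tool("rag_tool", {"query": query})
            return str(result)  # how you unpack the result depends on the FastMCP version

    @kernel_function(name="web_search", description="Search the web")
    async def web_search(self, query: str) -> str:
        async with Client(self.mcp_server_url) as client:
            result = await client.call_tool("web_tool", {"query": query})
            return str(result)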

⚠️ This approach was chosen to illustrate that even if your framework does not natively support MCP, under the hood this is always LLM function calling. Kernel functions in Semantic Kernel are simply an abstraction on top of function calling. If you use Semantic Kernel’s native support for MCP, the tools on the MCP server would automatically be created as kernel functions. This native support requires much less code.
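For completeness, the native route would look roughly like this in recent Semantic Kernel releases; the exact class names vary per version, so treat this as an assumption:

from semantic_kernel.connectors.mcp import MCPStreamableHttpPlugin

async with MCPStreamableHttpPlugin(name="mcp_tools", url="http://ca-mcp/mcp") as mcp_plugin:
    # Every tool exposed by the MCP server is registered as a kernel function
    kernel.add_plugin(mcp_plugin)
    # ... run the chat loop while the connection stays open ...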

Now that we have the conversation agent up and running with Chainlit and Semantic Kernel, let’s look at the MCP server.

MCP Server

The conversation agent uses an MCP client (from the FastMCP library) to call tools hosted by the MCP server. This illustrates the separation of tool implementation from agent implementation.

The MCP server is implemented in main.py. In its most basic form, an MCP server with a few tools is really simple. This MCP server just defines two tools: a web tool and a rag tool.

The web tool looks like this:

@mcp.tool()
async def web_tool(query: str) -> str:
    """
    Perform a web search for the given query.
    
    Args:
        query: The search query to perform
        
    Returns:
        Search results as a string
    """
    logger.info(f"Web tool called with query: {query}")
    logger.info(f"Using web A2A agent at: {WEB_A2A_BASE_URL}")
    
    try:
        return await _send_a2a_message(query, WEB_A2A_BASE_URL)
    except Exception as e:
        logger.error(f"Error performing web search: {e}")
        return f"Error performing web search: {str(e)}"

This tool only does one thing: send a message to the A2A server on the address in WEB_A2A_BASE_URL. In Azure Container Apps, this URL is http://ca-web. The rag tool is implemented in a similar way. You can check the code of the _send_a2a_message function on GitHub.

⚠️ The addresses of the A2A servers are supplied to the mcp container app via environment variables WEB_A2A_BASE_URL and RAG_A2A_BASE_URL.

We now have the following implemented:

conversation --tool call--> MCP Server --run tool--> A2A Server

All traffic is synchronous and over http (not https)! Everything depends on the correct tool call being made by the conversation agent and the agents in the A2A servers. The rest is just plumbing! No magic! 😊

A2A Servers

You can check my earlier posts about A2A servers for background information:

It is important to note that the A2A server (rag) uses Azure AI Foundry. To authenticate to AI Foundry, we need to use a managed identity.

The rag container needs the following environment variables:

  • RAG_A2A_BASE_URL: required to set the correct url in the agent card
  • INTERNAL_PORT: port to run on (e.g., 80)
  • FOUNDRY_PROJECT: url to the Foundry project (e.g., https://FOUNDRY-RESOURCE.services.ai.azure.com/api/projects/FOUNDRY-PROJECT)
  • ASSISTANT_ID: id of the agent you want to use; needs to exist in Foundry project
  • CLIENT_ID: the client id of the user assigned managed identity; this identity is created in the Bicep script; a role is assigned as well

During deployment of the container apps, a managed identity (that has the client id above) is assigned to the container. In the A2A server code that talks to Foundry, this identity is used as follows:

if client_id:
    logger.info(f"Using ManagedIdentityCredential with client ID: {client_id}")
    credential = ManagedIdentityCredential(client_id=client_id)
else:
    logger.info("Using DefaultAzureCredential")
    credential = DefaultAzureCredential()

This allows for the use of the Azure CLI identity when the rag agent is running on your local machine. Full code is in Agent_Executor.py.

⚠️ If you run the rag A2A server on your local machine, ensure you allow your IP address in the firewall settings of the Azure AI Foundry resource.

Full code for the A2A servers:

Deployment

To make it easy to deploy the containers to the Azure Container Apps environment (discussed in previous post), use the following script: https://github.com/gbaeke/multi_agent_aca/blob/main/deploy_containers.sh

At the top of the script, change the variables to match your environment:

ACR_NAME="SHORT_ACR_NAME"
ACR_URL="SHORT_ACR_NAME.azurecr.io"
RESOURCE_GROUP="RESOURCE_GROUP"
CONTAINER_APP_ENV="CONTAINER_APP_ENV_NAME"
MANAGED_IDENTITY="MANAGED_IDENTITY_NAME"

To deploy, simply run deploy_containers.sh --to-build conversation,mcp,web,rag. This does the following:

  • Builds and pushes the four containers using an ACR Task (no local Docker required)
  • Deploys the four containers with appropriate secrets and environment variables; secrets are read from a .env file

Ensure that you have this .env file in the same folder with the following values:

OPENAI_API_KEY="your_openai_api_key_here"
# Replace with your actual OpenAI API key

FOUNDRY_PROJECT="your_foundry_project_url"
# The URL of the Foundry project endpoint you're connecting to
# Find it in the properties of the AI Foundry project

ASSISTANT_ID="your_assistant_id_here"
# The unique ID of the agent you're referencing

This should deploy the four containers as shown below:

conversation, mcp, web and rag containers

Now grab the ingress URL (aka Application Url) of the conversation container:

Application URL (ingress URL) to the conversation app

Paste that URL in your browser. Hopefully the Chainlit UI is shown. If not, check the following:

  • Chainlit container has the MCP_SERVER_URL set to http://ca-mcp/mcp and also has your OpenAI key in OPENAI_API_KEY
  • MCP container has the WEB_A2A_BASE_URL and RAG_A2A_BASE_URL url set to http://ca-web and http://ca-rag
  • Web container has WEB_A2A_BASE_URL set to http://ca-web and also has an OPENAI_API_KEY
  • Rag container has RAG_A2A_BASE_URL set to http://ca-rag and has environment variables set to use the Azure AI Foundry agent; also check the managed identity of the container has access rights to AI Foundry

Normally these should all be set by both the Bicep and the container deployment script.

Wrapping Up

If you’ve made it this far and tried to implement this yourself, you’ve likely realized how much effort it takes to get everything up and running. About 99% of the work is infrastructure and plumbing; only 1% is actual agent code. In more complex agentic applications, the ratio may shift slightly, but infrastructure will still dominate the effort.

We have not even touched on things like logging, metrics, tracing the end-to-end communication path, load balancing, saving agent state and much, much more.

This brings me back to a key point from an earlier post:


If you can build your multi-agent solution in-process, or use an agent PaaS like Azure AI Foundry, do it.


Only choose the approach I described above when no other viable option exists or when you’re building larger solutions where multiple teams develop agents that must coexist within the same system.

Using tasks with streaming in Google Agent2Agent (A2A)

In a previous post we created a simple A2A agent that uses synchronous message exchange. An A2A client sends a message and the A2A server, via the Agent Executor, responds with a message.

But what if you have a longer running task to perform and you want to inform the client that the task is ongoing? In that case, you can enable streaming on the A2A server and use a task that streams updates and the final result to the client.

The sequence diagram illustrates the flow of messages. It is based on the streaming example in the A2A specification.

A2A tasks with streaming updates

In this case, the A2A client needs to perform a streaming request which is sent to the /message/stream endpoint of the A2A server. The code in the AgentExecutor will need to create a task and provide updates to the client at regular intervals.

⚠️ If you want to skip directly to the code, check out the example on GitHub.

Let’s get into the details in the following order:

  • Writing an agent that provides updates while it is doing work: I will use the OpenAI Agents SDK with its support for agent hooks
  • Writing an AgentExecutor that accepts a message, creates a task and provides updates to the client
  • Updating the A2A Server to support streaming
  • Updating the A2A Client to support streaming

AI Agent that provides updates

Although streaming updates is an integral part of A2A, the agent that does the actual work needs to provide feedback about its progress. That work is up to you, the developer.

In my example, I use an agent created with the OpenAI Agents SDK. This SDK supports AgentHooks that execute at certain events:

  • Agent started/finished
  • Tool call started/finished

The agent class in agent.py on GitHub uses an asyncio queue to emit both the hook events and the agent’s response to the caller. The A2A AgentExecutor uses the invoke_stream() method which returns an AsyncGenerator.
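The pattern is simple: the hooks push events onto a queue and invoke_stream() drains it. Below is a condensed sketch of the hook side, assuming the hook signatures of the OpenAI Agents SDK; see agent.py in the repo for the real StreamEventType-based version.

import asyncio

from agents import AgentHooks

class QueueHooks(AgentHooks):
    """Push progress events onto an asyncio queue as the agent does its work."""

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue

    async def on_start(self, context, agent):
        await self.queue.put(f"Agent '{agent.name}' is starting...")

    async def on_tool_start(self, context, agent, tool):
        await self.queue.put(f"Tool '{tool.name}' started")

    async def on_tool_end(self, context, agent, tool, result):
        await self.queue.put(f"Tool '{tool.name}' finished")

    async def on_end(self, context, agent, output):
        await self.queue.put(f"Agent '{agent.name}' finished")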

You can run python agent.py independently. This should result in the following:

The agent has a tool that returns the current date. The hooks emit the events as shown above followed by the final result.

We can now use this agent from the AgentExecutor and stream the events and final result from the agent to the A2A Client.

AgentExecutor Tasks and Streaming

Instead of simply returning a message to the A2A client, we now need to initiate a long-running task that sends intermediate updates to the client. Under the hood this uses SSE (Server-Sent Events) between the A2A Client and A2A Server.

The file agent_executor.py on GitHub contains the code that makes this happen. Let’s step through it:

message_text = context.get_user_input()  # helper method to extract the user input from the context
logger.info(f"Message text: {message_text}")

task = context.current_task
if not task:
    task = new_task(context.message)
    await event_queue.enqueue_event(task)

Above, we extract the user’s input from the incoming message and we check if the context already contains a task. If not, we create the task and we queue it. This informs the client that a task was created and that SSE can be used to obtain intermediate results.

Now that we have a task (a new or existing one), the following code is used:

updater = TaskUpdater(event_queue, task.id, task.contextId)
async for event in self.agent.invoke_stream(message_text):
    if event.event_type == StreamEventType.RESPONSE:
        # send the result as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=event.data['response']))],
            name='calculator_result',
        )

        await updater.complete()

    else:
        await updater.update_status(
            TaskState.working,
            new_agent_text_message(
                event.data.get('message', ''),
                task.contextId,
                task.id,
            ),
        )

We first create a TaskUpdater instance that has the event queue, current task Id and contextId. The task updater is used to provide status updates, complete or even cancel a task.

We then call invoke_stream(query) on our agent and grab the events it emits. If we get an event type of RESPONSE, we create an artifact with the agent response as text and mark the task as complete. In all other cases, we send a status event with updater.update_status(). A status update contains a task state (working in this case) and a message about the state. The message we send is part of the event that is emitted from invoke_stream() and includes things like agent started, tool started, etc…

So in short, to send streaming updates:

  • Ensure your agents emit events of some sort
  • Use those events in the AgentExecutor and create a task that sends intermediate updates until the agent has finished

However, our work is not finished. The A2A Server needs to be updated to support streaming.

A2A Server streaming support

The A2A server code is in main.py on GitHub. To support streaming, we need to update the capabilities of the server:

capabilities = AgentCapabilities(streaming=True, pushNotifications=True)

⚠️ pushNotifications=True is not required for streaming. I include it here to show that sending a push notification to a web hook is also an option.

That’s it! The A2A Server now supports streaming. Easy! 😊

Streaming with the A2A Client

Instead of sending a message to the non-streaming endpoint, the client should now use the streaming endpoint. Here is the code to do that (check test_client.py for the full code):

message_payload = Message(
    role=Role.user,
    messageId=str(uuid.uuid4()),
    parts=[Part(root=TextPart(text=question))],
)
streaming_request = SendStreamingMessageRequest(
    id=str(uuid.uuid4()),
    params=MessageSendParams(
        message=message_payload,
    ),
)
print("Sending message")

stream_response = client.send_message_streaming(streaming_request)

To send to the streaming endpoint, SendStreamingMessageRequest() is your friend, together with client.send_message_streaming().

We can now grab the responses as they come in:

async for chunk in stream_response:
    # Only print status updates and text responses
    chunk_dict = chunk.model_dump(mode='json', exclude_none=True)

    if 'result' in chunk_dict:
        result = chunk_dict['result']

        # Handle status updates
        if result.get('kind') == 'status-update':
            status = result.get('status', {})
            state = status.get('state', 'unknown')

            if 'message' in status:
                message = status['message']
                if 'parts' in message:
                    for part in message['parts']:
                        if part.get('kind') == 'text':
                            print(f"[{state.upper()}] {part.get('text', '')}")
            else:
                print(f"[{state.upper()}]")

        # Handle artifact updates (contain actual responses)
        elif result.get('kind') == 'artifact-update':
            artifact = result.get('artifact', {})
            if 'parts' in artifact:
                for part in artifact['parts']:
                    if part.get('kind') == 'text':
                        print(f"[RESPONSE] {part.get('text', '')}")

        # Handle initial task submission
        elif result.get('kind') == 'task':
            print(f"[TASK SUBMITTED] ID: {result.get('id', 'unknown')}")

        # Handle final completion
        elif result.get('final') is True:
            print("[TASK COMPLETED]")

This code checks the type of content coming in:

  • status-update: when AgentExecutor sends a status update
  • artifact-update: when AgentExecutor sends an artifact with the agent’s response
  • task: when tasks are submitted and completed

Running the client and asking what today’s date is, results in the following response:

Streaming is working as intended! But what if you use a client that does not support streaming? That actually works and results in a full response with the agent’s answer in the result field. You would also get a history field that contains the initial user question together with all the task updates.

Here’s a snippet of that result:

{
  "id": "...",
  "jsonrpc": "2.0",
  "result": {
    "artifacts": [
      {
        "artifactId": "...",
        "name": "calculator_result",
        "parts": [
          {
            "kind": "text",
            "text": "Today's date is July 13, 2025."
          }
        ]
      }
    ],
    "contextId": "...",
    "history": [
      {
        "role": "user",
        "parts": [
          {
            "kind": "text",
            "text": "What is today's date?"
          }
        ]
      },
      {
        "role": "agent",
        "parts": [
          {
            "kind": "text",
            "text": "Agent 'CalculatorAgent' is starting..."
          }
        ]
      }
    ],
    "id": "...",
    "kind": "task",
    "status": {
      "state": "completed"
    }
  }
}

Wrapping up

You have now seen how to run longer-running tasks and provide updates along the way via streaming. As long as your agent code emits status events, the AgentExecutor can create a task and provide task updates and the task result to the A2A Server, which uses SSE to send them to the A2A Client.

In an upcoming post, we will take a look at running a multi-agent solution in the Azure cloud.

Google’s A2A: taking a closer look

In the previous post, I talked about options to build multi-agent solutions. The last option used Google’s A2A. A2A provides a wrapper around your agent, basically a JSON-RPC API, that standardizes how you talk to your agent. In this post we take a closer look at the basics of A2A with simple synchronous message exchange.

⚠️ A2A is still in development. We do not use it in production yet!

The idea is to build solutions that look like this (just one of the many possibilities):

The conversation agent is an agent that uses tools to get the job done. It wouldn’t be much of an agent without tools, right? The tools are custom tools created by the developer that call other agents to do work. The other agents can be written in any framework and use any development language. How the agent works internally is irrelevant. When the conversation agent detects (via standard function calling) that the RAG tool needs to be executed, that tool will call the RAG agent over A2A and return the results.

A2A does not dictate how you build your agent. In the example below, an Azure AI Foundry Agent sits at the core. That agent can use any of its hosted tools or custom tools to get the job done. Because this is a RAG Agent, it might use the built-in Azure AI Search or SharePoint knowledge source. As a developer, you use the Azure AI Foundry SDK or Semantic Kernel to interact with your agent as you see fit. Although you do not have to, it is common to wrap your agent in a class and provide one or more methods to interact with it. For example, an invoke() method and an invoke_streaming() method.

Here is a minimal example for the AI Foundry Agent (the yellow box):

class RAGAgent:
    def __init__(self):
        # INITIALIZATION CODE NOT SHOWN
        self.project = AIProjectClient(
            credential=DefaultAzureCredential(),
            endpoint=endpoint)
        self.agent = self.project.agents.get_agent(agent_id)

    async def invoke(self, question: str) -> str:
        thread = self.project.agents.threads.create()

        message = self.project.agents.messages.create(
            thread_id=thread.id,
            role="user",
            content=question
        )
        run = self.project.agents.runs.create_and_process(
            thread_id=thread.id,
            agent_id=self.agent.id)
        messages = list(self.project.agents.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING))

        # ...

This code has nothing to do with Google A2A and could be implemented in many other ways. This is about to change because we will now call the above agent from A2A’s AgentExecutor. The AgentExecutor is a key server‑side interface: when a client sends a message, the A2A server calls execute() on your AgentExecutor instance, and your implementation handles the logic and sends updates via an event queue. Here’s how your agent is used by A2A. When a client sends a message it works its way down to your agent via several A2A components:

It’s important to understand the different types of message exchange in A2A. This post will not look at all of them. You can find more information in the A2A documentation. This post uses synchronous messaging via message/send, where the response is a simple message and not a (potentially longer-running) task.

Let’s dive into the AgentExecutor (it processes the message we send) and work our way up to the A2A client.

AgentExecutor

Let’s take a look at a bare-bones implementation of AgentExecutor that works with plain text input and output messages and without streaming:

Client --message--> A2A Server --> Agent Executor --> Agent

and

Agent --> Agent Executor --> A2A Server --message--> Client

class RAGAgentExecutor(AgentExecutor):

    def __init__(self):
        self.agent = RAGAgent()

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        message_text = context.get_user_input()
        
        result = await self.agent.invoke(message_text)

        await event_queue.enqueue_event(new_agent_text_message(result))
        
    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        raise Exception("Cancel not supported")

When a message is sent to the A2A server via JSON-RPC, the execute() method of the RAGAgentExecutor is called. At server startup, __init__ creates our AI Foundry RAGAgent which does the actual work.

Inside the execute() method, we assume the context contains a message. We use the get_user_input() helper to extract the message text (user query). We then simply call our agent’s invoke() method with that query and return the result via the event_queue. The A2A server uses the event_queue to provide responses back to the caller. In this case, the response will be a simple plain text message.

This is probably as simple as it gets and is useful to understand A2A’s basic operation. In many cases, though, you might want to return a longer-running task instead of a message and provide updates to the client via streaming. That would require creating the task and streaming the task updates to the client. The client would need to be modified to handle this.

But wait, we still need to create the server that uses this AgentExecutor. Let’s take a look!

A2A Server

The A2A Python SDK uses starlette and uvicorn to create the JSON-RPC server. You don’t really need to know anything about this because A2A does this under the covers for you. The server needs to do a couple of things:

  • Create one or more skills: skills represent a specific capability or function your agent offers—for instance, “currency conversion,” “document summary” or “meeting scheduling”.
  • Create an agent card: an agent card is like a business card for your agent; it tells others what the agent can do; the above skills are part of the agent card; the agent card is published at /.well-known/agent.json on the agents domain (e.g., localhost:9999 on your local machine)
  • Create a request handler: the request handler ties the server to the AgentExecutor you created earlier
  • Create the A2AStarletteApplication: it ties the agent card and the request handler together
  • Serve the A2AStarletteApplication with uvicorn on an address and port of your choosing

This is what it looks like in code:

import logging
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from agent_executor import RAGAgentExecutor

def main():
    skill = AgentSkill(
        id="rag_skill",
        name="RAG Skill",
        description="Search knowledge base for project information",
        tags=["rag", "agent", "information"],
        examples=["What is project Astro and what tech is used in it?"],
    )
    agent_card = AgentCard(
        name="RAG Agent",
        description="A simple agent that searches the knowledge base for information",
        url="http://localhost:9998/",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[skill],
        version="1.0.0",
        capabilities=AgentCapabilities(),
    )
    request_handler = DefaultRequestHandler(
        agent_executor=RAGAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )
    server = A2AStarletteApplication(
        http_handler=request_handler,
        agent_card=agent_card,
    )
    uvicorn.run(server.build(), host="0.0.0.0", port=9998)

if __name__ == "__main__":
    main()

Validating the agent card

When you run the A2A server on your local machine and expose it to the public with ngrok or other tools, you can use https://a2aprotocol.ai/a2a-protocol-validator to validate it. When I do this for the RAG Agent, I get the following:

In JSON, the agent card is as follows:

{
  "capabilities": {},
  "defaultInputModes": [
    "text"
  ],
  "defaultOutputModes": [
    "text"
  ],
  "description": "A simple agent that searches the knowledge base for information",
  "name": "RAG Agent",
  "protocolVersion": "0.2.5",
  "skills": [
    {
      "description": "Search knowledge base for project information",
      "examples": [
        "What is project Astro and what tech is used in it?"
      ],
      "id": "rag_agent",
      "name": "RAG Agent",
      "tags": [
        "rag",
        "agent",
        "information"
      ]
    }
  ],
  "url": "http://Geerts-MacBook-Air-2.local:9998/",
  "version": "1.0.0"
}

Now it is time to actually start talking to the agent.

Using the A2A client to talk to the agent

With the server up and running and the Agent Card verified, how do we exchange messages with the server?

In our case, where the server supports only text and there is no streaming, the client can be quite simple:

  • Create an httpx client and set timeout higher depending on how long it takes to get a response; this client is used by the A2ACardResolver and A2AClient
  • Retrieve the agent card with the A2ACardResolver
  • Create a client with A2AClient. It needs the agent card as input and will use the url in the agent card to connect to the A2A server
  • Create a Message, wrap it in a SendMessageRequest (via MessageSendParams) and send it with the client. We use the non-streaming send_message() method, which maps to the JSON-RPC message/send method.
  • Handle the response from the client

The code below shows what this might look like:

import uuid

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    AgentCard,
    Message,
    MessageSendParams,
    Part,
    Role,
    SendMessageRequest,
    TextPart,
)

PUBLIC_AGENT_CARD_PATH = "/.well-known/agent.json"
BASE_URL = "http://localhost:9998"


async def main() -> None:
    timeout = httpx.Timeout(200.0, read=200.0, write=30.0, connect=10.0)
    async with httpx.AsyncClient(timeout=timeout) as httpx_client:
        # Initialize A2ACardResolver
        resolver = A2ACardResolver(
            httpx_client=httpx_client,
            base_url=BASE_URL,
        )

        final_agent_card_to_use: AgentCard | None = None

        try:
            print(
                f"Fetching public agent card from: {BASE_URL}{PUBLIC_AGENT_CARD_PATH}"
            )
            _public_card = await resolver.get_agent_card()
            print("Fetched public agent card")
            print(_public_card.model_dump_json(indent=2))

            final_agent_card_to_use = _public_card

        except Exception as e:
            print(f"Error fetching public agent card: {e}")
            raise RuntimeError("Failed to fetch public agent card")

        client = A2AClient(
            httpx_client=httpx_client, agent_card=final_agent_card_to_use
        )
        print("A2AClient initialized")

        message_payload = Message(
            role=Role.user,
            messageId=str(uuid.uuid4()),
            parts=[Part(root=TextPart(text="Is there a project with the word Astro? If so, describe it."))],
        )
        request = SendMessageRequest(
            id=str(uuid.uuid4()),
            params=MessageSendParams(
                message=message_payload,
            ),
        )
        print("Sending message")

        response = await client.send_message(request)
        print("Response:")
        print(response.model_dump_json(indent=2))


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Above, the entire response is printed as JSON. That is useful to learn what the responses look like. This is part of the response:

{
  "id": "6cc795d8-fa84-4734-8b5a-dccd3a22142d",
  "jsonrpc": "2.0",
  "result": {
    "contextId": null,
    "extensions": null,
    "kind": "message",
    "messageId": "fead200d-0ea4-4ccb-bf1c-ed507b38d79d",
    "metadata": null,
    "parts": [
      {
        "kind": "text",
        "metadata": null,
        "text": "RESPONSE FROM RAG AGENT"
      }
    ],
    "referenceTaskIds": null,
    "role": "agent",
    "taskId": null
  }
}

Simply sending the response as a string on the event queue results in a message with one text part. The result from the RAG agent is in the text property. For a longer running task with streaming updates, the response would be quite different.

You can now easily interact with your agent using this client. For example:

  • use the client in any application (need not be an agent)
  • use the client in a workflow engine like LangGraph
  • use the client in an agent tool; the agent can be written in any framework; when the agent identifies a tool call is needed, the tool is run which contains A2AClient code to interact with the A2A Agent
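
For the last option, a tool function could look like the minimal sketch below. It reuses the client calls from the example above; the function name and hard-coded URL are placeholders:

import uuid

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import Message, MessageSendParams, Part, Role, SendMessageRequest, TextPart


async def ask_rag_agent(question: str) -> str:
    """Hypothetical tool function that forwards a question to the RAG agent over A2A."""
    async with httpx.AsyncClient(timeout=httpx.Timeout(200.0)) as httpx_client:
        # resolve the agent card and create the client, just like in the full example
        card = await A2ACardResolver(httpx_client=httpx_client, base_url="http://localhost:9998").get_agent_card()
        client = A2AClient(httpx_client=httpx_client, agent_card=card)

        request = SendMessageRequest(
            id=str(uuid.uuid4()),
            params=MessageSendParams(
                message=Message(
                    role=Role.user,
                    messageId=str(uuid.uuid4()),
                    parts=[Part(root=TextPart(text=question))],
                )
            ),
        )
        response = await client.send_message(request)

        # grab the text of the first part of the returned message (see the JSON sample above)
        result = response.model_dump(mode="json", exclude_none=True)["result"]
        return result["parts"][0]["text"]

In an agent framework, you would register ask_rag_agent as a tool so the conversation agent can call it when it decides a RAG lookup is needed.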

The entire flow

The diagram below shows the end-to-end flow:

Try it yourself

On GitHub, check https://github.com/gbaeke/multi_agent_aca/tree/main/a2a_simple for a skeleton implementation of a calculator agent. The CalculatorAgent class’s invoke() method always returns “I did not do anything!” It’s up to you to change that!

You can run this A2A server as-is and connect to it with test_client.py. To use an actual agent, update the CalculatorAgent class’s invoke() method with a real agent written in your preferred framework.

Check the README.md for more instructions.

That’s it for this post! In a next one, we will look at a more complex example that streams messages to the client. Stay tuned!

Load balancing OpenAI API calls with LiteLLM

If you have ever created an application that makes calls to Azure OpenAI models, you know there are limits to the amount of calls you can make per minute. Take a look at the settings of a GPT model below:

GPT deployment settings

Above, the tokens per minute (TPM) rate limit is set to 60 000 tokens. Azure grants roughly 6 requests per minute per 1 000 TPM, so this translates to about 360 requests per minute. When you exceed these limits, you get 429 Too Many Requests errors.

There are many ways to deal with these limits. A few of the main ones are listed below:

  • You can ask for a PAYGO quota increase: remember that high quotas do not necessarily lead to consistent lower-latency responses
  • You can use PTUs (provisioned throughput units): highly recommended if you want consistently quick responses with the lowest latency. Don’t we all? 😉
  • Your application can use retries with backoffs. Note that OpenAI libraries use automatic retries by default. For Python, it is set to two but that is configurable.
  • You can use multiple Azure OpenAI instances and load balance between them
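
Regarding the retry option: with the current OpenAI Python library you can tune the retry count on the client itself. A minimal sketch, where the endpoint, key and API version are placeholders:

from openai import AzureOpenAI

# the library retries failed requests (including 429s) with exponential backoff;
# the default is 2 retries, raised to 5 here
client = AzureOpenAI(
    azure_endpoint="https://YOUR-RESOURCE.openai.azure.com",
    api_key="YOUR-KEY",
    api_version="2024-02-01",
    max_retries=5,
)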

In this post, we will take a look at implementing load balancing between OpenAI resources with an open source solution called LiteLLM. Note that, in Azure, you can also use Azure API Management. One example is discussed here. Use it if you must but know it is not simple to configure.

A look at LiteLLM

LiteLLM has many features. In this post, I will be implementing it as a standalone proxy, running as a container in Azure Kubernetes Service (AKS). The proxy is part of a larger application illustrated in the diagram below:

LLM-based document processor

The application above has an upload service that allows users to upload a PDF or other supported document. After storing the document in an Azure Storage Account container, the upload service sends a message to an Azure Service Bus topic. The process service uses those messages to process each file. One part of the process is the use of Azure OpenAI to extract fields from the document. For example, a supplier, document number or anything else.

To support the processing of many documents, multiple Azure OpenAI resources are used: one in France and one in Sweden. Both regions have the gpt-4-turbo model that we require.

The process service uses the Python OpenAI library in combination with the instructor library. Instructor is great for getting structured output from documents based on Pydantic classes. Below is a snippet of code:

from openai import OpenAI
import instructor

client = instructor.from_openai(OpenAI(
        base_url=azure_openai_endpoint,
        api_key=azure_openai_key
))

The only thing we need to do is to set the base_url to the LiteLLM proxy. The api_key is configurable. By default it is empty but you can configure a master key or even virtual keys for different teams and report on the use of these keys. More about that later.

The key point here is that LiteLLM is a transparent proxy that fully supports the OpenAI API. Your code does not have to change. The actual LLM does not have to be an OpenAI LLM. It can be Gemini, Claude and many others.
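
To make this concrete, here is a minimal sketch of a structured-output call through the proxy. The base URL matches the Kubernetes service created later in this post, the model name must match a model_name from the LiteLLM config, and the InvoiceFields model is a simplified, hypothetical example:

import instructor
from openai import OpenAI
from pydantic import BaseModel


class InvoiceFields(BaseModel):
    supplier: str
    document_number: str


client = instructor.from_openai(OpenAI(
    base_url="http://litellm-service",  # the LiteLLM proxy, reachable via its Kubernetes service
    api_key="sk-...",                   # the master key or a virtual team key
))

fields = client.chat.completions.create(
    model="gpt-4-preview",              # model_name from the LiteLLM config; the proxy picks a deployment
    response_model=InvoiceFields,
    messages=[{"role": "user", "content": "Extract the supplier and document number from: ..."}],
)
print(fields.supplier, fields.document_number)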

Let’s take a look at deploying the proxy in AKS.

Deploying LiteLLM on Kubernetes

Before deploying LiteLLM, we need to configure it via a config file. In true Kubernetes style, let’s do that with a ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: litellm-config-file
data:
  config.yaml: |
      model_list: 
        - model_name: gpt-4-preview
          litellm_params:
            model: azure/gpt-4-preview
            api_base: os.environ/SWE_AZURE_OPENAI_ENDPOINT
            api_key: os.environ/SWE_AZURE_OPENAI_KEY
          rpm: 300
        - model_name: gpt-4-preview
          litellm_params:
            model: azure/gpt-4-preview
            api_base: os.environ/FRA_AZURE_OPENAI_ENDPOINT
            api_key: os.environ/FRA_AZURE_OPENAI_KEY
          rpm: 360
      router_settings:
        routing_strategy: least-busy
        num_retries: 2
        timeout: 60                                  
        redis_host: redis
        redis_password: os.environ/REDIS_PASSWORD
        redis_port: 6379
      general_settings:
        master_key: os.environ/MASTER_KEY

The configuration contains a list of models. Above, there are two entries with the same model name: gpt-4-preview. Each entry points to a deployed model in Azure with the same name (it can be different) and has its own API base and key. For example, the first entry uses the API base and key of my instance in Sweden. Instead of hardcoding those values, the os.environ/ prefix followed by a variable name tells LiteLLM to read the value from an environment variable. Of course, that means we have to set these environment variables in the LiteLLM container. We will do that later.

When the code in the process service uses the gpt-4-preview model via the proxy, the proxy will perform load balancing based on the router settings.

To spin up more than one instance of LiteLLM, a Redis instance is required. Redis is used to share information between the instances to make routing decisions. The routing strategy is set to least-busy.

Note that num_retries is set to 2. You can turn off retries in your code and let the proxy handle them for you.

To support mounting the secrets as environment variables, I use a .env file in combination with a secretGenerator in Kustomize:

STORAGE_CONNECTION_STRING=<placeholder for storage connection string>
CONTAINER=<placeholder for container name>
AZURE_AI_ENDPOINT=<placeholder for Azure AI endpoint>
AZURE_AI_KEY=<placeholder for Azure AI key>
AZURE_OPENAI_ENDPOINT=<placeholder for Azure OpenAI endpoint>
AZURE_OPENAI_KEY=<placeholder for Azure OpenAI key>

LLM_LITE_SWE_AZURE_OPENAI_ENDPOINT=<placeholder for LLM Lite SWE Azure OpenAI endpoint>
LLM_LITE_SWE_AZURE_OPENAI_KEY=<placeholder for LLM Lite SWE Azure OpenAI key>

LLM_LITE_FRA_AZURE_OPENAI_ENDPOINT=<placeholder for LLM Lite FRA Azure OpenAI endpoint>
LLM_LITE_FRA_AZURE_OPENAI_KEY=<placeholder for LLM Lite FRA Azure OpenAI key>

TOPIC_KEY=<placeholder for topic key>
TOPIC_ENDPOINT=<placeholder for topic endpoint>
PUBSUB_NAME=<placeholder for pubsub name>
TOPIC_NAME=<placeholder for topic name>
SB_CONNECTION_STRING=<placeholder for Service Bus connection string>

REDIS_PASSWORD=<placeholder for Redis password>
MASTER_KEY=<placeholder for LiteLLM master key>

POSTGRES_DB_URL=postgresql://USER:PASSWORD@SERVERNAME-pg.postgres.database.azure.com:5432/postgres

There are many secrets here. Some are for LiteLLM, although weirdly prefixed with LLM_LITE instead. I do that sometimes! The others are to support the upload and process services.

To get these values into secrets, I use the following kustomization.yaml:

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: inv-demo


resources:
- namespace.yaml
- pubsub.yaml
- upload.yaml
- process.yaml
- llmproxy.yaml
- redis.yaml

secretGenerator:
- name: invoices-secrets
  envs:
  - .env
  
generatorOptions:
  disableNameSuffixHash: true

The secretGenerator will create a secret called invoices-secrets in the inv-demo namespace. We can reference the secrets in the LiteLLM Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: litellm-deployment
  labels:
    app: litellm
spec:
  replicas: 2
  selector:
    matchLabels:
      app: litellm
  template:
    metadata:
      labels:
        app: litellm
    spec:
      containers:
      - name: litellm
        image: ghcr.io/berriai/litellm:main-latest
        args:
        - "--config"
        - "/app/proxy_server_config.yaml"
        ports:
        - containerPort: 4000
        volumeMounts:
        - name: config-volume
          mountPath: /app/proxy_server_config.yaml
          subPath: config.yaml
        env:
        - name: SWE_AZURE_OPENAI_ENDPOINT
          valueFrom:
            secretKeyRef:
              name: invoices-secrets
              key: LLM_LITE_SWE_AZURE_OPENAI_ENDPOINT
        - name: SWE_AZURE_OPENAI_KEY
          valueFrom:
            secretKeyRef:
              name: invoices-secrets
              key: LLM_LITE_SWE_AZURE_OPENAI_KEY
        - name: FRA_AZURE_OPENAI_ENDPOINT
          valueFrom:
            secretKeyRef:
              name: invoices-secrets
              key: LLM_LITE_FRA_AZURE_OPENAI_ENDPOINT
        - name: FRA_AZURE_OPENAI_KEY
          valueFrom:
            secretKeyRef:
              name: invoices-secrets
              key: LLM_LITE_FRA_AZURE_OPENAI_KEY
        - name: MASTER_KEY
          valueFrom:
            secretKeyRef:
              name: invoices-secrets
              key: MASTER_KEY
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: invoices-secrets
              key: POSTGRES_DB_URL
      volumes:
        - name: config-volume
          configMap:
            name: litellm-config-file

The ConfigMap content is mounted as /app/proxy_server_config.yaml. You need to specify the config file via the --config parameter, supplied in args.

Next, we simply mount all the environment variables that we need. The LiteLLM ConfigMap uses several of those via the os.environ references. There is also a DATABASE_URL that is not mentioned in the ConfigMap. The URL points to a PostgreSQL instance in Azure where information is kept to support the LiteLLM dashboard and other settings. If you do not want the dashboard feature, you can omit the database URL.

There’s one last thing: the process service needs to connect to LiteLLM via Kubernetes internal networking. Of course, that means we need a service:

apiVersion: v1
kind: Service
metadata:
  name: litellm-service
spec:
  selector:
    app: litellm
  ports:
  - protocol: TCP
    port: 80
    targetPort: 4000
  type: ClusterIP

With this service definition, the process service can set the OpenAI base URL as http://litellm-service to route all requests to the proxy via its internal IP address.

As you can probably tell from the kustomization.yaml file, the ConfigMap, Deployment and Service are in llmproxy.yaml. The other YAML files do the following:

  • namespace.yaml: creates the inv-demo namespace
  • upload.yaml: deploys the upload service (written in Python and uses FastAPI, 1 replica)
  • process.yaml: deploys the process service (written in Python as a Dapr grpc service, 2 replicas)
  • pubsub.yaml: creates a Dapr pubsub component that uses Azure Service Bus
  • redis.yaml: creates a standalone Redis instance to support multiple replicas of the LiteLLM proxy

To deploy all of the above, you just need to run the command below:

kubectl apply -k .

⚠️ Although this can be used in production, several shortcuts are taken. One thing that would be different is secrets management. Secrets would be in a Key Vault and made available to applications via the Secret Store CSI driver or other solutions.

With everything deployed, I see the following in k9s:

The view from k9s

As a side note, I also use Diagrid to provide insights about the use of Dapr on the cluster:

upload and process are communicating via pubsub (Service Bus)

Dapr is only used between process and upload. The other services do not use Dapr and, as a result, are not visible here. The above is from Diagrid Conductor Free. As I said…. total side note! 🤷‍♂️

Back to the main topic…

The proxy in action

Let’s see if the proxy uses both Azure OpenAI instances. The dashboard below presents a view of the metrics after processing several documents:

OpenAI usage in France and Sweden

It’s clear that the proxy uses both resources. Remember that this is the least-busy routing option. It picks the deployment with the least number of ongoing calls. Both these instances are only used by the process service so the expectation is a more or less even distribution.

LiteLLM Dashboard

If you configured authentication and provided a URL to a PostgreSQL database, you can access the dashboard. To see the dashboard in action without deploying it, see https://litellm.vercel.app/docs/proxy/demo.

One of the things you can do is create teams. Below, you see a team called dev which has access only to the gpt-4-preview model with unlimited TPM and RPM:

dev team in LiteLLM

In addition to the team, a virtual key is created and assigned to the team. This virtual key starts with sk- and is used as the OpenAI API key in the process service:

LiteLLM virtual key for the dev team

We can now report on the use of OpenAI by the dev team:

Usage for the dev team

Above, there’s a small section that’s unassigned because I used LiteLLM without a key and with the master key before switching to a team-based key.

The idea here is that you can deploy the LiteLLM proxy centrally and hand out virtual keys to teams so they can all access their models via the proxy. We have not tested this in a production setting yet but it is certainly something worth exploring.

Conclusion

I have only scratched the surface of LiteLLM here but my experience with it so far is pretty good. If you want to deploy it as a central proxy server that developers can use to access models, deployment to Kubernetes and other environments with the container image is straightforward.

In this post I used Kubernetes but that is not required. It runs in Container Apps and other container runtimes as well. In fact, you do not need to run it in a container at all. It also works as a standalone application or can be used directly in your Python apps.

There is much more to explore but for now, if you need a transparent OpenAI-based proxy that works with many different models, take a look at LiteLLM.

Writing your first flow with Prompt Flow in Visual Studio Code

In this blog post, we will create a flow with Prompt Flow in Visual Studio Code. Prompt Flow is a suite of development tools to build LLM-based AI applications. It tries to cover the end-to-end development cycle, including prototyping, testing and deployment to production.

In Prompt Flow, you create flows. Flows link LLMs (large language models), prompts and tools together in an executable workflow. An example of such a flow is shown below:

Sample flow

The flow above (basically a directed acyclic graph – DAG – of functions) sends its input, a description to search for an image, to a tool that embeds the description with an Azure OpenAI embedding model. The embedding is used as input to a Python tool that does a similarity search in Azure AI Search. The search returns three results. The original input, together with the query results, are subsequently handed to an LLM (above, the final_result node) that hopefully picks the correct image url.

Although you could write your own API that does all of the above, Prompt Flow allows you to visually build, run and debug a flow that has input and output. When you are happy with the flow, you can convert it to an API. One of the ways to host the API is via a container.

We will build this flow on our local machine and host it as a container. Note that Prompt Flow can also be used from the portal using Azure Machine Learning or Azure AI Studio.

👉 Another blog post will describe how to build and run the container

Installing Prompt Flow on your machine

To install Prompt Flow you will need Python on your machine. Use Python 3.9 or higher. I use Python 3.11 on an Apple M2. Check the full installation instructions here. Without using a Python virtual environment, you can just run the following command to install Prompt Flow:

pip install promptflow promptflow-tools

Next, run pf -v to check the installation.

⚠️ Do not forget to install promptflow-tools because it enables the embedding tool, llm tool and other tools to be used as nodes in the flow; also ensure this package is installed in the container image that will be created for this flow

In Visual Studio Code, install the Prompt flow for VS Code extension. It has the VS Code Python Extension as a prerequisite. Be sure to check the Prompt Flow Quick Start for full instructions.

We will mainly use the VS Code extension. Note that the pf command can be used to perform many of the tasks we will discuss below (e.g., creating connections, running a flow, etc…).

Creating an empty flow

In VS Code, ensure you opened an empty folder or create a new folder. Right click and select New flow in this directory. You will get the following question:

Flow selection

Select Empty Flow. This creates a file called flow.dag.yaml with the following content:

Empty flow.dag.yaml

If you look closely, you will see a link to open a Visual editor. Click that link:

Visual editor with empty input and output and blank canvas

We can now add input(s) and output(s) and add the nodes in between.

Inputs and outputs

Inputs have a type and a value. Add a string input called description:

One string input: a description (of an image, like creature or fruit)

When you later run the flow, you can type the description in the Value textbox. When the flow is converted to an API, the API will expect a description in the POST body.

Next, add an output called url. In the end, the flow returns a url to an image that matches the description:

One output: the url to a matching image

The value of the output will come from another node. We still have to add those nodes. If you click the Value dropdown list, you will only be able to select the input value for now. You can do that and click the run icon. Save your flow before running it.

Running the flow with output set to the input

When you click the run button, a command will be run in the terminal that runs the flow:

python3 -m promptflow._cli._pf.entry flow test --flow /Users/geertbaeke/projects/promptflow/images/blogpost --user-agent "prompt-flow-extension/1.6.0 (darwin; arm64) VSCode/1.85.0"

The output of this command is:

Output of the flow is JSON, here with just the url

Although this is not very useful, the flow runs and produces a result. The output is our input. We can now add nodes to do something useful.

Creating an embedding from the description

We need to embed the description to search for similar descriptions in an Azure AI Search index. If you are not sure what embeddings are, check Microsoft Learn for a quick intro. In short, an embedding is a list of numbers that represents the meaning of a piece of text. We can compare the numbers of our description with the numbers of each image description to see how close they are.
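
As a tiny illustration of what “comparing the numbers” means, here is cosine similarity between two vectors in plain Python. This is just to build intuition; the embedding tool and Azure AI Search handle this for you, and real embeddings have 1536 dimensions:

import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    # 1.0 means the vectors point in the same direction (very similar meaning)
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


print(cosine_similarity([0.1, 0.2, 0.3], [0.1, 0.25, 0.2]))   # close to 1.0: similar
print(cosine_similarity([0.1, 0.2, 0.3], [-0.3, 0.1, -0.2]))  # much lower: dissimilar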

To create an embedding, we need access to an Azure OpenAI embedding model. Such a model takes text as input and returns the bunch of numbers we talked about. This model returns 1536 numbers, aka dimensions.

To use the model, we will need an Azure OpenAI resource’s endpoint and key. If you do not have an Azure OpenAI resource in Azure, create one and deploy the text-embedding-ada-002 model. In my example, the deployment is called embedding:

Embedding model in Azure OpenAI

With the Azure resources created, we can add a connection in Prompt Flow that holds the OpenAI endpoint and key:

Click the Prompt Flow extension icon and click + next to Azure OpenAI in the Connections section:

Azure OpenAI connection

A document will open that looks like the one below:

Connection information

Fill in the name and api_base only. The api_base is the https url to your Azure OpenAI instance. It’s something like https://OPENAIRESOURCENAME.openai.azure.com/. Do not provide the api_key. When you click Create connection (the smallish link at the bottom), you will be asked for the key.

After providing the key, the connection should appear under the Azure OpenAI section. You will need this connection in the embedding tool to point to the embedding model to use.

In the Prompt Flow extension pane, now click + next to Embedding in the TOOLS section:

Embedding tool

You will be asked for the tool’s name (top of VS Code window). Provide a name (e.g., embedding) and press enter. Select the connection you just created, the deployment name of your embedding model and the input. The input is the description we configured in the flow’s input node. We want to embed that description. The output of this tool will be a list of floating-point numbers, a vector, of 1536 dimensions.

Embedding tool

The moment you set the input of the embedding, the input node will be connected to the embedding node on the canvas. To check if embedding works, you can connect the output of the embedding node to the url output and run the flow. You should then see the vector as output. The canvas looks like:

Show the embedding as output

Of course, we will need to supply the embedding to a vector search engine, not to the output. In our case, that is Azure AI Search. Let’s try that…

⚠️ Instead of connecting the embedding to the output, you can simply debug the embedding by clicking the debug icon in the embedding tool. The tool will be executed with the value of the input. The result should be a bunch of numbers in your terminal:

Debug output of the embedding tool

Searching for similar images

This section is a bit more tricky because you need an Azure AI Search index that allows you to search for images using a description of an image. To create such an index, see https://atomic-temporary-16150886.wpcomstaging.com/2023/12/09/building-an-azure-ai-search-index-with-a-custom-skill/.

Although you can use a Vector DB Lookup tool that supports Azure AI Search, we will create a custom Python tool that does the same thing. The Python tool uses the azure-search-documents Python library to perform the search. Learning how to use Python tools is important for implementing logic for which there is no specific tool.

First, we will create a custom connection that holds the name of our Azure AI Search instance and a key to authenticate.

Similar to the Azure OpenAI connection, create a custom connection:

Custom connection

After clicking +, a document opens. Modify it as follows:

Custom connection content

Like before, set a name. In a custom connection, you can have configs and secrets. In configs add the Azure AI Search endpoint and index name. In the secrets set key to <user-input>. When you click Create connection, you will be asked to supply the key.

⚠️ Connection information is saved to a local SQLite database in the .promptflow folder in your home folder

We can now add a Python tool. In TOOLS, next to Python click +. Give the tool a name and select new file. You should get a new Python file in your code with the filename set to <YOURTOOLNAME>.py. The code without comments is below:

from promptflow import tool

@tool
def my_python_tool(input1: str) -> str:
    return 'hello ' + input1

This tool takes a string input and returns a string. The @tool decorator is required.

We need to change this code to get the custom connection information, query Azure AI Search and return search results as a list. The code is below:

from promptflow import tool
from promptflow.connections import CustomConnection
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery

@tool
def my_python_tool(vector: list, ai_conn: CustomConnection) -> list:
    ai_conn_dict = dict(ai_conn)
    endpoint = ai_conn_dict['endpoint']
    key = ai_conn_dict['key']
    index = ai_conn_dict['index']

    # query azure ai search
    credential = AzureKeyCredential(key)
    client = SearchClient(endpoint=endpoint,
                          index_name=index,
                          credential=credential)
    vector_query = VectorizedQuery(vector=vector, k_nearest_neighbors=3, fields="textVector", exhaustive=True)
    results = client.search(
        search_text=None,
        vector_queries=[vector_query],
        select=["name", "description", "url"]
    )

    # convert results to json list
    results = [dict(result) for result in results]

    return results

The function has two parameters: a vector of type list to match the output of the embedding tool, and a variable of type CustomConnection. The custom connection can be converted to a dict to retrieve both the configs and the secret.

Next, we use the configs and secret to perform the query with a SearchClient. The query only returns three fields from our index: name, description and url. The result returned from Azure AI Search is converted to a list and returned.

When you save the Python file and go back to your flow, you should see the Python tool (aisearch) with the vector and ai_conn fields. If not, click the regenerate link. Set it as below:

Python tool

The input to the Python tool is the output from the embedding tool. We also pass in the custom connection to provide the configs and key to the tool.

You can set the output of the entire flow (url) to the output of the Python tool to check the results of the search when you run the flow:

Running the flow with Python tool’s output as output

I ran the flow with a description equal to cat. A list of three JSON objects is returned. The first search result is the url to cat.jpg but there are other results as well (not shown above).

Adding an LLM tool

Although we could just pick the first result from the search, that would not work very well. Azure AI Search will always return a result, even if it does not make much sense. In a search for nearest neighbors, your nearest neighbor could be very far away! 😀

For example, if I search for person with a hat, I will get a result even though I do not have such a picture in my index. It simply finds vectors that are “closest” but semantically “far” away from my description. That is bound to happen with just a few images in the index.

An LLM can look at the original description and see if it matches one of the search results. It might pick the 3rd result if it fits better. It might also decide to return nothing if there is no match. In order to do so, we will need a good prompt.

Click + LLM at the top left of the flow to add an LLM tool:

Adding an LLM

Give the LLM tool a name and select new file. In the flow editor, set the LLM model information:

LLM settings

You can reuse the connection that was used for the embedding. Ensure you have deployed a chat model in your Azure OpenAI resource. I deployed gpt-4 and called the deployment gpt-4 as well. I also set temperature to 0.

The default inputs of the node do not make much sense; we do not need chat history, for instance. The inputs come from a .jinja2 file that was created for you. The file has the name of the LLM tool. Following the example above, the name is pick_result.jinja2. Open that file, replace it with the following contents and save it:

system:
You return the url to an image that best matches the user's question. Use the provided context to select the image. Only return the url. When no
matching url is found, simply return NO_IMAGE

user:
{{description}}

context : {{search_results}}

The file defines a system message to tell the LLM what to do. The input from the user is the description from the input node. We provide extra context to the LLM as well (the output from search). The {{…}} serve as placeholders to inject data into the prompt.

When you save the file and go back to the flow designer, you should see description and search_results as parameters. Set them as follows:

Inputs to the LLM node

In addition, set the output of the flow output node to the output of the LLM node:

Setting the output

Save your flow and run it. In my case, with a description of cat I get the following output:

Output is just a URL from the LLM node

If I use man with a hat as input, I get:

LLM did not find a URL to match the description

Using a prompt variant

Suppose we want to try a different prompt that returns JSON instead of text. To try that, we can create a prompt variant.

In the LLM node, click the variants icon:

Variants

You will see a + icon to create a new variant. Click it.

New variant

The variant appears under the original variant and is linked to a new file: pick_result_variant_1.jinja2. I have also set the variant as default. Let’s click the new file to open it. Add the following prompt:

system:
You return the url to an image that best matches the user's question. Use the provided context to select the image. 
Return the url and name of the file as JSON. Here is an example of a response. Do not use markdown in the response. Use pure JSON.
{
  "url": "http://www.example.com/images/1.jpg",
  "name": "1.jpg"
}

If there is no matching image, return an empty string in the JSON:
{
  "url": ""
}

user:
{{description}}

context : {{search_results}}

This prompt should return JSON instead of just the url or NO_IMAGE. To test this, run the flow and select Use default variant for all nodes. When I run the flow with description cat, I get the following output:

JSON output

Because the flow’s output is already JSON, the string representation of the JSON result is used. Adding an extra Python tool that parses the JSON and outputs both the URL and file name might be a good idea here.
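
Such a parsing tool could be a small sketch like the one below. The tool and parameter names are hypothetical; it would take the LLM node’s output as input and feed the flow’s outputs:

import json

from promptflow import tool


@tool
def parse_llm_output(llm_output: str) -> dict:
    # parse the JSON string produced by the LLM variant;
    # fall back to empty values if the model did not return valid JSON
    try:
        result = json.loads(llm_output)
    except json.JSONDecodeError:
        return {"url": "", "name": ""}
    return {"url": result.get("url", ""), "name": result.get("name", "")}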

You can modify and switch between the prompts and see which one works best. This is especially handy when you are prototyping your flow.

Conclusion

On your local machine, Prompt Flow is easy to install and get started with. In this post we built a relatively simple flow that did not require a lot of custom code. We also touched on using variants, to test different prompts and their outcome.

In a follow-up post, we will take a look at turning this flow into a container. Stay tuned! 📺

Using Azure Database for PostgreSQL as a vector store

When we build LLM applications, there is always a recurring question: “What vector store will we use?”. In Azure, there are several native solutions. Some of them were discussed in previous posts.

  • Azure Cognitive Search: supports vector search but also hybrid search with semantic reranking as discussed here
  • Azure Redis Cache Enterprise: check my blog post
  • Azure Cosmos DB for MongoDB Core: see Microsoft Learn

In addition to the above, you can of course host your own vector database in a container such as Qdrant or others. You can install these on any service that supports containers such as App Service, Container Instances, Container Apps, or Kubernetes.

Using PostgreSQL

If you are familiar with Azure Database for PostgreSQL flexible servers, you can use it as a vector store, as long as you install the vector extension. This extension can be enabled in all compute tiers. I installed PostgreSQL and set the compute tier to Burstable with size Standard_B1ms (1 vCore, 2GB). This is great for testing and will cost around 20 euros per month with 32GB of storage. For production use, the monthly cost will start from about 150 euros at the lowest General Purpose tier.

PostgreSQL flexible server with lowest compute tier

After deployment, you need to enable the vector extension. In Server Parameters, search for azure.extensions and select VECTOR from the list. Then click Save.

VECTOR extension added

When done, grab the connection details from the Connect pane:

Connection details

In pgAdmin, register the server with the above details. Connect to the server and create a database. Ensure you configure the firewall settings in Azure to allow your IP address to connect to the server.

Database creation in pgAdmin

Inside the database, go to Extensions and add the vector extension:

vector extension added to the database

Note: in the code below, we will use LangChain. LangChain will try to enable the vector extension if it is not enabled.

Note: if you do not want to install pgAdmin, you can create the database from the Azure Portal or use the Azure CLI.

Working with the vector store

Although you can create and query tables that contain vectors with plain SQL, we will use LangChain as a higher-level library that takes care of many of the details for us.

Take a look at the following Python code that creates a few embeddings (vectors) and then uses RAG (retrieval augmented generation) to answer a question with OpenAI’s text-davinci-003 model.

Note: the code is on Github as well

import os
import getpass

# read from .env file
from dotenv import load_dotenv
load_dotenv()

from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import PGVector
from langchain.document_loaders import TextLoader
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI


loader = TextLoader("./state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)

embeddings = OpenAIEmbeddings()


pgpassword = os.getenv("PGPASSWORD", "")
if not pgpassword:
    pgpassword = getpass.getpass("Enter pgpassword: ")

CONNECTION_STRING = f"postgresql+psycopg2://pgadmin:{pgpassword}@pg-vec-geba.postgres.database.azure.com:5432/pgvector"

COLLECTION_NAME = "state_of_the_union_test"

# if you run this code more than once, you will get duplicated vectors
# no upserts
db = PGVector.from_documents(
    embedding=embeddings,
    documents=docs,
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING
)

retriever = db.as_retriever()

query = "What did the president say about Ketanji Brown Jackson"

# LLM will default to text-davinci-003 because we are using a completion endpoint
# versus a chat endpoint
qa = RetrievalQA.from_chain_type(llm=OpenAI(), chain_type="stuff", retriever=retriever)

answer = qa.run(query)

print(answer)

The code above requires a .env file with the following content:

OPENAI_API_KEY=OPENAI_API_KEY
PGPASSWORD=PASSWORD_TO_POSTGRES

You will also need the State of the Union text file from here.

Before running the code, install the following packages:

pip install pgvector
pip install openai
pip install psycopg2-binary
pip install tiktoken

The code does the following:

  • Import required libraries: important here is the PGVector import
  • Loads the text file and splits it into chunks: chunking strategy is not too important here; just use chunks of 1000 characters
  • Create an instance of type OpenAIEmbeddings, later used to create a vector per chunk for storage in PostgreSQL; it will also be used when doing queries with a retrieval QA chain (see below); uses text-embedding-ada-002 embedding model
  • Construct the connection string for later use and set a collection name: collections are a way to store vectors together; the collections you create are kept in a table and each vector references the collection
  • Create an instance of PGVector with PGVector.from_documents: this will create/use tables to hold the collection(s) and vectors for you; all chunks will be vectorized and stored in a table; we will take a look at those tables in a moment; in a real application, you would reference existing tables and another process would create/update the vectors
  • Create a retriever (qa) from the PGVector instance for use in a retrieval QA chain
  • Run a query and print the answer: the qa.run(query) line does the n-nearest neighbor vector search in PostgreSQL (via the retriever), creates a meta-prompt with the relevant context, and returns the OpenAI model response in one step

In the PostgreSQL database, the above code creates two tables:

Tables created by LangChain to store the vectors

The collection table contains the collections you create from code. Each collection has a unique ID. The embedding table contains the vectors. Each vector has a unique ID and belongs to a collection. The fields of the embedding table are:

  • uuid: unique ID of the vector
  • collection_id: collection ID referencing the collection table
  • embedding: a field of type vector that stores the embedding (1536 dimensions)
  • document: the chunk of text that was vectorized
  • cmetadata: a JSON field with a link to the source file
  • custom_id: an id that is unique for each run

Note that when you run the sample Python code multiple times, you will have duplicated content. In a real application, you should avoid that. The process that creates and stores the vectors will typically be separate from the process that queries them.

⚠️ Important: Today, LangChain cannot search over all vectors in all collections. You always need to specify the collection to search. If you do need to search over all vectors, you can use SQL statements instead.
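
As an illustration of that last point, a raw query with psycopg2 could look like the sketch below. It assumes LangChain’s default table name langchain_pg_embedding and uses pgvector’s <=> cosine distance operator; you would compute the query embedding yourself first, and the password is a placeholder:

import psycopg2

conn = psycopg2.connect("postgresql://pgadmin:PASSWORD@pg-vec-geba.postgres.database.azure.com:5432/pgvector")

query_vector = [0.0] * 1536  # replace with the embedding of your query text
vector_literal = "[" + ",".join(map(str, query_vector)) + "]"  # pgvector's text format

with conn, conn.cursor() as cur:
    # search across ALL vectors, regardless of collection
    cur.execute(
        """
        SELECT document, embedding <=> %s::vector AS cosine_distance
        FROM langchain_pg_embedding
        ORDER BY cosine_distance
        LIMIT 3
        """,
        (vector_literal,),
    )
    for document, distance in cur.fetchall():
        print(round(distance, 4), document[:80])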

The search has the following properties:

  • Distance strategy: cosine similarity; the pgvector extension also supports L2 distance and inner product; the code above uses the text-embedding-ada-002 embeddings model by default; with that model, you should use cosine similarity; LangChain uses cosine similarity as the default for PGVector so that’s a match! 👏
  • Exact nearest neighbor search: although this provides perfect recall, it can get slow when there are many vectors because the entire table is scanned; the extension supports the creation of indexes to perform an approximate nearest neighbor search using IVFFlat or HNSW; see pgvector on GitHub for more details and also this article from Crunchy Data.

Note: most other vector databases use HNSW as the index type (e.g., Azure Cognitive Search, Qdrant, …); unlike IVFFlat you can create this index without having any vectors in your database table; at the time of writing (end of September 2023), the version of the vector extension on Azure was 0.4.1 and did not support HNSW; HNSW requires version 0.5.0 or higher

Conclusion

Azure Database for PostgreSQL with the vector extension is an interesting alternative to other vector database solutions in Azure. This is especially the case when PostgreSQL is your database of choice! In this post, we have shown how LangChain supports it with a simple example. If you do not use LangChain or other libraries, you can simply use SQL statements to create and search indexes as documented here.

The drawback of using PostgreSQL is that you need to know a bit more about exact and approximate nearest neighbor searches and the different index mechanisms. That’s actually a good thing if you want to create production applications with good performance. For a simple POC with not a lot of data, you can skip all of this and perform exact searches.

Besides the free tier of Azure Cognitive Search, the configuration above is the service with the lowest cost for POCs that need vector search. On top of that, the cheapest PostgreSQL option has more storage than Cognitive Search’s free tier (32GB vs. 50MB). Adding more storage is easy and relatively cheap as well. Give it a go and tell me what you think!

Semantic Kernel Planner 101

Introduction

If you are a developer who wants to build AI-first apps with natural language processing and large language models, you might be interested in Semantic Kernel (SK), a lightweight and open-source SDK that aims to simplify the integration of AI with conventional programming languages.

SK is part of the Copilot stack and Microsoft is using it in its own Copilots.

SK allows you to create and orchestrate semantic functions, native functions, memories, and connectors using C# or Python. Much like LangChain, it supports prompt templating, chaining, and memory with vectors (embeddings).

You can also use SK’s planner to automatically generate and execute complex tasks based on a user’s goals. This is similar to LangChain’s Agents & Tools capabilities. In this blog post, we will introduce some of the features and benefits of SK’s Planner, and show you how to use it in your own applications. I am still learning so I am going to stick to the basics! 😃

Source: Unlock the Potential of AI in Your Apps with Semantic Kernel: A Lightweight SDK for Large Language Models Integration (microsoft.com)

SK’s Planner allows you to create and execute plans based on semantic queries. You start by providing it a goal (an ask). The goal could be: “Create a photo of a meal with these ingredients: {list of ingredients}”. To achieve the goal, the planner can use plugins to generate and execute the plan. For the goal above, suppose we have two plugins:

  • Recipe plugin: creates a recipe based on starter ingredients
  • Image description plugin: creates an image description based on any input

The recipe plugin takes a list of ingredients as input while the image description plugin can take the recipe as input and generate an image description of it. That image description could be used by DALL-E to generate an actual image.

Note: at the time of writing, Microsoft was on the verge of replacing the word skill with plugin. In the code, you will still see references to skills, but that will go away. This post already uses the word plugin instead of skill.

Creating plugins

Plugins make expertise available to SK and consist of one or more functions. A function can be either:

  • an LLM prompt: a semantic function
  • native computer code: a native function

A plugin is a container where functions live. Think of it as a folder with each subfolder containing a function. For example:

Badly named 😃 MySkills plugin with two semantic functions
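
On disk, that layout looks roughly like this (a sketch using the folder and function names from this post; your exact structure may differ):

skills/
└── MySkills/
    ├── Recipe/
    │   ├── skprompt.txt
    │   └── config.json
    └── ImageDesc/
        ├── skprompt.txt
        └── config.json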

A semantic function is a prompt with placeholders for one or more input variables. The prompt is in skprompt.txt. The Recipe function uses the following prompt:

Write a recipe with the starter ingredients below and be specific about the steps to take and the amount of ingredients to use: 

{{$input}}

The file config.json contains metadata about the function and LLM completion settings such as max_tokens and temperature. For example:

{
    "schema": 1,
    "type": "completion",
    "description": "Creates a recipe from starting ingredients",
    "completion": {
        "max_tokens": 256,
        "temperature": 0,
        "top_p": 0,
        "presence_penalty": 0,
        "frequency_penalty": 0
    },
    "input": {
        "parameters": [
            {
                "name": "input",
                "description": "Input for this semantic function.",
                "defaultValue": ""
            }
        ]
    },
    "default_backends": []
}

From your code, you can simply run this function to create a recipe. The Recipe function above is similar to a PromptTemplate in LangChain that you combine with an LLM into a chain; running the chain gives you the output (a recipe). SK supports creating functions inline in your code as well, similar to how LangChain works.
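
As a minimal sketch, assuming a configured kernel like the one built in the full example further below (and the same pre-1.0 SK API used throughout this post, so method names may have changed in newer releases), running the imported Recipe function, or an inline equivalent, looks like this:

// assumes an IKernel instance named 'kernel', configured as in the full example below
var skillsDirectory = Path.Combine(Directory.GetCurrentDirectory(), "skills");
var mySkills = kernel.ImportSemanticSkillFromDirectory(skillsDirectory, "MySkills");

// run the Recipe semantic function with a list of ingredients as input
var recipe = await kernel.RunAsync("tomato, basil, mozzarella", mySkills["Recipe"]);
Console.WriteLine(recipe.ToString());

// inline alternative: create a semantic function straight from a prompt string
var inlineRecipe = kernel.CreateSemanticFunction(
    "Write a recipe with the starter ingredients below:\n\n{{$input}}",
    maxTokens: 256,
    temperature: 0);
Console.WriteLine((await kernel.RunAsync("tomato, basil, mozzarella", inlineRecipe)).ToString());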

Using the Planner

As stated above, the Planner can use plugins to reach the goal provided by a user’s ask. It actually works its way backward from the goal to create the plan:

Source: https://learn.microsoft.com/en-us/semantic-kernel/create-chains/planner

There are different types of planners like a sequential planner, an action planner, a custom planner, and more. In our example, we will use a sequential planner and keep things as simple as possible. We will only use semantic functions, no native code functions.

Time for some code. We will build a small .NET Console App based on the example above: create a recipe and generate a photo description for this recipe. Here is the code:


using Microsoft.Extensions.Logging;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.AI.ImageGeneration;
using System.Diagnostics;
using Microsoft.SemanticKernel.Planning;
using System.Text.Json;

var kernelSettings = KernelSettings.LoadSettings();

var kernelConfig = new KernelConfig();
kernelConfig.AddCompletionBackend(kernelSettings);

using ILoggerFactory loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .SetMinimumLevel(kernelSettings.LogLevel ?? LogLevel.Warning)
        .AddConsole()
        .AddDebug();
});

IKernel kernel = new KernelBuilder()
    .WithLogger(loggerFactory.CreateLogger<IKernel>())
    .WithConfiguration(kernelConfig)
    .Configure(c =>
    {
        c.AddOpenAIImageGenerationService(kernelSettings.ApiKey);

    })
    .Build();

// used later to generate image with dallE
var dallE = kernel.GetService<IImageGeneration>();

// use SKs planner
var planner = new SequentialPlanner(kernel);

// depends on the MySkills skill which has two semantic functions
// skills will be renamed to plugins in the future
var skillsDirectory = Path.Combine(System.IO.Directory.GetCurrentDirectory(), "skills");
var skill = kernel.ImportSemanticSkillFromDirectory(skillsDirectory, "MySkills");

// ask for starter ingredients
Console.Write("Enter ingredients: ");
string? input = Console.ReadLine();


if (!string.IsNullOrEmpty(input))
{

    // define the ASK for the planner; the two semantic functions should be used by the plan
    // note that these functions are too simple to be useful in a real application
    // a single prompt to the model would be enough
    var ask = "Create a photo of a meal with these ingredients:" + input;

    // create the plan and print it to see if the functions are used correctly
    var newPlan = await planner.CreatePlanAsync(ask);

    Console.WriteLine("Updated plan:\n");
    Console.WriteLine(JsonSerializer.Serialize(newPlan, new JsonSerializerOptions { WriteIndented = true }));

    // run the plan; result should be an image description
    var newPlanResult = await newPlan.InvokeAsync();


    // generate the URL to the image created by DALL-E
    Console.WriteLine("Plan result: " + newPlanResult.ToString());
    var imageURL = await dallE.GenerateImageAsync(newPlanResult.ToString(), 512, 512);

    // display the image in the default browser ("open" is macOS-specific)
    Process.Start("open", imageURL);
}

The code uses config/appsettings.json, which contains settings like serviceType, serviceId, and the API key to use with either OpenAI or Azure OpenAI. In my case, serviceType is OpenAI and serviceId is gpt-4, so make sure you have gpt-4 access in OpenAI's API. I actually wanted to use Azure OpenAI, but I do not have access to DALL-E there and I do not think SK would support it anyway.
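
For reference, a config/appsettings.json along these lines would work. This is only a sketch limited to the fields mentioned above; the exact property names depend on the KernelSettings class your project uses and may differ:

{
  "serviceType": "OpenAI",
  "serviceId": "gpt-4",
  "apiKey": "<your OpenAI API key>",
  "logLevel": "Warning"
}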

After loading the settings, a KernelConfig is created and a completion backend is added (gpt-4 here). After setting up logging, a new kernel is created with new KernelBuilder() and the following settings:

  • a logger
  • the configuration with the completion backend
  • an image generation service (DALL-E 2 here), which needs the OpenAI API key (retrieved from kernelSettings)

We can now create the planner with var planner = new SequentialPlanner(kernel); and add skills (plugins) to the kernel. We add plugins from the skills/MySkills folder in our project.

Now it’s just a matter of asking the user for some ingredients (stored in input) and creating the plan based on the ask. The ask is “Create a photo of a meal with these ingredients: …”

var ask = "Create a photo of a meal with these ingredients:" + input;

var newPlan = await planner.CreatePlanAsync(ask);

Note that CreatePlanAsync does not execute the plan, it just creates it. We can look at the plan with the following code:

Console.WriteLine("Updated plan:\n");
    Console.WriteLine(JsonSerializer.Serialize(newPlan, new JsonSerializerOptions { WriteIndented = true }));

The output is something like this (note the typos in the ingredients; that's OK, the model should still understand them):

{
  "state": [
    {
      "Key": "INPUT",
      "Value": ""
    }
  ],
  "steps": [
    {
      "state": [
        {
          "Key": "INPUT",
          "Value": ""
        }
      ],
      "steps": [],
      "parameters": [
        {
          "Key": "INPUT",
          "Value": "courgette, collieflower, steak, tomato"
        }
      ],
      "outputs": [
        "RECIPE_RESULT"
      ],
      "next_step_index": 0,
      "name": "Recipe",
      "skill_name": "MySkills",
      "description": "Creates a recipe from starting ingredients"
    },
    {
      "state": [
        {
          "Key": "INPUT",
          "Value": ""
        }
      ],
      "steps": [],
      "parameters": [
        {
          "Key": "INPUT",
          "Value": "$RECIPE_RESULT"
        }
      ],
      "outputs": [
        "RESULT__IMAGE_DESCRIPTION"
      ],
      "next_step_index": 0,
      "name": "ImageDesc",
      "skill_name": "MySkills",
      "description": "Generate image description for a photo of a recipe or meal"
    }
  ],
  "parameters": [
    {
      "Key": "INPUT",
      "Value": ""
    }
  ],
  "outputs": [
    "RESULT__IMAGE_DESCRIPTION"
  ],
  "next_step_index": 0,
  "name": "",
  "skill_name": "Microsoft.SemanticKernel.Planning.Plan",
  "description": "Create a photo of a meal with these ingredients:courgette, collieflower, steak, tomato"
}

The output shows that the two skills will be used in this case. The input to the ImageDesc plugin is the output from the Recipe plugin.

Note that if your ask has nothing to do with generating dishes and photos of dishes, the planner will still use these skills, which leads to unexpected results.

If you do not provide any skills, the planner will just use itself as a skill, and the result will be the original ask. That would still work in this case because the ask can be passed to DALL-E on its own!

With the plan created, we can now execute it and show the result:

var newPlanResult = await newPlan.InvokeAsync();
Console.WriteLine("Plan result: " + newPlanResult.ToString());

This should print the description of the photo. We can pass that result to DALL-E with:

var imageURL = await dallE.GenerateImageAsync(newPlanResult.ToString(), 512, 512);
Process.Start("open", imageURL);   // works on MacOS

If I provide carrots and meat, then I get the following description and photo.

Description: A steaming pot of hearty carrot and meat stew is pictured. The pot is filled with chunks of lean ground beef, diced carrots, diced onion, minced garlic, and a rich tomato paste. Aromatic herbs of oregano and thyme are sprinkled on top, and the stew is finished with a drizzle of olive oil. The stew is ready to be served and is sure to be a delicious and comforting meal.

The photo:

Testing skills and plans

The Semantic Kernel extension for VS Code will find your plugins (skills) and allow you to execute them:

Recipe skill in VS Code

When you click the play icon next to the skill, you will be asked for input. The prompt will then be run by your selected model, with output to the screen:

You can also create plans and execute them in VS Code:

Plan in VS Code

Above, a plan was created based on a goal. The plan includes the two plugins and shows the inputs and outputs. By clicking Execute Plan, you can run it without having to write any code. The UI above also lets you inspect the generated plan and change it if it's not performing as intended.

Conclusion

This concludes my quick look at the Planner functionality in Semantic Kernel with a simple plan and a couple of semantic skills. If you want to learn more, be sure to check these resources: