Building an AI Agent Server with AG-UI and Microsoft Agent Framework

In this post, I want to talk about the Python backend I built for an AG-UI demo project. It is part of a larger project that also includes a frontend that uses CopilotKit:

This post discusses the Python AG-UI server that is built with Microsoft Agent Framework.

All code is on GitHub: https://github.com/gbaeke/agui. Most of the code for this demo was written with GitHub Copilot with the help of Microsoft Docs MCP and Context7. 🤷

What is AG-UI?

Before we dive into the code, let’s talk about AG-UI. AG-UI is a standardized protocol for building AI agent interfaces. Think of it as a common language that lets your frontend talk to any backend agent that supports it, no matter what technology you use.

The protocol gives you some nice features out of the box:

  • Remote Agent Hosting: deploy your agents as web services (e.g. FastAPI)
  • Real-time Streaming: stream responses using Server-Sent Events (SSE)
  • Standardized Communication: consistent message format for reliable interactions (e.g. tool started, tool arguments, tool end, …)
  • Thread Management: keep conversation context across multiple requests

Why does this matter? Well, without a standard like AG-UI, every frontend needs custom code to talk to different backends. With AG-UI, you build your frontend once and it works with any AG-UI compatible backend. The same goes for backends – build it once and any AG-UI client can use it.

Under the hood, AG-UI uses simple HTTP POST requests for sending messages and Server-Sent Events (SSE) for streaming responses back. It’s not complicated, but it’s standardized. And that’s the point.

AG-UI has many more features than the ones discussed in this post. Check https://docs.ag-ui.com/introduction for the full picture.

Microsoft Agent Framework

Now, you could implement AG-UI from scratch but that’s a lot of work. This is where Microsoft Agent Framework comes in. It’s a Python (and C#) framework that makes building AI agents really easy.

The framework handles the heavy lifting when it comes to agent building:

  • Managing chat with LLMs like Azure OpenAI
  • Function calling (tools)
  • Streaming responses
  • Multi-turn conversations
  • And a lot more

The key concept is the ChatAgent. You give it:

  1. chat client (like Azure OpenAI)
  2. Instructions (the system prompt)
  3. Tools (functions the agent can call)

And you’re done. The agent knows how to talk to the LLM, when to call tools, and how to stream responses back.

What’s nice about Agent Framework is that it integrates with AG-UI out of the box, similar to other frameworks like LangGraph, Google ADK and others. You write your agent code and expose it via AG-UI with basically one line of code. The framework translates everything automatically – your agent’s responses become AG-UI events, tool calls get streamed correctly, etc…

The integration with Microsoft Agent Framework was announced on the blog of CopilotKit, the team behind AG-UI. The blog included the diagram below to illustrate the capabilities:

From https://www.copilotkit.ai/blog/microsoft-agent-framework-is-now-ag-ui-compatible

The Code

Let’s look at how this actually works in practice. The code is pretty simple. Most of the code is Microsoft Agent Framework code. AG-UI gets exposed with one line of code.

The Server (server.py)

The main server file is really short:

import uvicorn
from api import app
from config import SERVER_HOST, SERVER_PORT

def main():
    print(f"🚀 Starting AG-UI server at http://{SERVER_HOST}:{SERVER_PORT}")
    uvicorn.run(app, host=SERVER_HOST, port=SERVER_PORT)

if __name__ == "__main__":
    main()

That’s it. We run a FastAPI server on port 8888. The interesting part is in api/app.py:

from fastapi import FastAPI
from agent_framework.ag_ui.fastapi import add_agent_framework_fastapi_endpoint
from agents.main_agent import agent

app = FastAPI(title="AG-UI Demo Server")

# This single line exposes your agent via AG-UI protocol
add_agent_framework_fastapi_endpoint(app, agent, "/")

See that add_agent_framework_fastapi_endpoint() call? That’s all you need. This function from Agent Framework takes your agent and exposes it as an AG-UI endpoint. It handles HTTP requests, SSE streaming, protocol translation – everything.

You just pass in your FastAPI app, your agent, and the route path. Done.

The Main Agent (agents/main_agent.py)

Here’s where we define the actual agent with standard Microsoft Agent Framework abstractions:

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import DefaultAzureCredential
from config import AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME
from tools import get_weather, get_current_time, calculate, bedtime_story_tool

# Create Azure OpenAI chat client
chat_client = AzureOpenAIChatClient(
    credential=DefaultAzureCredential(),
    endpoint=AZURE_OPENAI_ENDPOINT,
    deployment_name=AZURE_OPENAI_DEPLOYMENT_NAME,
)

# Create the AI agent with tools
agent = ChatAgent(
    name="AGUIAssistant",
    instructions="You are a helpful assistant with access to tools...",
    chat_client=chat_client,
    tools=[get_weather, get_current_time, calculate, bedtime_story_tool],
)

This is the heart of the backend. We create a ChatAgent with:

  1. A name: “AGUIAssistant”
  2. Instructions: the system prompt that tells the agent how to behave
  3. A chat clientAzureOpenAIChatClient that handles communication with Azure OpenAI
  4. Tools: a list of functions the agent can call

The code implements a few toy tools and a sub-agent to illustrate how AG-UI handels tool calls. The tools are discussed below:

The Tools (tools/)

In Agent Framework, tools can be Python functions with a decorator:

from agent_framework import ai_function
import httpx
import json

@ai_function(description="Get the current weather for a location")
def get_weather(location: str) -> str:
    """Get real weather information for a location using Open-Meteo API."""
    # Step 1: Geocode the location
    geocode_url = "https://geocoding-api.open-meteo.com/v1/search"
    # ... make HTTP request ...
    
    # Step 2: Get weather data
    weather_url = "https://api.open-meteo.com/v1/forecast"
    # ... make HTTP request ...
    
    # Return JSON string
    return json.dumps({
        "location": resolved_name,
        "temperature": current["temperature_2m"],
        "condition": condition,
        # ...
    })

The @ai_function decorator tells Agent Framework “this is a tool the LLM can use”. The framework automatically:

  • Generates a schema from the function signature
  • Makes it available to the LLM
  • Handles calling the function when needed
  • Passes the result back to the LLM

You just write normal Python code. The function takes typed parameters (location: str) and returns a string. Agent Framework does the rest.

The weather tool calls the Open-Meteo API to get real weather data. In an AG-UI compatible client, you can intercept the tool result and visualize it any way you want before the LLM generates an answer from the tool result:

React client with CopilotKit

Above, when the user asks for weather information, AG-UI events inform the client that a tool call has started and ended. It also streams the tool result back to the client which uses a custom component to render the information. This happens before the chat client generates the answer based on the tool result.

The Subagent (tools/storyteller.py)

This is where it gets interesting. In Agent Framework, a ChatAgent can become a tool with .as_tool():

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient

# Create a specialized agent for bedtime stories
bedtime_story_agent = ChatAgent(
    name="BedTimeStoryTeller",
    description="A creative storyteller that writes engaging bedtime stories",
    instructions="""You are a gentle and creative bedtime story teller.
When given a topic, create a short, soothing bedtime story for children.
Your stories should be 3-5 paragraphs long, calming, and end peacefully.""",
    chat_client=chat_client,
)

# Convert the agent to a tool
bedtime_story_tool = bedtime_story_agent.as_tool(
    name="tell_bedtime_story",
    description="Generate a calming bedtime story based on a theme",
    arg_name="theme",
    arg_description="The theme for the story (e.g., 'a brave rabbit')",
)

This creates a subagent – another ChatAgent with different instructions. When the main agent needs to tell a bedtime story, it calls tell_bedtime_story which delegates to the subagent.

Why is this useful? Because you can give each agent specialized instructions. The main agent handles general questions and decides which tool to use. The storyteller agent focuses only on creating good stories. Clean separation of concerns.

The subagent has its own chat client and can have its own tools too if you want. It’s a full agent, just exposed as a tool.

And because it is a tool, you can render it with the standard AG-UI tool events:

Testing with a client

In src/backend there is a Python client client_raw.py. When you run that client against the server and invoke a tool, you will see something like below:

AG-UI client in Python

This client simply uses httpx to talk the AG-UI server and inspects and renders the AG-UI events as they come in.

Why This Works

Let me tell you what I like about this setup:

Separation of concerns: The frontend doesn’t know about Python, Azure OpenAI, or any backend details. It just speaks AG-UI. You could swap the backend for a C# implementation or something else entirely – the frontend wouldn’t care. Besides of course the handling of specific tool calls.

Standard protocol: Because we use AG-UI, any AG-UI client can talk to this backend. We use CopilotKit in the frontend but you could use anything that speaks AG-UI. Take the Python client as an example.

Framework handles complexity: Streaming, tool calls, conversation history, protocol translation – Agent Framework does all of this. You just write business logic.

Easy to extend: Want a new tool? Write a function with @ai_function. Want a specialized agent? Create a ChatAgent and call .as_tool(). That’s it.

The AG-UI documentation explains that the protocol supports 7 different features including human-in-the-loop, generative UI, and shared state. Our simple backend gets all of these capabilities because Agent Framework implements the protocol.

Note that there are many more capabilities. Check the AG-UI interactive Dojo to find out: https://dojo.ag-ui.com/microsoft-agent-framework-python

Wrap Up

This is a simple but powerful pattern for building AI agent backends. You write minimal code and get a lot of functionality. AG-UI gives you a standard way to expose your agent, and Microsoft Agent Framework handles the implementation details.

If you want to try this yourself, the code is in the repo. You’ll need an Azure OpenAI deployment and follow the OAuth setup. After that, just run the code as instructed in the repo README!

The beauty is in the simplicity. Sometimes the best code is the code you don’t have to write.

End-to-end authorization with Entra ID and MCP

Building an MCP server is really easy. Almost every language and framework has MCP support these days, both from a client and server perspective. For example: FastMCP (Python, Typescript), csharp-sdk and many more!

However, in an enterprise scenario where users use a web-based conversational agent that uses tools on an MCP server, those tools might need to connect to back-end systems using the identity of the user. Take a look at the following example:

A couple of things are important here:

  • The user logs on to the app and requests an access token that is valid for the MCP Server; you need app registrations in Entra ID to make this work
  • The MCP Server verifies that this token is from Entra ID and contains the correct audience; we will use FastMCP in Python which has some built-in functionality for token validation
  • When the user asks the agent a question that requires Azure AI Search, the agent decides to make a tool call; the tool on the MCP server does the actual work
  • The tool needs access to the token (there’s a helper in FastMCP for that); next it converts the token to a token valid for Azure AI Search
  • The tool can now perform a search in Azure AI Search using new functionality discussed here

⚠️ MCP is actually not that important here. This technique which uses OAuth 2.0 and OBO flows in Entra ID is a well established pattern that’s been in use for ages!

🧑‍💻 Full source code here: https://github.com/gbaeke/mcp-obo

Let’s get started and get this working!

Entra ID App Registrations

In this case, we will create two registrations: one for the front-end and one for the back-end, the MCP Server. Note that the front-end here will be a command-line app that uses a device flow to authenticate. It uses a simple token cache to prevent having to log on time and again.

Front-end app registration

We will create this in the Azure Portal. I assume you have some knowledge of this so I will not provide detailed step-by-step instructions:

  • Go to App Registrations
  • Create a new registration, FastMCP Auth Web
  • In Authentication, ensure you enable Allow public client flows

You will need the client ID of this app registration in the MCP client we will build later.

Back-end app registration

This is for the MCP server and needs more settings:

  • Create a new registration, FastMCP Auth API
  • In Certificates and secrets, add a secret. We will need this to implement the on-behalf-of flow to obtain the Azure AI Search token
  • In Expose an API, set the Application ID URI to https://CLIENTIDOFAPP. I also added a scope, execute. In addition, add the front-end app client Id to the list of Authorized client applications:
Expose API of MCP app registration

In order to exchange a token for this service for Azure AI Search, we also need to add API permissions:

Permissions for Azure Cognitive, ehhm, AI Search

When you add the above permission, find it in APIs my organization uses:

MCP Client

We can now write an MCP client that calls the MCP server with an access token for the API we created above. As noted before, this will be a command-line app written in Python.

The code simply uses MSAL to initiate a device flow. It also uses a token cache to avoid repeated logins. The code can be found here.

Once we have a token, we can construct an MCP client (with FastMCP) as follows:

token = get_jwt_token()

headers["Authorization"] = f"Bearer {token}"

transport_url = "http://localhost:8000/mcp/"

transport = StreamableHttpTransport(
        url=transport_url,
        headers=headers
    )

client = MCPClient(transport=transport)

This code ensures that requests to the MCP server have an Authorization header that contains the bearer token acquired by get_jwt_token(). The MCP server will validate this token strictly.

The code to connect to the MCP server looks like this:

try:
    logger.info("Connecting to the MCP server...")
    
    # Use the client as an async context manager
    async with client:
        # List available tools on the server
        tools = await client.list_tools()
        logger.info(f"Found {len(tools)} tools on the server")
        
        # Call search tool
        logger.info("Calling search tool...")
        search_result = await client.call_tool("get_documents", {"query": "*"})
        logger.info(f"Search Result: {search_result.structured_content}")
        documents = search_result.structured_content.get("documents", [])
        if documents:
            logger.info("Documents found:")
            for doc in documents:
                name = doc.get("name", "Unnamed Document")
                logger.info(f"  - {name}")
        else:
            logger.info("No documents found.")
    
except Exception as e:
    logger.error(f"Error connecting to MCP server: {e}")

Note that there is no AI Agent involved here. We simply call the tool directly. In my case the document search only lists three documents out of five in total because I only have access to those three. We will look at the MCP server code to see how this is implemented next.

⚠️ Example code of the client is here: https://github.com/gbaeke/mcp-obo/blob/main/mcp_client.py

MCP Server

We will write an MCP server that uses the streamable-http transport on port 8000. The URL to connect to from the client then becomes http://localhost:8000/mcp. That was the URL used by the MCP client above.

The server has one tool: get_documents that takes a query (string) as parameter. By default, the query is set to * which returns all documents. The tool does the following:

  • Obtains the access token with the get_access_token() helper function from FastMCP
  • Exchanges the token for a token with scope https://search.azure.com/.default
  • Creates a SearchClient for Azure AI Search that includes the AI Search endpoint, index name and credential. Note that that credential has nothing to do with the token obtained above. It’s simply a key provided by Azure AI Search to perform searches. The token is used in the actual search request to filter the results.
  • Performs the search, passing in the token via the x_ms_query_source_authorization parameter. You need to use this version of the Azure AI Search Python library: azure-search-documents==11.6.0b12

Here is the code:

# Get the access token from the context
access_token: AccessToken = get_access_token()
original_token = access_token.token

# Exchange token for Microsoft Search token
logger.info("Exchanging token for Microsoft Search access")
search_result = await exchange_token(original_token, scope="https://search.azure.com/.default")
if not search_result["success"]:
    return {"error": "Could not retrieve documents due to token exchange failure."}
else:
    logger.info("Search token exchange successful")
    search_token = search_result["access_token"]
    search_client = SearchClient(endpoint="https://srch-geba.search.windows.net", index_name="document-permissions-push-idx", credential=AzureKeyCredential(os.getenv("AZURE_SEARCH_KEY")))
    results = search_client.search(search_text="*", x_ms_query_source_authorization=search_token, select="name,oid,group", order_by="id asc")
    documents = [
        {
        "name": result.get("name"),
        "oid": result.get("oid"),
        "group": result.get("group")
        }
        for result in results
    ]
    return {"documents": documents}

The most important work is done by the exchange_token() function. It obtains an access token for Azure AI Search that contains the oid (object id) of the user.

Here’s that function:

async def exchange_token(original_token: str, scope: str = "https://graph.microsoft.com/.default") -> dict:
    
    obo_url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
    
    data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "assertion": original_token,
        "scope": scope,
        "requested_token_use": "on_behalf_of"
    }
    
    try:
        response = requests.post(obo_url, data=data)
        
        if response.status_code == 200:
            token_data = response.json()
            return {
                "success": True,
                "access_token": token_data["access_token"],
                "expires_in": token_data.get("expires_in"),
                "token_type": token_data.get("token_type"),
                "scope_used": scope,
                "method": "OBO"
            }
        else:
            return {
                "success": False,
                "error": response.text,
                "status_code": response.status_code,
                "scope_attempted": scope,
                "method": "OBO"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "method": "OBO"
        }

Above, the core is in the call to the obo_url which presents the original token to obtain a new one. This will only work if the API permissions are correct on the FastMCP Auth API app registration. When the call is successful, we return a dictionary that contains the access token in the access_token key.

Full code of the server: https://github.com/gbaeke/mcp-obo/blob/main/mcp/main.py

You have now seen the entire flow from client login to calling a method (tool) on the MCP server to connecting to Azure AI Search downstream via an on-behalf-of flow.

But wait! How do we create the Azure AI Search index with support for the permission filter? Let’s take look…

Azure AI Search Configuration

When you want to use permission filtering with the x_ms_query_source_authorization parameter, do the following:

  • Create the index with support for permission filtering
  • Your index needs fields like oid (object Ids) and group (group object Ids) with the correct permission filter option
  • When you add documents to an index, for instance with the push APIs, you need to populate the oid and group fields with identifiers of users and groups that have access
  • Perform a search with the x_ms_query_source_authorization like show below:
results = search_client.search(
search_text="*",
x_ms_query_source_authorization=token_to_use,
select="name,oid,group",
order_by="id asc"
)

Above, token_to_use is the access token for Azure AI Search!

On GitHub, check this notebook from Microsoft to create and populate the index with your own user’s oid. You will need an Azure Subscription with an Azure AI Search instance. The free tier will do. If you use VS Code, use the Jupyter extension to run this notebook.

At the time of this writing, this feature was in preview. Ensure you use the correct version of the AI Search library for Python: azure-search-documents==11.6.0b12.

Wrapping up

I hope this post gives you some ideas on how to build agents that use MCP tools with end-to-end user authentication and authorization. This is just one possible approach. Authorization in the MCP specification has evolved significantly in early 2025 and works somewhat differently.

For most enterprise scenarios where you control both code and configuration (such as with Entra ID), bearer authentication with OBO is often sufficient.

Also consider whether you need MCP at all. If you aren’t sharing tools across multiple agents or projects, a simple API might be enough. For even less overhead, you can embed the tool code directly in your agent and run everything in the same process. Simple and effective.

If you spot any errors or have questions, feel free to reach out!

Using tasks with streaming in Google Agent2Agent (A2A)

In a previous post we created a simple A2A agent that uses synchronous message exchange. An A2A client sends a message and the A2A server, via the Agent Executor, responds with a message.

But what if you have a longer running task to perform and you want to inform the client that the task in ongoing? In that case, you can enable streaming on the A2A server and use a task that streams updates and the final result to the client.

The sequence diagram illustrates the flow of messages. It is based on the streaming example in the A2A specification.

A2A tasks with streaming updates

In this case, the A2A client needs to perform a streaming request which is sent to the /message/stream endpoint of the A2A server. The code in the AgentExecutor will need to create a task and provide updates to the client at regular intervals.

⚠️ If you want to skip directly to the code, check out the example on GitHub.

Let’s get into the details in the following order:

  • Writing an agent that provides updates while it is doing work: I will use the OpenAI Agents SDK with its support for agent hooks
  • Writing an AgentExecutor that accepts a message, creates a task and provides updates to the client
  • Updating the A2A Server to support streaming
  • Updating the A2A Client to support streaming

AI Agent that provides updates

Although streaming updates is an integral part of A2A, the agent that does the actual work needs to provide feedback about its progress. That work is up to you, the developer.

In my example, I use an agent created with the OpenAI Agents SDK. This SDK supports AgentHooks that execute at certain events:

  • Agent started/finished
  • Tool call started/finished

The agent class in agent.py on GitHub uses an asyncio queue to emit both the hook events and the agent’s reponse to the caller. The A2A AgentExecutor uses the invoke_stream() method which returns an AsyncGenerator.

You can run python agent.py independently. This should result in the following:

The agent has a tool that returns the current date. The hooks emit the events as shown above followed by the final result.

We can now use this agent from the AgentExecutor and stream the events and final result from the agent to the A2A Client.

AgentExecutor Tasks and Streaming

Instead of simply returning a message to the A2A client, we now need to initiate a long-running task that sends intermediate updates to the client.. Under the hood this uses SSE (Server Sent Events) between the A2A Client and A2A Server.

The file agent_executor.py on GitHub contains the code that makes this happen. Let’s step through it:

message_text = context.get_user_input()  # helper method to extract the user input from the context
        logger.info(f"Message text: {message_text}")

        task = context.current_task
        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)

Above, we extract the user’s input from the incoming message and we check if the context already contains a task. If not, we create the task and we queue it. This informs the client a task was created and that sse can be used to obtain intermediate results.

Now that we have a task (a new or existing one), the following code is used:

updater = TaskUpdater(event_queue, task.id, task.contextId)
async for event in self.agent.invoke_stream(message_text):
    if event.event_type == StreamEventType.RESPONSE:
        # send the result as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=event.data['response']))],
            name='calculator_result',
        )

        await updater.complete()
            
    else:
        await updater.update_status(
        TaskState.working,
        new_agent_text_message(
            event.data.get('message', ''),
            task.contextId,
            task.id,
        ),
    )

We first create a TaskUpdater instance that has the event queue, current task Id and contextId. The task updater is used to provide status updates, complete or even cancel a task.

We then call invoke_stream(query) on our agent and grab the events it emits. If we get a event type of RESPONSE, we create an artifact with the agent response as text and mask the task as complete. In all other cases, we send a status event with updater.update_status(). A status update contains a task state (working in this case) and a message about the state. The message we send is part of the event that is emitted from invoke_stream() and includes things like agent started, tool started, etc…

So in short, to send streaming updates:

  • Ensure your agents emit events of some sort
  • Use those events in the AgentExecutor and create a task that sends intermediate updates until the agent has finished

However, our work is not finished. The A2A Server needs to be updated to support streaming.

A2A Server streaming support

The A2A server code in is main.py on GitHub. To support streaming, we need to update the capabilities of the server:

capabilities = AgentCapabilities(streaming=True, pushNotifications=True)

⚠️ pushNotifications=True is not required for streaming. I include it here to show that sending a push notification to a web hook is also an option.

That’s it! The A2A Server now supports streaming. Easy! 😊

Streaming with the A2A Client

Instead of sending a message to the non-streaming endpoint, the client should now use the streaming endpoint. Here is the code to do that (check test_client.py for the full code):

message_payload = Message(
            role=Role.user,
            messageId=str(uuid.uuid4()),
            parts=[Part(root=TextPart(text=question))],
        )
        streaming_request = SendStreamingMessageRequest(
            id=str(uuid.uuid4()),
            params=MessageSendParams(
                message=message_payload,
            ),
        )
        print("Sending message")

        stream_response = client.send_message_streaming(streaming_request)

To send to the streaming endpoint, the SendStreamingMessageRequest() function is your friend, together with client.send_message_streaming()

We can now grab the responses as they come in:

async for chunk in stream_response:
            # Only print status updates and text responses
            chunk_dict = chunk.model_dump(mode='json', exclude_none=True)
            
            if 'result' in chunk_dict:
                result = chunk_dict['result']
                
                # Handle status updates
                if result.get('kind') == 'status-update':
                    status = result.get('status', {})
                    state = status.get('state', 'unknown')
                    
                    if 'message' in status:
                        message = status['message']
                        if 'parts' in message:
                            for part in message['parts']:
                                if part.get('kind') == 'text':
                                    print(f"[{state.upper()}] {part.get('text', '')}")
                    else:
                        print(f"[{state.upper()}]")
                
                # Handle artifact updates (contain actual responses)
                elif result.get('kind') == 'artifact-update':
                    artifact = result.get('artifact', {})
                    if 'parts' in artifact:
                        for part in artifact['parts']:
                            if part.get('kind') == 'text':
                                print(f"[RESPONSE] {part.get('text', '')}")
                
                # Handle initial task submission
                elif result.get('kind') == 'task':
                    print(f"[TASK SUBMITTED] ID: {result.get('id', 'unknown')}")
                    
                # Handle final completion
                elif result.get('final') is True:
                    print("[TASK COMPLETED]")

This code checks the the type of content coming in:

  • status-update: when AgentExecutor sends a status update
  • artifact-update: when AgentExecutor sends an artifact with the agent’s response
  • task: when tasks are submitted and completed

Running the client and asking what today’s date is, results in the following response:

Streaming is working as intended! But what if you use a client that does not support streaming? That actually works and results in a full response with the agent’s answer in the result field. You would also get a history field that contains the initial user question, including all the task updates.

Here’s a snippet of that result:

{
  "id": "...",
  "jsonrpc": "2.0",
  "result": {
    "artifacts": [
      {
        "artifactId": "...",
        "name": "calculator_result",
        "parts": [
          {
            "kind": "text",
            "text": "Today's date is July 13, 2025."
          }
        ]
      }
    ],
    "contextId": "...",
    "history": [
      {
        "role": "user",
        "parts": [
          {
            "kind": "text",
            "text": "What is today's date?"
          }
        ]
      },
      {
        "role": "agent",
        "parts": [
          {
            "kind": "text",
            "text": "Agent 'CalculatorAgent' is starting..."
          }
        ]
      }
    ],
    "id": "...",
    "kind": "task",
    "status": {
      "state": "completed"
    }
  }
}

Wrapping up

You have now seen how to run longer running tasks and provide updates along the way via streaming. As long as your agent code provides status updates, the AgentExecutor can create a task and provide task updates and the task result to the A2A Server which uses SSE to send them to the A2A Client.

In an upcoming post, we will take a look at running a multi-agent solution in the Azure cloud.

Google’s A2A: taking a closer look

In the previous post, I talked about options to build multi-agent solutions. The last option used Google’s A2A. A2A provides a wrapper around your agent, basically a JSON-RPC API, that standardizes how you talk to your agent. In this post we take a closer look at the basics of A2A with simple synchronous message exchange.

⚠️ A2A is still in development. We do not use it in production yet!

The idea is to build solutions that look like this (just one of the many possibilities):

The conversation agent is an agent that uses tools to get the job done. It wouldn’t be much of an agent without tools right? The tools are custom tools created by the developer that call other agents to do work. The other agents can be written in any framework and use any development language. How the agent works internally is irrelevant. When the conversation agent detects (via standard function calling) that the RAG tool needs to be executed, that tool will call the RAG agent over A2A and return the results.

A2A does not dictate how you build your agent. In the example below, an Azure AI Foundry Agent sits at the core. That agent can use any of its hosted tools or custom tools to get the job done. Because this is a RAG Agent, it might use the built-in Azure AI Search or SharePoint knowledge source. As a developer, you use the Azure AI Foundry SDK or Semantic Kernel to interact with your agent as you see fit. Although you do not have to, it is common to wrap your agent in a class and provide one or more methods to interact with it. For example, an invoke() method and an invoke_streaming() method.

Here is a minimal example for the AI Foundry Agent (the yellow box):

class RAGAgent:
    def __init__(self):
        # INITIALIZATION CODE NOT SHOWN
        self.project = AIProjectClient(
            credential=DefaultAzureCredential(),
            endpoint=endpoint)
        self.agent = self.project.agents.get_agent(agent_id)

    async def invoke(self, question: str) -> str:
        thread = self.project.agents.threads.create()

        message = self.project.agents.messages.create(
            thread_id=thread.id,
            role="user",
            content=question
        )
        run = self.project.agents.runs.create_and_process(
            thread_id=thread.id,
            agent_id=self.agent.id)
        messages = list(self.project.agents.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING))

        # ...

This code has nothing to do with Google A2A and could be implemented in many other ways. This is about to change because we will now call the above agent from A2A’s AgentExecutor. The AgentExecutor is a key server‑side interface: when a client sends a message, the A2A server calls execute() on your AgentExecutor instance, and your implementation handles the logic and sends updates via an event queue. Here’s how your agent is used by A2A. When a client sends a message it works its way down to your agent via several A2A components:

It’s important to understand the different types of message exchange in A2A. This post will not look at all of them. You can find more information in the A2A documentation. This post uses synchronous messaging via message/send where the response is a simple message and not a, potentially longer running, task.

Let’s dive into the AgentExecutor (it processes the message we send) and work our way up to the A2A client.

AgentExecutor

Let’s take a look at a bare bones implementation of AgentExecutor that works with plain/text input and output messages and without streaming:

Client --message--> A2A Server --> Agent Executor --> Agent

and

Agent --> Agent Executor --> A2A Server --message--> Client
class RAGAgentExecutor(AgentExecutor):

    def __init__(self):
        self.agent = RAGAgent()

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        message_text = context.get_user_input()
        
        result = await self.agent.invoke(message_text)

        await event_queue.enqueue_event(new_agent_text_message(result))
        
    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        raise Exception("Cancel not supported")

When a message is sent to the A2A server via JSON-RPC, the execute() method of the RAGAgentExecutor is called. At server startup, __init__ creates our AI Foundry RAGAgent which does the actual work.

Inside the execute() method, we assume the context contains a message. We use the get_user_input() helper to extract the message text (user query). We then simply call our agent’s invoke() method with that query and return the result via the event_queue. The A2A server uses an event_queue to provide responses back to the caller. In this case, the response will be a simple plain/text message.

This is probably as simple as it gets and is useful to understand A2A’s basic operation. In many cases though, you might want to return a longer running task instead of a message and provide updates to the client via streaming. That would require creating the task and streaming the task updates to the client. The client would need to be modified to handle this.

But wait, we still need to create the server that uses this AgentExecutor. Let’s take a look!

A2A Server

The A2A Python SDK uses starlette and uvicorn to create the JSON-RPC server. You don’t really need to know anything about this because A2A does this under the covers for you. The server needs to do a couple of things:

  • Create one or more skills: skills represent a specific capability or function your agent offers—for instance, “currency conversion,” “document summary” or “meeting scheduling”.
  • Create an agent card: an agent card is like a business card for your agent; it tells others what the agent can do; the above skills are part of the agent card; the agent card is published at /.well-known/agent.json on the agents domain (e.g., localhost:9999 on your local machine)
  • Create a request handler: the request handler ties the server to the AgentExecutor you created earlier
  • Create the A2AStarletteApplication: it ties the agent card and the request handler together
  • Serve the A2AStarletteApplication with uvicorn on an address and port of your choosing

This is what it looks like in code:

import logging
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from agent_executor import RagAgentExecutor

def main():
    skill = AgentSkill(
        id="rag_skill",
        name="RAG Skill",
        description="Search knowledge base for project information",
        tags=["rag", "agent", "information"],
        examples=["What is project Astro and what tech is used in it?"],
    )
    agent_card = AgentCard(
        name="RAG Agent",
        description="A simple agent that searches the knowledge base for information",
        url="http://localhost:9998/",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[skill],
        version="1.0.0",
        capabilities=AgentCapabilities(),
    )
    request_handler = DefaultRequestHandler(
        agent_executor=RagAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )
    server = A2AStarletteApplication(
        http_handler=request_handler,
        agent_card=agent_card,
    )
    uvicorn.run(server.build(), host="0.0.0.0", port=9998)
if __name__ == "__main__":
    main()

Validating the agent card

When you run the A2A server on your local machine and expose it to the public with ngrok or other tools, you can use https://a2aprotocol.ai/a2a-protocol-validator to validate it. When I do this for the RAG Agent, I get the following:

In JSON, the agent card is as follows:

{
  "capabilities": {},
  "defaultInputModes": [
    "text"
  ],
  "defaultOutputModes": [
    "text"
  ],
  "description": "A simple agent that searches the knowledge base for information",
  "name": "RAG Agent",
  "protocolVersion": "0.2.5",
  "skills": [
    {
      "description": "Search knowledge base for project information",
      "examples": [
        "What is project Astro and what tech is used in it?"
      ],
      "id": "rag_agent",
      "name": "RAG Agent",
      "tags": [
        "rag",
        "agent",
        "information"
      ]
    }
  ],
  "url": "http://Geerts-MacBook-Air-2.local:9998/",
  "version": "1.0.0"
}

Now it is time to actually start talking to the agent.

Using the A2A client to talk to the agent

With the server up and running and the Agent Card verified, how do we exchange messages with the server?

In our case, where the server supports only text and there is no streaming, the client can be quite simple:

  • Create an httpx client and set timeout higher depending on how long it takes to get a response; this client is used by the A2ACardResolver and A2AClient
  • Retrieve the agent card with the A2ACardResolver
  • Create a client with A2AClient. It needs the agent card as input and will use the url in the agent card to connect to the A2A server
  • Create a Message, include it in a MessageRequest and send the MessageRequest with the client. We use the non-streaming message_send() method.
  • Handle the response from the client

The code below shows what this might look like:

import uuid

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    AgentCard,
    Message,
    MessageSendParams,
    Part,
    Role,
    SendMessageRequest,
    TextPart,
)

PUBLIC_AGENT_CARD_PATH = "/.well-known/agent.json"
BASE_URL = "http://localhost:9998"


async def main() -> None:
    timeout = httpx.Timeout(200.0, read=200.0, write=30.0, connect=10.0)
    async with httpx.AsyncClient(timeout=timeout) as httpx_client:
        # Initialize A2ACardResolver
        resolver = A2ACardResolver(
            httpx_client=httpx_client,
            base_url=BASE_URL,
        )

        final_agent_card_to_use: AgentCard | None = None

        try:
            print(
                f"Fetching public agent card from: {BASE_URL}{PUBLIC_AGENT_CARD_PATH}"
            )
            _public_card = await resolver.get_agent_card()
            print("Fetched public agent card")
            print(_public_card.model_dump_json(indent=2))

            final_agent_card_to_use = _public_card

        except Exception as e:
            print(f"Error fetching public agent card: {e}")
            raise RuntimeError("Failed to fetch public agent card")

        client = A2AClient(
            httpx_client=httpx_client, agent_card=final_agent_card_to_use
        )
        print("A2AClient initialized")

        message_payload = Message(
            role=Role.user,
            messageId=str(uuid.uuid4()),
            parts=[Part(root=TextPart(text="Is there a project with the word Astro? If so, describe it."))],
        )
        request = SendMessageRequest(
            id=str(uuid.uuid4()),
            params=MessageSendParams(
                message=message_payload,
            ),
        )
        print("Sending message")

        response = await client.send_message(request)
        print("Response:")
        print(response.model_dump_json(indent=2))


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Above, the entire response is printed as JSON. That is useful to learn what the responses look like. This is part of the response:

{
  "id": "6cc795d8-fa84-4734-8b5a-dccd3a22142d",
  "jsonrpc": "2.0",
  "result": {
    "contextId": null,
    "extensions": null,
    "kind": "message",
    "messageId": "fead200d-0ea4-4ccb-bf1c-ed507b38d79d",
    "metadata": null,
    "parts": [
      {
        "kind": "text",
        "metadata": null,
        "text": "RESPONSE FROM RAG AGENT"
      }
    ],
    "referenceTaskIds": null,
    "role": "agent",
    "taskId": null
  }
}

Simply sending the response as a string on the event queue results in a message with one text part. The result from the RAG agent is in the text property. For a longer running task with streaming updates, the response would be quite different.

You can now easily interact with your agent using this client. For example:

  • use the client in any application (need not be an agent)
  • use the client in a workflow engine like LangGraph
  • use the client in an agent tool; the agent can be written in any framework; when the agent identifies a tool call is needed, the tool is run which contains A2AClient code to interact with the A2A Agent

The entire flow

The diagram below shows the end-to-end flow:

Try it yourself

On GitHub, check https://github.com/gbaeke/multi_agent_aca/tree/main/a2a_simple for a skeleton implementation of a calculator agent. The CalculatorAgent class’s invoke() method always returns “I did not do anything!” It’s up to you to change that!

You can run this A2A server as-is and connect to it with test_client.py. To use an actual agent, update the CalculatorAgent class’s invoke() method with a real agent written in your preferred framework.

Check the README.md for more instructions.

That’s it for this post! In a next one, we will look at a more complex example that streams messages to the client. Stay tuned!

Creating a custom GPT to query any knowledge base with actions

A while ago, OpenAI introduced GPTs. A GPT is a custom version of ChatGPT that combine instructions, extra knowledge, and any combination of skills.

In this tutorial, we are going to create a custom GPT that can answer questions about articles on this blog. In order to achieve that, we will do the following:

  • create an Azure AI Search index
  • populate the index with content of the last 50 blog posts (via its RSS feed)
  • create a custom API with FastAPI (Python) that uses the Azure OpenAI “add your data” APIs to provide relevant content to the user’s query
  • add the custom API as an action to the custom GPT

The image below shows the properties of the GPT. You need to be a ChatGPT Plus subscriber to create a GPT.

Part of the custom GPT definition

To implement a custom action for the GPT, you need an API with an OpenAPI spec. When you use FastAPI, an OpenAPI JSON document can easily be downloaded and provided to the GPT. You will need to modify the JSON document with a servers section to specify the URL the GPT has to use.

In what follows, we will look at all of the different pieces that make this work. Beware: long post! 😀

Azure AI Search Index

Azure AI Search is a search service you create in Azure. Although there is a free tier, I used the basic tier. The basic tiers allows you to use its semantic reranker to optimise search results.

To create the index and populate it with content, I used the following notebook: https://github.com/gbaeke/custom-gpt/blob/main/blog-index/website-index.ipynb.

The result is an index like below:

Index in Azure AI Search

The index contains 292 documents although I only retrieve the last 50 blog posts. This is the result of chunking each post into smaller pieces of about 500 tokens with 100 tokens of overlap for each chunk. We use smaller chunks because we do not want to send entire blog posts as content to the large language model (LLM).

Note that the index supports similarity searches using vectors. The contentVector field contains the OpenAI embedding of the text in the content field.

Although vectors are available, we do not have to use vector search. Azure AI search supports simple keyword search as well. Together with the semantic ranker, it can provide more relevant results than keyword search on its own.

Note: in general, vector search will provide better results, especially when combined with keyword search and the semantic ranker

Use the index with Azure OpenAI “add your data”

I have written about the Azure OpenAI “add your data” features before. It provides a wizard experience to add an Azure AI Search index to the Azure OpenAI playground and directly test your index with the model of your choice.

From you Azure OpenAI instance, first open Azure OpenAI Studio:

Go to OpenAI Studio from the Overview page of your Azure OpenAI instance

Note: you still need to complete a form to get access to Azure OpenAI. Currently, it can take around a day before you are allowed to create Azure OpenAI instances in your subscription.

In Azure OpenAI Studio, click Bring your own data from the Home screen:

Bring your own data

Select the Azure AI Search index and click Next.

Azure AI Search index selection

Note: I created the index using the generally available API that supports vector search. The Add your data wizard, at the time of writing, was not updated yet to support these new indexes. That is the reason why vector search cannot be enabled. We will use keyword + semantic search instead. I expect this functionality to be available soon (November/December 2023).

Next, provide field mappings:

Field Mappings

These mappings are required because the Add your data feature excepts these standard fields. You should have at least a content field to search. Above, I do not have a file name field because I have indexed blog posts. It’s ok to leave that field blank.

After clicking Next, we get to data management:

Data Management

Here, we specify the type of search. Semantic means keyword + semantic. In the dropdown list, you can also select keyword search on its own. However, that might give you less relevant results.

Note: for Semantic to work, you need to turn on the Semantic ranker on the Azure AI Search resource. Additionally, you need to create a semantic profile on the index.

Now you can click Next, followed by Save and close. The Azure OpenAI Chat Playground appears with the index added:

Index added as a data source

You can now start chatting with your data. Select a chat model like gpt-4 or gpt-35-turbo. In Azure OpenAI, you have to deploy these models first and give the deployment a name.

Chat session with your data

Above, I asked about the OpenAI Assistants API, which is one of the posts on my blog. In the background, the playground performs a search on the Azure AI Search index and provides the results as context to the model. The gpt-35-turbo model answers the user’s question, based on the context coming from the index.

When you are happy with the result, you can export this experience to an Azure Web App of CoPilot Studio (Power Virtual Agents):

Export the “chat with data” experience

In our case, we want to use this configuration from code and provide an API we can add to the custom GPT.

⚠️ It’s import to realise that, with this approach, we will send the final answer, generated by an Azure OpenAI model, to the custom GPT. An alternate approach would be to hand the results of the Azure AI Search query to the custom GPT and let it formulate the answer on its own. That would be faster and less costly. If you also provide the blog post’s URL, ChatGPT can refer to it. However, the focus here is on using any API with a custom GPT so let’s continue with the API that uses the “add your data” APIs.

If you want to hand over Azure AI search results directly to ChatGPT, check out the code in the azure-ai-search folder in the Github repo.

Creating the API

To create an API that uses the index with the model, as configured in the playground, we can use some code. In fact, the playground provides sample code to work with:

Sample code from the playground

‼️ Sadly, this code will not work due to changes to the openai Python package. However, the principle is still the same:

  • call the chat completion extension API which is specific to Azure; in the code you will see this is as a Python f-string: f"{openai.api_base}/openai/deployments/{deployment_id}/extensions/chat/completions?api-version={openai.api_version}"
  • the JSON payload for this API needs to include the Azure AI Search configuration in a dataSources array.

The extension API will query Azure AI Search for you and create the prompt for the chat completion with context from the search result.

To create a FastAPI API that does this for the custom GPT, I decided to not use the openai package and simply use the REST API. Here is the code:

from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
import httpx, os
import dotenv
import re

# Load environment variables
dotenv.load_dotenv()

# Initialize FastAPI app
app = FastAPI()

# Constants (replace with your actual values)
api_base = "https://oa-geba-france.openai.azure.com/"
api_key = os.getenv("OPENAI_API_KEY")
deployment_id = "gpt-35-turbo"
search_endpoint = "https://acs-geba.search.windows.net"
search_key = os.getenv("SEARCH_KEY")
search_index = "blog"
api_version = "2023-08-01-preview"

# Pydantic model for request body
class RequestBody(BaseModel):
    query: str

# Define the API key dependency
def get_api_key(api_key: str = Header(None)):
    if api_key is None or api_key != os.getenv("API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return api_key

# Endpoint to generate response
@app.post("/generate_response", dependencies=[Depends(get_api_key)])
async def generate_response(request_body: RequestBody):
    url = f"{api_base}openai/deployments/{deployment_id}/extensions/chat/completions?api-version={api_version}"
    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    data = {
        "dataSources": [
            {
                "type": "AzureCognitiveSearch",
                "parameters": {
                    "endpoint": search_endpoint,
                    "key": search_key,
                    "indexName": search_index
                }
            }
        ],
        "messages": [
            {
                "role": "system",
                "content": "You are a helpful assistant"
            },
            {
                "role": "user",
                "content": request_body.query
            }
        ]
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data, headers=headers, timeout=60)

    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail=response.text)

    response_json = response.json()

    # get the assistant response
    assistant_content = response_json['choices'][0]['message']['content']
    assistant_content = re.sub(r'\[doc.\]', '', assistant_content)
    
    # return assistant_content as json
    return {
        "response": assistant_content
    }

# Run the server
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, timeout_keep_alive=60)

This API has one endpoint: /generate_response that takes { "query": "your query" }as input and returns { "response": assistant_content }as output. Note that the original response from the model contains references like [doc1], [doc2], etc… The regex in the code removes those references. I don not particularly like how the references are handled by the API so I decided to not include them and simplify the response.

The endpoint expects an api-key header. It it is not present, it returns an error.

The endpoint does a call to the Azure OpenAI chat completion extension API which looks very similar to a regular OpenAI chat completion. The request does however, contain a dataSources field with the Azure AI Search information.

The environment variables like the OPENAI_API_KEY and the SEARCH_KEY are retrieved from a .env file.

Note: to stress this again, this API returns the answer to the query as generated by the chosen Azure OpenAI model. This allows it to be used in any application, not just a custom GPT. For a custom GPT in ChatGPT, an alternate approach would be to hand over the search results from Azure AI search directly, allowing the model in the custom GPT to generate the response. It would be faster and avoid Azure OpenAI costs. We are effectively using the custom GPT as a UI and as a way to maintain history between action calls. 😀

If you want to see the code in GitHub, check this URL: https://github.com/gbaeke/custom-gpt.

Running the API in Azure Container Apps

To run the API in the cloud, I decided to use Azure Container Apps. That means we need a Dockerfile to build the container image locally or in the cloud:

# Use an official Python runtime as a parent image
FROM python:3.9-slim-buster

# Set the working directory in the container to /app
WORKDIR /app

# Add the current directory contents into the container at /app
ADD . /app

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Run app.py when the container launches
CMD ["python3", "app.py"]

We also need a requirements.txt file:

fastapi==0.104.1
pydantic==2.5.2
pydantic_core==2.14.5
httpx==0.25.2
python-dotenv==1.0.0
uvicorn==0.24.0.post1

I use the following shell script to build and run the container locally. The script can also push the container to Azure Container Apps.

#!/bin/bash

# Load environment variables from .env file
export $(grep -v '^#' .env | xargs)

# Check the command line argument
if [ "$1" == "build" ]; then
    # Build the Docker image
    docker build -t myblog .
elif [ "$1" == "run" ]; then
    # Run the Docker container, mapping port 8000 to 8000 and setting environment variables
    docker run -p 8000:8000 -e OPENAI_API_KEY=$OPENAI_API_KEY -e SEARCH_KEY=$SEARCH_KEY -e API_KEY=$API_KEY myblog
elif [ "$1" == "up" ]; then
    az containerapp up -n myblog --ingress external --target-port 8000 \
        --env-vars OPENAI_API_KEY=$OPENAI_API_KEY SEARCH_KEY=$SEARCH_KEY API_KEY=$API_KEY \
        --source .
else
    echo "Usage: $0 {build|run|up}"
fi

The shell script extracts the environment variables defined in .env and sets them in the session. Next, we check the first parameter given to the script (Docker is required on your machine for build and run):

  • build: build the Docker image
  • run: run the Docker image locally on port 8000 and specify the environment variables to authenticate to Azure OpenAI and Azure AI Search
  • up: build the Docker image in the cloud and run it in Container Apps; if you do not have a Container Apps Environment or Azure Container Registry, they will be created for you. In the end, you will get an https endpoint to your API in the cloud.

Note: you should not put secrets in environment variables in Azure Container Apps directly; use Container Apps secrets or Key Vault instead; the above is just quick and easy to simplify the deployment

To test the API locally, use the REST Client extension in VS Code with an .http file:

POST http://localhost:8000/generate_response HTTP/1.1
Host: localhost:8000
Content-Type: application/json
api-key: API_KEY_FROM_DOTENV

{
  "query": "what is the openai assistants api?"
}

###

POST https://AZURE_CONTAINER_APPS_ENDPOINT/generate_response HTTP/1.1
Host: AZURE_CONTAINER_APPS_ENDPOINT
Content-Type: application/json
api-key: API_KEY_FROM_DOTENV

{
  "query": "Can I use Redis as a vector db?"
}

When you get something like below, you are good to go. Note again that we return a final answer and not the relevant chunks from Azure AI search.

Successful response from .http file

Getting the OpenAPI spec and adding it to the GPT

With your API running, you can go to its URL, like this one if the API runs locally: http://localhost:8000/openapi.json. The result is a JSON document you can copy to your GPT. I recommend to copy the JSON to VS Code and format it before you paste it in the GPT.

In the GPT, modify the OpenAPI spec with a servers section that includes your Azure Container Apps ingress URL:

Adding the URL to the GPT Action definition

If you want to give the ability to the user to trust the action to be called without approval (after a first call), also add the following:

Allowing the user to say Always Allow when action is used the first time

Take a look at the video below that shows how to create the GPT, including the configuration of the action and testing it.

Conclusion

Custom GPTs in ChatGPT open up a world of possibilities to offer personalised ChatGPT experiences. With custom actions, you can let the GPT do anything you want. In this tutorial, the custom action is an API call that answers the user’s question using Azure OpenAI with Azure AI Search as the provider of relevant context.

As long as you build and host an API and have an OpenAPI spec for your API, the possibilities are virtually limitless.

Note that custom GPTs with actions are not available in the ChatGPT app on mobile yet (end November, 2023). When that happens, it will open up all these capabilities on the go, including enabling voice chat. Fun stuff! 😀

Enhancing Blog Post Search with Chunk-based Embeddings and Pinecone

In this blog post, we’ll show you a different approach to searching through a large database of blog posts. The previous approach involved creating a single embedding for the entire article and storing it in a vector database. The new approach is much more effective, and in this post, we’ll explain why and how to implement it.

The new approach involves the following steps:

  1. Chunk the article into pieces of about 400 tokens using LangChain
  2. Create an embedding for each chunk
  3. Store each embedding, along with its metadata such as the URL and the original text, in Pinecone
  4. Store the original text in Pinecone, but not indexed
  5. To search the blog posts, find the 5 best matching chunks and add them to the ChatCompletion prompt

We’ll explain each step in more detail below, but first, let’s start with a brief overview of the previous approach.

The previous approach used OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. The article was vectorized as a whole, and the resulting vector was stored in Pinecone. To search the blog posts, cosine similarity was used to find the closest matching article, and the contents of the article were retrieved using the Python requests library and the BeautifulSoup library. Finally, a prompt was created for the ChatCompletion API, including the retrieved article.

The problem with this approach was that the entire article was vectorized as one piece. This meant that if the article was long, the vector might not represent the article accurately, as it would be too general. Moreover, if the article was too long, the ChatCompletion API call might fail because too many tokens were used.

The new approach solves these problems by chunking the article into smaller pieces, creating an embedding for each chunk, and storing each embedding in Pinecone. This way, we have a much more accurate representation of the article, as each chunk represents a smaller, more specific part of the article. And because each chunk is smaller, there is less risk of using too many tokens in the ChatCompletion API call.

To implement the new approach, we’ll use LangChain to chunk the article into pieces of about 400 tokens. LangChain is a library aimed at assisting in the development of applications that use LLMs, or large language models.

Next, we’ll create an embedding for each chunk using OpenAI’s embeddings API. As before, we will use the text-embedding-ada-002 model. And once we have the embeddings, we’ll store each one, along with its metadata, in Pinecone. The key for each embedding will be a hash of the URL, combined with the chunk number.

The original text will also be stored in Pinecone, but not indexed, so that it can be retrieved later. With this approach, we do not need to retrieve a blog article from the web. Instead, we just get the text from Pinecone directly.

To search the blog posts, we’ll use cosine similarity to find the 5 best-matching chunks. The 5 best matching chunks will be added to the ChatCompletion prompt, allowing us to ask questions based on the article’s contents.

Uploading the embeddings

The code to upload the embeddings is shown below. You will need to set the following environment variables:

export OPENAI_API_KEY=your_openai_api_key
export PINECONE_API_KEY=your_pinecone_api_key
export PINECONE_ENVIRONMENT=your_pinecone_environment
import feedparser
import os
import pinecone
import openai
import requests
from bs4 import BeautifulSoup
from retrying import retry
from langchain.text_splitter import RecursiveCharacterTextSplitter
import tiktoken
import hashlib

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')

# create the length function used by the RecursiveCharacterTextSplitter
def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def create_embedding(article):
    # vectorize with OpenAI text-emebdding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    return embedding["data"][0]["embedding"]

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

if "blog-index" not in pinecone.list_indexes():
    print("Index does not exist. Creating...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})
else:
    print("Index already exists. Deleting...")
    pinecone.delete_index("blog-index")
    print("Creating new index...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://atomic-temporary-16150886.wpcomstaging.com/feed/'

# Parse the RSS feed with feedparser
print("Parsing RSS feed: ", url)
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

# create recursive text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=400,
    chunk_overlap=20,  # number of tokens overlap between chunks
    length_function=tiktoken_len,
    separators=['\n\n', '\n', ' ', '']
)

pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Create embeddings for entry ", i, " of ", entries, " (", entry.link, ")")

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # create chunks
    chunks = text_splitter.split_text(article)

    # create md5 hash of entry.link
    url = entry.link
    url_hash = hashlib.md5(url.encode("utf-8"))
    url_hash = url_hash.hexdigest()
        
    # create embeddings for each chunk
    for j, chunk in enumerate(chunks):
        print("\tCreating embedding for chunk ", j, " of ", len(chunks))
        vector = create_embedding(chunk)

        # concatenate hash and j
        hash_j = url_hash + str(j)

        # add vector to pinecone_vectors list
        print("\tAdding vector to pinecone_vectors list for chunk ", j, " of ", len(chunks))
        pinecone_vectors.append((hash_j, vector, {"url": entry.link, "chunk-id": j, "text": chunk}))

        # upsert every 100 vectors
        if len(pinecone_vectors) % 100 == 0:
            print("Upserting batch of 100 vectors...")
            upsert_response = index.upsert(vectors=pinecone_vectors)
            pinecone_vectors = []

# if there are any vectors left, upsert them
if len(pinecone_vectors) > 0:
    print("Upserting remaining vectors...")
    upsert_response = index.upsert(vectors=pinecone_vectors)
    pinecone_vectors = []

print("Vector upload complete.")

Searching for blog posts

The code below is used to search blog posts:

import os
import pinecone
import openai
import tiktoken

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')


def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index
index = pinecone.Index('blog-index')

while True:
    # set query
    your_query = input("\nWhat would you like to know? ")
    
    # vectorize your query with openai
    try:
        query_vector = openai.Embedding.create(
            input=your_query,
            model="text-embedding-ada-002"
        )["data"][0]["embedding"]
    except Exception as e:
        print("Error calling OpenAI Embedding API: ", e)
        continue

    # search for the most similar vector in Pinecone
    search_response = index.query(
        top_k=5,
        vector=query_vector,
        include_metadata=True)

    # create a list of urls from search_response['matches']['metadata']['url']
    urls = [item["metadata"]['url'] for item in search_response['matches']]

    # make urls unique
    urls = list(set(urls))

    # create a list of texts from search_response['matches']['metadata']['text']
    chunks = [item["metadata"]['text'] for item in search_response['matches']]

    # combine texts into one string to insert in prompt
    all_chunks = "\n".join(chunks)

    # print urls of the chunks
    print("URLs:\n\n", urls)

    # print the text number and first 50 characters of each text
    print("\nChunks:\n")
    for i, t in enumerate(chunks):
        print(f"\nChunk {i}: {t[:50]}...")

    try:
        # openai chatgpt with article as context
        # chat api is cheaper than gpt: 0.002 / 1000 tokens
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                { "role": "system", "content":  "You are a thruthful assistant!" },
                { "role": "user", "content": f"""Answer the following query based on the context below ---: {your_query}
                                                    Do not answer beyond this context!
                                                    ---
                                                    {all_chunks}""" }
            ],
            temperature=0,
            max_tokens=750
        )

        print(f"\n{response.choices[0]['message']['content']}")
    except Exception as e:
        print(f"Error with OpenAI Completion: {e}")

In Action

Below, we ask if Redis supports storing vectors and what version of Redis we need in Azure. The Pinecone vector search found 5 chunks, all from the same blog post (there is only one URL). The five chunks are combined and sent to ChatGPT, together with the original question. The response from the ChatCompletion API is clear!

Example question and response

Conclusion

In conclusion, the “chunked” approach to searching through a database of blog posts is much more effective and solves many of the problems associated with the previous approach. We hope you found this post helpful, and we encourage you to try out the new approach in your own projects!

Pinecone and OpenAI magic: A guide to finding your long lost blog posts with vectorized search and ChatGPT

Searching through a large database of blog posts can be a daunting task, especially if there are thousands of articles. However, using vectorized search and cosine similarity, you can quickly query your blog posts and retrieve the most relevant content.

In this blog post, we’ll show you how to query a list of blog posts (from this blog) using a combination of vectorized search with cosine similarity and OpenAI ChatCompletions. We’ll be using OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. We’ll also show you how to retrieve the contents of the article, create a prompt using the ChatCompletion API, and return the result to a web page.

ℹ️ Sample code is on GitHub: https://github.com/gbaeke/gpt-vectors

ℹ️ If you want an introduction to embeddings and cosine similarity, watch the video on YouTube by Part Time Larry.

Setting Up Pinecone

Before we can start querying our blog posts, we need to set up Pinecone. Pinecone is a vector database that makes it easy to store and query high-dimensional data. It’s perfect for our use case since we’ll be working with high-dimensional vectors.

ℹ️ Using a vector database is not strictly required. The GitHub repo contains app.py, which uses scikit-learn to create the vectors and perform a cosine similarity search. Many other approaches are possible. Pinecone just makes storing and querying the vectors super easy.

ℹ️ If you want more information about Pinecone and the concept of a vector database, watch this introduction video.

First, we’ll need to create an account with Pinecone and get the API key and environment name. In the Pinecone UI, you will find these as shown below. There will be a Show Key and Copy Key button in the Actions section next to the key.

Key and environment for Pinecone

Once we have an API key and the environment, we can use the Pinecone Python library to create and use indexes. Install the Pinecone library with pip install pinecone-client.

Although you can create a Pinecone index from code, we will create the index in the Pinecone portal. Go to Indexes and select Create Index. Create the index using cosine as metric and 1536 dimensions:

blog-index in Pinecone

The embedding model we will use to create the vectors, text-embedding-ada-002, outputs vectors with 1536 dimensions. For more info see OpenAI’s blog post of December 15, 2022.

To use the Pinecode index from code, look at the snippet below:

import pinecone

pinecone_api = "<your_api_key>"
pinecone_env = "<your_environment>"

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

index = pinecone.Index('blog-index')

We create an instance of the Index class with the name “blog-index” and store this in index. This index will be used to store our blog post vectors or to perform searches on.

Vectorizing Blog Posts with OpenAI’s Embeddings API

Next, we’ll need to vectorize our blog post articles. We’ll be using OpenAI’s embeddings API to do this. The embeddings API takes a piece of text and returns a high-dimensional vector representation of that text. Here’s an example of how to do that for one article or string:

import openai

openai.api_key = "<your_api_key>"

article = "Some text from a blog post"

vector = openai.Embedding.create(
    input=article,
    model="text-embedding-ada-002"
)["data"][0]["embedding"]

We create a vector representation of our blog post article by calling the Embedding class’s create method. We pass in the article text as input and the text-embedding-ada-002 model, which is a pre-trained language model that can generate high-quality embeddings.

Storing Vectors in Pinecone

Once we have the vector representations of our blog post articles, we can store them in Pinecone. Instead of storing vector per vector, we can use upsert to store a list of vectors. The code below uses the feed of this blog to grab the URLs for 50 posts, every post is vectorized and the vector is added to a Python list of tuples, as expected by the upsert method. The list is then added to Pinecone at once. The tuple that Pinecone expects is:

(id, vector, metadata dictionary)

e.g. (0, vector for post 1, {"url": url to post 1}

Here is the code that uploads the first 50 posts of baeke.info to Pinecone. You need to set the Pinecone key and environment and the OpenAI key as environment variables. The code uses feedparser to grab the blog feed, and BeatifulSoup to parse the retrieved HTML. The code serves as an example only. It is not very robust when it comes to error checking etc…

import feedparser
import os
import pinecone
import numpy as np
import openai
import requests
from bs4 import BeautifulSoup

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://atomic-temporary-16150886.wpcomstaging.com/feed/'

# Parse the RSS feed with feedparser
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

post_texts = []
pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Processing entry ", i, " of ", entries)

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # vectorize with OpenAI text-emebdding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    # print the embedding (length = 1536)
    vector = embedding["data"][0]["embedding"]

    # append tuple to pinecone_vectors list
    pinecone_vectors.append((str(i), vector, {"url": entry.link}))

# all vectors can be upserted to pinecode in one go
upsert_response = index.upsert(vectors=pinecone_vectors)

print("Vector upload complete.")

Querying Vectors with Pinecone

Now that we have stored our blog post vectors in Pinecone, we can start querying them. We’ll use cosine similarity to find the closest matching blog post. Here is some code that does just that:

query_vector = <vector representation of query>  # vector created with OpenAI as well

search_response = index.query(
    top_k=5,
    vector=query_vector,
    include_metadata=True
)

url = get_highest_score_url(search_response['matches'])

def get_highest_score_url(items):
    highest_score_item = max(items, key=lambda item: item["score"])

    if highest_score_item["score"] > 0.8:
        return highest_score_item["metadata"]['url']
    else:
        return ""

We create a vector representation of our query (you don’t see that here but it’s the same code used to vectorize the blog posts) and pass it to the query method of the Pinecone Index class. We set top_k=5 to retrieve the top 5 matching blog posts. We also set include_metadata=True to include the metadata associated with each vector in our response. That way, we also have the URL of the top 5 matching posts.

The query method returns a dictionary that contains a matches key. The matches value is a list of dictionaries, with each dictionary representing a matching blog post. The score key in each dictionary represents the cosine similarity score between the query vector and the blog post vector. We use the get_highest_score_url function to find the blog post with the highest cosine similarity score.

The function contains some code to only return the highest scoring URL if the score is > 0.8. It’s of course up to you to accept lower matching results. There is a potential for the vector query to deliver an article that’s not highly relevant which results in an irrelevant context for the OpenAI ChatCompletion API call we will do later.

Retrieving the Contents of the Blog Post

Once we have the URL of the closest matching blog post, we can retrieve the contents of the article using the Python requests library and the BeautifulSoup library.

import requests
from bs4 import BeautifulSoup

r = requests.get(url)
soup = BeautifulSoup(r.text, 'html.parser')

article = soup.find('div', {'class': 'entry-content'}).text

We send a GET request to the URL of the closest matching blog post and retrieve the HTML content. We use the BeautifulSoup library to parse the HTML and extract the contents of the <div> element with the class “entry-content”.

Creating a Prompt for the ChatCompletion API

Now that we have the contents of the blog post, we can create a prompt for the ChatCompletion API. The crucial part here is that our OpenAI query should include the blog post we just retrieved!

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        { "role": "system", "content": "You are a polite assistant" },
        { "role": "user", "content": "Based on the article below, answer the following question: " + your_query +
            "\nAnswer as follows:" +
            "\nHere is the answer directly from the article:" +
            "\nHere is the answer from other sources:" +
             "\n---\n" + article }
           
    ],
    temperature=0,
    max_tokens=200
)

response_text=f"\n{response.choices[0]['message']['content']}"

We use the ChatCompletion API with the gpt-3.5-turbo model to ask our question. This is the same as using ChatGPT on the web with that model. At this point in time, the GPT-4 model was not available yet.

Instead of one prompt, we send a number of dictionaries in a messages list. The first item in the list sets the system message. The second item is the actual user question. We ask to answer the question based on the blog post we stored in the article variable and we provide some instructions on how to answer. We add the contents of the article to our query.

If the article is long, you run the risk of using too many tokens. If that happens, the ChatCompletion call will fail. You can use the tiktoken library to count the tokens and prevent the call to happen in the first place. Or you can catch the exception and tell the user. In the above code, there is no error handling. We only include the core code that’s required.

Returning the Result to a Web Page

If you are running the search code in an HTTP handler as the result of the user typing a query in a web page, you can return the result to the caller:

return jsonify({
    'url': url,
    'response': response_text
})

The full example, including an HTML page and Flask code can be found on GitHub.

The result could look like this:

Query results in the closest URL using vectorized search and ChatGPT answering the question based on the contents the URL points at

Conclusion

Using vectorized search and cosine similarity, we can quickly query a database of blog posts and retrieve the most relevant post. By combining OpenAI’s embeddings API, Pinecone, and the ChatCompletion API, we can create a powerful tool for searching and retrieving blog post content using natural language.

Note that there are some potential issues as well. The code we show is merely a starting point:

  • Limitations of cosine similarity: it does not take into account all properties of the vectors, which can lead to misleading results
  • Prompt engineering: the prompt we use works but there might be prompts that just work better. Experimentation with different prompts is crucial!
  • Embeddings: OpenAI embeddings are trained on a large corpus of text, which may not be representative of the domain-specific language in the posts
  • Performance might not be sufficient if the size of the database grows large. For my blog, that’s not really an issue. 😀

Adding Authentication and Authorization to an Azure Static Web App

In a previous post, we created a static web app that retrieves documents from Cosmos DB via an Azure Function. The Azure Function got deployed automatically and runs off the same domain as your app. In essence, that frees you from having to setup Azure Functions separately and configuring CORS in the process.

Instead of allowing anonymous users to call the api at https://yourwebapp/api/device, I only want to allow specific users to do so. In this post, we will explore how that works.

You can find the source code of the static web app and the API on GitHub: https://github.com/gbaeke/az-static-web-app.

More into video tutorials? Then check out the video below. I recommend 1.2x speed! 😉

Full version about creating the app and protecting the API

Create a routes.json

To define the protected routes, you need routes.json in the root of your project:

routes.json to protect /api/*

The routes.json file serves multiple purposes. Check out how it works here. In my case, I just want to protect the /api/* routes and allow the Authenticated users role. The Authenticated role is a built-in role but you should create custom roles to protect sensitive data (more info near the end of this post). For our purposes, the platform error override is not needed and be removed. These overrides are useful though as they allow you to catch errors and act accordingly.

Push the above change to your repository for routes.json to go in effect. Once you do, access to /api/* requires authentication. Without it, you will get a 401 Unauthorized error. To fix that, invite your users and define roles.

Inviting Users

In Role Management, you can invite individual users to your app:

User gbaeke (via GitHub) user identity added

Just click Invite and fill in the blanks. Inviting a user results in an invitation link you should send the user. Below is an example for my Twitter account:

Let’s invite myself via my Twitter account

When I go to the invite link, I can authorize the app:

Authorizing Static Web Apps to access my account

After this, you will also get a Consent screen:

Granting Consent (users can always remove their data later; yeah right 😉)

When consent is given, the application will open with authentication. I added some code to the HTML page to display when the user is authenticated. The user name can be retrieved with a call to .auth/me (see later).

App with Twitter handle shown

In the Azure Portal, the Twitter account is now shown as well.

User added to roles of the web app

Note: anyone can actually authenticate to your app; you do not have to invite them; you invite users only when you want to assign them custom roles

Simple authentication code

The HTML code in index.html contains some links to login and logout:

  • To login: a link to /.auth/login/github
  • To logout: a link to /.auth/logout

Microsoft provides these paths under /.auth automatically to support the different authentication scenarios. In my case, I only have a GitHub login. To support Twitter or Facebook logins, I would need to provide some extra logic for the user to choose the provider.

In the HTML, the buttons are shown/hidden depending on the existence of user.UserDetails. The user information is retrieved via a call to the system-provided /.auth/me with the code below that uses fetch:

async  getUser() {
     const response = await fetch("/.auth/me");
     const payload = await response.json();
     const { clientPrincipal } = payload;
     this.user = clientPrincipal;

user.UserDetails is just the username on the platform: gbaeke on GitHub, geertbaeke on Twitter, etc…

The combination of the routes.json file that protects /api/* and the authentication logic above results in the correct retrieval of the Cosmos DB documents. Note that when you are not authorized, the list is just empty with a 401 error in the console. In reality, you should catch the error and ask the user to authenticate.

One way of doing so is redirecting to a login page. Just add logic to routes.json that serves the path you want to use when the errorType is Unauthenticated as shown below:

"platformErrorOverrides": [
    {
      "errorType": "NotFound",
      "serve": "/custom-404.html"
    },
    {
      "errorType": "Unauthenticated",
      "serve": "/login"
    }
  ]

The danger of the Authenticated role

Above, we used the Authenticated role to provide access to the /api/* routes. That is actually not a good idea once you realize that non-invited users can authenticate to your app as well. As a general rule: always use a custom role to allow access to sensitive resources. Below, I changed the role in routes.json to reader. Now you can invite users and set their role to reader to make sure that only invited users can access the API!

"routes": [
      {
        "route": "/api/*",
        "allowedRoles": ["reader"]
      }

      
    ]

Below you can clearly see the effect of this. I removed GitHub user gbaeke from the list of users but I can still authenticate with the account. Because I am missing the reader role, the drop down list is not populated and a 401 error is shown:

Authenticated but not in the reader role

Conclusion

In this post, we looked at adding authentication and authorization to protect calls to our Azure Functions API. Azure Static Web Apps tries to make that process as easy as possible and we all now how difficult authentication and authorization can be in reality! And remember: protect sensitive API calls with custom roles instead of the built-in Authenticated role.

First Look at Azure Static Web Apps

Note: part 2 looks at the authentication and authorization part.

At Build 2020, Microsoft announced Azure Static Web Apps, a new way to host static web apps on Azure. In the past, static web apps, which are just a combination of HTML, JavaScript and CSS, could be hosted in a Storage Account or a regular Azure Web App.

When you compare Azure Static Web Apps with the Storage Account approach, you will notice there are many more features. Some of those features are listed below (also check the docs):

  • GitHub integration: GitHub actions are configured for you to easily deploy your app from your GitHub repository to Azure Static Web Apps
  • Integrated API support: APIs are provided by Azure Functions with an HTTP Trigger
  • Authentication support for Azure Active Directory, GitHub and other providers
  • Authorization role definitions via the portal and a roles.json file in your repository
  • Staging versions based on a pull request

It all works together as shown below:

SWAdiagram.png
Azure Static Web Apps (from https://techcommunity.microsoft.com/t5/azure-app-service/introducing-app-service-static-web-apps/ba-p/1394451)

As a Netlify user, this type of functionality is not new to me. Next to static site hosting, they also provide serverless functions, identity etc…

If you are more into video tutorials…

Creating the app and protecting calls to the API

Let’s check out an example to see how it works on Azure…

GitHub repository

The GitHub repo I used is over at https://github.com/gbaeke/az-static-web-app. You will already see the .github/workflows folder that contains the .yml file that defines the GitHub Actions. That folder will be created for you when you create the Azure Static Web App.

The static web app in this case is a simple index.html that contains HTML, JavaScript and some styling. Vue.js is used as well. When you are authenticated, the application reads a list of devices from Cosmos DB. When you select a device, the application connects to a socket.io server, waiting for messages from the chosen device. The backend for the messages come from Redis. Note that the socket.io server and Redis configuration are not described in this post. Here’s a screenshot from the app with a message from device01. User gbaeke is authenticated via GitHub. When authenticated, the device list is populated. When you log out, the device list is empty. There’s no error checking here so when the device list cannot be populated, you will see a 404 error in the console. 😉

Azure Static Web App in action

Note: Azure Static Web Apps provides a valid certificate for your app, whether it uses a custom domain or not; in the above screenshot, Not secure is shown because the application connects to the socket.io server over HTTP and Mixed Content is allowed; that is easy to fix with SSL for the socket.io server but I chose to not configure that

The API

Although API is probably too big a word for it, the devices drop down list obtains its data from Cosmos DB, via an Azure Function. It was added from Visual Studio Code as follows:

  • add the api folder to your project
  • add a new Function Project and choose the api folder: simply use F1 in Visual Studio Code and choose Azure Functions: Create New Project… You will be asked for the folder. Choose api.
  • modify the code of the Function App to request data from Cosmos DB

To add an Azure Function in Visual Studio Code, make sure you install the Azure Functions extension and the Azure Function Core Tools. I installed the Linux version of Core Tools in WSL 2.

Adding the function (JavaScript; HTTP Trigger, anonymous, name of GetDevice) should result in the following structure:

Function app as part of the static web app (api folder)

Next, I modified function.json to include a Cosmos DB input next to the existing HTTP input and output:

{
  "bindings": [
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ],
      "route": "device"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    },
    {
      "name": "devices",
      "type": "cosmosDB",
      "direction": "in",
      "databaseName": "geba",
      "collectionName": "devices",
      "sqlQuery": "SELECT c.id, c.room FROM c",
      "connectionStringSetting": "CosmosDBConnection"    
    }
  ]
}

In my case, I have a Cosmos DB database geba with a devices collection. Device documents contain an id and room field which simply get selected with the query: SELECT c.id, c.room FROM c.

Note: with route set to device, the API will need to be called with /api/device instead of /api/GetDevice.

The actual function in index.js is kept as simple as possible:

module.exports = async function (context, req) {
    context.log('Send devices from Cosmos');
  
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: context.bindings.devices
    };
    
};

Yes, the above code is all that is required to retrieve the JSON output of the Cosmos DB query and set is as the HTTP response.

Note that local.settings.json contains the Cosmos DB connection string in CosmosDBConnection:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "",
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "CosmosDBConnection": "AccountEndpoint=https://geba-cosmos.documents.a...;"
  }
}

You will have to make sure the Cosmos DB connection string is made known to Azure Static Web App later. During local testing, local.settings.json is used to retrieve it. local.settings.json is automatically added to .gitignore to not push it to the remote repository.

Local Testing

We can test the app locally with the Live Server extension. But first, modify .vscode/settings.json and add a proxy for your api:

"liveServer.settings.proxy": {
        "enable": true,
        "baseUri": "/api",
        "proxyUri": "http://172.28.242.32:7071/api"
    }

With the above setting, a call to /api via Live Server will be proxied to Azure Functions on your local machine. Note that the IP address refers to the IP address of WSL 2 on my Windows 10 machine. Find it by running ifconfig in WSL 2.

Before we can test the application locally, start your function app by pressing F5. You should see:

Function App started locally

Now go to index.html, right click and select Open with Live Server. The populated list of devices shows that the query to Cosmos DB works and that the API is working locally:

Test the static web app and API locally

Notes on using WSL 2:

  • for some reason, http://localhost:5500/index.html (Live Server running in WSL 2) did not work from the Windows session although it should; in the screenshot above, you see I replaced localhost with the IP address of WSL 2
  • time skew can be an issue with WSL 2; if you get an error during the Cosmos DB query of authorization token is not valid at the current time, perform a time sync with ntpdate time.windows.com from your WSL 2 session

Deploy the Static Web App

Create a new Static Web App in the portal. The first screen will be similar to the one below:

Static Web App wizard first screen

You will need to authenticate to GitHub and choose your repository and branch as shown above. Click Next. Fill in the Build step as follows:

Static Web App wizard second screen

Our app will indeed run off the root. We are not using a framework that outputs a build to a folder like dist so you can leave the artifact location blank. We are just serving index.html off the root.

Complete the steps for the website to be created. You GitHub Action will be created and run for the first time. You can easily check the GitHub Action runs from the Overview screen:

Checking the GitHub Action runs

Here’s an example of a GitHub action run:

A GitHub Action run

When the GitHub Action is finished, your website will be available on a URL provided by Azure Static Web Apps. In my case: https://polite-cliff-01b6ab303.azurestaticapps.net.

To make sure the connection to Cosmos DB works, add an Application Setting via Configuration:

Adding the Cosmos DB connection string

The Function App that previously obtained the Cosmos DB connection string from local.settings.json can now retrieve the value from Application Settings. Note that you can also change these settings via Azure CLI.

Conclusion

In this post, we created a simple web app in combination with an function app that serves as the API. You can easily create and test the web app and function app locally with the help of Live Server and a Live Server proxy. Setting up the web app is easy via the Azure Portal, which also creates a GitHub Action that takes care of deployment for you. In a next post, we will take a look at enabling authentication via the GitHub identity provider and only allowing authorized users to retrieve the list of devices.

Writing a Kubernetes operator with Kopf

In today’s post, we will write a simple operator with Kopf, which is a Python framework created by Zalando. A Kubernetes operator is a piece of software, running in Kubernetes, that does something application specific. To see some examples of what operators are used for, check out operatorhub.io.

Our operator will do something simple in order to easily grasp how it works:

  • the operator will create a deployment that runs nginx
  • nginx will serve a static website based on a git repository that you specify; we will use an init container to grab the website from git and store it in a volume
  • you can control the number of instances via a replicas parameter

That’s great but how will the operator know when it has to do something, like creating or updating resources? We will use custom resources for that. Read on to learn more…

Note: source files are on GitHub

Custom Resource Definition (CRD)

Kubernetes allows you to define your own resources. We will create a resource of type (kind) DemoWeb. The CRD is created with the YAML below:

# A simple CRD to deploy a demo website from a git repo
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: demowebs.baeke.info
spec:
  scope: Namespaced
  group: baeke.info
  versions:
    - name: v1
      served: true
      storage: true
  names:
    kind: DemoWeb
    plural: demowebs
    singular: demoweb
    shortNames:
      - dweb
  additionalPrinterColumns:
    - name: Replicas
      type: string
      priority: 0
      JSONPath: .spec.replicas
      description: Amount of replicas
    - name: GitRepo
      type: string
      priority: 0
      JSONPath: .spec.gitrepo
      description: Git repository with web content

For more information (and there is a lot) about CRDs, see the documentation.

Once you create the above resource with kubectl apply (or create), you can create a custom resource based on the definition:

apiVersion: baeke.info/v1
kind: DemoWeb
metadata:
  name: demoweb1
spec:
  replicas: 2
  gitrepo: "https://github.com/gbaeke/static-web.git"

Note that we specified our own API and version in the CRD (baeke.info/v1) and that we set the kind to DemoWeb. In the additionalPrinterColumns, we defined some properties that can be set in the spec that will also be printed on screen. When you list resources of kind DemoWeb, you will the see replicas and gitrepo columns:

Custom resources based on the DemoWeb CRD

Of course, creating the CRD and the custom resources is not enough. To actually create the nginx deployment when the custom resource is created, we need to write and run the operator.

Writing the operator

I wrote the operator on a Mac with Python 3.7.6 (64-bit). On Windows, for best results, make sure you use Miniconda instead of Python from the Windows Store. First install Kopf and the Kubernetes package:

pip3 install kopf kubernetes

Verify you can run kopf:

Running kopf

Let’s write the operator. You can find it in full here. Here’s the first part:

Naturally, we import kopf and other necessary packages. As noted before, kopf and kubernetes will have to be installed with pip. Next, we define a handler that runs whenever a resource of our custom type is spotted by the operator (with the @kopf.on.create decorator). The handler has two parameters:

  • spec object: allows us to retrieve our custom properties with spec.get (e.g. spec.get(‘replicas’, 1) – the second parameter is the default value)
  • **kwargs: a dictionary with lots of extra values we can use; we use it to retrieve the name of our custom resource (e.g. demoweb1); we can use that name to derive the name of our deployment and to set labels for our pods

Note: instead of using **kwargs to retrieve the name, you can also define an extra name parameter in the handler like so: def create_fn(spec, name, **kwargs); see the docs for more information

Our deployment is just yaml stored in the doc variable with some help from the Python yaml package. We use spec.get and the name variable to customise it.

After the doc variable, the following code completes the event handler:

The rest of the operator

With kopf.adopt, we make sure the deployment we create is a child of our custom resource. When we delete the custom resource, its children are also deleted.

Next, we simply use the kubernetes client to create a deployment via the apps/v1 api. The method create_namespaced_deployment takes two required parameters: the namespace and the deployment specification. Note there is only minimal error checking here. There is much more you can do with regards to error checking, retries, etc…

Now we can run the operator with:

kopf run operator-filename.py

You can perfectly run this on your local workstation if you have a working kube config pointing at a running cluster with the CRD installed. Kopf will automatically use that for authentication:

Running the operator on your workstation

Running the operator in your cluster

To run the operator in your cluster, create a Dockerfile that produces an image with Python, kopf, kubernetes and your operator in Python. In my case:

FROM python:3.7
RUN mkdir /src
ADD with_create.py /src
RUN pip install kopf
RUN pip install kubernetes
CMD kopf run /src/with_create.py --verbose

We added the verbose parameter for extra logging. Next, run the following commands to build and push the image (example with my image name):

docker build -t gbaeke/kopf-demoweb .
docker push gbaeke/kopf-demoweb

Now you can deploy the operator to the cluster:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: demowebs-operator
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      application: demowebs-operator
  template:
    metadata:
      labels:
        application: demowebs-operator
    spec:
      serviceAccountName: demowebs-account
      containers:
      - name: demowebs
        image: gbaeke/kopf-demoweb

The above is just a regular deployment but the serviceAccountName is extremely important. It gives kopf and your operator the required access rights to create the deployment is the target namespace. Check out the documentation to find out more about the creation of the service account and the required roles. Note that you should only run one instance of the operator!

Once the operator is deployed, you will see it running as a normal pod:

The operator is running

To see what is going on, check the logs. Let’s show them with octant:

Your operator logs

At the bottom, you see what happens when a creation event is detected for a resource of type DemoWeb. The spec is shown with the git repository and the number on replicas.

Now you can create resources of kind DemoWeb and see what happens. If you have your own git repository with some HTML in it, try to use that. Otherwise, just use mine at https://github.com/gbaeke/static-web.

Conclusion

Writing an operator is easy to do with the Kopf framework. Do note that we only touched on the basics to get started. We only have an on.create handler, and no on.update handler. So if you want to increase the number of replicas, you will have to delete the custom resource and create a new one. Based on the example though, it should be pretty easy to fix that. The git repo contains an example of an operator that also implements the on.update handler (with_update.py).