Creating an agent with Hugging Face smolagents and Azure OpenAI

Artificial Intelligence (AI) agents have garnered significant attention, with numerous posts discussing them on platforms such as LinkedIn and X/Twitter. In that sense, this post is not different. Instead of theory though, let’s look at building an agent that has a reasoning loop in a very simple way.

Although you can build an agent from scratch, I decided to use the smolagents library from Hugging Face for several reasons:

  • It is very easy to use
  • It uses a reasoning loop similar to ReAct: when it receives a question, it thinks about how to solve it (thought), performs one or more actions, and then observes the results. These thought-action-observation steps repeat until the agent decides the answer is correct or the maximum number of steps is reached
  • It is very easy to add tools to the agent
  • There are multiple agent types to choose from, depending on your use case. A Code Agent is the agent of choice.

The reasoning loop is important here. There is no fixed path the agent will take to answer your question or reach its goal. That’s what makes it an agent versus a workflow, which has a predefined path. There is more to that but let’s focus on building the agent.
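
To make the loop concrete, here is a minimal sketch in plain Python. It is not how smolagents is implemented: scripted_policy is a hypothetical stand-in for the LLM, and the lookup tool and the facts dictionary are made up for illustration.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical stand-in for an LLM decision: (thought, action, argument).
Decision = Tuple[str, str, str]


def scripted_policy(history: List[str]) -> Decision:
    """Toy policy: look something up first, then answer from the observation."""
    if not history:
        return ("I need to look this up.", "lookup", "capital of Belgium")
    return ("I have what I need.", "final_answer", history[-1])


def run_agent(
    policy: Callable[[List[str]], Decision],
    tools: Dict[str, Callable[[str], str]],
    max_steps: int = 5,
) -> str:
    """Repeat thought -> action -> observation until a final answer or max_steps."""
    history: List[str] = []
    for step in range(max_steps):
        thought, action, argument = policy(history)
        print(f"Step {step}: {thought} -> {action}({argument!r})")
        if action == "final_answer":
            return argument
        # Act, then record the observation so the next thought can use it.
        history.append(tools[action](argument))
    return "Stopped: maximum number of steps reached."


facts = {"capital of Belgium": "Brussels"}
answer = run_agent(scripted_policy, {"lookup": lambda q: facts.get(q, "unknown")})
print(answer)
```

The path is not fixed in the code: the policy decides at every step which action to take, and the loop only enforces the step limit.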

The agent uses an LLM to reason, act and observe. We will use Azure OpenAI gpt-4o in this post. I assume you have access to Azure and that you are able to deploy an Azure OpenAI service. I use an Azure OpenAI service in the Sweden Central region. To use the service, you need the following:

  • The model endpoint
  • The Azure OpenAI API key

Getting started

Clone the repository at https://github.com/gbaeke/smolagents_post into a folder. In that folder, create a Python virtual environment and run the following command:

pip install -r requirements.txt

This will install several packages in the virtual environment:

  • smolagents: the Hugging Face library
  • litellm: used to support OpenAI, Anthropic and many other LLMs in smolagents
  • arize-phoenix: used to create OpenTelemetry-based traces and spans to inspect the different agent steps

Add a .env file with the following content:

AZURE_OPENAI_API_KEY=your_azure_openai_key
AZURE_API_BASE=https://your_service_name.openai.azure.com/
AZURE_MODEL=name_of_your_deployed_model

In the cloned repo, there is a get_started.py. Before running it, start Arize Phoenix with python -m phoenix.server.main serve in another terminal. This gives you a UI to inspect OpenTelemetry traces at http://localhost:6006/projects. Traces will be in the default project.

Now run get_started.py as follows:

python get_started.py "How to make cookies"

The result is not too exciting, but it does show that the agent works and is able to respond with the help of the Azure OpenAI model that you used. You should find a trace in Arize Phoenix as well:

How to make cookies trace

Above, the agent needed only one step. It’s important to know that we use a CodeAgent here. Such an agent writes code to provide you with an answer. The code it wrote was as follows:

Thought: I will write the answer in plain text detailing the steps to make cookies.

Code:
```py
cookie_recipe = """\
To make cookies, you will need the following ingredients:
- 1 cup of unsalted butter, softened
- 1 cup of granulated sugar
- 1 cup of packed brown sugar
- 2 large eggs
- 1 teaspoon of vanilla extract
- 3 cups of all-purpose flour
- 1/2 teaspoon of baking soda
- 1 teaspoon of baking powder
- 1/2 teaspoon of salt
- 2 cups of chocolate chips (optional)

Steps:
1. Preheat your oven to 350°F (175°C).
2. In a large mixing bowl, cream together the butter, granulated sugar, and brown sugar until light and fluffy.
3. Beat in the eggs one at a time, then stir in the vanilla extract.
4. In a separate bowl, whisk together the flour, baking soda, baking powder, and salt.
5. Gradually blend the dry ingredients into the wet mixture until well combined.
6. Fold in the chocolate chips if desired.
7. Drop spoonfuls of dough onto ungreased baking sheets, spacing them about 2 inches apart.
8. Bake in the preheated oven for about 10-12 minutes, or until the edges are golden brown.
9. Let the cookies cool on the baking sheets for a few minutes before transferring to wire racks to cool completely.

Enjoy your homemade cookies!
"""

final_answer(cookie_recipe)
```

Of course, smolagents uses a prompt to tell the model and specifically the Code Agent how to behave. The code generates a final answer which will be the answer the user sees.

Let’s take a look at get_started.py:

from smolagents import CodeAgent, LiteLLMModel
import os
import sys
from dotenv import load_dotenv

# instrumentation
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from openinference.instrumentation.smolagents import SmolagentsInstrumentor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

endpoint = "http://0.0.0.0:6006/v1/traces"
trace_provider = TracerProvider()
trace_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint)))

SmolagentsInstrumentor().instrument(tracer_provider=trace_provider)


def print_usage():
    print("\nUsage: python get_started.py \"your question in quotes\"")
    print("\nExample:")
    print("  python get_started.py \"Find the cheapest laptop\"")
    print("  python get_started.py \"Find a Python tutorial to write a FastAPI API\"")
    sys.exit(1)

def main():
    # Check if a question was provided
    if len(sys.argv) != 2:
        print("\nError: Please provide a question as a command-line argument.")
        print_usage()

    # Get the question from command line
    question = sys.argv[1]

    # Load environment variables from .env file
    load_dotenv()

    # Check for required environment variables
    if not os.getenv("AZURE_OPENAI_API_KEY"):
        print("\nError: AZURE_OPENAI_API_KEY not found in .env file")
        sys.exit(1)
    if not os.getenv("BING_SUBSCRIPTION_KEY"):
        print("\nError: BING_SUBSCRIPTION_KEY not found in .env file")
        sys.exit(1)
    if not os.getenv("AZURE_API_BASE"):
        print("\nError: AZURE_API_BASE not found in .env file")
        sys.exit(1)
    if not os.getenv("AZURE_MODEL"):
        print("\nError: AZURE_MODEL not found in .env file")
        sys.exit(1)

    # get keys from .env
    azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")
    azure_api_base = os.getenv("AZURE_API_BASE")
    azure_model = os.getenv("AZURE_MODEL")
    # refer to Azure model as azure/NAME_OF_YOUR_DEPLOYED_MODEL
    model = LiteLLMModel(model_id=f"azure/{azure_model}", api_key=azure_openai_api_key, api_base=azure_api_base, max_tokens=4096)
    
    agent = CodeAgent(
        model=model,
        max_steps=10,
        verbosity_level=2,
        tools=[],
        # additional_authorized_imports=["requests", "bs4"]
    )

    extra_instructions="""
        Answer in plain text. Do not use markdown or JSON.
    """

    result = agent.run(question + " " + extra_instructions)

if __name__ == "__main__":
    main()
    

Most of the code is imports, getting environment variables etc… Let’s focus on the core:

  • Specifying the model the agent should use: smolagents relies on LiteLLM to give you access to many models. One of those is Azure OpenAI. To tell LiteLLM what model we use, we prefix the model name with azure/. You can also use models directly from Hugging Face or local models.
  • Creating the agent: in this case we use a CodeAgent instead of a ToolCallingAgent; as you have seen above, a CodeAgent writes Python code to provide answers and executes that Python code; you will see later how it handles tools
  • Doing an agent run: simply call the run method with your question; append extra instructions to your question as needed
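
The repeated environment checks and the azure/ prefix can be folded into two small helpers. This is a sketch, not code from the repository: missing_vars and litellm_model_settings are names I made up, and the returned dictionary simply mirrors the keyword arguments passed to LiteLLMModel above.

```python
from typing import Dict, List, Mapping, Sequence

REQUIRED_VARS = ("AZURE_OPENAI_API_KEY", "AZURE_API_BASE", "AZURE_MODEL")


def missing_vars(env: Mapping[str, str], required: Sequence[str] = REQUIRED_VARS) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not env.get(name)]


def litellm_model_settings(env: Mapping[str, str]) -> Dict[str, str]:
    """Build the keyword arguments the post passes to LiteLLMModel."""
    missing = missing_vars(env)
    if missing:
        raise ValueError(f"Missing in .env: {', '.join(missing)}")
    return {
        # The azure/ prefix tells LiteLLM to treat the name as an Azure deployment.
        "model_id": f"azure/{env['AZURE_MODEL']}",
        "api_key": env["AZURE_OPENAI_API_KEY"],
        "api_base": env["AZURE_API_BASE"],
    }


example_env = {
    "AZURE_OPENAI_API_KEY": "secret",
    "AZURE_API_BASE": "https://your_service_name.openai.azure.com/",
    "AZURE_MODEL": "gpt-4o",
}
settings = litellm_model_settings(example_env)
print(settings["model_id"])
```

In a real script you would pass os.environ after calling load_dotenv() and then create the model with LiteLLMModel(**settings, max_tokens=4096).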

The verbosity level ensures we can see what happens in the console:

Console logging by the agent

In just a few lines of code, you have an agent that can use code to answer your questions. There is no predefined path it takes.

Try asking “What is the last post on https://atomic-temporary-16150886.wpcomstaging.com“. It will try to write code that uses Python libraries that are not allowed by default. By uncommenting the additional_authorized_imports line, the agent will probably be able to answer the question anyway:

Answering “What is the last post on https://atomic-temporary-16150886.wpcomstaging.com?”

The agent decides to use the requests and BeautifulSoup libraries to scrape this blog and retrieve the latest post. How cool is that? 😉
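
To get a feel for what an import allow-list means, the sketch below inspects generated code with Python's ast module and reports imports that are not on the list. This is only an illustration of the idea, not how smolagents enforces additional_authorized_imports; disallowed_imports is a hypothetical helper.

```python
import ast
from typing import Iterable, List, Set


def disallowed_imports(code: str, allowed: Iterable[str]) -> List[str]:
    """Return top-level module names imported by `code` that are not in `allowed`."""
    allowed_set: Set[str] = set(allowed)
    found: Set[str] = set()
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            # "import a.b" counts as importing the top-level package "a"
            found.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module:
            found.add(node.module.split(".")[0])
    return sorted(found - allowed_set)


generated = "import requests\nfrom bs4 import BeautifulSoup\nimport math\n"
print(disallowed_imports(generated, allowed=["math"]))
print(disallowed_imports(generated, allowed=["math", "requests", "bs4"]))
```

With only math allowed, the scraping code is rejected. Once requests and bs4 are added, nothing is flagged.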

Adding tools

Although you can let the agent run arbitrary code, you will probably want to give the agent extra tools. Those tools might require API keys and other parameters that the Code Agent will not know how to use. They might query internal knowledge bases or databases and much, much more.

As an example, we will give the agent a Bing Search tool. It can use the tool to search for information on the web. If you enable the additional imports, it can also scrape those URLs for extra content.

Note: smolagents has a default Google Search tool that uses the Serper API.

Note: scraping will not work for dynamically loaded content; use tools such as https://firecrawl.dev or https://jina.ai with those websites; alternatively, write a tool that uses a headless browser

If you cloned the repository, you have the following:

  • search.py: the same code as get_started.py but with the Bing tool included
  • a tools folder: contains bing_search.py that implements the tool

In search.py, you will find the following extra lines throughout the code:

from tools import bing_search  # import the tool

# add the tool to a list of tools
tools = [
  bing_search.BingSearchTool(api_key=bing_subscription_key)
]

# agent with tools
agent = CodeAgent(
     model=model,
     max_steps=10,
     verbosity_level=2,
     tools=tools,
     additional_authorized_imports=["requests", "bs4"]
)

A tool is either a Python class based on the smolagents Tool class, or a function decorated with the @tool decorator. Here, we are using a class:

  • The description field in the class is used by the agent to know what the tool can do
  • The inputs field describes the parameters the tool can accept
  • The output_type field sets the type of the output, e.g., string

The most important method of the class is the forward method. When the agent uses the tool, it executes that method. Implement the tool’s behavior in that method. The code below is the Bing tool:

from smolagents import Tool
import requests
from typing import Dict, List

class BingSearchTool(Tool):
    name = "bing_search"
    description = """
    This tool performs a Bing web and image search and returns the top search results for a given query.
    It returns a string containing formatted search results including web pages and images.
    It is best for overview information or to find a url to scrape."""
    
    inputs = {
        "query": {
            "type": "string",
            "description": "The search query to look up on Bing",
        },
        "num_results": {
            "type": "integer",
            "description": "Number of search results to return (default: 5)",
            "default": 5,
            "nullable": True
        },
        "include_images": {
            "type": "boolean",
            "description": "Whether to include image results (default: False)",
            "default": False,
            "nullable": True
        }
    }
    output_type = "string"

    def __init__(self, api_key: str):
        super().__init__()
        self.api_key = api_key
        self.web_endpoint = "https://api.bing.microsoft.com/v7.0/search"
        self.image_endpoint = "https://api.bing.microsoft.com/v7.0/images/search"
        
    def _get_web_results(self, query: str, num_results: int) -> List[str]:
        headers = {"Ocp-Apim-Subscription-Key": self.api_key}
        params = {
            "q": query,
            "count": num_results,
            "textDecorations": False,
            "textFormat": "Raw"
        }
        
        response = requests.get(self.web_endpoint, headers=headers, params=params)
        response.raise_for_status()
        search_results = response.json()
        
        formatted_results = []
        for item in search_results.get("webPages", {}).get("value", []):
            result = f"Title: {item['name']}\nSnippet: {item['snippet']}\nURL: {item['url']}\n"
            formatted_results.append(result)
            
        return formatted_results

    def _get_image_results(self, query: str, num_results: int) -> List[str]:
        headers = {"Ocp-Apim-Subscription-Key": self.api_key}
        params = {
            "q": query,
            "count": num_results,
            "textDecorations": False,
            "textFormat": "Raw"
        }
        
        response = requests.get(self.image_endpoint, headers=headers, params=params)
        response.raise_for_status()
        image_results = response.json()
        
        formatted_results = []
        for item in image_results.get("value", []):
            result = f"Image Title: {item['name']}\nImage URL: {item['contentUrl']}\nThumbnail URL: {item['thumbnailUrl']}\nSource: {item['hostPageDisplayUrl']}\n"
            formatted_results.append(result)
            
        return formatted_results
        
    def forward(self, query: str, num_results: int = 5, include_images: bool = False) -> str:
        try:
            results = []
            
            # Get web results
            web_results = self._get_web_results(query, num_results)
            if web_results:
                results.append("=== Web Results ===")
                results.extend(web_results)
            
            # Get image results if requested
            if include_images:
                image_results = self._get_image_results(query, num_results)
                if image_results:
                    results.append("\n=== Image Results ===")
                    results.extend(image_results)
            
            return "\n".join(results) if results else "No results found."
            
        except requests.exceptions.RequestException as e:
            raise Exception(f"Bing search failed: {str(e)}") 
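
A tool class describes its parameters twice: once in the inputs dictionary that the agent reads and once in the signature of forward. The two can drift apart. Below is a small sketch that compares the declared defaults with the real ones. schema_mismatches is a hypothetical helper, separate from any validation smolagents does itself, and the example uses a trimmed schema with a deliberately mismatched default.

```python
import inspect
from typing import Any, Callable, Dict, List


def schema_mismatches(inputs: Dict[str, Dict[str, Any]], forward: Callable[..., Any]) -> List[str]:
    """Compare declared input defaults with the defaults in forward's signature."""
    problems: List[str] = []
    params = inspect.signature(forward).parameters
    for name, spec in inputs.items():
        if name not in params:
            problems.append(f"{name}: declared in inputs but missing from forward")
            continue
        default = params[name].default
        has_default = default is not inspect.Parameter.empty
        if "default" in spec and has_default and default != spec["default"]:
            problems.append(f"{name}: inputs says {spec['default']!r}, forward says {default!r}")
    return problems


# Trimmed schema for illustration; include_images is mismatched on purpose.
inputs = {
    "query": {"type": "string"},
    "num_results": {"type": "integer", "default": 5},
    "include_images": {"type": "boolean", "default": False},
}


def forward(query: str, num_results: int = 5, include_images: bool = True) -> str:
    return query


print(schema_mismatches(inputs, forward))
```

A check like this matters because the agent only sees the description and the inputs field. If the documented default differs from the real one, the agent gets results it did not ask for.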

To try the tool, make sure you create a Bing Search resource in Azure and grab its key. Note that we are using Bing Search and not Bing Custom Search here. When you have the key, add it to the .env file:

BING_SUBSCRIPTION_KEY=your_bing_search_api_key

Now run the following command (or similar):

python search.py "Search the web for information about DeepSeek R1. Summarize and provide links"

The agent should use multiple steps before reaching the final answer:

Trace of the search

In step 0, the agent decides to use the BingSearchTool. It writes the following code and executes it (remember it is a CodeAgent):

results = bing_search(query="DeepSeek R1", num_results=5)
print(results)

The response is a list of web and image results.

Sometimes, there are steps that do not have code to execute. Steps 1 and 2 provide LLM output which the CodeAgent cannot execute. In your case, it might not happen or it might take a different number of steps. In step 3, that is solved: the assistant output is code that uses the final_answer call to provide the final answer and stop. The agent basically self-corrects at the expense of some extra tokens:

Thought: I will correctly format the plain text summary in the code block to ensure it handles the string properly, and then provide the final answer.

Code:

summary = """
DeepSeek R1 is an advanced AI model developed by DeepSeek-AI. It uses large-scale reinforcement learning (RL) directly on the base model without relying on supervised fine-tuning (SFT) as a preliminary step. The model has been designed to perform a variety of reasoning tasks with high accuracy and speed. DeepSeek R1 and its variants, such as DeepSeek R1-Zero and DeepSeek R1-Lite-Preview, have been launched for web, app, and API usage, competing with other leading AI models like OpenAI's Model o1.

Key Highlights:
1. DeepSeek R1 GitHub Repository: https://github.com/deepseek-ai/DeepSeek-R1
2. DeepSeek Official Website: https://www.deepseek.com/
3. DeepSeek R1 Research Paper on arXiv: https://arxiv.org/abs/2501.12948
4. DeepSeek R1 API Documentation: https://api-docs.deepseek.com/news/news1120
5. Article on Nature about DeepSeek R1: https://www.nature.com/articles/d41586-025-00229-6

DeepSeek R1 is positioned as a powerful AI model with significant advancements in reasoning and inference capabilities, making it a competitive alternative to other leading models in the AI community.
"""
final_answer(summary)

Note: I feel those errors are a bug that might be related to the system prompt of the Code Agent.
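
To illustrate why a step without code fails and how the loop can recover, here is a simplified sketch. It is not the actual smolagents parser: extract_code and step_feedback are made-up names, and the feedback text is invented. The fence marker is built with "`" * 3 only to keep the example readable.

```python
import re
from typing import Optional

FENCE = "`" * 3  # three backticks, the marker around a code block
CODE_BLOCK = re.compile(
    re.escape(FENCE) + r"(?:py|python)?\n(.*?)" + re.escape(FENCE), re.DOTALL
)


def extract_code(llm_output: str) -> Optional[str]:
    """Return the first fenced code block, or None when the model wrote only prose."""
    match = CODE_BLOCK.search(llm_output)
    return match.group(1).strip() if match else None


def step_feedback(llm_output: str) -> str:
    """What a loop could append to the conversation after one step."""
    code = extract_code(llm_output)
    if code is None:
        return "Error: no code block found. Reply with Thought, then Code in a fenced block."
    return f"Ready to execute:\n{code}"


good = f"Thought: search first.\nCode:\n{FENCE}py\nprint('hello')\n{FENCE}"
bad = "DeepSeek R1 is an advanced AI model..."
print(step_feedback(good))
print(step_feedback(bad))
```

The error message becomes the observation for the next step, which is how the model gets a chance to correct itself.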

Running code securely

Our Code Agent runs the code on the same system as the agent. For extra security, it is recommended to use secure code execution in a remote sandbox environment. To that end, smolagents supports E2B. Check the smolagents docs for more information.

E2B is similar to Azure Container Apps Dynamic Sessions. Sadly, smolagents does not support that yet.

Conclusion

We have barely scratched the surface of what is possible with smolagents. It is a small and simple library with which you can quickly build an agent that reasons, acts and observes in multiple steps until it reaches an answer. It supports a wide range of LLMs and has first-class support for Code Agents. We used the Code Agent in this post. There is another agent, the ToolCallingAgent, which uses the LLM to generate the tool calls using JSON. However, using the Code Agent is the recommended approach and is more flexible.

If you need to build applications where you want the LLM to decide on the course of action, smolagents is an easy-to-use library to get started. Give it a go and try it out!

Using Bing Search to ground LLM responses

We are often asked to build an assistant based on the content of a website. These assistants often get implemented in one of two ways:

  • Turn-based chat assistant: user can ask a question and follow-up questions
  • Enhanced search: user asks a question without the option to ask follow-up questions; this is often used to replace the built-in search functionality of a website

In both cases, you have to make a decision about how to ground the LLM with your website content. There are several approaches:

  • Use the website’s content management system (CMS): extract the content from the CMS, chunk it optimally and store it in a vector database like Azure AI Search
  • Crawl the website and scrape the pages: the scraped content can then be chunked and vectorized just as in the first option
  • Use a search engine: use Google or Bing to search for answers and optionally scrape pages in real time

In the first two approaches, you need a pipeline and a vector database to properly store and update your vectorized chunks. The complexity of creating and maintaining such a pipeline is often underestimated. You have to add new content, update existing content and remove content that is not required anymore. You need to run that pipeline on a schedule or based on user demand. You have to add proper logging to know when it goes wrong etc… It is a never-ending story.
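
To give an idea of just one part of such a pipeline, the sketch below decides which pages to add, update or remove by comparing content hashes. The names (plan_sync, SyncPlan) and the sample pages are made up. A real pipeline would also have to chunk, vectorize and write to the vector database.

```python
import hashlib
from typing import Dict, List, NamedTuple


class SyncPlan(NamedTuple):
    add: List[str]
    update: List[str]
    remove: List[str]


def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def plan_sync(indexed: Dict[str, str], current_pages: Dict[str, str]) -> SyncPlan:
    """indexed maps URL -> stored hash; current_pages maps URL -> page text."""
    add = sorted(url for url in current_pages if url not in indexed)
    update = sorted(
        url for url, text in current_pages.items()
        if url in indexed and indexed[url] != content_hash(text)
    )
    remove = sorted(url for url in indexed if url not in current_pages)
    return SyncPlan(add, update, remove)


indexed = {"/a": content_hash("old text"), "/b": content_hash("same"), "/c": content_hash("gone")}
pages = {"/a": "new text", "/b": "same", "/d": "brand new"}
print(plan_sync(indexed, pages))
```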

The search engine approach is much simpler and might be the easiest to implement, depending on your use case. Let’s take a look at how this works. We will look at two approaches:

  • Custom: call the Bing API from your code and use the output in your prompt; you have full control
  • Azure AI Agent Service: use the Bing grounding tool that is part of the knowledge tools of the agent service; the grounding tool is somewhat of a black box which means less control but easier to use

Calling the Bing API from your code

To use the Bing API and make it work on a subset of websites, you should use a Bing Custom Search resource in Azure:

Bing Custom Search in Azure

To customize the search, you can go to the instructions on Microsoft Learn. They explain how to go to the Bing custom search portal to create a custom search instance. The screenshot below shows a custom instance named baeke.info:

Bing Custom Search Instance

This custom instance contains my blog because I want the custom search resource to only return results from my blog and not any other website.

When you create a custom instance, you get a Custom Configuration ID you can provide to the search API. Make sure to publish the custom instance before using it in your code.

To search using a custom configuration ID, you can use the following code. I used the REST API below:

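import requests

bing_endpoint = 'https://api.bing.microsoft.com/v7.0/custom/search'

# bing_subscription_key: key of the Bing Custom Search resource in Azure
# query: the question entered by the user
headers = {
    'Ocp-Apim-Subscription-Key': bing_subscription_key
}
params = {
    'q': query,
    'customconfig': 'YOUR_CUSTOM_CONFIG_KEY',
    'mkt': 'en-US'
}
response = requests.get(bing_endpoint, headers=headers, params=params)
response.raise_for_status()  # fail early on HTTP errors
web_data = response.json()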

The bing_subscription_key can be found in your Bing Custom Search resource in Azure. The query q is provided by the user. The customconfig field is the custom configuration ID of the custom search instance.

The response, web_data, should contain a webPages field that has a value field. The value field is an array of search results. In each result is a url and a snippet field. The snippet should be relevant to the user’s query and can be used as grounding information. Below is the first result for the query “What is the OpenAI Assistants API” from my blog:

{
"id": "https://api.bing.microsoft.com/api/v7/#WebPages.0",
"name": "Using tools with the Azure OpenAI Assistants API – baeke.info",
"url": "https://atomic-temporary-16150886.wpcomstaging.com/2024/02/09/using-tools-with-the-azure-openai-assistants-api/",
"urlPingSuffix": "DevEx,5113.1",
"datePublished": "2024-02-09T00:00:00.0000000",
"datePublishedDisplayText": "9 Feb 2024",
"isFamilyFriendly": true,
"displayUrl": "https://atomic-temporary-16150886.wpcomstaging.com/2024/02/09/using-tools-with-the-azure-openai-assistants-api",
"snippet": "In this post, we will provide the assistant with custom tools. These custom tools use the function calling features of more recent GPT models. As a result, these custom tools are called functions in the Assistants API. What’s in a name right? There are a couple of steps you need to take for this to work: Create an assistant and give it a name ...",
"deepLinks": [],
"dateLastCrawled": "2025-01-14T18:08:00.0000000Z",
"openGraphImage": {
    "contentUrl": "https://i0.wp.com/atomic-temporary-16150886.wpcomstaging.com/wp-content/uploads/2024/02/dallc2b7e-2024-02-09-16.49.38-visualize-a-cozy-and-inviting-office-space-where-a-charming-ai-assistant-is-the-heart-of-interaction-taking-the-form-of-a-small-adorable-robot-with-.webp?resize=1200%2C1024&ssl=1",
    "width": 0,
    "height": 0
},
"fixedPosition": false,
"language": "en",
"isNavigational": true,
"noCache": true,
"siteName": "baeke.info"
}

Above, the first result is actually not the most relevant. However, the query returns 10 results by default and all 10 snippets can be provided as context to your LLM. Typically, a default search with 10 results takes under a second to complete.

Of course, the snippets are relatively short. They are snippets after all. If the snippets do not provide enough context, you can scrape one or more pages from the results and add that to your context.
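
As a sketch of how the snippets can end up in a prompt, the helpers below turn the webPages.value array into a numbered list of sources. The function names and the prompt wording are my own assumptions. Only the name, url and snippet fields come from the response shown above.

```python
from typing import Any, Dict, List


def snippets_to_context(web_data: Dict[str, Any], max_results: int = 10) -> str:
    """Turn webPages.value entries into a numbered context block for the prompt."""
    results: List[Dict[str, Any]] = web_data.get("webPages", {}).get("value", [])
    blocks = [
        f"[{i}] {item.get('name', '')}\nURL: {item.get('url', '')}\n{item.get('snippet', '')}"
        for i, item in enumerate(results[:max_results], start=1)
    ]
    return "\n\n".join(blocks)


def build_prompt(question: str, context: str) -> str:
    """Hypothetical prompt layout: instructions, sources, then the question."""
    return (
        "Answer the question using only the sources below. Cite sources as [n].\n\n"
        f"Sources:\n{context or 'No sources found.'}\n\n"
        f"Question: {question}"
    )


sample = {"webPages": {"value": [
    {"name": "Using tools with the Azure OpenAI Assistants API",
     "url": "https://example.com/post",
     "snippet": "In this post, we will provide the assistant with custom tools."},
]}}
context = snippets_to_context(sample)
print(build_prompt("What is the OpenAI Assistants API?", context))
```

Numbering the sources makes it possible to ask the model for citations and to show the matching URLs in the client.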

To scrape web pages, you have several options:

  • Use a simple HTTP request: this is not sufficient to retrieve content from dynamic websites that use JavaScript to load content; if the website is fully static, you can use this approach
  • Use scraping services: services like Jina Reader (https://jina.ai/) or Firecrawl (https://www.firecrawl.dev/) take care of the scraping for you; although they have a free tier, most production applications will require paying extra for these services
  • Use open source solutions: there are many available solutions; Crawl4AI (https://crawl4ai.com/mkdocs/) is one with many options; it is a bit harder to use and there are lots of dependencies because the crawler relies on headless browsers and tools like Playwright.

Below is a basic class that uses Jina to scrape URLs in parallel:

import os
import asyncio
import logging
import aiohttp
from typing import List, Dict, Any
from dotenv import load_dotenv

load_dotenv()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class ParallelCrawler:
    def __init__(self, urls: List[str], max_concurrent: int = 3, api_key: str = None):
        logger.info(f"Initializing crawler with {len(urls)} URLs and max_concurrent={max_concurrent}")
        self.urls = urls
        self.max_concurrent = max_concurrent
        self.api_key = api_key or os.environ.get('JINA_API_KEY')
        self.base_url = 'https://r.jina.ai/'

    async def fetch_url(self, session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
        jina_url = f"{self.base_url}{url}"
        logger.debug(f"Fetching URL: {jina_url}")
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_key}",
            "X-Retain-Images": "none",
            "X-Return-Format": "markdown"
        }
        
        try:
            # The semaphore (created in crawl) caps in-flight requests at max_concurrent
            async with self.semaphore, session.get(jina_url, headers=headers) as response:
                logger.info(f"Response status for {url}: {response.status}")
                if response.status != 200:
                    logger.error(f"Error fetching {url}: HTTP {response.status}")
                    return None
                return await response.json()
        except Exception as e:
            logger.error(f"Exception while fetching {url}: {str(e)}")
            raise

    async def crawl(self):
        logger.info(f"Starting parallel crawling of {len(self.urls)} URLs")
        all_results = []
        # Create the semaphore inside the running event loop
        self.semaphore = asyncio.Semaphore(self.max_concurrent)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in self.urls:
                logger.debug(f"Creating task for URL: {url}")
                tasks.append(self.fetch_url(session, url))
            
            logger.info(f"Executing {len(tasks)} tasks concurrently")
            responses = await asyncio.gather(*tasks, return_exceptions=True)
            
            for i, response in enumerate(responses):
                if isinstance(response, Exception):
                    logger.error(f"Failed to process {self.urls[i]}: {response}")
                    continue
                if response and response.get('data'):
                    logger.info(f"Successfully processed {self.urls[i]}")
                    all_results.append(response['data']['content'])
                else:
                    logger.warning(f"No data returned for {self.urls[i]}")

        logger.info(f"Crawling complete. Processed {len(all_results)} URLs successfully")
        return all_results

    def run(self):
        logger.info("Starting crawler run")
        result = asyncio.run(self.crawl())
        logger.info("Crawler run completed")
        return result

With the combination of Bing snippets and, optionally, the full content from the top articles, you can create a prompt with the original user query and the context from Bing and scraping. Below is an example web app that uses these features:

Answering questions about baeke.info

Above, fetch mode was enabled to add the full content of the first three Bing results to the prompt. The Bing search takes about a second. The total time to answer, which includes scraping and an Azure OpenAI chat completion, is considerably longer. Most of that time is consumed by the chat completion. Although you could optimize the scraper by introducing caching, that will only result in modest time savings.

The prompt is rather large because it contains markdown for three of my blog posts. If we limit the search to Bing only, the result is as follows:

Same query but answer only from Bing snippets

In this case, the answer is a bit more generic. The snippets contain information relevant to the query of the user but they do not contain enough information. This is especially true for more complex questions. The upside is faster speed and much less token consumption.

To keep the number of tokens to a minimum, you could chunk the scraped websites in real time, filter out the relevant chunks using similarity metrics and only feed those chunks to the prompt. You can use the snippet or the user’s original query to find relevant chunks.
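
Here is a minimal sketch of that idea using only the standard library. To keep it self-contained, it uses word counts and cosine similarity as a stand-in for real embeddings. The function names and the sample text are made up.

```python
import math
import re
from collections import Counter
from typing import List


def chunk_text(text: str, size: int = 40) -> List[str]:
    """Split text into chunks of at most `size` words."""
    words = text.split()
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]


def bag_of_words(text: str) -> Counter:
    """Lowercased word counts; a crude substitute for an embedding."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_chunks(query: str, text: str, k: int = 3, size: int = 40) -> List[str]:
    """Return the k chunks most similar to the query, best first."""
    q = bag_of_words(query)
    chunks = chunk_text(text, size)
    return sorted(chunks, key=lambda c: cosine(q, bag_of_words(c)), reverse=True)[:k]


page = (
    "Redis can cache previous answers. "
    "The Assistants API lets you attach tools to an assistant. "
    "Kubernetes schedules containers across nodes."
)
print(top_chunks("assistants api tools", page, k=1, size=5))
```

With a real embedding model you would replace bag_of_words and cosine, but the chunk, score and keep-the-top-k flow stays the same.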

To really speed things up, you could implement prompt caching. The screenshot below shows the cache in action:

Answering from cache

In this case, we store previous questions and answers in Redis. When a new question comes in, we check if there are similar questions based on vector similarity. When the similarity score is above 0.95, a threshold we configure, we use the cache. Otherwise, we search, scrape and use OpenAI as before. Needless to say, this is very fast.
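
The lookup logic can be sketched as follows. This is an in-memory stand-in, not the Redis implementation behind the screenshot: SemanticCache is a hypothetical class, and the question vectors are tiny hand-written lists where a real application would use embeddings.

```python
import math
from typing import List, Optional, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class SemanticCache:
    """In-memory stand-in for a vector-based question and answer cache."""

    def __init__(self, threshold: float = 0.95) -> None:
        self.threshold = threshold
        self.entries: List[Tuple[List[float], str]] = []

    def put(self, question_vector: Sequence[float], answer: str) -> None:
        self.entries.append((list(question_vector), answer))

    def get(self, question_vector: Sequence[float]) -> Optional[str]:
        """Return the best cached answer if its similarity is above the threshold."""
        best_score = 0.0
        best_answer: Optional[str] = None
        for vector, answer in self.entries:
            score = cosine_similarity(question_vector, vector)
            if score > best_score:
                best_score, best_answer = score, answer
        return best_answer if best_score > self.threshold else None


cache = SemanticCache(threshold=0.95)
cache.put([1.0, 0.0, 0.2], "Cached answer about the Assistants API")
print(cache.get([0.98, 0.05, 0.21]))  # near-duplicate question
print(cache.get([0.0, 1.0, 0.0]))     # unrelated question
```

On a miss, get returns None and the application falls back to search, scrape and chat completion, then stores the new answer with put.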

You need to write quite a bit of code to implement the searching, scraping and caching features. The web application above uses this code via a web API you have to write and host yourself. Depending on your needs, there might be an easier solution: the Azure AI Agent Service with built-in Bing grounding.

Using the Azure AI Agent Service with Bing Grounding

The new Azure AI Agent Service supports grounding with Bing Search out of the box as documented here: https://learn.microsoft.com/en-us/azure/ai-services/agents/how-to/tools/bing-grounding.

When you ask the agent a question by adding a message to a thread and running the thread, the agent will automatically use Bing to ground its answer.

It works by adding a Bing connection to an Azure AI Foundry project and providing the grounding tool to the agent. Take a look at the sample code below:

import os
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.projects.models import BingGroundingTool
from dotenv import load_dotenv

load_dotenv()

project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(),
    conn_str=os.environ["PROJECT_CONNECTION_STRING"],
)

bing_connection = project_client.connections.get(
    connection_name=os.environ["BING_CONNECTION_NAME"]
)
conn_id = bing_connection.id

print(conn_id)

# Initialize agent bing tool and add the connection id
bing = BingGroundingTool(connection_id=conn_id)

# Create agent with the bing tool and process assistant run
with project_client:
    agent = project_client.agents.create_agent(
        model="gpt-4o-global",
        name="my-assistant",
        instructions="You are a helpful assistant",
        tools=bing.definitions,
        headers={"x-ms-enable-preview": "true"}
    )

Above, we connect to an Azure AI Foundry project with Entra ID. Next, we grab the connection identified by the value of the BING_CONNECTION_NAME environment variable. With the id of the connection, we can create the BingGroundingTool and add it to the tools property of our agent.

The advantage of this approach is that it is easy to use and configure. However, there are several drawbacks:

  • The tool does not surface all the URLs it found so you cannot display them nicely in a client application
  • It is currently not possible to provide a custom configuration key to search a subset of sites (e.g., only https://baeke.info)

At the time of writing, the Azure AI Agent Service SDK was in preview so some or all of the drawbacks might be solved before or at general availability.

Sample implementation

You can find an easy to use example in this gist: https://gist.github.com/gbaeke/97afb88da56d59e1b6ca460653fc8700. To make it work, do the following:

  • In a new folder, save the script as app.py
  • Create a .env file with two environment variables: OPENAI_API_KEY, BING_API_KEY
  • Install packages: pip install fastapi python-dotenv uvicorn requests beautifulsoup4 openai sentence-transformers scikit-learn numpy
  • Run the api with python app.py

The example uses a simple chunking technique in addition to the all-MiniLM-L6-v2 SentenceTransformer model to vectorize chunks and return the top 3 results to include in the OpenAI prompt’s context. To scrape web pages, we use a simple HTTP GET with BeautifulSoup. As discussed above, that will not yield good results with dynamic web pages. Most web pages will be fine though.
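
To make the chunk-and-rank idea concrete, here is a minimal sketch that uses only the standard library. It is not the code from the gist: the bag-of-words embed function is a deliberately crude stand-in for the all-MiniLM-L6-v2 embeddings, and the names chunk_text, embed, cosine and top_chunks are my own.

```python
import math
import re
from collections import Counter


def chunk_text(text: str, chunk_size: int = 50, overlap: int = 10) -> list[str]:
    """Split text into chunks of chunk_size words; neighbours share overlap words."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the last chunk already reached the end of the text
    return chunks


def embed(text: str) -> Counter:
    """Stand-in embedding: word counts. A real setup would use a sentence embedding model."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_chunks(query: str, chunks: list[str], k: int = 3) -> list[str]:
    """Return the k chunks most similar to the query, best match first."""
    query_vector = embed(query)
    ranked = sorted(chunks, key=lambda chunk: cosine(query_vector, embed(chunk)), reverse=True)
    return ranked[:k]
```

Swapping embed for a real embedding model changes the quality of the ranking but not the overall flow: chunk the scraped text, score each chunk against the question, and put the best few in the prompt.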

Conclusion

When you want to create an AI assistant or AI-based search feature based on a website using the site’s content, using Bing Search for grounding is one of the options. We discussed two approaches:

  • Fully custom code with the Bing custom search API
  • Azure AI Agents with the Bing grounding service

The first approach gives you full control over how you perform the search and process the results. You can rely on just the snippets provided by Bing or add the full content of the top URLs to your prompt with scraping. To improve response times you can add scrape caching or prompt caching. Prompt caching will provide you with almost instantaneous results when the prompt and answer were previously cached. You do not need to implement a pipeline to keep your vector database up-to-date.
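
Prompt caching can be as simple as a dictionary keyed on a hash of the normalized prompt. The sketch below is one possible shape, assuming an in-memory cache with a time-to-live; PromptCache and answer_with_cache are hypothetical names, and generate stands for whatever function calls the model.

```python
import hashlib
import time
from typing import Callable, Optional


class PromptCache:
    """In-memory prompt cache with a time-to-live (illustrative only)."""

    def __init__(self, ttl_seconds: float = 3600.0, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, str]] = {}

    @staticmethod
    def _key(prompt: str) -> str:
        # Normalize case and whitespace so trivially different prompts share an entry.
        normalized = " ".join(prompt.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get(self, prompt: str) -> Optional[str]:
        key = self._key(prompt)
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, answer = entry
        if self._clock() - stored_at > self._ttl:
            del self._entries[key]  # entry is too old, drop it
            return None
        return answer

    def put(self, prompt: str, answer: str) -> None:
        self._entries[self._key(prompt)] = (self._clock(), answer)


def answer_with_cache(prompt: str, cache: PromptCache, generate: Callable[[str], str]) -> str:
    """Return a cached answer when available, otherwise generate and store one."""
    cached = cache.get(prompt)
    if cached is not None:
        return cached
    answer = generate(prompt)
    cache.put(prompt, answer)
    return answer
```

The time-to-live matters here because the whole point of grounding with Bing is fresh content; a cached answer should not outlive the page it was based on by too long.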

Although built-in Bing grounding with the Azure AI Agent service is much easier, it has some limitations for the use case that I described. However, if you need to add general grounding to augment LLM responses, the Bing Grounding tool is definitely the one to go for. And although not discussed in this article, if you can use Copilot Studio, Bing grounding based on specific websites is available and is even easier to implement with just a few clicks!

Creating an agent with the Azure AI Agent SDK

Source: Microsoft

Azure AI Agents Service simplifies building intelligent agents by combining advanced AI models, tools, and technology from Microsoft, OpenAI, and partners like Meta and Cohere. It enables integration with knowledge sources such as Bing, SharePoint, and Azure AI Search, and lets agents perform actions across Microsoft and third-party applications using Logic Apps, Azure Functions, and Code Interpreter. With Azure AI Foundry, you get an intuitive agent-building experience, backed by enterprise-grade features like customizable storage, private networking, secure authentication, and detailed observability through OpenTelemetry.

At the time of this writing (December 2024), Azure AI Foundry did not provide a user interface yet to create these agents in the portal. In this post, we will use the Azure AI Foundry SDK to create the agent from code.

You can find the code in this repository: https://github.com/gbaeke/agent_service/tree/main/agentui

How does it work?

The agent service uses the same wire protocol as the Azure OpenAI Assistants API. The Assistants API was developed as an alternative to the chat completions API. The big difference is that the Assistants API is stateful: your interactions with the AI model are saved as messages on a thread. You simply add messages to the thread for the model to respond.

To get started, you need three things:

  • An agent: the agent uses a model and instructions about how it should behave. In addition, you add knowledge sources and tools. Knowledge sources can be files you upload to the agent or existing sources such as files on SharePoint. Tools can be built-in tools like code interpreter or custom tools like any API or custom functions that you write.
  • A thread: threads receive messages from users and the assistant (the model) responds with assistant messages. In a chat application, each of the user’s conversations can be a thread. Note that threads are created independently of an agent. The thread is associated with the agent when you add a message.
  • Messages: you add messages to a thread and check the thread for new messages. Messages can contain both text and images. For example, if you use the code interpreter tool and you asked for a chart, the chart will be created and handed to you as a file id. To render the chart, you would need to download it first based on its id.
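
The relationship between these three concepts can be shown with a toy in-memory model. This is not the Agent Service SDK; the Thread and Agent classes and the run function are invented purely to illustrate that a thread holds state on its own and only meets an agent when it is run.

```python
from dataclasses import dataclass, field
from itertools import count

_thread_ids = count(1)


@dataclass
class Agent:
    name: str
    instructions: str


@dataclass
class Thread:
    # A thread knows nothing about agents; it only stores messages.
    id: str = field(default_factory=lambda: f"thread_{next(_thread_ids)}")
    messages: list[dict] = field(default_factory=list)


def add_message(thread: Thread, role: str, content: str) -> None:
    thread.messages.append({"role": role, "content": content})


def run(thread: Thread, agent: Agent) -> str:
    """Toy run: the chosen agent sees the whole thread and appends a reply."""
    user_turns = sum(1 for message in thread.messages if message["role"] == "user")
    reply = f"[{agent.name}] reply to user message #{user_turns}"
    add_message(thread, "assistant", reply)
    return reply
```

Because the history lives on the thread, the caller never has to resend earlier messages, which is the practical difference with the stateless chat completions API.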

Creating the agent

Before we create the agent, we need to connect to our Azure AI Foundry project. To do that (and more), we need the following imports:

import base64
import os
from typing import Any, Callable, Dict, Set

import requests
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import CodeInterpreterTool, FunctionTool, ToolSet
from azure.identity import DefaultAzureCredential
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

We will use the AIProjectClient to get a reference to an Azure AI Foundry project. We do that with the following code:

# Set up credentials and project client
credential = DefaultAzureCredential()
conn_str = os.environ["PROJECT_CONNECTION_STRING"]
project_client = AIProjectClient.from_connection_string(
    credential=credential, conn_str=conn_str
)

Note that we authenticate with Entra ID. On your local machine, ensure you are logged on via the Azure CLI with az login. Your account needs at least AI Developer access to the Foundry project.

You also need the connection string to your project. The code requires it in the PROJECT_CONNECTION_STRING environment variable. You can find the connection string in Azure AI Foundry:

AI Foundry project connection string
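
Because a missing environment variable otherwise surfaces as a bare KeyError, it can help to check the configuration up front. A small sketch, where require_env is a hypothetical helper and not part of any Azure SDK:

```python
import os
from typing import Iterable, Mapping


def require_env(names: Iterable[str], environ: Mapping[str, str] = os.environ) -> dict:
    """Return the requested variables, or raise one error that names every missing one."""
    values = {name: environ.get(name, "") for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return values
```

Calling require_env(["PROJECT_CONNECTION_STRING"]) at startup then fails early with a readable message instead of halfway through creating the project client.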

We can now create the agent with the following code:

agent = project_client.agents.create_agent(
    model="gpt-4o-mini",
    name="my-agent",
    instructions="You are helpful agent with functions to turn on/off light and get temperature in a location. If location is not specified, ask the user.",
    toolset=toolset
)

Above, the agent uses gpt-4o-mini. You need to ensure that model is deployed in your Azure AI Foundry Hub. In our example, we also provide the assistant with tools. We will not provide it with knowledge.

What’s inside the toolset?

  • built-in code interpreter tool: provides a way for the model to write Python code, execute it and provide the result back to the model; the result can be text and/or images.
  • custom tools: in our case, custom Python functions to turn on/off lights and look up weather information in a location.

There are other tool types that we will not discuss in this post.

Adding tools

Let’s look at adding our own custom functions first. In the code, three functions are used as tools:

def turn_on_light(room: str) -> str:
    return f"Light in room {room} turned on"

def turn_off_light(room: str) -> str:
    return f"Light in room {room} turned off"

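def get_temperature(location: str) -> str:
    # The real implementation is in the GitHub repo; this stub keeps the snippet valid Python
    raise NotImplementedError("see the GitHub repo for the implementation")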
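
For readers who do not want to open the repository, here is one way get_temperature could look. It is a sketch under assumptions, not the repository code: it assumes the free Open-Meteo geocoding and forecast APIs, uses only the standard library, and introduces the helper names _fetch_json and lookup_temperature so the lookup can be exercised without network access.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Callable

GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search"
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"


def _fetch_json(url: str) -> Any:
    """Perform an HTTP GET and parse the JSON body."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


def lookup_temperature(location: str, fetch_json: Callable[[str], Any] = _fetch_json) -> str:
    """Resolve a place name to coordinates, then read the current temperature."""
    geo_query = urllib.parse.urlencode({"name": location, "count": 1})
    geo = fetch_json(f"{GEOCODING_URL}?{geo_query}")
    if not geo.get("results"):
        return json.dumps({"error": f"Could not find coordinates for {location}"})

    place = geo["results"][0]
    weather_query = urllib.parse.urlencode({
        "latitude": place["latitude"],
        "longitude": place["longitude"],
        "current": "temperature_2m",
    })
    weather = fetch_json(f"{FORECAST_URL}?{weather_query}")
    return json.dumps({
        "location": location,
        "temperature": weather["current"]["temperature_2m"],
        "unit": "celsius",
    })


def get_temperature(location: str) -> str:
    """Get the current temperature for a location.

    :param location: Name of the city or place.
    :return: JSON string with the temperature in degrees Celsius.
    """
    return lookup_temperature(location)
```

Returning a JSON string rather than a bare number gives the model the unit and the resolved location along with the value.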

The SDK provides helpers to turn these functions into tools the assistant understands:

user_functions: Set[Callable[..., Any]] = {
    turn_on_light,
    turn_off_light,
    get_temperature
}
functions = FunctionTool(user_functions)
toolset = ToolSet()
toolset.add(functions)

Now we need to add the built-in code interpreter:

code_interpreter = CodeInterpreterTool()
toolset.add(code_interpreter)

Now we have a toolset with three custom functions and the code interpreter. This toolset is given to the agent via the toolset parameter.
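
To get a feel for what such a helper has to do, the sketch below derives a tool definition from a Python function with the inspect module. It is an illustration of the idea only: I have not checked how FunctionTool does this internally, the output layout follows the common OpenAI-style function definition as an assumption, and describe_function is a made-up name.

```python
import inspect
from typing import Any, Callable

# Map Python annotations to JSON schema types; anything unknown falls back to string.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe_function(func: Callable[..., Any]) -> dict:
    """Build a function tool definition from a function's signature and docstring."""
    properties = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value means the model must supply it
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": inspect.getdoc(func) or "",
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def turn_on_light(room: str) -> str:
    """Turn on the light in a room."""
    return f"Light in room {room} turned on"
```

The docstring ends up as the tool description, which is why it pays to document custom functions well: the description is all the model has to decide when to call them.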

Now that we have an agent, we need to provide a way to create a thread and add messages to the thread.

Creating a thread

We are creating an API so we will create an endpoint to create a thread:

@app.post("/threads")
def create_thread() -> Dict[str, str]:
    thread = project_client.agents.create_thread()
    return {"thread_id": thread.id}

As discussed earlier, a thread is created as a separate entity. It is not associated with the agent when you create it. When we later add a message, the thread will be associated with the agent that should process the message.

Working with messages

Next, we will provide an endpoint that accepts a thread id and a message you want to add to it:

@app.post("/threads/{thread_id}/messages")
def send_message(thread_id: str, request: MessageRequest):
    created_msg = project_client.agents.create_message(
        thread_id=thread_id,
        role="user",
        content=request.message  # Now accessing message from the request model
    )
    run = project_client.agents.create_and_process_run(
        thread_id=thread_id,
        assistant_id=agent.id
    )
    if run.status == "failed":
        return {"error": run.last_error or "Unknown error"}

    messages = project_client.agents.list_messages(thread_id=thread_id)
    last_msg = messages.get_last_message_by_sender("assistant")
    
    last_msg_text = last_msg.text_messages[0].text.value if last_msg.text_messages else None
    last_msg_image = last_msg.image_contents[0].image_file if last_msg.image_contents else None
    
    last_msg_image_b64 = None
    if last_msg_image:
        file_stream = project_client.agents.get_file_content(file_id=last_msg_image.file_id)
        base64_encoder = base64.b64encode
        byte_chunks = b"".join(file_stream)  # Concatenate all bytes from the iterator.
        last_msg_image_b64 = base64_encoder(byte_chunks).decode("utf-8")
        
    return {"assistant_text": last_msg_text, 
            "assistant_image": last_msg_image_b64}

The code is pretty self-explanatory. In summary, here is what happens:

  • a message is created with the create_message method; the message is added to the specified thread_id as a user message
  • the thread is run on the agent specified by the agent.id
  • to know if the run is finished, polling is used; the create_and_process_run hides that complexity for you
  • messages are retrieved from the thread but only the last assistant message is used
  • we extract the text and image from the message if it is present
  • when there is an image, we use get_file_content to retrieve the file content from the API; that function returns an Iterator of bytes that are joined together and base64 encoded
  • the message and image are returned
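
The polling that create_and_process_run hides boils down to a loop like the one below. This is a generic sketch, not the SDK's implementation: wait_for_run and get_status are hypothetical names, and the set of terminal states is my assumption based on the run states of the Assistants API.

```python
import time
from typing import Callable

# Assumed terminal run states; any other value means the run is still in flight.
TERMINAL_STATES = {"completed", "failed", "cancelled", "expired"}


def wait_for_run(
    get_status: Callable[[], str],
    interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status until the run reaches a terminal state or the timeout passes."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Run still '{status}' after {timeout} seconds")
        sleep(interval)
```

In a real client, get_status would fetch the run and return its status field. The actual helper also has to execute tool calls while the run is waiting for them, which this sketch leaves out.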

Testing the API

When we POST to the threads endpoint, this is the response:

{
  "thread_id": "thread_meYRMrkRtUiI1u0ZGH0z7PEN"
}

We can use that id to post to the messages endpoint. For example in a .http file:

POST http://localhost:8000/threads/thread_meYRMrkRtUiI1u0ZGH0z7PEN/messages
Content-Type: application/json

{
    "message": "Create a sample bar chart"
}

The response to the above request should be something like below:

{
  "assistant_text": "Here is a sample bar chart displaying four categories (A to D) with their corresponding values. If you need any modifications or another type of chart, just let me know!",
  "assistant_image": "iVBORw0KGgoAAAANSUhEUgAABpYAAARNCAYAAABYAnNeAAAAOXRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjQuMywgaHR0cHM6Ly9tYXRwbG90bGliLm9yZy/MnkTPAAAACXBIWXMAAB7CAAAewgFu0HU+AADWf0lEQ..."
}

In this case, the model determined that the code interpreter should be used to create the sample bar chart. When you ask for something simpler, like the weather, you get the following response:

{
  "assistant_text": "The current temperature in London is 11.4°C. If you need more information or updates, feel free to ask!",
  "assistant_image": null
}

In this case, our custom weather function was used to answer. The assistant determines what tools should be used to provide an answer.
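
A client that receives these responses has to turn the base64 string back into an image. A minimal sketch of that step, assuming the response shape shown above; save_assistant_image is a hypothetical helper name:

```python
import base64
from pathlib import Path
from typing import Optional


def save_assistant_image(response: dict, path: str) -> Optional[Path]:
    """Decode the base64 image in an API response and write it to disk.

    Returns the written path, or None when the response carries no image.
    """
    encoded = response.get("assistant_image")
    if not encoded:
        return None
    target = Path(path)
    target.write_bytes(base64.b64decode(encoded))
    return target
```

A browser client would do the equivalent by putting the string in a data URL on an img element instead of writing a file.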

Integration in a web app

The GitHub repository contains a sample UI to try the API:

Sample UI and a chat combining weather and plotting

Beautiful, is it not? 😂

Conclusion

The Azure AI Agent service makes it relatively easy to create an agent that has access to knowledge and tools. The assistant decides on its own how to use the knowledge and tools. However, you can steer the assistant via its instructions and influence how the assistant behaves.

The SDK makes it easy to add your own custom functions as tools, next to the built-in tools that it supports. Soon, there will be an Agent Service user interface in Azure AI Foundry. You will be able to create agents in code that reference the agents you have built in Foundry.

To try it for yourself, use the code in the GitHub repo. Note that the code is demo code with limited error handling. It’s merely meant to demonstrate first steps.

Enjoy and let me know what you build with it! 😉

Using WebRTC with the OpenAI Realtime API

In October 2024, OpenAI introduced the Realtime API. It enables developers to integrate low-latency, multimodal conversational experiences into their applications. It supports both text and audio inputs and outputs, facilitating natural speech-to-speech interactions without the need for multiple models.

It addresses the following problems:

  • Simplified Integration: Combines speech recognition, language processing, and speech synthesis into a single API call, eliminating the need for multiple models.
  • Reduced Latency: Streams audio inputs and outputs directly, enabling more natural and responsive conversational experiences.
  • Enhanced Nuance: Preserves emotional tone, emphasis, and accents in speech interactions.

If you have used Advanced Voice Mode in ChatGPT, the Realtime API offers a similar experience for developers to integrate into their applications.

The initial release of the API required WebSockets to support the continuous exchange of messages, including audio. Although that worked, using a protocol like WebRTC is much more interesting:

  • Low latency: WebRTC is optimized for realtime media like audio and video with features such as congestion control and bandwidth optimization built in
  • Proven in the real world: many applications use WebRTC, including Microsoft Teams, Google Meet and many more
  • Native support for audio streaming: compared to WebSockets, as a developer, you don’t have to handle the audio streaming part. WebRTC takes care of that for you.
  • Data channels: suitable for low-latency data exchange between peers; these channels are used to send and receive messages between yourself and the Realtime API.

In December 2024, OpenAI announced support for WebRTC in their Realtime API. It makes using the API much simpler and more robust.

Instead of talking about it, let’s look at an example.

Note: full source code is in https://github.com/gbaeke/realtime-webrtc. It is example code without features like user authentication, robust error handling, etc… It’s meant to get you started.

Helper API

To use the Realtime API from the browser, you need to connect to OpenAI with a token. You do not want to use your OpenAI token in the browser as that is not secure. Instead, you should have an API endpoint in a helper API that gets an ephemeral token. In app.py, the helper API, the endpoint looks as follows:

@app.get("/session")
async def get_session():
    async with httpx.AsyncClient() as client:
        response = await client.post(
            'https://api.openai.com/v1/realtime/sessions',
            headers={
                'Authorization': f'Bearer {OPENAI_API_KEY}',
                'Content-Type': 'application/json'
            },
            json={
                "model": "gpt-4o-realtime-preview-2024-12-17",
                "voice": "echo"
            }
        )
        return response.json()

Above, we ask the Realtime API’s sessions endpoint for a session. The session includes the ephemeral token. You need an OpenAI key to ask for that session; the helper API reads it from an environment variable. Note that the realtime model and voice are set as options. Other options, such as tools and temperature, can be set here as well. In this example, we will set some of these settings from the browser client by updating the session.
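
If you would rather configure tools when the session is created, the helper API could build a larger request body. The sketch below only constructs that body. It assumes the sessions endpoint accepts tools and tool_choice in the same shape the session.update event uses; build_session_request and weather_tool are names I made up for illustration.

```python
import json
from typing import Optional


def weather_tool() -> dict:
    """Function tool definition for the get_weather function."""
    return {
        "type": "function",
        "name": "get_weather",
        "description": "Get the current weather. Works only for Earth",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    }


def build_session_request(model: str, voice: str, tools: Optional[list] = None) -> dict:
    """Build the JSON body for the sessions endpoint (assumed field names)."""
    body = {"model": model, "voice": voice}
    if tools:
        body["tools"] = tools
        body["tool_choice"] = "auto"  # let the model decide when to call a tool
    return body


if __name__ == "__main__":
    request_body = build_session_request(
        "gpt-4o-realtime-preview-2024-12-17", "echo", [weather_tool()]
    )
    print(json.dumps(request_body, indent=2))
```

The resulting dictionary would replace the json argument of the client.post call above. Setting tools on the server keeps the tool list out of the browser code, at the cost of a less flexible client.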

In index.html, the following JavaScript code is used to obtain the session. The ephemeral key or token is in client_secret.value:

const tokenResponse = await fetch("http://localhost:8888/session");
const data = await tokenResponse.json();
const EPHEMERAL_KEY = data.client_secret.value;

In addition to fetching a token via a session, the helper API has another endpoint called weather. The weather endpoint is called with a location parameter to get the current temperature at that location. This endpoint is called when the model detects a function call is needed. For example, when the user says “What is the weather in Amsterdam?”, code in the client will call the weather endpoint with Amsterdam as a parameter and provide the model with the results.

@app.get("/weather/{location}")
async def get_weather(location: str):
    # First get coordinates for the location
    try:
        async with httpx.AsyncClient() as client:
            # Get coordinates for location
            geocoding_response = await client.get(
                f"https://geocoding-api.open-meteo.com/v1/search?name={location}&count=1"
            )
            geocoding_data = geocoding_response.json()
            
            if not geocoding_data.get("results"):
                return {"error": f"Could not find coordinates for {location}"}
                
            lat = geocoding_data["results"][0]["latitude"]
            lon = geocoding_data["results"][0]["longitude"]
            
            # Get weather data
            weather_response = await client.get(
                f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&current=temperature_2m"
            )
            weather_data = weather_response.json()
            
            temperature = weather_data["current"]["temperature_2m"]
            return WeatherResponse(temperature=temperature, unit="celsius")
            
    except Exception as e:
        return {"error": f"Could not get weather data: {str(e)}"}

The weather API does not require authentication so we could have called it from the web client as well. I do not consider that a best practice so it is better to call an API separate from the client code.

The client

The client is an HTML web page with plain JavaScript code. The code to interact with the realtime API is all part of the client. Our helper API simply provides the ephemeral secret.

Let’s look at the code step-by-step. Full code is on GitHub. But first, here is the user interface:

The fabulous UI

Whenever you ask a question, the transcript of the audio response is updated in the text box. Only the responses are added, not the user questions. I will leave that as an exercise for you! 😉

When you click the Start button, the init function gets called:

async function init() {
    startButton.disabled = true;
    
    try {
        updateStatus('Initializing...');
        
        const tokenResponse = await fetch("http://localhost:8888/session");
        const data = await tokenResponse.json();
        const EPHEMERAL_KEY = data.client_secret.value;

        peerConnection = new RTCPeerConnection();
        await setupAudio();
        setupDataChannel();

        const offer = await peerConnection.createOffer();
        await peerConnection.setLocalDescription(offer);

        const baseUrl = "https://api.openai.com/v1/realtime";
        const model = "gpt-4o-realtime-preview-2024-12-17";
        const sdpResponse = await fetch(`${baseUrl}?model=${model}`, {
            method: "POST",
            body: offer.sdp,
            headers: {
                Authorization: `Bearer ${EPHEMERAL_KEY}`,
                "Content-Type": "application/sdp"
            },
        });

        const answer = {
            type: "answer",
            sdp: await sdpResponse.text(),
        };
        await peerConnection.setRemoteDescription(answer);

        updateStatus('Connected');
        stopButton.disabled = false;
        hideError();

    } catch (error) {
        startButton.disabled = false;
        stopButton.disabled = true;
        showError('Error: ' + error.message);
        console.error('Initialization error:', error);
        updateStatus('Failed to connect');
    }
}

In the init function, we get the ephemeral key as explained before and then set up the WebRTC peer-to-peer connection. The setupAudio function creates an autoplay audio element and connects the audio stream to the peer-to-peer connection.

The setupDataChannel function sets up a data channel for the peer-to-peer connection and gives it a name. The name is oai-events. Once we have a data channel, we can use it to connect an onopen handler and add an event listener to handle messages sent by the remote peer.

Below are the setupAudio and setupDataChannel functions:

async function setupAudio() {
    const audioEl = document.createElement("audio");
    audioEl.autoplay = true;
    peerConnection.ontrack = e => audioEl.srcObject = e.streams[0];
    
    audioStream = await navigator.mediaDevices.getUserMedia({ audio: true });
    peerConnection.addTrack(audioStream.getTracks()[0]);
}

function setupDataChannel() {
    dataChannel = peerConnection.createDataChannel("oai-events");
    dataChannel.onopen = onDataChannelOpen;
    dataChannel.addEventListener("message", handleMessage);
}

When the audio and data channel are set up, we can proceed to negotiate communication parameters between the two peers: your client and OpenAI. WebRTC uses the Session Description Protocol (SDP) to do so. First, an offer is created describing the local peer’s capabilities, such as audio codecs. The offer is then sent to the server over at OpenAI. Authentication is with the ephemeral key. The response is a description of the remote peer’s capabilities, which is needed to complete the handshake process. With the handshake complete, the peers can now exchange audio and messages. The code below does the handshake:

const offer = await peerConnection.createOffer();
await peerConnection.setLocalDescription(offer);

const baseUrl = "https://api.openai.com/v1/realtime";
const model = "gpt-4o-realtime-preview-2024-12-17";
const sdpResponse = await fetch(`${baseUrl}?model=${model}`, {
    method: "POST",
    body: offer.sdp,
    headers: {
        Authorization: `Bearer ${EPHEMERAL_KEY}`,
        "Content-Type": "application/sdp"
    },
});

const answer = {
    type: "answer",
    sdp: await sdpResponse.text(),
};
await peerConnection.setRemoteDescription(answer);

The diagram below summarizes the steps:

Simplified overview of the setup process
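
The signaling step is plain HTTP, so it is not tied to the browser. As a sketch, this is how the same request could be prepared from Python with the standard library. It only builds the request; producing the SDP offer requires a WebRTC implementation, which is outside this example, and build_sdp_request is a hypothetical helper name.

```python
import urllib.parse
import urllib.request

REALTIME_URL = "https://api.openai.com/v1/realtime"


def build_sdp_request(offer_sdp: str, ephemeral_key: str, model: str) -> urllib.request.Request:
    """Prepare the POST that sends the local SDP offer to the Realtime API."""
    query = urllib.parse.urlencode({"model": model})
    return urllib.request.Request(
        f"{REALTIME_URL}?{query}",
        data=offer_sdp.encode("utf-8"),  # the body is the raw SDP text, not JSON
        method="POST",
        headers={
            "Authorization": f"Bearer {ephemeral_key}",
            "Content-Type": "application/sdp",
        },
    )
```

Sending the request with urllib.request.urlopen and reading the body as text would yield the SDP answer, which is then set as the remote description.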

What happens when the channel opens?

After the creation of the data channel, we set up an onopen handler. In this case, the handler does two things:

  • Update the session
  • Send an initial message

The session is updated with a description of available functions. This is very similar to function calling in the chat completion API. To update the session, you need to send a message of type session.update. The sendMessage helper function sends messages to the remote peer:

function sendSessionUpdate() {
    const sessionUpdateEvent = {
        "event_id": "event_" + Date.now(),
        "type": "session.update",
        "session": {
            "tools": [{
                "type": "function",
                "name": "get_weather",
                "description": "Get the current weather. Works only for Earth",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": { "type": "string" }
                    },
                    "required": ["location"]
                }
            }],
            "tool_choice": "auto"
        }
    };
    sendMessage(sessionUpdateEvent);
}

Although I added an event_id above, that is optional. In the session property we can update the list of tools and set the tool_choice to auto. In this case, that means that the model will select a function if it thinks it is needed. If you ask something like “What is the weather?”, it will first ask for a location and then indicate that the function get_weather needs to be called.

We also send an initial message when the channel opens. The message is of type conversation.item.create and says “MY NAME IS GEERT”.

The conversation item code is shown below; the session update code is the sendSessionUpdate function shown earlier:

function sendInitialMessage() {
    const conversationMessage = {
        "event_id": "event_" + Date.now(),
        "type": "conversation.item.create",
        "previous_item_id": null,
        "item": {
            "id": "msg_" + Date.now(),
            "type": "message",
            "role": "user",
            "content": [{
                "type": "input_text",
                "text": "MY NAME IS GEERT"
            }]
        }
    };
    sendMessage(conversationMessage);
}

Note that the above is optional. Without that code, we could start talking with the model. However, it’s a bit more interesting to add function calling to the mix. That does mean we have to check incoming messages from the data channel to find out if we need to call a function.

Handling messages

The function handleMessage is called whenever a new message is sent on the data channel. In that function, we log all messages and check for a specific type of message: response.done.

We do two different things:

  • if there is a transcript of the audio: display it
  • if the response is a function call, handle the function call

To handle the function call, we check the payload of the response for an output of type function_call and also check the function name and call_id of the message that identified the function call in the first place.

If the function with name get_weather is identified, the weather endpoint of the API is called and the response is sent to the model.

The message handler is shown below:

function handleMessage(event) {
    try {
        const message = JSON.parse(event.data);
        console.log('Received message:', message);
        
        switch (message.type) {
            case "response.done":
                handleTranscript(message);
                const output = message.response?.output?.[0];
                if (output) handleFunctionCall(output);
                break;
            default:
                console.log('Unhandled message type:', message.type);
        }
    } catch (error) {
        showError('Error processing message: ' + error.message);
    }
}

The function call check is in handleFunctionCall:

function handleFunctionCall(output) {
    if (output?.type === "function_call" && 
        output?.name === "get_weather" && 
        output?.call_id) {
        console.log('Function call found:', output);
        handleWeatherFunction(output);
    }
}

You can check the full source code for the code of handleWeatherFunction and its helpers sendFunctionOutput and sendResponseCreate. They are responsible for:

  • parsing the arguments from the function call output and calling the API
  • sending the output of the function back to the model and linking it to the message that identified the function call in the first place
  • getting a response from the model to tell us about the result of the function call
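
The three responsibilities above can be sketched independently of the browser. The Python below mirrors the logic rather than the repository's JavaScript: the event shapes (a conversation.item.create event carrying a function_call_output item, followed by response.create) reflect my understanding of the Realtime API, and find_function_call and build_function_replies are hypothetical names.

```python
import json
from typing import Callable, Optional


def find_function_call(message: dict, name: str) -> Optional[dict]:
    """Return the function_call output for name from a response.done message, if any."""
    if message.get("type") != "response.done":
        return None
    outputs = (message.get("response") or {}).get("output") or []
    for output in outputs:
        if (output.get("type") == "function_call"
                and output.get("name") == name
                and output.get("call_id")):
            return output
    return None


def build_function_replies(call: dict, handler: Callable[..., dict]) -> list[dict]:
    """Run the handler with the model's arguments and build the events to send back."""
    arguments = json.loads(call.get("arguments") or "{}")  # arguments arrive as a JSON string
    result = handler(**arguments)
    return [
        {
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": call["call_id"],  # links the result to the original call
                "output": json.dumps(result),
            },
        },
        # Ask the model to produce a response that uses the function result.
        {"type": "response.create"},
    ]
```

Unlike the JavaScript handler, which only inspects the first output item, this sketch scans all output items. Both events would be sent over the data channel in the order returned.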

Conclusion

With WebRTC support, a W3C standard, it has become significantly easier to utilize the OpenAI Realtime API from a browser that natively supports it. All widely recognized desktop and mobile browsers, including Chrome, Safari, Firefox, and Edge, provide WebRTC capabilities.

WebRTC has become the preferred method for browser-based realtime API usage. WebSockets are now recommended only for server-to-server applications.

The advent of WebRTC has the potential to catalyze the development of numerous applications that leverage this API. What interesting applications do you intend to build?

Using the Azure AI Inference Service

If you are a generative AI developer who works with different LLMs, it can be cumbersome to make sure your code works with your LLM of choice. You might start with Azure OpenAI models and the OpenAI APIs but later decide you want to use a Phi-3 model. What do you do in that case? Ideally, you would want your code to work with either model. The Azure AI Inference service allows you to do just that.

The API is available via SDKs in Python, JavaScript, C# and as a generic REST service. In this post, we will look at the Python SDK. Note that the API does not work with all models in the Azure AI Foundry model catalog. Below are some of the supported models:

  • Via serverless endpoints: Cohere, Llama, Mistral, Phi-3 and some others
  • Via managed inference (on VMs): Mistral, Mixtral, Phi-3 and Llama 3 instruct

In this post, we will use the serverless endpoints. Let’s stop talking about it and look at some code. Although you can use the inferencing service fully on its own, I will focus on some other ways to use it:

  • From GitHub Marketplace: for experimentation; authenticate with GitHub
  • From Azure AI Foundry: towards production quality code; authenticate with Entra ID

Getting started from GitHub Marketplace

Perhaps somewhat unexpectedly, an easy way to start exploring these APIs is via models in GitHub Marketplace. GitHub supports the inferencing service and allows you to authenticate via your GitHub personal access token (PAT).

If you have a GitHub account, even as a free user, simply go to the GitHub model catalog at https://github.com/marketplace/models/catalog. Select any model from the list and click Get API key:

Ministral 3B in the GitHub model catalog

In the Get API key screen, you can select your language and SDK. Below, I selected Python and Azure AI Inference SDK:

Steps to get started with Ministral and the AI Inference SDK

Instead of setting this up on your workstation, you can click on Run codespace. A codespace will be opened with lots of sample code:

Codespace with sample code for different SDKs, including the AI Inference

Above, I opened the Getting Started notebook for the Azure AI Inference SDK. You can run the cells in that notebook to see the results. To create a client, the following code is used:

import os
import dotenv
from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from azure.core.credentials import AzureKeyCredential

dotenv.load_dotenv()

if not os.getenv("GITHUB_TOKEN"):
    raise ValueError("GITHUB_TOKEN is not set")

github_token = os.environ["GITHUB_TOKEN"]
endpoint = "https://models.inference.ai.azure.com"


# Create a client
client = ChatCompletionsClient(
    endpoint=endpoint,
    credential=AzureKeyCredential(github_token),
)

The endpoint above is similar to the endpoint you would use without GitHub. The SDK, however, supports authenticating with your GITHUB_TOKEN which is available to the codespace as an environment variable.

When you have the ChatCompletionsClient, you can start using the client as if this was an OpenAI model. Indeed, the AI Inference SDK works similarly to the OpenAI SDK:

response = client.complete(
    messages=[
        SystemMessage(content="You are a helpful assistant."),
        UserMessage(content="What is the capital of France?"),
    ],
    model=model_name,
    # Optional parameters
    temperature=1.,
    max_tokens=1000,
    top_p=1.    
)

print(response.choices[0].message.content)

The model is set via the model_name variable. The model name can be any of the supported GitHub models:

  • AI21 Labs: `AI21-Jamba-Instruct`
  • Cohere: `Cohere-command-r`, `Cohere-command-r-plus`
  • Meta: `Meta-Llama-3-70B-Instruct`, `Meta-Llama-3-8B-Instruct` and others
  • Mistral AI: `Mistral-large`, `Mistral-large-2407`, `Mistral-Nemo`, `Mistral-small`
  • Azure OpenAI: `gpt-4o-mini`, `gpt-4o`
  • Microsoft: `Phi-3-medium-128k-instruct`, `Phi-3-medium-4k-instruct`, and others

The full list of models is in the notebook. It’s easy to get started with GitHub models to evaluate and try out models. Do note that these models are for experimentation only and heavily throttled. In production, use models deployed in Azure. One of the ways to do that is with Azure AI Foundry.

Azure AI Foundry and its SDK

Another way to use the inferencing service is via Azure AI Foundry and its SDK. To use the inferencing service via Azure AI Foundry, simply create a project. If this is the first time you create a project, a hub will be created as well. Check Microsoft Learn for more information.

Project in AI Foundry with the inference endpoint

The endpoint above can be used directly with the Azure AI Inference SDK. There is no need to use the Azure AI Foundry SDK in that case. In what follows, I will focus on the Azure AI Foundry SDK and not use the inference SDK on its own.

Unlike GitHub models, you need to deploy models in Azure before you can use them:

Deployment of Mistral Large and Phi-3 small 128k instruct

To deploy a model, simply click on Deploy model and follow the steps. Take the serverless deployment when asked. Above, I deployed Mistral Large and Phi-3 small 128k.

The Azure AI Foundry SDK makes it easy to work with services available to your project. A service can be a model via the inferencing SDK but also Azure AI Search and other services.

In code, you connect to your project with a connection string and authenticate with Entra ID. From a project client, you then obtain a generic chat completion client. Under the hood, the correct AI inferencing endpoint is used.

from azure.identity import DefaultAzureCredential
from azure.ai.projects import AIProjectClient

project_connection_string = "your_conn_str"

project = AIProjectClient.from_connection_string(
    conn_str=project_connection_string,
    credential=DefaultAzureCredential(),
)

model_name = "Phi-3-small-128k-instruct"

client = project.inference.get_chat_completions_client()

response = client.complete(
    model=model_name,
    messages=[
        {"role": "system", "content": "You are a helpful writing assistant"},
        {"role": "user", "content": "Write me a poem about flowers"},
    ]
)

print(response.choices[0].message.content)

Above, replace your_conn_str with the connection string from your project:

AI Foundry project connection string

Now, if you want to run your code with another model, simply deploy it and switch the model name in your code. Note that you do not use the deployment name. Instead, use the model name.

Note that these models are typically deployed with content filtering. If the filter is triggered, you will get an HttpResponseError with status code 400. This will also happen if you use GitHub models because they use the same models and content filters.
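To make that concrete, here is a minimal sketch of how you could catch such a 400 and return a friendly message instead of crashing. To keep it runnable without the Azure packages, it uses a stand-in exception class (FakeHttpResponseError, a hypothetical name); in real code you would catch azure.core.exceptions.HttpResponseError, which I assume exposes the same status_code attribute. The safe_complete helper is also just an illustration, not part of any SDK.

```python
class FakeHttpResponseError(Exception):
    """Hypothetical stand-in for azure.core.exceptions.HttpResponseError."""

    def __init__(self, status_code, message):
        super().__init__(message)
        self.status_code = status_code


def safe_complete(call, fallback="The request was rejected, possibly by the content filter."):
    """Run call() and return its result; return fallback text on an HTTP 400."""
    try:
        return call()
    except Exception as err:
        # Only swallow errors that carry a 400 status code; re-raise the rest.
        if getattr(err, "status_code", None) == 400:
            return fallback
        raise


def filtered_call():
    """Simulates a completion call that trips the content filter."""
    raise FakeHttpResponseError(400, "content_filter")


blocked_result = safe_complete(filtered_call, fallback="blocked")
normal_result = safe_complete(lambda: "Paris")
```

With a real client, the call you pass in would be something like lambda: client.complete(...).choices[0].message.content.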

Other capabilities of the inferencing service

Below, some of the other capabilities of the inferencing service are listed:

  • Next to chat completions, text completions, text embeddings and image embeddings are supported
  • If the underlying model supports parameters not supported by the inferencing service, use model_extras. The properties you put in model extras are passed to the API that is specific to the model. One example is the safe_mode parameter in Mistral.
  • You can configure the API to give you an error when you use a parameter the underlying model does not support
  • The API supports images as input with select models
  • Streaming is supported
  • Tools and function calling are supported
  • Prompt templates are supported, including Prompty.
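As a rough sketch of the model_extras idea from the list above, the helper below builds the keyword arguments for client.complete and only adds model_extras when you pass model-specific settings. The helper name build_completion_kwargs is my own, and the snippet does not call the service, so you can run it without credentials. Whether a given model honours safe_mode is something to verify against that model's documentation.

```python
def build_completion_kwargs(model_name, user_prompt, extras=None):
    """Build keyword arguments for client.complete(); extras are model-specific."""
    kwargs = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": user_prompt},
        ],
    }
    if extras:
        # Passed through to the model-specific API, e.g. safe_mode for Mistral.
        kwargs["model_extras"] = dict(extras)
    return kwargs


mistral_kwargs = build_completion_kwargs("Mistral-large", "Hello", {"safe_mode": True})
plain_kwargs = build_completion_kwargs("gpt-4o", "Hello")

# With a real client you would then call:
# response = client.complete(**mistral_kwargs)
```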

Should you use it?

Whether or not you should use the AI inferencing services is not easy to answer. If you use frameworks such as LangChain or Semantic Kernel, they already have abstractions to work with multiple models. They also make it easier to work with functions and tool calling and also support prompt templates. If you use those, stick with them.

If you do not use those frameworks and you simply want to use an OpenAI-compatible API, the inferencing service in combination with Azure AI Foundry is a good fit! There are many developers that prefer using the OpenAI API directly without the abstractions of a higher-level framework. If you do, you can easily switch models.

It’s important to note that not all models support more advanced features such as tool calling. In practice, that means that the number of models you can switch between is limited. In my experience, even with models that support tool calling, it can go wrong easily. If your application is heavily dependent on function calling, it’s best to use frameworks like Semantic Kernel.

The service in general is useful in other ways though. Copilot Studio, for example, can use custom models to answer questions and uses the inferencing service under the hood to make that happen!

Create a Copilot declarative agent that calls an API with authentication

In a previous post, we looked at creating a Copilot declarative agent. The agent had one custom action that called the JSONPlaceholder API. Check that post for an introduction to what these agents can do. Using a dummy, unauthenticated API is not much fun so let’s take a look at doing the same for a custom API that requires authentication.

Python API with authentication

The API we will create has one endpoint: GET /sales. It’s implemented as follows:

@app.get("/sales/", dependencies=[Depends(verify_token)])
async def get_sales():
    """
    Retrieve sales data.
    Requires Bearer token authentication.
    """
    return {
        "status": "success",
        "data": generate_sample_sales_data()
    }

The data is generated by the generate_sample_sales_data function. It just generates random sales data. You can check the full code on GitHub. The important thing here is that we use bearer authentication with a key.
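The real verify_token dependency lives in the repository, so I will not reproduce it here. As a minimal sketch of what a bearer key check boils down to, the function below parses an Authorization header and compares the token with the expected key. The name is_valid_bearer and the constant-time comparison with hmac.compare_digest are my own choices, not necessarily what the repo does.

```python
import hmac


def is_valid_bearer(authorization_header, expected_key):
    """Return True if the header looks like 'Bearer <key>' with the right key."""
    if not authorization_header:
        return False
    scheme, _, token = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return False
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(token.strip().encode(), expected_key.encode())


ok = is_valid_bearer("Bearer my-secret-key", "my-secret-key")
wrong = is_valid_bearer("Bearer something-else", "my-secret-key")
missing = is_valid_bearer(None, "my-secret-key")
```

In a FastAPI dependency, a False result is where you would raise a 401 Unauthorized.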

When I hit the /sales endpoint with a wrong key, a 401 Unauthorized is raised:

401 Unauthorized (via REST client VS Code plugin)

With the correct key, the /sales endpoint returns the random data:

GET /sales returns random data

Running the API

To make things easy, we will run the API on the local machine and expose it with ngrok. Install ngrok using the instructions on their website. If you cloned the repo, go to the api folder and run the commands below. Run the last command from a different terminal window.

pip install -r requirements.txt
python app.py
ngrok http 8000

Note: you can also use local port forwarding in VS Code. I prefer ngrok but if you do not want to install it, simply use the VS Code feature.

In the terminal where you ran ngrok, you should see something like below:

ngrok tunnel is active

Ngrok has a nice UI to inspect the calls via the web interface at http://localhost:4040:

ngrok web interface

Before continuing, ensure that the ngrok forwarding URL (https://xyz.ngrok-free.app) responds when you hit the /sales endpoint.
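If you prefer to check this from Python instead of a REST client, the sketch below uses only the standard library. The function names are mine, and the URL and key are placeholders you need to replace with your own ngrok forwarding URL and API key. The request is built separately from the call so you can inspect it without hitting the network.

```python
import json
import urllib.request


def build_sales_request(base_url, api_key):
    """Build a GET request for /sales/ with a bearer token header."""
    url = base_url.rstrip("/") + "/sales/"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {api_key}"},
        method="GET",
    )


def fetch_sales(base_url, api_key, timeout=10):
    """Call the API and return the parsed JSON body (raises on 401 and other errors)."""
    request = build_sales_request(base_url, api_key)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Placeholder values; nothing is sent until fetch_sales() is called.
demo_request = build_sales_request("https://xyz.ngrok-free.app/", "my-secret-key")
```

Calling fetch_sales("https://xyz.ngrok-free.app", "your key") should then return the JSON with the status and data fields shown earlier.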

Getting the OpenAPI document

When you create a FastAPI API, it generates OpenAPI documentation that describes all the endpoints. The declarative agent needs that documentation to configure actions.

For the above API, that looks like below. Note that this is not the default document. It was changed in code.

{
  "openapi": "3.0.0",
  "info": {
    "title": "Sales API",
    "description": "API for retrieving sales data",
    "version": "1.0.0"
  },
  "paths": {
    "/sales/": {
      "get": {
        "summary": "Get Sales",
        "description": "Retrieve sales data.\nRequires Bearer token authentication.",
        "operationId": "get_sales_sales__get",
        "responses": {
          "200": {
            "description": "Successful Response",
            "content": {
              "application/json": {
                "schema": {

                }
              }
            }
          }
        }
      }
    },
    "/": {
      "get": {
        "summary": "Root",
        "description": "Root endpoint - provides API information",
        "operationId": "root__get",
        "responses": {
          "200": {
            "description": "Successful Response",
            "content": {
              "application/json": {
                "schema": {

                }
              }
            }
          }
        }
      }
    }
  },
  "components": {
    "securitySchemes": {
      "BearerAuth": {
        "type": "http",
        "scheme": "bearer"
      }
    }
  },
  "servers": [
    {
      "url": "https://627d-94-143-189-241.ngrok-free.app",
      "description": "Production server"
    }
  ]
}

The Teams Toolkit requires OpenAPI 3.0.x instead of 3.1.x. By default, recent versions of FastAPI generate 3.1.x docs. You can change that in the API’s code by adding the following:

def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    
    openapi_schema = get_openapi(
        title="Sales API",
        version="1.0.0",
        description="API for retrieving sales data",
        routes=app.routes,
    )
    
    # Set OpenAPI version
    openapi_schema["openapi"] = "3.0.0"
    
    # Add servers
    openapi_schema["servers"] = [
        {
            "url": "https://REPLACE_THIS.ngrok-free.app",  # Replace with your production URL
            "description": "Production server"
        }
    ]
    
    # Add security scheme
    openapi_schema["components"] = {
        "securitySchemes": {
            "BearerAuth": {
                "type": "http",
                "scheme": "bearer"
            }
        }
    }
    
    # Remove endpoint-specific security requirements
    for path in openapi_schema["paths"].values():
        for operation in path.values():
            if "security" in operation:
                del operation["security"]
    
    app.openapi_schema = openapi_schema
    return app.openapi_schema

app.openapi = custom_openapi

In the code, we switch to OpenAPI 3.0.0, add our server (the ngrok forwarding URL), add the security scheme and more. Now, when you go to https://your_ngrok_url/openapi.json, the JSON shown above should be returned.
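Before pointing the toolkit at the document, it can help to sanity check it. The sketch below is a small, unofficial check that takes the parsed openapi.json as a dictionary and reports the things this post relies on: a 3.0.x version, at least one server URL and an HTTP bearer security scheme. The function name and the list of checks are my own and not something the Teams Toolkit provides.

```python
def check_openapi_for_toolkit(doc):
    """Return a list of problems found in a parsed OpenAPI document."""
    problems = []
    if not str(doc.get("openapi", "")).startswith("3.0."):
        problems.append("openapi version is not 3.0.x")
    servers = doc.get("servers") or []
    if not any(server.get("url") for server in servers):
        problems.append("no server URL")
    schemes = doc.get("components", {}).get("securitySchemes", {})
    has_bearer = any(
        scheme.get("type") == "http" and scheme.get("scheme") == "bearer"
        for scheme in schemes.values()
    )
    if not has_bearer:
        problems.append("no HTTP bearer security scheme")
    return problems


sample_doc = {
    "openapi": "3.0.0",
    "servers": [{"url": "https://REPLACE_THIS.ngrok-free.app"}],
    "components": {
        "securitySchemes": {"BearerAuth": {"type": "http", "scheme": "bearer"}}
    },
}
sample_problems = check_openapi_for_toolkit(sample_doc)
default_problems = check_openapi_for_toolkit({"openapi": "3.1.0"})
```

You could load the live document with json.loads on the response from /openapi.json and pass the result to this function.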

Creating the Copilot Agent

Now we can create a new declarative agent like we did in the previous post. When you are asked for the OpenAPI document, you can retrieve it from the live server via the ngrok forwarding URL.

After creating the agent, declarativeAgent.json should contain the following action:

"actions": [
    {
        "id": "action_1",
        "file": "ai-plugin.json"
    }
]

In ai-plugin.json, in functions and runtimes, you should see the function description and a reference to the OpenAPI operation.

That’s all fine, but of course the API call will not work until a key is provided. You create the key in the Teams developer portal at https://dev.teams.microsoft.com/tools:

Adding an API key for Bearer auth

You create the key by clicking New API key and filling in the form. Ensure you add a key that matches the key in the API. Also ensure that the URL to your API is correct (the ngrok forwarding URL). With an incorrect URL, the key will not be accepted.

Now we need to add a reference to the key. The agent can use that reference to retrieve the key and use it when it calls your API. Copy the key’s registration ID and then open ai-plugin.json. Add the following to the runtimes array:

"runtimes": [
    {
        "type": "OpenApi",
        "auth": {
            "type": "ApiKeyPluginVault",
            "reference_id": "KEY_REGISTRATION_ID"
        },
        "spec": {
            "url": "apiSpecificationFile/openapi.json"
        },
        "run_for_functions": [
            "get_sales_sales__get"
        ]
    }
]

The above code ensures that HTTP bearer authentication is used with the stored key when the agent calls the get_sales_sales__get endpoint.
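If you would rather script this edit than do it by hand, the following sketch patches a parsed ai-plugin.json. It assumes the structure shown above; the helper name use_api_key_auth is hypothetical, and KEY_REGISTRATION_ID remains a placeholder for your own registration ID.

```python
import copy


def use_api_key_auth(plugin, reference_id, function_name):
    """Return a copy of the plugin where the runtime for function_name uses the stored key."""
    patched = copy.deepcopy(plugin)
    for runtime in patched.get("runtimes", []):
        if function_name in runtime.get("run_for_functions", []):
            runtime["auth"] = {
                "type": "ApiKeyPluginVault",
                "reference_id": reference_id,
            }
    return patched


demo_plugin = {
    "runtimes": [
        {
            "type": "OpenApi",
            "auth": {"type": "None"},
            "spec": {"url": "apiSpecificationFile/openapi.json"},
            "run_for_functions": ["get_sales_sales__get"],
        }
    ]
}
patched_plugin = use_api_key_auth(demo_plugin, "KEY_REGISTRATION_ID", "get_sales_sales__get")
```

In practice you would read the file with json.load, patch it and write it back with json.dump.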

Now you are ready to provision your agent. After provisioning, locate the agent in Teams:

Find the agent

Now either use a starter (if you added some; above that is (2)) or type the question in the chat box.

Getting laptop sales in 2024

Note that I did not do anything fancy with the adaptive card. It just says success.

If you turned on developer mode in Copilot, you can check the raw response:

Viewing the raw response, right from within Microsoft 365 Chat

Conclusion

In this post, we created a Copilot agent that calls a custom API secured with HTTP bearer authentication. The “trick” to get this to work is to add the key to the Teams dev portal and reference it in the json file that defines the API call.

HTTP bearer authentication is the easiest to implement. In another post, we will look at using OAuth to protect the API. There’s a bit more to that, as expected.

Creating a Copilot declarative agent with VS Code and the Teams Toolkit

If you are a Microsoft 365 Copilot user, you have probably seen that the words “agent” and “Copilot agent” are popping up here and there. For example, if you chat with Copilot there is an Agents section in the top right corner:

Copilot Chat with agents

Above, there is a Visual Creator agent that’s built-in. It’s an agent dedicated to generating images. Below Visual Creator, there are agents deployed to your organisation and ways to add and create agents.

A Copilot agent in this context, runs on top of Microsoft 365 Copilot and uses the Copilot orchestrator and underlying model. An agent is dedicated to a specific task and has the following properties. Some of these properties are optional:

  • Name: name of the agent
  • Description: you guessed it, the description of the agent
  • Instructions: instructions for the agent about how to do its work and respond to the user; you can compare this to a system prompt you give an LLM to guide its responses
  • Conversation starters: prompts to get started like the Learn More and Generate Ideas in the screenshot above
  • Documents: documents the agent can use to provide the user with answers; this will typically be a SharePoint site or a OneDrive location
  • Actions: actions the agents can take to provide the user with an answer; these actions will be API calls that can fetch information from databases, create tickets in a ticketing system and much more…

There are several ways to create these agents:

  • Start from SharePoint and create an agent based on the documents you select
  • Start from Microsoft 365 Copilot chat
  • Start from Copilot Studio
  • Start from Visual Studio Code

Whatever you choose, you are creating the agent declaratively. You do not have to write code to create the agent. Depending on the tool you use, not all capabilities are exposed. For example, if you want to add actions to your agent, you need Copilot Studio or Visual Studio Code. You could start creating the agent from SharePoint and then add actions with Copilot Studio.

In this post, we will focus on creating a declarative agent with Visual Studio Code.

Getting Started

You need Visual Studio Code or a compatible editor with the Teams Toolkit extension installed. Check Microsoft Learn for all requirements. After installing the extension in VS Code, click its icon. You will be presented with the options below:

Teams Toolkit extension in VS Code

To create a declarative agent, click Create a New App. Select Copilot Agent.

Copilot Agent in Teams Toolkit

Next, select Declarative Agent. You will be presented with the choices below:

Creating an agent with API plugin so we can call APIs

To make this post more useful, we will add actions to the agent. Although the word “action” is not mentioned above, selecting Add plugin will give us that functionality.

We will create our actions from an OpenAPI 3.0.x specification. Select Start with an OpenAPI Description Document as shown below.

When you select the above option, you can either:

  • Use a URL that returns the OpenAPI document
  • Browse for an OpenAPI file (json or yaml) on your file system

I downloaded the OpenAPI specification for JSON Placeholder from https://arnu515.github.io/jsonplaceholder-api-docs/. JSON Placeholder is an online dummy API that provides information about blog posts. After downloading the OpenAPI spec, browse for the swagger.json file via the Browse for an OpenAPI file option. In the next screen, you can select the API operations you want to expose:

Select the operations you want the agent to use

I only selected the GET /posts operation (getPosts). Next, you will be asked for a folder location and a name for your project. I called mine DemoAgent. After specifying the name, a new VS Code window will pop up:

Declarative Agent opens in a new Window

You might be prompted to install additional extensions and even to provision the app.

How does it work?

Before explaining some of the internals, let’s look at the end result in Copilot chat. Below is the provisioned app, provisioned only to my own account. This is the app as created by the extension, without modifications on my part.

Agent in Copilot Chat; sample API we use returns Latin 😉

Above, I have asked for three posts. Copilot matches my intent to the GET /posts API call and makes the call. The JSONPlaceholder API does not require authentication so that’s easy. Authentication is supported but that’s for another post. If it’s the first time the API is used, you will be asked for permission to use it.

In Copilot, I turned on developer mode by typing -developer on in the chat box. When you click Show plugin developer info, you will see something like the below screenshot:

Copilot developer mode

Above, the Copilot orchestrator has matched the function getPosts from the DemoAgent plugin. Plugin is just the general name for Copilot extensions that can perform actions (or functions). Yes, naming is hard. The Copilot orchestrator selected the getPosts function to execute. The result was a 200 OK from the underlying API. If you click the 200 OK message, you see the raw results returned from the API.

Now let’s look at some of the files that are used to create this agent. The main file, from the agent’s point of view, is declarativeAgent.json in the appPackage folder. It contains the name, description, instructions and actions of the agent:

{
    "$schema": "https://developer.microsoft.com/json-schemas/copilot/declarative-agent/v1.0/schema.json",
    "version": "v1.0",
    "name": "DemoAgent",
    "description": "Declarative agent created with Teams Toolkit",
    "instructions": "$[file('instruction.txt')]",
    "actions": [
        {
            "id": "action_1",
            "file": "ai-plugin.json"
        }
    ]
}

The instructions property references another file which contains the instructions for the agent. One of the instructions is: You should start every response and answer to the user with “Thanks for using Teams Toolkit to create your declarative agent!”. That’s the reason why my question had that in the response to start with.

Of course, the actions are where the magic is. You can provide your agent with multiple actions. Here, we only have one. These actions are defined in a file that references the OpenAPI spec. Above, that file is ai-plugin.json. This file tells the agent what API call to make. It contains a functions array with only one function in this case: getPosts. It’s important you provide a good description for the function because Copilot selects the function to call based on its description. See the Matched functions list in the plugin developer info section.

Below the functions array is a runtimes array. It specifies what operation to call from the referenced OpenAPI specification. In here, you also specify the authentication to the API. In this case, the auth type is None but agents support HTTP bearer authentication with a simple key or OAuth.

Here’s the entire file:

{
    "$schema": "https://developer.microsoft.com/json-schemas/copilot/plugin/v2.1/schema.json",
    "schema_version": "v2.1",
    "name_for_human": "DemoAgent",
    "description_for_human": "Free fake API for testing and prototyping.",
    "namespace": "demoagent",
    "functions": [
        {
            "name": "getPosts",
            "description": "Returns all posts",
            "capabilities": {
                "response_semantics": {
                    "data_path": "$",
                    "properties": {
                        "title": "$.title",
                        "subtitle": "$.id"
                    },
                    "static_template": {
                        "type": "AdaptiveCard",
                        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                        "version": "1.5",
                        "body": [
                            {
                                "type": "TextBlock",
                                "text": "id: ${if(id, id, 'N/A')}",
                                "wrap": true
                            },
                            {
                                "type": "TextBlock",
                                "text": "title: ${if(title, title, 'N/A')}",
                                "wrap": true
                            },
                            {
                                "type": "TextBlock",
                                "text": "body: ${if(body, body, 'N/A')}",
                                "wrap": true
                            },
                            {
                                "type": "TextBlock",
                                "text": "userId: ${if(userId, userId, 'N/A')}",
                                "wrap": true
                            }
                        ]
                    }
                }
            }
        }
    ],
    "runtimes": [
        {
            "type": "OpenApi",
            "auth": {
                "type": "None"
            },
            "spec": {
                "url": "apiSpecificationFile/openapi.json"
            },
            "run_for_functions": [
                "getPosts"
            ]
        }
    ],
    "capabilities": {
        "localization": {},
        "conversation_starters": [
            {
                "text": "Returns all posts"
            }
        ]
    }
}

As you can see, you can also control how the agent responds by providing an adaptive card. Teams Toolkit decided on the format above based on the API specification and the data returned by the getPosts operation. In this case, the card looks like this:

Adaptive card showing the response from the API: id, title, body and userId of the fake blog post
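Going back to ai-plugin.json for a moment: the functions array and the run_for_functions lists in runtimes have to agree with each other. As a small, unofficial sanity check (the function name is mine), the sketch below reports names that appear in one place but not the other.

```python
def unmatched_functions(plugin):
    """Return names that are declared but not routed to a runtime, or the other way around."""
    declared = {function["name"] for function in plugin.get("functions", [])}
    routed = {
        name
        for runtime in plugin.get("runtimes", [])
        for name in runtime.get("run_for_functions", [])
    }
    # Symmetric difference: present in exactly one of the two sets.
    return sorted(declared ^ routed)


demo_agent_plugin = {
    "functions": [{"name": "getPosts", "description": "Returns all posts"}],
    "runtimes": [{"type": "OpenApi", "run_for_functions": ["getPosts"]}],
}
consistent = unmatched_functions(demo_agent_plugin)

broken_plugin = {
    "functions": [{"name": "getPosts"}],
    "runtimes": [{"type": "OpenApi", "run_for_functions": ["getPost"]}],
}
mismatches = unmatched_functions(broken_plugin)
```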

Adding extra capabilities

You can add conversation starters to the agent in declarativeAgent.json. They are shown in the opening screen of your agent:

Conversation Starters

These starters are added to declarativeAgent.json:

{
    "$schema": "https://developer.microsoft.com/json-schemas/copilot/declarative-agent/v1.0/schema.json",
    "version": "v1.0",
    "name": "DemoAgent",
    "description": "Declarative agent created with Teams Toolkit",
    "instructions": "$[file('instruction.txt')]",
    "actions": [
        ...
    ],
    "conversation_starters": [
        {
            "title": "Recent posts",
            "text": "Show me recent posts"
        },
        {
            "title": "Last post",
            "text": "Show me the last post"
        }
    ]
}

In addition to conversation starters, you can also enable web searches. Simply add the following to the file above:

"capabilities": [
    {
        "name": "WebSearch"
    }
]

With this feature enabled, the agent can search the web for answers via Bing. It will do so when it thinks it needs to or when you tell it to. For instance: “Search the web for recent news about AI” gets you something like this:

Agent with WebSearch turned on

In the plugin developer info, you will see that none of your functions were executed. Developer info does not provide additional information about the web search.
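If you manage several agents, you might want to toggle capabilities from a script rather than editing JSON by hand. Here is a minimal sketch that adds a capability to a parsed declarativeAgent.json without creating duplicates. The helper name add_capability is hypothetical; the capability name comes from the snippet above.

```python
def add_capability(agent, name):
    """Add a capability by name to the agent definition if it is not there yet."""
    capabilities = agent.setdefault("capabilities", [])
    if not any(capability.get("name") == name for capability in capabilities):
        capabilities.append({"name": name})
    return agent


demo_agent = {"version": "v1.0", "name": "DemoAgent"}
add_capability(demo_agent, "WebSearch")
add_capability(demo_agent, "WebSearch")  # second call is a no-op
```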

Next to starter prompts and WebSearch, here are some of the other things you can do:

  • Add OneDrive and SharePoint content: extra capability with name OneDriveAndSharePoint; the user using the agent needs access to these files or they cannot be used to generate an answer
  • Add Microsoft Graph Connectors content: extra capability with name GraphConnectors; Graph Connectors pull in data from other sources in Microsoft Graph; by specifying the connector Ids, that data can then be retrieved by the agent

More information about the above settings can be found here: https://learn.microsoft.com/en-us/microsoft-365-copilot/extensibility/declarative-agent-manifest.

Provisioning

To provision the agent just for you, open VS Code’s command palette and search for Teams: Provision. You will be asked to log on to Microsoft 365. When all goes well, you should see the messages below in the Output pane:

Output after provisioning an app

If you are familiar with app deployment to Teams in general, you will notice that this is the same.

When the app is provisioned, it should appear in the developer portal at https://dev.teams.microsoft.com/apps:

DemoAgent in the Teams dev portal

Note that the extension adds dev to the agent when you provision the app. When you publish the app, this is different. You can also see this in VS Code in the build folder:

App package for provisioning in VS Code

Note: we did not discuss the manifest.json file which is used to configure the Teams app as a whole. Use it to set developer info, icons, name, description and more.

There are more steps to take to publish the app and make it available to your organisation. See https://learn.microsoft.com/en-us/microsoftteams/platform/toolkit/publish for more information.

Conclusion

The goal of this blogpost was to show how easy it is to create a declarative agent on top of Microsoft 365 Copilot in VS Code. Remember that these agents use the underlying Copilot orchestrator and model and that is something you cannot change. If you need more freedom (e.g., control over LLM, its parameters, advanced prompting techniques etc…) and you want to create such an app in Teams, there’s always the Custom Engine Agent.

Declarative agents don’t require you to write code, although you do need to edit multiple files to get them to work.

In a follow-up post, we will take a look at adding a custom API with authentication. I will also show you how to easily add additional actions to an agent without too much manual editing. Stay tuned!

Using your own message broker with Diagrid Catalyst

In a previous post, I wrote about Diagrid Catalyst. Catalyst provides services like pub/sub and state stores to support the developer in writing distributed applications. In the post, we discussed a sample application that processes documents and extracts fields with an LLM (gpt-4o structured extraction). Two services, upload and process, communicate via the pub/sub pattern.

In that post, we used a pub/sub broker built into Catalyst. Using the built-in broker makes it extremely easy to get started. You simply create the service and topic subscription and write code to wire it all up using the Dapr APIs.

Catalyst built-in pub/sub service

But what if you want to use your own broker? Read on to learn how that works.

Using Azure Service Bus as the broker

To use Azure Service Bus, simply deploy an instance in a region of your choice. Ensure you use the standard tier because you need topics, not queues:

Azure Service Bus Standard Tier deployed in Sweden; public endpoint

With Service Bus deployed, we can now tell Catalyst about it. You do so in Components in the Catalyst portal:

Creating an Azure Service Bus component

Simply click Create Component to start a wizard. After completion of the wizard, your component will appear in the list. Above, at the bottom, a component with Azure Service Bus as the target is in the list.

The wizard itself is fairly straightforward. The first screen is shown below:

Component wizard

Above, in the first step, I clicked Pub/Sub and selected Azure Service Bus Topics. As you can see, several other pub/sub brokers are supported. The above list is not complete.

In the next steps, the following is set:

  • Assign access: configure the services that can access this component; in my case, that is the upload and process service
  • Authentication profile: decide how to authenticate to Azure Service Bus; I used a connection string
  • Configure component: set the component name and properties such as timeouts. These properties are specific to Service Bus. I only set the name and left the properties at their default.

That’s it. You now have defined a component that can be used by your applications. When you click the component, you can also inspect its YAML definition:

YAML representation of the component

You can use these YAML files from the diagrid CLI to create components. In the CLI they are called connections but it’s essentially the same from what I can tell at this point:

Listing connections

Showing the call graph

With Catalyst, all activity is logged and can be used to visualize a call graph like the one below:

Call Graph

Above, I clicked on the subscription that delivers messages to the process service. The messages come from our Azure pub/sub broker.

Note: you can also see the older pub/sub Catalyst broker in the call graph. It will be removed from the call graph some time after it is not used anymore.

Creating a subscription

A subscription to an Azure Service Bus topic looks the same as a subscription to the built-in Pub/Sub broker:

Subscription to topic invoices

The only difference with the previous blog post is the component. It’s the one we just created. The /process handler in your code will stay the same.

Code changes

The code from the previous post does not have to change a lot. That code uses an environment variable, PUBSUB_NAME, that needs to be set to pubsub-azure now. That’s it. The Dapr SDK code is unchanged:

with DaprClient() as d:
    try:
        result = d.publish_event(
            pubsub_name=pubsub_name,
            topic_name=topic_name,
            data=invoice.model_dump_json(),
            data_content_type='application/json',
        )
        logging.info('Publish Successful. Invoice published: %s' %
                        invoice.path)
        logging.info(f"Invoice model: {invoice.model_dump()}")
        return True
    except grpc.RpcError as err:
        logging.error(f"Failed to publish invoice: {err}")
        return False
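For completeness, this is roughly how the component name can be read from the environment. It is a sketch and not the exact code from the repo: the helper name get_pubsub_name and the fallback value "pubsub" are assumptions on my part.

```python
import os


def get_pubsub_name(env=None, default="pubsub"):
    """Return the pub/sub component name from PUBSUB_NAME, or a default."""
    env = os.environ if env is None else env
    return env.get("PUBSUB_NAME", default)


# Passing a dict makes the behaviour easy to see without touching the real environment.
azure_name = get_pubsub_name({"PUBSUB_NAME": "pubsub-azure"})
fallback_name = get_pubsub_name({})
```

Switching brokers then comes down to setting PUBSUB_NAME=pubsub-azure before starting the service.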

Conclusion

Instead of using the default Catalyst pub/sub broker, we switched the underlying broker to a broker of our choice. This is just configuration. Your code, besides maybe an environment variable, does not need to change.

In this post, we only changed the pub/sub broker. You can also easily change the underlying state store to Azure Blob Storage or Azure Cosmos DB.

Writing a multi-service document extractor with the help of Diagrid’s Catalyst

Many enterprises have systems in place that take documents, possibly handwritten, that contain data that needs to be extracted. In this post, we will create an application that can extract data from documents that you upload. We will make use of an LLM, in this case gpt-4o. We will use model version 2024-08-06 and its new structured output capabilities. Other LLMs can be used as well.

The core of the application is illustrated in the diagram below. The application uses more services than in the diagram. We will get to them later in this post.

Application Diagram

Note: the LLM-based extraction logic in this project is pretty basic. In production, you need to do quite a bit more to get the extraction just right.

The flow of the application is as follows:

  • A user or process submits a document to the upload service. This can be a PDF but other formats are supported as well.
  • In addition to the document, a template is specified by name. A template contains the fields to extract, together with their type (str, bool, float). For example: customer_name (str), invoice_total (float).
  • The upload service uploads the document to an Azure Storage account using a unique filename and preserves the extension.
  • The upload service publishes a message to a topic on a pub/sub message broker. The message contains data such as the document url and the name of the template.
  • The process service subscribes to the topic on the message broker and retrieves the message.
  • It downloads the file from the storage account and sends it to Azure Document Intelligence to convert it to plain text.
  • Using a configurable extractor, an LLM is used to extract the fields in the template from the document text. The sample code contains an OpenAI and a Groq extractor.
  • The extracted fields are written to a configurable output handler. The sample code contains a CSV and JSONL handler.
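The "unique filename that preserves the extension" step from the flow above can be sketched with the standard library. This is only an illustration of the idea. The function name is hypothetical and the upload service in the repository may generate names differently.

```python
import os
import uuid


def unique_blob_name(original_filename: str) -> str:
    """Return a random name that keeps the extension of the uploaded file."""
    _, extension = os.path.splitext(original_filename)
    # uuid4 gives a random 32-character hex string, so collisions are very unlikely
    return f"{uuid.uuid4().hex}{extension}"


print(unique_blob_name("invoice-march.pdf"))
```

Keeping the extension makes it easier for later steps to tell which kind of document they are dealing with.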

In addition to a pub-sub broker, templates are stored in a state store. The upload service is the only service that interfaces with the state store. It provides an HTTP method that the process service can use to retrieve a template from the state store.

To implement pub-sub, the state store and method invocations, we will use Diagrid’s Catalyst instead of doing this all by ourselves.

What is Catalyst?

If you are familiar with Dapr, the distributed application runtime, Catalyst will be easy to understand. Catalyst provides you with a set of APIs, hosted in the cloud and compatible with Dapr to support you in building cloud-native, distributed applications. It provides several building blocks. The ones we use are below:

  • request/reply: to support synchronous communication between services in a secure fashion
  • publish/subscribe: to support asynchronous communication between services using either a broker provided by Catalyst or other supported brokers like Azure Service Bus
  • key/value: allows services to save state in a key/value store. You can use the state store provided by Catalyst or other supported state stores like Azure Cosmos DB or an Azure Storage Account

The key to these building blocks is that your code stays the same if you swap the underlying message broker or key/value store. For example, you can start with Catalyst’s key/value store and later switch to Cosmos DB very easily. There is no need to add Cosmos DB libraries to your code. Catalyst will handle the Cosmos DB connectivity for you.

Important: I am referring mainly to Azure services here but Catalyst (and Dapr) support many services in other clouds as well!

Note that you do not need to install Dapr on your local machine or on platforms like Kubernetes when you use Catalyst. You only use the Dapr SDKs in your code and, when configured to do so, the SDK will connect to the proper APIs hosted in the cloud by Catalyst. In fact, you do not even need an SDK because the APIs can be used with plain HTTP or gRPC. Of course, using an SDK makes things a lot easier.
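To give an idea of what "plain HTTP" means, here is a minimal sketch that builds a publish request without an SDK. It assumes the standard Dapr HTTP route for pub/sub (/v1.0/publish/<pubsub>/<topic>) and the dapr-api-token header that is also used later in this post. The endpoint, token and payload below are placeholders. The request is only constructed, not sent.

```python
import json
import urllib.request


def build_publish_request(http_endpoint: str, api_token: str,
                          pubsub_name: str, topic_name: str,
                          payload: dict) -> urllib.request.Request:
    """Build (but do not send) an HTTP publish request for the Dapr pub/sub API."""
    url = f"{http_endpoint.rstrip('/')}/v1.0/publish/{pubsub_name}/{topic_name}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={"dapr-api-token": api_token,
                 "content-type": "application/json"},
    )


# Placeholder endpoint and token, not real values
request = build_publish_request(
    "https://example.invalid", "my-token", "pubsub", "invoices",
    {"path": "invoices/abc123.pdf", "template": "invoice"},
)
print(request.get_method(), request.full_url)
```

To actually send the message, you would pass the request to urllib.request.urlopen. I left that out so the sketch runs without a Catalyst project.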

If you want to learn more about Catalyst, take a look at the following playlist: https://www.youtube.com/watch?v=7D7rMwJEMsk&list=PLdl4NkEiMsJscq00RLRrN4ip_VpzvuwUC. Lots of good stuff in there!

By doing all of the above in Catalyst we have a standardised approach that remains the same no matter the service behind it. We also get implementation best practices, for example for pub/sub. In addition, we are also provided with golden metrics and a UI to see how the application performs. All API calls are logged to aid in troubleshooting.

Let’s now take a look at the inner loop development process!

Scaffolding a new project

You need to sign up for Catalyst first. At the time of writing, Catalyst was in preview and not supported for production workloads. When you have an account, you should install the Diagrid CLI. The CLI is not just for Catalyst. It’s also used with Diagrid’s other products, such as Conductor.

With the CLI, you can create a new project, create services and application identities. For this post, we will use the UI instead.

In the Catalyst dashboard, I created a project called idpdemo:

List of projects; use Create Project to create a new one

Next, for each of my services (upload and process), we create an App ID. Each App ID has its own token. Services use the token to authenticate to the Catalyst APIs and use the services they are allowed to use.

The process App ID has the following configuration (partial view):

process App ID API configuration

The process service interacts with both the Catalyst key/value store (kvstore) and the pub/sub broker (pubsub). These services need to be enabled as well. We will show that later. We can also see that the process service has a pub/sub subscription called process-consumer. Via that subscription, we have pub/sub messages delivered to the process service whenever the upload service sends a message to the pub/sub topic.

In Diagrid Services, you can click on the pub/sub and key/value store to see what is going on. For example, in the pub/sub service you can see the topics, the subscribers to these topics and the message count.

pub/sub topics

In Connections, you can see your services (represented by App ID upload and process) and their scope. In this case, all App IDs have access to all services. That can easily be changed:

changing the scope: access by App IDs to the pubsub service; default All

Now that we have some understanding of App IDs, Diagrid services and connections, we can take a look at how to connect to Catalyst from code.

Important: in this post we only look at using request/reply, Diagrid pub/sub and key/value. Catalyst also supports workflow and bindings but they are not used in this post.

Connecting your code

All code is available on GitHub: https://github.com/gbaeke/catalyst

The upload service needs to connect to both the pub/sub broker and key/value store:

  • Whenever a document is uploaded, it is uploaded to Azure Storage. When that succeeds, a message is put on the broker with the path of the file and a template name.
  • Templates are created and validated by the upload service so that you can only upload files with a template that exists. Templates are written and read in the key/value store.
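The template rules can be sketched as follows. A plain dict stands in for the key/value store here, and the template shape (field name mapped to a type name) is my assumption based on the str, bool and float types mentioned earlier. The function names are hypothetical.

```python
ALLOWED_TYPES = {"str", "bool", "float"}


def save_template(store: dict, name: str, fields: dict) -> None:
    """Validate the field types and save the template under its name."""
    if not fields:
        raise ValueError("a template needs at least one field")
    for field, type_name in fields.items():
        if type_name not in ALLOWED_TYPES:
            raise ValueError(f"unsupported type {type_name!r} for field {field!r}")
    store[name] = dict(fields)


def require_template(store: dict, name: str) -> dict:
    """Return the template, or fail so that the upload can be rejected."""
    if name not in store:
        raise LookupError(f"template {name!r} does not exist")
    return store[name]


store = {}  # in-memory stand-in for the Catalyst key/value store
save_template(store, "invoice", {"customer_name": "str", "invoice_total": "float"})
print(require_template(store, "invoice"))
```

In the real service, the dict operations would be replaced by calls to the state store, and a failed lookup would translate into an HTTP error for the caller.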

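Before we write code, we need to give the Dapr SDK for Python (we’ll only use the Python SDK here) the necessary connection information. It needs to know it should not connect to a Dapr sidecar but to Catalyst. You set this via environment variables such as DAPR_API_TOKEN, DAPR_HTTP_ENDPOINT and DAPR_GRPC_ENDPOINT (the same ones that appear in the dev yaml file later in this post):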

These environment variables are automatically picked up and used by the SDK to interact with the Catalyst APIs. The following code can be used to put a message on the pub/sub broker:

with DaprClient() as d:
    try:
        result = d.publish_event(
            pubsub_name=pubsub_name,
            topic_name=topic_name,
            data=invoice.model_dump_json(),
            data_content_type='application/json',
        )
        logging.info('Publish Successful. Invoice published: %s' %
                        invoice.path)
        return True
    except grpc.RpcError as err:
        logging.error(f"Failed to publish invoice: {err}")
        return False

This is the same code that you would use with Dapr on your local machine or in Kubernetes or Azure Container Apps. Like with Dapr, you need to specify the pubsub name and topic. Here that is pubsub and invoices as previously shown in the Catalyst UI. The data in the message is an instance of a Pydantic class that holds the path and template but converted to JSON.

The code below shows how to write to the state store (key/value store):

with DaprClient() as d:
    try:
        d.save_state(store_name=kvstore_name,
                        key=template_name, value=str(invoice_data))
    except grpc.RpcError as err:
        logging.error(f"Dapr state store error: {err.details()}")
        raise HTTPException(status_code=500, detail="Failed to save template")

This is of course very similar. We use the save_state method here and provide the store name (kvstore), key (template name) and value.
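One detail to be aware of: the snippet stores str(invoice_data). If invoice_data is a plain dict (the post does not show its type, so that is an assumption), str() produces the Python representation with single quotes, which is not valid JSON. The sketch below shows the difference. Whether it matters depends on how the template is read back.

```python
import json


def is_valid_json(text: str) -> bool:
    """Return True when the text can be parsed as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# Assumed template shape: field name mapped to a type name
invoice_data = {"customer_name": "str", "invoice_total": "float"}

print(is_valid_json(str(invoice_data)))         # False: repr uses single quotes
print(is_valid_json(json.dumps(invoice_data)))  # True
```

If other services or tools need to read the stored value, json.dumps is the safer choice.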

Let’s now turn to the process service. It needs to:

  • be notified when there is a new message on the invoices topic
  • check and retrieve the template by calling a method on the upload service

We only use two building blocks here: pub/sub and request/reply. The process service does not interact directly with the state store.

To receive a message, Catalyst needs a handler to call. In the pub/sub subscription, the handler (default route to be correct) is configured to be /process:

Configuration of default route on subscription

Our code that implements the handler is as follows (FastAPI):

@app.post('/process')  # called by pub/sub when a new invoice is uploaded
async def consume_orders(event: CloudEvent):
    # your code here

As you can see, when Catalyst calls the handler, it passes in a CloudEvent. The event has a data field that holds the path to our document and the template name. The CloudEvent type is defined as follows:

# pub/sub uses CloudEvent; Invoice above is the data
class CloudEvent(BaseModel):
    datacontenttype: str
    source: str
    topic: str
    pubsubname: str
    data: dict
    id: str
    specversion: str
    tracestate: str
    type: str
    traceid: str

In the handler, you simply extract the expected data and use it to process the event. In our case:

  • extract path and template from the data field
  • download the file from blob storage
  • send the file to Azure Document Intelligence to convert to text
  • extract the details from the document based on the template; if the template contains fields like customer_name and invoice_total, the LLM will try to extract that and return that content in JSON.
  • write the extracted values to JSON or CSV or any other output handler
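The first and last steps of that list can be sketched with the standard library only. The event below is a hypothetical, trimmed-down CloudEvent represented as a dict. The path field matches invoice.path from the publishing code, while the template field name and the extracted values are assumptions for illustration.

```python
import io
import json


def extract_job(event: dict) -> tuple[str, str]:
    """Pull the document path and template name out of the event's data field."""
    data = event["data"]
    # "path" matches invoice.path in the post; "template" is an assumed field name
    return data["path"], data["template"]


def write_jsonl(record: dict, stream) -> None:
    """Append one extracted record as a single JSON line."""
    stream.write(json.dumps(record) + "\n")


# Hypothetical event, trimmed to the fields used here
event = {"topic": "invoices", "pubsubname": "pubsub",
         "data": {"path": "invoices/abc123.pdf", "template": "invoice"}}

path, template = extract_job(event)
output = io.StringIO()  # stands in for a real output file
write_jsonl({"source": path, "template": template, "customer_name": "Contoso"}, output)
print(output.getvalue(), end="")
```

The download, Document Intelligence and LLM steps sit between these two functions and are left out here because they need real Azure resources.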

Of course, we do need to extract the full template because we only have the template name. Let’s use the request/reply APIs to do that and call the template GET endpoint of the upload service via Catalyst:

def retrieve_template_from_kvstore(template_name: str):
    # dapr-app-id tells Catalyst which App ID (the upload service) to route the call to
    headers = {'dapr-app-id': invoke_target_appid, 'dapr-api-token': dapr_api_token,
               'content-type': 'application/json'}
    try:
        result = requests.get(
            url='%s/template/%s' % (base_url, template_name),
            headers=headers
        )

        if result.ok:
            logging.info('Invocation successful with status code: %s' %
                         result.status_code)
            logging.info(f"Template retrieved: {result.json()}")
            return result.json()

        # Non-2xx response: log it and signal failure to the caller
        logging.error('Invocation failed with status code: %s' %
                      result.status_code)
        return None

    except Exception as e:
        logging.error(f"An error occurred while retrieving template from Dapr KV store: {str(e)}")
        return None

As an example, we use the HTTP API here instead of the Dapr invoke API. It might not be immediately clear but Catalyst is involved in this process and will have information and metrics about these calls:

Call Graph

The full line represents request/reply (invoke) from process to upload as just explained. The dotted line represents pub/sub traffic where upload creates messages to be consumed by process.

Running the app

You can easily run your application locally using the Diagrid Dev CLI. Ensure you are logged in by running diagrid login. In the preview, with only one project, the default project should already be that one. Then simply run diagrid dev scaffold to generate a yaml file.

In my case, after some modification, my dev-{project-name}.yaml file looked like below:

project: idpdemo
apps:
- appId: process
  disabled: true
  appPort: 8001
  env:
    DAPR_API_TOKEN: ...
    DAPR_APP_ID: process
    DAPR_CLIENT_TIMEOUT_SECONDS: 10
    DAPR_GRPC_ENDPOINT: https://XYZ.api.cloud.diagrid.io:443
    DAPR_HTTP_ENDPOINT: https://XYZ.api.cloud.diagrid.io
    OTHER ENV VARS HERE

  workDir: process
  command: ["python", "app.py"]
- appId: upload
  appPort: 8000
  env:
    ... similar
  workDir: upload
  command: ["python", "app.py"]
appLogDestination: ""

Of course, the file was modified with environment variables required by the code. For example the storage account key, Azure Document Intelligence key, etc…
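Because a missing variable usually only shows up as a confusing connection error, a small startup check can help. This is a sketch, not code from the repository. It only checks the three Dapr connection variables from the yaml file above. You would add your own keys (storage, Document Intelligence and so on) to the tuple.

```python
import os

# Connection variables taken from the dev yaml file; extend with your own settings
REQUIRED_SETTINGS = ("DAPR_API_TOKEN", "DAPR_HTTP_ENDPOINT", "DAPR_GRPC_ENDPOINT")


def missing_settings(env) -> list[str]:
    """Return the names of required settings that are absent or empty."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


missing = missing_settings(os.environ)
if missing:
    print("Missing settings: " + ", ".join(missing))
```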

All you need to do now is to run diagrid dev start to start the apps. The result should be like below:

Local project startup

By default, your service logs are written to the console with a prefix for each service.

If you use the code in GitHub, check the README.md to configure the project and run the code properly. If you would rather run the code with Dapr on your local machine (e.g., if you do not have access to Catalyst) you can do that as well.

Conclusion

In this post, we have taken a look at Catalyst, a set of cloud APIs that help you to write distributed applications in a standard and secure fashion. These APIs are compatible with Dapr, a toolkit that has already gained quite some traction in the community. With Catalyst, we quickly built an application that can be used as a starter to implement an asynchronous LLM-based document extraction pipeline. I did not have to worry too much about pub/sub and key/value services because that’s all part of Catalyst.

What will you build with Catalyst?

Token consumption in Microsoft’s Graph RAG

In the previous post, we discussed Microsoft’s Graph RAG implementation. In this post, we will take a look at token consumption to query the knowledge graph, both for local and global queries.

Note: this test was performed with gpt-4o. A few days after this blog post, OpenAI released gpt-4o-mini. Initial tests with gpt-4o-mini show that index creation and querying work well at a significantly lower cost. You can replace gpt-4o with gpt-4o-mini in the setup below.

Setting up Langfuse logging

To make it easy to see the calls to the LLM, I used the following components:

  • LiteLLM: configured as a proxy; we configure Graph RAG to use this proxy instead of talking to OpenAI or Azure OpenAI directly; see https://www.litellm.ai/
  • Langfuse: an LLM engineering platform that can be used to trace LLM calls; see https://langfuse.com/

To set up LiteLLM, follow the instructions here: https://docs.litellm.ai/docs/proxy/quick_start. I created the following config.yaml for use with LiteLLM:

model_list:
 - model_name: gpt-4o
   litellm_params:
     model: gpt-4o
 - model_name: text-embedding-3-small
   litellm_params:
     model: text-embedding-3-small
litellm_settings:
  success_callback: ["langfuse"]

Before starting the proxy, set the following environment variables:

export OPENAI_API_KEY=my-api-key
export LANGFUSE_PUBLIC_KEY="pk_kk"
export LANGFUSE_SECRET_KEY="sk_ss"

You can obtain the values from both the OpenAI and Langfuse portals. Ensure you also install Langfuse with pip install langfuse.

Next, we can start the proxy with litellm --config config.yaml --debug.

To make Graph RAG work with the proxy, open Graph RAG’s settings.yaml and set the following value under the llm settings:

api_base: http://localhost:4000

LiteLLM is listening for incoming OpenAI requests on that port.
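If you want to check the proxy without Graph RAG, you can send it an OpenAI-style chat completion request. The sketch below only builds such a request. It assumes the proxy exposes the /chat/completions route and runs without a master key, which is how I understand the quick start setup. The question is just an example.

```python
import json
import urllib.request


def build_chat_request(question: str,
                       api_base: str = "http://localhost:4000",
                       model: str = "gpt-4o") -> urllib.request.Request:
    """Build (but do not send) an OpenAI-style chat completion request."""
    payload = {"model": model,
               "messages": [{"role": "user", "content": question}]}
    return urllib.request.Request(
        f"{api_base}/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


request = build_chat_request("Who is Winston Smith?")
print(request.get_method(), request.full_url)
```

With the proxy running, passing the request to urllib.request.urlopen should return a completion and create a trace in Langfuse.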

Running a local query

A local query creates an embedding of your question and finds related entities in the knowledge graph by doing a similarity search first. The embeddings are stored in LanceDB during indexing. Basically, the results of the similarity search are used as entrypoints into the graph.

That is the reason that you need to add the embedding model to LiteLLM’s config.yaml. Global queries do not require this setting.

After the similar entities have been found in LanceDB, they are put in a prompt to answer your original question together with related entities.
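The similarity search idea can be illustrated with a toy example. This is not how Graph RAG or LanceDB implement it. The three-dimensional vectors below are made up (real embeddings have many more dimensions) and the function names are hypothetical.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_entities(question_vec: list[float], entity_vecs: dict, k: int = 5) -> list[str]:
    """Return the names of the k entities most similar to the question."""
    ranked = sorted(entity_vecs,
                    key=lambda name: cosine(question_vec, entity_vecs[name]),
                    reverse=True)
    return ranked[:k]


# Made-up embeddings for three entities from the book
entities = {
    "Winston Smith": [0.9, 0.1, 0.0],
    "O'Brien": [0.7, 0.3, 0.1],
    "Big Brother": [0.1, 0.9, 0.2],
}
question = [1.0, 0.2, 0.0]  # made-up embedding of the question

print(top_k_entities(question, entities, k=2))
```

The names that come back are the entry points into the graph. Their descriptions, relationships and sources are what end up in the prompt.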

A local query can be handled with a single LLM call. Let’s look at the trace:

Trace from local query

The query took about 10 seconds and consumed about 11,500 tokens. The system prompt starts as follows:

First part of local query system prompt

The actual data it works with (called data tables) are listed further in the prompt. You can find a few data points below:

Entity about Winston Smith, a character in the book 1984 (just a part of the text)
Entity for O’Brien, a character he interacts with

The prompt also contains sources from the book where the entities are mentioned. For example:

Relevant sources

The response to this prompt is something like the response below:

LLM response to local query

The response contains references to both the entities and sources with their ids.

Note that you can influence the number of entities retrieved and the number of consumed tokens. In Graph RAG’s settings.yaml, I modified the local search settings as follows:

local_search:
  # text_unit_prop: 0.5
  # community_prop: 0.1
  # conversation_history_max_turns: 5
  top_k_mapped_entities: 5
  top_k_relationships: 5
  max_tokens: 6000

The trace results are clear: token consumption is lower and the latency is lower as well.

Lower token cost

Of course, there will be a bit less detail in the answer. You will have to experiment with these values to see what works best in your scenario.

Global Queries

Global queries are great for broad questions about your dataset. For example: “What are the top themes in 1984?”. A global query is not a single LLM call and is more expensive than a local query.

Let’s take a look at the traces for a global query. Every trace is an LLM call to answer the global query:

Traces for a global query

The last one in the list is where it starts:

First call of many to answer a global query

As you can probably tell, the call above is not returned directly to the user. The system prompt does not contain entities from the graph but community reports. Community reports are created during indexing. First, communities are detected using the Leiden algorithm and then summarized. You can have many communities and summaries in the dataset.

This first call asks the LLM to answer the question “What are the top themes in 1984?” based on a first set of community reports, and it generates intermediate answers. These intermediate answers are kept until a final call, which answers the question based on all of the intermediate answers. It is entirely possible that community reports are used that are not relevant to the query.

Here is that last call:

Answer the question based on the intermediate answers

I am not showing the whole prompt here. Above, you see the data that is fed to the final prompt: the intermediate answers from the community reports. This then results in the final answer:

Final answer to the global query
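The pattern described above (intermediate answers per set of community reports, followed by one final call) can be sketched as a simple map and reduce. This is a simplified illustration, not Graph RAG's code. The prompts, the batch size and the fake_llm stand-in are all assumptions, and the real implementation runs the first phase in parallel.

```python
def batched(items: list, size: int):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def global_query(question: str, community_reports: list[str], llm, batch_size: int = 2):
    """Answer per batch of reports first, then combine the intermediate answers."""
    intermediate = [
        llm(f"Question: {question}\nReports:\n" + "\n".join(batch))
        for batch in batched(community_reports, batch_size)
    ]
    final = llm(f"Question: {question}\nIntermediate answers:\n" + "\n".join(intermediate))
    # One call per batch plus the final call
    return final, len(intermediate) + 1


def fake_llm(prompt: str) -> str:
    """Stand-in for a real LLM call; returns a canned string."""
    return f"answer based on {len(prompt)} prompt characters"


reports = [f"community report {i}" for i in range(1, 6)]
answer, calls = global_query("What are the top themes in 1984?", reports, fake_llm)
print(calls)
```

With five reports and a batch size of two, the sketch makes three intermediate calls and one final call. It also shows why more or larger community reports translate directly into more LLM calls and tokens.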

Below is the list with all calls again:

All calls to answer a global query

In total, and based on default settings, 12 LLM calls were made consuming around 150K tokens. The total latency cannot be calculated from this list because the calls are made in parallel. The total cost is around 80 cents.
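To see how a cost of around 80 cents can follow from roughly 150K tokens, here is a back-of-the-envelope sketch. The prices ($5 per million input tokens and $15 per million output tokens) and the split between input and output tokens are assumptions for illustration. Check the current price list and your own traces for real numbers.

```python
def query_cost(input_tokens: int, output_tokens: int,
               input_price_per_m: float = 5.0,
               output_price_per_m: float = 15.0) -> float:
    """Cost in dollars, with prices expressed per million tokens."""
    total = input_tokens * input_price_per_m + output_tokens * output_price_per_m
    return total / 1_000_000


# Hypothetical split of the ~150K tokens: mostly prompt, a little completion
print(query_cost(input_tokens=145_000, output_tokens=5_000))
```

Because the prompts with community reports dominate, the input price is what drives the cost of a global query.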

The number of calls and token cost can be reduced by tweaking the default parameters in settings.yaml. For example, I made the following changes:

global_search:
  max_tokens: 6000 # was 12000
  data_max_tokens: 500 # was 1000
  map_max_tokens: 500 # was 1000
  # reduce_max_tokens: 2000
  # concurrency: 32

However, this resulted in more calls with around 140K tokens. Not a big reduction. I tried setting lower values but then I got Python errors and many more LLM calls due to retries. I would need to dig into that further to explain why this happens.

Conclusion

From the above, it is clear that local queries are less intensive and less costly than global queries. By tweaking the local query settings, you can get pretty close to the baseline RAG cost where you return 3-5 chunks of text of about 500 tokens each. Latency is pretty good as well. Of course, depending on your data, it’s not guaranteed that the responses of local search will be better than baseline RAG.

Global queries are more costly but do allow you to ask broad questions about your dataset. I would not use these global queries in a chat assistant scenario consistently. However, you could start with a global query and then process follow-up questions with a local query or baseline RAG.