Using tasks with streaming in Google Agent2Agent (A2A)

In a previous post we created a simple A2A agent that uses synchronous message exchange. An A2A client sends a message and the A2A server, via the Agent Executor, responds with a message.

But what if you have a longer-running task to perform and you want to inform the client that the task is ongoing? In that case, you can enable streaming on the A2A server and use a task that streams updates and the final result to the client.

The sequence diagram below illustrates the flow of messages. It is based on the streaming example in the A2A specification.

A2A tasks with streaming updates

In this case, the A2A client needs to perform a streaming request which is sent to the /message/stream endpoint of the A2A server. The code in the AgentExecutor will need to create a task and provide updates to the client at regular intervals.

⚠️ If you want to skip directly to the code, check out the example on GitHub.

Let’s get into the details in the following order:

  • Writing an agent that provides updates while it is doing work: I will use the OpenAI Agents SDK with its support for agent hooks
  • Writing an AgentExecutor that accepts a message, creates a task and provides updates to the client
  • Updating the A2A Server to support streaming
  • Updating the A2A Client to support streaming

AI Agent that provides updates

Although streaming updates are an integral part of A2A, the agent that does the actual work still needs to provide feedback about its progress. That part is up to you, the developer.

In my example, I use an agent created with the OpenAI Agents SDK. This SDK supports AgentHooks that execute at certain events:

  • Agent started/finished
  • Tool call started/finished

The agent class in agent.py on GitHub uses an asyncio queue to emit both the hook events and the agent’s response to the caller. The A2A AgentExecutor uses the invoke_stream() method which returns an AsyncGenerator.
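
If you do not want to dig through the full agent.py, the core pattern is a class that pushes events onto an asyncio queue while the agent runs and drains that queue from an async generator. Below is a minimal, self-contained sketch of that pattern; the StreamEvent/StreamEventType names mirror the ones used later in this post, but the UPDATE member and the class layout are assumptions, not the exact code from the repo.

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import AsyncGenerator

class StreamEventType(Enum):
    UPDATE = "update"      # assumption: intermediate hook events
    RESPONSE = "response"  # final agent response (as used later in this post)

@dataclass
class StreamEvent:
    event_type: StreamEventType
    data: dict

class StreamingAgent:
    def __init__(self):
        # hook callbacks and the final response both land on this queue
        self.queue = asyncio.Queue()

    async def _run(self, message: str) -> None:
        # in the real agent, the OpenAI Agents SDK hooks put UPDATE events here
        await self.queue.put(StreamEvent(StreamEventType.UPDATE, {"message": "Agent started..."}))
        await asyncio.sleep(0)  # stand-in for the actual LLM and tool calls
        await self.queue.put(StreamEvent(StreamEventType.RESPONSE, {"response": "final answer"}))
        await self.queue.put(None)  # sentinel: no more events

    async def invoke_stream(self, message: str) -> AsyncGenerator[StreamEvent, None]:
        runner = asyncio.create_task(self._run(message))
        while (event := await self.queue.get()) is not None:
            yield event
        await runner

Calling invoke_stream() therefore yields the hook events first and the final response last, which is exactly what the AgentExecutor below relies on.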

You can run python agent.py independently. This should result in the following:

The agent has a tool that returns the current date. The hooks emit the events as shown above followed by the final result.

We can now use this agent from the AgentExecutor and stream the events and final result from the agent to the A2A Client.

AgentExecutor Tasks and Streaming

Instead of simply returning a message to the A2A client, we now need to initiate a long-running task that sends intermediate updates to the client. Under the hood this uses SSE (Server-Sent Events) between the A2A Client and A2A Server.

The file agent_executor.py on GitHub contains the code that makes this happen. Let’s step through it:

message_text = context.get_user_input()  # helper method to extract the user input from the context
logger.info(f"Message text: {message_text}")

task = context.current_task
if not task:
    task = new_task(context.message)
    await event_queue.enqueue_event(task)

Above, we extract the user’s input from the incoming message and check whether the context already contains a task. If not, we create the task and queue it. This informs the client that a task was created and that SSE can be used to obtain intermediate results.

Now that we have a task (a new or existing one), the following code is used:

updater = TaskUpdater(event_queue, task.id, task.contextId)
async for event in self.agent.invoke_stream(message_text):
    if event.event_type == StreamEventType.RESPONSE:
        # send the result as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=event.data['response']))],
            name='calculator_result',
        )

        await updater.complete()

    else:
        await updater.update_status(
            TaskState.working,
            new_agent_text_message(
                event.data.get('message', ''),
                task.contextId,
                task.id,
            ),
        )

We first create a TaskUpdater instance with the event queue, the current task id and the contextId. The task updater is used to provide status updates and to complete or even cancel a task.

We then call invoke_stream(query) on our agent and grab the events it emits. If we get an event type of RESPONSE, we create an artifact with the agent response as text and mark the task as complete. In all other cases, we send a status event with updater.update_status(). A status update contains a task state (working in this case) and a message about the state. The message we send is part of the event that is emitted from invoke_stream() and includes things like agent started, tool started, etc…

So in short, to send streaming updates:

  • Ensure your agents emit events of some sort
  • Use those events in the AgentExecutor and create a task that sends intermediate updates until the agent has finished

However, our work is not finished. The A2A Server needs to be updated to support streaming.

A2A Server streaming support

The A2A server code in is main.py on GitHub. To support streaming, we need to update the capabilities of the server:

capabilities = AgentCapabilities(streaming=True, pushNotifications=True)

⚠️ pushNotifications=True is not required for streaming. I include it here to show that sending a push notification to a web hook is also an option.
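
For context, here is roughly how those capabilities end up in the agent card and the server object. This is a sketch based on the a2a-sdk samples, not the exact contents of main.py; the skill definition, the URL and the CalculatorAgentExecutor name are placeholders.

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill

capabilities = AgentCapabilities(streaming=True, pushNotifications=True)

agent_card = AgentCard(
    name='CalculatorAgent',
    description='Agent that can answer questions and tell you the current date',
    url='http://localhost:9999/',
    version='1.0.0',
    defaultInputModes=['text'],
    defaultOutputModes=['text'],
    capabilities=capabilities,
    skills=[
        AgentSkill(
            id='current_date',
            name='Current date',
            description='Returns the current date',
            tags=['date'],
        )
    ],
)

request_handler = DefaultRequestHandler(
    agent_executor=CalculatorAgentExecutor(),  # the executor from agent_executor.py
    task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)
# uvicorn.run(server.build(), host='0.0.0.0', port=9999) then serves the A2A endpoints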

That’s it! The A2A Server now supports streaming. Easy! 😊

Streaming with the A2A Client

Instead of sending a message to the non-streaming endpoint, the client should now use the streaming endpoint. Here is the code to do that (check test_client.py for the full code):

message_payload = Message(
    role=Role.user,
    messageId=str(uuid.uuid4()),
    parts=[Part(root=TextPart(text=question))],
)
streaming_request = SendStreamingMessageRequest(
    id=str(uuid.uuid4()),
    params=MessageSendParams(
        message=message_payload,
    ),
)
print("Sending message")

stream_response = client.send_message_streaming(streaming_request)

To send to the streaming endpoint, the SendStreamingMessageRequest() function is your friend, together with client.send_message_streaming().

We can now grab the responses as they come in:

async for chunk in stream_response:
    # Only print status updates and text responses
    chunk_dict = chunk.model_dump(mode='json', exclude_none=True)

    if 'result' in chunk_dict:
        result = chunk_dict['result']

        # Handle status updates
        if result.get('kind') == 'status-update':
            status = result.get('status', {})
            state = status.get('state', 'unknown')

            if 'message' in status:
                message = status['message']
                if 'parts' in message:
                    for part in message['parts']:
                        if part.get('kind') == 'text':
                            print(f"[{state.upper()}] {part.get('text', '')}")
            else:
                print(f"[{state.upper()}]")

        # Handle artifact updates (contain actual responses)
        elif result.get('kind') == 'artifact-update':
            artifact = result.get('artifact', {})
            if 'parts' in artifact:
                for part in artifact['parts']:
                    if part.get('kind') == 'text':
                        print(f"[RESPONSE] {part.get('text', '')}")

        # Handle initial task submission
        elif result.get('kind') == 'task':
            print(f"[TASK SUBMITTED] ID: {result.get('id', 'unknown')}")

        # Handle final completion
        elif result.get('final') is True:
            print("[TASK COMPLETED]")

This code checks the type of content coming in:

  • status-update: when AgentExecutor sends a status update
  • artifact-update: when AgentExecutor sends an artifact with the agent’s response
  • task: when tasks are submitted and completed

Running the client and asking what today’s date is, results in the following response:

Streaming is working as intended! But what if you use a client that does not support streaming? That also works: you get a full response with the agent’s answer in the result field, plus a history field that contains the initial user question and all the task updates.
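
For completeness, a non-streaming call with the same SDK could look like the sketch below. It reuses the message_payload from earlier and assumes the a2a-sdk’s SendMessageRequest and client.send_message(), so treat it as illustrative rather than a copy of test_client.py.

request = SendMessageRequest(
    id=str(uuid.uuid4()),
    params=MessageSendParams(
        message=message_payload,
    ),
)
response = await client.send_message(request)
print(response.model_dump(mode='json', exclude_none=True))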

Here’s a snippet of that result:

{
  "id": "...",
  "jsonrpc": "2.0",
  "result": {
    "artifacts": [
      {
        "artifactId": "...",
        "name": "calculator_result",
        "parts": [
          {
            "kind": "text",
            "text": "Today's date is July 13, 2025."
          }
        ]
      }
    ],
    "contextId": "...",
    "history": [
      {
        "role": "user",
        "parts": [
          {
            "kind": "text",
            "text": "What is today's date?"
          }
        ]
      },
      {
        "role": "agent",
        "parts": [
          {
            "kind": "text",
            "text": "Agent 'CalculatorAgent' is starting..."
          }
        ]
      }
    ],
    "id": "...",
    "kind": "task",
    "status": {
      "state": "completed"
    }
  }
}

Wrapping up

You have now seen how to run longer-running tasks and provide updates along the way via streaming. As long as your agent code emits status updates, the AgentExecutor can create a task and send task updates and the task result to the A2A Server, which uses SSE to deliver them to the A2A Client.

In an upcoming post, we will take a look at running a multi-agent solution in the Azure cloud.

Create a Copilot declarative agent that calls an API with authentication

In a previous post, we looked at creating a Copilot declarative agent. The agent had one custom action that called the JSONPlaceholder API. Check that post for an introduction to what these agents can do. Using a dummy, unauthenticated API is not much fun so let’s take a look at doing the same for a custom API that requires authentication.

Python API with authentication

The API we will create has one endpoint: GET /sales. It’s implemented as follows:

@app.get("/sales/", dependencies=[Depends(verify_token)])
async def get_sales():
    """
    Retrieve sales data.
    Requires Bearer token authentication.
    """
    return {
        "status": "success",
        "data": generate_sample_sales_data()
    }

The data is generated by the generate_sample_sales_data function. It just generates random sales data. You can check the full code on GitHub. The important thing here is that we use bearer authentication with a key.
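
The verify_token dependency is not shown above. A minimal sketch of what it could look like with FastAPI’s HTTPBearer follows; the API_KEY environment variable is an assumption here, so check the repo for the real implementation.

import os
from fastapi import HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

security = HTTPBearer()
API_KEY = os.getenv("API_KEY", "change-me")  # assumption: the key comes from an environment variable

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    # reject requests whose bearer token does not match the configured key
    if credentials.credentials != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid or missing token")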

When I hit the /sales endpoint with a wrong key, a 401 Unauthorized is raised:

401 Unauthorized (via REST client VS Code plugin)

With the correct key, the /sales endpoint returns the random data:

GET /sales returns random data

Running the API

To make things easy, we will run the API on the local machine and expose it with ngrok. Install ngrok using the instructions on their website. If you cloned the repo, go to the api folder and run the commands below. Run the last command from a different terminal window.

pip install -r requirements.txt
python app.py
ngrok http 8000

Note: you can also use local port forwarding in VS Code. I prefer ngrok but if you do not want to install it, simply use the VS Code feature.

In the terminal where you ran ngrok, you should see something like below:

ngrok tunnel is active

Ngrok has a nice UI to inspect the calls via the web interface at http://localhost:4040:

ngrok web interface

Before continuing, ensure that the ngrok forwarding URL (https://xyz.ngrok-free.app) responds when you hit the /sales endpoint.

Getting the OpenAPI document

When you create a FastAPI API, it generates OpenAPI documentation that describes all the endpoints. The declarative agent needs that documentation to configure actions.

For the above API, that looks like below. Note that this is not the default document. It was changed in code.

{
  "openapi": "3.0.0",
  "info": {
    "title": "Sales API",
    "description": "API for retrieving sales data",
    "version": "1.0.0"
  },
  "paths": {
    "/sales/": {
      "get": {
        "summary": "Get Sales",
        "description": "Retrieve sales data.\nRequires Bearer token authentication.",
        "operationId": "get_sales_sales__get",
        "responses": {
          "200": {
            "description": "Successful Response",
            "content": {
              "application/json": {
                "schema": {

                }
              }
            }
          }
        }
      }
    },
    "/": {
      "get": {
        "summary": "Root",
        "description": "Root endpoint - provides API information",
        "operationId": "root__get",
        "responses": {
          "200": {
            "description": "Successful Response",
            "content": {
              "application/json": {
                "schema": {

                }
              }
            }
          }
        }
      }
    }
  },
  "components": {
    "securitySchemes": {
      "BearerAuth": {
        "type": "http",
        "scheme": "bearer"
      }
    }
  },
  "servers": [
    {
      "url": "https://627d-94-143-189-241.ngrok-free.app",
      "description": "Production server"
    }
  ]
}

The Teams Toolkit requires OpenAPI 3.0.x instead of 3.1.x. By default, recent versions of FastAPI generate 3.1.x docs. You can change that in the API’s code by adding the following:

from fastapi.openapi.utils import get_openapi

def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    
    openapi_schema = get_openapi(
        title="Sales API",
        version="1.0.0",
        description="API for retrieving sales data",
        routes=app.routes,
    )
    
    # Set OpenAPI version
    openapi_schema["openapi"] = "3.0.0"
    
    # Add servers
    openapi_schema["servers"] = [
        {
            "url": "https://REPLACE_THIS.ngrok-free.app",  # Replace with your production URL
            "description": "Production server"
        }
    ]
    
    # Add security scheme
    openapi_schema["components"] = {
        "securitySchemes": {
            "BearerAuth": {
                "type": "http",
                "scheme": "bearer"
            }
        }
    }
    
    # Remove endpoint-specific security requirements
    for path in openapi_schema["paths"].values():
        for operation in path.values():
            if "security" in operation:
                del operation["security"]
    
    app.openapi_schema = openapi_schema
    return app.openapi_schema

app.openapi = custom_openapi

In the code, we switch to OpenAPI 3.0.0, add our server (the ngrok forwarding URL), add the security scheme and more. Now, when you go to https://your_ngrok_url/openapi.json, the JSON shown above should be returned.

Creating the Copilot Agent

Now we can create a new declarative agent like we did in the previous post. When you are asked for the OpenAPI document, you can retrieve it from the live server via the ngrok forwarding URL.

After creating the agent, declarativeAgent.json should contain the following action:

"actions": [
    {
        "id": "action_1",
        "file": "ai-plugin.json"
    }

In ai-plugin.json, in functions and runtimes, you should see the function description and a reference to the OpenAPI operation.

That’s all fine, but of course the API will not work because a key needs to be provided. You create the key in the Teams developer portal at https://dev.teams.microsoft.com/tools:

Adding an API key for Bearer auth

You create the key by clicking New API key and filling in the form. Ensure you add a key that matches the key in the API. Also ensure that the URL to your API is correct (the ngrok forwarding URL). With an incorrect URL, the key will not be accepted.

Now we need to add a reference to the key. The agent can use that reference to retrieve the key and use it when it calls your API. Copy the key’s registration ID and then open ai-plugin.json. Add the following to the runtimes array:

"runtimes": [
    {
        "type": "OpenApi",
        "auth": {
            "type": "ApiKeyPluginVault",
            "reference_id": "KEY_REGISTRATION_ID"
        },
        "spec": {
            "url": "apiSpecificationFile/openapi.json"
        },
        "run_for_functions": [
            "get_sales_sales__get"
        ]
    }
]

The above code ensures that HTTP bearer authentication is used with the stored key when the agent calls the get_sales_sales__get endpoint.

Now you are ready to provision your agent. After provisioning, locate the agent in Teams:

Find the agent

Now either use a starter (if you added some; above that is (2)) or type the question in the chat box.

Getting laptop sales in 2024

Note that I did not do anything fancy with the adaptive card. It just says success.

If you turned on developer mode in Copilot, you can check the raw response:

Viewing the raw response, right from within Microsoft 365 Chat

Conclusion

In this post, we created a Copilot agent that calls a custom API secured with HTTP bearer authentication. The “trick” to get this to work is to add the key to the Teams dev portal and reference it in the json file that defines the API call.

HTTP bearer authentication is the easiest to implement. In another post, we will look at using OAuth to protect the API. There’s a bit more to that, as expected.

Creating a Copilot declarative agent with VS Code and the Teams Toolkit

If you are a Microsoft 365 Copilot user, you have probably seen that the words “agent” and “Copilot agent” are popping up here and there. For example, if you chat with Copilot there is an Agents section in the top right corner:

Copilot Chat with agents

Above, there is a Visual Creator agent that’s built-in. It’s an agent dedicated to generating images. Below Visual Creator, there are agents deployed to your organisation and ways to add and create agents.

A Copilot agent, in this context, runs on top of Microsoft 365 Copilot and uses the Copilot orchestrator and underlying model. An agent is dedicated to a specific task and has the following properties, some of which are optional:

  • Name: name of the agent
  • Description: you guessed it, the description of the agent
  • Instructions: instructions for the agent about how to do its work and respond to the user; you can compare this to a system prompt you give an LLM to guide its responses
  • Conversation starters: prompts to get started like the Learn More and Generate Ideas in the screenshot above
  • Documents: documents the agent can use to provide the user with answers; this will typically be a SharePoint site or a OneDrive location
  • Actions: actions the agents can take to provide the user with an answer; these actions will be API calls that can fetch information from databases, create tickets in a ticketing system and much more…

There are several ways to create these agents:

  • Start from SharePoint and create an agent based on the documents you select
  • Start from Microsoft 365 Copilot chat
  • Start from Copilot Studio
  • Start from Visual Studio Code

Whatever you choose, you are creating the agent declaratively. You do not have to write code to create the agent. Depending on the tool you use, not all capabilities are exposed. For example, if you want to add actions to your agent, you need Copilot Studio or Visual Studio Code. You could start creating the agent from SharePoint and then add actions with Copilot Studio.

In this post, we will focus on creating a declarative agent with Visual Studio Code.

Getting Started

You need Visual Studio Code (or a compatible editor) with the Teams Toolkit extension installed. Check Microsoft Learn for the full list of requirements. After installing the extension in VS Code, click it. You will be presented with the options below:

Teams Toolkit extension in VS Code

To create a declarative agent, click Create a New App. Select Copilot Agent.

Copilot Agent in Teams Toolkit

Next, select Declarative Agent. You will be presented with the choices below:

Creating an agent with API plugin so we can call APIs

To make this post more useful, we will add actions to the agent. Although the word “action” is not mentioned above, selecting Add plugin will give us that functionality.

We will create our actions from an OpenAPI 3.0.x specification. Select Start with an OpenAPI Description Document as shown below.

When you select the above option, you can either:

  • Use a URL that returns the OpenAPI document
  • Browse for an OpenAPI file (json or yaml) on your file system

I downloaded the OpenAPI specification for JSON Placeholder from https://arnu515.github.io/jsonplaceholder-api-docs/. JSON Placeholder is an online dummy API that provides information about blog posts. After downloading the OpenAPI spec, browse for the swagger.json file via the Browse for an OpenAPI file option. In the next screen, you can select the API operations you want to expose:

Select the operations you want the agent to use

I only selected the GET /posts operation (getPosts). Next, you will be asked for a folder location and a name for your project. I called mine DemoAgent. After specifying the name, a new VS Code window will pop up:

Declarative Agent opens in a new Window

You might get prompts to install additional extensions and even to provision the app.

How does it work?

Before explaining some of the internals, let’s look at the end result in Copilot chat. Below is the provisioned app, provisioned only to my own account. This is the app as created by the extension, without modifications on my part.

Agent in Copilot Chat; sample API we use returns Latin 😉

Above, I have asked for three posts. Copilot matches my intent to the GET /posts API call and makes the call. The JSONPlaceholder API does not require authentication so that’s easy. Authentication is supported but that’s for another post. If it’s the first time the API is used, you will be asked for permission to use it.

In Copilot, I turned on developer mode by typing -developer on in the chat box. When you click Show plugin developer info, you will see something like the below screenshot:

Copilot developer mode

Above, the Copilot orchestrator has matched the function getPosts from the DemoAgent plugin. Plugin is just the general name for Copilot extensions that can perform actions (or functions). Yes, naming is hard. The Copilot orchestrator selected the getPosts function to execute. The result was a 200 OK from the underlying API. If you click the 200 OK message, you see the raw results returned from the API.

Now let’s look at some of the files that are used to create this agent. The main file, from the agent’s point of view, is declarativeAgent.json in the appPackage folder. It contains the name, description, instructions and actions of the agent:

{
    "$schema": "https://developer.microsoft.com/json-schemas/copilot/declarative-agent/v1.0/schema.json",
    "version": "v1.0",
    "name": "DemoAgent",
    "description": "Declarative agent created with Teams Toolkit",
    "instructions": "$[file('instruction.txt')]",
    "actions": [
        {
            "id": "action_1",
            "file": "ai-plugin.json"
        }
    ]
}

The instructions property references another file which contains the instructions for the agent. One of the instructions is: You should start every response and answer to the user with “Thanks for using Teams Toolkit to create your declarative agent!”. That’s the reason why my question had that in the response to start with.

Of course, the actions are where the magic is. You can provide your agent with multiple actions. Here, we only have one. These actions are defined in a file that references the OpenAPI spec. Above, that file is ai-plugin.json. This file tells the agent what API call to make. It contains a functions array with only one function in this case: getPosts. It’s important you provide a good description for the function because Copilot selects the function to call based on its description. See the Matched functions list in the plugin developer info section.

Below the functions array is a runtimes array. It specifies what operation to call from the referenced OpenAPI specification. In here, you also specify the authentication to the API. In this case, the auth type is None but agents support HTTP bearer authentication with a simple key or OAuth.

Here’s the entire file:

{
    "$schema": "https://developer.microsoft.com/json-schemas/copilot/plugin/v2.1/schema.json",
    "schema_version": "v2.1",
    "name_for_human": "DemoAgent",
    "description_for_human": "Free fake API for testing and prototyping.",
    "namespace": "demoagent",
    "functions": [
        {
            "name": "getPosts",
            "description": "Returns all posts",
            "capabilities": {
                "response_semantics": {
                    "data_path": "$",
                    "properties": {
                        "title": "$.title",
                        "subtitle": "$.id"
                    },
                    "static_template": {
                        "type": "AdaptiveCard",
                        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                        "version": "1.5",
                        "body": [
                            {
                                "type": "TextBlock",
                                "text": "id: ${if(id, id, 'N/A')}",
                                "wrap": true
                            },
                            {
                                "type": "TextBlock",
                                "text": "title: ${if(title, title, 'N/A')}",
                                "wrap": true
                            },
                            {
                                "type": "TextBlock",
                                "text": "body: ${if(body, body, 'N/A')}",
                                "wrap": true
                            },
                            {
                                "type": "TextBlock",
                                "text": "userId: ${if(userId, userId, 'N/A')}",
                                "wrap": true
                            }
                        ]
                    }
                }
            }
        }
    ],
    "runtimes": [
        {
            "type": "OpenApi",
            "auth": {
                "type": "None"
            },
            "spec": {
                "url": "apiSpecificationFile/openapi.json"
            },
            "run_for_functions": [
                "getPosts"
            ]
        }
    ],
    "capabilities": {
        "localization": {},
        "conversation_starters": [
            {
                "text": "Returns all posts"
            }
        ]
    }
}

As you can see, you can also control how the agent responds by providing an adaptive card. Teams Toolkit decided on the format above based on the API specification and the data returned by the getPosts operation. In this case, the card looks like this:

Adaptive card showing the response from the API: id, title, body and userId of the fake blog post

Adding extra capabilities

You can add conversation starters to the agent in declarativeAgent.json. They are shown in the opening screen of your agent:

Conversation Starters

These starters are added to declarativeAgent.json:

{
    "$schema": "https://developer.microsoft.com/json-schemas/copilot/declarative-agent/v1.0/schema.json",
    "version": "v1.0",
    "name": "DemoAgent",
    "description": "Declarative agent created with Teams Toolkit",
    "instructions": "$[file('instruction.txt')]",
    "actions": [
        ...
    ],
    "conversation_starters": [
    {
        "title": "Recent posts",
        "text": "Show me recent posts"
    },
    {
        "title": "Last post",
        "text": "Show me the last post"
    }
]
}

In addition to conversation starters, you can also enable web searches. Simply add the following to the file above:

"capabilities": [
    {
        "name": "WebSearch"
    }
]

With this feature enabled, the agent can search the web for answers via Bing. It will do so when it thinks it needs to or when you tell it to. For instance: “Search the web for recent news about AI” gets you something like this:

Agent with WebSearch turned on

In the plugin developer info, you will see that none of your functions were executed. Developer info does not provide additional information about the web search.

Next to starter prompts and WebSearch, here are some of the other things you can do:

  • Add OneDrive and SharePoint content: extra capability with name OneDriveAndSharePoint; the user using the agent needs access to these files or they cannot be used to generate an answer
  • Add Microsoft Graph Connectors content: extra capability with name GraphConnectors; Graph Connectors pull in data from other sources in Microsoft Graph; by specifying the connector Ids, that data can then be retrieved by the agent

More information about the above settings can be found here: https://learn.microsoft.com/en-us/microsoft-365-copilot/extensibility/declarative-agent-manifest.

Provisioning

To provision the agent just for you, open VS Code’s command palette and search for Teams: Provision. You will be asked to log on to Microsoft 365. When all goes well, you should see the messages below in the Output pane:

Output after provisioning an app

If you are familiar with app deployment to Teams in general, you will notice that this is the same.

When the app is provisioned, it should appear in the developer portal at https://dev.teams.microsoft.com/apps:

DemoAgent in the Teams dev portal

Note that the extension adds dev to the agent when you provision the app. When you publish the app, this is different. You can also see this in VS Code in the build folder:

App package for provisioning in VS Code

Note: we did not discuss the manifest.json file which is used to configure the Teams app as a whole. Use it to set developer info, icons, name, description and more.

There are more steps to take to publish the app and make it available to your organisation. See https://learn.microsoft.com/en-us/microsoftteams/platform/toolkit/publish for more information.

Conclusion

The goal of this blogpost was to show how easy it is to create a declarative agent on top of Microsoft 365 Copilot in VS Code. Remember that these agents use the underlying Copilot orchestrator and model and that is something you cannot change. If you need more freedom (e.g., control over LLM, its parameters, advanced prompting techniques etc…) and you want to create such an app in Teams, there’s always the Custom Engine Agent.

Declarative agents don’t require you to write code, although you do need to edit multiple files to get everything working.

In a follow-up post, we will take a look at adding a custom API with authentication. I will also show you how to easily add additional actions to an agent without too much manual editing. Stay tuned!

Using your own message broker with Diagrid Catalyst

In a previous post, I wrote about Diagrid Catalyst. Catalyst provides services like pub/sub and state stores to support the developer in writing distributed applications. In the post, we discussed a sample application that processes documents and extracts fields with an LLM (gpt-4o structured extraction). Two services, upload and process, communicate via the pub/sub pattern.

In that post, we used a pub/sub broker built-in to Catalyst. Using the built-in broker makes it extremely easy to get started. You simply create the service and topic subscription and write code to wire it all up using the Dapr APIs.

Catalyst built-in pub/sub service

But what if you want to use your own broker? Read on to learn how that works.

Using Azure Service Bus as the broker

To use Azure Service Bus, simply deploy an instance in a region of your choice. Ensure you use the standard tier because you need topics, not queues:

Azure Service Bus Standard Tier deployed in Sweden; public endpoint

With Service Bus deployed, we can now tell Catalyst about it. You do so in Components in the Catalyst portal:

Creating an Azure Service Bus component

Simply click Create Component to start a wizard. After completion of the wizard, your component will appear in the list. Above, at the bottom, a component with Azure Service Bus as the target is in the list.

The wizard itself is fairly straightforward. The first screen is shown below:

Component wizard

Above, in the first step, I clicked Pub/Sub and selected Azure Service Bus Topics. As you can see, several other pub/sub brokers are supported. The above list is not complete.

In the next steps, the following is set:

  • Assign access: configure the services that can access this component; in my case, that is the upload and process service
  • Authentication profile: decide how to authenticate to Azure Service Bus; I used a connection string
  • Configure component: set the component name and properties such as timeouts. These properties are specific to Service Bus. I only set the name and left the properties at their default.

That’s it. You now have defined a component that can be used by your applications. When you click the component, you can also inspect its YAML definition:

YAML representation of the component

You can use these YAML files from the diagrid CLI to create components. In the CLI they are called connections but it’s essentially the same from what I can tell at this point:

Listing connections

Showing the call graph

With Catalyst, all activity is logged and can be used to visualize a call graph like the one below:

Call Graph

Above, I clicked on the subscription that delivers messages to the process service. The messages come from our Azure pub/sub broker.

Note: you can also see the older Catalyst pub/sub broker in the call graph. It will disappear from the call graph some time after it is no longer used.

Creating a subscription

A subscription to an Azure Service Bus topic looks the same as a subscription to the built-in Pub/Sub broker:

Subscription to topic invoices

The only difference with the previous blog post is the component. It’s the one we just created. The /process handler in your code will stay the same.

Code changes

The code from the previous post does not have to change a lot. That code uses an environment variable, PUBSUB_NAME, that needs to be set to pubsub-azure now. That’s it. The Dapr SDK code is unchanged:

with DaprClient() as d:
    try:
        result = d.publish_event(
            pubsub_name=pubsub_name,
            topic_name=topic_name,
            data=invoice.model_dump_json(),
            data_content_type='application/json',
        )
        logging.info('Publish Successful. Invoice published: %s' %
                        invoice.path)
        logging.info(f"Invoice model: {invoice.model_dump()}")
        return True
    except grpc.RpcError as err:
        logging.error(f"Failed to publish invoice: {err}")
        return False

Conclusion

Instead of using the default Catalyst pub/sub broker, we switched the underlying broker to a broker of our choice. This is just configuration. Your code, besides maybe an environment variable, does not need to change.

In this post, we only changed the pub/sub broker. You can also easily change the underlying state store to Azure Blob Storage or Azure Cosmos DB.

Writing a multi-service document extractor with the help of Diagrid’s Catalyst

Many enterprises have systems in place that take documents, possibly handwritten, that contain data that needs to be extracted. In this post, we will create an application that can extract data from documents that you upload. We will make use of an LLM, in this case gpt-4o. We will use model version 2024-08-06 and its new structured output capabilities. Other LLMs can be used as well.

The core of the application is illustrated in the diagram below. The application uses more services than in the diagram. We will get to them later in this post.

Application Diagram

Note: the LLM-based extraction logic in this project is pretty basic. In production, you need to do quite a bit more to get the extraction just right.

The flow of the application is as follows:

  • A user or process submits a document to the upload service. This can be a pdf but other formats are supported as well.
  • In addition to the document, a template is specified by name. A template contains the fields to extract, together with their type (str, bool, float). For example: customer_name (str), invoice_total (float).
  • The upload service uploads the document to an Azure Storage account using a unique filename and preserves the extension.
  • The upload service publishes a message to a topic on a pub/sub message broker. The message contains data such as the document url and the name of the template.
  • The process service subscribes to the topic on the message broker and retrieves the message.
  • It downloads the file from the storage account and sends it to Azure Document Intelligence to convert it to plain text.
  • Using a configurable extractor, an LLM is used to extract the fields in the template from the document text. The sample code contains an OpenAI and a Groq extractor.
  • The extracted fields are written to a configurable output handler. The sample code contains a CSV and JSONL handler.

In addition to a pub-sub broker, templates are stored in a state store. The upload service is the only service that interfaces with the state store. It provides an HTTP method that the process service can use to retrieve a template from the state store.

To implement pub-sub, the state store and method invocations, we will use Diagrid’s Catalyst instead of doing this all by ourselves.

What is Catalyst?

If you are familiar with Dapr, the distributed application runtime, Catalyst will be easy to understand. Catalyst provides you with a set of APIs, hosted in the cloud and compatible with Dapr to support you in building cloud-native, distributed applications. It provides several building blocks. The ones we use are below:

  • request/reply: to support synchronous communication between services in a secure fashion
  • publish/subscribe: to support asynchronous communication between services using either a broker provided by Catalyst or other supported brokers like Azure Service Bus
  • key/value: allows services to save state in a key/value store. You can use the state store provided by Catalyst or other supported state stores like Azure Cosmos DB or an Azure Storage Account

The key to these building blocks is that your code stays the same if you swap the underlying message broker or key/value store. For example, you can start with Catalyst’s key/value store and later switch to Cosmos DB very easily. There is no need to add Cosmos DB libraries to your code. Catalyst will handle the Cosmos DB connectivity for you.

Important: I am referring mainly to Azure services here but Catalyst (and Dapr) support many services in other clouds as well!

Note that you do not need to install Dapr on your local machine or on platforms like Kubernetes when you use Catalyst. You only use the Dapr SDKs in your code and, when configured to do so, the SDK will connect to the proper APIs hosted in the cloud by Catalyst. In fact, you do not even need an SDK because the APIs can be used with plain HTTP or GRPC. Of course, using an SDK makes things a lot easier.

If you want to learn more about Catalyst, take a look at the following playlist: https://www.youtube.com/watch?v=7D7rMwJEMsk&list=PLdl4NkEiMsJscq00RLRrN4ip_VpzvuwUC. Lots of good stuff in there!

By doing all of the above in Catalyst we have a standardised approach that remains the same no matter the service behind it. We also get implementation best practices, for example for pub/sub. In addition, we are also provided with golden metrics and a UI to see how the application performs. All API calls are logged to aid in troubleshooting.

Let’s now take a look at the inner loop development process!

Scaffolding a new project

You need to sign up for Catalyst first. At the time of writing, Catalyst was in preview and not supported for production workloads. When you have an account, you should install the Diagrid CLI. The CLI is not just for Catalyst. It’s also used with Diagrid’s other products, such as Conductor.

With the CLI, you can create a new project, create services and application identities. For this post, we will use the UI instead.

In the Catalyst dashboard, I created a project called idpdemo:

List of projects; use Create Project to create a new one

Next, for each of my services (upload and process), we create an App ID. Each App ID has its own token. Services use the token to authenticate to the Catalyst APIs and use the services they are allowed to use.

The process App ID has the following configuration (partial view):

process App ID API configuration

The process service interacts with both the Catalyst key/value store (kvstore) and the pub/sub broker (pubsub). These services need to be enabled as well. We will show that later. We can also see that the process service has a pub/sub subscription called process-consumer. Via that subscription, we have pub/sub messages delivered to the process service whenever the upload service sends a message to the pub/sub topic.

In Diagrid Services, you can click on the pub/sub and key/value store to see what is going on. For example, in the pub/sub service you can see the topics, the subscribers to these topics and the message count.

pub/sub topics

In Connections, you can see your services (represented by App ID upload and process) and their scope. In this case, all App IDs have access to all services. That can easily be changed:

changing the scope: access by App IDs to the pubsub service; default All

Now that we have some understanding of App IDs, Diagrid services and connections, we can take a look at how to connect to Catalyst from code.

Important: in this post we only look at using request/reply, Diagrid pub/sub and key/value. Catalyst also supports workflow and bindings but they are not used in this post.

Connecting your code

All code is available on GitHub: https://github.com/gbaeke/catalyst

The upload service needs to connect to both the pub/sub broker and key/value store:

  • Whenever a document is uploaded, it is uploaded to Azure Storage. When that succeeds, a message is put on the broker with the path of the file and a template name.
  • Templates are created and validated by the upload service so that you can only upload files with a template that exists. Templates are written and read in the key/value store.

Before we write code, we need to provide the Dapr SDK for Python (we’ll only use the Python SDK here) with the necessary connection information. It needs to know that it should not connect to a Dapr sidecar but to Catalyst. You set this via environment variables:
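
They are the same variables you will see later in the scaffolded dev file:

  • DAPR_API_TOKEN: the token of the App ID the service runs as
  • DAPR_APP_ID: the App ID itself (upload or process)
  • DAPR_GRPC_ENDPOINT and DAPR_HTTP_ENDPOINT: the Catalyst API endpoints for your project (https://XYZ.api.cloud.diagrid.io)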

These environment variables are automatically picked up and used by the SDK to interact with the Catalyst APIs. The following code can be used to put a message on the pub/sub broker:

with DaprClient() as d:
    try:
        result = d.publish_event(
            pubsub_name=pubsub_name,
            topic_name=topic_name,
            data=invoice.model_dump_json(),
            data_content_type='application/json',
        )
        logging.info('Publish Successful. Invoice published: %s' %
                        invoice.path)
        return True
    except grpc.RpcError as err:
        logging.error(f"Failed to publish invoice: {err}")
        return False

This is the same code that you would use with Dapr on your local machine or in Kubernetes or Azure Container Apps. Like with Dapr, you need to specify the pubsub name and topic. Here that is pubsub and invoices as previously shown in the Catalyst UI. The data in the message is an instance of a Pydantic class that holds the path and template, converted to JSON.
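
The Pydantic class itself is not shown here, but based on the description above it only needs to carry the blob path and the template name. A hypothetical version:

from pydantic import BaseModel

class Invoice(BaseModel):
    path: str      # path of the uploaded document in the storage account
    template: str  # name of the template to use for extraction

# invoice.model_dump_json() is what ends up as the pub/sub message data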

The code below shows how to write to the state store (key/value store):

with DaprClient() as d:
    try:
        d.save_state(store_name=kvstore_name,
                        key=template_name, value=str(invoice_data))
    except grpc.RpcError as err:
        logging.error(f"Dapr state store error: {err.details()}")
        raise HTTPException(status_code=500, detail="Failed to save template")

This is of course very similar. We use the save_state method here and provide the store name (kvstore), key (template name) and value.
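
Reading a template back is symmetrical. A sketch with get_state (the upload service exposes this through its GET /template/{name} endpoint):

from dapr.clients import DaprClient

def get_template(kvstore_name: str, template_name: str):
    with DaprClient() as d:
        state = d.get_state(store_name=kvstore_name, key=template_name)
        # state.data is a byte string; it is empty when the key does not exist
        return state.data.decode("utf-8") if state.data else None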

Let’s now turn to the process service. It needs to:

  • be notified when there is a new message on the invoices topic
  • check and retrieve the template by calling a method on the upload service

We only use two building blocks here: pub/sub and request/reply. The process service does not interact directly with the state store.

To receive a message, Catalyst needs a handler to call. In the pub/sub subscription, the handler (the default route, to be precise) is configured to be /process:

Configuration of default route on subscription

Our code that implements the handler is as follows (FastAPI):

@app.post('/process')  # called by pub/sub when a new invoice is uploaded
async def consume_orders(event: CloudEvent):
    # your code here

As you can see, when Catalyst calls the handler, it passes in a CloudEvent. The event has a data field that holds the path to our document and the template name. The CloudEvent type is defined as follows:

# pub/sub uses CloudEvent; Invoice above is the data
class CloudEvent(BaseModel):
    datacontenttype: str
    source: str
    topic: str
    pubsubname: str
    data: dict
    id: str
    specversion: str
    tracestate: str
    type: str
    traceid: str

In the handler, you simply extract the expected data and use it to process the event. In our case (a sketch of the full handler body follows the list):

  • extract path and template from the data field
  • download the file from blob storage
  • send the file to Azure Document Intelligence to convert to text
  • extract the details from the document based on the template; if the template contains fields like customer_name and invoice_total, the LLM will try to extract that and return that content in JSON.
  • write the extracted values to JSON or CSV or any other output handler
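
Putting those steps together, the body of the handler could look roughly like this. The helpers download_blob, convert_to_text, extract_fields and write_output are hypothetical stand-ins for the real implementations in the repo:

@app.post('/process')  # called by pub/sub when a new invoice is uploaded
async def consume_orders(event: CloudEvent):
    path = event.data['path']
    template_name = event.data['template']

    template = retrieve_template_from_kvstore(template_name)  # request/reply to the upload service
    document_bytes = download_blob(path)                      # Azure Storage
    document_text = convert_to_text(document_bytes)           # Azure Document Intelligence
    fields = extract_fields(document_text, template)          # LLM with structured output
    write_output(fields)                                      # CSV, JSONL or another output handler

    return {'success': True}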

Of course, we do need to retrieve the full template because we only have its name. Let’s use the request/reply APIs to do that and call the template GET endpoint of the upload service via Catalyst:

def retrieve_template_from_kvstore(template_name: str):

    headers = {'dapr-app-id': invoke_target_appid, 'dapr-api-token': dapr_api_token,
               'content-type': 'application/json'}  
    try:
        result = requests.get(
            url='%s/template/%s' % (base_url, template_name),
            headers=headers
        )

        if result.ok:
            logging.info('Invocation successful with status code: %s' %
                         result.status_code)
            logging.info(f"Template retrieved: {result.json()}")
            return result.json()

    except Exception as e:
        logging.error(f"An error occurred while retrieving template from Dapr KV store: {str(e)}")
        return None

As an example, we use the HTTP API here instead of the Dapr invoke API. It might not be immediately clear but Catalyst is involved in this process and will have information and metrics about these calls:

Call Graph

The full line represents request/reply (invoke) from process to upload as just explained. The dotted line represents pub/sub traffic where upload creates messages to be consumed by process.
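
By the way, if you prefer the Dapr SDK’s invoke API over raw HTTP for this request/reply call, a sketch of the equivalent looks like this (not the code used in the repo):

import json
from dapr.clients import DaprClient

def retrieve_template_via_sdk(template_name: str):
    with DaprClient() as d:
        resp = d.invoke_method(
            app_id="upload",                          # the App ID of the upload service
            method_name=f"template/{template_name}",  # maps to GET /template/{name}
            data=b"",
            http_verb="GET",
        )
        return json.loads(resp.data)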

Running the app

You can easily run your application locally using the Diagrid Dev CLI. Ensure you are logged in by running diagrid login. In the preview, with only one project, the default project should already be that one. Then simply run diagrid dev scaffold to generate a yaml file.

In my case, after some modification, my dev-{project-name}.yaml file looked like below:

project: idpdemo
apps:
- appId: process
  disabled: true
  appPort: 8001
  env:
    DAPR_API_TOKEN: ...
    DAPR_APP_ID: process
    DAPR_CLIENT_TIMEOUT_SECONDS: 10
    DAPR_GRPC_ENDPOINT: https://XYZ.api.cloud.diagrid.io:443
    DAPR_HTTP_ENDPOINT: https://XYZ.api.cloud.diagrid.io
    OTHER ENV VARS HERE

  workDir: process
  command: ["python", "app.py"]
- appId: upload
  appPort: 8000
  env:
    ... similar
  workDir: upload
  command: ["python", "app.py"]
appLogDestination: ""

Of course, the file was modified with environment variables required by the code. For example the storage account key, Azure Document Intelligence key, etc…

All you need to do now is to run diagrid dev start to start the apps. The result should be like below:

Local project startup

By default, your service logs are written to the console with a prefix for each service.

If you use the code in GitHub, check the README.md to configure the project and run the code properly. If you would rather run the code with Dapr on your local machine (e.g., if you do not have access to Catalyst) you can do that as well.

Conclusion

In this post, we have taken a look at Catalyst, a set of cloud APIs that help you to write distributed applications in a standard and secure fashion. These APIs are compatible with Dapr, a toolkit that has already gained quite some traction in the community. With Catalyst, we quickly built an application that can be used as a starter to implement an asynchronous LLM-based document extraction pipeline. I did not have to worry too much about pub/sub and key/value services because that’s all part of Catalyst.

What will you build with Catalyst?

Working with Recipes and Gateways in Microsoft’s Radius

In a previous post, we looked at the basics of deploying a multi-container app that uses Dapr with Radius. In this post, we will add two things:

  • a recipe that deploys Redis
  • a gateway: to make the app available to the outside world

Find the full code and app.bicep in the following branch: https://github.com/gbaeke/raddemo/tree/radius-step1.

Recipes

When a developer chooses a resource they would like to use in their app, like a database or queue, that type of resource needs to be deployed somehow. In my sample app, the api saves data to Redis.

From an operator point of view, and possibly depending on the environment, Redis needs to be deployed and configured properly. For instance in dev, you could opt for Redis in a container without a password. In production, you could go for Azure Redis Cache instead with TLS and authentication.

This is where recipes come in. They deploy the needed resources and provide the proper connections to allow applications to connect. Let’s look at a recipe that deploys Redis in Kubernetes:

resource redis 'Applications.Datastores/redisCaches@2023-10-01-preview' = {
  name: 'redis'
  properties: {
    application: app.id
    environment: environment
  }
}

Note: you can get a list of recipes with rad recipe list; next to Redis, there are recipes for sqlDatabases, rabbitMQQueues and more; depending on how you initialised Radius, the recipe list might be empty

The above recipe deploys Redis as a container to the underlying Kubernetes cluster. The deployment is linked to an environment. Just like in the previous blog post, we just use the default environment. Working with environments and workspaces will be for another post.

In fact, this resource does not specify a recipe explicitly. This means that the default recipe is used, which in this case deploys a Redis container in Kubernetes.

Note that the above recipe actually deploys the resource. It is quite possible that your Redis Cache is already deployed without a recipe. In that case, you can set resourceProvisioning to manual and set hostname, port and other properties manually, via secret integration or with references to another Bicep resource. For example:

resource redis 'Applications.Datastores/redisCaches@2023-10-01-preview' = {
  name: 'redis'
  properties: {
    environment: environment
    application: app.id
    resourceProvisioning: 'manual'
    resources: [{
      id: azureRedis.id
    }]
    username: 'myusername'
    host: azureRedis.properties.hostName
    port: azureRedis.properties.port
    secrets: {
      password: azureRedis.listKeys().primaryKey
    }
  }
}

Note: above, references are made to azureRedis, a symbolic name for a Bicep resource, implying that Azure Redis Cache is deployed from the same Bicep file but without a recipe

In either case (deployment or reference), when a connection from a container is made to this Redis resource, a number of environment variables are set inside the container. For example:

  • CONNECTION_CONNECTIONNAME_HOSTNAME
  • CONNECTION_CONNECTIONNAME_PORT

Connecting the api to Redis

To connect the api container to Redis, we use the following app.bicep (please read the previous article for the full context):

import radius as radius

@description('Specifies the environment for resources.')
param environment string

resource app 'Applications.Core/applications@2023-10-01-preview' = {
  name: 'raddemo'
  properties: {
    environment: environment
  }
}

resource redis 'Applications.Datastores/redisCaches@2023-10-01-preview' = {
  name: 'redis'
  properties: {
    application: app.id
    environment: environment
  }
}

resource ui 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'ui'
  properties: {
    application: app.id
    container: {
      image: 'gbaeke/radius-ui:latest'
      ports: {
        web: {
          containerPort: 8001
        }
      }
      env: {
        DAPR_APP: api.name  // api name is the same as the Dapr app id here
      }
    }
    extensions: [
      {
        kind: 'daprSidecar'
        appId: 'ui'
      }
    ]
  }
}

resource api 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'api'
  properties: {
    application: app.id
    container: {
      image: 'gbaeke/radius-api:latest'
      ports: {
        web: {
          containerPort: 8000
        }
      }
      env: {
          REDIS_HOST: redis.properties.host
          REDIS_PORT: string(redis.properties.port)
      }
    }
    extensions: [
      {
        kind: 'daprSidecar'
        appId: 'api'
        appPort: 8000
      }
    ]
    connections: {
      redis: {
        source: redis.id  // this creates environment variables in the container
      }
    }
  }
}

Note the connections section in the api resource. In it, we added redis and referenced the redis recipe’s id.

Because our api expects the Redis host and port in environment variables different from the ones provided by the connection, we set the variables the api expects ourselves and reference the Redis recipe’s properties.

The environment variables in the api container set by the connection will be CONNECTION_REDIS_HOSTNAME etc… but we do not use them here because that would require a code change.
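
Inside the api container, reading those variables and talking to Redis could look like this (a sketch; the real code ships in the gbaeke/radius-api image):

import os
import redis

r = redis.Redis(
    host=os.environ["REDIS_HOST"],
    port=int(os.environ["REDIS_PORT"]),
    decode_responses=True,
)
r.set("Hello", "This is a fake result for question Hello")  # matches the SET seen in the logs below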

When you run this app with rad run app.bicep, Redis will be deployed. When the user submits a question via the ui, the logs will show that the Redis call succeeded:

api-c8686c8ff-bwf7l api INFO:root:Stored result for question Hello in Redis

redis-hjo6ha3uqagio-64949758b7-td7c8 redis-monitor 1698071908.381740 [0 10.244.0.24:59750] "SET" "Hello" "This is a fake result for question Hello"

Because rad run streams all logs, the redis-monitor logs are also shown. They clearly state a Redis SET operation was performed.

There is much more to say about recipes. You can even create your own recipes. They are just Bicep (or Terraform) modules you publish to a registry. See authoring recipes for more information.

Adding a gateway

So far, we have accessed the ui of our application via port forwarding. The ui listens on port 8001 which is mapped to http://localhost:8001 by rad run. What if we want to make the application available to the outside world?

To make the ui available to the outside world, we can add the following to app.bicep:

resource gateway 'Applications.Core/gateways@2023-10-01-preview' = {
  name: 'gateway'
  properties: {
    application: app.id
    routes: [
      {
        path: '/'
        destination: 'http://ui:8001'
      }
    ]
  }
}

The above adds a gateway to our app with a single route: requests for path / are forwarded to http://ui:8001.

During deployment of the Radius control plane, Radius deployed Contour. Contour uses Envoy as the data plane and a service of type LoadBalancer makes the data plane available to the outside world.

In k9s, you should see the following pods in the Radius control plane namespace (radius-system):

Contour

You will also find the service of type LoadBalancer:

LoadBalancer with public IP address

When you create a gateway with Radius, it creates a Kubernetes resource of kind HTTPProxy (apiVersion projectcontour.io/v1) in the same namespace as your app. The spec of that resource refers to another HTTPProxy (ui here) and sets the fqdn (fully qualified domain name) to gateway.raddemo.4.175.112.144.nip.io.

nip.io is a service that resolves a name to the IP address embedded in that name, in this case 4.175.112.144. That is the public IP address used by the Azure Load Balancer.

The HTTPProxy ui defines the service and port it routes to. Here that is a service called ui and port 8001.

gateway and ui Contour HTTPProxy resources

You can set your own fully qualified domain name if you wish, in addition to specifying a certificate to enable TLS.

The HTTPProxy resources instruct Contour to configure itself to accept traffic on the configured FQDN and forward it to the ui service.

The full Bicep code to deploy the containers, Redis and the gateway is below:

import radius as radius

@description('Specifies the environment for resources.')
param environment string

resource app 'Applications.Core/applications@2023-10-01-preview' = {
  name: 'raddemo'
  properties: {
    environment: environment
  }
}

resource redis 'Applications.Datastores/redisCaches@2023-10-01-preview' = {
  name: 'redis'
  properties: {
    application: app.id
    environment: environment
  }
}

resource gateway 'Applications.Core/gateways@2023-10-01-preview' = {
  name: 'gateway'
  properties: {
    application: app.id
    routes: [
      {
        path: '/'
        destination: 'http://ui:8001'
      }
    ]
  }
}


resource ui 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'ui'
  properties: {
    application: app.id
    container: {
      image: 'gbaeke/radius-ui:latest'
      ports: {
        web: {
          containerPort: 8001
        }
      }
      env: {
        DAPR_APP: api.name  // api name is the same as the Dapr app id here
      }
    }
    extensions: [
      {
        kind: 'daprSidecar'
        appId: 'ui'
      }
    ]
  }
}

resource api 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'api'
  properties: {
    application: app.id
    container: {
      image: 'gbaeke/radius-api:latest'
      ports: {
        web: {
          containerPort: 8000
        }
      }
      env: {
          REDIS_HOST: redis.properties.host
          REDIS_PORT: string(redis.properties.port)
      }
    }
    extensions: [
      {
        kind: 'daprSidecar'
        appId: 'api'
        appPort: 8000
      }
    ]
    connections: {
      redis: {
        source: redis.id  // this creates environment variables in the container
      }
    }
  }
}

To see the app’s URL, use rad app status.

Note: there is an ongoing discussion about using recipes instead of a pre-installed ingress controller like Contour. With recipes, you could install the ingress solution you prefer, such as NGINX ingress, or any other solution.

Conclusion

In this post we added a Redis database and connected the api to Redis via a connection. We did not use the environment variables that the connection creates. Instead, we provided values for the Redis host name and port to the environment variables the api expects.

To make the application available via the built-in Contour ingress, we created a gateway resource that routes to the ui service on port 8001. The gateway creates a nip.io hostname but you can set the hostname to something different as long as that name resolves to the IP address of the Contour LoadBalancer service.

Giving Microsoft’s Radius a spin

Microsoft recently announced Radius. As stated in their inaugural blog post, it is “a tool to describe, deploy, and manage your entire application”. With Radius, you describe your application in a bicep file. This can include containers, databases, the connections between those and much more. Radius is an open-source solution started from within Microsoft. The name is somewhat confusing because of RADIUS, a network authentication solution developed in 1991!

Starting point: app running locally

Instead of talking about it, let’s start with an application that runs locally on a development workstation and uses Dapr:

The ui is a Flask app that presents a text area and a button. When the user clicks the button, the code that handles the event calls the api using Dapr invoke. If you do not know what Dapr is, have a look at docs.dapr.io. The api saves the user’s question and a fake response to Redis. If Redis cannot be found, the api simply logs that it could not save the data. The response is then returned to the ui.
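For illustration, here is a minimal sketch of what such an api handler could look like. The /generate route matches the Dapr invoke call shown later in this post; everything else (payload shape, default values) is an assumption, and the real code in the raddemo repo may differ:

import logging
import os

import redis
from flask import Flask, jsonify, request

app = Flask(__name__)
r = redis.Redis(host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", "6379")),
                db=int(os.getenv("REDIS_DB", "0")))

@app.route("/generate", methods=["POST"])
def generate():
    question = request.get_json().get("question", "")  # payload shape is an assumption
    answer = f"This is a fake result for question {question}"
    try:
        r.set(question, answer)  # store the question and the fake answer in Redis
    except redis.exceptions.RedisError:
        logging.warning("Could not store result in Redis")  # keep going without Redis
    return jsonify({"result": answer})

if __name__ == "__main__":
    app.run(port=8000)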

To run the application with Dapr on a development machine, I use a dapr.yaml file in combination with dapr run -f . See multi-app run for more details.

Here’s the yaml file:

version: 1
apps:
  - appID: ui
    appDirPath: ./ui
    appPort: 8001
    daprHTTPPort: 3510
    env:
      DAPR_APP: api
    command: ["python3","app.py"]
  - appID: api
    appDirPath: ./api
    appPort: 8000
    daprHTTPPort: 3511
    env:
      REDIS_HOST: localhost
      REDIS_PORT: 6379
      REDIS_DB: 0
    command: ["python3","app.py"]

Note that the api needs a couple of environment variables to find the Redis instance. The ui needs one environment variable DAPR_APP that holds the Dapr appId of the api. The Dapr invoke call needs this appId in order to find the api on the network.

In Python, the Dapr invoke call looks like this:

from dapr.clients import DaprClient

with DaprClient() as d:
    # dapr_app holds the Dapr app id of the api (the DAPR_APP environment variable)
    log.info(f"Making call to {dapr_app}")
    resp = d.invoke_method(dapr_app, 'generate', data=bytes_data,
                           http_verb='POST', content_type='application/json')
    log.info(f"Response from API: {resp}")

The app runs fine locally if you have Python and the dependencies as listed in both the ui’s and api’s requirements.txt file. Let’s try to deploy the app with Radius.

Deploying the app with Radius

Before we can deploy the app with Radius, you need to install a couple of things:

  • rad CLI: I installed the CLI on macOS; see the installation instructions for more details
  • VS Code extension: Radius uses a forked version of Bicep that is older than the current version of Bicep. The two will eventually converge but for now, you need to disable the official Bicep extension in VS Code and install the Radius Bicep extension. This is needed to support code like import radius as radius, which is not supported in the current version of Bicep.
  • Kubernetes cluster: Radius uses Kubernetes and requires the installation of the Radius control plane on that cluster. I deployed a test & dev AKS cluster in Azure and ensured it was set as my current context. Use kubectl config current-context to check that.
  • Install Dapr: our app uses Dapr and Radius supports it; however, Dapr needs to be manually installed on the cluster; if you have Dapr on your local machine, run dapr init -k to install it on Kubernetes

Now you can clone my raddemo repo. Use git clone https://github.com/gbaeke/raddemo.git. In the raddemo folder, you will see two folders: api and ui. In the root folder, run the following command:

rad init

Select Yes to use the current folder.

Running rad init does the following:

  • Installs Radius to the cluster in the radius-system namespace
  • Creates a new environment and workspace (called default)
  • Sets up a local-dev recipe pack: recipes allow you to install resources your app needs like Redis, MySQL, etc…

After installation, this is the view on the radius-system Kubernetes namespace with k9s:

Pods in the radius-system namespace

There should also be a .rad folder with a rad.yaml file:

workspace:
  application: "raddemo"

The file defines a workspace with our application name raddemo. raddemo is the name of the folder where I ran rad init. You can have multiple workspaces defined with one selected as the default. For instance, you could have a dev and prod workspace where each workspace uses a different Kubernetes cluster and environment. The default could be set to dev but you can easily switch to prod using the rad CLI. Check this overview of workspaces for more information. I am going to work with just one workspace called default, which uses an environment called default. When you just run rad init, those are the defaults.

You also get a default app.bicep file:

import radius as radius
param application string

resource demo 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'demo'
  properties: {
    application: application
    container: {
      image: 'radius.azurecr.io/samples/demo:latest'
      ports: {
        web: {
          containerPort: 3000
        }
      }
    }
  }
}

This is deployable code. If you run rad run app.bicep, a Kubernetes pod will be deployed to your cluster, using the image above. Radius also sets up port forwarding so you can access the app on its containerPort (3000).

We will change this file to deploy the ui. We will remove the application parameter and define our own application. That application needs an environment which we will pass in via a parameter:

import radius as radius

@description('Specifies the environment for resources.')
param environment string

resource app 'Applications.Core/applications@2023-10-01-preview' = {
  name: 'raddemo'
  properties: {
    environment: environment
  }
}

resource ui 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'ui'
  properties: {
    application: app.id
    container: {
      image: 'gbaeke/radius-ui:latest'
      ports: {
        web: {
          containerPort: 8001
        }
      }
    }
    extensions: [
      {
        kind: 'daprSidecar'
        appId: 'ui'
      }
    ]
  }
}

Above, we define the following:

  • a resource of type Applications.Core/applications: because applications run on Kubernetes, you can use a different namespace than the default and also set labels and annotations. All labels and annotations would be set on all resources belonging to the app, such as containers
  • the app resource needs an environment: the environment parameter is defined in the Bicep file and is set automatically by the rad CLI; it will match the environment used by your current workspace; environments can also have cloud credentials attached to deploy resources in Azure or AWS; we are not using that here
  • a resource of type Applications.Core/containers: this will create a pod in a Kubernetes namespace; the container belongs to the app we defined (application property) and uses the image gbaeke/radius-ui:latest from Docker Hub. Radius supports Dapr via extensions; the daprSidecar extension adds the Dapr sidecar with an appId of ui.

In Kubernetes, this results in a pod with two containers: the ui container and the Dapr sidecar.

ui and Dapr sidecar

When you run rad run app.bicep, you should see the resources in namespace default-raddemo. The logs of all containers should stream to your console and local port 8001 should be mapped to the pod’s port 8001. http://localhost:8001 should show:

ui accessed via http://localhost:8001

We will end this post by also deploying the api. It also needs Dapr and we need to update the definition of the ui container by adding an environment variable:

import radius as radius

@description('Specifies the environment for resources.')
param environment string

resource app 'Applications.Core/applications@2023-10-01-preview' = {
  name: 'raddemo'
  properties: {
    environment: environment
  }
}

resource ui 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'ui'
  properties: {
    application: app.id
    container: {
      image: 'gbaeke/radius-ui:latest'
      ports: {
        web: {
          containerPort: 8001
        }
      }
      env: {
        DAPR_APP: api.name  // api name is the same as the Dapr app id here
      }
    }
    extensions: [
      {
        kind: 'daprSidecar'
        appId: 'ui'
      }
    ]
  }
}

resource api 'Applications.Core/containers@2023-10-01-preview' = {
  name: 'api'
  properties: {
    application: app.id
    container: {
      image: 'gbaeke/radius-api:latest'
      ports: {
        web: {
          containerPort: 8000
        }
      }
    }
    extensions: [
      {
        kind: 'daprSidecar'
        appId: 'api'
        appPort: 8000
      }
    ]
  }
}

Above, we added the api container, enabled Dapr, and set the Dapr appId to api. In the ui, we set environment variable DAPR_APP to api.name. We can do this because the name of the api resource is the same as the appId. This also makes Radius deploy the api before the ui. Note that the api does not have Redis environment variables. It will default to finding Redis at localhost, which will fail. But that’s ok.

You now have two pods in your namespace:

Yes, there are three here but Redis will be added in a later post.

Note that instead of running rad run app.bicep, you can also run rad deploy app.bicep. The latter simply deploys the application. It does not forward ports or stream logs.

Summary

In this post, we touched on the basics of using Radius to deploy an application that uses Dapr. Under the hood, Radius uses Kubernetes to deploy container resources specified in the Bicep file. To run the application, simply run rad run app.bicep to deploy the app, stream all logs and set up port forwarding.

We barely scratched the surface here so in a next post, we will add Redis via a recipe, and make the application available publicly via a gateway. Stay tuned!

Enhancing Semantic Search with a Streamlit UI

In a previous blog post, we discussed two Python programs, upload_vectors.py and search_vectors.py. These programs were used to create and search vectors, respectively. The upload_vectors.py script created vectors from chunks of a larger text and stored them in Pinecone, while the search_vectors.py script enabled semantic search on the text. In this blog post, we will discuss how to create a user interface (UI) for these two programs using Streamlit.

🚀 I kickstarted the Streamlit app by handing over the text-based version to ChatGPT and asking it to work its magic ✨💻. Yes, it was that easy! Afterwards, I made several manual changes to make it look the way I wanted.

Pinecone, Vectors, Embeddings, and Semantic Search: What’s all that about?

Pinecone is a vector database service that allows for easy storage and retrieval of high-dimensional vectors. It is optimized for similarity search, which makes it a perfect fit for tasks like semantic search. Our script stores vectors in Pinecone by parsing an RSS feed, chunking the blog posts, and creating the vectors with OpenAI’s embedding APIs.

Vectors are mathematical representations of data in the form of an array of numbers. In our case, we use vectors to represent chunks of text retrieved from blog posts. These vectors are generated using a process called embedding, which is a way of representing complex data, like text, in a lower-dimensional space while preserving the essential information.

Semantic search is a type of search that goes beyond keyword matching to understand the meaning and context of the query. By using vector embeddings, we can compare the similarity between queries and stored texts to find the most relevant results. Pinecone does that search for us and simply returns a number of matching chunks (pieces of text).
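To make this concrete, here is a tiny sketch (using the same pre-1.0 openai package as the rest of the code in this post) that embeds two strings and computes their cosine similarity; Pinecone performs the equivalent comparison at scale against all stored vectors:

import os

import numpy as np
import openai

openai.api_key = os.getenv("OPENAI_API_KEY")

def embed(text):
    # returns a 1536-dimensional vector for the text
    return openai.Embedding.create(
        input=text,
        model="text-embedding-ada-002"
    )["data"][0]["embedding"]

a = np.array(embed("How do I store vectors in Pinecone?"))
b = np.array(embed("Pinecone is a vector database for similarity search."))

# cosine similarity: close to 1 means very similar, close to 0 means unrelated
similarity = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
print(f"Cosine similarity: {similarity:.3f}")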

What is Streamlit?

Streamlit is a Python library that makes it easy to create custom web apps for machine learning and data science projects. You can build interactive UIs with minimal code, allowing you to focus on the core logic of your application.

Here’s an example of creating an extremely simple Streamlit app:

import streamlit as st

st.title('Hello, Streamlit!')
st.write('This is a simple Streamlit app.')

This code would generate a web app with a title and a text output. You can also create more complex UIs with user input, like sliders, text inputs, and buttons.
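For example, a slightly more interactive sketch with a text input, a slider and a button could look like this:

import streamlit as st

st.title("Interactive example")

name = st.text_input("Your name")
count = st.slider("How many greetings?", 1, 5, 1)

if st.button("Greet"):
    for _ in range(count):
        st.write(f"Hello, {name}!")

Save it as app.py and start it with streamlit run app.py; Streamlit serves the page and reruns the script on every interaction.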

Creating a Streamlit UI for Semantic Search

Now let’s examine the provided code for creating a Streamlit UI for the search_vectors.py program. The code can be broken down into the following sections:

  1. Import necessary libraries and check environment variables.
  2. Set up the tokenizer and define the tiktoken_len function.
  3. Create the UI elements, including the title, text input, dropdown, sliders, and buttons.
  4. Define the main search functionality that is triggered when the user clicks the “Search” button.

Here is the full code:

import os
import pinecone
import openai
import tiktoken
import streamlit as st

# check environment variables
if os.getenv('PINECONE_API_KEY') is None:
    st.error("PINECONE_API_KEY not set. Please set this environment variable and restart the app.")
if os.getenv('PINECONE_ENVIRONMENT') is None:
    st.error("PINECONE_ENVIRONMENT not set. Please set this environment variable and restart the app.")
if os.getenv('OPENAI_API_KEY') is None:
    st.error("OPENAI_API_KEY not set. Please set this environment variable and restart the app.")

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')


def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

# create a title for the app
st.title("Search blog feed 🔎")

# create a text input for the user query
your_query = st.text_input("What would you like to know?")
model = st.selectbox("Model", ["gpt-3.5-turbo", "gpt-4"])

with st.expander("Options"):

    max_chunks = 5
    if model == "gpt-4":
        max_chunks = 15

    max_reply_tokens = 1250
    if model == "gpt-4":
        max_reply_tokens = 2000

    col1, col2 = st.columns(2)

    # chunk and temperature sliders
    with col1:
        chunks = st.slider("Number of chunks", 1, max_chunks, 5)
        temperature = st.slider("Temperature", 0.0, 1.0, 0.0)

    with col2:
        reply_tokens = st.slider("Reply tokens", 750, max_reply_tokens, 750)
    

# create a submit button
if st.button("Search"):
    # get the Pinecone API key and environment
    pinecone_api = os.getenv('PINECONE_API_KEY')
    pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

    pinecone.init(api_key=pinecone_api, environment=pinecone_env)

    # set index
    index = pinecone.Index('blog-index')


    # vectorize your query with openai
    try:
        query_vector = openai.Embedding.create(
            input=your_query,
            model="text-embedding-ada-002"
        )["data"][0]["embedding"]
    except Exception as e:
        st.error(f"Error calling OpenAI Embedding API: {e}")
        st.stop()

    # search for the most similar vector in Pinecone
    search_response = index.query(
        top_k=chunks,
        vector=query_vector,
        include_metadata=True)

    # create a list of urls from search_response['matches']['metadata']['url']
    urls = [item["metadata"]['url'] for item in search_response['matches']]

    # make urls unique
    urls = list(set(urls))

    # create a list of texts from search_response['matches']['metadata']['text']
    chunk_texts = [item["metadata"]['text'] for item in search_response['matches']]

    # combine texts into one string to insert in prompt
    all_chunks = "\n".join(chunk_texts)

    # show urls of the chunks
    with st.expander("URLs", expanded=True):
        for url in urls:
            st.markdown(f"* {url}")
    

    with st.expander("Chunks"):
        for i, t in enumerate(chunk_texts):
            # remove newlines from chunk
            tokens = tiktoken_len(t)
            t = t.replace("\n", " ")
            st.write("Chunk ", i, "(Tokens: ", tokens, ") - ", t[:50] + "...")
    with st.spinner("Summarizing..."):
        try:
            prompt = f"""Answer the following query based on the context below ---: {your_query}
                                                        Do not answer beyond this context!
                                                        ---
                                                        {all_chunks}"""


            # openai chatgpt with article as context
            # chat api is cheaper than gpt: 0.002 / 1000 tokens
            response = openai.ChatCompletion.create(
                model=model,
                messages=[
                    { "role": "system", "content":  "You are a truthful assistant!" },
                    { "role": "user", "content": prompt }
                ],
                temperature=temperature,
                max_tokens=reply_tokens  # use the reply tokens value chosen with the slider
            )

            st.markdown("### Answer:")
            st.write(response.choices[0]['message']['content'])

            with st.expander("More information"):
                st.write("Query: ", your_query)
                st.write("Full Response: ", response)

            with st.expander("Full Prompt"):
                st.write(prompt)

            st.balloons()
        except Exception as e:
            st.error(f"Error with OpenAI Completion: {e}")

A closer look

The code first imports the necessary libraries and checks if the required environment variables are set, displaying an error message if they are not. The libraries you need are in requirements.txt on GitHub. You can install them with:

pip3 install -r requirements.txt

ℹ️ I recommend using a Python virtual environment when you install these dependencies; poetry is just one example of a tool that manages this.

The tiktoken_len function calculates the token length of a given text using the tokenizer. This is used to display the token count of each chunk of text we send to the ChatCompletion API. Depending on the model, 4,096 or 8,192 tokens are supported.

The UI is built using Streamlit functions, such as st.title, st.text_input, st.selectbox, and st.columns. These functions create various UI elements that the user can interact with to input their query and set search parameters. If you look at the code, you will see how easy it is to add those elements.

With the UI elements, you can set:

  • the number of text chunks to return from Pinecone and to forward to the ChatCompletion API (using st.slider)
  • the number of tokens to reply with (using st.slider)
  • the model: gpt-3.5-turbo or gpt-4 (ensure you have access to the gpt-4 API)
  • the temperature (using st.slider)

The options are shown in two columns with st.columns.

The main search functionality is triggered when the user clicks the “Search” button. The code then vectorizes the query, searches for the most similar vectors in Pinecone, and displays the URLs and chunks found. Finally, the selected model is used to generate an answer based on the chunks found and the user’s query. Often, gpt-4 will provide the best answer. It seems to be able to better understand all the chunks of text thrown at it.

Running the code

To run the code you need the following:

  • A Pinecone API key and environment
  • An OpenAI API key

It is easiest to run the code with Docker. If you have it installed, run the following command:

docker run -p 8501:8501 -e OPENAI_API_KEY="YOURKEY" \
    -e PINECONE_API_KEY="YOURKEY" \
    -e PINECONE_ENVIRONMENT="YOURENV" gbaeke/blogsearch

The gbaeke/blogsearch image is available on Docker Hub. You can also build your own with the Dockerfile provided on GitHub.

After running the image, go to http://localhost:8501 and first use the Upload page to create your Pinecone index and store vectors in it. You can use my blog’s feed or any other feed. You can experiment with the chunk size and chunk overlap.

Upload to Pinecone

You can add multiple RSS feeds one-by-one as long as you turn off Recreate index before each new upload. After you have populated the index, use the Search page to start searching:

Searching

Above, we ask what we can do with Pinecone and let gpt-4 do the answering. The similarity search will search for 5 similar items and return them. We show the original URLs these results come from. In the Chunks section, you can see the original chunks because they are also in Pinecone as metadata. After the answer, you can find the full JSON returned by the ChatCompletion API and the full prompt we sent to that API.

Conclusion

In this blog post, we showed you how to create a Streamlit UI for the search_vectors.py script we talked about in a previous post. Streamlit allows you to easily build interactive web applications for your machine learning and data science projects. We also created a UI to upload posts to Pinecone. The full program allows you to add as much data as you want and query that data with semantic search, summarized and synthesized by the GPT model of choice. Give it a try and let me know what you think.

Enhancing Blog Post Search with Chunk-based Embeddings and Pinecone

In this blog post, we’ll show you a different approach to searching through a large database of blog posts. The previous approach involved creating a single embedding for the entire article and storing it in a vector database. The new approach is much more effective, and in this post, we’ll explain why and how to implement it.

The new approach involves the following steps:

  1. Chunk the article into pieces of about 400 tokens using LangChain
  2. Create an embedding for each chunk
  3. Store each embedding, along with its metadata such as the URL and the original text, in Pinecone
  4. Store the original text in Pinecone, but not indexed
  5. To search the blog posts, find the 5 best matching chunks and add them to the ChatCompletion prompt

We’ll explain each step in more detail below, but first, let’s start with a brief overview of the previous approach.

The previous approach used OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. The article was vectorized as a whole, and the resulting vector was stored in Pinecone. To search the blog posts, cosine similarity was used to find the closest matching article, and the contents of the article were retrieved using the Python requests library and the BeautifulSoup library. Finally, a prompt was created for the ChatCompletion API, including the retrieved article.

The problem with this approach was that the entire article was vectorized as one piece. This meant that if the article was long, the vector might not represent the article accurately, as it would be too general. Moreover, if the article was too long, the ChatCompletion API call might fail because too many tokens were used.

The new approach solves these problems by chunking the article into smaller pieces, creating an embedding for each chunk, and storing each embedding in Pinecone. This way, we have a much more accurate representation of the article, as each chunk represents a smaller, more specific part of the article. And because each chunk is smaller, there is less risk of using too many tokens in the ChatCompletion API call.

To implement the new approach, we’ll use LangChain to chunk the article into pieces of about 400 tokens. LangChain is a library aimed at assisting in the development of applications that use LLMs, or large language models.
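As a minimal sketch of just the chunking step (the full upload script below uses the exact same splitter settings), splitting a post into roughly 400-token chunks looks like this:

import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter

tokenizer = tiktoken.get_encoding("cl100k_base")

def tiktoken_len(text):
    return len(tokenizer.encode(text, disallowed_special=()))

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=400,       # target chunk size in tokens, not characters
    chunk_overlap=20,     # tokens shared between consecutive chunks
    length_function=tiktoken_len,
    separators=['\n\n', '\n', ' ', '']
)

article = "The full text of a blog post goes here..."  # placeholder text
chunks = text_splitter.split_text(article)
print(f"{len(chunks)} chunks, first chunk is {tiktoken_len(chunks[0])} tokens")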

Next, we’ll create an embedding for each chunk using OpenAI’s embeddings API. As before, we will use the text-embedding-ada-002 model. And once we have the embeddings, we’ll store each one, along with its metadata, in Pinecone. The key for each embedding will be a hash of the URL, combined with the chunk number.

The original text will also be stored in Pinecone, but not indexed, so that it can be retrieved later. With this approach, we do not need to retrieve a blog article from the web. Instead, we just get the text from Pinecone directly.

To search the blog posts, we’ll use cosine similarity to find the 5 best-matching chunks. The 5 best matching chunks will be added to the ChatCompletion prompt, allowing us to ask questions based on the article’s contents.

Uploading the embeddings

The code to upload the embeddings is shown below. You will need to set the following environment variables:

export OPENAI_API_KEY=your_openai_api_key
export PINECONE_API_KEY=your_pinecone_api_key
export PINECONE_ENVIRONMENT=your_pinecone_environment

With those set, the upload script is the following:

import feedparser
import os
import pinecone
import openai
import requests
from bs4 import BeautifulSoup
from retrying import retry
from langchain.text_splitter import RecursiveCharacterTextSplitter
import tiktoken
import hashlib

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')

# create the length function used by the RecursiveCharacterTextSplitter
def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def create_embedding(article):
    # vectorize with OpenAI text-embedding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    return embedding["data"][0]["embedding"]

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

if "blog-index" not in pinecone.list_indexes():
    print("Index does not exist. Creating...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})
else:
    print("Index already exists. Deleting...")
    pinecone.delete_index("blog-index")
    print("Creating new index...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://atomic-temporary-16150886.wpcomstaging.com/feed/'

# Parse the RSS feed with feedparser
print("Parsing RSS feed: ", url)
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

# create recursive text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=400,
    chunk_overlap=20,  # number of tokens overlap between chunks
    length_function=tiktoken_len,
    separators=['\n\n', '\n', ' ', '']
)

pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Create embeddings for entry ", i, " of ", entries, " (", entry.link, ")")

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # create chunks
    chunks = text_splitter.split_text(article)

    # create md5 hash of entry.link
    url = entry.link
    url_hash = hashlib.md5(url.encode("utf-8"))
    url_hash = url_hash.hexdigest()
        
    # create embeddings for each chunk
    for j, chunk in enumerate(chunks):
        print("\tCreating embedding for chunk ", j, " of ", len(chunks))
        vector = create_embedding(chunk)

        # concatenate hash and j
        hash_j = url_hash + str(j)

        # add vector to pinecone_vectors list
        print("\tAdding vector to pinecone_vectors list for chunk ", j, " of ", len(chunks))
        pinecone_vectors.append((hash_j, vector, {"url": entry.link, "chunk-id": j, "text": chunk}))

        # upsert every 100 vectors
        if len(pinecone_vectors) % 100 == 0:
            print("Upserting batch of 100 vectors...")
            upsert_response = index.upsert(vectors=pinecone_vectors)
            pinecone_vectors = []

# if there are any vectors left, upsert them
if len(pinecone_vectors) > 0:
    print("Upserting remaining vectors...")
    upsert_response = index.upsert(vectors=pinecone_vectors)
    pinecone_vectors = []

print("Vector upload complete.")

Searching for blog posts

The code below is used to search blog posts:

import os
import pinecone
import openai
import tiktoken

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')


def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index
index = pinecone.Index('blog-index')

while True:
    # set query
    your_query = input("\nWhat would you like to know? ")
    
    # vectorize your query with openai
    try:
        query_vector = openai.Embedding.create(
            input=your_query,
            model="text-embedding-ada-002"
        )["data"][0]["embedding"]
    except Exception as e:
        print("Error calling OpenAI Embedding API: ", e)
        continue

    # search for the most similar vector in Pinecone
    search_response = index.query(
        top_k=5,
        vector=query_vector,
        include_metadata=True)

    # create a list of urls from search_response['matches']['metadata']['url']
    urls = [item["metadata"]['url'] for item in search_response['matches']]

    # make urls unique
    urls = list(set(urls))

    # create a list of texts from search_response['matches']['metadata']['text']
    chunks = [item["metadata"]['text'] for item in search_response['matches']]

    # combine texts into one string to insert in prompt
    all_chunks = "\n".join(chunks)

    # print urls of the chunks
    print("URLs:\n\n", urls)

    # print the text number and first 50 characters of each text
    print("\nChunks:\n")
    for i, t in enumerate(chunks):
        print(f"\nChunk {i}: {t[:50]}...")

    try:
        # openai chatgpt with article as context
        # chat api is cheaper than gpt: 0.002 / 1000 tokens
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                { "role": "system", "content":  "You are a thruthful assistant!" },
                { "role": "user", "content": f"""Answer the following query based on the context below ---: {your_query}
                                                    Do not answer beyond this context!
                                                    ---
                                                    {all_chunks}""" }
            ],
            temperature=0,
            max_tokens=750
        )

        print(f"\n{response.choices[0]['message']['content']}")
    except Exception as e:
        print(f"Error with OpenAI Completion: {e}")

In Action

Below, we ask if Redis supports storing vectors and what version of Redis we need in Azure. The Pinecone vector search found 5 chunks, all from the same blog post (there is only one URL). The five chunks are combined and sent to ChatGPT, together with the original question. The response from the ChatCompletion API is clear!

Example question and response

Conclusion

In conclusion, the “chunked” approach to searching through a database of blog posts is much more effective and solves many of the problems associated with the previous approach. We hope you found this post helpful, and we encourage you to try out the new approach in your own projects!

Pinecone and OpenAI magic: A guide to finding your long lost blog posts with vectorized search and ChatGPT

Searching through a large database of blog posts can be a daunting task, especially if there are thousands of articles. However, using vectorized search and cosine similarity, you can quickly query your blog posts and retrieve the most relevant content.

In this blog post, we’ll show you how to query a list of blog posts (from this blog) using a combination of vectorized search with cosine similarity and OpenAI ChatCompletions. We’ll be using OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. We’ll also show you how to retrieve the contents of the article, create a prompt using the ChatCompletion API, and return the result to a web page.

ℹ️ Sample code is on GitHub: https://github.com/gbaeke/gpt-vectors

ℹ️ If you want an introduction to embeddings and cosine similarity, watch the video on YouTube by Part Time Larry.

Setting Up Pinecone

Before we can start querying our blog posts, we need to set up Pinecone. Pinecone is a vector database that makes it easy to store and query high-dimensional data. It’s perfect for our use case since we’ll be working with high-dimensional vectors.

ℹ️ Using a vector database is not strictly required. The GitHub repo contains app.py, which uses scikit-learn to create the vectors and perform a cosine similarity search. Many other approaches are possible. Pinecone just makes storing and querying the vectors super easy.

ℹ️ If you want more information about Pinecone and the concept of a vector database, watch this introduction video.

First, we’ll need to create an account with Pinecone and get the API key and environment name. In the Pinecone UI, you will find these as shown below. There will be a Show Key and Copy Key button in the Actions section next to the key.

Key and environment for Pinecone

Once we have an API key and the environment, we can use the Pinecone Python library to create and use indexes. Install the Pinecone library with pip install pinecone-client.

Although you can create a Pinecone index from code, we will create the index in the Pinecone portal. Go to Indexes and select Create Index. Create the index using cosine as metric and 1536 dimensions:

blog-index in Pinecone

The embedding model we will use to create the vectors, text-embedding-ada-002, outputs vectors with 1536 dimensions. For more info see OpenAI’s blog post of December 15, 2022.

To use the Pinecone index from code, look at the snippet below:

import pinecone

pinecone_api = "<your_api_key>"
pinecone_env = "<your_environment>"

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

index = pinecone.Index('blog-index')

We create an instance of the Index class with the name “blog-index” and store it in index. This index will be used to store our blog post vectors and to run searches against them.

Vectorizing Blog Posts with OpenAI’s Embeddings API

Next, we’ll need to vectorize our blog post articles. We’ll be using OpenAI’s embeddings API to do this. The embeddings API takes a piece of text and returns a high-dimensional vector representation of that text. Here’s an example of how to do that for one article or string:

import openai

openai.api_key = "<your_api_key>"

article = "Some text from a blog post"

vector = openai.Embedding.create(
    input=article,
    model="text-embedding-ada-002"
)["data"][0]["embedding"]

We create a vector representation of our blog post article by calling the Embedding class’s create method. We pass in the article text as input and the text-embedding-ada-002 model, which is a pre-trained language model that can generate high-quality embeddings.

Storing Vectors in Pinecone

Once we have the vector representations of our blog post articles, we can store them in Pinecone. Instead of storing the vectors one by one, we can use upsert to store a list of vectors. The code below uses this blog’s feed to grab the URLs of 50 posts; every post is vectorized and the vector is added to a Python list of tuples, the format the upsert method expects. The list is then sent to Pinecone in one go. The tuple that Pinecone expects is:

(id, vector, metadata dictionary)

e.g. ('0', vector for post 1, {"url": url to post 1})
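As a quick sketch of that format, an upsert of two dummy vectors (placeholders for real embeddings and URLs) would look like this:

# dummy 1536-dimensional vectors stand in for real embeddings here
pinecone_vectors = [
    ("0", [0.1] * 1536, {"url": "https://example.com/post-1"}),
    ("1", [0.2] * 1536, {"url": "https://example.com/post-2"}),
]

upsert_response = index.upsert(vectors=pinecone_vectors)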

Here is the code that uploads the first 50 posts of baeke.info to Pinecone. You need to set the Pinecone key and environment and the OpenAI key as environment variables. The code uses feedparser to grab the blog feed, and BeautifulSoup to parse the retrieved HTML. The code serves as an example only. It is not very robust when it comes to error checking etc…

import feedparser
import os
import pinecone
import numpy as np
import openai
import requests
from bs4 import BeautifulSoup

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://atomic-temporary-16150886.wpcomstaging.com/feed/'

# Parse the RSS feed with feedparser
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

post_texts = []
pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Processing entry ", i, " of ", entries)

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # vectorize with OpenAI text-embedding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    # print the embedding (length = 1536)
    vector = embedding["data"][0]["embedding"]

    # append tuple to pinecone_vectors list
    pinecone_vectors.append((str(i), vector, {"url": entry.link}))

# all vectors can be upserted to pinecode in one go
upsert_response = index.upsert(vectors=pinecone_vectors)

print("Vector upload complete.")

Querying Vectors with Pinecone

Now that we have stored our blog post vectors in Pinecone, we can start querying them. We’ll use cosine similarity to find the closest matching blog post. Here is some code that does just that:

def get_highest_score_url(items):
    highest_score_item = max(items, key=lambda item: item["score"])

    if highest_score_item["score"] > 0.8:
        return highest_score_item["metadata"]['url']
    else:
        return ""

query_vector = <vector representation of query>  # vector created with OpenAI as well

search_response = index.query(
    top_k=5,
    vector=query_vector,
    include_metadata=True
)

url = get_highest_score_url(search_response['matches'])

We create a vector representation of our query (you don’t see that here but it’s the same code used to vectorize the blog posts) and pass it to the query method of the Pinecone Index class. We set top_k=5 to retrieve the top 5 matching blog posts. We also set include_metadata=True to include the metadata associated with each vector in our response. That way, we also have the URL of the top 5 matching posts.
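For completeness, creating that query vector uses the same Embedding call we used for the articles; a minimal sketch:

your_query = "Does Redis support storing vectors?"  # example question

query_vector = openai.Embedding.create(
    input=your_query,
    model="text-embedding-ada-002"
)["data"][0]["embedding"]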

The query method returns a dictionary that contains a matches key. The matches value is a list of dictionaries, with each dictionary representing a matching blog post. The score key in each dictionary represents the cosine similarity score between the query vector and the blog post vector. We use the get_highest_score_url function to find the blog post with the highest cosine similarity score.

The function contains some code to only return the highest scoring URL if the score is higher than 0.8. It’s of course up to you to accept lower scoring results. There is a chance the vector query returns an article that is not highly relevant, which would give the OpenAI ChatCompletion call we do later an irrelevant context.

Retrieving the Contents of the Blog Post

Once we have the URL of the closest matching blog post, we can retrieve the contents of the article using the Python requests library and the BeautifulSoup library.

import requests
from bs4 import BeautifulSoup

r = requests.get(url)
soup = BeautifulSoup(r.text, 'html.parser')

article = soup.find('div', {'class': 'entry-content'}).text

We send a GET request to the URL of the closest matching blog post and retrieve the HTML content. We use the BeautifulSoup library to parse the HTML and extract the contents of the <div> element with the class “entry-content”.

Creating a Prompt for the ChatCompletion API

Now that we have the contents of the blog post, we can create a prompt for the ChatCompletion API. The crucial part here is that our OpenAI query should include the blog post we just retrieved!

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        { "role": "system", "content": "You are a polite assistant" },
        { "role": "user", "content": "Based on the article below, answer the following question: " + your_query +
            "\nAnswer as follows:" +
            "\nHere is the answer directly from the article:" +
            "\nHere is the answer from other sources:" +
             "\n---\n" + article }
           
    ],
    temperature=0,
    max_tokens=200
)

response_text=f"\n{response.choices[0]['message']['content']}"

We use the ChatCompletion API with the gpt-3.5-turbo model to ask our question. This is the same as using ChatGPT on the web with that model. At this point in time, the GPT-4 model was not available yet.

Instead of one prompt, we send a number of dictionaries in a messages list. The first item in the list sets the system message. The second item is the actual user question. We ask to answer the question based on the blog post we stored in the article variable and we provide some instructions on how to answer. We add the contents of the article to our query.

If the article is long, you run the risk of using too many tokens. If that happens, the ChatCompletion call will fail. You can use the tiktoken library to count the tokens and prevent the call from happening in the first place. Or you can catch the exception and tell the user. In the above code, there is no error handling. We only include the core code that’s required.
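A minimal sketch of such a guard, reusing the article variable from the retrieval step (the 4,096-token limit for gpt-3.5-turbo is an assumption of this example; use the limit that matches your model):

import tiktoken

tokenizer = tiktoken.get_encoding("cl100k_base")  # tokenizer used by gpt-3.5-turbo

MAX_CONTEXT_TOKENS = 4096  # assumed context window for gpt-3.5-turbo
MAX_REPLY_TOKENS = 200     # matches max_tokens in the call above

# count the article only; the question and system message add a few more tokens
article_tokens = len(tokenizer.encode(article, disallowed_special=()))

if article_tokens + MAX_REPLY_TOKENS > MAX_CONTEXT_TOKENS:
    print("Article is too long for the model context, skipping the ChatCompletion call.")
else:
    pass  # safe to call openai.ChatCompletion.create(...) as shown above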

Returning the Result to a Web Page

If you are running the search code in an HTTP handler as the result of the user typing a query in a web page, you can return the result to the caller:

return jsonify({
    'url': url,
    'response': response_text
})

The full example, including an HTML page and Flask code can be found on GitHub.

The result could look like this:

Query results in the closest URL using vectorized search and ChatGPT answering the question based on the contents the URL points at

Conclusion

Using vectorized search and cosine similarity, we can quickly query a database of blog posts and retrieve the most relevant post. By combining OpenAI’s embeddings API, Pinecone, and the ChatCompletion API, we can create a powerful tool for searching and retrieving blog post content using natural language.

Note that there are some potential issues as well. The code we show is merely a starting point:

  • Limitations of cosine similarity: it does not take into account all properties of the vectors, which can lead to misleading results
  • Prompt engineering: the prompt we use works but there might be prompts that just work better. Experimentation with different prompts is crucial!
  • Embeddings: OpenAI embeddings are trained on a large corpus of text, which may not be representative of the domain-specific language in the posts
  • Performance might not be sufficient if the size of the database grows large. For my blog, that’s not really an issue. 😀