In a previous post, I wrote about Diagrid Catalyst. Catalyst provides services like pub/sub and state stores to support the developer in writing distributed applications. In the post, we discussed a sample application that processes documents and extracts fields with an LLM (gpt-4o structured extraction). Two services, upload and process, communicate via the pub/sub pattern.
In that post, we used the pub/sub broker built into Catalyst. The built-in broker makes it extremely easy to get started: you simply create the service and the topic subscription and write code to wire it all up using the Dapr APIs.
Catalyst built-in pub/sub service
But what if you want to use your own broker? Read on to learn how that works.
Using Azure Service Bus as the broker
To use Azure Service Bus, simply deploy an instance in a region of your choice. Make sure you use at least the Standard tier, because topics are not available in the Basic tier:
Azure Service Bus Standard Tier deployed in Sweden; public endpoint
With Service Bus deployed, we can now tell Catalyst about it. You do so in Components in the Catalyst portal:
Creating an Azure Service Bus component
Click Create Component to start a wizard. After you complete the wizard, your component appears in the list. In the screenshot above, a component with Azure Service Bus as the target is shown at the bottom of the list.
The wizard itself is fairly straightforward. The first screen is shown below:
Component wizard
In this first step, I clicked Pub/Sub and selected Azure Service Bus Topics. As you can see, several other pub/sub brokers are supported; the list shown above is not even complete.
In the next steps, the following is set:
Assign access: configure the services that can access this component; in my case, that is the upload and process services
Authentication profile: decide how to authenticate to Azure Service Bus; I used a connection string
Configure component: set the component name and properties such as timeouts. These properties are specific to Service Bus; I only set the name and left the properties at their defaults.
That’s it: you have now defined a component that can be used by your applications. When you click the component, you can also inspect its YAML definition:
YAML representation of the component
You can also use these YAML files with the Diagrid CLI to create components. In the CLI they are called connections, but as far as I can tell at this point they are essentially the same thing:
Listing connections
Showing the call graph
With Catalyst, all activity is logged and can be used to visualize a call graph like the one below:
Call Graph
Above, I clicked on the subscription that delivers messages to the process service. The messages come from our Azure Service Bus broker.
Note: you can also still see the older built-in Catalyst pub/sub broker in the call graph. It will disappear from the call graph some time after it is no longer used.
Creating a subscription
A subscription to an Azure Service Bus topic looks the same as a subscription to the built-in Pub/Sub broker:
Subscription to topic invoices
The only difference from the previous blog post is the component: it is now the one we just created. The /process handler in your code stays the same.
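For reference, that handler is a plain FastAPI route that receives a CloudEvent from Catalyst; switching the broker behind the component does not affect it (a sketch based on the code from the previous post):

@app.post('/process')  # Catalyst delivers messages from the Service Bus topic here
async def consume_orders(event: CloudEvent):
    # event.data still contains the same payload (document path and template name)
    ...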
Code changes
The code from the previous post barely has to change. It reads the pub/sub component name from the PUBSUB_NAME environment variable, which now needs to be set to pubsub-azure. That's it; the Dapr SDK code is unchanged:
with DaprClient() as d:
    try:
        result = d.publish_event(
            pubsub_name=pubsub_name,
            topic_name=topic_name,
            data=invoice.model_dump_json(),
            data_content_type='application/json',
        )
        logging.info('Publish Successful. Invoice published: %s' %
                     invoice.path)
        logging.info(f"Invoice model: {invoice.model_dump()}")
        return True
    except grpc.RpcError as err:
        logging.error(f"Failed to publish invoice: {err}")
        return False
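For completeness, a minimal sketch of how these names could be read from the environment; the TOPIC_NAME variable and the default values are assumptions, and the actual code in the repo may read them differently:

import os

# 'pubsub-azure' is the Azure Service Bus component created above;
# 'invoices' is the topic used by the upload service
pubsub_name = os.getenv('PUBSUB_NAME', 'pubsub-azure')
topic_name = os.getenv('TOPIC_NAME', 'invoices')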
Conclusion
Instead of using the default Catalyst pub/sub broker, we switched the underlying broker to one of our choice. This is just configuration: your code, apart from perhaps an environment variable, does not need to change.
In this post, we only changed the pub/sub broker. You can also easily change the underlying state store to Azure Blob Storage or Azure Cosmos DB.
Many enterprises have systems in place that take documents, possibly handwritten, that contain data that needs to be extracted. In this post, we will create an application that can extract data from documents that you upload. We will make use of an LLM, in this case gpt-4o. We will use model version 2024-08-06 and its new structured output capabilities. Other LLMs can be used as well.
The core of the application is illustrated in the diagram below. The application uses more services than shown in the diagram; we will get to them later in this post.
Application Diagram
Note: the LLM-based extraction logic in this project is pretty basic. In production, you need to do quite a bit more to get the extraction just right.
The flow of the application is as follows:
A user or process submits a document to the upload service. This can be a PDF, but other formats are supported as well.
In addition to the document, a template is specified by name. A template contains the fields to extract, together with their type (str, bool, float). For example: customer_name (str), invoice_total (float).
The upload service uploads the document to an Azure Storage account using a unique filename and preserves the extension.
The upload service publishes a message to a topic on a pub/sub message broker. The message contains data such as the document URL and the name of the template.
The process service subscribes to the topic on the message broker and retrieves the message.
It downloads the file from the storage account and sends it to Azure Document Intelligence to convert it to plain text.
Using a configurable extractor, an LLM is used to extract the fields in the template from the document text (a minimal sketch follows this list). The sample code contains an OpenAI and a Groq extractor.
The extracted fields are written to a configurable output handler. The sample code contains a CSV and JSONL handler.
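To make the extraction step concrete, below is a minimal sketch of a structured-output call with gpt-4o. The prompt, the helper name and the way a template is turned into a Pydantic model are assumptions for illustration, and the sketch uses the plain OpenAI client (the repo's OpenAI extractor is the reference and may use Azure OpenAI instead):

from openai import OpenAI
from pydantic import create_model

client = OpenAI()
TYPE_MAP = {"str": str, "bool": bool, "float": float}

def extract_fields(document_text: str, template: dict) -> dict:
    # Build a Pydantic model from a template such as {"customer_name": "str", "invoice_total": "float"}
    Fields = create_model("Fields", **{name: (TYPE_MAP[t], ...) for name, t in template.items()})
    completion = client.beta.chat.completions.parse(
        model="gpt-4o-2024-08-06",  # structured outputs require this model version or later
        messages=[
            {"role": "system", "content": "Extract the requested fields from the document text."},
            {"role": "user", "content": document_text},
        ],
        response_format=Fields,
    )
    return completion.choices[0].message.parsed.model_dump()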
In addition to the pub/sub broker, the application uses a state store in which templates are stored. The upload service is the only service that interfaces with the state store. It exposes an HTTP method that the process service can use to retrieve a template from the state store.
To implement pub/sub, the state store and method invocation, we will use Diagrid's Catalyst instead of building all of this ourselves.
What is Catalyst?
If you are familiar with Dapr, the distributed application runtime, Catalyst will be easy to understand. Catalyst provides you with a set of APIs, hosted in the cloud and compatible with Dapr to support you in building cloud-native, distributed applications. It provides several building blocks. The ones we use are below:
request/reply: to support synchronous communication between services in a secure fashion
publish/subscribe: to support asynchronous communication between services using either a broker provided by Catalyst or other supported brokers like Azure Service Bus
key/value: allows services to save state in a key/value store. You can use the state store provided by Catalyst or other supported state stores like Azure Cosmos DB or an Azure Storage Account
The key to these building blocks is that your code stays the same if you swap the underlying message broker or key/value store. For example, you can start with Catalyst’s key/value store and later switch to Cosmos DB very easily. There is no need to add Cosmos DB libraries to your code. Catalyst will handle the Cosmos DB connectivity for you.
Important: I am referring mainly to Azure services here but Catalyst (and Dapr) support many services in other clouds as well!
Note that you do not need to install Dapr on your local machine or on platforms like Kubernetes when you use Catalyst. You only use the Dapr SDKs in your code and, when configured to do so, the SDK connects to the appropriate APIs hosted in the cloud by Catalyst. In fact, you do not even need an SDK because the APIs can be used with plain HTTP or gRPC. Of course, using an SDK makes things a lot easier.
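For example, a message could be published with nothing but an HTTP client against the Dapr-compatible publish endpoint. This is a sketch: the component and topic names (pubsub, invoices) are the ones used elsewhere in this post, and the payload fields are assumptions:

import os
import requests

resp = requests.post(
    f"{os.environ['DAPR_HTTP_ENDPOINT']}/v1.0/publish/pubsub/invoices",
    headers={
        "dapr-api-token": os.environ["DAPR_API_TOKEN"],
        "content-type": "application/json",
    },
    json={"path": "invoices/some-document.pdf", "template": "invoice"},  # hypothetical payload
)
resp.raise_for_status()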
By doing all of the above in Catalyst we have a standardised approach that remains the same no matter the service behind it. We also get implementation best practices, for example for pub/sub. In addition, we are also provided with golden metrics and a UI to see how the application performs. All API calls are logged to aid in troubleshooting.
Let’s now take a look at the inner loop development process!
Scaffolding a new project
You need to sign up for Catalyst first. At the time of writing, Catalyst was in preview and not supported for production workloads. When you have an account, you should install the Diagrid CLI. The CLI is not just for Catalyst. It’s also used with Diagrid’s other products, such as Conductor.
With the CLI, you can create a new project, create services and application identities. For this post, we will use the UI instead.
In the Catalyst dashboard, I created a project called idpdemo:
List of projects; use Create Project to create a new one
Next, for each of the services (upload and process), we create an App ID. Each App ID has its own token. Services use the token to authenticate to the Catalyst APIs and to use the services they are allowed to use.
The process App ID has the following configuration (partial view):
process App ID API configuration
The process service interacts with both the Catalyst key/value store (kvstore) and the pub/sub broker (pubsub). These services need to be enabled as well. We will show that later. We can also see that the process service has a pub/sub subscription called process-consumer. Via that subscription, we have pub/sub messages delivered to the process service whenever the upload service sends a message to the pub/sub topic.
In Diagrid Services, you can click on the pub/sub and key/value store to see what is going on. For example, in the pub/sub service you can see the topics, the subscribers to these topics and the message count.
pub/sub topics
In Connections, you can see your services (represented by the App IDs upload and process) and their scope. In this case, all App IDs have access to all services. That can easily be changed:
changing the scope: access by App IDs to the pubsub service; default All
Now that we have some understanding of App IDs, Diagrid services and connections, we can take a look at how to connect to Catalyst from code.
Important: in this post we only look at using request/reply, Diagrid pub/sub and key/value. Catalyst also supports workflow and bindings but they are not used in this post.
The upload service needs to connect to both the pub/sub broker and key/value store:
Whenever a document is submitted, it is uploaded to Azure Storage. When that succeeds, a message with the path of the file and a template name is put on the broker.
Templates are created and validated by the upload service so that you can only upload files with a template that exists. Templates are written and read in the key/value store.
Before we write code, we need to give the Dapr SDK for Python (we'll only use the Python SDK here) the necessary connection information. It needs to know that it should not connect to a Dapr sidecar but to Catalyst. You do this via environment variables such as DAPR_API_TOKEN, DAPR_GRPC_ENDPOINT, DAPR_HTTP_ENDPOINT and DAPR_APP_ID; they appear again in the scaffolded dev file later in this post.
These environment variables are automatically picked up and used by the SDK to interact with the Catalyst APIs. The following code can be used to put a message on the pub/sub broker:
with DaprClient() as d:
    try:
        result = d.publish_event(
            pubsub_name=pubsub_name,
            topic_name=topic_name,
            data=invoice.model_dump_json(),
            data_content_type='application/json',
        )
        logging.info('Publish Successful. Invoice published: %s' %
                     invoice.path)
        return True
    except grpc.RpcError as err:
        logging.error(f"Failed to publish invoice: {err}")
        return False
This is the same code that you would use with Dapr on your local machine, in Kubernetes or in Azure Container Apps. Like with Dapr, you need to specify the pubsub name and topic. Here that is pubsub and invoices, as previously shown in the Catalyst UI. The data in the message is an instance of a Pydantic class that holds the path and the template name, converted to JSON.
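That class looks roughly like the sketch below; the exact field names are an assumption based on how the data is used, and the repo is the reference:

from pydantic import BaseModel

class Invoice(BaseModel):
    path: str      # blob path of the uploaded document in the storage account
    template: str  # name of the template to apply during extraction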
The code below shows how to write to the state store (key/value store):
with DaprClient() as d:
    try:
        d.save_state(store_name=kvstore_name,
                     key=template_name, value=str(invoice_data))
    except grpc.RpcError as err:
        logging.error(f"Dapr state store error: {err.details()}")
        raise HTTPException(status_code=500, detail="Failed to save template")
This is of course very similar. We use the save_state method here and provide the store name (kvstore), key (template name) and value.
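As an example, a template could be saved like this; the payload below is hypothetical and simply maps field names to the types described earlier:

# Hypothetical template payload: field names mapped to their types
invoice_data = {
    "customer_name": "str",
    "invoice_date": "str",
    "invoice_total": "float",
    "paid": "bool",
}
template_name = "invoice"
# saved with d.save_state(store_name=kvstore_name, key=template_name, value=str(invoice_data))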
Let’s now turn to the process service. It needs to:
be notified when there is a new message on the invoices topic
check and retrieve the template by calling a method on the upload service
We only use two building blocks here: pub/sub and request/reply. The process service does not interact directly with the state store.
To receive a message, Catalyst needs a handler to call. In the pub/sub subscription, the handler (the default route, to be precise) is configured to be /process:
Configuration of default route on subscription
Our code that implements the handler is as follows (FastAPI):
@app.post('/process')  # called by pub/sub when a new invoice is uploaded
async def consume_orders(event: CloudEvent):
    # your code here
As you can see, when Catalyst calls the handler, it passes in a CloudEvent. The event has a data field that holds the path to our document and the template name. The CloudEvent type is defined as follows:
# pub/sub uses CloudEvent; Invoice above is the data
class CloudEvent(BaseModel):
    datacontenttype: str
    source: str
    topic: str
    pubsubname: str
    data: dict
    id: str
    specversion: str
    tracestate: str
    type: str
    traceid: str
In the handler, you simply extract the expected data and use it to process the event. In our case:
extract path and template from the data field
download the file from blob storage
send the file to Azure Document Intelligence to convert to text
extract the details from the document based on the template; if the template contains fields like customer_name and invoice_total, the LLM will try to extract that and return that content in JSON.
write the extracted values to JSON or CSV or any other output handler
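Putting that together, the handler body could look like the sketch below. The helper functions are hypothetical stand-ins for the repo's download, Document Intelligence, extractor and output-handler code, and the keys in event.data are assumptions:

@app.post('/process')
async def consume_orders(event: CloudEvent):
    path = event.data['path']               # blob path of the uploaded document (assumed key)
    template_name = event.data['template']  # template to apply (assumed key)

    template = retrieve_template_from_kvstore(template_name)  # via request/reply, see below
    if template is None:
        # a non-2xx response tells the pub/sub service the delivery failed
        raise HTTPException(status_code=500, detail="Template not found")

    text = convert_document_to_text(path)    # Azure Document Intelligence (hypothetical helper)
    fields = extract_fields(text, template)  # LLM-based extraction (hypothetical helper)
    write_output(fields)                     # CSV/JSONL output handler (hypothetical helper)
    return {'success': True}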
Of course, we still need to retrieve the full template because we only have its name. Let's use the request/reply APIs to do that and call the template GET endpoint of the upload service via Catalyst:
def retrieve_template_from_kvstore(template_name: str):
    headers = {'dapr-app-id': invoke_target_appid, 'dapr-api-token': dapr_api_token,
               'content-type': 'application/json'}
    try:
        result = requests.get(
            url='%s/template/%s' % (base_url, template_name),
            headers=headers
        )
        if result.ok:
            logging.info('Invocation successful with status code: %s' %
                         result.status_code)
            logging.info(f"Template retrieved: {result.json()}")
            return result.json()
    except Exception as e:
        logging.error(f"An error occurred while retrieving template from Dapr KV store: {str(e)}")
    return None
As an example, we use the plain HTTP API here instead of the Dapr SDK's invoke API. It might not be immediately obvious, but Catalyst is still involved in these calls and has information and metrics about them:
Call Graph
The solid line represents request/reply (invoke) traffic from process to upload, as just explained. The dotted line represents pub/sub traffic, where upload creates messages that are consumed by process.
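If you prefer the Dapr SDK over raw HTTP, the same request/reply call can be made with invoke_method. A sketch, assuming the same App ID and route:

import json
from dapr.clients import DaprClient

def retrieve_template_via_invoke(template_name: str):
    with DaprClient() as d:
        resp = d.invoke_method(
            app_id='upload',                         # target App ID
            method_name=f'template/{template_name}',
            data='',
            http_verb='GET',
        )
        return json.loads(resp.data)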
Running the app
You can easily run your application locally using the Diagrid Dev CLI. Ensure you are logged in by running diagrid login. In the preview, if you have only one project, it should already be the default project. Then run diagrid dev scaffold to generate a YAML file.
In my case, after some modifications, my dev-{project-name}.yaml file looked like the one below:
project: idpdemo
apps:
  - appId: process
    disabled: true
    appPort: 8001
    env:
      DAPR_API_TOKEN: ...
      DAPR_APP_ID: process
      DAPR_CLIENT_TIMEOUT_SECONDS: 10
      DAPR_GRPC_ENDPOINT: https://XYZ.api.cloud.diagrid.io:443
      DAPR_HTTP_ENDPOINT: https://XYZ.api.cloud.diagrid.io
      OTHER ENV VARS HERE
    workDir: process
    command: ["python", "app.py"]
  - appId: upload
    appPort: 8000
    env:
      ... similar
    workDir: upload
    command: ["python", "app.py"]
appLogDestination: ""
Of course, the file was further modified with the environment variables required by the code, for example the storage account key, the Azure Document Intelligence key, and so on.
All you need to do now is run diagrid dev start to start the apps. The result should look like the screenshot below:
Local project startup
By default, your service logs are written to the console with a prefix for each service.
If you use the code on GitHub, check the README.md to configure the project and run the code properly. If you would rather run the code with Dapr on your local machine (e.g., if you do not have access to Catalyst), you can do that as well.
Conclusion
In this post, we have taken a look at Catalyst, a set of cloud APIs that help you write distributed applications in a standard and secure fashion. These APIs are compatible with Dapr, a toolkit that has already gained quite some traction in the community. With Catalyst, we quickly built an application that can serve as a starting point for an asynchronous, LLM-based document extraction pipeline. I did not have to worry too much about pub/sub and key/value services because that is all part of Catalyst.