PostgreSQL as message broker
How to implement a basic FIFO queue using the most popular SQL data store.
If you need a message broker to exchange messages within your project, there are several options to choose from today. PostgreSQL is primarily known as a powerful open-source relational database management system (RDBMS) that excels at handling structured data. However, its versatility extends beyond traditional database functions: PostgreSQL can also serve as a capable message broker, enabling efficient communication between the components of an application or between multiple applications.
Leveraging PostgreSQL as a message broker provides an alternative to dedicated message broker software like RabbitMQ, Redis, or Kafka, offering certain advantages and convenience in specific use cases:
Autarky: You are not convinced that it's time to broaden your stack. PostgreSQL is already in it.
Familiarity: Your team has already been using SQL data stores for years.
Transactional Support: PostgreSQL ensures that message delivery and processing can be included within the same transaction as other database operations, maintaining atomicity.
Integration: You can easily integrate PostgreSQL with existing components in your ecosystem. It may not be so easy with other tools.
Understanding Message Brokers
Message brokers facilitate communication between software systems. With them, you can decouple the "sender" and "receiver" applications, allowing them to communicate asynchronously via messages. Typical capabilities of a message broker are message queuing and pub/sub (publish/subscribe). The broker can also persist messages if you need them to remain available until the "receivers" are ready to "consume" them.
Using an SQL data store, we can develop a "sender" application with no trouble at all: we simply insert a "message" as a temporary row in a table. It is the "receiver" application that may concern us, because an ordinary query returns immediately and there is no obvious mechanism to block the receiver until a new message arrives.
In the rest of this article we show how you can develop a simple FIFO (First In First Out) queue in Python based on PostgreSQL.
The database schema
Comments:
This is an example with only 2 tables.
The tables contain an auto generated value for every message and 2 extra Ids.
The message's payload is stored in a json column.
The consumer will simply get the message from the 'JsonMessageQueue' table and move it to the 'JsonMessageArchive' table.
Because multiple consumers will run in parallel, we are saving the consumer's name in the 'ConsumedBy' column of the 'JsonMessageArchive' table.
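A schema matching the comments above could be sketched as follows. Only the table names, the JSON payload column and 'ConsumedBy' come from the article; the other column names ('SenderId', 'ReceiverId', 'ConsumedAt'), the column types and the 'create_schema' helper are assumptions made for illustration.

```python
# Hypothetical schema: only the table names, the JSON payload column and
# ConsumedBy come from the article; every other name and type is assumed.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS JsonMessageQueue (
    Id         BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,  -- auto-generated
    SenderId   INTEGER NOT NULL,   -- assumed name for the first extra id
    ReceiverId INTEGER NOT NULL,   -- assumed name for the second extra id
    Payload    JSON NOT NULL
);

CREATE TABLE IF NOT EXISTS JsonMessageArchive (
    Id         BIGINT PRIMARY KEY,  -- copied from the consumed queue row
    SenderId   INTEGER NOT NULL,
    ReceiverId INTEGER NOT NULL,
    Payload    JSON NOT NULL,
    ConsumedBy TEXT NOT NULL,       -- name of the consumer that took the message
    ConsumedAt TIMESTAMPTZ NOT NULL DEFAULT now()
);
"""


def create_schema(conn):
    """Create both tables in one transaction.

    `conn` is expected to be an open psycopg2 connection.
    """
    with conn:                       # commits on success, rolls back on error
        with conn.cursor() as cur:
            cur.execute(SCHEMA_SQL)
```

With psycopg2 installed, this would typically be called once as 'create_schema(psycopg2.connect(dsn))', where 'dsn' is your own connection string.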
The producer
Comments:
This is a quick way to produce messages and monitor the system's operation.
During the message insert transaction we also send a 'NOTIFY' to a specific channel (line 20). We will need this action later in the consumer.
All the data (ids, payload) are sample constant values.
Any error handling is omitted for the sake of simplicity.
The SQL 'INSERT' statement is parameterized.
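The producer described in these comments might look roughly like the sketch below. The column names, the channel name 'json_message_queue' and the 'produce' function are hypothetical. The notification is sent with PostgreSQL's 'pg_notify' function so that the channel and payload can be passed as parameters; because it runs in the same transaction as the 'INSERT', listeners are only notified once that transaction commits.

```python
import json

# Column names and the channel name are assumptions, not taken from the article.
INSERT_SQL = """
INSERT INTO JsonMessageQueue (SenderId, ReceiverId, Payload)
VALUES (%s, %s, %s)
RETURNING Id
"""
NOTIFY_SQL = "SELECT pg_notify(%s, %s)"


def produce(conn, sender_id, receiver_id, payload, channel="json_message_queue"):
    """Insert one message and notify listeners in the same transaction.

    `conn` is expected to be an open psycopg2 connection and `payload`
    any JSON-serializable Python object.
    """
    with conn:                       # one transaction for INSERT + NOTIFY
        with conn.cursor() as cur:
            cur.execute(INSERT_SQL, (sender_id, receiver_id, json.dumps(payload)))
            message_id = cur.fetchone()[0]
            # Delivered to listeners only when the transaction commits.
            cur.execute(NOTIFY_SQL, (channel, str(message_id)))
    return message_id
```

A call such as 'produce(conn, 1, 2, {"text": "hello"})' would then enqueue a single message with constant sample values, in the spirit of the comments above.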
The consumer
We present 2 versions that differ only in the way they check if we have a new message waiting in the "queue":
The "waiting consumer" example
Comments:
We are using Python's 'multiprocessing' module to scale out the consumer. If you need more speed, you can simply add a new name to the consumer list (line 46).
The consumer checks for messages in the 'Queue' table by running a 'SELECT' statement with the 'FOR UPDATE SKIP LOCKED' clause. 'FOR UPDATE' locks the selected row so that it cannot be selected by other consumers running in parallel. 'SKIP LOCKED' lets the statement skip any row that is already locked and continue to the next available row. This is crucial for truly parallel execution.
If there is a message, it is 'consumed'. If not, the consumer waits for a time period (1 second in our example) before checking again.
Any error handling is omitted for the sake of simplicity.
The SQL 'DELETE' and 'INSERT' statements are parameterized.
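The polling logic described above can be sketched as follows, assuming hypothetical column names ('Id', 'SenderId', 'ReceiverId', 'Payload') and helper functions that are not taken from the article. The select, delete and archive steps run in a single transaction, so a message is either moved to the archive completely or stays in the queue. The payload is read as text so that it can be re-inserted unchanged.

```python
import time

# Column names are assumptions; the table names and ConsumedBy are from the article.
SELECT_SQL = """
SELECT Id, SenderId, ReceiverId, Payload::text
FROM JsonMessageQueue
ORDER BY Id
LIMIT 1
FOR UPDATE SKIP LOCKED
"""
DELETE_SQL = "DELETE FROM JsonMessageQueue WHERE Id = %s"
ARCHIVE_SQL = """
INSERT INTO JsonMessageArchive (Id, SenderId, ReceiverId, Payload, ConsumedBy)
VALUES (%s, %s, %s, %s, %s)
"""


def consume_one(conn, consumer_name):
    """Move the oldest unlocked message to the archive.

    Returns the message id, or None when no unlocked message is available.
    `conn` is expected to be an open psycopg2 connection.
    """
    with conn:                       # select, delete and insert are one transaction
        with conn.cursor() as cur:
            cur.execute(SELECT_SQL)  # locks the row; other consumers skip it
            row = cur.fetchone()
            if row is None:
                return None
            message_id, sender_id, receiver_id, payload = row
            cur.execute(DELETE_SQL, (message_id,))
            cur.execute(
                ARCHIVE_SQL,
                (message_id, sender_id, receiver_id, payload, consumer_name),
            )
    return message_id


def run_waiting_consumer(conn, consumer_name, poll_interval=1.0, max_idle_polls=None):
    """Poll the queue, sleeping `poll_interval` seconds whenever it is empty.

    Runs forever unless `max_idle_polls` consecutive empty polls are reached.
    Returns the number of messages consumed.
    """
    consumed = 0
    idle_polls = 0
    while max_idle_polls is None or idle_polls < max_idle_polls:
        if consume_one(conn, consumer_name) is None:
            idle_polls += 1
            time.sleep(poll_interval)
        else:
            idle_polls = 0
            consumed += 1
    return consumed
```

When several consumers run in parallel, for example one per 'multiprocessing' process, each of them should open its own database connection and pass its own name as 'consumer_name'.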
The "listening consumer" example
Comments:
The logic is the same. It only differs in the waiting phase.
Instead of sleeping between checks, this version uses the "NOTIFY/LISTEN" capability of PostgreSQL.
The consumer waits for a notification on a specific channel before running the 'SELECT' statement, avoiding needless select executions.
Check the psycopg2 library's documentation on how you can asynchronously interact with PostgreSQL.
We could use the facilities offered by the PostgreSQL commands LISTEN and NOTIFY to implement the whole queue logic. However, preserving transactional support (atomicity), even while generating or consuming messages, is one of the implementation's purposes.
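The waiting phase of a listening consumer could be sketched as below, following the 'select' / 'poll()' / 'notifies' pattern described in the psycopg2 documentation. The channel name, the function names and the use of a separate autocommit connection for listening are assumptions; 'handle_pending' is a hypothetical callback that runs the 'SELECT ... FOR UPDATE SKIP LOCKED' step on another connection until no rows remain.

```python
import select

# Hypothetical channel name; it must match the one the producer notifies.
LISTEN_SQL = "LISTEN json_message_queue"


def start_listening(listen_conn):
    """Subscribe a dedicated psycopg2 connection to the channel."""
    listen_conn.autocommit = True    # LISTEN takes effect immediately
    with listen_conn.cursor() as cur:
        cur.execute(LISTEN_SQL)


def wait_for_notification(listen_conn, timeout=5.0):
    """Block until a notification arrives or `timeout` seconds pass.

    Returns True if at least one notification was received.
    """
    readable, _, _ = select.select([listen_conn], [], [], timeout)
    if not readable:
        return False                 # timed out, nothing arrived
    listen_conn.poll()               # read pending data from the server
    notified = False
    while listen_conn.notifies:      # drain everything that arrived
        listen_conn.notifies.pop(0)
        notified = True
    return notified


def run_listening_consumer(listen_conn, handle_pending, timeout=5.0, max_waits=None):
    """Call `handle_pending` once at start-up and after every notification.

    Runs forever unless `max_waits` wait cycles are reached.
    """
    start_listening(listen_conn)
    handle_pending()                 # messages queued before LISTEN was issued
    waits = 0
    while max_waits is None or waits < max_waits:
        if wait_for_notification(listen_conn, timeout):
            handle_pending()
        waits += 1
```

Keeping the listening connection separate from the one that consumes messages is a design choice of this sketch: the consuming connection can keep its normal transactional behaviour, so the move from queue to archive stays atomic.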
Conclusion
PostgreSQL's capability as a message broker offers a compelling solution for specific use cases, especially when simplicity, integration, and moderate messaging loads are crucial factors. While it may not match the performance and specialized features of dedicated message brokers in certain scenarios, leveraging PostgreSQL for messaging can be a pragmatic choice within the right context, particularly for applications that heavily rely on PostgreSQL's ecosystem and strengths. Understanding the trade-offs and considering specific use case requirements are key in determining whether PostgreSQL as a message broker aligns with the overall architecture and goals of an application or system.
- Posted by Kostas Koutsogiannopoulos · Dec. 24, 2023