Streaming Postgres Changes
By Mark Fletcher
Groups.io is based on a series of Postgres databases, which has worked very well for us. There are some scenarios, however, where it would be advantageous to have our data in a streaming, log-based system like Apache Kafka. One specific scenario involves ElasticSearch, which we use to provide full-text search over group archives. Right now, when a message is added or updated, we send the update to Postgres, then we send the update to the ElasticSearch cluster. If we want to re-index our archives, we have to prevent new messages from coming in, as well as changes to existing messages, while we do a table scan of the archives into a new ES cluster. This is non-optimal. A better pattern would be as follows (a rough sketch in code follows the list):
- Additions/Changes/Deletions from Postgres get streamed into a log-based system.
- A reader is constantly consuming those changes and updating the ES cluster.
- When we want to re-index the site, we start a new reader which consumes the log from the beginning, creating a new ES index.
- When the new ES index is up to date, simply switch the site over to it, stop the old reader and delete the old ES index.
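To make that concrete, here's a minimal sketch of the consumer side in Go. The ChangeLog and Indexer interfaces are hypothetical stand-ins (for Kafka/NATS and ElasticSearch, respectively), not real APIs; the point is just that a re-index is nothing more than a second consumer started at offset zero against a fresh index.

```go
package consumer

import "context"

// Change is a hypothetical record of one Postgres row change.
type Change struct {
	Offset int64  // position in the log
	Data   []byte // serialized row change (e.g. JSON)
}

// ChangeLog is a stand-in for a log-based system such as Kafka or NATS.
type ChangeLog interface {
	// Read returns a channel of changes starting at the given offset.
	Read(ctx context.Context, fromOffset int64) (<-chan Change, error)
}

// Indexer is a stand-in for an ElasticSearch index writer.
type Indexer interface {
	Apply(ctx context.Context, c Change) error
}

// consume tails the change log from an offset and applies every change
// to the given index. Normal operation resumes from the last applied
// offset; a re-index starts a second consumer from offset 0 against a
// brand-new index, and the site switches over once it catches up.
func consume(ctx context.Context, log ChangeLog, idx Indexer, from int64) error {
	changes, err := log.Read(ctx, from)
	if err != nil {
		return err
	}
	for c := range changes {
		if err := idx.Apply(ctx, c); err != nil {
			return err
		}
	}
	return ctx.Err()
}
```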
There are other scenarios where having a log-based representation of the data would be useful as well. With this in mind, I've been researching ways to stream Postgres changes. These are my notes about what I've learned so far. They may be incomplete and contain errors. Corrections are appreciated.

Postgres introduced logical decoding in version 9.4. With the addition of an output plugin, it is now possible to stream changes from a Postgres database in whatever format you prefer. There are a couple of projects that use this to stream Postgres into Kafka, like Bottled Water (no longer maintained) and Debezium. I could not get the Debezium Postgres plugin compiled on CentOS 7. In addition, there's a competitor to Kafka, called NATS, which, while not as mature as Kafka, has the advantage (to me) of being written in Go. There appears to be a connector between Postgres and NATS, but I haven't explored it. Another related project is pg_warp, which allows you to stream Postgres changes to another Postgres database.
I wanted to explore exactly how a Postgres streaming system would work. While everything is documented, it was not clear to me how the process worked should you want to implement your own logical replication system. The Postgres docs helped my understanding, along with this presentation and this blog post from Simple. Playing with the wal2json plugin and following its README helped, and Debezium's docs go into some detail as well. But there is more to it. This is what I've found out.
There are two parts to a Postgres streaming system: first you want to get a complete snapshot of the existing database, and then you want to get all changes to the database going forward, without missing any should there be a hiccup (i.e., a crash).
Here are the steps required; code sketches for the individual steps follow the list (note: this may be updated as I gain more experience with this):
- Set up Postgres for logical replication, and decide which output plugin to use. The plugin determines only the format of the data you will receive.
- Connect to the database using the streaming protocol. This means appending “replication=database” to the URL. The streaming protocol is not the same as the normal Postgres protocol, although you can use psql to send some commands. If you are programming in Go, the only Postgres driver I found that supports the replication protocol is pgx. Unfortunately, one of the commands needed from it, CreateReplicationSlot(), does not return the name of the snapshot created, which you need. I've submitted a pull request with the change to return this information.
- At the same time, connect to the database the normal way.
- On the streaming connection, issue this command (using wal2json as the plugin for this example): CREATE_REPLICATION_SLOT test_slot LOGICAL wal2json;
- This creates the replication slot, and it also exports a snapshot. The name of the snapshot is returned, along with the consistent_point (the WAL position from which streaming will be consistent).
- On the normal connection, you can now take a snapshot of the existing database using the snapshot name returned above. Use these commands to initiate the transaction: BEGIN; SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; SET TRANSACTION SNAPSHOT snapshot_name; SELECT * from ….; COMMIT;
- Once the snapshot has been completed, on the streaming connection, issue: START_REPLICATION SLOT test_slot LOGICAL consistent_point;
- That starts the streaming. You will receive data in the format the plugin outputs. Each message also includes its WAL position; persist that, and you can use it to resume streaming should you have to restart your streaming system.
- When you are done streaming, you must issue a DROP_REPLICATION_SLOT command on the streaming connection; otherwise Postgres will not be able to remove old WAL files and you will eventually run out of disk space.
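Step one amounts to a few postgresql.conf settings. These are the real parameter names; the values are just illustrative, and a restart is required for wal_level:

```
# postgresql.conf
wal_level = logical            # enable logical decoding (restart required)
max_replication_slots = 4      # need at least one free slot
max_wal_senders = 4            # streaming connections each use a WAL sender
```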
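For connecting over the streaming protocol and creating the slot, here's a rough Go sketch against pgx's replication API. ReplicationConnect opens the connection with replication=database set for you. The CreateReplicationSlotEx call assumes the change from the pull request mentioned above (a variant that returns the consistent point and snapshot name); the exact name and signature may differ depending on your pgx version.

```go
package main

import (
	"fmt"
	"log"

	"github.com/jackc/pgx"
)

func main() {
	// Placeholder connection details.
	config := pgx.ConnConfig{Host: "localhost", Database: "mydb", User: "replicator"}

	// Streaming connection: ReplicationConnect sets replication=database
	// on the connection for us.
	replConn, err := pgx.ReplicationConnect(config)
	if err != nil {
		log.Fatal(err)
	}
	defer replConn.Close()

	// Equivalent to: CREATE_REPLICATION_SLOT test_slot LOGICAL wal2json,
	// capturing the consistent point and the exported snapshot name.
	// Plain CreateReplicationSlot works if you don't need the snapshot name.
	consistentPoint, snapshotName, err := replConn.CreateReplicationSlotEx("test_slot", "wal2json")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("consistent point:", consistentPoint, "snapshot:", snapshotName)
}
```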
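The snapshot read on the normal connection is plain SQL. The snapshot name here ('00000003-00000002-1') is a made-up example of the kind of name CREATE_REPLICATION_SLOT returns, and messages stands in for whatever table you're copying:

```sql
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-00000002-1';  -- snapshot name from CREATE_REPLICATION_SLOT
SELECT * FROM messages;                          -- copy existing rows into your downstream system
COMMIT;
```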
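Starting and consuming the stream, again sketched against pgx's replication API (the LSN literal is a placeholder; in practice you'd pass the consistent_point, or your persisted WAL position when resuming). The standby status updates are how the client acknowledges the WAL position it has processed, which is what lets Postgres recycle old WAL:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/jackc/pgx"
)

func main() {
	config := pgx.ConnConfig{Host: "localhost", Database: "mydb", User: "replicator"}
	replConn, err := pgx.ReplicationConnect(config)
	if err != nil {
		log.Fatal(err)
	}
	defer replConn.Close()

	// The consistent_point from CREATE_REPLICATION_SLOT, or a persisted
	// WAL position if we are resuming after a restart.
	lsn, err := pgx.ParseLSN("0/15D68D8")
	if err != nil {
		log.Fatal(err)
	}

	// Equivalent to: START_REPLICATION SLOT test_slot LOGICAL 0/15D68D8
	// (-1 means the server's current timeline).
	if err := replConn.StartReplication("test_slot", lsn, -1); err != nil {
		log.Fatal(err)
	}

	processed := lsn
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		msg, err := replConn.WaitForReplicationMessage(ctx)
		cancel()
		if err != nil && err != context.DeadlineExceeded {
			log.Fatal(err)
		}
		if msg != nil && msg.WalMessage != nil {
			// WalData is the plugin's output -- wal2json's JSON here.
			fmt.Printf("%s\n", msg.WalMessage.WalData)
			processed = msg.WalMessage.WalStart
			// Persist `processed` durably; it is the position to resume from.
		}
		// Acknowledge progress so Postgres can recycle old WAL files.
		status, err := pgx.NewStandbyStatus(processed)
		if err != nil {
			log.Fatal(err)
		}
		if err := replConn.SendStandbyStatus(status); err != nil {
			log.Fatal(err)
		}
	}
}
```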
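With wal2json as the plugin, each WalData payload is JSON shaped roughly like this (the table and columns are made up; see wal2json's README for the full format):

```json
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "messages",
      "columnnames": ["id", "subject"],
      "columntypes": ["integer", "text"],
      "columnvalues": [42, "Hello, world"]
    }
  ]
}
```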
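Cleanup can be done with the streaming-protocol command, or from a normal connection with the built-in pg_drop_replication_slot() function:

```sql
-- On the streaming connection:
DROP_REPLICATION_SLOT test_slot;

-- Or from a normal connection:
SELECT pg_drop_replication_slot('test_slot');
```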
I am not completely sure that I've outlined the process correctly for when you have to restart streaming. I am also unclear about replication origins, but I believe they only apply when you're replicating into another Postgres database; I'm not sure.