In this tutorial, we'll first replicate a single table from a self-managed PostgreSQL cluster to an Aiven cluster, and then show how to replicate every table in the database.

Note: These instructions also work with AWS RDS PostgreSQL 10+. Google Cloud SQL for PostgreSQL, however, did not support logical replication at the time of writing.

The aiven_extras extension allows Aiven users to perform publish/subscribe-style logical replication without a superuser account. It is available by default on Aiven PostgreSQL servers, which will be the destination of the replication.

This will allow you to perform a number of tasks, such as replicating a subset of tables or migrating your whole PostgreSQL database into Aiven.

Some considerations before we begin:

  • Both PostgreSQL clusters need to be version 10 or later
  • The Aiven PostgreSQL cluster needs to be able to connect to the source cluster's PostgreSQL port
  • You need access to an administrator role on the source cluster for some of these operations
  • You do not need to install any extensions on the source cluster; the aiven_extras extension is available by default on Aiven PostgreSQL clusters
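
A quick way to confirm the version requirement is to ask each server directly. This is a minimal sketch that relies only on standard PostgreSQL settings; the column alias is just an illustrative name.

```sql
-- Run on both the source and the destination.
-- server_version_num is an integer such as 100012 or 120004;
-- 100000 or higher means PostgreSQL 10 or later.
SHOW server_version;
SELECT current_setting('server_version_num')::int >= 100000 AS is_version_10_or_later;
```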

For this tutorial, we'll use a source database called test_database on a self-managed PostgreSQL cluster. We'll replicate a single table called test_table, which has a single column called id, into the defaultdb database on an Aiven-managed PostgreSQL cluster. To recap, the two servers will be:

  • A self hosted PG DB running PG 10+ with a public facing port and an admin role (hereafter called source)
  • An Aiven PG DB running PG 10+ (hereafter called destination)

The source database needs to have wal_level set to logical.

source $ show wal_level;
(1 row)

If it is not, you can change it from psql and then restart PostgreSQL. A configuration reload is not enough, because wal_level only takes effect at server start:

source $ ALTER SYSTEM SET wal_level = logical;

If you are replicating from an AWS RDS PostgreSQL cluster, set the rds.logical_replication parameter to 1 (true) in the cluster's parameter group to enable logical replication.
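
Whichever way the setting is changed, it can help to confirm what the source server is actually running with. The sketch below reads the standard pg_settings view. It also shows max_replication_slots and max_wal_senders, on the assumption that you want to check capacity too, since each subscription needs at least one free replication slot and WAL sender on the source.

```sql
-- Run on the source.
-- pending_restart is true when the configuration files hold a changed value
-- that still needs a server restart (it is refreshed on configuration reload).
SELECT name, setting, pending_restart
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
```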

Now, on the source cluster, create a database called test_database and connect to it with psql. Then create the test table and insert a couple of rows:

source $ CREATE TABLE test_table (id INT);
source $ INSERT INTO test_table VALUES (1), (2);

In PostgreSQL 10 and above, a PUBLICATION defines the set of tables to be replicated, and the receiving database creates a SUBSCRIPTION to that publication.

When creating a publication you can also define which operations should be transferred. For example, you can define that only INSERTs should be replicated but not UPDATEs and DELETEs. 
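
As an illustration of restricting the published operations, an insert-only publication could be sketched as follows. The name pub_insert_only is hypothetical and is not used anywhere else in this tutorial, so the sketch drops it again at the end.

```sql
-- Run on the source. Hypothetical publication, for illustration only.
CREATE PUBLICATION pub_insert_only FOR TABLE test_table WITH (publish = 'insert');

-- The set of published operations can be changed later.
ALTER PUBLICATION pub_insert_only SET (publish = 'insert,update');

-- Remove the illustrative publication again.
DROP PUBLICATION pub_insert_only;
```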

In this tutorial we'll replicate all three types of operations:

source $ CREATE PUBLICATION pub1 FOR TABLE test_table WITH (publish='insert,update,delete');
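
One caveat is worth checking here. Because this publication includes UPDATE and DELETE, PostgreSQL needs a replica identity on each published table to identify the rows being changed. test_table has no primary key, so UPDATE and DELETE statements on it would be rejected on the source until a replica identity is set. Below is a minimal sketch of two options; which one suits your data is a choice left to you.

```sql
-- Run on the source.
-- Option 1: use the whole row as the replica identity (simple, but more WAL per change).
ALTER TABLE test_table REPLICA IDENTITY FULL;

-- Option 2 (alternative): add a primary key, which becomes the default replica identity.
-- Ideally the same key is then defined on the destination table as well.
-- ALTER TABLE test_table ADD PRIMARY KEY (id);

-- Confirm which tables the publication covers.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'pub1';
```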

Next, we’ll log into the destination database that will receive the changes from the source database and begin by creating the new aiven_extras extension:

destination $ CREATE EXTENSION aiven_extras CASCADE;
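
To confirm that the extension is in place, you can look it up in the standard pg_extension catalog. This is a small sketch; the version reported will depend on your service.

```sql
-- Run on the destination.
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'aiven_extras';

-- If the extension may already exist, this form avoids an error on re-run.
CREATE EXTENSION IF NOT EXISTS aiven_extras CASCADE;
```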

PostgreSQL’s logical replication doesn’t copy table definitions, so we need to copy them with something like pg_dump.

First dump the schema, then edit the file so that the ALTER TABLE ... OWNER TO statements name avnadmin instead of postgres.

# Self managed PostgreSQL source - dump the schema to a file:

$ pg_dump --schema-only --no-publications test_database > test_database-schema.sql

# Change 'postgres' ownerships in the file to 'avnadmin'
# sed is enough for this example; review the result carefully when working with real schemas

$ sed -i 's/OWNER TO postgres/OWNER TO avnadmin/g' test_database-schema.sql

# Create the table definitions in the destination DB

$ psql "postgres://" < test_database-schema.sql

Or, as in this case, if there are only a few tables to define, simply create them with the appropriate CREATE TABLE statements:

destination $ CREATE TABLE test_table (id INT);
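
Logical replication matches tables by their schema-qualified name and columns by name, so it can be worth comparing the definitions on both sides before subscribing. One way to do that, sketched with the standard information_schema views:

```sql
-- Run on both the source and the destination, then compare the output.
SELECT table_schema, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'test_table'
ORDER BY table_schema, ordinal_position;
```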

Once the table definitions are present in the destination, create a SUBSCRIPTION to start replicating changes from the source:

destination $ SELECT * FROM aiven_extras.pg_create_subscription('subscription', 'host= password=Password123 port=5432 dbname=test_database user=postgres', 'pub1', 'slot', TRUE, TRUE);

As the pg_subscription catalog is superuser-only, use the aiven_extras.pg_list_all_subscriptions() function from aiven_extras to check that the subscription was created successfully:

destination $ SELECT * FROM aiven_extras.pg_list_all_subscriptions();
 subdbid |   subname    | subowner | subenabled |                                     subconninfo                                     | subslotname | subsynccommit | subpublications
   16394 | subscription |       10 | t          | host= password=Password123 port=5432 dbname=test_database user=postgres | slot        | off           | {pub1}
(1 row)
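
The subscription can also be checked from the other side. A sketch, assuming the slot name 'slot' used above: on the source, the standard pg_replication_slots view should list a logical slot, and active should normally be true while the destination is connected.

```sql
-- Run on the source.
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots
WHERE slot_name = 'slot';
```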

Once the subscription has been created you can execute:

destination $ SELECT * FROM test_table;
(2 rows)

The two rows inserted on the source should now be present on the destination, which shows that the existing data was copied successfully.
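
The initial copy only proves that existing rows arrived. To see ongoing replication at work, you could add a row on the source and look for it on the destination. This is a minimal sketch; the value 3 is an arbitrary test value.

```sql
-- On the source: add a row after the initial copy has finished.
INSERT INTO test_table VALUES (3);

-- On the destination: the new row should appear shortly afterwards.
SELECT id FROM test_table ORDER BY id;
```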

It is also important not to leave unused replication setups in place. The underlying replication slot makes the source server retain all the WAL needed to resume replication from the point the subscriber last confirmed. If nothing is reading the stream, the retained data keeps growing until the source's disk eventually fills up.
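
To keep an eye on this, you can ask the source how much WAL each slot is holding back. The sketch below uses the PostgreSQL 10+ function names; retained_wal is just an illustrative column alias.

```sql
-- Run on the source.
-- A large and growing retained_wal on a slot with active = false
-- usually points to an abandoned subscription.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
```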

Once we know we want to get rid of a subscription, we can run:

destination $ SELECT * FROM aiven_extras.pg_drop_subscription('subscription');

To check that everything was cleaned up correctly, list the subscriptions again and confirm that the one you dropped is gone:

destination $ SELECT * FROM aiven_extras.pg_list_all_subscriptions();
 subdbid | subname | subowner | subenabled | subconninfo | subslotname | subsynccommit | subpublications
(0 rows)
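
It is also worth confirming on the source that the replication slot itself is gone, particularly because the next part of this tutorial reuses the slot name 'slot'. A sketch, with the manual drop left commented out because it should only be needed if the slot was left behind:

```sql
-- Run on the source. No rows means the slot has been removed.
SELECT slot_name, active
FROM pg_replication_slots
WHERE slot_name = 'slot';

-- Only if the slot is still listed and inactive:
-- SELECT pg_drop_replication_slot('slot');
```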

Finally, here's an example of publishing all the tables in the source database.

While CREATE PUBLICATION doesn't require superuser privileges when you publish an enumerated list of tables, it does require them when you want to publish all tables.
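
If you are unsure whether the role you are connected with qualifies, the standard pg_roles view can tell you. A small sketch:

```sql
-- Run on the source. rolsuper must be true to create a FOR ALL TABLES publication.
SELECT rolname, rolsuper
FROM pg_roles
WHERE rolname = current_user;
```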

Let's create two more test tables on both the destination and the source, then insert a few test rows on the source.

destination $ CREATE TABLE test_table_2 (id INT);
destination $ CREATE TABLE test_table_3 (id INT);

source $ CREATE TABLE test_table_2 (id INT);
source $ CREATE TABLE test_table_3 (id INT);
source $ INSERT INTO test_table_2 VALUES (3), (4);
source $ INSERT INTO test_table_3 VALUES (5), (6);

Now on the source we create a publication with all the tables:

source $ CREATE PUBLICATION pub_all_tables FOR ALL TABLES WITH (publish='insert,update,delete');
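
Before subscribing, you may want to confirm what the publication covers. The sketch below uses the standard pg_publication and pg_publication_tables catalogs. It also includes an optional, commented-out TRUNCATE, on the assumption that test_table on the destination still holds rows from the earlier subscription; without a primary key, the initial copy of a new subscription would add those rows a second time.

```sql
-- On the source: puballtables is true for a FOR ALL TABLES publication.
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete
FROM pg_publication
WHERE pubname = 'pub_all_tables';

SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'pub_all_tables'
ORDER BY schemaname, tablename;

-- On the destination (optional): clear rows left over from the earlier subscription
-- so that the initial copy does not duplicate them.
-- TRUNCATE test_table;
```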

Then we create a subscription called subscription to the pub_all_tables publication on the destination:

destination $ SELECT * FROM aiven_extras.pg_create_subscription('subscription', 'host= password=Password123 port=5432 dbname=test_database user=postgres', 'pub_all_tables', 'slot', TRUE, TRUE);

If you check the new tables, you should see that the rows from the source have been copied to the destination.
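
One way to check this per table is sketched below. It assumes your role on the destination is allowed to read the pg_subscription_rel catalog, which records the synchronization state of each subscribed table.

```sql
-- Run on the destination.
-- srsubstate: i = initializing, d = data is being copied, s = synchronized, r = ready.
SELECT srrelid::regclass AS table_name, srsubstate
FROM pg_subscription_rel
ORDER BY 1;

-- Row counts, to compare with the same query run on the source.
SELECT 'test_table_2' AS table_name, count(*) AS row_count FROM test_table_2
UNION ALL
SELECT 'test_table_3', count(*) FROM test_table_3;
```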

You can check the status of the replication on the source and destination as follows:

source $ SELECT * FROM pg_stat_replication;
  pid  | usesysid | usename  | application_name | client_addr | client_hostname | client_port |         backend_start         | backend_xmin |   state   | sent_lsn  | write_lsn | flush_lsn | replay_lsn | write_lag | flush_lag | replay_lag | sync_priority | sync_state
 11896 |       10 | postgres | subscription     | ::1         |                 |       54538 | 2020-04-12 21:38:44.837484+02 |              | streaming | 0/16A62F8 | 0/16A62F8 | 0/16A62F8 | 0/16A62F8  |           |           |            |             0 | async
(1 row)

destination $ SELECT * FROM pg_stat_subscription;
 subid |   subname    |  pid  | relid | received_lsn |      last_msg_send_time       |     last_msg_receipt_time     | latest_end_lsn |        latest_end_time        
 16486 | subscription | 10738 |       | 0/16A62F8    | 2020-04-12 19:39:46.341078+00 | 2020-04-12 19:39:46.373729+00 | 0/16A62F8      | 2020-04-12 19:39:46.341078+00
(1 row)
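
The LSN columns above are easier to interpret as a byte distance. As a sketch, the source can report how far each subscriber's replay position trails the current WAL position; replay_lag_bytes is an illustrative alias.

```sql
-- Run on the source. 0 means the subscriber has replayed everything written so far.
SELECT application_name,
       state,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
FROM pg_stat_replication;
```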

Once replication is no longer needed, remove the subscription on the destination:

destination $ SELECT * FROM aiven_extras.pg_drop_subscription('subscription');
(1 row)
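
If the source will not be replicated from again, the publications created in this tutorial can be removed as well. This is a sketch of an optional cleanup step, assuming no other subscription still uses them.

```sql
-- Run on the source, only once no subscription uses these publications any more.
DROP PUBLICATION IF EXISTS pub_all_tables;
DROP PUBLICATION IF EXISTS pub1;
```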