3 Ways to Stream Data from Postgres to Elasticsearch

Jeff Richman

If you have a high volume of relational data and are thinking through how you’ll both store and analyze it, the Postgres to ElasticSearch stack could be a great fit — as long as you keep the two systems in sync.

In this tutorial, we’ll cover:
 

  • Overview of ElasticSearch
  • Overview of Postgres
  • When & Why to use ETL from Postgres to ElasticSearch in real time
  • 3 Methods to stream data from Postgres to ElasticSearch

Introduction to ElasticSearch

ElasticSearch, often called just “Elastic” (and the core of the ELK stack, alongside Logstash and Kibana), is a popular open-source distributed search and analytics engine built on the Apache Lucene library.

You’ll find it particularly useful for applications such as search and discovery, real-time log analysis, and geospatial analysis. 

This is because Elastic:

  • Supports full-text search.
  • Handles both structured and unstructured data.
  • Scales horizontally to support large loads across multiple nodes.

If you’re comparing Elastic against popular SQL databases like MySQL or Postgres, Elastic is a particularly good fit when:

  • You’re building search-based applications (e.g. eCommerce search) or analyzing textual data, since Elastic ranks results by how closely they match a text-based query (see the example query after this list).
  • You’re analyzing or working with high-volume data, such as log data, because Elastic can scale across multiple nodes.
  • You’re building geospatial or time-series applications.
  • You find yourself creating many database indexes to speed up queries and would rather apply fast filters on arbitrary fields instead.
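
To make the first point concrete, here is a minimal sketch of a full-text query against a hypothetical products index with a description field (the index and field names are illustrative and not tied to any example in this article):

  GET /products/_search
  {
    "query": {
      "match": {
        "description": "wireless noise cancelling headphones"
      }
    }
  }

Each matching document comes back with a relevance _score, and results are returned best match first; producing equivalent ranked results from a SQL LIKE query takes considerably more work.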

Introduction to Postgres

You’ve probably heard of PostgreSQL, or Postgres; it is, after all, the fourth most popular relational database.

Postgres is a popular open-source RDBMS that extends the functionality of SQL. Thanks to the rise of the cloud, its open-source nature, and a strong feature set, its popularity has grown steadily over the past decade.

 


You’ll find that Postgres is an extensible enterprise-class database capable of:

  • Storing large data volumes.
  • Handling complex data structures.
  • Working seamlessly with many other applications. 

Replicating Data from Postgres to ElasticSearch in Real Time

Suppose you have relational data in Postgres representing a set of emails, and you want to let someone search those emails as a data source, for, say, remembering their partner’s birthday. You might build a search application to help the user query the data (and quickly, so they don’t miss it!).

By replicating the data over to ElasticSearch, you may be in a better position to build a scalable application for text-based querying. 

You’ll want to consider whether it makes sense to continuously load the data into ElasticSearch using real-time streaming, perform a one-time load, or batch-load the data on some frequency. Streaming the data in real time can drive better outcomes for use cases like recommendation systems or fraud detection.

As with most data pipelines, there are a number of ways and tools to perform ETL from Postgres to ElasticSearch in real time. 

In this guide we’ll cover three popular ways to stream your data.

Method 1: PGSync – Open Source Project for Postgres to Elastic

PGSync is an open-source project for the continuous capture of data from Postgres to ElasticSearch. 

It is maintained by Tolu Aina, and having just finished emailing him, I can say he’s a really nice, helpful guy.

Tolu built PGSync to use the logical decoding feature introduced in Postgres 9.4 to capture a stream of change events from the Postgres database.

If you modify the configuration file in Postgres to enable logical decoding, PGSync can consume the change events from the Postgres write-ahead log. 

After you define a schema file for the resulting document, your captured change events will be transformed by PGSync’s query builder from relational data into the structured document format that ElasticSearch requires.
 

Steps:

Prerequisites: Python 3.7+, Postgres 9.6+, Redis 3.1.0, ElasticSearch 6.3.1+, SQLAlchemy 1.3.4+, and superuser privileges

  1. Open the postgresql.conf configuration file, usually located at /etc/postgresql/[version]/main/ on Debian-based systems (or in your Postgres data directory).
  2. Set wal_level = logical and restart Postgres.
  3. Create a replication slot by running 

    SELECT * FROM pg_create_logical_replication_slot('slot_name', 'plugin');
  4. Install PGSync: pip install pgsync.
  5. Create a schema.json that matches the expected document representation in ElasticSearch (see the sample sketch after these steps).
  6. Run PGSync as a daemon: pgsync --config schema.json -d.
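
For illustration, here is a minimal sketch of what a schema.json might look like, assuming a hypothetical emails_db database with an email table. The database, index, table, and column names are examples only, and the exact schema options vary by PGSync version, so consult the PGSync documentation for your release:

  [
    {
      "database": "emails_db",
      "index": "emails",
      "nodes": {
        "table": "email",
        "columns": ["id", "subject", "body", "sent_at"]
      }
    }
  ]

Each entry maps a Postgres table (optionally with nested child tables) to an ElasticSearch index that PGSync will keep in sync.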

Advantages of using PGSync

  • Streams data in real time.
  • Open-source project, so it’s free to use.
  • Uses logical decoding, so the impact on your production database is minimized.
  • If all prerequisites are met, it’s a good fit for simple, low-latency replication needs.

Disadvantages of using PGSync

  • Does not support in-flight transforms.
  • Unable to support zero-downtime migrations as of today.
  • No formal support; it’s a relatively new open-source project with infrequent commits, mostly from a single contributor.
  • Unclear if it can handle complex enterprise loads.

Method 2: Fully Managed Postgres to Elastic via Estuary Flow

If you don’t want to install a set of libraries and learn a bunch of new tooling, you can explore a fully managed service with a UI for building a data pipeline without coding. 

Estuary Flow is one such free no-code platform for building streaming data pipelines. 

Estuary provides a UI on top of the open-source Gazette streaming framework to replicate change data and history from Postgres to ElasticSearch in milliseconds. The pipelines you create are continuous, fully managed, and stream change data directly from the Postgres write-ahead log.

Steps:

  1. Create a free account in the Estuary Flow web app here.
  2. Set up change data capture from Postgres.
    1. Configure your Postgres instance to meet the capture requirements (see the example configuration after these steps).
    2. Navigate to Captures in the web app and select Postgres.

    3. Add a unique name for your capture.
    4. Fill out the capture details with the server address, database username, and password.

    5. Click Next. Flow will find and list all the tables in your Postgres database. You can choose which tables you’d like to capture.
    6. Click Save & Publish to begin the capture process.
  3. The data from each table is now stored in a Flow Collection. A collection is both your real-time and historical data stored as JSON documents in cloud storage. As a collection, the data can now be materialized in real time, transformed, and joined with other collections.
  4. Materialize Postgres data into Elastic.
    1. Select Materialize Collections from the dialog box of your successful capture.
    2. Select the ElasticSearch connector.

    3. Add a unique name for the materialization.
    4. Input the Elastic cluster endpoint in the format https://CLUSTER_ID.REGION.CLOUD_PLATFORM.DOMAIN:PORT.
    5. Input the username and password.

    6. Scroll to the Collection Selector. The tables ingested from Postgres will each be mapped to a separate index in ElasticSearch. Provide a name for each. 
    7. Click Next.
    8. Click Save & Publish. 
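
A note on configuring Postgres (step 2 above): before Flow can capture change data, the database itself must allow logical replication and provide a role the connector can use. The sketch below is a hedged example of the typical setup; the role and publication names are illustrative, and Estuary’s Postgres connector documentation is the authoritative source for the exact requirements:

  -- Enable logical replication (requires a Postgres restart)
  ALTER SYSTEM SET wal_level = logical;

  -- A dedicated capture role (example name)
  CREATE USER flow_capture WITH PASSWORD 'secret' REPLICATION;
  GRANT SELECT ON ALL TABLES IN SCHEMA public TO flow_capture;

  -- A publication covering the tables you want to capture
  CREATE PUBLICATION flow_publication FOR ALL TABLES;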

All historical data from Postgres is now backfilled into ElasticSearch documents, and any new changes in the source Postgres database will materialize to Elastic in less than 100ms.

You can optionally exert more control over the field mappings to Elastic with field overrides.

Visit our documentation for more details on building pipelines in Estuary Flow.

Advantages of using Estuary Flow for Postgres to ElasticSearch:

  • No-code UI-based setup.
  • Real-time data pipeline with materializations in under 100ms.
  • Fully managed enterprise-grade system supporting flows of 7GB/s+.
  • Ability to replicate Postgres data to destinations beyond Elastic without repeating ingestion.
  • Support for SSH tunneling.
  • Free tier up to 25GB/mo.
  • Able to perform in-flight transformations and joins with other data assets before syncing.

Disadvantages:

  • Pay $0.75 per GB of data transferred after 25GB.

Method 3: Logstash JDBC plugin for Postgres to ElasticSearch

Prerequisites: Java 8+, Logstash, and a JDBC driver

Elastic provides a documented process for using Logstash to sync from a relational database to ElasticSearch. Logstash is an open-source server-side data processing platform. 

Note that this process is tested for MySQL, though it should work for other relational databases like Postgres. Unlike the previous two methods, it does not capture change data from the write-ahead log via logical decoding; instead, Logstash polls the database on a schedule using a last-modified timestamp column.

This means your production database could be heavily taxed by this implementation. You can read more about Postgres CDC types here. 

Steps:

  1. Gather credentials 
    1. Navigate to the Kibana menu and then Management->Integrations->View Deployment Details.
    2. To authenticate you will use the Elastic API key.
  2. Get Logstash and the Postgres JDBC driver.
    1. Install Logstash.
    2. Download and unpack the JDBC driver and take note of the driver’s location.
  3. Add timestamps to Postgres (if you do not already have them).
    1. For each table you plan to replicate to Elastic, you will need a column to reflect the time it was last modified.
  4. Create a Logstash pipeline with the JDBC input plugin.
    1. Create a file called jdbc.conf in <localpath>/logstash-7.12.

      Paste the code below into the file to generate the Logstash pipeline, substituting your driver location, credentials, and timestamp column name.
input {
  jdbc {
    jdbc_driver_library => "<driverpath>/mysql-connector-java-<versionNumber>.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => "<myusername>"
    jdbc_password => "<mypassword>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  elasticsearch {
    index => "rdbms_idx"
    ilm_enabled => false
    cloud_id => "<DeploymentName>:<ID>"
    cloud_auth => "elastic:<Password>"
    ssl => true
    # api_key => "<myAPIid:myAPIkey>"
  }
}

With the configuration above saved, launch Logstash:
bin/logstash -f jdbc.conf
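
Since this article targets Postgres rather than MySQL, here is a hedged sketch of how the input section might look using the Postgres JDBC driver instead. The driver class (org.postgresql.Driver) and the jdbc:postgresql:// connection string format are standard; the host, database, table, and column names are placeholders carried over from the example above, and EXTRACT(EPOCH FROM ...) stands in for MySQL’s UNIX_TIMESTAMP():

input {
  jdbc {
    jdbc_driver_library => "<driverpath>/postgresql-<versionNumber>.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://<Postgres host>:5432/es_db"
    jdbc_user => "<myusername>"
    jdbc_password => "<mypassword>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    # EXTRACT(EPOCH FROM ...) is the Postgres equivalent of MySQL's UNIX_TIMESTAMP()
    statement => "SELECT *, EXTRACT(EPOCH FROM modification_time) AS unix_ts_in_secs FROM es_table WHERE (EXTRACT(EPOCH FROM modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}

The filter and output sections stay the same as in the example above. If your tables don’t yet have a last-modified column (step 3), something like ALTER TABLE es_table ADD COLUMN modification_time timestamptz NOT NULL DEFAULT now() creates one, though you’ll still need a trigger or application logic to update it on every change.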

Advantages of using the Logstash JDBC plugin:

  • Open-source.
  • Can transform data in Logstash before loading it into ElasticSearch.

Disadvantages:

  • Will negatively impact your production database, as it uses timestamp-based polling rather than log-based CDC.
  • There are implementation nuances to avoid creating duplicates.
  • Users report that Logstash documentation can be outdated and incomplete at times.

Postgres to Elastic Beyond This Guide…

As with any data pipeline, you can choose to build a custom solution using a streaming framework, such as Kafka and Debezium, or Amazon Kinesis.
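
To give a sense of what that path involves, here is a rough sketch of registering a Debezium Postgres source connector with Kafka Connect. The REST call shape is standard Kafka Connect, but treat the property list as illustrative: exact property names vary across Debezium versions (for example, topic.prefix replaced database.server.name in Debezium 2.x), and you would still need an ElasticSearch sink connector plus the operational work of running Kafka and Connect yourself:

  curl -X POST http://localhost:8083/connectors \
    -H "Content-Type: application/json" \
    -d '{
      "name": "postgres-source",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "<password>",
        "database.dbname": "es_db",
        "topic.prefix": "pgserver",
        "table.include.list": "public.es_table"
      }
    }'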

If, for example, your use case is text search over logs, you can consider a Postgres extension like ZomboDB. However, it may still not offer the power and functionality of a distributed, multi-node system like Elastic.

But note that building real-time replication in custom code requires extensive work. For simple batch use cases or one-offs, the codebase will be more straightforward, but it will still require operational overhead and management.

Create a free Estuary account and finish building your real-time Postgres to ElasticSearch pipeline within a half hour. Questions? Hit us up on Slack!


Further Reading and Discussion:

https://discuss.elastic.co/t/jdbc-to-elasticsearch-duplicate-records/150153

https://discuss.elastic.co/t/logstash-adding-duplicate-rows-for-every-run/44775/7

https://www.elastic.co/guide/en/cloud/current/ec-getting-started-search-use-cases-db-logstash.html#ec-db-logstash-prerequisites

https://news.ycombinator.com/item?id=11123479

https://medium.com/@emreceylan/how-to-sync-postgresql-data-to-elasticsearch-572af15845ad

https://www.reddit.com/r/aws/comments/f00t52/eli5_when_should_one_use_elasticsearch_as_opposed/?sort=top

https://www.elastic.co/guide/en/cloud/current/ec-getting-started-search-use-cases-db-logstash.html