Introduction

In this tutorial, you’ll learn how to integrate Apache Kafka with a real-time GraphQL API using Hasura and Estuary Flow. 

Note: The source project we’re adapting for this exercise was created by Will Baker, an engineer at Estuary. If you’d rather skip the detailed walkthrough or approach things in a different way, head over to GitHub and have at it!

Kafka is a popular open-source event streaming platform. It’s versatile, but also unopinionated and challenging to use. Connecting Kafka to the front end of your application or project in a way that’s not only functional, but also efficient, is a major challenge.

This sounds like a job for GraphQL. GraphQL is a query language for APIs, paired with a server-side runtime that fulfills those queries. It’s an alternative to REST that is quickly growing in popularity and can provide a robust, efficient API layer for your application or project.

The obvious question, though, is how to connect the two. Is it even possible to do this in a reasonable, approachable manner? And without losing the near-real-time nature of your Kafka cluster along the way?

It is. Let’s take a look at what we’ll build.

The Kafka to GraphQL pipeline

When we’re done here, we’ll have a pipeline that goes like this:

Diagram of kafka to graphql pipeline. Steps are: kafka cluster, estuary flow, postgres, and hasura graphql

Each component is there for a reason. We’ve already covered Kafka and GraphQL. 

Estuary Flow is a DataOps platform designed for real-time data flows. It combines the familiarity of an ETL vendor with an event-driven runtime. Its advantages include:

  • Out-of-the-box integration with Kafka as a data source and Postgres as a destination.
  • Intuitive CLI and UI support for the SaaS offering, as well as an open-source option. 
  • Transmission of documents in real-time, with validation and exactly-once semantics.

Hasura GraphQL Engine is a service you can use to generate real-time GraphQL APIs from just about any flavor of Postgres. Of the many services that facilitate GraphQL, Hasura’s advantages include:

  • Ease of use.
  • Option to self-host the open-source version, and a cloud platform with a generous free tier.
  • Hasura is designed for Postgres, one of the world’s most popular open-source SQL databases. For our purposes here, Postgres also connects with Estuary Flow. 

Today, we’ll use the cloud or SaaS versions of each pipeline component except Kafka. We’ll do this using only free trials or free tiers. 

You don’t have to be a developer to run this workflow. I’ll break things down in detail, so as long as you’re a proficient user of technology, you’ll be able to follow along.

Finally, keep in mind that this is far from the only way to build this pipeline. Use it as a launch point for your own, customized workflow. 

Prerequisites and Setup

We’ll need all of these things:

    • A local development environment. I recommend working in Visual Studio Code, but you can use whatever you want.
    • A Kafka cluster with data. We’ll use the one that comes with the project repo, hosted locally.
    • Docker. The Kafka cluster and data generator come in a containerized environment. If you haven’t used Docker before, you’ll need to download some things.
    • A proxy to allow Flow to access the Kafka cluster on your local machine. We’ll use ngrok.
    • An Estuary Flow free trial account.
    • A Postgres database. We’ll use a hosted version through Google Cloud SQL. New accounts come with $300 of free credits, which is more than enough for what we need.
    • A Hasura Cloud account. The free tier is for personal use, which, again, meets our needs today.

Here’s how to get all the things:

  1. If necessary, download and install the following:
    1. VS Code
    2. Docker Desktop
    3. VS Code Remote Containers Extension
    4. GitHub Desktop (if you’re not a developer and never use Git or GitHub, this is the quickest way to get started).
  2. Clone the project locally. 
    1. Go to the GitHub repo.
    2. Click Code and select Open with GitHub Desktop. The GitHub Desktop Clone a Repository dialog opens.
    3. Set the Local Path to wherever you want the source files to be saved locally and click Clone. This creates a new folder called kafka-graphql.
  3. Download and install ngrok.
  4. Sign up for an Estuary free trial by filling in your credentials in the Flow web app.
    Note: As of the time of publication, Flow is in private beta. You’ll be provisioned a trial account within 24 hours. 
  5. Create your Postgres database in Google Cloud SQL. 
    1. Launch your trial.
    2. Create a new Postgres instance. Make note of the username and password. 
    3. Enable public IP for the instance, adding Estuary’s IP, 34.121.207.128, as an authorized IP address.
  6. Sign up for Hasura Cloud and create a new project on the free tier.

Now, on to the fun part. 

Start the Kafka Cluster and Data Generator

The project that we cloned from GitHub includes a Docker network that we can use to spin up different pieces of the data pipeline:

  • A Kafka cluster. 
  • A data generator that sends made-up stock trade data to Kafka. This stream of messages will be sent in a single Kafka topic called trades. 

To get started, let’s enable these two services and make sure they’re visible to Flow via your ngrok proxy. 

  1. Start Docker Desktop.
  2. In VS Code, open the kafka-graphql folder.
    It contains several files, including a folder called data_gen (the source code for the data generator) and a file called docker-compose.yaml, which includes details of the Docker network. 
  3. Click docker-compose.yaml to open the file.
    The file contains specifications for several docker containers. (For this tutorial, since we’re using cloud-hosted Postgres and Hasura, we can disregard the containers that are commented out.)
  4. Note that Kafka is configured to run on port 9092.
    We’ll set up a proxy for this port.
  5. In the terminal window, run: ngrok tcp 9092
    The window shows the details of your proxy. You’ll leave this running for the duration of the exercise, but be sure to shut it down when you’re done.
    terminal window with an active ngrok proxy
  6. Copy the forwarding address, omitting the protocol. It’ll be formatted like 0.tcp.ngrok.io:00000.
    You’ll set it as the environment variable for the Kafka host.
  7. Locate the file called .env-template. Make a copy, and change its name to .env.
  8. Open .env and set KAFKA_HOST to your ngrok address (e.g., KAFKA_HOST=0.tcp.ngrok.io:00000). Save the file.
  9. Open a new terminal window and build the Docker network, the data generator, and the Kafka cluster:
    docker compose -f "docker-compose.yaml" up -d --build --remove-orphans
    The process exits, indicating the network was successfully built and initiated, but it will likely take a few moments for the data generator and Kafka cluster to start talking to each other. Let’s monitor the data generator logs.
  10. Run docker logs --follow data_gen
    New log entries appear continuously, once every second or two. When they stop showing errors and start to look like:
    data_gen | 2022/10/28 16:00:15 Producing record of trade: {"ID":616,"Symbol":"GOOG","Price":44.35,"Size":25,"Timestamp":"2022-10-28T16:00:15.241172378Z"}
    … you can move on to the next section.
    Hint: If the logs seem to stop after a few minutes, you may have set up your proxy incorrectly. Check that ngrok is still running, and that you put the correct address in the .env file. If necessary, run docker compose down and start this section again.
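
If you’d rather check the generator’s output programmatically than eyeball the logs, here’s a minimal Python sketch that pulls the trade record out of a single log line. It assumes the line format shown above (the text "Producing record of trade: " followed by a JSON object). The helper name parse_trade is hypothetical and isn’t part of the project repo.

```python
import json
from typing import Optional

# Marker text taken from the sample log line in this tutorial.
MARKER = "Producing record of trade: "


def parse_trade(log_line: str) -> Optional[dict]:
    """Return the trade record embedded in a data_gen log line, or None."""
    _, found, payload = log_line.partition(MARKER)
    if not found:
        return None  # an error line, or some other kind of log entry
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return None  # marker present but the JSON was cut off or malformed


sample = (
    "data_gen | 2022/10/28 16:00:15 Producing record of trade: "
    '{"ID":616,"Symbol":"GOOG","Price":44.35,"Size":25,'
    '"Timestamp":"2022-10-28T16:00:15.241172378Z"}'
)
trade = parse_trade(sample)
print(trade["Symbol"], trade["Price"], trade["Size"])
```

A line that returns None is either an error or unrelated output, so a run of None results is a sign the generator and Kafka aren’t talking yet.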

Capture the Kafka Topic with Estuary Flow

We now have a functional Kafka cluster with a stream of data, but it’s not connected to anything. The next step is to capture the data using Estuary Flow.

    1. Log into the Flow web application.
    2. Click the Captures tab, and then the New Capture button.
    3. Locate the Apache Kafka card and click Capture.
      A form appears with the properties required to set up a capture from Kafka. We’ll start by giving the task a name.
    4. Click inside the Name box. You’ll see one or more prefixes representing namespaces to which you have access. Most likely, yours will say trial/
    5. Click a prefix and append a unique name to it. For example, trial/yourname/kafka-graphql.
    6. Under Endpoint Config, find the Bootstrap Servers section. Click the plus button to add a server. In the field that appears, paste your ngrok proxy address, such as 0.tcp.ngrok.io:00000.
    7. Expand the Authentication section.
      Since this is just a demo, our Kafka cluster doesn’t have authentication. (For your production use cases, be sure to set up authentication on your cluster.)
    8. Click Disabled.
    9. Click the Discover Endpoint button. Flow initiates a connection with your Kafka cluster and generates JSON specifications for the data capture task, and for the data collection it will write to.
    10. In the specification editor, click the tab with the collection name (something like trial/yourname/kafka-graphql/trades).
      The JSON schema is essentially empty. Because Kafka doesn’t enforce a schema on its stored messages, Flow isn’t able to infer a schema. You’ll provide it one.
    11. Paste the following into the editor, deleting what was there previously:
      {
        "schema": {
          "properties": {
            "Price": {
              "reduce": {
                "strategy": "lastWriteWins"
              },
              "type": "number"
            },
            "Size": {
              "reduce": {
                "strategy": "sum"
              },
              "type": "integer"
            },
            "Symbol": {
              "type": "string"
            }
          },
          "reduce": {
            "strategy": "merge"
          },
          "required": [
            "ID",
            "Symbol",
            "Size"
          ],
          "type": "object"
        },
        "key": [
          "/Symbol"
        ]
      }
      

      Note: A YAML version of this schema is included in the project repo. The web app only accepts JSON currently, however, so I’ve reformatted it for you to make things quicker.

    12. Click the Save and Publish button.
      Flow deploys the data capture. Although we haven’t yet materialized it anywhere, Flow keeps a copy of each document in a cloud-backed collection. We can preview that data in the web app.
    13. When the capture has been successfully published, click Close on the pop-up window displaying logs.
    14. Click the Collections tab and click Details next to the name of your collection.
      data preview for trades data in estuary flow
      Data is displayed as it is stored: by key. The schema we supplied uses the stock symbol as the key.
      For each symbol, Flow overwrites the price each time it receives a new message from Kafka. In Postgres, this will translate to a compact table of current stock prices.
      We’ll load, or materialize, the data into Postgres now.
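
To make the schema’s reduce annotations more concrete, here’s a rough Python simulation of what they describe: one document per Symbol, with Price overwritten by the latest message and Size summed. This is only an illustration, not Flow’s implementation. The name reduce_trades is hypothetical, and I’m assuming that fields without an annotation (such as ID) simply take the latest value.

```python
from typing import Dict, Iterable

REQUIRED = ("ID", "Symbol", "Size")  # mirrors the schema's "required" list


def reduce_trades(trades: Iterable[dict]) -> Dict[str, dict]:
    """Fold trade documents into one document per Symbol (the collection key)."""
    table: Dict[str, dict] = {}
    for doc in trades:
        missing = [field for field in REQUIRED if field not in doc]
        if missing:
            raise ValueError(f"document is missing required fields: {missing}")
        key = doc["Symbol"]
        previous = table.get(key)
        if previous is None:
            table[key] = dict(doc)  # first document seen for this symbol
            continue
        # Price: lastWriteWins. Assumption: un-annotated fields behave the same way.
        merged = {**previous, **doc}
        # Size: sum of every value seen so far for this symbol.
        merged["Size"] = previous["Size"] + doc["Size"]
        table[key] = merged
    return table


trades = [
    {"ID": 1, "Symbol": "GOOG", "Price": 44.35, "Size": 25},
    {"ID": 2, "Symbol": "AAPL", "Price": 150.10, "Size": 10},
    {"ID": 3, "Symbol": "GOOG", "Price": 44.50, "Size": 5},
]
table = reduce_trades(trades)
print(table["GOOG"])
```

After these three messages, the GOOG row holds the most recent price and a running total of shares traded, which is the shape the Postgres table will take.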

Materialize the Data to Your Postgres Database

Let’s continue and connect the data in Flow to the Google Cloud SQL Postgres instance you set up earlier.

  1. In the Flow web app, click the Materializations tab, and then click the New Materialization button.
  2. Locate the PostgreSQL card and click Materialize.
    A form appears with the properties required to set up a materialization to Postgres. Again, we’ll first give it a name.
  3. Click inside the Name box, choose your prefix, and append a unique name, like trial/yourname/postgres-graphql.
  4. To fill in the Address field, you’ll need your instance’s Public IP.
    1. Go to the Google Cloud Console and choose your instance.
    2. From the instance’s Overview page, under Connect to this instance, copy the public IP address.
  5. Return to Flow. In the Address field, paste the public IP address and append the port, :5432, giving the address the full format of XX.XX.XXX.XXX:5432
  6. Fill in the User and Password fields. Unless you changed it during setup, the username is postgres. If you didn’t set a password during setup, leave that field blank. You can check your user info on the User tab of the instance’s page in Google Cloud Console.
  7. Leave Database blank. Flow will materialize to the default database, which is also called postgres.
    postgres materialization configuration in estuary flow
    Next, we’ll specify the trades collection as the data to materialize and name the corresponding table that will be created in the database.
  8. Scroll down to the Collection Selector. Use the Available Collections box to search for your collection (e.g., trial/yourname/kafka-graphql/trades).
  9. In the Table field, type trades.
  10. Click Discover Endpoint to test the connection, and then click Save and Publish.

When the action completes successfully, your data flow from Kafka to Postgres is complete. The trade events streaming through your local Kafka cluster are reflected in your Postgres table within milliseconds.
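
If the connection test fails, a malformed Address value is one thing worth ruling out. Here’s a small Python sketch that builds the IP:port string in the format described above and rejects obviously bad input. The function name postgres_address is hypothetical, and 203.0.113.10 is a documentation-only placeholder, not a real instance.

```python
import ipaddress


def postgres_address(public_ip: str, port: int = 5432) -> str:
    """Return the 'IP:port' string for the materialization's Address field."""
    # Raises a ValueError subclass if the string isn't a valid IPv4 address.
    ipaddress.IPv4Address(public_ip)
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return f"{public_ip}:{port}"


# Placeholder IP; substitute the public IP from your Cloud SQL instance.
print(postgres_address("203.0.113.10"))
```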

All that’s left to do is connect the database to Hasura and start working with the GraphQL API.

Connect Your Postgres Database to Hasura GraphQL

  1. Open your project in the Hasura Console.
  2. Connect your Hasura project to your Postgres database. Follow the steps in Hasura’s docs to do this (start at Step 4: Allow connections to your DB from Hasura Cloud).
    Once connected, the database name will display on the sidebar of the Data tab. It contains the default database schema, Public.
  3. Click Public in the sidebar to reveal a list of database tables. Locate the trades table and click Track to expose the table to the GraphQL API.
  4. Click the API tab.
    We’ll set up a subscription — a real-time query — and visually verify that our pipeline is working as expected.
  5. In the query box, paste the following and press the play button:
    subscription {
      trades {
        price
        symbol
        Size
      }
    }

In the viewer to the right, you’ll see the trades updates from Postgres streamed in real time.
gif showing updating trades data in hasura console
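
Outside the console, the same data is available from any HTTP client. The Python sketch below builds a one-shot query request (not a subscription, which runs over WebSockets) against a Hasura project’s /v1/graphql endpoint, authenticated with the x-hasura-admin-secret header. The project URL and secret are placeholders, build_trades_request is a hypothetical helper, and the field names are copied from the subscription above, so adjust them to match the column names Hasura shows for your table.

```python
import json
import urllib.request

# Field names copied from the subscription in this tutorial; adjust as needed.
TRADES_QUERY = "query { trades { price symbol Size } }"


def build_trades_request(endpoint: str, admin_secret: str) -> urllib.request.Request:
    """Build (but do not send) a POST request for the current trades rows."""
    body = json.dumps({"query": TRADES_QUERY}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={
            "Content-Type": "application/json",
            "x-hasura-admin-secret": admin_secret,
        },
        method="POST",
    )


# Placeholder values; use your own project's endpoint and secret.
req = build_trades_request(
    "https://your-project.hasura.app/v1/graphql", "your-admin-secret"
)
print(req.get_method(), req.full_url)
# To send it: json.load(urllib.request.urlopen(req))
```

Because this is a plain query, it returns a snapshot. For live updates like the ones in the console, you’d use a GraphQL client that supports subscriptions.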

What’s next?

From here, the path opens up, but this is as far as I go. You have a highly performant API ready to be leveraged wherever you’d like.

With the sample dataset we used here, you could create an application that provides custom alerting or automatically trades on your behalf based on current stock prices (you’d have to hook your cluster up to real stock data first, of course).

The real power comes when you have data in your own Kafka cluster and are having trouble integrating it with the front end of your application. Especially as your data grows in volume and complexity, it’s important to have a pipeline that will stay fast at scale while still validating and re-shaping data.

 
