Streaming SQL Full Guide: How to Transform Real-Time Data
Transforming real-time data has always been hard, but streaming SQL tools make it easy. Get up and running with 3 practical tutorials.

Processing real-time data in motion has historically been challenging and expensive. The perceived difficulty put it out of the question for many, despite numerous use cases and benefits. 

In this article, we will dive deep into a new form of SQL that makes processing data in motion much easier, as well as a SaaS solution that makes streaming data processing much cheaper.

We will provide 3 tutorials that show you how to process data in motion with SQL and Estuary Flow, a low-code ETL platform that streamlines the creation and maintenance of data pipelines by ingesting and transforming data in real time from sources to destinations. 

But first, let's talk about why the current status quo of SQL is limited. 

A Brief History of SQL

Blog Post Image

SQL was developed over half a century ago by IBM. It has since become the industry-standard language for database creation and manipulation. 

What has changed since then?

SQL was developed at a time when data was scarce and expensive to store, analyze, and use. Real-time data was barely available, if at all. Historically, SQL has primarily been used to query static data - data at rest.

Although fast access to insights has always been a north star for analytics and SQL, in the past this was almost impossible.

But, this has changed.

Over the years, the Big Data era has changed how businesses interact with data. In 2013, the global market for Big Data reached $10 billion. In 2016, the world experienced unprecedented data growth: 90% of the world’s data at that time had been created in the previous two years alone. With this amount of data becoming available, it’s no longer sufficient to query static data only. 

The need is clearly there. Now, we just need a solution.

In 2009, LinkedIn created Apache Kafka, a big step toward bringing stream processing to the mainstream. However, managing the infrastructure and tooling still required significant development resources. 

In the following years, several SaaS platforms such as Estuary began to offer managed solutions, dramatically reducing implementation complexity and making real-time streaming data more accessible and approachable.1

In the last few years, some of these SaaS platforms have added the capability to apply plain SQL to data in motion, not just data at rest. This is what we call Streaming SQL.

Streaming SQL has raised the bar for what data-driven organizations can do with the vast amounts of data available to them in order to gain a competitive edge. 

For example, instead of analyzing month-to-date app usage as of yesterday, the rise of Streaming SQL means analyzing month-to-date usage as of seconds ago.

Allied Market Research estimated that the data analytics market will grow to $420.98 billion by 2027.2 Those who make the most of this opportunity will be the ones leveraging powerful tools such as Streaming SQL. 

What is Streaming SQL?

As already mentioned, in recent years the enormous growth in data volumes and the increasing need for real-time data analysis have made SQL a crucial component of data management and business analytics. 

However, traditional SQL solutions that operate on stored data in databases cannot effectively support real-time stream processing requirements. 

As a result, there is a need for a new type of SQL that can process continuous data streams. This is where Streaming SQL comes in.

Before we dive deeper, let us examine what exactly we mean by “streaming” in this context.

"Streaming" refers to the handling of data as a continuous flow of events or messages through message brokers such as KafkaGazetteAmazon Kinesis, or Pulsar

Blog Post Image

These event streams can include various types of data, from user actions on websites or mobile apps, IoT sensor data, and server metrics to traditional database activities captured using change data capture (CDC).

Traditional SQL runs on databases, while Streaming SQL runs on streams of live data. Running SQL on a database returns a static set of results from a single point in time.

On the other hand, with Streaming SQL you can run the exact same query on a stream of real-time data and get results that stay up to date as new data arrives.

In short, Streaming SQL is designed to process data incrementally as it arrives and deliver results in real time. 
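
To make the contrast concrete, consider the same aggregate query in both settings. The orders table and its columns are hypothetical, and the syntax is illustrative:

plaintext
-- Against a database table, this returns order counts as of the moment it runs.
-- As a streaming query, the same statement yields counts that keep updating
-- as new order events arrive.
select status, count(*) as order_count
from orders
group by status;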

Streaming SQL can transform, filter, aggregate, and enrich data in flight, making it a powerful tool for organizations to extract maximum value from constantly streaming data. 

Also, Streaming SQL can work with a wide range of data sources and environments, from public cloud platforms to Kafka, Hadoop, NoSQL, and relational databases.

Streaming SQL has become an essential part of effective real-time stream processing solutions. It enables data-driven organizations to analyze and act on streaming data and to quickly identify the value in new data. With Streaming SQL, organizations can quickly adopt a modern data architecture and create streaming data pipelines to support their business needs.

Streaming SQL vs. Traditional SQL

Streaming SQL can be understood as a variant of traditional SQL that is specifically designed to process data streams in real time. 

There are two major differences between traditional SQL and Streaming SQL. 

  1. Static vs. Continuous Stream: As mentioned, the primary difference is that traditional SQL operates on static data stored in databases, whereas Streaming SQL works with continuously flowing data streams as they are generated, potentially by multiple sources. This continuous nature is what makes Streaming SQL solutions valuable compared to traditional SQL solutions. 
Blog Post Image

Also, Streaming SQL solutions use windows and event tables to trigger actions when data changes. You can write SQL queries for streaming data without writing custom stream-processing code.

In a nutshell, Streaming SQL is best for processing data streams that are constantly changing, while traditional SQL is best for querying and analyzing data that is already stored in a database.

  2. Sliding Window Approach: Streaming SQL uses a sliding window approach, which involves breaking the data stream into small, discrete segments or windows and processing each window separately. This approach enables Streaming SQL to handle data streams that are too large to be processed as a single entity.
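
A simplified way to picture this is to bucket events into fixed time windows and aggregate each bucket separately (a tumbling-style variant of the idea). The sketch below uses generic SQLite-style syntax; the events table and its columns are hypothetical:

plaintext
-- Hypothetical example: count events per one-minute window.
select
  strftime('%Y-%m-%dT%H:%M', event_time) as window_start,
  count(*) as events_in_window
from events
group by window_start;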

The Need for Streaming SQL

While several CDC tools allow you to ingest data from a source to a target, few offer SQL transformation capabilities. Often, replicating data as-is from one place to another may not be sufficient for your needs. 

For example, you may want to do some filtering, apply certain calculations to your source data, or aggregate data from multiple documents before the data arrive at the destination. 

Other common use cases include merging across several collections using a common key and applying business logic to the source data. 

Using derivations in Flow, you can perform a variety of transformations, from simple remapping use cases to highly complex stateful transaction processing. 

A derivation is a data collection that is derived from applying transformations to one or more source collections. Derivations work continuously, ensuring they stay in sync with any updates made to the source collections in real time. 

Flow enables you to write derivations using either SQLite or TypeScript in three simple steps.

The Flow Derivations doc here walks through a tutorial that illustrates a complex stateful transaction processing use case. However, often you may just need to apply a simple transformation to your data, which is what this tutorial aims to show.

Four Common Use Cases for Streaming SQL

SQL transformation can be extremely useful during data replication as it allows data to be transformed and modified as it is being replicated from one source to another. 

The following are a few ways in which SQL transformation can be used during data replication.

Blog Post Image
  1. Data cleansing: Data replication often involves moving data from one system to another, which can result in data quality issues. You can use SQL transformation to clean and standardize data during replication, ensuring that the data is consistent across systems.
  2. Data mapping: When replicating data between two systems, it is often necessary to map fields from one system to another. You can use SQL transformation to map fields and transform data types during replication, ensuring that the data is properly mapped and formatted.
  3. Data filtering: During replication, it may be necessary to filter out certain records or data elements based on specific criteria. You can use SQL transformation to filter out data during replication, ensuring that only the relevant data is replicated to the target system.
  4. Data aggregation: During replication, it may be necessary to aggregate data from multiple sources or to create summary data for reporting purposes. You can use SQL transformation to aggregate and summarize data during replication, making it easier to analyze and report on.

Overall, SQL transformation is useful during data replication because it allows you to transform, clean, and format data as it is being moved from one system to another. This helps ensure that the data is consistent and accurate across systems, and that it is properly mapped and formatted for its intended use.
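
To make a couple of these concrete, here is a minimal sketch written in the style of the Flow SQLite lambdas used later in this article, combining filtering with light cleansing. The $CustomerID, $Email, $Region, and $Status fields are hypothetical:

plaintext
-- Hypothetical example: keep only active customers and normalize the email field.
select
  $CustomerID,
  lower(trim($Email)) as email,
  $Region
where $Status = 'active';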

Tutorial Example 1: Stateless Transform

In this tutorial, we will walk through a few SQL transformation use cases, showing you how and where to put your SQL queries to transform your data collection using Estuary Flow.

Note that Flow integrates with GitPod to let you leverage the full capabilities of SQLite. GitPod is free to use. It is an online Integrated Development Environment (IDE) accessed through a web browser, with all the necessary tools and dependencies pre-installed.

Scenario

Suppose you have a table of employees, with their name, address, and the region they are in:

Blog Post Image

You want to filter by region and derive a target collection that includes only employees in the US.

Blog Post Image

Suppose this is the end result you want in your destination:

Blog Post Image

Prerequisites

Before following the steps below, you should already have a collection you want to apply the transformation to. 

  1. Create an employees table in any database of your choice (a minimal example table is sketched after this list).
  2. Create a Capture in Estuary Flow. If you’re new to Flow, you can get started with a free account.
  3. Name the Capture: employees
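
For the first prerequisite, a minimal employees table might look like the sketch below. The column types are assumptions and may need adjusting for your database:

plaintext
-- A minimal example table; the columns match the fields used later in this tutorial.
create table employees (
  EmployeeID integer primary key,
  LastName   text,
  FirstName  text,
  Address    text,
  Region     text
);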

Tutorial Steps

Log in to Estuary Flow.

Navigate to the Collections page in Flow and click “+ NEW TRANSFORMATION”.

Blog Post Image

This brings up a pop-up window to derive a new collection. 

Blog Post Image

Step 1: Select source collections

In the “Available Collections” dropdown, find the source collection you want to apply streaming SQL to. In our case: JennyPoc/sqlserversource/employees. Select it.

Step 2: Transformation Language

For Language, there are two options: SQL or TypeScript. For this tutorial, we will select SQL.

Step 3: Write transformations

Finally, give your derived collection a name. We will name ours sqldemo.

Click the blue button “PROCEED TO GITPOD”.

Blog Post Image

You will then receive this message in Flow, informing you that a GitPod window has opened in another tab.

Blog Post Image

Proceed to GitPod and sign in using your GitLab, GitHub, or BitBucket account.

After logging into GitPod, an environment like the following is already set up for you:

Blog Post Image

In your flow.yaml file in your workspace folder, make the following updates:

  1. Update the schema specs to include the fields you want to see in your derived collection (a sample schema sketch is shown after these steps).

    Note: All Flow collections have a JSON schema, which defines their structure, representation, and constraints. You can learn more about collection schemas here.

You can either manually update the JSON schema or use --infer-schema to get your schema specs automatically populated. See below for further details on how to run the command.

  2. Designate a key (For our example, we will key our collection on EmployeeID.)
  3. Remove the migrations .sql filename and replace it with {} (Note that a migration is not needed for a stateless transformation. Review this article to understand more about stateful vs stateless processing.)
  4. Replace the lambda .sql filename with a SQL select statement that filters on Region:
plaintext
select $EmployeeID, $LastName, $FirstName, $Address, $Region where $Region = 'US';
Blog Post Image
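
For reference on step 1 above, a manually written JSON schema for this derived collection might look roughly like the following. The property types are assumptions based on the sample data; schema inference can generate this for you:

plaintext
{
  "type": "object",
  "properties": {
    "EmployeeID": { "type": "integer" },
    "LastName":   { "type": "string" },
    "FirstName":  { "type": "string" },
    "Address":    { "type": "string" },
    "Region":     { "type": "string" }
  },
  "required": ["EmployeeID"]
}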

To see a preview, run the following flowctl command on the Terminal tab in GitPod:

plaintext
$ flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'
Blog Post Image

To use schema inference mentioned above, run the following flowctl command with the --infer-schema flag:

plaintext
$ flowctl preview --source flow.yaml --infer-schema --interval 200ms | jq -c 'del(._meta)'

To stop the preview, press Ctrl+Z or Ctrl+C:

Blog Post Image

If we publish a materialization on this data, the resulting table looks like this:

Blog Post Image

This tutorial video walks through the above steps live.

In our next example, we will show how to use migrations and lambda, as well as how to publish the materialization.

Tutorial Example 2: Stateful Transform

Scenario: 

For our next two examples, we will create a transformation on top of our Wikipedia live demo, which captures Wikipedia edit events from the Wikipedia API and then loads the collection into a Google Sheet in real time.

The original materialized collection includes the following fields:

  • The date of the Wikipedia edit
  • Whether the edit was made by a bot or a human
  • Total edits
  • Total new lines

For Example 2, we will apply a simple SQL transformation to retain the first and last time we saw a user, and their lifetime edits.

For Example 3, we will apply another SQL transformation on the same dataset to show aggregations.

Prerequisites: 

Again, as a prerequisite before you add a new transformation, you should already have a collection you want to apply the transformation to. 

For the next two examples in this tutorial, we will apply the transformation to this collection in the demo prefix: demo/wikipedia/recentchange-sampled

Blog Post Image

This collection is a 3% sample of the enormous demo/wikipedia/recentchange collection which contains millions of documents. Since the purpose of this tutorial is to demonstrate a proof of concept, we avoid publishing a derivation that processes hundreds of gigabytes of data. 

Tutorial Steps

To begin, navigate to the Collections page in Flow and click “+ NEW TRANSFORMATION”.

Blog Post Image

The “Derive A New Collection” screen opens. 

Step 1: Select source collections

In the “Available Collections” dropdown, find the collection demo/wikipedia/recentchange-sampled and select it.

Step 2: Transformation Language

For Language, there are two options: SQL or TypeScript. For this tutorial, we will select SQL.

Step 3: Write transformations

Finally, give your derived collection a name. We will name ours sqltransformdemo.

Click the blue button “PROCEED TO GITPOD”.

After logging into GitPod, an environment like the following is already set up for you:

Blog Post Image

In the flow.yaml file, a specification is already created to get you started in implementing your derivation. 

There are a few components in the flow.yaml. Since a derivation is a collection, it has a schema. The top part of flow.yaml is a JSON schema of the collection, including its properties and key.

You need to update your schema specs and key accordingly. This user guide is also a good reference.

Because this is a SQLite derivation, it can have one or more migrations that it can use, as well as one or more transforms. The migrations and transforms are listed under the derive section of the flow.yaml file.

A transform works as follows: every time a document is available from the source collection, it reads that document and evaluates the lambda function, which in the case of a SQLite transform is a block of SQL statements.

Depending on the complexity of your SQL statements, you can either put them directly in flow.yaml under the derive section, or use the migrations and lambda .sql files to hold your SQL, in which case the flow.yaml holds the filenames of the migrations and lambda .sql files.

For this tutorial, we will fill out the flow.yaml as follows:

Blog Post Image

This time, we are keying our collection on user.
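
If the screenshot above is hard to read, the derive section of a SQLite derivation spec has roughly the following shape. This is a hedged sketch: the file names, the transform name, and the shuffle setting are illustrative, and the flow.yaml generated in your GitPod workspace is the authoritative starting point:

plaintext
derive:
  using:
    sqlite:
      migrations:
        - sqltransformdemo.migration.0.sql
  transforms:
    - name: recentchange-sampled
      source: demo/wikipedia/recentchange-sampled
      lambda: sqltransformdemo.lambda.recentchange-sampled.sql
      shuffle: any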

Next, the migrations .sql file contains the new table that your derivation will use. 

You can use migrations to create or alter tables. Each migration is run only once, and new migrations will be applied as needed. 

Note that not all derivations require creating a new table; Examples 1 and 3, for instance, do not. 

By default, the <derivation name>.migration.0.sql is populated with the following example:

Blog Post Image

For our tutorial, we will create a new table users as follows:

plaintext
create table users (
  user text primary key not null,
  first_seen text not null,
  last_seen text not null,
  total_edits integer not null,
  total_lines integer not null
);

Next, the lambda file is where you put your SQL transformation statements. By default, it is populated with an example showing the fields you can leverage:

Blog Post Image

For this tutorial, we will use the following SQL statements to track usernames, first-seen and last-seen timestamps, total edits, and total new lines.

Put the following in your SQL lambda file:

plaintext
insert into users (user, first_seen, last_seen, total_edits, total_lines)
select $user, $meta$dt, $meta$dt, 1, coalesce($length$new - $length$old, 0)
where $type = 'edit' and $meta$dt
on conflict do update set
  last_seen = $meta$dt,
  total_edits = total_edits + 1,
  total_lines = total_lines + coalesce($length$new - $length$old, 0);

select
  user,
  first_seen,
  last_seen,
  total_edits,
  total_lines
from users
where $type = 'edit' and user = $user;

Let’s dissect these SQL statements a bit. 

You may be wondering why we are setting both first_seen and last_seen to $meta$dt - where are we minimizing or maximizing those values?

We are maximizing the last_seen value through this upsert statement in the lambda:

plaintext
on conflict do update set
  last_seen = $meta$dt,
  total_edits = total_edits + 1,
  total_lines = total_lines + coalesce($length$new - $length$old, 0)

The upsert sets last_seen to the latest $meta$dt whenever the user makes an edit.

The upsert also aggregates total_edits and total_lines. This doesn’t require a schema annotation because the accumulation happens in the database.

Lastly, the reason we insert data into a table and then select it back out is that we want to track state for this use case; this is only necessary for a stateful transform. In order to keep track of when we first and last saw a user across events, we need to store that information somewhere.

To run a preview, run the following flowctl command on the Terminal tab in Gitpod:

plaintext
flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'

You should already be authenticated, but if you leave the window open for too long, you may be asked to authenticate again. 

To do that, run flowctl auth login in the Terminal:

Blog Post Image

You can get your access token for flowctl from the UI. Navigate to the Admin page, CLI-API tab:

Blog Post Image

When the flowctl preview command runs, you should see the live updates from the derivation as follows:

Blog Post Image

To stop the preview, press Ctrl+Z or Ctrl+C:

Blog Post Image

To publish the derived collection, run the flowctl catalog publish command as follows. See this doc for a complete flowctl usage guide. 
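
A typical invocation, assuming your spec lives in flow.yaml in the current directory, looks like this:

plaintext
flowctl catalog publish --source flow.yaml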

Blog Post Image

You should now see your published collection in the Flow UI:

Blog Post Image

Note that SQL statements are applied on a go-forward basis only, so you will see updates reflected in your destination whenever new documents arrive at your source.

Tutorial Example 3: Aggregations

In this example, we will use the same dataset from our Wikipedia demo collection to show aggregations. 

We will look at the total number of lines per edit and include the date of the edit and whether the edit was made by a bot or a human. 

To do that, first update the flow.yaml file with the following schema specs and remove <derivation name>.migrations.0.sql from flow.yaml, as we will not be creating a new table for this derivation. 

The properties in our schema include:

  1. date
  2. bot (boolean field indicating whether the edit was made by a bot or a human)
  3. total_lines
  4. total_edits

We are keying our collection on the date field this time.

Blog Post Image

Notice we are adding a merge annotation to our schema specs:

Blog Post Image

When multiple documents that share a common key are added to a collection, Flow opportunistically merges them into a single representative document for that key through a process called reduction.

The above reduce annotation provides Flow with the specific reduction strategy to use at your document locations. You can learn more about reductions here.
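
As a rough sketch, reduce annotations of this kind are placed directly in the collection schema. The snippet below is illustrative rather than a copy of the screenshot above, showing a merge strategy at the document root and sum strategies on the numeric fields:

plaintext
# Illustrative only; the exact layout of your schema may differ.
schema:
  type: object
  reduce: { strategy: merge }
  properties:
    date: { type: string }
    bot: { type: boolean }
    total_lines: { type: integer, reduce: { strategy: sum } }
    total_edits: { type: integer, reduce: { strategy: sum } }
  required: [date, bot]
key: [/date]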

Next, populate the lambda file with the following SQL statement:

plaintext
select
  date($meta$dt) as date,
  $bot,
  coalesce($length$new - $length$old, 0) as total_lines,
  1 as total_edits
where $type = 'edit';

The coalesce expression computes the line-count delta for each source document, defaulting to 0 when either length value is missing.

Now run the preview to take a look at the output:

plaintext
flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'
Blog Post Image

As before, to publish the derived collection, run the flowctl catalog publish command. 

See this doc for a complete flowctl usage guide. 

See this tutorial video for a deeper dive and another example on how to apply SQL transformations to Wikipedia live data.

Conclusion

In this tutorial, we have walked through how to use simple SQL statements to create derivations in Flow. 

Using Streaming SQL, you can easily apply a wide range of transformations to your data before they arrive at the destination in your data pipeline.

We have walked through three examples. In the first example, we applied a simple SQL query to filter streaming data on a specific column. In the second example, using our Wikipedia live demo, we applied a simple SQL transformation to retain the first and last time we saw a user and their lifetime edits. In the third example, we applied another SQL transformation to show aggregations of the raw data.

SQL transformation is a powerful capability during data replication because it allows data to be transformed, cleaned, and formatted as it is being moved from one system to another. This can help ensure the data is consistent and accurate across systems, and that it is properly mapped and formatted for its intended use.

Try the SQL transformation capability in Flow today! If you’re new to Flow, you can register here for free.

References

  1. https://www.scalevp.com/blog/stream-processing-becomes-mainstream
  2. https://www.techtarget.com/whatis/feature/A-history-and-timeline-of-big-data