New for Amazon Redshift – General Availability of Streaming Ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka

Ten years ago, just a few months after I joined AWS, Amazon Redshift was launched. Over the years, many features have been added to improve performance and make it easier to use. Amazon Redshift now allows you to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes. More recently, Amazon Redshift Serverless became generally available to make it easier to run and scale analytics without having to manage your data warehouse infrastructure.

To process data as quickly as possible from real-time applications, customers are adopting streaming engines like Amazon Kinesis and Amazon Managed Streaming for Apache Kafka. Previously, to load streaming data into your Amazon Redshift database, you’d have to configure a process to stage data in Amazon Simple Storage Service (Amazon S3) before loading. Doing so would introduce a latency of one minute or more, depending on the volume of data.

Today, I am happy to share the general availability of Amazon Redshift Streaming Ingestion. With this new capability, Amazon Redshift can natively ingest hundreds of megabytes of data per second from Amazon Kinesis Data Streams and Amazon MSK into an Amazon Redshift materialized view and query it in seconds.

Architecture diagram.

Streaming ingestion uses materialized views to optimize query performance, which lets you use Amazon Redshift more efficiently for operational analytics and as the data source for real-time dashboards. Another interesting use case for streaming ingestion is analyzing real-time data from gamers to optimize their gaming experience. This new integration also makes it easier to implement analytics for IoT devices, clickstream analysis, application monitoring, fraud detection, and live leaderboards.

Let’s see how this works in practice.

Configuring Amazon Redshift Streaming Ingestion
Apart from managing permissions, Amazon Redshift streaming ingestion can be configured entirely with SQL within Amazon Redshift. This is especially useful for business users who lack access to the AWS Management Console or the expertise to configure integrations between AWS services.

You can set up streaming ingestion in three steps:

  1. Create or update an AWS Identity and Access Management (IAM) role to allow access to the streaming platform you use (Kinesis Data Streams or Amazon MSK). Note that the IAM role should have a trust policy that allows Amazon Redshift to assume the role.
  2. Create an external schema to connect to the streaming service.
  3. Create a materialized view that references the streaming object (Kinesis data stream or Kafka topic) in the external schema.

After that, you can query the materialized view to use the data from the stream in your analytics workloads. Streaming ingestion works with Amazon Redshift provisioned clusters and with the new serverless option. To maximize simplicity, I am going to use Amazon Redshift Serverless in this walkthrough.

To prepare my environment, I need a Kinesis data stream. In the Kinesis console, I choose Data streams in the navigation pane and then Create data stream. For the Data stream name, I use my-input-stream and then leave all other options set to their default value. After a few seconds, the Kinesis data stream is ready. Note that by default I am using on-demand capacity mode. In a development or test environment, you can choose provisioned capacity mode with one shard to optimize costs.
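The stream can also be created from code instead of the console. The following is a minimal sketch, assuming the boto3 SDK and configured AWS credentials; the helper names `stream_settings` and `create_stream` are hypothetical and not part of the original walkthrough. It builds the arguments for the Kinesis `create_stream` call for either capacity mode.

```python
def stream_settings(name, provisioned_shards=None):
    """Build keyword arguments for the Kinesis CreateStream API.

    With no shard count the stream uses on-demand capacity mode;
    otherwise it uses provisioned mode with that many shards.
    """
    if provisioned_shards is None:
        return {
            "StreamName": name,
            "StreamModeDetails": {"StreamMode": "ON_DEMAND"},
        }
    return {
        "StreamName": name,
        "ShardCount": provisioned_shards,
        "StreamModeDetails": {"StreamMode": "PROVISIONED"},
    }


def create_stream(name, provisioned_shards=None):
    """Create the stream and wait until it exists (needs AWS credentials)."""
    import boto3  # imported here so stream_settings works without the SDK

    kinesis = boto3.client("kinesis")
    kinesis.create_stream(**stream_settings(name, provisioned_shards))
    kinesis.get_waiter("stream_exists").wait(StreamName=name)
```

For example, `create_stream("my-input-stream")` would match the on-demand default used here, while `create_stream("my-input-stream", provisioned_shards=1)` would correspond to the single-shard setup suggested for development or test environments.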

Now, I create an IAM role to give Amazon Redshift access to the my-input-stream Kinesis data stream. In the IAM console, I create a role with this policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:123412341234:stream/my-input-stream"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

To allow Amazon Redshift to assume the role, I use the following trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "redshift.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
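If you prefer to script this step, the two policies can be built as Python dictionaries and attached with the IAM API. This is a sketch under stated assumptions: it uses boto3 with configured credentials, and the function names and the inline policy name `kinesis-read` are hypothetical choices for illustration.

```python
import json

# Trust policy that lets Amazon Redshift assume the role.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def stream_read_policy(account_id, stream_name):
    """Return the permissions policy for reading one Kinesis data stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:DescribeStream",
                ],
                "Resource": f"arn:aws:kinesis:*:{account_id}:stream/{stream_name}",
            },
            {
                "Effect": "Allow",
                "Action": ["kinesis:ListStreams", "kinesis:ListShards"],
                "Resource": "*",
            },
        ],
    }


def create_role(role_name, account_id, stream_name):
    """Create the role with both policies and return its ARN (needs credentials)."""
    import boto3  # imported here so the policy builders work without the SDK

    iam = boto3.client("iam")
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(TRUST_POLICY),
    )
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName="kinesis-read",  # hypothetical inline policy name
        PolicyDocument=json.dumps(stream_read_policy(account_id, stream_name)),
    )
    return role["Role"]["Arn"]
```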

In the Amazon Redshift console, I choose Redshift serverless from the navigation pane and create a new workgroup and namespace, similar to what I did in this blog post. When I create the namespace, in the Permissions section, I choose Associate IAM roles from the dropdown menu. Then, I select the role I just created. Note that the role is visible in this selection only if the trust policy allows Amazon Redshift to assume it. After that, I complete the creation of the namespace using the default options. After a few minutes, the serverless database is ready for use.

In the Amazon Redshift console, I choose Query editor v2 in the navigation pane. I connect to the new serverless database by choosing it from the list of resources. Now, I can use SQL to configure streaming ingestion. First, I create an external schema that maps to the streaming service. Because I am going to use simulated IoT data as an example, I call the external schema sensors.

CREATE EXTERNAL SCHEMA sensors
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123412341234:role/redshift-streaming-ingestion';

To access the data in the stream, I create a materialized view that selects data from the stream. In general, materialized views contain a precomputed result set based on the result of a query. In this case, the query is reading from the stream, and Amazon Redshift is the consumer of the stream.

Because streaming data is going to be ingested as JSON data, I have two options:

  1. Leave all the JSON data in a single column and use Amazon Redshift capabilities to query semi-structured data.
  2. Extract JSON properties into their own separate columns.

Let’s see the pros and cons of both options.

The approximate_arrival_timestamp, partition_key, shard_id, and sequence_number columns in the SELECT statement are provided by Kinesis Data Streams. The record from the stream is in the kinesis_data column. The refresh_time column is provided by Amazon Redshift.

To leave the JSON data in a single column of the sensor_data materialized view, I use the JSON_PARSE function:

CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_PARSE(kinesis_data) as payload
      FROM sensors."my-input-stream";

Because I used the AUTO REFRESH YES parameter, the content of the materialized view is automatically refreshed when there is new data in the stream.

To extract the JSON properties into separate columns of the sensor_data_extract materialized view, I use the JSON_EXTRACT_PATH_TEXT function:

CREATE MATERIALIZED VIEW sensor_data_extract AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'sensor_id')::VARCHAR(8) as sensor_id,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'current_temperature')::DECIMAL(10,2) as current_temperature,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'status')::VARCHAR(8) as status,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::CHARACTER(26) as event_time
      FROM sensors."my-input-stream";
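To make the difference between the two options concrete, here is a local Python analogy. It is not Redshift code and only approximates what the two views do with each record: the first function keeps the whole document, the second decodes the bytes as UTF-8 and pulls out typed fields. The function names are hypothetical.

```python
import json
from decimal import Decimal


def parse_payload(raw):
    """Option 1: keep the whole JSON document as one nested value."""
    return json.loads(raw.decode("utf-8"))


def extract_columns(raw):
    """Option 2: decode the bytes, then extract each property into a typed field."""
    document = json.loads(raw.decode("utf-8"))
    return {
        # Extracted as text, like the VARCHAR columns in the view.
        "sensor_id": str(document["sensor_id"]),
        # Two decimal places, like the DECIMAL(10,2) column.
        "current_temperature": Decimal(str(document["current_temperature"])).quantize(
            Decimal("0.01")
        ),
        "status": str(document["status"]),
        "event_time": str(document["event_time"]),
    }


record = (
    b'{"sensor_id": 66, "current_temperature": 69.67, '
    b'"status": "OK", "event_time": "2022-11-20T18:31:30.693395"}'
)
print(parse_payload(record))
print(extract_columns(record))
```

In this analogy, the single-column approach keeps working if producers add new properties, while the extracted approach fixes names and types up front so that queries against the view stay simple.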

Loading Data into the Kinesis Data Stream
To put data in the my-input-stream Kinesis data stream, I use the following random_data_generator.py Python script, which simulates data from IoT sensors:

import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

I start the script and see the records that are being put in the stream. The script prints each record as a Python dictionary and sends it to the stream as JSON containing random data.

$ python3 random_data_generator.py
{'sensor_id': 66, 'current_temperature': 69.67, 'status': 'OK', 'event_time': '2022-11-20T18:31:30.693395'}
{'sensor_id': 45, 'current_temperature': 122.57, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.486649'}
{'sensor_id': 15, 'current_temperature': 101.64, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.671593'}
…
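The script sends one record per API call, which is fine for a demo. For higher volumes, a possible variation is to group records and send them with the Kinesis `put_records` call, which accepts up to 500 records per request. The sketch below assumes boto3 with configured credentials and records shaped like the ones above; the helper names are hypothetical.

```python
import json

MAX_BATCH = 500  # PutRecords accepts at most 500 records per request


def to_entries(records):
    """Convert sensor records into PutRecords entries keyed by sensor_id."""
    return [
        {
            "Data": json.dumps(record).encode("utf-8"),
            "PartitionKey": str(record["sensor_id"]),
        }
        for record in records
    ]


def batches(entries, size=MAX_BATCH):
    """Yield consecutive slices of at most `size` entries."""
    for start in range(0, len(entries), size):
        yield entries[start:start + size]


def send_batch(stream_name, records):
    """Send all records in batches and return how many failed (needs credentials)."""
    import boto3  # imported here so the helpers above work without the SDK

    kinesis = boto3.client("kinesis")
    failed = 0
    for chunk in batches(to_entries(records)):
        response = kinesis.put_records(StreamName=stream_name, Records=chunk)
        failed += response["FailedRecordCount"]
    return failed
```

Because `put_records` can partially fail, a production version would retry the entries that the response marks as failed instead of only counting them.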

Querying Streaming Data from Amazon Redshift
To compare the two materialized views, I select the first ten rows from each of them:

  • In the sensor_data materialized view, the JSON data in the stream is in the payload column. I can use Amazon Redshift JSON functions to access data stored in JSON format.
  • In the sensor_data_extract materialized view, the JSON data in the stream has been extracted into different columns: sensor_id, current_temperature, status, and event_time.
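The same comparison could also be run outside the query editor. The sketch below is one possible approach using the Amazon Redshift Data API through boto3. The workgroup name `default` and database name `dev` are assumptions, so replace them with your own values; the helper names are hypothetical.

```python
import time

VIEWS = ("sensor_data", "sensor_data_extract")


def first_rows_sql(view, limit=10):
    """Build the query that selects the first rows of one of the two views."""
    if view not in VIEWS:
        raise ValueError(f"unknown view: {view}")
    return f"SELECT * FROM {view} LIMIT {int(limit)};"


def plain_rows(records):
    """Flatten Data API records, where each field is a one-key dict
    such as {"stringValue": "OK"} or {"isNull": True}."""
    return [
        [None if field.get("isNull") else next(iter(field.values())) for field in row]
        for row in records
    ]


def fetch_first_rows(view, workgroup="default", database="dev"):
    """Run the query on a serverless workgroup and return its rows (needs credentials)."""
    import boto3  # imported here so the helpers above work without the SDK

    client = boto3.client("redshift-data")
    statement_id = client.execute_statement(
        WorkgroupName=workgroup,  # assumed workgroup name
        Database=database,        # assumed database name
        Sql=first_rows_sql(view),
    )["Id"]
    # The Data API is asynchronous, so poll until the statement completes.
    while True:
        status = client.describe_statement(Id=statement_id)["Status"]
        if status == "FINISHED":
            break
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(f"statement ended with status {status}")
        time.sleep(1)
    return plain_rows(client.get_statement_result(Id=statement_id)["Records"])
```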

Now I can use the data in these views in my analytics workloads together with the data in my data warehouse, my operational databases, and my data lake. I can use the data in these views together with Redshift ML to train a machine learning model or use predictive analytics. Because materialized views support incremental updates, the data in these views can be efficiently used as a data source for dashboards, for example, using Amazon Redshift as a data source for Amazon Managed Grafana.

Availability and Pricing
Amazon Redshift streaming ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka is generally available today in all commercial AWS Regions.

There are no additional costs for using Amazon Redshift streaming ingestion. For more information, see Amazon Redshift pricing.

It’s never been easier to use low-latency streaming data in your data warehouse and in your data lake. Let us know what you build with this new capability!

— Danilo


