Kinesis Data Firehose 准实时写入数据到Redshift方案

注:2024年2月起,Kinesis Data Firehose也成为了独立产品Data Firehose。再加上之前成为独立产品的Managed Flink,Kinesis的三套件目前都成为了独立产品。

一、背景

Kinesis作为AWS流式数据服务的核心产品,支持多种数据服务作为投递对象。通过Kinesis Data Firehose将数据持久化落盘到S3并自动加载到Redshift数据仓库,可实现最低一分钟的分析间隔,且无需额外配置脚本或计划任务用于加载和数据转换。

本文通过自定义脚本生成测试数据,并加载到Redshift。

二、创建Redshift集群

由于Kinesis是在互联网上运行的服务,默认是通过互联网写入Redshift,因此需要为Redshift配置一个EIP用于接受流量,并为Redshift配置安全规则组允许来自Kinesis地址范围的写入。本文按照最简单的配置通过互联网写入,并通过安全规则组和IP地址限制确保Redshift的安全。

1、创建包含Kinesis入栈许可的安全规则组

进入EC2界面,点击左侧的Security Groups,创建一个新的安全规则组。

在新的安全规则组入栈(Inbound)方向内,至少包含两条入栈方向的。第一条是本允许连接5439端口,来源地址是从本地互联网公司出口的IP,或者是做实验的VPC的NAT Gateway出口的EIP,用于VPC内的EC2上的应用软件访问Redshift。第二条是Kinesis服务写入Redshift时候需要声明的Kinesis地址段,这个地址段访问可以从本文末尾的文档行中查询到各Region公开的Kinesis服务的IP地址范围。如下截图。

配置完毕后保存安全规则组。

2、申请EIP

进入EC2界面,点击左侧的Elastic IPs按钮,点击右侧上的申请新的EIP按钮,获得一个新的EIP。如下截图。

配置完成获得EIP一个。

3、创建集群

进入Redshift控制台,点击左侧的Cluster集群菜单,点击右下方的Create Cluster。如下截图。

输入集群名称,针对实验(测试)场景,机型选择最小的ra3.xplus,数量选择1台,支持最高4TB存储。如果创建2台,则支持32TB存储。如下截图。

接下来向下滚动页面,在加载测试数据Load sample data位置,不选中。默认用户名保持的awsuser,在密码位置设置好要使用的密码。注意密码需要同时包含英文大写、小写和数字。如果复杂度不够,创建集群时候会有提示密码复杂度不满足要求。如下截图。

在设置Redshift使用的IAM角色的位置,如果之前创建过Redshift,这里会显示出来以前用过的IAM角色。如果之前没有为Redshift创建过IAM角色,那么这里要从菜单中选择新创建一个。点击Manage IAM roles,点击Create IAM role即可创建新的。如下截图。

在弹出的创建向导对话框内,选择Redshift访问Any S3 Bucket给所有S3桶授权。此处是动手实验,所以可以简单粗暴的设置授权Redshift可访问所有S3 Bucket存储桶。如果是生产环境,请针对单个存储桶赋予特定的读写权限。如下截图。

创建IAM角色完毕后,回到刚才的IAM角色清单。这里要确认刚才新创建的IAM Role是默认的角色,如果不是,从下拉框中选择Make default,确认生效。

配置完毕角色后,要配置网络和VPC设置。点击Addtional configuraitons后边的Use ddefaults按钮,展开详细设置。如下截图。

在安全规则组位置,取消默认的default按钮规则组,选中前文创建的Redshift安全规则组。如下截图。

接下来配置网络访问。在Publicly accessible位置设哦只为Turn on为Redshift打开权限。在EIP位置从下拉框中选择上一步创建的EIP。如下截图。

跳过其余选项,都是用向导默认值,然后点击创建按钮。如下截图。

大概等待10~15分钟,集群创建完毕。

4、在Redshift上生成目标表结构

回到刚才的Redshift集群界面,当看到集群的状态是绿色的Available时候,表示集群可用。点击页面上方的Query data按钮,即可打开Web界面的Query Editor V2工作台(注意选版本V2)。如下截图。

点击左侧的数据库名称,在登陆选项中,选择用户名密码登陆,然后输入默认的数据库dev,输入用户名awsuser,输入前一步创建数据库过程中设置的密码,完成登陆。如下截图。

登陆成功后,在右侧的SQL对话框内输入如下SQL语句,完成建表。如下截图。

Create table public.kdfdemo1
(
    msgtime TIMESTAMP,
    ticker_symbol varchar(4),
    sector varchar(16),
    change float,
    price float
)

至此Redshift准备完成。

三、使用Kinesis流式传输到Redshift

1、创建S3存储桶

为Kinesis数据落盘在S3创建对应的存储桶一个,本文是用kdf-rs-01的存储桶名。创建存储桶时候所有参数使用默认值。

2、创建KDF流目标到Redshift

进入Kinesis向导,选择创建Data Firehose流。如下截图。

选择数据来源是Direct PUT,选择目标是Amazon Redshift,设置Delivery stream name。记住这个名称,后文代码将会使用。如下截图。

在数据转换部分,留空不选中。如下截图。

在集群配置部分,选择正确的集群、用户名、密码和数据库。在上一步创建Redshift时候,数据库默认是dev,用户名默认是awsuser。如果创建过程有修改,请自行输入确保匹配正确。如下截图。

在表的名称位置,输入kdfdemo01,注意与上一步创建表的步骤保持一致。在Columns位置留空不用填写。在S3存储桶Intermediate S3 Bucket位置输入上一步创建的S3存储桶kdf-rs-01。在S3存储目录前缀Intermediate S3 prefix设置上,保持默认值留空。如下截图。

Redshift COPY command位置上,设置参数为GZIP JSON 'auto ignorecase'。注意这个参数区分大小写。这表示S3存储桶输入的是GZIP压缩文件,数据格式是JSON,且忽略JSON文件中的数据字段名称与Redshift字段大小写不匹配的问题。在重试时间位置设置为3秒。如下截图。

点击Buffer hints, compression and encryption,展开详细设置。在缓冲文件大小文之设置为默认的5MB,在时间间隔的位置设置为最小60秒写入Redshift。如下截图。

最后在S3存储压缩数据位置,选择GZIP格式压缩,这样可降低S3存储桶使用的容量。注意这里选择是否压缩需要与前边的步骤设置Redshift加载数据Load命令保持一致。如果这里开启了压缩,前边Load命令也需要加入GZIP参数,反之则都不压缩。最后点击创建。如下截图。

至此Kinesis创建完成。

3、发起测试数据流并检查S3存储桶

等待几分钟,确保Kinesis显示Status位置为绿色的Active即可开始测试。

点击进入刚才配置的Kinesis流到Redshift,在页面上有一个Test with demo data这里可选使用,不过其缺少时间戳字段,这里我们构造一段脚本生成数据测试。

运行生成数据脚本前,需要在运行代码的环境上提前配置好Python、AWSCLI的密钥,设置好AWS区域。执行如下命令安装Boto3库。

pip3 install boto3

将如下代码保存为kdf-redshift.py。然后运行python3 kdf-redshift.py执行之。

import boto3
import time
import json
import random
import botocore.exceptions

# Kinesis Data Firehose stream name
output_stream = "PUT-S3-Redshift"

firehose = boto3.client('firehose')

# Submit to Kinesis

try:
    for i in range(1000):
        # Source data before Lambda Transformation
        msgtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        sector = random.sample(['ENERGY', 'RETAIL', 'TECHNOLOGY', 'HEALTHCARE'], 1)[0]
        ticker_symbol_list = random.sample(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'], 3)
        ticker_symbol = ticker_symbol_list[0] + ticker_symbol_list[1] + ticker_symbol_list[2]

        # Build json
        data1 = {
            "msgtime" : msgtime ,
            "ticker_symbol" : ticker_symbol ,
            "sector" : sector ,
            "change" : round(random.uniform(-10, 10),4) ,
            "price" : round(random.uniform(-10, 10),4)
        }   
        msg = json.dumps(data1)

        firehose.put_record(
            DeliveryStreamName=output_stream,
                Record={
                    'Data': msg
                    }
            )
        print(msg)
        time.sleep(5)
        
except botocore.exceptions.ClientError as error:
    print(error)

运行本段代码。可看到数据正常的发送给了Kinesis服务。如下截图。

如果提示找不到Kinesis资源的错误,有可能是AWSCLI和Python配置的region不对,或者Kinesis流的名称不对。修正后重新运行脚本即可。

4、读取Redshift数据

在等待大约1~2分钟后运行SQL语句查询:

SELECT
    *
FROM
    "dev"."public"."kdfdemo1";

可看到Redshift上的数据被正确的检索出。通过查看时间戳,可确定是约1分钟前写入的。

四、优化Redshift表结构的测试

本实验主要过程与上一个步骤相同,本文只描述配置有差别的项目。

1、建表语句

新建一张表,名为kdfdemo2,里边包含了字段压缩方法、排序键和分区键的定义。

在Redshift上执行如下命令:

Create table public.kdfdemo2
(
    uuid varchar(255) encode TEXT255,
    msgtime TIMESTAMP encode AZ64 sortkey,
    ticker_symbol varchar(4) encode TEXT255,
    sector varchar(16) encode text255 distkey,
    change float,
    price float
)

注:TIMESTAMP类型的压缩类型设置为az64或者RAW更好。

创建完成。效果如下:

2、创建新的S3存储桶

为了和上边的实验区分开,这里创建一个新的存储桶,名为kdf-rs-02

3、创建新的Kinesis Data Firehose流

为了和前文区别开,这里取名PUT-S3-Redshift-2

4、生成数据代码

执行如下代码生成数据。与前文的代码相比,是用的Kinesis的流名称不同,

import boto3
import time
import json
import random
import botocore.exceptions
import hashlib

# Kinesis Data Firehose stream name
output_stream = "PUT-S3-Redshift-2"

firehose = boto3.client('firehose')

# Submit to Kinesis

try:
    for i in range(1000):
        # Source data before Lambda Transformation
        msgtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        sector = random.sample(['ENERGY', 'RETAIL', 'TECHNOLOGY', 'HEALTHCARE'], 1)[0]
        ticker_symbol_list = random.sample(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'], 3)
        ticker_symbol = ticker_symbol_list[0] + ticker_symbol_list[1] + ticker_symbol_list[2]
        uuid = str(msgtime) + str(ticker_symbol)
        msgtimeMD5 = hashlib.md5(uuid.encode())

        # Build json
        data1 = {
            "uuid" : msgtimeMD5.hexdigest() ,
            "msgtime" : msgtime ,
            "ticker_symbol" : ticker_symbol ,
            "sector" : sector ,
            "change" : round(random.uniform(-10, 10),4) ,
            "price" : round(random.uniform(-10, 10),4)
        }   
        msg = json.dumps(data1)

        firehose.put_record(
            DeliveryStreamName=output_stream,
                Record={
                    'Data': msg
                    }
            )
        print(msg)
        time.sleep(5)
        
except botocore.exceptions.ClientError as error:
    print(error)

5、查询数据测试结果

执行如下SQL查询:

SELECT
    *
FROM
    "dev"."public"."kdfdemo2";

可看到返回结果如下:

至此实验完成。

五、参考文档

Kinesis Data Firehose在各Region的IP地址范围

https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html#using-iam-rs-vpc

Amazon Kinesis Data Firehose Immersion Day – Amazon Redshift as destination

https://catalog.us-east-1.prod.workshops.aws/workshops/32e6bc9a-5c03-416d-be7c-4d29f40e55c4/en-US/lab-4/lab4-2-redshift