注:2024年2月起,Kinesis Data Firehose也成为了独立产品Data Firehose。再加上之前成为独立产品的Managed Flink,Kinesis的三套件目前都成为了独立产品。
一、背景
Kinesis作为AWS流式数据服务的核心产品,支持多种数据服务作为投递对象。通过Kinesis Data Firehose将数据持久化落盘到S3并自动加载到Redshift数据仓库,可实现最低一分钟的分析间隔,且无需额外配置脚本或计划任务用于加载和数据转换。
本文通过自定义脚本生成测试数据,并加载到Redshift。
二、创建Redshift集群
由于Kinesis是在互联网上运行的服务,默认是通过互联网写入Redshift,因此需要为Redshift配置一个EIP用于接受流量,并为Redshift配置安全规则组允许来自Kinesis地址范围的写入。本文按照最简单的配置通过互联网写入,并通过安全规则组和IP地址限制确保Redshift的安全。
1、创建包含Kinesis入栈许可的安全规则组
进入EC2界面,点击左侧的Security Groups
,创建一个新的安全规则组。
在新的安全规则组入栈(Inbound)方向内,至少包含两条入栈方向的。第一条是本允许连接5439端口,来源地址是从本地互联网公司出口的IP,或者是做实验的VPC的NAT Gateway出口的EIP,用于VPC内的EC2上的应用软件访问Redshift。第二条是Kinesis服务写入Redshift时候需要声明的Kinesis地址段,这个地址段访问可以从本文末尾的文档行中查询到各Region公开的Kinesis服务的IP地址范围。如下截图。
配置完毕后保存安全规则组。
2、申请EIP
进入EC2界面,点击左侧的Elastic IPs
按钮,点击右侧上的申请新的EIP按钮,获得一个新的EIP。如下截图。
配置完成获得EIP一个。
3、创建集群
进入Redshift控制台,点击左侧的Cluster
集群菜单,点击右下方的Create Cluster。如下截图。
输入集群名称,针对实验(测试)场景,机型选择最小的ra3.xplus
,数量选择1
台,支持最高4TB存储。如果创建2台,则支持32TB存储。如下截图。
接下来向下滚动页面,在加载测试数据Load sample data
位置,不选中。默认用户名保持的awsuser
,在密码位置设置好要使用的密码。注意密码需要同时包含英文大写、小写和数字。如果复杂度不够,创建集群时候会有提示密码复杂度不满足要求。如下截图。
在设置Redshift使用的IAM角色的位置,如果之前创建过Redshift,这里会显示出来以前用过的IAM角色。如果之前没有为Redshift创建过IAM角色,那么这里要从菜单中选择新创建一个。点击Manage IAM roles
,点击Create IAM role
即可创建新的。如下截图。
在弹出的创建向导对话框内,选择Redshift访问Any S3 Bucket
给所有S3桶授权。此处是动手实验,所以可以简单粗暴的设置授权Redshift可访问所有S3 Bucket存储桶。如果是生产环境,请针对单个存储桶赋予特定的读写权限。如下截图。
创建IAM角色完毕后,回到刚才的IAM角色清单。这里要确认刚才新创建的IAM Role是默认的角色,如果不是,从下拉框中选择Make default
,确认生效。
配置完毕角色后,要配置网络和VPC设置。点击Addtional configuraitons
后边的Use ddefaults
按钮,展开详细设置。如下截图。
在安全规则组位置,取消默认的default
按钮规则组,选中前文创建的Redshift安全规则组。如下截图。
接下来配置网络访问。在Publicly accessible
位置设哦只为Turn on
为Redshift打开权限。在EIP位置从下拉框中选择上一步创建的EIP。如下截图。
跳过其余选项,都是用向导默认值,然后点击创建按钮。如下截图。
大概等待10~15分钟,集群创建完毕。
4、在Redshift上生成目标表结构
回到刚才的Redshift集群界面,当看到集群的状态是绿色的Available
时候,表示集群可用。点击页面上方的Query data
按钮,即可打开Web界面的Query Editor V2工作台(注意选版本V2)。如下截图。
点击左侧的数据库名称,在登陆选项中,选择用户名密码登陆,然后输入默认的数据库dev
,输入用户名awsuser
,输入前一步创建数据库过程中设置的密码,完成登陆。如下截图。
登陆成功后,在右侧的SQL对话框内输入如下SQL语句,完成建表。如下截图。
Create table public.kdfdemo1
(
msgtime TIMESTAMP,
ticker_symbol varchar(4),
sector varchar(16),
change float,
price float
)
至此Redshift准备完成。
三、使用Kinesis流式传输到Redshift
1、创建S3存储桶
为Kinesis数据落盘在S3创建对应的存储桶一个,本文是用kdf-rs-01
的存储桶名。创建存储桶时候所有参数使用默认值。
2、创建KDF流目标到Redshift
进入Kinesis向导,选择创建Data Firehose流。如下截图。
选择数据来源是Direct PUT
,选择目标是Amazon Redshift
,设置Delivery stream name
。记住这个名称,后文代码将会使用。如下截图。
在数据转换部分,留空不选中。如下截图。
在集群配置部分,选择正确的集群、用户名、密码和数据库。在上一步创建Redshift时候,数据库默认是dev,用户名默认是awsuser。如果创建过程有修改,请自行输入确保匹配正确。如下截图。
在表的名称位置,输入kdfdemo01
,注意与上一步创建表的步骤保持一致。在Columns
位置留空不用填写。在S3存储桶Intermediate S3 Bucket
位置输入上一步创建的S3存储桶kdf-rs-01
。在S3存储目录前缀Intermediate S3 prefix
设置上,保持默认值留空。如下截图。
在Redshift COPY command
位置上,设置参数为GZIP JSON 'auto ignorecase'
。注意这个参数区分大小写。这表示S3存储桶输入的是GZIP压缩文件,数据格式是JSON,且忽略JSON文件中的数据字段名称与Redshift字段大小写不匹配的问题。在重试时间位置设置为3秒。如下截图。
点击Buffer hints, compression and encryption
,展开详细设置。在缓冲文件大小文之设置为默认的5MB,在时间间隔的位置设置为最小60秒写入Redshift。如下截图。
最后在S3存储压缩数据位置,选择GZIP
格式压缩,这样可降低S3存储桶使用的容量。注意这里选择是否压缩需要与前边的步骤设置Redshift加载数据Load命令保持一致。如果这里开启了压缩,前边Load命令也需要加入GZIP参数,反之则都不压缩。最后点击创建。如下截图。
至此Kinesis创建完成。
3、发起测试数据流并检查S3存储桶
等待几分钟,确保Kinesis显示Status
位置为绿色的Active
即可开始测试。
点击进入刚才配置的Kinesis流到Redshift,在页面上有一个Test with demo data
这里可选使用,不过其缺少时间戳字段,这里我们构造一段脚本生成数据测试。
运行生成数据脚本前,需要在运行代码的环境上提前配置好Python、AWSCLI的密钥,设置好AWS区域。执行如下命令安装Boto3库。
pip3 install boto3
将如下代码保存为kdf-redshift.py。然后运行python3 kdf-redshift.py
执行之。
import boto3
import time
import json
import random
import botocore.exceptions
# Kinesis Data Firehose stream name
output_stream = "PUT-S3-Redshift"
firehose = boto3.client('firehose')
# Submit to Kinesis
try:
for i in range(1000):
# Source data before Lambda Transformation
msgtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
sector = random.sample(['ENERGY', 'RETAIL', 'TECHNOLOGY', 'HEALTHCARE'], 1)[0]
ticker_symbol_list = random.sample(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'], 3)
ticker_symbol = ticker_symbol_list[0] + ticker_symbol_list[1] + ticker_symbol_list[2]
# Build json
data1 = {
"msgtime" : msgtime ,
"ticker_symbol" : ticker_symbol ,
"sector" : sector ,
"change" : round(random.uniform(-10, 10),4) ,
"price" : round(random.uniform(-10, 10),4)
}
msg = json.dumps(data1)
firehose.put_record(
DeliveryStreamName=output_stream,
Record={
'Data': msg
}
)
print(msg)
time.sleep(5)
except botocore.exceptions.ClientError as error:
print(error)
运行本段代码。可看到数据正常的发送给了Kinesis服务。如下截图。
如果提示找不到Kinesis资源的错误,有可能是AWSCLI和Python配置的region不对,或者Kinesis流的名称不对。修正后重新运行脚本即可。
4、读取Redshift数据
在等待大约1~2分钟后运行SQL语句查询:
SELECT
*
FROM
"dev"."public"."kdfdemo1";
可看到Redshift上的数据被正确的检索出。通过查看时间戳,可确定是约1分钟前写入的。
四、优化Redshift表结构的测试
本实验主要过程与上一个步骤相同,本文只描述配置有差别的项目。
1、建表语句
新建一张表,名为kdfdemo2
,里边包含了字段压缩方法、排序键和分区键的定义。
在Redshift上执行如下命令:
Create table public.kdfdemo2
(
uuid varchar(255) encode TEXT255,
msgtime TIMESTAMP encode AZ64 sortkey,
ticker_symbol varchar(4) encode TEXT255,
sector varchar(16) encode text255 distkey,
change float,
price float
)
注:TIMESTAMP类型的压缩类型设置为az64或者RAW更好。
创建完成。效果如下:
2、创建新的S3存储桶
为了和上边的实验区分开,这里创建一个新的存储桶,名为kdf-rs-02
。
3、创建新的Kinesis Data Firehose流
为了和前文区别开,这里取名PUT-S3-Redshift-2
。
4、生成数据代码
执行如下代码生成数据。与前文的代码相比,是用的Kinesis的流名称不同,
import boto3
import time
import json
import random
import botocore.exceptions
import hashlib
# Kinesis Data Firehose stream name
output_stream = "PUT-S3-Redshift-2"
firehose = boto3.client('firehose')
# Submit to Kinesis
try:
for i in range(1000):
# Source data before Lambda Transformation
msgtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
sector = random.sample(['ENERGY', 'RETAIL', 'TECHNOLOGY', 'HEALTHCARE'], 1)[0]
ticker_symbol_list = random.sample(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'], 3)
ticker_symbol = ticker_symbol_list[0] + ticker_symbol_list[1] + ticker_symbol_list[2]
uuid = str(msgtime) + str(ticker_symbol)
msgtimeMD5 = hashlib.md5(uuid.encode())
# Build json
data1 = {
"uuid" : msgtimeMD5.hexdigest() ,
"msgtime" : msgtime ,
"ticker_symbol" : ticker_symbol ,
"sector" : sector ,
"change" : round(random.uniform(-10, 10),4) ,
"price" : round(random.uniform(-10, 10),4)
}
msg = json.dumps(data1)
firehose.put_record(
DeliveryStreamName=output_stream,
Record={
'Data': msg
}
)
print(msg)
time.sleep(5)
except botocore.exceptions.ClientError as error:
print(error)
5、查询数据测试结果
执行如下SQL查询:
SELECT
*
FROM
"dev"."public"."kdfdemo2";
可看到返回结果如下:
至此实验完成。
五、参考文档
Kinesis Data Firehose在各Region的IP地址范围
https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html#using-iam-rs-vpc
Amazon Kinesis Data Firehose Immersion Day – Amazon Redshift as destination