创建并连接MSK服务

一、背景

Amazon Managed Streaming for Apache Kafka 是AWS推出的托管Kafka服务。主要优势体现在:

  • Amazon MSK 使您可以专注于创建流应用程序,而无需担心管理 Apache Kafka 环境的运营开销。Amazon MSK 为您管理 Apache Kafka 集群和 Apache ZooKeeper 节点的预置、配置及维护。Amazon MSK 还在 AWS 控制台中显示关键的 Apache Kafka 性能指标。
  • Amazon MSK 创建 Apache Kafka 集群,并在一个 AWS 区域内提供多可用区复制。Amazon MSK 持续监控集群的运行状况,并自动替换发生故障的组件。
  • Amazon MSK 可为您的 Apache Kafka 集群提供多级安全性,包括 VPC 网络隔离、使用 AWS IAM 进行控制层面 API 授权、静态加密、传输中 TLS 加密。

二、创建MSK集群

1、关于在中国北京区域创建MSK的说明

通常的Kafka是三节点状态,在AWS中国区北京Region提供2个可用区。当在北京区域创建Kafka的时候,背后的ZooKeeper将以多节点方式创建并维持高可用。相应的,BrokerNode将是最小为2个节点,每个可用区一个节点,数据副本也是最小2个节点。一般在生产环境中,可使用4个Broker节点的方式,4个Broker将分别位于2个AZ内,每个AZ各2个。

本文的实验将按照以上方式创建4个Broker。

2、创建安全规则组

首先为Kafka创建一个安全规则组,位于所需要的VPC内。安全规则组的策略如下:

  • 允许TCP 2181端口入栈,来源是本VPC的内网IP 172.31.0.0/16
  • 允许TCP 9092端口入栈,来源是本VPC的内网IP 172.31.0.0/16
  • 允许任意出栈,去往0.0.0.0/0

在以上规则组中,如果是生产环境,建议把安全规则组的入栈的来源地址,从172.31.0.0/16这种网段授权方式,修改为对EC2所在的安全规则组授权的方式。

创建完毕后保存规则组为某特定名称,稍后的创建MSK集群中将会使用。

3、创建集群

进入MSK界面,点击创建按钮。如下截图。

image-20210315211016754

点击定制方式创建。注意因为要修改安全组,所以必须选择右侧的定制按钮。在集群名称位置输入 Cluster01 作为集群名称。在版本位置选择默认推荐的版本。如下截图。

image-20210315211051500

讲页面向下滚动后,选择MSK配置参数组是默认值。在下方VPC位置,选择,可用的AZ。首先选择第一个AZ,选择对应的子网。如下截图。

image-20210315212241389

讲页面向下滚动。选择第二个可用区,选择对应的子网。然后在Broker类型位置选择要使用的规格,例如默认是m5.xlarge讲提供2vCPU/8GB内存。在每个可用区的节点数位置输入2,即可获得总共2个AZ 4个节点。如下截图。

image-20210315213705808

在Tag位置跳过,可不输入。在存储位置使用默认的1000GB。如下截图。

image-20210315213847451

将页面向下滚动。在传输加密位置,选择第二项,同时允许加密的和不加密的流量。这是因为某些历史程序可能直接调用的是Kafka端口,并未使用TLS对流量加密。因此选择同时允许的选项将保持较好兼容性。如下截图。

image-20210315213926800

在身份认证选项位置,选择None。这里的使用用户名和密码验证的方法需要启用TLS传输,也就是上一步必须选择“只接受TLS”流量才可以。如果上一步选择了同时允许加密和不加密的流量,那么这里也就不能设置身份验证了。不能设置身份验证的情况下,建议用安全规则组进行白名单方式的访问授权。再来页面下方监控位置选择基本监控。如下截图。

image-20210315214152597

更多监控选项位置可以保持为默认不选中状态。如下截图。

image-20210315214422044

在高级设置部分,点击展开集群设置,选择定制设置,然后在安全规则组位置,通过下拉框选择前文创建的Kafka专用的安全组。此外,还需要删除掉默认的default安全组的选中。最后一步点击创建集群按钮。如下截图。

image-20210315214451874

创建集群过程将耗时15分钟。在如下位置可以看到集群的状态是创建中,需要等待状态变成可用之后才可以后续操作。如下截图。

image-20210315214914486

当状态显示Active时候,集群创建完成。如下截图。

image-20210315221757359

三、发送消息测试

发送消息测试有两个操作位置:

  • 第一部分的操作需要在一个具有AWS CLI且配置了Access Key的环境上操作;
  • 第二部分需要在能够访问到Kafka集群的EC2上进行测试。

1、查看终端节点

进入MSK后,点击查看客户端信息。如下截图。

image-20210316152115259

在如下界面可以看到看到TLS和文本连接两个字符串。TLS是用于SSL加密的,Plaintext是不加密的文本链接。请将一下内容复制下来。如下截图。

image-20210316152346885

加下来继续查询Kafka集群的Zookeeper节点。

打开集群界面,点击集群名称,进入集群详情界面,找到资源名称ARN。将ARN复制下来。如下截图。

image-20210315221824049

在配置好AWS CLI和Access Key的机器上,执行如下命令获取集群的ZookeeperConnectString端点。请替换其中的ARN为上一个步骤界面上显示的ARN。

aws kafka describe-cluster --cluster-arn \
arn:aws-cn:kafka:cn-north-1:420029960748:cluster/cluster01/1792eb5f-69d8-442a-8953-b28e02834a66-2

执行后返回信息中可以找到如下一段。

"ZookeeperConnectString": "z-3.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181,z-2.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181,z-1.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181"

这一段就是Zookeeper的Endpoint地址。将其复制下来。

2、在Kafka上创建Topic

创建一个EC2,EC2所在的网络需要与Kafka在同一个VPC内,且子网之间可能正常通信。登陆到这个EC2上,下载Kafka安装包。可以从Apache Kafka官网下载,本实验也可以从这里下载。下载后解压缩,在bin目录下可以看到有关sh脚本。

在EC2上执行如下命令创建Topic。

bin/kafka-topics.sh --create --zookeeper \
z-3.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181,\
z-2.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181,\
z-1.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181 \
--replication-factor 4 --partitions 1 --topic AWSKafkaTutorialTopic

执行后提示 Created topic AWSKafkaTutorialTopic 则表示创建Topic成功。

在EC2上执行如下命令查看Topic。

bin/kafka-topics.sh --list --zookeeper \
z-3.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181,\
z-2.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181,\
z-1.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:2181

执行后返回信息如下即是正常的列出了Topic,其中AWSKafkaTutorialTopic是刚创建的Topic。

AWSKafkaTutorialTopic
__amazon_msk_canary
__consumer_offsets

创建Topic完成。

3、发送消息

在配置好AWS CLI和Access Key的环境上,运行如下命令查看Broker终端节点。请替换ARN为前文复制下来的ARN。

aws kafka get-bootstrap-brokers --cluster-arn arn:aws-cn:kafka:cn-north-1:420029960748:cluster/cluster01/1792eb5f-69d8-442a-8953-b28e02834a66-2

执行后,将返回如下信息。

{
    "BootstrapBrokerString": "b-3.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092,b-4.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092",
    "BootstrapBrokerStringTls": "b-3.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9094,b-4.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9094"
}
(END)

在以上返回信息里边,包括 BootstrapBrokerString 和 BootstrapBrokerStringTls 两个信息。第一个是用于不带SSL的普通连接,第二个是用于支持SSL连接的TLS安全连接。本文在创建集群时候就选择了同时允许不加密的连接和TLS连接,因此不加密也是可以访问MSK的。下面将使用不加密的Broker节点作为入口访问。

在EC2上执行如下命令发送消息。

bin/kafka-console-producer.sh --broker-list \
b-3.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092,\
b-4.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092,\
b-1.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092 \
--topic AWSKafkaTutorialTopic

建立连接成功的话,将会打开一个持续连接窗口,输入要发送的消息并按回车即可发送。如果要停止输入消息,按Ctrl+C键可以终止。接下来不要关闭这个窗口,再打开一个新的SSH控制台去接收消息。

4、接收消息

接上一步,不要关闭发送消息的SSH窗口,再打开一个新的SSH控制台登陆到EC2。

执行如下命令接收消息。

bin/kafka-console-consumer.sh --bootstrap-server \
b-3.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092,\
b-4.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092,\
b-1.cluster01.vlxlsh.c2.kafka.cn-north-1.amazonaws.com.cn:9092 \
--topic AWSKafkaTutorialTopic --from-beginning

运行程序后,首先可以看到之前发送的消息。然后切换到发送窗口,继续输入新的消息,并按回车,也可以看到控制台上有新的消息发送并接收成功。如下截图。

image-20210317184605608

至此,实验完成。