亚马逊云科技Amazon MSK是Amazon云平台提供的托管Kafka服务。在系统升级或迁移时,用户常常需要将一个Amazon MSK集群中的数据导出(备份),然后在新集群或另一个集群中再将数据导入(还原)。通常,Kafka集群间的数据复制和同步多采用Kafka MirrorMaker,但是,在某些场景中,受环境限制,两个于Kafka集群之间的网络可能无法连通,或者两个亚马逊云科技账号相互隔离,亦或是需要将Kafka的数据沉淀为文件存储以备他用。此时,基于Kafka Connect S3 Source/Sink Connector的方案会是一种较为合适的选择,本文就将介绍一下这一方案的具体实现。
数据的导出、导入、备份、还原通常都是一次性操作,为此搭建完备持久的基础设施并无太大必要,省时省力,简单便捷才是优先的考量因素。为此,本文将提供一套开箱即用的解决方案,方案使用Docker搭建Kafka Connect,所有操作均配备自动化Shell脚本,用户只需设置一些环境变量并执行相应脚本即可完成全部工作。这种基于Docker的单体模式可以应对中小型规模的数据同步和迁移,如果要寻求稳定、健壮的解决方案,可以考虑将Docker版本的Kafka Connect迁移到Kubernetes或Amazon MSK Connect,实现集群化部署。
整体架构
首先介绍一下方案的整体架构。导出/导入和备份/还原其实是两种高度类似的场景,但为了描述清晰,我们还是分开讨论。先看一下导出/导入的架构示意图:
在这个架构中,Source端的MSK是数据流的起点,安装了S3 Sink Connector的Kafka Connect会从Source端的MSK中提取指定Topic的数据,然后以Json或Avro文件的形式存储到S3上;同时,另一个安装了S3 Source Connector的Kafka Connect会从S3上读取这些Json或Avro文件,然后写入到Sink端MSK的对应Topic中。如果Source端和Sink端的MSK集群不在同一个Region,可以在各自的Region分别完成导入和导出,然后在两个Region之间使用S3的Cross-Rejion Replication进行数据同步。
该架构只需进行简单的调整,即可用于MSK集群的备份/还原,如下图所示:先将MSK集群的数据备份到S3上,待完成集群的升级、迁移或重建工作后,再从S3上将数据恢复到新建集群即可。
预设条件
本文聚焦于Kafka Connect的数据导出/导入和备份/还原操作,需要提前准备:
一台基于Amazon Linux2的EC2实例(建议新建纯净实例),本文所有的实操脚本都将在该实例上执行,该实例也是运行Kafka Connect Docker Container的宿主机。
两个MSK集群,一个作为Source,一个作为Sink;如果只有一个MSK集群也可完成验证,该集群将既作Source又作Sink。
为聚焦Kafka Connect S3 Source/Sink Connector的核心配置,预设MSK集群没有开启身份认证(即认证类型为Unauthenticated),数据传输方式为PLAINTEXT,以便简化Kafka Connect的连接配置。
网络连通性上要求EC2实例能访问S3、Source端MSK集群、Sink端MSK集群。如果在实际环境中无法同时连通Source端和Sink端,则可以在两台分属于不同网络的EC2上进行操作,但它们必须都能访问S3。如果是跨Region或账号隔离,则另需配置S3 Cross-Region Replication或手动拷贝数据文件。
全局配置
由于实际操作将不可避免地依赖到具体的亚马逊云科技账号以及本地环境里的各项信息(如AKSK,服务地址,各类路径,Topic名称等),为了保证本文给出的操作脚本具有良好的可移植性,将所有与环境相关的信息抽离出来,以全局变量的形式在实操前集中配置。以下就是全局变量的配置脚本,读者需要根据个人环境设定这些变量的取值:
为了便于演示和解读,本文将使用下面的全局配置,其中前6项配置与账号和环境强相关,仍需用户自行修改,脚本中给出的仅为示意值,而后5项配置与MSK数据的导入导出息息相关,不建议修改,因为后续的解读将基于这里设定的值展开,待完成验证后,您可再根据需要灵活修改后5项配置以完成实际的导入导出工作。
回到操作流程,登录准备好的EC2实例,修改下面脚本中与账号和环境相关的前6项配置,然后执行修改后的脚本。此外,需要提醒注意的是:在后续操作中,部分脚本执行后将不再返回,而是持续占用当前窗口输出日志或Kafka消息,因此需要新开命令行窗口,每次新开窗口都需要执行一次这里的全局配置脚本。
关于上述脚本中的后5项配置,有如下详细说明:
我们就以脚本中设定的值为例,解读一下这5项配置联合起来将要实现的功能,同时也是本文将演示的主要内容:
在Source端的MSK集群上存在两个名为source-topic-1和source-topic-2的Topic,通过安装有S3 Sink Connector的Kafka Connect(Docker容器)将两个Topic的数据导出到S3的指定存储桶中,然后再通过安装有S3 Source Connector的Kafka Connect(Docker容器,可以和S3 Source Connector共存为一个Docker容器)将S3存储桶中的数据写入到Sink端的MSK集群上,其中原source-topic-1的数据将被写入sink-topic-1,原source-topic-2的数据将被写入sink-topic-2。
特别地,如果是备份/还原场景,需要保持导出/导入的Topic名称一致,此时,可直接删除S3 Source Connector中以transforms开头的4项配置(将在下文中出现),或者将下面两项改为:
如果只有一个MSK集群,同样可以完成本文的验证工作,只需将SOURCE_KAFKA_BOOTSTRAP_SEVERS和SINK_KAFKA_BOOTSTRAP_SEVERS同时设置为该集群即可,这样,该集群既是Source端又是Sink端,由于配置中的Source Topics和Sink Topics并不同名,所以不会产生冲突。
环境准备
安装工具包
在EC2上执行以下脚本,安装并配置jq,yq,docker,jdk,kafka-console-client五个必须的软件包,可以根据自身EC2的情况酌情选择安装全部或部分软件。建议使用纯净的EC2实例,完成全部的软件安装:
创建S3存储桶
整个方案以S3作为数据转储媒介,为此需要在S3上创建一个存储桶。Source端MSK集群的数据将会导出到该桶中并以Json文件形式保存,向Sink端MSK集群导入数据时,读取的也是存储在该桶中的Json文件。
在源MSK上创建Source Topics
为了确保Topics数据能完整备份和还原,S3 Source Connector建议Sink Topics的分区数最好与Source Topics保持一致,如果让MSK自动创建Topic,则很有可能会导致Source Topics和Sink Topics的分区数不对等,所以,选择手动创建Source Topics和Sink Topics,并确保它们的分区数一致。以下脚本将创建source-topic-1和source-topic-2两个Topic,各含9个分区:
在目标MSK上创建Sink Topics
原因同上,以下脚本将创建:sink-topic-1和sink-topic-2两个Topic,各含9个分区:
制作Kafka Connect镜像
接下来是制作带S3 Sink Connector和S3 Source Connector的Kafka Connect镜像,镜像和容器均以kafka-s3-syncer命名,以下是具体操作:
配置并启动Kafka Connect
镜像制作完成后,就可以启动了Kafka Connect了。Kafka Connect有很多配置项,需要提醒注意的是:在下面的配置中,使用的是Kafka Connect内置的消息转换器:JsonConverter,如果你的输入/输出格式是Avro或Parquet,则需要另行安装对应插件并设置正确的Converter Class。
上述脚本执行后,命令窗口将不再返回,而是会持续输出容器日志,因此下一步操作需要新开一个命令行窗口。
配置并启动S3 Sink Connector
在第5节的操作中,已经将S3 Sink Connector安装到了Kafka Connect的Docker镜像中,但是还需要显式地配置并启动它。新开一个命令行窗口,先执行一遍《实操步骤(1):全局配置》,声明全局变量,然后执行以下脚本:
配置并启动S3 Source Connector
同上,在第5节的操作中,已经将S3 Source Connector安装到了Kafka Connect的Docker镜像中,同样需要显式地配置并启动它:
至此,整个环境搭建完毕,一个以S3作为中转媒介的MSK数据导出、导入、备份、还原链路已经处于运行状态。
测试
现在,来验证一下整个链路是否能正常工作。首先,使用kafka-console-consumer.sh监控source-topic-1和sink-topic-1两个Topic,然后使用脚本向source-topic-1持续写入数据,如果在sink-topic-1看到了相同的数据输出,就说明数据成功地从source-topic-1导出然后又导入到了sink-topic-1中,相应的,在S3存储桶中也能看到“沉淀”的数据文件。
打开Source Topic
新开一个命令行窗口,先执行一遍《实操步骤(1):全局配置》,声明全局变量,然后使用如下命令持续监控source-topic-1中的数据:
打开Sink Topic
新开一个命令行窗口,先执行一遍《实操步骤(1):全局配置》,声明全局变量,然后使用如下命令持续监控sink-topic-1中的数据:
向Source Topic写入数据
新开一个命令行窗口,先执行一遍《实操步骤(1:全局配置》,声明全局变量,然后使用如下命令向source-topic-1中写入数据:
现象与结论
执行上述写入操作后,从监控source-topic-1的命令行窗口中可以很快看到写入的数据,这说明Source端MSK已经开始持续产生数据了,随后(约1分钟),即可在监控sink-topic-1的命令行窗口中看到相同的输出数据,这说明目标端的数据同步也已开始正常工作。此时,打开S3的存储桶会发现大量Json文件,这些Json是由S3 Sink Connector从source-topic-1导出并存放到S3上的,然后S3 Source Connector又读取了这些Json并写入到了sink-topic-1中,至此,整个方案的演示与验证工作全部结束。
清理
在验证过程中,可能需要多次调整并重试,每次重试最好恢复到初始状态,以下脚本会帮助清理所有已创建的资源:
小结
本方案主要定位于轻便易用,在S3 Sink Connector和S3 Source Connector中还有很多与性能、吞吐量相关的配置,例如:s3.part.size, flush.size, s3.poll.interval.ms, tasks.max等,可以在实际需要自行调整,此外,Kafka Connect也可以方便地迁移到Kubernetes或Amazon MSK Connect中以实现集群化部署。