暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MySQL到DynamoDB:使用Kafka在AWS上构建流数据管道

原创 eternity 2022-07-25
862

这是博客系列的第二部分,通过Kafka和Kafka Connect逐步介绍数据管道。我将使用AWS进行演示,但这些概念适用于任何等效选项(例如,在Docker中本地运行这些选项)。

本部分将展示实际的更改数据捕获,它允许您跟踪数据库表中的行级更改,以响应创建、更新和删除操作。例如,在MySQL中,这些更改数据事件通过MySQL二进制日志(binlog)公开。

在第1部分中,我们在数据管道的源部分使用了Datagen连接器,它帮助我们为MSK主题生成模拟数据,并使事情变得简单。我们将使用Aurora MySQL作为数据源,并利用其更改数据捕获功能和Debezium connector for MySQL从Aurora MySQL中的表中实时提取数据,并将其推送到MSK主题。然后,我们将继续像以前一样使用DynamoDB接收器连接器。

如果你是Debezium的新手…

它是一个分布式平台,构建在不同数据库中可用的更改数据捕获功能之上。它提供了一组Kafka Connect连接器,用于接入数据库表中的行级更改(使用CDC),并将其转换为事件流。这些文件被发送到卡夫卡,并可供所有下游应用程序使用。

这是本博客文章中提出的解决方案的高级图表。
微信图片_20220721172227.png

我假设您遵循第1部分的内容,其中已经介绍了本教程所需的基本基础设施和服务的创建过程。如果您还没有,请参阅第1部分中的“准备基础设施组件和服务”一节

数据管道第1部分:Aurora MySQL到MSK

我们先创建管道的前半部分,将Aurora MySQL表中的数据同步到MSK中的主题。

在本节中,您将:

  • 下载Debezium连接器工件

  • 在MSK中创建自定义插件

  • 将Debezium源连接器部署到MSK Connect

最后,您将准备好数据管道的前一半!

创建自定义插件和连接器

将Debezium连接器上载到Amazon S3

登录Kafka客户端EC2实例并运行以下命令:

sudo -u ec2-user -i
mkdir debezium && cd debezium

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.0.Final/debezium-connector-mysql-1.9.0.Final-plugin.tar.gz
tar xzf debezium-connector-mysql-1.9.0.Final-plugin.tar.gz

cd debezium-connector-mysql
zip -9 ../debezium-connector-mysql-1.9.0.Final-plugin.zip *

cd ..
aws s3 cp ./debezium-connector-mysql-1.9.0.Final-plugin.zip s3://msk-lab-<ENTER_YOUR_AWS_ACCOUNT_ID>-plugins-bucket/
复制

创建自定义插件

有关如何创建MSK Connect插件的分步说明,请参阅官方文档中的使用AWS管理控制台创建自定义插件。

在创建自定义插件时,请确保选择在上一步中上载到S3的Debezium连接器zip文件。
微信图片_20220721172515.png

创建Debezium源连接器

有关如何创建MSK Connect连接器的分步说明,请参阅官方文档中的创建连接器。

要创建连接器,请执行以下操作:

1.选择您刚刚创建的插件。

2.输入连接器名称并选择MSK群集和IAM身份验证

3.您可以在连接器配置部分输入以下提供的内容。确保根据设置更换以下配置:

4.database.history.kafka.bootstrap.servers—独自创立服务器-输入MSK群集端点

5.数据库主机名-输入Aurora RDS MySQL端点

保持其余配置不变

connector.class=io.debezium.connector.mysql.MySqlConnector
database.user=master
database.server.id=123456
tasks.max=1
database.history.kafka.topic=dbhistory.salesdb
database.history.kafka.bootstrap.servers=<ENTER MSK CLUSTER ENDPOINT>
database.server.name=salesdb
database.port=3306
include.schema.changes=true
database.hostname=<ENTER RDS MySQL ENDPOINT>
database.password=S3cretPwd99
database.include.list=salesdb
value.converter.schemas.enable=false
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
复制

1.在“访问权限”下,为连接器选择正确的IAM角色(名称中包含AuroraConnectorIAMRole的角色)

2.单击“下一步”移动到安全选项-保持不变

3.单击“下一步”。对于日志交付,请选择交付到Amazon CloudWatch日志。定位并选择/msk连接演示cwlog组

4.单击下一步-在最后一页上,向下滚动并单击创建连接器以启动该过程,并等待连接器启动。

完成后,连接器转换到运行状态,继续执行以下步骤。

测试管道

我们想确认salesdb数据库中SALES_ORDER表中的记录是否已推送到MSK主题。为此,从EC2主机上运行Kafka CLI consumer。

注意主题名salesdb。salesdb。SALES_订单-这符合Debezium约定。

sudo -u ec2-user -i
export MSK_BOOTSTRAP_ADDRESS=<ENTER MSK CLUSTER ENDPOINT>
/home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server $MSK_BOOTSTRAP_ADDRESS --consumer.config /home/ec2-user/kafka/config/client-config.properties --from-beginning --topic salesdb.salesdb.SALES_ORDER | jq --color-output .
复制

在另一个终端中,使用MySQL客户端并连接到Aurora数据库并插入一些记录:

sudo -u ec2-user -i

export RDS_AURORA_ENDPOINT=<ENTER RDS MySQL ENDPOINT>

mysql -f -u master -h $RDS_AURORA_ENDPOINT  --password=S3cretPwd99

USE salesdb;

select * from SALES_ORDER limit 5;

INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29001, 2568, now(), 'STANDARD');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29002, 1649, now(), 'ONE-DAY');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29003, 3861, now(), 'TWO-DAY');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29004, 2568, now(), 'STANDARD');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29005, 1649, now(), 'ONE-DAY');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29006, 3861, now(), 'TWO-DAY');
复制

如果一切设置正确,您应该可以在消费终端中看到记录。

{
  "ORDER_ID": 29001,
  "SITE_ID": 2568,
  "ORDER_DATE": 1655279536000,
  "SHIP_MODE": "STANDARD"
}
{
  "ORDER_ID": 29002,
  "SITE_ID": 1649,
  "ORDER_DATE": 1655279536000,
  "SHIP_MODE": "ONE-DAY"
}
{
  "ORDER_ID": 29003,
  "SITE_ID": 3861,
  "ORDER_DATE": 1655279563000,
  "SHIP_MODE": "TWO-DAY"
}
...
复制

压缩更改事件有效负载的秘密

请注意,更改数据捕获事件负载是多么紧凑。这是因为我们将连接器配置为使用io。德贝齐姆。变换。ExtractNewRecordState是一种Kafka单消息转换(SMT)。默认情况下,Debezium更改事件结构相当复杂-与更改事件一起,它还包括元数据,如模式、源数据库信息等。它看起来像这样:

{
  "before": null,
  "after": {
    "ORDER_ID": 29003,
    "SITE_ID": 3861,
    "ORDER_DATE": 1655279563000,
    "SHIP_MODE": "TWO-DAY"
  },
  "source": {
    "version": "1.9.0.Final",
    "connector": "mysql",
    "name": "salesdb",
    "ts_ms": 1634569283000,
    "snapshot": "false",
    "db": "salesdb",
    "sequence": null,
    "table": "SALES_ORDER",
    "server_id": 1733046080,
    "gtid": null,
    "file": "mysql-bin-changelog.000003",
    "pos": 43275145,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1655279563000,
  "transaction": null
...
复制

多亏了Kafka SMT(使用transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState指定),我们可以有效地展平事件负载,并根据需要进行定制。

有关详细信息,请参阅Debezium文档中的新记录状态提取。

数据管道第2部分:MSK到DynamoDB

现在,我们可以将重点转移到管道的另一半,该管道负责在DynamoDB接收器连接器的帮助下将数据从MSK主题提取到DynamoDB表。

如果DynamoDB表不存在,连接器会自动为您创建一个,但它使用默认设置,即在配置模式下创建一个表,其中包含10个读取容量单元(RCU)和10个写入容量单元(WCU)。
微信图片_20220722101248.png
但是您的用例可能需要配置。例如,为了处理大量数据,您可能需要配置自动缩放,或者更好地为您的表激活按需模式。

这正是我们要做的。

在继续之前,创建DynamoDB表

使用以下设置:

  • 表名-kafka_salesdb。salesdb。SALES_ORDER(不更改表名)

  • 分区键-ORDER_ID(Number)

  • 范围键-SITE_ID(数字)

  • 容量模式-按需

微信图片_20220722101355.png

创建自定义插件和连接器

有关如何创建MSK Connect插件的分步说明,请参阅官方文档中的使用AWS管理控制台创建自定义插件。

在创建自定义插件时,请确保选择在上一步中上载到S3的DynamoDB连接器zip文件。

有关如何创建MSK Connect连接器的分步说明,请参阅官方文档中的创建连接器。

要创建连接器,请执行以下操作:

1.选择您刚刚创建的插件。

2.输入连接器名称并选择MSK群集和IAM身份验证

3.您可以在连接器配置部分输入以下提供的内容。确保根据设置更换以下配置:

  • 为topics属性使用正确的主题名(在本例中,我们使用的是salesdb.salesdb.SALES_ORDER,因为这是Debezium源连接器采用的主题名格式)

  • 用于汇流。话题独自创立服务器,输入MSK群集端点

  • 适用于aws。发电机B。端点和aws。发电机B。区域,输入创建DynamoDB表的区域,例如us-east-1

保持其余配置不变。

connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
tasks.max=2
aws.dynamodb.region=<ENTER AWS REGION e.g. us-east-1>
aws.dynamodb.endpoint=https://dynamodb.<ENTER AWS REGION>.amazonaws.com
topics=salesdb.salesdb.SALES_ORDER
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
table.name.format=kafka_${topic}
confluent.topic.bootstrap.servers=<ENTER MSK CLUSTER ENDPOINT>
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.mechanism=AWS_MSK_IAM
confluent.topic.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
confluent.topic.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
aws.dynamodb.pk.hash=value.ORDER_ID
aws.dynamodb.pk.sort=value.SITE_ID
复制

1.在“访问权限”下,为连接器选择正确的IAM角色(名称中包含DynamoDBConnectorIAMRole的角色)

2.单击“下一步”移动到安全选项-保持不变

3.单击“下一步”。对于日志交付,请选择交付到Amazon CloudWatch日志。定位并选择/msk连接演示cwlog组

4.单击下一步-在最后一页上,向下滚动并单击创建连接器以启动该过程,并等待连接器启动。

完成后,连接器转换到运行状态,继续执行以下步骤。

选择DynamoDB的主键

在上述配置中,我们设置了aws。发电机B。pk。哈希和aws。发电机B。pk。按值排序。订单ID和值。站点ID。这意味着Kafka主题事件负载中的ORDER\u ID字段将用作分区键,SITE\u ID的值将被指定为范围键(根据您的要求,您也可以将aws.dynamodb.pk.sort留空)。

测试端到端管道

作为初始加载过程的一部分,连接器确保Kafka主题中的所有现有记录都保存在连接器配置中指定的DynamoDB表中。在这种情况下,您应该在DynamoDB中看到29000多条记录(根据SALES_ORDER表),您可以运行查询来探索数据。
微信图片_20220722101654.png

要继续测试端到端管道,您可以在SALES_ORDER表中插入更多数据,并确认它们已通过Debezium源连接器同步到Kafka,并一直同步到DynamoDB,这要归功于接收器连接器。

删除资源

完成后,删除您创建的资源。

  • 删除S3 bucket的内容(msk lab-<YOUR ACCOUNT\u ID>-plugins bucket)

  • 删除CloudFormation堆栈

  • 删除DynamoDB表

  • 删除MSK Connect连接器、插件和自定义配置

更改数据捕获是一个强大的工具,但我们需要一种方法来利用这些事件日志,并使其可用于依赖该数据的其他服务。在这一部分中,您了解了如何利用此功能,使用Kafka Connect在MySQL和DynamoDB之间建立流数据管道。

本系列到此结束。

原文标题:MySQL to DynamoDB: Build a Streaming Data Pipeline on AWS Using Kafka
原文作者:Abhishek Gupta
原文链接:https://dzone.com/articles/mysql-to-dynamodb-build-a-streaming-data-pipeline

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论