RedisTimeSeries是一个 Redis 模块,它为 Redis 带来了原生的时间序列数据结构。时间序列解决方案,早期建立在排序集(或 Redis 流)之上,可以从 RedisTimeSeries 功能中受益,例如大容量插入、低延迟读取、灵活的查询语言、下采样等等!
一般来说,时间序列数据是(相对)简单的。话虽如此,我们还需要考虑其他特征:
- 数据速度:例如考虑每秒来自数千台设备的数百个指标
- 容量(大数据):考虑数月(甚至数年)的数据积累
因此,RedisTimeSeries 等数据库只是整体解决方案的一部分。您还需要考虑如何收集(摄取)、处理和发送所有数据到 RedisTimeSeries。你真正需要的是一个可扩展的数据管道,它可以作为一个缓冲区来解耦生产者和消费者。
这就是Apache Kafka的用武之地!除了核心代理之外,它还拥有丰富的组件生态系统,包括Kafka Connect(这是本文介绍的解决方案架构的一部分)、多语言客户端库、Kafka Streams、Mirror Maker 等。
这篇博文提供了一个实际示例,说明如何将 RedisTimeSeries 与Apache Kafka结合使用来分析时间序列数据。
该代码可在此 GitHub 存储库中找到__https://github.com/abhirockzz/redis-timeseries-kafka
让我们首先探索用例。请注意,出于博客文章的目的,它一直保持简单,然后在后续部分中进一步解释。
场景:设备监控
想象有很多地点,每个地点都有多个设备,而您的任务是监控设备指标——现在我们将考虑温度和压力。这些指标将存储在 RedisTimeSeries 中(当然!)并使用以下键命名约定 -<metric name>:<location>:<device>
。例如,位置 5 中设备 1 的温度将表示为 temp:5:1。每个时间序列数据点还将具有以下标签(键值对)—— metric、location、device 。这是为了实现灵活的查询,您将在接下来的部分中看到。
下面是几个示例,让您了解如何使用TS.ADD命令添加数据点:
- 位置 3 中设备 2 的温度以及标签:
TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2
- 位置 3 中设备 2 的压力:
TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2
解决方案架构
这是解决方案在高层次上的样子:
让我们分解一下:
源(本地)组件
- MQTT 代理 (mosquitto): MQTT 是物联网用例的事实上的协议。我们将使用的场景是物联网和时间序列的组合——稍后会详细介绍。
- Kafka Connect: MQTT 源连接器 用于将数据从 MQTT 代理传输到 Kafka 集群。
Azure 服务
- Azure Cache for Redis Enterprise 层:企业层基于 Redis Enterprise,Redis 的商业变体 Redis。除了 RedisTimeSeries 之外,Enterprise tier 还支持 RediSearch 和 RedisBloom。客户无需担心企业层的许可证获取。Azure Cache for Redis 将促进这一过程,客户可以通过 Azure Marketplace 产品获得并支付该软件的许可证。
- Azure 上的 Confluent Cloud:一个完全托管的产品,提供 Apache Kafka 作为服务,这要归功于从 Azure 到 Confluent Cloud 的集成供应层。它减轻了跨平台管理的负担,并为在 Azure 基础架构上使用 Confluent Cloud 提供了统一的体验,从而使您可以轻松地将 Confluent Cloud 与您的 Azure 应用程序集成。
- Azure Spring Cloud:借助 Azure Spring Cloud,将 Spring Boot 微服务部署到 Azure 变得更加容易。Azure Spring Cloud 缓解了基础架构问题,提供配置管理、服务发现、CI/CD 集成、蓝绿部署等。该服务完成所有繁重的工作,因此开发人员可以专注于他们的代码。
请注意,为了简单起见,一些服务是在本地托管的。在生产级部署中,您也希望在 Azure 中运行它们。例如,您可以在 Azure Kubernetes 服务中操作 Kafka Connect 集群以及 MQTT 连接器。
总而言之,这是端到端流程:
- 脚本生成发送到本地 MQTT 代理的模拟设备数据。
- 此数据由 MQTT Kafka Connect 源连接器获取,并发送到在 Azure 中运行的 Confluent Cloud Kafka 集群中的主题。
- 它由托管在 Azure Spring Cloud 中的 Spring Boot 应用程序进一步处理,然后将其保存到 Azure Cache for Redis 实例。
是时候从实用的东西开始了!在此之前,请确保您具备以下条件。
先决条件:
- 一个 Azure 帐户——你可以在这里免费获得一个
- 安装 Azure CLI
- JDK 11,例如 OpenJDK
- 最新版本的 Maven 和 Git
设置基础架构组件
按照文档 预配 RedisTimeSeries 模块随附的Azure Cache for Redis(企业层) 。
在 Azure Marketplace 上预配 Confluent Cloud集群 。同时 创建一个 Kafka 主题 (使用名称 mqtt.device-stats) and [create credentials](https://docs.confluent.io/cloud/current/client-apps/api-keys.html#create-resource-specific-api-keys-in-the-ui) (API key and secret) that you will use later on to connect to your cluster securely.
您可以使用 Azure 门户 或 使用 Azure CLI预配 Azure Spring Cloud 的实例 :
az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g southeastasia>
复制
在继续之前,请确保克隆 GitHub 存储库:
git clone https://github.com/abhirockzz/redis-timeseries-kafka cd redis-timeseries-kafka
复制
设置本地服务
组件包括:
MQTT 代理
我在 Mac 上本地安装并启动了 mosquitto 代理。
brew install mosquitto brew services start mosquitto
复制
您可以按照与您的操作系统相对应的步骤进行操作,也可以随意使用此Docker 映像。
Grafana
我在 Mac 上本地安装并启动了 Grafana。
brew install grafana brew services start grafana
复制
你可以为你的操作系统做同样的事情,或者随意使用这个Docker 镜像。
docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana
复制
Kafka 连接
您应该能够在刚刚克隆的 repo 中找到 connect-distributed.properties 文件。替换诸如 bootstrap.servers、sasl.jaas.config 等属性的值。
首先,在本地下载并解压 Apache Kafka 。
启动本地 Kafka Connect 集群:
export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0> $KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties
复制
- 从此链接下载连接器/插件 ZIP 文件,并且,
- 将其解压缩到 Connect worker 的 plugin.path 配置属性中列出的目录之一
如果您在本地使用 Confluent 平台,只需使用 Confluent Hub CLI:_confluent-hub install confluentinc/kafka-connect-mqtt:latest_
创建 MQTT 源连接器实例
确保检查mqtt-source-config.json文件。确保为kafka.topic输入正确的主题名称,并保持mqtt.topics不变。
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @mqtt-source-config.json # wait for a minute before checking the connector status curl http://localhost:8083/connectors/mqtt-source/status
复制
部署设备数据处理器应用程序
在您刚刚克隆的 GitHub存储库中,在consumer/src/resources folder and replace the values for:
- Azure Redis 缓存主机、端口和主访问密钥
- Azure API 密钥和机密上的 Confluent Cloud
构建应用程序 JAR 文件:
cd consumer export JAVA_HOME=<enter absolute path e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home> mvn clean package
复制
创建一个 Azure Spring Cloud 应用程序并将 JAR 文件部署到它:
az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11 az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar
复制
启动模拟设备数据生成器
您可以使用刚刚克隆的 GitHub 存储库中的脚本:
./gen-timeseries-data.sh
复制
注意——它所做的只是使用mosquitto_pub CLI 命令发送数据。
数据被发送到 device-stats MQTT 主题(这_不是_Kafka 主题)。您可以使用 CLI 订阅者仔细检查:
mosquitto_sub -h localhost -t device-stats
复制
检查 Confluent Cloud 门户中的 Kafka 主题。您还应该检查Azure Spring Cloud 中设备数据处理器应用程序的日志:
az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>
复制
享受 Grafana 仪表板!
在localhost:3000浏览到 Grafana UI 。
Grafana 的 Redis 数据源插件适用于任何 Redis 数据库,包括 Azure Cache for Redis。按照此博客文章中的说明配置数据源。
在您克隆的 GitHub 存储库中的 grafana_dashboards 文件夹中导入仪表板(如果您需要有关如何导入仪表板的帮助,请参阅Grafana 文档)。
例如,这是一个仪表板,显示位置 1中设备 5的平均压力(超过 30 秒) (使用TS.MRANGE)。
这是另一个仪表板,显示位置 3中多个设备的最高温度(超过 15 秒) (再次感谢 TS.MRANGE)。
那么,您想运行一些 RedisTimeSeries 命令吗?
启动redis-cli并连接到 Azure Cache for Redis 实例:
redis-cli -h <azure redis hostname e.g. myredis.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls
复制
从简单的查询开始:
# pressure in device 5 for location 1 TS.GET pressure:1:5 # temperature in device 5 for location 4 TS.GET temp:4:5
复制
按位置过滤并获取所有设备的温度和压力:
TS.MGET WITHLABELS FILTER location=3
复制
提取特定时间范围内一个或多个位置的所有设备的温度和压力:
TS.MRANGE - + WITHLABELS FILTER location=3 TS.MRANGE - + WITHLABELS FILTER location=(3,5)
复制
– + 指的是从开始到最新时间戳的所有内容,但您可以更具体。
MRANGE is what we needed! We can also filter by a specific device in a location and further drill down by either temperature or pressure:
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp
复制
所有这些都可以与聚合相结合。
# all the temp data points are not useful. how about an average (or max) instead of every temp data points? TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp
复制
也可以创建一个规则来执行此聚合并将其存储在不同的时间序列中。
完成后,不要忘记删除资源以避免不必要的成本。
删除资源
- 按照文档中的步骤删除 Confluent Cloud 集群 - 您只需删除 Confluent 组织即可。
- 同样,您也应该删除 Azure Cache for Redis 实例。
在您的本地机器上:
- 停止 Kafka Connect 集群
- 停止蚊子经纪人(例如 brew 服务停止蚊子)
- 停止 Grafana 服务(例如 brew services stop grafana)
我们探索了一个使用 Redis 和 Kafka 摄取、处理和查询时间序列数据的数据管道。当您考虑下一步并转向生产级解决方案时,您应该考虑更多的事情。
其他注意事项
优化 RedisTimeSeries
- 保留策略:考虑一下这一点,因为默认情况下您的时间序列数据点不会被修剪或删除。
- 下采样和聚合规则:您不想永远存储数据,对吧?确保配置适当的规则来处理这个问题(例如 TS.CREATERULE temp:1:2 temp:avg:30 AGGREGATION avg 30000 )。
- 重复数据策略:您希望如何处理重复样本?确保默认策略 (BLOCK) 确实是您需要的。如果没有,请考虑其他选项。
这不是一个详尽的清单。其他配置选项请参考RedisTimeSeries 文档
长期数据保留呢?
数据很宝贵,包括时间序列!您可能希望进一步处理它(例如运行机器学习以提取洞察、预测性维护等)。为此,您需要将这些数据保留更长的时间,并且为了经济高效,您需要使用可扩展的对象存储服务,例如Azure Data Lake Storage Gen2 (ADLS Gen2) .
有一个连接器!您可以通过使用完全托管的Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud在 ADLS 中处理和存储数据,然后使用Azure Synapse Analytics或Azure Databricks运行机器学习来增强现有数据管道。
可扩展性
您的时间序列数据量只能以一种方式移动——向上!您的解决方案具有可扩展性至关重要:
- 核心基础设施:托管服务允许团队专注于解决方案,而不是设置和维护基础设施,尤其是在涉及复杂的分布式系统(如数据库和 Redis 和 Kafka 等流平台)时。
- Kafka Connect:就数据管道而言,由于 Kafka Connect 平台本质上是无状态且水平可扩展的,因此您处于良好状态。在如何构建和调整 Kafka Connect 工作集群的大小方面,您有很多选择。
- 自定义应用程序:与此解决方案中的情况一样,我们构建了一个自定义应用程序来处理 Kafka 主题中的数据。幸运的是,同样的可扩展性特征也适用于它们。在横向扩展方面,它仅受您拥有的 Kafka 主题分区数量的限制。
集成:不仅仅是 Grafana!RedisTimeSeries 还与Prometheus和Telegraf集成。然而,在写这篇博文的时候还没有 Kafka 连接器——这将是一个很棒的插件!
结论
当然,您可以将 Redis 用于(几乎)所有事情,包括时间序列工作负载!请务必考虑从时间序列数据源一直到 Redis 及更远的数据管道和集成的端到端架构。
原文标题:Processing Time-Series Data with Redis and Apache Kafka
原文作者:Abhishek Gupta
原文地址:https://redis.com/blog/processing-time-series-data-with-redis-and-apache-kafka/