暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

Kafka Stream

原创 简单 2022-09-16
794

什么是Stream

流是无限制的和连续的实时数据包。数据包通常以键值对的形式产生。生产者自动传输这些数据包,意味着不需要发出请求。

什么是Kafka Stream

Kafka Streams是Apache Kafka社区的项目之一。它是一个用于构建数据管道和微服务的客户端库。输入和输出的数据存储在Kafka集群中。它还在Kafka消费者客户端的基础上提供实时的流处理。流使得从Kafka主题转换或过滤数据并将其发布到另一个Kafka主题变得容易。

Kafka Stream API

Kafka Stream APIs提供了数据并行性、容错性和可扩展性。它将消息作为无界的、连续的和实时的记录流来处理,具有以下特点:

  • 单个流可以消费和生产。
  • 不支持批处理。
  • 支持无状态和有状态操作。
  • 除了Kafka自身没有额外依赖。
  • 采用一次处理一条记录的方式,以实现毫秒级的处理延迟。

依赖包

为了实现例子,我们在build.sbt中加入kafka stream api:

libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.8.0"
复制

代码

要编写Kafka stream应用,先导入下面包:

import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.StreamsConfig._ import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}
复制

配置一些必要的Stream属性:

val streamsConfiguration = new Properties() streamsConfiguration.put(APPLICATION_ID_CONFIG, "Kafka-Streams-Example") streamsConfiguration.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") streamsConfiguration.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) streamsConfiguration.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
复制

首先创建KStreamBuilder实例,builder对象拥有stream方法接收输入主题名字并返回一个kstream对象实例,然后在kstream对象上可以应用各种方法,如map、join,从而返回另一个kstream对象,可以写到输出kafka的主题。

val builder = new KStreamBuilder
val kStream = builder.stream("Input_Topic")
val upperCaseKStream = kStream.mapValues(_.toLowerCase)
upperCaseKStream.to("Output_Topic")
val stream = new KafkaStreams(builder, streamsConfiguration)
stream.start()
复制

原文标题:Kafka Streams
原文作者:Amarjeet Singh
原文地址:https://blog.knoldus.com/kafka-streams/

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论