Kafka核心API——Stream API

网友投稿 340 2022-11-26

Kafka核心API——Stream API

Kafka Stream概念及初识高层架构图

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。

Kafka Stream的基本概念:

Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib) 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境 Kafka Stream通过state store可以实现高效的状态操作 支持原语Processor和高层抽象DSL

Partition的数据会分发到不同的Task上,Task主要是用来做流式的并行处理 每个Task都会有自己的state store去记录状态 每个Thread里会有多个Task

Kafka Stream 核心概念

Kafka Stream关键词:

流和流处理器:流指的是数据流,流处理器指的是数据流到某个节点时对其进行处理的单元 流处理拓扑:一个拓扑图,该拓扑图展示了数据流的走向,以及流处理器的节点位置 源处理器及Sink处理器:源处理器指的是数据的源头,即第一个处理器,Sink处理器则反之,是最终产出结果的一个处理器

Kafka Stream使用演示

从上图中可以看到,Consumer对一组Partition进行消费,这组Partition可以在一个Topic中或多个Topic中。然后形成数据流,经过各个流处理器后最终通过Producer输出到一组Partition中,同样这组Partition也可以在一个Topic中或多个Topic中。这个过程就是数据流的输入和输出。

因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。到服务器上使用命令行创建两个Topic:

[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic [root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic

由于之前依赖的kafka-clients包中没有Stream API,所以需要另外引入Stream的依赖包。在项目中添加如下依赖:

org.apache.kafka kafka-streams 2.5.0

接下来以一个经典的词频统计为例,演示一下Stream API的使用。代码示例:

package com.zj.study.kafka.stream; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.List; import java.util.Properties; public class StreamSample { private static final String INPUT_TOPIC = "input-topic"; private static final String OUTPUT_TOPIC = "output-topic"; /** * 构建配置属性 */ public static Properties getProperties() { Properties properties = new Properties(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "49.232.153.84:9092"); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return properties; } public static KafkaStreams createKafkaStreams() { Properties properties = getProperties(); // 构建流结构拓扑 StreamsBuilder builder = new StreamsBuilder(); // 构建wordCount这个Processor wordCountStream(builder); Topology topology = builder.build(); // 构建KafkaStreams return new KafkaStreams(topology, properties); } /** * 定义流计算过程 * 例子为词频统计 */ public static void wordCountStream(StreamsBuilder builder) { // 不断的从INPUT_TOPIC上获取新的数据,并追加到流上的一个抽象对象 KStream source = builder.stream(INPUT_TOPIC); // KTable是数据集的抽象对象 KTable count = source.flatMapValues( // 以空格为分隔符将字符串进行拆分 v -> List.of(v.toLowerCase().split(" ")) // 按value进行分组统计 ).groupBy((k, v) -> v).count(); KStream sink = count.toStream(); // 将统计结果输出到OUTPUT_TOPIC sink.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); } public static void main(String[] args) { KafkaStreams streams = createKafkaStreams(); // 启动该Stream streams.start(); } }

KTable类似于一个时间片段,在一个时间片段内输入的数据就会update进去,以这样的形式来维护这张表 KStream则没有update这个概念,而是不断的追加

运行以上代码,然后到服务器中使用kafka-console-producer.sh脚本命令向input-topic生产一些数据,如下:

[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic input-topic >Hello World Java >Hello World Kafka >Hello Java Kafka >Hello Java

然后再运行kafka-console-consumer.sh脚本命令从output-topic中消费数据,并进行打印。具体如下:

[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 172.21.0.10:9092 --topic output-topic --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning

控制台输出的结果:

world 2 hello 3 java 2 kafka 2 hello 4 java 3

从输出结果中可以看到,Kafka Stream首先是对前三行语句进行了一次词频统计,所以前半段是:

world 2 hello 3 java 2 kafka 2

当最后一行输入之后,又再做了一次词频统计,并针对新的统计结果进行输出,其他没有变化的则不作输出,所以最后打印了:

hello 4 java 3

这也是KTable和KStream的一个体现,从测试的结果可以看出Kafka Stream是实时进行流计算的,并且每次只会针对有变化的内容进行输出。

foreach方法

在之前的例子中,我们是从某个Topic读取数据进行流处理后再输出到另一个Topic里。但在一些场景下,我们可能不希望将结果数据输出到Topic,而是写入到一些存储服务中,例如ElasticSearch、MongoDB、MySQL等。

在这种场景下,就可以利用到foreach方法,该方法用于迭代流中的元素。我们可以在foreach中将数据存入例如Map、List等容器,然后再批量写入到数据库或其他存储中间件即可。

foreach方法使用示例:

public static void foreachStream(StreamsBuilder builder) { KStream source = builder.stream(INPUT_TOPIC); source.flatMapValues( v -> List.of(v.toLowerCase().split(" ")) ).foreach((k, v) -> System.out.println(k + " : " + v)); }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java虚拟机调用Java主类的main()方法
下一篇:a朱金桥0-10V/电位器LED色温调节方案GML003
相关文章

 发表评论

暂时没有评论,来抢沙发吧~