作者:FastCoder 2021-08-17 06:48:43
开发
前端
Kafka Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。
目前成都创新互联公司已为千余家的企业提供了网站建设、域名、雅安服务器托管、绵阳服务器托管、企业网站设计、茂名网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
[[417927]]
环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2
Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。
流式计算一般被用来和批量计算做比较。批量计算往往有一个固定的数据集作为输入并计算结果。而流式计算的输入往往是“无界”的(Unbounded Data),持续输入的,即永远拿不到全量数据去做计算;同时,计算结果也是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果。
Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。
Kafka Streams的门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。
Kafka Streams的一些特点:
有两种特殊的processor:
Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。
Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。
相关的核心概念查看如下链接
下面演示Kafka Stream 在Springboot中的应用
org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.apache.kafka kafka-streams
- server:
- port: 9090
- spring:
- application:
- name: kafka-demo
- kafka:
- streams:
- application-id: ${spring.application.name}
- properties:
- spring.json.trusted.packages: '*'
- bootstrap-servers:
- - localhost:9092
- - localhost:9093
- - localhost:9094
- producer:
- acks: 1
- retries: 10
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
- properties:
- spring.json.trusted.packages: '*'
- consumer:
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
- enable-auto-commit: false
- group-id: ConsumerTest
- auto-offset-reset: latest
- properties:
- session.timeout.ms: 12000
- heartbeat.interval.ms: 3000
- max.poll.records: 100
- spring.json.trusted.packages: '*'
- listener:
- ack-mode: manual-immediate
- type: batch
- concurrency: 8
- properties:
- max.poll.interval.ms: 300000
- @Service
- public class MessageSend {
- @Resource
- private KafkaTemplate
kafkaTemplate ; - public void sendMessage2(Message message) {
- kafkaTemplate.send(new ProducerRecord
("test", message)).addCallback(result -> { - System.out.println("执行成功..." + Thread.currentThread().getName()) ;
- }, ex -> {
- System.out.println("执行失败") ;
- ex.printStackTrace() ;
- }) ;
- }
- }
- @KafkaListener(topics = {"test"})
- public void listener2(List
> records, Acknowledgment ack) { - for (ConsumerRecord
record : records) { - System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ;
- }
- try {
- TimeUnit.SECONDS.sleep(0) ;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- ack.acknowledge() ;
- }
- @KafkaListener(topics = {"demo"})
- public void listenerDemo(List
> records, Acknowledgment ack) { - for (ConsumerRecord
record : records) { - System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ;
- }
- ack.acknowledge() ;
- }
消息转换并转发其它Topic
- @Bean
- public KStream
- KStream
- stream.map((key, value) -> {
- System.out.println("原始消息内容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ;
- return new KeyValue<>(key, "{\"title\": \"123123\", \"message\": \"重新定义内容\"}".getBytes(Charset.forName("UTF-8"))) ;
- }).to("demo") ;
- return stream;
- }
执行结果:
Stream对象处理
- @Bean
- public KStream
kStream4(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.map((key, value) -> {
- value.setTitle("XXXXXXX") ;
- return new KeyValue<>(key, value) ;
- }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ;
- return stream;
- }
执行结果:
分组处理
- @Bean
- public KStream
kStream5(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- return value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .count()
- .toStream().print(Printed.toSysOut());
- return stream;
- }
执行结果:
聚合
- @Bean
- public KStream
kStream6(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- return value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .aggregate(() -> 0L, (key, value ,aggValue) -> {
- System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ;
- return aggValue + 1 ;
- }, Materialized.
>as("kvs").withValueSerde(Serdes.Long())) - .toStream().print(Printed.toSysOut());
- return stream;
- }
执行结果:
Filter过滤数据
- @Bean
- public KStream
kStream7(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- return value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .aggregate(() -> 0L, (key, value ,aggValue) -> {
- System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ;
- return aggValue + 1 ;
- }, Materialized.
>as("kvs").withValueSerde(Serdes.Long())) - .toStream()
- .filter((key, value) -> !"2".equals(key))
- .print(Printed.toSysOut());
- return stream;
- }
执行结果:
过滤Key不等于"2"
分支多流处理
- @Bean
- public KStream
kStream8(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - // 分支,多流处理
- KStream
[] arrStream = stream.branch( - (key, value) -> "男".equals(value.getSex()),
- (key, value) -> "女".equals(value.getSex()));
- Stream.of(arrStream).forEach(as -> {
- as.foreach((key, message) -> {
- System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ;
- });
- });
- return stream;
- }
执行结果:
多字段分组
不能使用多个selectKey,后面的会覆盖前面的
- @Bean
- public KStream
kStreamM2(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream
- .selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- System.out.println(Thread.currentThread().getName()) ;
- return value.getTime() + " | " + value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .count()
- .toStream().print(Printed.toSysOut());
- return stream;
- }
执行结果:
文章标题:Springboot整合KafkaStream实时统计数据
网页路径:http://www.mswzjz.cn/qtweb/news19/26219.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能