Springboot整合KafkaStream实时统计数据

SpringBoot整合Kafka stream实时统计数据

作者:FastCoder 2021-08-17 06:48:43

开发

前端

Kafka Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。

目前成都创新互联公司已为千余家的企业提供了网站建设、域名、雅安服务器托管绵阳服务器托管、企业网站设计、茂名网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

[[417927]]

环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Stream介绍

Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。

流式计算一般被用来和批量计算做比较。批量计算往往有一个固定的数据集作为输入并计算结果。而流式计算的输入往往是“无界”的(Unbounded Data),持续输入的,即永远拿不到全量数据去做计算;同时,计算结果也是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果。

Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。

Kafka Streams的门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。

Kafka Streams的一些特点:

  • 被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中
  • 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性
  • 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations)
  • 支持exactly-once语义
  • 支持纪录级的处理,实现毫秒级的延迟
  • 提供High-Level的Stream DSL和Low-Level的Processor API

Stream Processing Topology流处理拓扑

  • 流是Kafka Streams提供的最重要的抽象:它表示一个无限的、不断更新的数据集。流是不可变数据记录的有序、可重放和容错序列,其中数据记录定义为键值对。
  • Stream Processing Application是使用了Kafka Streams库的应用程序。它通过processor topologies定义计算逻辑,其中每个processor topology都是多个stream processor(节点)通过stream组成的图。
  • A stream processor 是处理器拓扑中的节点;它表示一个处理步骤,通过每次从拓扑中的上游处理器接收一个输入记录,将其操作应用于该记录,来转换流中的数据,并且随后可以向其下游处理器生成一个或多个输出记录。

有两种特殊的processor:

Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。

Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。

相关的核心概念查看如下链接

下面演示Kafka Stream 在Springboot中的应用

依赖

  
 
 
 
  1.  
  2.   org.springframework.boot 
  3.   spring-boot-starter-web 
  4.    
  5.  
  6.   org.springframework.kafka 
  7.   spring-kafka 
  8.  
  9.  
  10.   org.apache.kafka 
  11.   kafka-streams 
  12.  

配置

  
 
 
 
  1. server: 
  2.   port: 9090 
  3. spring: 
  4.   application: 
  5.     name: kafka-demo 
  6.   kafka: 
  7.     streams: 
  8.       application-id: ${spring.application.name} 
  9.       properties: 
  10.         spring.json.trusted.packages: '*' 
  11.     bootstrap-servers: 
  12.     - localhost:9092 
  13.     - localhost:9093 
  14.     - localhost:9094 
  15.     producer: 
  16.       acks: 1 
  17.       retries: 10 
  18.       key-serializer: org.apache.kafka.common.serialization.StringSerializer 
  19.       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer 
  20.       properties: 
  21.         spring.json.trusted.packages: '*' 
  22.     consumer: 
  23.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
  24.       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer 
  25.       enable-auto-commit: false 
  26.       group-id: ConsumerTest 
  27.       auto-offset-reset: latest 
  28.       properties: 
  29.         session.timeout.ms: 12000 
  30.         heartbeat.interval.ms: 3000 
  31.         max.poll.records: 100 
  32.         spring.json.trusted.packages: '*' 
  33.     listener: 
  34.       ack-mode: manual-immediate 
  35.       type: batch 
  36.       concurrency: 8 
  37.     properties: 
  38.       max.poll.interval.ms: 300000 

消息发送

  
 
 
 
  1. @Service 
  2. public class MessageSend { 
  3.   @Resource 
  4.   private KafkaTemplate kafkaTemplate ; 
  5.   public void sendMessage2(Message message) { 
  6.     kafkaTemplate.send(new ProducerRecord("test", message)).addCallback(result -> { 
  7.       System.out.println("执行成功..." + Thread.currentThread().getName()) ; 
  8.     }, ex -> { 
  9.       System.out.println("执行失败") ; 
  10.       ex.printStackTrace() ; 
  11.     }) ; 
  12.   } 

消息监听

  
 
 
 
  1. @KafkaListener(topics = {"test"}) 
  2. public void listener2(List> records, Acknowledgment ack) { 
  3.   for (ConsumerRecord record : records) { 
  4.     System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  5.   } 
  6.   try { 
  7.     TimeUnit.SECONDS.sleep(0) ; 
  8.   } catch (InterruptedException e) { 
  9.     e.printStackTrace(); 
  10.   } 
  11.   ack.acknowledge() ; 
  12.      
  13. @KafkaListener(topics = {"demo"}) 
  14. public void listenerDemo(List> records, Acknowledgment ack) { 
  15.   for (ConsumerRecord record : records) { 
  16.     System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  17.   } 
  18.   ack.acknowledge() ; 

Kafka Stream处理

消息转换并转发其它Topic

  
 
 
 
  1. @Bean 
  2. public KStream kStream(StreamsBuilder streamsBuilder) { 
  3.   KStream stream = streamsBuilder.stream("test"); 
  4.   stream.map((key, value) -> { 
  5.     System.out.println("原始消息内容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ; 
  6.     return new KeyValue<>(key, "{\"title\": \"123123\", \"message\": \"重新定义内容\"}".getBytes(Charset.forName("UTF-8"))) ; 
  7.   }).to("demo") ; 
  8.   return stream; 

执行结果:

Stream对象处理

  
 
 
 
  1. @Bean 
  2. public KStream kStream4(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer descri = (JsonDeserializer) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.map((key, value) -> { 
  8.     value.setTitle("XXXXXXX") ; 
  9.     return new KeyValue<>(key, value) ; 
  10.   }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ; 
  11.   return stream; 

执行结果:

分组处理

  
 
 
 
  1. @Bean 
  2. public KStream kStream5(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer descri = (JsonDeserializer) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .count() 
  15.   .toStream().print(Printed.toSysOut()); 
  16.   return stream; 

执行结果:

聚合

  
 
 
 
  1. @Bean 
  2. public KStream kStream6(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer descri = (JsonDeserializer) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream().print(Printed.toSysOut()); 
  19.   return stream; 

执行结果:

Filter过滤数据

  
 
 
 
  1. @Bean 
  2. public KStream kStream7(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer descri = (JsonDeserializer) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream.selectKey(new KeyValueMapper() { 
  8.     @Override 
  9.     public String apply(String key, Message value) { 
  10.       return value.getOrgCode() ; 
  11.     } 
  12.   }) 
  13.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  14.   .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  15.     System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  16.     return aggValue + 1 ; 
  17.   }, Materialized.>as("kvs").withValueSerde(Serdes.Long())) 
  18.   .toStream() 
  19.   .filter((key, value) -> !"2".equals(key)) 
  20.   .print(Printed.toSysOut()); 
  21.   return stream; 

执行结果:

过滤Key不等于"2"

分支多流处理

  
 
 
 
  1. @Bean 
  2. public KStream kStream8(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer descri = (JsonDeserializer) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   // 分支,多流处理 
  8.   KStream[] arrStream = stream.branch( 
  9.     (key, value) -> "男".equals(value.getSex()),  
  10.     (key, value) -> "女".equals(value.getSex())); 
  11.   Stream.of(arrStream).forEach(as -> { 
  12.     as.foreach((key, message) -> { 
  13.       System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ; 
  14.     }); 
  15.   }); 
  16.   return stream; 

执行结果:

多字段分组

不能使用多个selectKey,后面的会覆盖前面的

  
 
 
 
  1. @Bean 
  2. public KStream kStreamM2(StreamsBuilder streamsBuilder) { 
  3.   JsonSerde jsonSerde = new JsonSerde<>() ; 
  4.   JsonDeserializer descri = (JsonDeserializer) jsonSerde.deserializer() ; 
  5.   descri.addTrustedPackages("*") ; 
  6.   KStream stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  7.   stream 
  8.   .selectKey(new KeyValueMapper() { 
  9.     @Override 
  10.     public String apply(String key, Message value) { 
  11.       System.out.println(Thread.currentThread().getName()) ; 
  12.       return value.getTime() + " | " + value.getOrgCode() ; 
  13.     } 
  14.   }) 
  15.   .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  16.   .count() 
  17.   .toStream().print(Printed.toSysOut()); 
  18.   return stream; 

执行结果:

 

文章标题:Springboot整合KafkaStream实时统计数据
网页路径:http://www.mswzjz.cn/qtweb/news19/26219.html

攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能