我们专注攀枝花网站设计 攀枝花网站制作 攀枝花网站建设
成都网站建设公司服务热线:400-028-6601

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

Kafka中怎么通过整合SpringBoot实现消息发送与消费

Kafka中怎么通过整合SpringBoot实现消息发送与消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

我们提供的服务有:成都网站建设、成都做网站、微信公众号开发、网站优化、网站认证、太和ssl等。为1000多家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的太和网站制作公司

kafka和zookeeper集群前边写过了。如果遇到kakfa说没有连接记得把kafka下logs日志都删除了,重新启动kafka集群再启动springboot服务

zookeeper https://my.oschina.net/u/3730149/blog/3071737

kafka https://my.oschina.net/u/3730149/blog/3071754
  • 生产者

Kafka中怎么通过整合SpringBoot实现消息发送与消费

maven依赖



	4.0.0

	com.gzh.kafka.producer
	producer
	0.0.1-SNAPSHOT
	jar

	kafka-producer-master
	demo project for kafka producer

	
		org.springframework.boot
		spring-boot-starter-parent
		1.5.9.RELEASE
		 
	

	
		UTF-8
		UTF-8
		2.1.5.RELEASE
		1.8
	

	
		
			org.springframework.boot
			spring-boot-starter
		

		
		
			org.springframework.kafka
			spring-kafka
			${spring-kafka.version}
		
		
			org.springframework.boot
			spring-boot-starter-web
		

		
			org.springframework.boot
			spring-boot-starter-test
			test
		
		
		
			org.springframework.kafka
			spring-kafka-test
			${spring-kafka.version}
			test
		

		
		
			io.springfox
			springfox-swagger2
			2.8.0
		
		
		
			io.springfox
			springfox-swagger-ui
			2.8.0
		

	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	

application.properties

server.port=8000
spring.application.name=kafka-producer
#kafka configuration
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#topic
kafka.app.topic.foo=test20180430

使用Spring Boot发送Spring Kafka消息

SpringKafka提供了使用Producer的KafkaTemplate类发送消息,并提供将数据发送到Kafka主题的高级操作。 提供异步和同步方法,异步方法返回Future。Spring Boot根据application.properties属性文件中配置的属性自动配置并初始化KafkaTemplate。

为了方便测试发送消息,使用了Spring的定时任务,在类上使用@EnableScheduling 注解开启定时任务,通过@Scheduled注解指定发送消息规则。

	package com.gzh.kafka.producer.component;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import org.springframework.beans.factory.annotation.Autowired;
	import org.springframework.beans.factory.annotation.Value;
	import org.springframework.kafka.core.KafkaTemplate;
	import org.springframework.kafka.support.SendResult;
	import org.springframework.scheduling.annotation.EnableScheduling;
	import org.springframework.scheduling.annotation.Scheduled;
	import org.springframework.stereotype.Component;
	import org.springframework.util.concurrent.ListenableFuture;

	@Component
	@EnableScheduling
	public class KafkaMessageProducer {

		private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class);

		@Autowired
		private KafkaTemplate kafkaTemplate;

		@Value("${kafka.app.topic.foo}")
		private String topic;

		@Scheduled(cron = "00/5 * * * * ?")
		public void send() {
			String message = "Hello World---" + System.currentTimeMillis();
			LOG.info("topic="+topic+",message="+message);
			ListenableFuture> future = kafkaTemplate.send(topic, message);
			future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),
					fail -> LOG.error("KafkaMessageProducer 发送消息失败!"));
		}
	}

创建消息生产者启动类

	package com.gzh.kafka.producer;

	import org.springframework.boot.SpringApplication;
	import org.springframework.boot.autoconfigure.SpringBootApplication;
	import org.springframework.boot.context.properties.EnableConfigurationProperties;

	@SpringBootApplication
	@EnableConfigurationProperties
	public class KafkaProducerApplication{

		public static void main(String[] args) {
			SpringApplication.run(KafkaProducerApplication.class, args);
		}
	}

至此,Spring Boot整合Spring Kafka消息生产者应用已经整合完毕。启动zookeeper、kafka各个服务器。启动生产者应用,查看消息生产者应用控制台日志,显示发送消息成功!说明整合OK。

也可以用前段web页面请求的方式

	package com.gzh.kafka.producer.service;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import org.springframework.beans.factory.annotation.Autowired;
	import org.springframework.beans.factory.annotation.Value;
	import org.springframework.kafka.core.KafkaTemplate;
	import org.springframework.kafka.support.SendResult;
	import org.springframework.stereotype.Service;
	import org.springframework.util.concurrent.ListenableFuture;

	@Service
	public class KafkaMessageSendService {

		private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class);

		@Autowired
		private KafkaTemplate kafkaTemplate;

		@Value("${kafka.app.topic.foo}")
		private String topic;

		public void send(String message){
			LOG.info("topic="+topic+",message="+message);
			ListenableFuture> future = kafkaTemplate.send(topic, message);
			future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),
					fail -> LOG.error("KafkaMessageProducer 发送消息失败!"));
		}
	}

界面请求处理controller类

	package com.gzh.kafka.producer.controller;

	import org.springframework.beans.factory.annotation.Autowired;
	import org.springframework.http.MediaType;
	import org.springframework.web.bind.annotation.RequestMapping;
	import org.springframework.web.bind.annotation.RequestMethod;
	import org.springframework.web.bind.annotation.RequestParam;
	import org.springframework.web.bind.annotation.RestController;

	import com.gzh.kafka.producer.service.KafkaMessageSendService;

	@RestController
	@RequestMapping(value="send",produces=MediaType.APPLICATION_JSON_UTF8_VALUE)
	public class KafkaMessageSendController {

		@Autowired
		private KafkaMessageSendService kafkaMessageSendService;

		@RequestMapping(value="/sendMessage",method=RequestMethod.POST)
		public String send(@RequestParam(required=true) String message){
			try {
				kafkaMessageSendService.send(message);
			} catch (Exception e) {
				return "send failed.";
			}
			return message;
		}
	}

通过Swagger访问测试Controller服务请求

Kafka中怎么通过整合SpringBoot实现消息发送与消费

  • 消费者

Kafka中怎么通过整合SpringBoot实现消息发送与消费

maven依赖



	4.0.0

	com.gzh.kafka.consumer
	consumer
	0.0.1-SNAPSHOT
	jar

	kafka-consumer-master
	demo project for kafka consumer

	
		org.springframework.boot
		spring-boot-starter-parent
		1.5.9.RELEASE
		 
	

	
		UTF-8
		UTF-8
		1.3.4.RELEASE
		1.8
	

	
		
			org.springframework.boot
			spring-boot-starter
		
		
		
			org.springframework.kafka
			spring-kafka
			${spring-kafka.version}
		
		
			org.springframework.boot
			spring-boot-starter-web
		

		
			org.springframework.boot
			spring-boot-starter-test
			test
		
		
		
			org.springframework.kafka
			spring-kafka-test
			${spring-kafka.version}
			test
		
	

	
		
			
				org.springframework.boot
				spring-boot-maven-plugin
			
		
	


注意,这是使用Spring-Kafka时一定要注意版本问题,否则会报各种奇葩错误。Spring官方网站上给出了SpringKafka和kafka-client版本(它的版本号要和kafka服务器的版本保持一致)的对应关系: Kafka中怎么通过整合SpringBoot实现消息发送与消费

application.properties配置

server.port=8001
spring.application.name=kafka-consumer

#kafka configuration
#指定消息被消费之后自动提交偏移量,以便下次继续消费
spring.kafka.consumer.enable-auto-commit=true
#指定消息组
spring.kafka.consumer.group-id=guan
#指定kafka服务器地址
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
#指定从最近地方开始消费(earliest)
spring.kafka.consumer.auto-offset-reset=latest

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#topic
kafka.app.topic.foo=test20180430

通过使用@KafkaListener来注解一个方法Spring Kafka会自动创建一个消息监听器容器。使用该注解,并指定要消费的topic(也可以指定消费组以及分区号,支持正则表达式匹配),这样,消费者一旦启动,就会监听kafka服务器上的topic,实时进行消费消息。

	package com.gzh.kafka.consumer.service;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import org.springframework.kafka.annotation.KafkaListener;
	import org.springframework.messaging.MessageHeaders;
	import org.springframework.messaging.handler.annotation.Headers;
	import org.springframework.messaging.handler.annotation.Payload;
	import org.springframework.stereotype.Component;


	@Component
	public class KafkaMessageConsumer {

		private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);

		@KafkaListener(topics={"${kafka.app.topic.foo}"})
		public void receive(@Payload String message, @Headers MessageHeaders headers){
			LOG.info("KafkaMessageConsumer 接收到消息:"+message);
			headers.keySet().forEach(key->LOG.info("{}: {}",key,headers.get(key)));
		}
	}

创建消息消费者启动类

	package com.gzh.kafka.consumer;

	import org.springframework.boot.SpringApplication;
	import org.springframework.boot.autoconfigure.SpringBootApplication;
	import org.springframework.boot.context.properties.EnableConfigurationProperties;

	@SpringBootApplication
	@EnableConfigurationProperties
	public class KafkaConsumerApplication {

		public static void main(String[] args) {
			SpringApplication.run(KafkaConsumerApplication.class, args);
		}
	}

消费者应用已经完成,接下来让我们验证Spring Kafka消息发送和接收效果。先依次启动zookeeper、kafka服务器,然后在启动生产者(kafka-producer-master)应用,再启动消费者(kafka-consumer-master)应用,然后观察生产者和消费者启动类日志: 显示接受消息成功!

看完上述内容,你们掌握Kafka中怎么通过整合SpringBoot实现消息发送与消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!


文章题目:Kafka中怎么通过整合SpringBoot实现消息发送与消费
链接地址:http://mswzjz.cn/article/pjchgd.html

其他资讯