我们专注攀枝花网站设计 攀枝花网站制作 攀枝花网站建设
成都网站建设公司服务热线:400-028-6601

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

Kafka多线程Consumer的实例代码

这篇文章主要介绍“Kafka多线程Consumer的实例代码”,在日常操作中,相信很多人在Kafka多线程Consumer的实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Kafka多线程Consumer的实例代码”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

公司主营业务:成都网站设计、网站制作、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联推出鹿寨免费做网站回馈大家。

 
多线程示例代码:
这里要根据自身需求开发,我这里只举一个简单的例子,就是几个分区就启动几个consumer,一一对应。
三个类:
Main:
public static void main(String[] args) {

String bootstrapServers = "kafka01:9092,kafka02:9092";
String groupId = "test";
String topic = "testtopic";
int consumerNum = 3;
ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
cg.execute();
}



import java.util.ArrayList;
import java.util.List;


public class ConsumerGroup {

private List consumers;

public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){

consumers = new ArrayList<>(consumerNum);

for(int i=0;i < consumerNum;i++){
ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
consumers.add(ConsumerRunnable);
}
}

public void execute(){

for(ConsumerRunnable consumerRunnable:consumers){
new Thread(consumerRunnable).start();
}
}
}



import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerRunnable implements Runnable{

private final KafkaConsumer consumer;

public ConsumerRunnable(String bootstrapServers,String groupId,String topic){

Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {
while (true) {
ConsumerRecords records = consumer.poll(10);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
poll方法详解:

(旧版本:多分区多线程     新版本:一个线程管理多个socket连接)

但新版本KafkaConsumer是双线程的,主线程负责:消息获取,rebalance,coordinator,位移提交等等,

另一个是后台心跳线程。

根据上边的各种配置,poll方法会找到offset,当获取了足够多的可用数据,或者等待时间超过了指定的超时时间,就会返回。

java consumer不是线程安全的,同一个KafkaConsumer用在了多个线程中,将会报Kafka Consumer is not safe for multi-threaded assess异常。可以加一个同步锁进行保护。

poll的超时参数,已经说过1000的话是超时设定,如果没有很多数据,也就等一秒,就返回了,比如定时5秒的将消息写入,就可以将超时参数设置为5000,达到效率最大化。

如果没有定时任务呢,那就设置为  Long.MAX_VALUE   未获取足够多的数据就无限等待。这里要捕获一下WakeupException。

到此,关于“Kafka多线程Consumer的实例代码”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


当前题目:Kafka多线程Consumer的实例代码
标题路径:http://mswzjz.cn/article/gdipjs.html

其他资讯