消息的消费一般有两种模式,推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。kakfa采用的是拉模式,这样可以很好的控制消费速率。那么kafka消费的具体工作流程是什么样的呢?kafka的位移管理又是怎么样的呢?
kafka是以消费者组进行消费,一个消费者组,由多个consumer组成,他们和topic的消费规则如下:
通过这种分组、分区的消费方式,可以提高消费者的吞吐量,同时也能够实现消息的发布/订阅模式和点对点两种模式。
消费者消费总体分为两个步骤,第一步是制定消费的方案,就是这个组下哪个消费者消费哪个分区,第二个是建立网络连接,获取消息数据。
现在已经初始化消费者组信息,知道哪个消费者消费哪个分区,接着我们来看看消费者细节。
前面简单提到了消费者组初始化的时候会对分区进行分配,那么具体的分配策略是什么呢,也就是哪个消费者消费哪个分区数据?
kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
如上图所示:有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。
这种方式容易造成数据倾斜!如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。
RoundRobin 针对集群中所有topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
Sticky是粘性的意思,它是从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,在rebalance会尽量保持原有分配的分区不变化,这样可以节省开销。
Cooperative Sticky和Sticky类似,但是它会将原来的一次大规模rebalance操作,拆分成了多次小规模的rebalance,直至最终平衡完成,所以体验上会更好。
关于什么是rebalance继续往下看你就知道了。
上面也提到了rebalance,也就是再均衡。当kafka发生下面的情况会进行在均衡,也就是重新给消费者分配分区:
消费者需要保存当前消费到分区的什么位置了,这样哪怕消费者故障,重启后也能继续消费,这就是消费者的维护offset管理。
消费者位移offset存储在哪呢?
如何查看__consumer_offsets主题内容?
bin/kafka-console-consumer.sh --topic
__consumer_offsets --bootstrap-server hadoop102:9092 --
consumer.config config/consumer.properties --formatter
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageForm
atter" --from-beginning
## topic1 1号分区
[offset,topic1,1]::OffsetAndMetadata(offset=7,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)
## topic1 0号分区
[offset,topic1,0]::OffsetAndMetadata(offset=8,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)
消费者是如何提交保存位移offset呢?
为了使我们能够专注于自己的业务逻辑,kafka默认提供了自动提交offset的功能。这个由消费者客户端参数 enable.auto.commit 配置, 默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒。
自动提交会带来什么问题?
自动提交消费位移的方式非常简便,但会带来是重复消费的问题。
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象。
我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样 并不能避免重复消费的发送,而且也会使位移提交更加频繁。
很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更 加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费。手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和 commitAsync()两种类型的方法。
同步提交会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),它必须等待offset提交完毕,再去消费下一批数据。
// 同步提交 offset
consumer.commitSync();
异步提交则没有失败重试机制,故有可能提交失败。它发送完提交offset请求后,就开始消费下一批数据了。
// 异步提交 offset
consumer.commitAsync();
那么手动提交会带来什么问题呢?可能会出现"漏消息"的情况。
设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
我们可以通过消费者事物来解决这样的问题。
其实无论是手动提交还是自动提交,都有可能出现消息重复和是漏消息,与我们的编程模型有关,需要我们开发的时候根据消息的重要程度来选择合适的消费方案。
一个正常的消费逻辑需要具备以下几个步骤:
(1)配置消费者客户端参数及创建相应的消费者实例;
(2)订阅主题;
(3)拉取消息并消费;
(4)提交消费位移 offset;
(5)关闭消费者实例。
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定义 kakfa 服务的地址,不需要将所有 broker 指定上
props.put("bootstrap.servers", "doitedu01:9092");
// 制定 consumer group
props.put("group.id", "g1");
// 是否自动提交 offset
props.put("enable.auto.commit", "true");
// 自动提交 offset 的时间间隔
props.put("auto.commit.interval.ms", "1000");
// key 的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value 的反序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
props.put("auto.offset.reset","earliest");
// 定义 consumer
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 消费者订阅的 topic, 可同时订阅多个
consumer.subscribe(Arrays.asList("first", "test","test1"));
while (true) {
// 读取数据,读取超时时间为 100ms
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
consumer.subscribe(Arrays.asList(topicl ));
consumer subscribe(Arrays.asList(topic2))
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果 有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到 新添加的主题中的消息。
consumer.subscribe(Pattern.compile ("topic.*" ));
消费者不仅可以通过 KafkaConsumer.subscribe()方法订阅主题,还可直接订阅某些主题的指定分区。
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;
通过unsubscribe()方法采取消主题的订阅。
consumer.unsubscribe();
kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法, poll()方法返回的是所订阅的主题(分区)上的一组消息。
对于 poll () 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空。
public ConsumerRecords poll(final Duration timeout)
超时时间参数 timeout ,用来控制 poll() 方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。
有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek( 方法正好提供了这个功能,让我们可以追前消费或回溯消费。
public void seek(TopicPartiton partition,long offset)
最后我们总结一下消费者中重要的参数配置。
参数名称 |
描述 |
bootstrap.servers |
向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer |
指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 |
group.id |
标记消费者所属的消费者组。 |
enable.auto.commit |
默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms |
如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
auto.offset.reset |
当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions |
__consumer_offsets 的分区数,默认是 50 个分区。 |
heartbeat.interval.ms |
Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms |
Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms |
消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes |
默认 1 个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms |
默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes |
默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records |
一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |
kafka消费是很重要的一个环节,本文总结kafka消费者的一些重要机制,包括消费者的整个流程,消费的分区策略,消费的再平衡以及消费的位移管理。在明白这些机制以后,简单讲解了如何使用消费者consumer的API以及消费者中重要的参数。
网页标题:Kafka消费者那些事儿
本文链接:http://www.mswzjz.cn/qtweb/news10/377410.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能