0%

kafka之consumer的commit机制

commit机制

在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。

kafka对于offset的处理有两种提交方式:(1) 自动提交(默认的提交方式) (2) 手动提交(可以灵活地控制offset)

偏移量offset

如果消费者一直处于运行状态,那么偏移量就没有什么用处.

如果有消费者退出或者新 partition 加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的 partition,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个 partition 最后一次提交的offset,然后从偏移量指定的地方继续处理.

自动提交偏移量

Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。

Kafka消费者客户端编程逻辑中位移提交是一个大难点,自动位移提交免去了复杂的位移提交逻辑,让编码更简洁,但同时也带来了重复消费和消息丢失的问题。

  1. 重复消费

    假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次进行自动位移提交之前,消费者崩溃了或者发生再均衡,那么又得从上一次的位移处重新开始消费。

    我们可以通过减少自动位移提交的时间间隔来减少重复消息的窗口大小,但这样不能从根本上解决重复消费的问题,而且会使位移提交更加频繁。

  2. 消息丢失

    比如在如下图场景:拉取线程不断拉取消息并存入本地缓存,比如存入到BlockingQueue中,另外一个线程负责从缓存中读取消息并进行相应的逻辑处理。假设目前已经进行第y+1次拉取和第Z次位移提交,也就是第X+9以前的位移一经提交,但是处理消息的线程还在处理第X+4条消息,此时如果处理消息的线程发生异常,然后恢复正常后,则再次拉取消息会从第Z次提交的位置X+9处开始拉取消息然后处理,此时从X+4到X+9处的消息就被丢失了。

    20220406165937

手动提交偏移量

鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。可以通过将 enable.auto.commit 设为 false,然后手动提交偏移量。

对于手动提交offset主要有3种方式:1.同步提交 2.异步提交 3.异步+同步 组合的方式提交。

  1. 同步手动提交偏移量

    同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。

  2. 异步手动提交偏移量+回调函数

    异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在broker做出响应的时候记录错误信息。

    对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。

  3. 异步+同步 组合的方式提交偏移量

    针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。

Welcome to my other publishing channels