GroupCoordinator
Kafka 的 Server 端主要有三块内容:GroupCoordinator、Controller 和 ReplicaManager
GroupCoordinator 是负责进行 consumer 的 group 成员的rebalance 与 offset 管理(但每个 GroupCoordinator 只是管理一部分的 consumer group member 和 offset 信息)
rebalance机制
kafka保证同一消费组中的每个consumer能够消费一个或者多个特定的partition数据,一个partition的数据只能被一个consumer消费;因为每个partition里的消息是有序的,这样可以保证partition中的数据被同一个消费者有序消费;同时consumer只需要和自己消费的partition的broker通信就可以,减少开销。
在如下条件下,partition要在consumer中重新分配:
条件1:有新的consumer加入
条件2:旧的consumer挂了
条件3:coordinator挂了,集群选举出新的coordinator
条件4:topic的partition新加
条件5:consumer调用unsubscrible(),取消topic的订阅
在kafka中消费者的分区分配策略默认有两种:range和RoundRobin。
基于zk的rebalance
在kafka0.9版本之前,consumer的rebalance是通过在zookeeper上注册watch完成的。
这种做法很容易带来zk的羊群效应,任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance,造成集群内大量的调整;同时由于每个consumer单独通过zookeeper判断Broker和consumer宕机,由于zk的脑裂特性,同一时刻不同consumer通过zk看到的表现可能是不一样,这就可能会造成很多不正确的rebalance尝试;除此之外,由于consumer彼此独立,每个consumer都不知道其他consumer是否rebalance成功,可能会导致consumer group消费不正确。
基于Coordinator的rebalance
基于zk的rebalance存在不可避免的羊群效应和脑裂问题,如何不用zk来协调,而是将失败探测和Rebalance的逻辑放到一个高可用的中心,那么上述问题就能得以解决;因此kafka0.9.*的版本重新设计了consumer端,诞生了这样一个高可用中心Coordinator,大大减少了zookeeper负载。
对于每一个Consumer Group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。coordinator主要做两件事:
维持group的成员组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。
协调group成员的行为。
Coordinator有如下几种类型:
GroupCoordinator:broker端的,每个kafka server都有一个实例,管理部分的consumer group和它们的offset
WorkerCoordinator:broker端的,管理GroupCoordinator程序,主要管理workers的分配。
ConsumerCoordinator:consumer端的,和GroupCoordinator通信的媒介。
ConsumerCoordinator是KafkaConsumer的一个成员,只负责与GroupCoordinator通信,所以真正的协调者还是GroupCoordinator。
group 如何选择相应的 GroupCoordinator
步骤1:对于每1个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。
__consumer_offsets topic 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 默认有三个副本。
而具体的一个 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions来计算的(其中,NumPartitions 是 __consumer_offsets 的 partition 数,默认是50个)。
对于 consumer group 而言,是根据其 group.id 进行 hash 并计算得到其具对应的 partition 值,该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。
步骤2:找到coordinator之后,发送JoinGroup请求

步骤3:JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition

partition的分配策略和分配结果其实是由client决定的,而不是由coordinator决定的。在第2步,所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,其他consumer作为follower。
然后由这个leader进行partition分配。然后在第3步,leader通过SyncGroup消息,把分配结果发给coordinator,其他consumer也发送SyncGroup消息,获得这个分配结果。
为什么要在consumer中选一个leader出来,进行分配,而不是由coordinator直接分配呢?关于这个, Kafka的官方文档有详细的分析。其中一个重要原因是为了灵活性:如果让server分配,一旦需要新的分配策略,server集群要重新部署,这对于已经在线上运行的集群来说,代价是很大的;而让client分配,server集群就不需要重新部署了。
Rebalance Generation
它表示了rebalance之后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中。每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本。
heartbeat的实现原理
前面介绍了rebalance的条件,这些条件主要是通过heartbeat感知,每一个consumer都会定期的往coordinator发送heartbeat消息,一旦coordinator返回了某个特定的error code:ILLEGAL_GENERATION, 就说明之前的group无效了(解散了),要重新进行JoinGroup + SyncGroup操作。
那这个定期发送如何实现呢?一个直观的想法就是开一个后台线程,定时发送heartbeat消息,但维护一个后台线程,很显然会增大实现的复杂性。上面也说了,consumer是单线程程序。在这里是通过DelayedQueue来实现的。
DelayedQueue与HeartBeatTask
其基本思路是把HeartBeatRequest放入一个DelayedQueue中,然后在while循环的poll中,每次从DelayedQueue中把请求拿出来发送出去(只有时间到了,Task才能从Queue中拿出来)。
offset管理
老版本的位移是提交到zookeeper中的,目录结构是:/consumers/<group.id>/offsets/<topic>/<partitionId>,但是zookeeper其实并不适合进行大批量的读写操作,尤其是写操作。
因此kafka提供了另一种解决方案:增加__consumeroffsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。


offset的管理过程
offset提交消息会根据消费组的key(消费组名称)进行分区. 对于一个给定的消费组,它的所有消息都会发送到唯一的broker(即Coordinator)
Coordinator上负责管理offset的组件是Offset manager。负责存储,抓取,和维护消费者的offsets. 每个broker都有一个offset manager实例。
Offset Commit提交过程
消费端
一条offset提交消息会作为生产请求.当消费者启动时,会为”offsets topic”创建一个消费者。
broker端
broker把接收到的offset提交信息当做一个正常的生产请求,对offset请求的处理和正常的生产者请求处理方式是一样的.一旦将数据追加到leader的本地日志中,并且所有的replicas都赶上leader.leader检查生产请求是”offsets topic”,(因为broker端的处理逻辑针对offset请求和普通生产请求是一样的,如果是offset请求,还需要有不同的处理分支)它就会要求offset manager添加这个offset(对于延迟的生产请求,更新操作会在延迟的生产请求被完成的时候).
因为设置了acks=-1,只有当这些offsets成功地复制到ISR中的所有brokers,才会被提交给offset manager.
Offset Fetch获取过程
消费端
消费者启动时,会首先创建到任意一个存活的brokers的通道.因此消费者会发送它所有”OffsetFetchRequest”到这个随机选中的broker。如果出现错误,这个通道就会被关闭,并重新创建一个随机的通道。
broker端
一个Offset抓取请求包含了多个topic-partitions. 接收请求的broker可能有也可能没有请求的partitions的offset信息。因此接收请求的brokers也会和其他broker通信. 一个通道连接池会用来转发请求给partition的leader broker。