0%

partition分配机制

consumer 提供的两种不同 partition 分配策略,可以通过 partition.assignment.strategy 参数进行配置,默认情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor,Kafka 中提供另一种 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor

RangeAssignor 分配模式

假设 topic 的 partition 数为 numPartitionsForTopic,group 中订阅这个 topic 的 member 数为 consumersForTopic.size(),首先需要算出两个值:

numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size():表示平均每个 consumer 会分配到几个 partition;

consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size():表示平均分配后还剩下多少个 partition 未分配。

分配的规则是:对于剩下的那些 partition 分配到前 consumersWithExtraPartition 个 consumer 上,也就是前 consumersWithExtraPartition 个 consumer 获得 topic-partition 列表会比后面多一个。

在上述的程序中,举了一个例子,假设有一个 topic 有 7 个 partition,group 有5个 consumer,这个5个 consumer 都订阅这个 topic,那么 range 的分配方式如下:

consumer 0:start: 0, length: 2, topic-partition: p0,p1;
consumer 1:start: 2, length: 2, topic-partition: p2,p3;
consumer 2:start: 4, length: 1, topic-partition: p4;
consumer 3:start: 5, length: 1, topic-partition: p5;
consumer 4:start: 6, length: 1, topic-partition: p6

RoundRobinAssignor分配模式

roundrobin 的实现原则,简单来说就是:列出所有 topic-partition 和列出所有的 consumer member,然后开始分配,一轮之后继续下一轮,假设有有一个 topic,它有7个 partition,group 有3个 consumer 都订阅了这个 topic,那么其分配方式为:

20220406200003

对于多个 topic 的订阅,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:

20220406200004

commit机制

在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。

kafka对于offset的处理有两种提交方式:(1) 自动提交(默认的提交方式) (2) 手动提交(可以灵活地控制offset)

偏移量offset

如果消费者一直处于运行状态,那么偏移量就没有什么用处.

如果有消费者退出或者新 partition 加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的 partition,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个 partition 最后一次提交的offset,然后从偏移量指定的地方继续处理.

自动提交偏移量

Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。

Kafka消费者客户端编程逻辑中位移提交是一个大难点,自动位移提交免去了复杂的位移提交逻辑,让编码更简洁,但同时也带来了重复消费和消息丢失的问题。

  1. 重复消费

    假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次进行自动位移提交之前,消费者崩溃了或者发生再均衡,那么又得从上一次的位移处重新开始消费。

    我们可以通过减少自动位移提交的时间间隔来减少重复消息的窗口大小,但这样不能从根本上解决重复消费的问题,而且会使位移提交更加频繁。

  2. 消息丢失

    比如在如下图场景:拉取线程不断拉取消息并存入本地缓存,比如存入到BlockingQueue中,另外一个线程负责从缓存中读取消息并进行相应的逻辑处理。假设目前已经进行第y+1次拉取和第Z次位移提交,也就是第X+9以前的位移一经提交,但是处理消息的线程还在处理第X+4条消息,此时如果处理消息的线程发生异常,然后恢复正常后,则再次拉取消息会从第Z次提交的位置X+9处开始拉取消息然后处理,此时从X+4到X+9处的消息就被丢失了。

    20220406165937

手动提交偏移量

鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。可以通过将 enable.auto.commit 设为 false,然后手动提交偏移量。

对于手动提交offset主要有3种方式:1.同步提交 2.异步提交 3.异步+同步 组合的方式提交。

  1. 同步手动提交偏移量

    同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。

  2. 异步手动提交偏移量+回调函数

    异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在broker做出响应的时候记录错误信息。

    对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。

  3. 异步+同步 组合的方式提交偏移量

    针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。

订阅模式

consumer 的两种订阅模式, subscribe()和assign() 模式,一种是 topic 粒度(使用 group 管理),一种是 topic-partition 粒度(用户自己去管理)

订阅模式subscribe

consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。

按照 topic 级别去订阅,可以动态地获取其分配的 topic-partition,这是使用 Group 动态管理,它不能与手动 partition 管理一起使用。当监控到发生下面的事件时,Group 将会触发 rebalance 操作:

订阅的 topic 列表变化;
topic 被创建或删除;
consumer group 的某个 consumer 实例挂掉;
一个新的 consumer 实例通过 join 方法加入到一个 group 中。

在这种模式下,当 KafkaConsumer 调用 pollOnce 方法时,第一步会首先加入到一个 group 中,并获取其分配的 topic-partition 列表。

subscribe的两种方式:

  1. topic列表订阅

    通过集合的方式订阅一到多个topic。

    SubscriptionType 类型设置为 AUTO_TOPICS;

    更新 metadata 中的 topic 列表(topics 变量),并请求更新 metadata;

  2. pattern模式订阅

    以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配新主题,那么这个消费组会立即对其进行消费。

    SubscriptionType 类型设置为 AUTO_PATTERN;

    设置 Metadata 的 needMetadataForAllTopics 为 true,即在请求 metadata 时,需要更新所有 topic 的 metadata 信息,设置后再请求更新 metadata;

    调用 coordinator.updatePatternSubscription() 方法,遍历所有 topic 的 metadata,找到所有满足 pattern 的 topic 列表,更新到 SubscriptionState 的 subscriptions 和 Metadata 的 topics 中;

    通过在 ConsumerCoordinator 中调用 addMetadataListener() 方法在 Metadata 中添加 listener 当每次 metadata update 时就调用第三步的方法更新,但是只有当本地缓存的 topic 列表与现在要订阅的 topic 列表不同时,才会触发 rebalance 操作。

其他部分,两者基本一样,只是 pattern 模型在每次更新 topic-metadata 时,获取全局的 topic 列表,如果发现有新加入的符合条件的 topic,就立马去订阅,其他的地方,包括 Group 管理、topic-partition 的分配都是一样的。

分配模式assign

为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group无效。

当调用 assign() 方法手动分配 topic-partition 列表时,是不会使用 consumer 的 Group 管理机制,也即是当 consumer group member 变化或 topic 的 metadata 信息变化时是不会触发 rebalance 操作的。比如:当 topic 的 partition 增加时,这里是无法感知,需要用户进行相应的处理,Apache Flink 就是使用的这种方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// ConsumerCoordinator.poll() // Coordinator.poll()
// 用处:
// 1. 同步更新 coordinator:确保我们的 consumer group 的 coordinator 是最新的。
// 2. 更新拉取的位移:确保当前 consumer 分配的分区更新其相应的拉取位移,如果没有更新到的话,consumer 就会使用 auto.offset.reset 来更新分区的拉取位移(设置为最早位移、最近位移或者抛错)。

// note: 它确保了这个 group 的 coordinator 是已知的,并且这个 consumer 是已经加入到了 group 中,也用于 offset 周期性的 commit
public void poll(long now) {
invokeCompletedOffsetCommitCallbacks();// note: 用于测试

// note: Step1 通过 subscribe() 方法订阅 topic,并且 coordinator 未知,初始化 Consumer Coordinator
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
// note: 获取 GroupCoordinator 地址,并且建立连接
ensureCoordinatorReady();
now = time.milliseconds();
}

// note: Step2 判断是否需要重新加入 group,如果订阅的 partition 变化或则分配的 partition 变化时,需要 rejoin
// note: 如果订阅模式不是 AUTO_TOPICS 或 AUTO_PATTERN,直接跳过
if (needRejoin()) {
// note: rejoin group 之前先刷新一下 metadata(对于 AUTO_PATTERN 而言)
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();

// note: 确保 group 是 active; 加入 group; 分配订阅的 partition
ensureActiveGroup();
now = time.milliseconds();
}

// note: Step3 检查心跳线程运行是否正常,如果心跳线程失败,则抛出异常,反之更新 poll 调用的时间
// note: 发送心跳请求是在 ensureCoordinatorReady() 中调用的
pollHeartbeat(now);
// note: Step4 自动 commit 时,当定时达到时,进行自动 commit
maybeAutoCommitOffsetsAsync(now);
}

如果使用的是 assign 模式,也即是非 AUTO_TOPICS 或 AUTO_PATTERN 模式时,Consumer 实例在调用 poll 方法时,是不会向 GroupCoordinator 发送 join-group/sync-group/heartbeat 请求的,也就是说 GroupCoordinator 是拿不到这个 Consumer 实例的相关信息,也不会去维护这个 member 是否存活,这种情况下就需要用户自己管理自己的处理程序。但是在这种模式是可以进行 offset commit的。

Consumer poll 模型

Consumer poll 方法的真正实现是在 pollOnce() 方法中,一次 poll 过程,包括检查新的数据、做一些必要的 commit 以及 offset 重置操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Do one round of polling. In addition to checking for new data, this does any needed offset commits
* (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
* @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
* @return The fetched records (may be empty)
*/
// note: 一次 poll 过程,包括检查新的数据、做一些必要的 commit 以及 offset 重置操作
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// note: 1. 获取 GroupCoordinator 并连接、加入 Group、sync Group, 期间 group 会进行 rebalance 并获取
coordinator.poll(time.milliseconds());
// assignment

// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
// note: 2. 更新要拉取 partition 的 offset(如果需要更新的话)
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());

// if data is available already, return it immediately
// note: 3. 获取 fetcher 已经拉取到的数据
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
// 如果不是空,说明 completedFetches 队列中有数据,就直接拿数据
return records;
// 如果是空
// note: 说明上次 fetch 到是的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据

// send any new fetches (won't resend pending fetches)
// note: 4. 向订阅的所有 partition 发送 fetch 请求,会从多个 partition 拉取数据,结果存放在completedFetches中
// 但是这里的发送并不是真正的发送,而是将 FetchRequest 请求对象存放在 unsend 缓存当中,然后会在 ConsumerNetworkClient#poll 方法调用时才会被真正地执行发送。
fetcher.sendFetches();

long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

//note: 5. 调用 poll 方法发送数据
client.poll(pollTimeout, now, new PollCondition() {
// 发送请求到服务端,但是如果之前发送的fectch请求还在路上的话,就block等待。
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
});

// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
//note: 6. 如果 group 需要 rebalance, 直接返回空数据,这样更快地让 group 进行稳定状态
if (coordinator.needRejoin())
return Collections.emptyMap();

// 再次返回数据
return fetcher.fetchedRecords();
}

在这里,我们把一个 pollOnce 模型分为6个部分,这里简单介绍一下:

  1. coordinator.poll():连接 GroupCoordinator,并发送 join-group、sync-group 请求,加入 group 成功,并获取其分配的 tp 列表;,如果设置了自动 commit,也会在这一步进行 commit,总之,对于一个新建的 group,group 状态将会从 Empty –> PreparingRebalance –> AwaiSync –> Stable;
  2. updateFetchPositions(): 在上一步中已经获取到了这个 consumer 实例要订阅的 topic-partition list,这一步更新其 fetch-position offset,以便进行拉取;
  3. fetcher.fetchedRecords():返回其 fetched records,并更新其 fetch-position offset,只有在 offset-commit 时(自动 commit 时,是在第一步实现的),才会更新其 committed offset;
  4. fetcher.sendFetches():只要订阅的 topic-partition list 没有未处理的 fetch 请求,就发送对这个 topic-partition 的 fetch 请求,在真正发送时,还是会按 node 级别去发送,leader 是同一个 node 的 topic-partition 会合成一个请求去发送;
  5. client.poll():调用底层 NetworkClient 提供的接口去发送相应的请求;
  6. oordinator.needRejoin():如果当前实例分配的 topic-partition 列表发送了变化,那么这个 consumer group 就需要进行 rebalance。

注意:

其实 Kafka 消费者在拉取消息过程中,有两条线程在工作,其中用户主线程调用 pollForFetches 方法从缓存中获取消息消费,在获取消息后,会再调用 ConsumerNetworkClient#poll 方法从 Broker 发送拉取请求,然后将拉取到的消息缓存到本地,这里为什么在拉取完消息后,会主动调用 ConsumerNetworkClient#poll 方法呢?我想这里的目的是为了下次 poll 的时候可以立即从缓存中拉取消息。

当缓存中还存在中还存在某个分区的消息数据时,消费者不会继续对该分区进行拉取请求,直到该分区的本地缓存被消费完,才会继续发送拉取请求。假设某消费者监听三个分区,每个分区每次从 Broker 中拉取 4 条消息,用户每次从本地缓存中获取 2 条消息:

20220405155653

GroupCoordinator

Kafka 的 Server 端主要有三块内容:GroupCoordinator、Controller 和 ReplicaManager

GroupCoordinator 是负责进行 consumer 的 group 成员的rebalance 与 offset 管理(但每个 GroupCoordinator 只是管理一部分的 consumer group member 和 offset 信息)

rebalance机制

kafka保证同一消费组中的每个consumer能够消费一个或者多个特定的partition数据,一个partition的数据只能被一个consumer消费;因为每个partition里的消息是有序的,这样可以保证partition中的数据被同一个消费者有序消费;同时consumer只需要和自己消费的partition的broker通信就可以,减少开销。

在如下条件下,partition要在consumer中重新分配:

条件1:有新的consumer加入
条件2:旧的consumer挂了
条件3:coordinator挂了,集群选举出新的coordinator
条件4:topic的partition新加
条件5:consumer调用unsubscrible(),取消topic的订阅

在kafka中消费者的分区分配策略默认有两种:range和RoundRobin。

基于zk的rebalance

在kafka0.9版本之前,consumer的rebalance是通过在zookeeper上注册watch完成的。

这种做法很容易带来zk的羊群效应,任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance,造成集群内大量的调整;同时由于每个consumer单独通过zookeeper判断Broker和consumer宕机,由于zk的脑裂特性,同一时刻不同consumer通过zk看到的表现可能是不一样,这就可能会造成很多不正确的rebalance尝试;除此之外,由于consumer彼此独立,每个consumer都不知道其他consumer是否rebalance成功,可能会导致consumer group消费不正确。

基于Coordinator的rebalance

基于zk的rebalance存在不可避免的羊群效应和脑裂问题,如何不用zk来协调,而是将失败探测和Rebalance的逻辑放到一个高可用的中心,那么上述问题就能得以解决;因此kafka0.9.*的版本重新设计了consumer端,诞生了这样一个高可用中心Coordinator,大大减少了zookeeper负载。

对于每一个Consumer Group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。coordinator主要做两件事:

维持group的成员组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。
协调group成员的行为。

Coordinator有如下几种类型:

GroupCoordinator:broker端的,每个kafka server都有一个实例,管理部分的consumer group和它们的offset
WorkerCoordinator:broker端的,管理GroupCoordinator程序,主要管理workers的分配。
ConsumerCoordinator:consumer端的,和GroupCoordinator通信的媒介。

ConsumerCoordinator是KafkaConsumer的一个成员,只负责与GroupCoordinator通信,所以真正的协调者还是GroupCoordinator。

group 如何选择相应的 GroupCoordinator

  • 步骤1:对于每1个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。

    __consumer_offsets topic 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 默认有三个副本。

    而具体的一个 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions来计算的(其中,NumPartitions 是 __consumer_offsets 的 partition 数,默认是50个)。

    对于 consumer group 而言,是根据其 group.id 进行 hash 并计算得到其具对应的 partition 值,该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。

  • 步骤2:找到coordinator之后,发送JoinGroup请求

    20220404162248

  • 步骤3:JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition

    20220404162401

partition的分配策略和分配结果其实是由client决定的,而不是由coordinator决定的。在第2步,所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,其他consumer作为follower。

然后由这个leader进行partition分配。然后在第3步,leader通过SyncGroup消息,把分配结果发给coordinator,其他consumer也发送SyncGroup消息,获得这个分配结果。

为什么要在consumer中选一个leader出来,进行分配,而不是由coordinator直接分配呢?关于这个, Kafka的官方文档有详细的分析。其中一个重要原因是为了灵活性:如果让server分配,一旦需要新的分配策略,server集群要重新部署,这对于已经在线上运行的集群来说,代价是很大的;而让client分配,server集群就不需要重新部署了。

Rebalance Generation

它表示了rebalance之后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中。每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本。

heartbeat的实现原理

前面介绍了rebalance的条件,这些条件主要是通过heartbeat感知,每一个consumer都会定期的往coordinator发送heartbeat消息,一旦coordinator返回了某个特定的error code:ILLEGAL_GENERATION, 就说明之前的group无效了(解散了),要重新进行JoinGroup + SyncGroup操作。

那这个定期发送如何实现呢?一个直观的想法就是开一个后台线程,定时发送heartbeat消息,但维护一个后台线程,很显然会增大实现的复杂性。上面也说了,consumer是单线程程序。在这里是通过DelayedQueue来实现的。

DelayedQueue与HeartBeatTask

其基本思路是把HeartBeatRequest放入一个DelayedQueue中,然后在while循环的poll中,每次从DelayedQueue中把请求拿出来发送出去(只有时间到了,Task才能从Queue中拿出来)。

offset管理

老版本的位移是提交到zookeeper中的,目录结构是:/consumers/<group.id>/offsets/<topic>/<partitionId>,但是zookeeper其实并不适合进行大批量的读写操作,尤其是写操作。

因此kafka提供了另一种解决方案:增加__consumeroffsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。

20220404164238

20220404164330

offset的管理过程

offset提交消息会根据消费组的key(消费组名称)进行分区. 对于一个给定的消费组,它的所有消息都会发送到唯一的broker(即Coordinator)

Coordinator上负责管理offset的组件是Offset manager。负责存储,抓取,和维护消费者的offsets. 每个broker都有一个offset manager实例。

Offset Commit提交过程

消费端

一条offset提交消息会作为生产请求.当消费者启动时,会为”offsets topic”创建一个消费者。

broker端

broker把接收到的offset提交信息当做一个正常的生产请求,对offset请求的处理和正常的生产者请求处理方式是一样的.一旦将数据追加到leader的本地日志中,并且所有的replicas都赶上leader.leader检查生产请求是”offsets topic”,(因为broker端的处理逻辑针对offset请求和普通生产请求是一样的,如果是offset请求,还需要有不同的处理分支)它就会要求offset manager添加这个offset(对于延迟的生产请求,更新操作会在延迟的生产请求被完成的时候).
因为设置了acks=-1,只有当这些offsets成功地复制到ISR中的所有brokers,才会被提交给offset manager.

Offset Fetch获取过程

消费端

消费者启动时,会首先创建到任意一个存活的brokers的通道.因此消费者会发送它所有”OffsetFetchRequest”到这个随机选中的broker。如果出现错误,这个通道就会被关闭,并重新创建一个随机的通道。

broker端

一个Offset抓取请求包含了多个topic-partitions. 接收请求的broker可能有也可能没有请求的partitions的offset信息。因此接收请求的brokers也会和其他broker通信. 一个通道连接池会用来转发请求给partition的leader broker。