0%

kafka之两种订阅模式

订阅模式

consumer 的两种订阅模式, subscribe()和assign() 模式,一种是 topic 粒度(使用 group 管理),一种是 topic-partition 粒度(用户自己去管理)

订阅模式subscribe

consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。

按照 topic 级别去订阅,可以动态地获取其分配的 topic-partition,这是使用 Group 动态管理,它不能与手动 partition 管理一起使用。当监控到发生下面的事件时,Group 将会触发 rebalance 操作:

订阅的 topic 列表变化;
topic 被创建或删除;
consumer group 的某个 consumer 实例挂掉;
一个新的 consumer 实例通过 join 方法加入到一个 group 中。

在这种模式下,当 KafkaConsumer 调用 pollOnce 方法时,第一步会首先加入到一个 group 中,并获取其分配的 topic-partition 列表。

subscribe的两种方式:

  1. topic列表订阅

    通过集合的方式订阅一到多个topic。

    SubscriptionType 类型设置为 AUTO_TOPICS;

    更新 metadata 中的 topic 列表(topics 变量),并请求更新 metadata;

  2. pattern模式订阅

    以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配新主题,那么这个消费组会立即对其进行消费。

    SubscriptionType 类型设置为 AUTO_PATTERN;

    设置 Metadata 的 needMetadataForAllTopics 为 true,即在请求 metadata 时,需要更新所有 topic 的 metadata 信息,设置后再请求更新 metadata;

    调用 coordinator.updatePatternSubscription() 方法,遍历所有 topic 的 metadata,找到所有满足 pattern 的 topic 列表,更新到 SubscriptionState 的 subscriptions 和 Metadata 的 topics 中;

    通过在 ConsumerCoordinator 中调用 addMetadataListener() 方法在 Metadata 中添加 listener 当每次 metadata update 时就调用第三步的方法更新,但是只有当本地缓存的 topic 列表与现在要订阅的 topic 列表不同时,才会触发 rebalance 操作。

其他部分,两者基本一样,只是 pattern 模型在每次更新 topic-metadata 时,获取全局的 topic 列表,如果发现有新加入的符合条件的 topic,就立马去订阅,其他的地方,包括 Group 管理、topic-partition 的分配都是一样的。

分配模式assign

为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group无效。

当调用 assign() 方法手动分配 topic-partition 列表时,是不会使用 consumer 的 Group 管理机制,也即是当 consumer group member 变化或 topic 的 metadata 信息变化时是不会触发 rebalance 操作的。比如:当 topic 的 partition 增加时,这里是无法感知,需要用户进行相应的处理,Apache Flink 就是使用的这种方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// ConsumerCoordinator.poll() // Coordinator.poll()
// 用处:
// 1. 同步更新 coordinator:确保我们的 consumer group 的 coordinator 是最新的。
// 2. 更新拉取的位移:确保当前 consumer 分配的分区更新其相应的拉取位移,如果没有更新到的话,consumer 就会使用 auto.offset.reset 来更新分区的拉取位移(设置为最早位移、最近位移或者抛错)。

// note: 它确保了这个 group 的 coordinator 是已知的,并且这个 consumer 是已经加入到了 group 中,也用于 offset 周期性的 commit
public void poll(long now) {
invokeCompletedOffsetCommitCallbacks();// note: 用于测试

// note: Step1 通过 subscribe() 方法订阅 topic,并且 coordinator 未知,初始化 Consumer Coordinator
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
// note: 获取 GroupCoordinator 地址,并且建立连接
ensureCoordinatorReady();
now = time.milliseconds();
}

// note: Step2 判断是否需要重新加入 group,如果订阅的 partition 变化或则分配的 partition 变化时,需要 rejoin
// note: 如果订阅模式不是 AUTO_TOPICS 或 AUTO_PATTERN,直接跳过
if (needRejoin()) {
// note: rejoin group 之前先刷新一下 metadata(对于 AUTO_PATTERN 而言)
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();

// note: 确保 group 是 active; 加入 group; 分配订阅的 partition
ensureActiveGroup();
now = time.milliseconds();
}

// note: Step3 检查心跳线程运行是否正常,如果心跳线程失败,则抛出异常,反之更新 poll 调用的时间
// note: 发送心跳请求是在 ensureCoordinatorReady() 中调用的
pollHeartbeat(now);
// note: Step4 自动 commit 时,当定时达到时,进行自动 commit
maybeAutoCommitOffsetsAsync(now);
}

如果使用的是 assign 模式,也即是非 AUTO_TOPICS 或 AUTO_PATTERN 模式时,Consumer 实例在调用 poll 方法时,是不会向 GroupCoordinator 发送 join-group/sync-group/heartbeat 请求的,也就是说 GroupCoordinator 是拿不到这个 Consumer 实例的相关信息,也不会去维护这个 member 是否存活,这种情况下就需要用户自己管理自己的处理程序。但是在这种模式是可以进行 offset commit的。

Welcome to my other publishing channels