订阅模式
consumer 的两种订阅模式, subscribe()和assign() 模式,一种是 topic 粒度(使用 group 管理),一种是 topic-partition 粒度(用户自己去管理)
订阅模式subscribe
consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。
按照 topic 级别去订阅,可以动态地获取其分配的 topic-partition,这是使用 Group 动态管理,它不能与手动 partition 管理一起使用。当监控到发生下面的事件时,Group 将会触发 rebalance 操作:
订阅的 topic 列表变化;
topic 被创建或删除;
consumer group 的某个 consumer 实例挂掉;
一个新的 consumer 实例通过 join 方法加入到一个 group 中。
在这种模式下,当 KafkaConsumer 调用 pollOnce 方法时,第一步会首先加入到一个 group 中,并获取其分配的 topic-partition 列表。
subscribe的两种方式:
topic列表订阅
通过集合的方式订阅一到多个topic。
SubscriptionType 类型设置为 AUTO_TOPICS;
更新 metadata 中的 topic 列表(topics 变量),并请求更新 metadata;
pattern模式订阅
以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配新主题,那么这个消费组会立即对其进行消费。
SubscriptionType 类型设置为 AUTO_PATTERN;
设置 Metadata 的 needMetadataForAllTopics 为 true,即在请求 metadata 时,需要更新所有 topic 的 metadata 信息,设置后再请求更新 metadata;
调用 coordinator.updatePatternSubscription() 方法,遍历所有 topic 的 metadata,找到所有满足 pattern 的 topic 列表,更新到 SubscriptionState 的 subscriptions 和 Metadata 的 topics 中;
通过在 ConsumerCoordinator 中调用 addMetadataListener() 方法在 Metadata 中添加 listener 当每次 metadata update 时就调用第三步的方法更新,但是只有当本地缓存的 topic 列表与现在要订阅的 topic 列表不同时,才会触发 rebalance 操作。
其他部分,两者基本一样,只是 pattern 模型在每次更新 topic-metadata 时,获取全局的 topic 列表,如果发现有新加入的符合条件的 topic,就立马去订阅,其他的地方,包括 Group 管理、topic-partition 的分配都是一样的。
分配模式assign
为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group无效。
当调用 assign() 方法手动分配 topic-partition 列表时,是不会使用 consumer 的 Group 管理机制,也即是当 consumer group member 变化或 topic 的 metadata 信息变化时是不会触发 rebalance 操作的。比如:当 topic 的 partition 增加时,这里是无法感知,需要用户进行相应的处理,Apache Flink 就是使用的这种方式。
1 | // ConsumerCoordinator.poll() // Coordinator.poll() |
如果使用的是 assign 模式,也即是非 AUTO_TOPICS 或 AUTO_PATTERN 模式时,Consumer 实例在调用 poll 方法时,是不会向 GroupCoordinator 发送 join-group/sync-group/heartbeat 请求的,也就是说 GroupCoordinator 是拿不到这个 Consumer 实例的相关信息,也不会去维护这个 member 是否存活,这种情况下就需要用户自己管理自己的处理程序。但是在这种模式是可以进行 offset commit的。