RecordAccumulator
- 每个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操作(但并不是只有达到这两个条件之一才会被发送,这点要理解清楚)。

- 每个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操作(但并不是只有达到这两个条件之一才会被发送,这点要理解清楚)。
mutePartition() 与 unmutePartition()
- 这两个方法是保证有序性关键之一,其主要做用就是将指定的 topic-partition 从 muted 集合中加入或删除。
- mutePartition():如果要求保证顺序性,那么这个 tp 对应的 RecordBatch 如果要开始发送,就将这个 tp 加入到 muted 集合中;
- unmutePartition():如果 tp 对应的 RecordBatch 发送完成,tp 将会从 muted 集合中移除。
ready()
- ready() 是在 Sender 线程中调用的,其作用选择那些可以发送的 node,也就是说,如果这个 tp 对应的 batch 可以发送(达到时间或大小要求),就把 tp 对应的 leader 选出来。
drain()
- 是用来遍历可发送请求的 node,然后再遍历在这个 node 上所有 tp,如果 tp 对应的 deque 有数据,将会被选择出来直到超过一个请求的最大长度(max.request.size)为止,也就说说即使 RecordBatch 没有达到条件,但为了保证每个 request 尽快多地发送数据提高发送效率,这个 RecordBatch 依然会被提前选出来并进行发送。
顺序性如何保证?
- 如果 KafkaProducer 的 max.in.flight.requests.per.connection 设置为1,那么就可以保证其顺序性,否则的话,就不保证顺序性
- 而其实现机制则是在 RecordAccumulator 中的 mutePartition() 与 unmutePartition()
kafka之partition的顺序性
Welcome to my other publishing channels