0%

kafka之partition的顺序性

  • RecordAccumulator

    • 每个 topic-partition 都有一个对应的 deque,deque 中存储的是 RecordBatch,它是发送的基本单位,只有这个 topic-partition 的 RecordBatch 达到大小或时间要求才会触发发送操作(但并不是只有达到这两个条件之一才会被发送,这点要理解清楚)。
      20220404111740
  • mutePartition() 与 unmutePartition()

    • 这两个方法是保证有序性关键之一,其主要做用就是将指定的 topic-partition 从 muted 集合中加入或删除。
    • mutePartition():如果要求保证顺序性,那么这个 tp 对应的 RecordBatch 如果要开始发送,就将这个 tp 加入到 muted 集合中;
    • unmutePartition():如果 tp 对应的 RecordBatch 发送完成,tp 将会从 muted 集合中移除。
  • ready()

    • ready() 是在 Sender 线程中调用的,其作用选择那些可以发送的 node,也就是说,如果这个 tp 对应的 batch 可以发送(达到时间或大小要求),就把 tp 对应的 leader 选出来。
  • drain()

    • 是用来遍历可发送请求的 node,然后再遍历在这个 node 上所有 tp,如果 tp 对应的 deque 有数据,将会被选择出来直到超过一个请求的最大长度(max.request.size)为止,也就说说即使 RecordBatch 没有达到条件,但为了保证每个 request 尽快多地发送数据提高发送效率,这个 RecordBatch 依然会被提前选出来并进行发送。
  • 顺序性如何保证?

    • 如果 KafkaProducer 的 max.in.flight.requests.per.connection 设置为1,那么就可以保证其顺序性,否则的话,就不保证顺序性
    • 而其实现机制则是在 RecordAccumulator 中的 mutePartition() 与 unmutePartition()

Welcome to my other publishing channels