0%

kafka中的Metadata

  • Metadata的内容

    • Metadata元数据主要包含了broker、topic和partition的信息,比如broker的id、topic的partition数、partition的leader和副本等
    • 集群中有哪些节点;
    • 集群中有哪些 topic,这些 topic 有哪些 partition;
    • 每个 partition 的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上;
    • 每个 partition 的 AR 有哪些副本,ISR 有哪些副本
  • Metadata的应用场景

    • KafkaProducer 发送一条消息到指定的 topic 中,需要知道分区的数量,要发送的目标分区,目标分区的 leader,leader 所在的节点地址等,这些信息都要从 Metadata 中获取。
    • 当 Kafka 集群中发生了 leader 选举,节点中 partition 或副本发生了变化等,这些场景都需要更新Metadata 中的数据
  • Metadata的更新

    • Producer 在调用 dosend() 方法时,第一步就是通过 waitOnMetadata 方法获取该 topic 的 metadata 信息。如果metadata读不到,会一直阻塞在那,直到超时,抛出TimeoutException。
    • Sender poll()更新Metadata
      • 周期性的更新: 每隔一段时间更新一次,这个通过 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 这2个字段来实现
      • 失效检测,强制更新:检查到metadata失效以后,调用metadata.requestUpdate()强制更新。 requestUpdate()函数里面其实什么都没做,就是把needUpdate置成了false
  • Metadata失效检测

    • initConnect的时候
    • poll里面IO的时候,连接断掉了
    • 有请求超时
    • 发消息的时候,有partition的leader没找到
    • 返回的response和请求对不上的时候
    • 总之一句话:发生各式各样的异常,数据不同步,都认为metadata可能出问题了,要求更新。

Welcome to my other publishing channels