0%

kafka之Controller

Controller

Controller 作为 Kafka Server 端一个重要的组件,它的角色类似于其他分布式系统 Master 的角色,跟其他系统不一样的是,Kafka 集群的任何一台 Broker 都可以作为 Controller,但是在一个集群中同时只会有一个 Controller 是 alive 状态。Controller 在集群中负责的事务很多,比如:集群 meta 信息的一致性保证、Partition leader 的选举、broker 上下线等都是由 Controller 来具体负责。

Controller 简介

在于分布式系统中,总会有一个地方需要对全局 meta 做一个统一的维护,Kafka 的 Controller 就是充当这个角色的。

Controller 是运行在 Broker 上的,任何一台 Broker 都可以作为 Controller,但是一个集群同时只能存在一个 Controller,也就意味着 Controller 与数据节点是在一起的,Controller 做的主要事情如下:

  1. Broker 的上线、下线处理;
  2. 新创建的 topic 或已有 topic 的分区扩容,处理分区副本的分配、leader 选举;
  3. 管理所有副本的状态机和分区的状态机,处理状态机的变化事件;
  4. topic 删除、副本迁移、leader 切换等处理。

Controller 选举过程

Controller 启动

Kafka Server 在启动的过程中,都会去启动 Controller 服务。

Controller 在 startup() 方法中主要实现以下两部分功能:

  1. registerSessionExpirationListener() 方法注册连接 zk 的超时监听器;
  2. controllerElector.startup() 方法,监听 zk 上 controller 节点的变化,并触发 controller 选举方法。

Controller 选举

Controller 在启动时,会初始化 ZookeeperLeaderElector 对象,并调用其 startup() 启动相应的流程。

在 startup() 方法中,主要做了下面两件事情:

  1. 监听 zk 的 /controller 节点的数据变化,一旦节点有变化,立刻通过 LeaderChangeListener 的方法进行相应的处理;
  2. elect 在 controller 不存在的情况下选举 controller,存在的话,就是从 zk 获取当前的 controller 节点信息。
Controller 选举方法 elect

其实现逻辑如下:

  1. 先获取 zk 的 /cotroller 节点的信息,获取 controller 的 broker id,如果该节点不存在(比如集群刚创建时),那么获取的 controller id 为-1;
  2. 如果 controller id 不为-1,即 controller 已经存在,直接结束流程;
  3. 如果 controller id 为-1,证明 controller 还不存在,这时候当前 broker 开始在 zk 注册 controller;
  4. 如果注册成功,那么当前 broker 就成为了 controller,这时候开始调用 onBecomingLeader() 方法,正式初始化 controller(注意:controller 节点是临时节点,如果当前 controller 与 zk 的 session 断开,那么 controller 的临时节点会消失,会触发 controller 的重新选举);
  5. 如果注册失败(刚好 controller 被其他 broker 创建了、抛出异常等),那么直接返回。

在这里 controller 算是成功被选举出来了,controller 选举过程实际上就是各个 Broker 抢占式注册该节点,注册成功的便为 Controller。

controller 节点监听 LeaderChangeListener

LeaderChangeListener 主要是监听 zk 上的 Controller 节点变化,如果该节点内容变化或者节点被删除,那么会触发 handleDataChange() 和 handleDataDeleted() 方法。

处理过程如下:

  1. 如果 /controller 节点内容变化,那么更新一下 controller 最新的节点信息,如果该节点刚好之前是 controller,现在不是了,那么需要执行 controller 关闭操作,即 onResigningAsLeader() 方法;
  2. 如果 /controller 节点被删除,如果该节点刚好之前是 controller,那么需要执行 controller 关闭操作,即 onResigningAsLeader() 方法,然后再执行 elect 方法重新去选举 controller;

Controller 服务启动流程

Welcome to my other publishing channels