0%

Controller

Controller 作为 Kafka Server 端一个重要的组件,它的角色类似于其他分布式系统 Master 的角色,跟其他系统不一样的是,Kafka 集群的任何一台 Broker 都可以作为 Controller,但是在一个集群中同时只会有一个 Controller 是 alive 状态。Controller 在集群中负责的事务很多,比如:集群 meta 信息的一致性保证、Partition leader 的选举、broker 上下线等都是由 Controller 来具体负责。

Controller 简介

在于分布式系统中,总会有一个地方需要对全局 meta 做一个统一的维护,Kafka 的 Controller 就是充当这个角色的。

Controller 是运行在 Broker 上的,任何一台 Broker 都可以作为 Controller,但是一个集群同时只能存在一个 Controller,也就意味着 Controller 与数据节点是在一起的,Controller 做的主要事情如下:

  1. Broker 的上线、下线处理;
  2. 新创建的 topic 或已有 topic 的分区扩容,处理分区副本的分配、leader 选举;
  3. 管理所有副本的状态机和分区的状态机,处理状态机的变化事件;
  4. topic 删除、副本迁移、leader 切换等处理。

Controller 选举过程

Controller 启动

Kafka Server 在启动的过程中,都会去启动 Controller 服务。

Controller 在 startup() 方法中主要实现以下两部分功能:

  1. registerSessionExpirationListener() 方法注册连接 zk 的超时监听器;
  2. controllerElector.startup() 方法,监听 zk 上 controller 节点的变化,并触发 controller 选举方法。

Controller 选举

Controller 在启动时,会初始化 ZookeeperLeaderElector 对象,并调用其 startup() 启动相应的流程。

在 startup() 方法中,主要做了下面两件事情:

  1. 监听 zk 的 /controller 节点的数据变化,一旦节点有变化,立刻通过 LeaderChangeListener 的方法进行相应的处理;
  2. elect 在 controller 不存在的情况下选举 controller,存在的话,就是从 zk 获取当前的 controller 节点信息。
Controller 选举方法 elect

其实现逻辑如下:

  1. 先获取 zk 的 /cotroller 节点的信息,获取 controller 的 broker id,如果该节点不存在(比如集群刚创建时),那么获取的 controller id 为-1;
  2. 如果 controller id 不为-1,即 controller 已经存在,直接结束流程;
  3. 如果 controller id 为-1,证明 controller 还不存在,这时候当前 broker 开始在 zk 注册 controller;
  4. 如果注册成功,那么当前 broker 就成为了 controller,这时候开始调用 onBecomingLeader() 方法,正式初始化 controller(注意:controller 节点是临时节点,如果当前 controller 与 zk 的 session 断开,那么 controller 的临时节点会消失,会触发 controller 的重新选举);
  5. 如果注册失败(刚好 controller 被其他 broker 创建了、抛出异常等),那么直接返回。

在这里 controller 算是成功被选举出来了,controller 选举过程实际上就是各个 Broker 抢占式注册该节点,注册成功的便为 Controller。

controller 节点监听 LeaderChangeListener

LeaderChangeListener 主要是监听 zk 上的 Controller 节点变化,如果该节点内容变化或者节点被删除,那么会触发 handleDataChange() 和 handleDataDeleted() 方法。

处理过程如下:

  1. 如果 /controller 节点内容变化,那么更新一下 controller 最新的节点信息,如果该节点刚好之前是 controller,现在不是了,那么需要执行 controller 关闭操作,即 onResigningAsLeader() 方法;
  2. 如果 /controller 节点被删除,如果该节点刚好之前是 controller,那么需要执行 controller 关闭操作,即 onResigningAsLeader() 方法,然后再执行 elect 方法重新去选举 controller;

Controller 服务启动流程

Kafka的副本机制

Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续的序列号叫做offset, 确定它在分区日志中唯一的位置。

20220409143015

Kafka每个topic的partition有N个副本,其中N是topic的复制因子。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。

20220409143223

Kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Sync Replicas的缩写,表示副本同步队列,具体可参考下节)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower“落后”太多或者失效,leader将会把它从ISR中删除。

leader和follower的角色区分,也主要是ReplicaManager来实现。具体地讲:

leader

  1. leader会接受client的读取请求和写入请求。
  2. leader需要接受follwer抓取message的请求,返回message给follower
  3. leader需要维护ISR(in-sync replicas)列表。“保持同步”的含义有些复杂,0.9之前版本对这个概念的定义与0.9不同,详情参见KIP-16 - Automated Replica Lag Tuning。0.9版本,broker的参数replica.lag.time.max.ms用来指定ISR的定义,如果leader在这么长时间没收到follower的拉取请求,或者在这么长时间内,follower没有fetch到leader的log end offset,就会被leader从ISR中移除。ISR是个很重要的指标,controller选取partition的leader replica时会使用它,因此leader选取ISR后会把结果记到Zookeeper上。
  4. leader需要维护high watermark。high watermark以下的消息就是所有ISR列表里的replica都已经读取的消息(注意,并不是所有replica都一定有这些消息,而只是ISR里的那些才肯定会有)。因此leader会根据follower拉取数据时提供的offset和ISR列表,决定HW,并且在返回给follower的请求中附带最新的HW。

follower

  1. follower需要不停地去leader处拉取最新的log
  2. follower需要根据leader在fetch reponse中提供的HW,更新自己本地保存的leader的HW信息。在它过行leader或follower转变时,会用到这个HW。

副本同步队列ISR

所谓同步,必须满足如下两个条件:

  1. 副本节点必须能与zookeeper保持会话(心跳机制)
  2. 副本能复制leader上的所有写操作,并且不能落后太多。(卡住或滞后的副本控制是由 replica.lag.time.max.ms 配置)

所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

上一节中的HW俗称高水位,是HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。

下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:

20220409143654

由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。

Fetch 请求处理的整体流程

20220408162810

Fetch 请求的来源

  1. Consumer Fetch 请求
  2. Replica 同步 Fetch 请求

Server 端的处理

KafkaApis 如何处理 Fetch 请求

Fetch 请求处理的真正实现是在 replicaManager 的 fetchMessages() 方法中,在这里,可以看出,无论是 Fetch 请求还是 Produce 请求,都是通过副本管理器来实现的,副本管理器(ReplicaManager)管理的对象是分区实例(Partition),而每个分区都会与相应的副本实例对应(Replica),在这个节点上的副本又会与唯一的 Log 实例对应,正如流程图的上半部分一样,Server 就是通过这几部分抽象概念来管理真正存储层的内容。

ReplicaManager 如何处理 Fetch 请求

ReplicaManger 处理 Fetch 请求的入口在 fetchMessages() 方法。

fetchMessages()

整体来说,分为以下几步:

  1. readFromLocalLog():调用该方法,从本地日志拉取相应的数据;
  2. 判断 Fetch 请求来源,如果来自副本同步,那么更新该副本的 the end offset 记录,如果该副本不在 isr 中,并判断是否需要更新 isr;
  3. 返回结果,满足条件的话立马返回,否则的话,通过延迟操作,延迟返回结果。

readFromLocalLog()

处理过程:

  1. 先根据要拉取的 topic-partition 获取对应的 Partition 对象,根据 Partition 对象获取对应的 Replica 对象;
  2. 根据 Replica 对象找到对应的 Log 对象,然后调用其 read() 方法从指定的位置读取数据。

存储层对 Fetch 请求的处理

Log 对象

每个 Replica 会对应一个 log 对象,而每个 log 对象会管理相应的 LogSegment 实例。

read()

该方法会先查找对应的 Segment 对象(日志分段),然后循环直到读取到数据结束,如果当前的日志分段没有读取到相应的数据,那么会更新日志分段及对应的最大位置。

日志分段实际上是逻辑概念,它管理了物理概念的一个数据文件、一个时间索引文件和一个 offset 索引文件,读取日志分段时,会先读取 offset 索引文件再读取数据文件,具体步骤如下:

  1. 根据要读取的起始偏移量(startOffset)读取 offset 索引文件中对应的物理位置;
  2. 查找 offset 索引文件最后返回:起始偏移量对应的最近物理位置(startPosition);
  3. 根据 startPosition 直接定位到数据文件,然后读取数据文件内容;
  4. 最多能读到数据文件的结束位置(maxPosition)。

LogSegment

关乎 数据文件、offset 索引文件和时间索引文件真正的操作都是在 LogSegment 对象中的,日志读取也与这个方法息息相关。

read() in LogSegment

  1. 根据 startOffset 得到实际的物理位置(translateOffset());
  2. 计算要读取的实际物理长度;
  3. 根据实际起始物理位置和要读取实际物理长度读取数据文件。

translateOffset()

  1. 查找 offset 索引文件:调用 offset 索引文件的 lookup() 查找方法,获取离 startOffset 最接近的物理位置;
  2. 调用数据文件的 searchFor() 方法,从指定的物理位置开始读取每条数据,知道找到对应 offset 的物理位置。

offset索引文件

索引文件存储的是简单地索引数据,其格式为:「N,Position」。其中 N 表示索引文件里的第几条消息,而 Position 则表示该条消息在数据文件(Log File)中的物理偏移地址。

数据文件

数据文件就是所有消息的一个列表,而每条消息都有一个固定的格式。

如何读取消息

先在index文件中找到最接近offset的物理地址,这个过程可以用二分的方法查找,然后再根据物理地址从数据文件中读取消息。

produce 请求处理整体流程

在 Producer Client 端,Producer 会维护一个 ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches 的变量,然后会根据 topic-partition 的 leader 信息,将 leader 在同一台机器上的 batch 放在一个 request 中,发送到 server,这样可以节省很多网络开销,提高发送效率。

在发送 Produce 的请求里,Client 是把一个 Map<TopicPartition, MemoryRecords> 类型的 produceRecordsByPartition 作为内容发送给了 Server 端,那么 Server 端是如何处理这个请求的呢?

20220407200553

Broker 在收到 Produce 请求后,会有一个 KafkaApis 进行处理,KafkaApis 是 Server 端处理所有请求的入口,它会负责将请求的具体处理交给相应的组件进行处理,从上图可以看到 Produce 请求是交给了 ReplicaManager 对象进行处理了。

Server 端处理

KafkaApis 处理 Produce 请求

KafkaApis 处理 produce 请求是在 handleProducerRequest() 方法中完成,总体来说,处理过程是(在权限系统的情况下):

1. 查看 topic 是否存在,以及 client 是否有相应的 Describe 权限;(Describe 权限:如果用户在指定主题上具有 Describe 权限,则会列出该主题。)

2. 对于已经有 Describe 权限的 topic 查看是否有 Write 权限;

3. 调用 replicaManager.appendRecords() 方法向有 Write 权限的 topic-partition 追加相应的 record。

ReplicaManager

ReplicaManager 顾名思义,它就是副本管理器,副本管理器的作用是管理这台 broker 上的所有副本(replica)。在 Kafka 中,每个副本(replica)都会跟日志实例(Log 对象)一一对应,一个副本会对应一个 Log 对象。

Kafka Server 在启动的时候,会创建 ReplicaManager 对象,如下所示。在 ReplicaManager 的构造方法中,它需要 LogManager 作为成员变量。

ReplicaManager 的并不负责具体的日志创建,它只是管理 Broker 上的所有分区(也就是图中下一步的那个 Partition 对象)。在创建 Partition 对象时,它需要 ReplicaManager 的 logManager 对象,Partition 会通过这个 logManager 对象为每个 replica 创建对应的日志。

ReplicaManager 与 LogManger 对比如下:

20220407201545

appendRecords() 实现

appendRecords() 的实现主要分为以下几步:

1. 首先判断 acks 设置是否有效(-1,0,1三个值有效),无效的话直接返回异常,不再处理;
2. acks 设置有效的话,调用 appendToLocalLog() 方法将 records 追加到本地对应的 log 对象中;
3. appendToLocalLog() 处理完后,如果发现 clients 设置的 acks=-1,即需要 isr 的其他的副本同步完成才能返回 response,那么就会创建一个 DelayedProduce 对象,等待 isr 的其他副本进行同步,否则的话直接返回追加的结果。

appendToLocalLog() 实现

appendToLocalLog() 的实现如下:

1. 首先判断要写的 topic 是不是 Kafka 内置的 topic,内置的 topic 是不允许 Producer 写入的;
2. 先查找 topic-partition 对应的 Partition 对象,如果在 allPartitions 中查找到了对应的 partition,那么直接调用 partition.appendRecordsToLeader() 方法追加相应的 records,否则会向 client 抛出异常。

Partition.appendRecordsToLeader() 方法

ReplicaManager 在追加 records 时,调用的是 Partition 的 appendRecordsToLeader() 方法。

在这个方法里,会根据 topic 的 min.isrs 配置以及当前这个 partition 的 isr 情况判断是否可以写入,如果不满足条件,就会抛出 NotEnoughReplicasException 的异常,如果满足条件,就会调用 log.append() 向 replica 追加日志。

存储层

log对象

在上面有过一些介绍,每个 replica 会对应一个 log 对象,log 对象是管理当前分区的一个单位,它会包含这个分区的所有 segment 文件(包括对应的 offset 索引和时间戳索引文件),它会提供一些增删查的方法。

在 Log 对象的初始化时,有三个变量是比较重要的:

  1. nextOffsetMetadata:可以叫做下一个偏移量元数据,它包括 activeSegment 的下一条消息的偏移量,该 activeSegment 的基准偏移量及日志分段的大小;
  2. activeSegment:指的是该 Log 管理的 segments 中那个最新的 segment(这里叫做活跃的 segment),一个 Log 中只会有一个活跃的 segment,其他的 segment 都已经被持久化到磁盘了;
  3. logEndOffset:表示下一条消息的 offset,它取自 nextOffsetMetadata 的 offset,实际上就是活动日志分段的下一个偏移量。

日志写入

Server 将每个分区的消息追加到日志中时,是以 segment 为单位的,当 segment 的大小到达阈值大小之后,会滚动新建一个日志分段(segment)保存新的消息,而分区的消息总是追加到最新的日志分段(也就是 activeSegment)中。每个日志分段都会有一个基准偏移量(segmentBaseOffset,或者叫做 baseOffset),这个基准偏移量就是分区级别的绝对偏移量,而且这个值在日志分段是固定的。有了这个基准偏移量,就可以计算出来每条消息在分区中的绝对偏移量,最后把数据以及对应的绝对偏移量写到日志文件中。

日志分段

在 Log 的 append() 方法中,会调用 maybeRoll() 方法来判断是否需要进行相应日志分段操作

是否需要创建新的日志分段,有下面几种情况:

当前日志分段的大小加上消息的大小超过了日志分段的阈值(log.segment.bytes);
距离上次创建日志分段的时间达到了一定的阈值(log.roll.hours),并且数据文件有数据;
索引文件满了;
时间索引文件满了;
最大的 offset,其相对偏移量超过了正整数的阈值。

创建一个 segment 对象,真正的实现是在 Log 的 roll() 方法中,也就是上面的方法中,创建 segment 对象,主要包括三部分:数据文件、offset 索引文件和 time 索引文件。

offset索引文件

这里顺便讲述一下 offset 索引文件,Kafka 的索引文件有下面一个特点:

采用 绝对偏移量+相对偏移量 的方式进行存储的,每个 segment 最开始绝对偏移量也是其基准偏移量;
数据文件每隔一定的大小创建一个索引条目,而不是每条消息会创建索引条目,通过 index.interval.bytes 来配置,默认是 4096,也就是4KB;

是稀疏索引,可以放到内存中加快查找,是有序的,可以使用二分法进行查找。

LogSegment 写入

真正的日志写入,还是在 LogSegment 的 append() 方法中完成的,LogSegment 会跟 Kafka 最底层的文件通道、mmap 打交道。

20220408155359

LogManager的结构

logDir:表示用户配置的日志存放路径,通过log.dir配置,可以配置多个。LogManager会维护一个LogDir的列表。

Log: 每个partition的日志目录,代表topic的一个分区副本。LogManager会维护本broker上所有的Log对象。

LogSegment:partition中的日志段对象,每个Log都会有N个日志段。这个日志段包括了日志文件和对应的索引文件。

LogManager的创建

LogManager,即日志管理组件,在kafka启动时会创建并启动。

LogManager创建后,会先后做两件事

1. 检查日志目录
2. 加载日志目录的文件

检查日志目录

1. 配置的日志目录是否有重复的
2. 日志目录不存在的话就新建一个日志目录
3. 检查日志目录是否可读

加载日志目录的文件

遍历每个日志目录时,会先读取日志检查点文件,然后读取日志目录下的所有文件,然后创建相关的Log对象。需要注意的是,由于加载过程比较慢,对于每个日志目录都会创建一个线程来加载,最后等所有线程都加载完毕后才会退出loadLogs()方法。

此,创建LogManager的过程是阻塞的,当LogManager创建完成后,说明所有的分区目录都加载进来了。

启动LogManager

创建LogManager后,就会立马调用startup()方法启动。

LogManager的启动其实就是提交了4个定时任务:

1. 旧的日志段删除任务
2. 刷盘任务
3. 检查点任务
4. 分区目录删除任务

旧的日志段删除任务

在LogManager启动后,会提交一个周期性的日志段删除任务,用来处理一些超过一定时间以及大小的日志段。

Kafka对于旧日志段的处理方式有两种

删除:超过时间或大小阈值的旧 segment,直接进行删除;
压缩:不是直接删除日志分段,而是采用合并压缩的方式进行。

Kafka删除的检查策略有两种。一种根据时间过期的策略删除过期的日志,一种是根据日志大小来删除太大的日志。

刷盘任务

kafka在处理Producer请求时,只是将日志写到缓存,并没有执行flush()方法刷到磁盘。因此,logManager中开启了一个刷盘任务,定期检查各个目录,根据刷盘策略执行flush操作。这个任务保证了每隔多久kafka会执行一次刷盘操作。

当距离上次刷盘的时间超过了log.config.flushMs时间就会执行一次刷盘,将缓存中的内容持久化到磁盘。但是kafka官方给刷盘频率设置的默认值是Long的最大值,也就是说,kafka官方的建议是把刷盘操作交给操作系统来控制。

另外,这个刷盘任务这是控制指定时间刷盘一次。kafka还有一个关于刷盘的策略是根据日志的条数来控制刷盘频率的,也就是配置flush.messages。这个配置是在每次写日志完检查的,当kafka处理Producer请求写日志到缓存后,会检查当前的offset和之前记录的offset直接的差值,如果超过配置的值,就执行一次刷盘。不过flush.messages的默认值也是Long的最大值。

日志恢复检查点任务

kafka的recovery-checkpoint(检查点)记录了最后一次刷新的offset,表示多少日志已经落盘到磁盘上,然后在异常关闭后恢复日志。

recoveryPoint表示还未刷到磁盘的第一条offset,比如offset=100之前的消息都刷到磁盘中了,那么recoveryPoint就是101。

这个任务做的事情很简单,就是遍历所有的LogDir,然后将内存中维护的recovery-checkpoint写到文件上。

offset-checkpoint的存储

每个LogDir日志目录下,都会有一个文件recovery-point-offset-checkpoint,存放了各个Log(Partiton)当前的checkpoint是多少:

1
2
3
4
5
6
7
0
54
__consumer_offsets 22 0
__consumer_offsets 30 0
__consumer_offsets 8 0
__consumer_offsets 21 0
...

第一行的数字表示当前版本,第二行的数字表示该LogDir目录下有多少个partition目录。接着就是topic partition编号 recovery-checkpoint。

何时刷新recovery-checkpoint

kafka会在每次flush的时候更新对应Log的recovery-checkpoint。但是由于kafka的定时flush默认是交给操作系统来执行的。所以只有在新建一个新的segment时,以及对partition进行truncat时(如果replica的offset比leader还大,replica就要执行一次truncate,把超出的那些offset砍掉),才会更新recovery-checkpoint。

这种情况就会造成日志落盘了很多,但是recovery-checkpoint一直没更新的情况,不过由于recovery-checkpoint只是用来在broker启动时恢复日志用的,这一点倒无关紧要。另外,在正常关闭broker,kafka会保证将最新的offset写入recovery-checkpoint文件中。

如何利用recovery-checkpoint恢复日志

首先,恢复点是异常关闭时用来恢复数据的。如果数据目录下有.kafka_cleanshutdown文件就表示不是异常关闭,就用不上恢复点了。如果上一次关闭时异常关闭的,kafka就会利用checkpoint来修复日志了。

1. 通过检查是否有.kafka_cleanshutdown文件来判断上一次是否是正常关闭,如果是的话,就不用恢复什么了,直接更新recovery-checkpoint。

2. 如果上次是非正常关闭,通过当前的recovery-checkpoint找出这个recovery-checkpoint之后的所有segment(包括recovery-checkpoint所在的segment)。然后遍历这些segment,一条一条消息检查过去,并重建索引,之后如果有segment的消息格式不正确,就执行异步删除操作,将后面的segment全部删除掉。

分区目录删除任务

该任务执行的任务主要是删除分区目录,同时删除底下的segment数据文件。

做的事情主要就是遍历logsToBeDeleted列表,然后遍历删除元素。

那么什么时候分区会被加到logsToBeDeleted中待删除呢?

1. LogManager启动时会扫描所有分区目录名结尾是’-delete’的分区,加入到logsToBeDeleted中

2. 分区被删除的时候走的都是异步删除策略,会先被加入到logsToBeDeleted中等待删除。

在kafka中,要删除分区需要往broker发送StopReplica请求。broker收到StopReplica请求后,判断是否需要删除分区,如果要删除就执行异步删除步骤。