0%

kafka之LogManager

LogManager的结构

logDir:表示用户配置的日志存放路径,通过log.dir配置,可以配置多个。LogManager会维护一个LogDir的列表。

Log: 每个partition的日志目录,代表topic的一个分区副本。LogManager会维护本broker上所有的Log对象。

LogSegment:partition中的日志段对象,每个Log都会有N个日志段。这个日志段包括了日志文件和对应的索引文件。

LogManager的创建

LogManager,即日志管理组件,在kafka启动时会创建并启动。

LogManager创建后,会先后做两件事

1. 检查日志目录
2. 加载日志目录的文件

检查日志目录

1. 配置的日志目录是否有重复的
2. 日志目录不存在的话就新建一个日志目录
3. 检查日志目录是否可读

加载日志目录的文件

遍历每个日志目录时,会先读取日志检查点文件,然后读取日志目录下的所有文件,然后创建相关的Log对象。需要注意的是,由于加载过程比较慢,对于每个日志目录都会创建一个线程来加载,最后等所有线程都加载完毕后才会退出loadLogs()方法。

此,创建LogManager的过程是阻塞的,当LogManager创建完成后,说明所有的分区目录都加载进来了。

启动LogManager

创建LogManager后,就会立马调用startup()方法启动。

LogManager的启动其实就是提交了4个定时任务:

1. 旧的日志段删除任务
2. 刷盘任务
3. 检查点任务
4. 分区目录删除任务

旧的日志段删除任务

在LogManager启动后,会提交一个周期性的日志段删除任务,用来处理一些超过一定时间以及大小的日志段。

Kafka对于旧日志段的处理方式有两种

删除:超过时间或大小阈值的旧 segment,直接进行删除;
压缩:不是直接删除日志分段,而是采用合并压缩的方式进行。

Kafka删除的检查策略有两种。一种根据时间过期的策略删除过期的日志,一种是根据日志大小来删除太大的日志。

刷盘任务

kafka在处理Producer请求时,只是将日志写到缓存,并没有执行flush()方法刷到磁盘。因此,logManager中开启了一个刷盘任务,定期检查各个目录,根据刷盘策略执行flush操作。这个任务保证了每隔多久kafka会执行一次刷盘操作。

当距离上次刷盘的时间超过了log.config.flushMs时间就会执行一次刷盘,将缓存中的内容持久化到磁盘。但是kafka官方给刷盘频率设置的默认值是Long的最大值,也就是说,kafka官方的建议是把刷盘操作交给操作系统来控制。

另外,这个刷盘任务这是控制指定时间刷盘一次。kafka还有一个关于刷盘的策略是根据日志的条数来控制刷盘频率的,也就是配置flush.messages。这个配置是在每次写日志完检查的,当kafka处理Producer请求写日志到缓存后,会检查当前的offset和之前记录的offset直接的差值,如果超过配置的值,就执行一次刷盘。不过flush.messages的默认值也是Long的最大值。

日志恢复检查点任务

kafka的recovery-checkpoint(检查点)记录了最后一次刷新的offset,表示多少日志已经落盘到磁盘上,然后在异常关闭后恢复日志。

recoveryPoint表示还未刷到磁盘的第一条offset,比如offset=100之前的消息都刷到磁盘中了,那么recoveryPoint就是101。

这个任务做的事情很简单,就是遍历所有的LogDir,然后将内存中维护的recovery-checkpoint写到文件上。

offset-checkpoint的存储

每个LogDir日志目录下,都会有一个文件recovery-point-offset-checkpoint,存放了各个Log(Partiton)当前的checkpoint是多少:

1
2
3
4
5
6
7
0
54
__consumer_offsets 22 0
__consumer_offsets 30 0
__consumer_offsets 8 0
__consumer_offsets 21 0
...

第一行的数字表示当前版本,第二行的数字表示该LogDir目录下有多少个partition目录。接着就是topic partition编号 recovery-checkpoint。

何时刷新recovery-checkpoint

kafka会在每次flush的时候更新对应Log的recovery-checkpoint。但是由于kafka的定时flush默认是交给操作系统来执行的。所以只有在新建一个新的segment时,以及对partition进行truncat时(如果replica的offset比leader还大,replica就要执行一次truncate,把超出的那些offset砍掉),才会更新recovery-checkpoint。

这种情况就会造成日志落盘了很多,但是recovery-checkpoint一直没更新的情况,不过由于recovery-checkpoint只是用来在broker启动时恢复日志用的,这一点倒无关紧要。另外,在正常关闭broker,kafka会保证将最新的offset写入recovery-checkpoint文件中。

如何利用recovery-checkpoint恢复日志

首先,恢复点是异常关闭时用来恢复数据的。如果数据目录下有.kafka_cleanshutdown文件就表示不是异常关闭,就用不上恢复点了。如果上一次关闭时异常关闭的,kafka就会利用checkpoint来修复日志了。

1. 通过检查是否有.kafka_cleanshutdown文件来判断上一次是否是正常关闭,如果是的话,就不用恢复什么了,直接更新recovery-checkpoint。

2. 如果上次是非正常关闭,通过当前的recovery-checkpoint找出这个recovery-checkpoint之后的所有segment(包括recovery-checkpoint所在的segment)。然后遍历这些segment,一条一条消息检查过去,并重建索引,之后如果有segment的消息格式不正确,就执行异步删除操作,将后面的segment全部删除掉。

分区目录删除任务

该任务执行的任务主要是删除分区目录,同时删除底下的segment数据文件。

做的事情主要就是遍历logsToBeDeleted列表,然后遍历删除元素。

那么什么时候分区会被加到logsToBeDeleted中待删除呢?

1. LogManager启动时会扫描所有分区目录名结尾是’-delete’的分区,加入到logsToBeDeleted中

2. 分区被删除的时候走的都是异步删除策略,会先被加入到logsToBeDeleted中等待删除。

在kafka中,要删除分区需要往broker发送StopReplica请求。broker收到StopReplica请求后,判断是否需要删除分区,如果要删除就执行异步删除步骤。

Welcome to my other publishing channels