LogManager的结构
logDir:表示用户配置的日志存放路径,通过log.dir配置,可以配置多个。LogManager会维护一个LogDir的列表。
Log: 每个partition的日志目录,代表topic的一个分区副本。LogManager会维护本broker上所有的Log对象。
LogSegment:partition中的日志段对象,每个Log都会有N个日志段。这个日志段包括了日志文件和对应的索引文件。
LogManager的创建
LogManager,即日志管理组件,在kafka启动时会创建并启动。
LogManager创建后,会先后做两件事
1. 检查日志目录
2. 加载日志目录的文件
检查日志目录
1. 配置的日志目录是否有重复的
2. 日志目录不存在的话就新建一个日志目录
3. 检查日志目录是否可读
加载日志目录的文件
遍历每个日志目录时,会先读取日志检查点文件,然后读取日志目录下的所有文件,然后创建相关的Log对象。需要注意的是,由于加载过程比较慢,对于每个日志目录都会创建一个线程来加载,最后等所有线程都加载完毕后才会退出loadLogs()方法。
此,创建LogManager的过程是阻塞的,当LogManager创建完成后,说明所有的分区目录都加载进来了。
启动LogManager
创建LogManager后,就会立马调用startup()方法启动。
LogManager的启动其实就是提交了4个定时任务:
1. 旧的日志段删除任务
2. 刷盘任务
3. 检查点任务
4. 分区目录删除任务
旧的日志段删除任务
在LogManager启动后,会提交一个周期性的日志段删除任务,用来处理一些超过一定时间以及大小的日志段。
Kafka对于旧日志段的处理方式有两种
删除:超过时间或大小阈值的旧 segment,直接进行删除;
压缩:不是直接删除日志分段,而是采用合并压缩的方式进行。
Kafka删除的检查策略有两种。一种根据时间过期的策略删除过期的日志,一种是根据日志大小来删除太大的日志。
刷盘任务
kafka在处理Producer请求时,只是将日志写到缓存,并没有执行flush()方法刷到磁盘。因此,logManager中开启了一个刷盘任务,定期检查各个目录,根据刷盘策略执行flush操作。这个任务保证了每隔多久kafka会执行一次刷盘操作。
当距离上次刷盘的时间超过了log.config.flushMs时间就会执行一次刷盘,将缓存中的内容持久化到磁盘。但是kafka官方给刷盘频率设置的默认值是Long的最大值,也就是说,kafka官方的建议是把刷盘操作交给操作系统来控制。
另外,这个刷盘任务这是控制指定时间刷盘一次。kafka还有一个关于刷盘的策略是根据日志的条数来控制刷盘频率的,也就是配置flush.messages。这个配置是在每次写日志完检查的,当kafka处理Producer请求写日志到缓存后,会检查当前的offset和之前记录的offset直接的差值,如果超过配置的值,就执行一次刷盘。不过flush.messages的默认值也是Long的最大值。
日志恢复检查点任务
kafka的recovery-checkpoint(检查点)记录了最后一次刷新的offset,表示多少日志已经落盘到磁盘上,然后在异常关闭后恢复日志。
recoveryPoint表示还未刷到磁盘的第一条offset,比如offset=100之前的消息都刷到磁盘中了,那么recoveryPoint就是101。
这个任务做的事情很简单,就是遍历所有的LogDir,然后将内存中维护的recovery-checkpoint写到文件上。
offset-checkpoint的存储
每个LogDir日志目录下,都会有一个文件recovery-point-offset-checkpoint,存放了各个Log(Partiton)当前的checkpoint是多少:
1 | 0 |
第一行的数字表示当前版本,第二行的数字表示该LogDir目录下有多少个partition目录。接着就是topic partition编号 recovery-checkpoint。
何时刷新recovery-checkpoint
kafka会在每次flush的时候更新对应Log的recovery-checkpoint。但是由于kafka的定时flush默认是交给操作系统来执行的。所以只有在新建一个新的segment时,以及对partition进行truncat时(如果replica的offset比leader还大,replica就要执行一次truncate,把超出的那些offset砍掉),才会更新recovery-checkpoint。
这种情况就会造成日志落盘了很多,但是recovery-checkpoint一直没更新的情况,不过由于recovery-checkpoint只是用来在broker启动时恢复日志用的,这一点倒无关紧要。另外,在正常关闭broker,kafka会保证将最新的offset写入recovery-checkpoint文件中。
如何利用recovery-checkpoint恢复日志
首先,恢复点是异常关闭时用来恢复数据的。如果数据目录下有.kafka_cleanshutdown文件就表示不是异常关闭,就用不上恢复点了。如果上一次关闭时异常关闭的,kafka就会利用checkpoint来修复日志了。
1. 通过检查是否有.kafka_cleanshutdown文件来判断上一次是否是正常关闭,如果是的话,就不用恢复什么了,直接更新recovery-checkpoint。
2. 如果上次是非正常关闭,通过当前的recovery-checkpoint找出这个recovery-checkpoint之后的所有segment(包括recovery-checkpoint所在的segment)。然后遍历这些segment,一条一条消息检查过去,并重建索引,之后如果有segment的消息格式不正确,就执行异步删除操作,将后面的segment全部删除掉。
分区目录删除任务
该任务执行的任务主要是删除分区目录,同时删除底下的segment数据文件。
做的事情主要就是遍历logsToBeDeleted列表,然后遍历删除元素。
那么什么时候分区会被加到logsToBeDeleted中待删除呢?
1. LogManager启动时会扫描所有分区目录名结尾是’-delete’的分区,加入到logsToBeDeleted中
2. 分区被删除的时候走的都是异步删除策略,会先被加入到logsToBeDeleted中等待删除。
在kafka中,要删除分区需要往broker发送StopReplica请求。broker收到StopReplica请求后,判断是否需要删除分区,如果要删除就执行异步删除步骤。