Kafka 副本
replica.lag.time.max.ms = 设置 follower 与 leader 副本同步,用于查看是否延迟过大
follower 会维护一个 lastCaughtTimeMS , 代表最后一次抓住的时间,我们会用 now() 减去这个值,如果大于 replica.lag.time.max.ms 则说明延迟过大
ISR = 与 leader 副本保持较近的副本的集合,包括 leader 副本
也就是说,当leader副本挂掉之后,会从follower 副本中选举出来一个座位leader副本,但是如果follower副本与leader副本数据差距过大,就会导致数据丢失,所以kafka采取的是,从ISR集合中选举leader,从而降低数据丢失的条数
OSR = 与 leader 副本差距较大的副本的集合,当此节点与leader相近时,会加入到ISR集合中来,当然被踢出ISR集合中并不意味着就不被关了,而是不会去在意你的票数等。。还是会和leader保持通信
ISR中只有 leader副本,然后leader副本挂了:
- 等待ISR中的任意一个 follower 副本加回到ISR集合中
- 选择第一个活过来的副本(不一定是ISR集合中的),可能会出现数据丢失的场景
副本同步
follower full leader – > 长轮询
ack
初始情况下,leader follower, hw = 0,leo = 0,remoteLeo = 0(follower 的 leo,若有多个 follower,则存在多个值)
长轮询(阻塞式),会进行阻塞,等待时间取决于replica.fetch.wait.max.ms
- 写入 leader 后,正好 follower发送 fetch
- follower 堵塞在 leader 副本指定的时间内,leader副本收到了数据请求
1、
收到fetch 请求: leader 读取log,更新 leo,把消息内容和当前分区的HW发送给followe副本
followe: 写入日志,更新 leo , 更新hw=min(hw,leo),然后告诉 leader 自己的信息,
leader 更新 remoteLeo,更新HW,然后告诉 follower 更新 hw
1、 唤醒阻塞的 fetch
有数据丢失的风险 min.insync.replicas=1 ,设置 ISR 最小集合数 acks=-1, -1:需要 ISR 中所有的副本返回,安全性最高,性能越低 0:不需要等到 broker 的消息确认(数据丢失风险很大) 1:只要 leader 副本返回就可以
follower 挂掉并且恢复的时候,会调整LEO的值,变成HW的值,但followe 副本是挂在更新hw之前,恢复之后fetch 请求 leader,但是 leader挂了,followe成为 leader,
解决 leader_epoch (epoch,offset) 是一对值,如果发生 leader 重新选举,那么 offset 就是对应新的 leader 写入消息的offset
当 followe 恢复的时候会发起 offsetForLeaderEpochRequest请求,leader 返回对应信息(leader没挂)
挂了,选举 leader 取最大leo
leader挂掉,epoch 变为1,follower 变为 leader ,但是之前已经更新了数据
follower 和 leader epoch 不一致,新的会拿到 小的epoch+1 的值,得到 startOffset,发给 follower
名词简介
leader 选举
从 ISR 获得优先副本
消费者分配分区
partition.assignment.strategy 设置消费者与主题分区的分配策略
- RangeAssignor
- RoundRobinAssignor
- StickyAssignor
副本的分配
均衡的分配在各个 broker 中,同一个broker 中不会存在相同的分区副本(当前目录不允许有重名文件)
kafka log
kafka 的 log 存放在内部主题 __consumer_offset中,初始化的时候这个主题是不存在的,当第一次有消费者消费数据的时候才会进行创建,该 topic 默认有50个分区
在 server.properties 中配置的 log.dir 参数值中,会首先在文件夹下创建 cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint.lock
消息压缩
kafka 实现的压缩方式,是将多条消息进行压缩,生产者发送的压缩数据在 Broker 中也是压缩存储的,消费者消费的到数据也是压缩数据,只有在消费数据之前才会将数据解压,这样就实现了端到端的压缩
通过compression.type来配置,默认值为‘producer’,表示保留生产者使用的压缩方式,‘gzip’,’snappy’,’lz4’。
如果参数配置为 uncompressed, 则表示不进行压缩
日志索引
日志文件分段 大于 log.segment.bytes 默认为 1073741824 即1GB
当前日志分段的时间戳减去当前的系统时间,大于 log.roll.ms或log.roll.hour,前者优先级较高,默认是7天
偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes配置的值。log.index.size.max.bytes 的默认值为10485760,即10MB。
对于非活跃的索引文件,已经不需要在往里面写入,所以设置为只读,而活跃的索引文件还需要进行索引数据的写入,所以设置为读写
bin/kafka-dump-log.sh –files /tmp/kafka-logs/ topic-log-0/00000000000000000000.index
解析文件
为什么快
1、kafka 使用了大量的也缓存,通过操作系统刷新磁盘数据,kafka 也提供参数进行强制刷盘 通过 log.flush.interval.messages、log.flush.interval.ms 等参数来控制。
2、消息顺序追加
3、采用零拷贝:指直接将数据从磁盘文件中复制到网卡设备中,不经过程序处理,减少了从用户态到内核态的切换
数据存储
log.segment.byte 一个日志文件存多少
简单来说 index 文件有两个字段 indexOffset , logset ,表示的是一个范围区间 1476 995 994 499 下一个的 logset 是 上一个的 indexOffset + 1
log 文件有 msgOffset , partition 字段,msg 和 index的offset 对应,通过offset 找到对应的 position , position 代表文件所在位置,这样可以通过二分查找等方法,快速定位到数据
日志清理
- 压缩 同一个 key 消息内容不一样,只会保留最后一次的数据
- 删除
- 大小 : log.retention.bytes
- 时间 : log.retention.hours
默认保留七天