脚本宝典收集整理的这篇文章主要介绍了Kafka与RocketMq文件存储机制对比,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。
kafka文件结构和rocketMQ文件结构是什么样子?特点是什么?
Kafka以partITion为单元分片存储消息
Kafka部分名词解释如下:
为方便理解以单broker为例,假设建立一个broker建立的topic是kafka-topic-01,partition数量是3, 会形成以下目录
#1、分区目录文件
drwxr-x--- 2 root root 4096 Jul 26 19:35 kafka-topic-01-0
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-1
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-2
分为三个文件
#2、分区目录中的日志数据文件和日志索引文件
-rw-r----- 1 root root 512K Jul 24 19:51 00000000000000000000.index
-rw-r----- 1 root root 1.0G Jul 24 19:51 00000000000000000000.log
-rw-r----- 1 root root 768K Jul 24 19:51 00000000000000000000.timeindex
-rw-r----- 1 root root 512K Jul 24 20:03 00000000000022372103.index
-rw-r----- 1 root root 1.0G Jul 24 20:03 00000000000022372103.log
-rw-r----- 1 root root 768K Jul 24 20:03 00000000000022372103.timeindex
-rw-r----- 1 root root 512K Jul 24 20:15 00000000000044744987.index
-rw-r----- 1 root root 1.0G Jul 24 20:15 00000000000044744987.log
-rw-r----- 1 root root 767K Jul 24 20:15 00000000000044744987.timeindex
-rw-r----- 1 root root 10M Jul 24 20:21 00000000000067117761.index
-rw-r----- 1 root root 511M Jul 24 20:21 00000000000067117761.log
-rw-r----- 1 root root 10M Jul 24 20:21 00000000000067117761.timeindex
@H_653_126@
字段名 |
说明 |
relativeOffset(4) |
相对偏移量,相对baseOffset来说 |
position(4) |
物理地址,日志文件中的物理地址 |
如offset的值是368772
1.根据offset找到所在的segment,根据二分查找,找到消息所在的log文件0000000000000368769.log和索引文件0000000000000368769.index
2.计算下差368772-368769=3,在索引文件中也是二分查找,定位到是<3,497>记录,即对应的物理位置是497,从而找到消息
3.根据物理位置497在0000000000000368769.log文件找到消息。
根据指定的时间戳查找偏移量信息
字段名 |
说明 |
timestamp(8) |
当前日志分段最大时间戳 |
relativeOffset(4) |
时间戳对应的相对偏移量 |
rocketMQ把所有topic中的消息都commitLog中
存储的文件主要分为:
文件地址:${user.home} Store${commitlog}${fileName}
文件地址:${storeRoot}consumequeue${topicName}${queueId}${fileName}
字段名 |
说明 |
offset(8) |
commitlog的偏移量 |
size(4) |
commitlog消息大小 |
tagHashCode |
tag的哈希值 |
文件地址:${user.home}storeindex${fileName}
索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用
0.9之前老版本
消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector】,通过配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:
[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
5662
cZxid = 0x20006d28a
ctime = Wed APR 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
保存方式:
consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset,该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同
broker 存放 offset 是 kafka 从 0.9 版本开始
存储位置:
consumer 默认将 offset 持久化保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
提交offset分为:自动提交和手动提交
保存方式:
消费者正常运行,除了持久化一份消费offset到broker中,还会在内存中保存一份消费进度offset,所以当消费者都正常运行时__consumer_offsets使用的比较少。当消费者崩溃或者balance时,会从broker中拉取最后一次消费offset。
集群模式:topic中的一条消息只会同一个消费者组中的一个消费者消费,不会被多个消费者消费
对offset的管理分为本地模式和远程模式。本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。
集群模式使用的是远程模式。
存储位置:
ocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为${user.home}/store/config/consumerOffset.json
{
"offsetTable": {
"topic-name@consumer-group": {
"0": 88526,
"1": 88528
}
}
}
保存方式:
定时持久化到broker磁盘ConsumerOffset.json
consumer从broker拉取消息后,Broker更新消费进度,仅仅是更新了内存中的offsetTable表,并没有涉及到ConsumerOffset.json这个文件。broker启动时会启动一个定时任务(默认5秒),来定时把消费offset持久化到磁盘consumerOffset.json,保存的过程是先将原来的文件存到ConsumerOffset.json.bak文件中,然后将新的内容存入ConsumerOffset.json文件
广播模式:一条消息会被每个消费者消费
当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集。
日志删除任务会定时(默认5分钟执行一次)检查是否有保留时间超过设定阈值(默认保存7天)可删除的segment文件。
日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize来寻找可删除的日志分段的文件集合deletableSegments,参考下图所示
基于日志大小的保留策略与基于时间的保留策略类似,其首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments。查找出deletableSegments之后就执行删除操作
该删除策略具体是删除某日志分段的下一个日志分段的baseOffset小于等于logStartOffset的部分。
Producer 端压缩、Broker 端保持、Consumer 端解压缩
在Kafka中,压缩可能发生在两个地方:生产者端和Broker端。broker端保存的也是压缩的消息,传输到consumer端再进行解压缩
在吞吐量方面:LZ4 > Snappy > zstd / GZIP
即所有的Topic下的消息队列共用同一个CommitLog的日志数据文件。感觉这样会增加随机读的概率,可以学着kakfa按topic隔离。
消息写入时,每次都回去去mappedFileQueue中去拿mappedfile。而这个mappedfile是由后台运行的AllocateMappedFileService服务线程去创建和预分配的。这样下次获取时候直接返回就可以不用等待MappedFile创建分配所产生的时间延迟
我们拿到mmapedfile文件,可能pagecache中还是出现页数据不存在的情况,所以rocketmq增加了预热
有一个warmMappedFile方法,它会把当前映射的文件,每一页遍历多去,写入一个 0 字节,然后再调用MLock 和 madvise(MADV_WILLNEED)。
mlock:可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到 swap 空间。
madvise:给操作系统建议,说这文件在不久的将来要访问的,因此,提前读几页可能是个好主意
通过哪些I/O机制来访问index和segment文件呢?可以分为写和读两块:
写(生产)消息:
读(消费)消息:
写(生产)消息:
读(消费)消息:
普通读文件过程
大体流程如下:
这里简单说一下为啥要拷贝到进程中:进程之间是相互隔离的,而且在常规操作下进程无法访问内核数据,所以得将 cache 拷贝到进程当中,给进程使用。
Mmap映射
没有数据拷贝,映射的是数据地址
mmap 把文件映射到用户空间里的虚拟内存,省去了从内核缓冲区复制到用户空间的过程,文件中的位置在虚拟内存中有了对应的地址,可以像操作内存一样操作这个文件,相当于已经把整个文件放入内存。mmap 在完成了 read、write 相同效果的同时不仅省去了内核到进程的内存拷贝过程,而且还可以实现数据的共享操作:一个文件可以同时被多个进程、内核映射,如果映射的文件被内核或其他进程修改,那么最终的结果也会反映到映射当中。
PageCache是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写访问,这里的主要原因就是在于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache
如果一次读取文件时出现未命中(cache miss)PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取(ps:顺序读入紧随其后的少数几个页面)。这样,只要下次访问的文件已经被加载至PageCache时,读取操作的速度基本等于访问内存
二、文件写入
OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于文件的顺序读写操作来说,读和写的区域都在OS的PageCache内,此时读写性能接近于内存。不是顺序写,当pageCache中发现漏页,还是会去吧磁盘中数据拉到pageCache再写
kafka消费的时候使用了零拷贝的sendfile。pagecache数据不经过内核切换直接拷贝到socket buffer。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少 为2次。根据测试结果,可以提高60%的数据发送性能。
https://t1mek1ller.github.io/2019/11/13/kafka-rocketmq-storage/
https://tech.meituan.COM/2015/01/13/kafka-fs-design-theory.html
swap 空间: swap space是磁盘上的一块区域,可以是一个分区,也可以是一个文件,或者是他们的组合。当RAM满了后,并且需要更多内存空间时,使用磁盘空间代替RAM空间
以上是脚本宝典为你收集整理的Kafka与RocketMq文件存储机制对比全部内容,希望文章能够帮你解决Kafka与RocketMq文件存储机制对比所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。