叠个甲:下面的内容来自于小白Debug的三期视频。从到尾看了一遍而整理成的文字稿,并加入了mermaid绘图以及补充了一些内容。下方都已经标明了出处
Kafka架构
什么是消息队列
假设你维护了两个服务A和B。B服务每秒只能处理100个消息,但A服务却每秒发出200个消息,有没有办法让B在不被压垮的同时,还能处理掉A的消息?
为了保护B服务,很容易想到在B服务的内存中加入一个队列(说白了其实是个链表)。链表的每个节点就是一个消息,每个节点有一个序号,我们叫它Offset,记录消息的位置。B服务依据自己的处理能力,消费列表里的消息,能处理多少是多少,不断更新已处理消息值。
来不及处理的消息会堆积在内存里,如果B服务更新重启,这些消息就都丢了。这个好解决,将队列挪出来,变成一个单独的进程,就算B服务重启,也不会影响到了队列里的消息。这样一个简陋的队列进程,其实就是所谓的消息队列。
- A服务这样负责发数据到消息队列的角色,就是生产者Producer
- 像B服务这样处理消息的角色就是消费者Consumer。
但这个消息队列属实过于简陋,要对其优化,使其:高性能、高扩展性、高可用
高性能
消息队列里会不断堆积数据,为了提升性能,可以扩展更多的消费者提升消费速度。生产者也可以相对得增加更多,提升了消息队列的吞吐量。
随着生产者和消费者都变多,他们会同时争抢同一个消息队列,抢不到的一方就得等待,这个如何解决?
- 消息分类(垂直扩展):首先是对消息进行分类,一类是一个Topic。然后根据Topic新增队列的数量,生产者将数据按投递到不同的队列中,消费者则根据需要订阅不同的Topic。这样就大大降低了Topic队列的压力。
- 分区拆分(水平扩展):但单个Topic的消息可能还是过多,我们可以将单个队列拆成好几段,每段就是一个分区(Partition),每个消费者负责一个Partition,这样就大大降低了争抢,提升了消息队列的性能。
graph LR
P[Producer]
P -->|Write| T1[TopicUser]
P -->|Write| T2[TopicAudit]
P -->|Write| T3[...]
subgraph Topic
Part1[Partition 1]
Part2[Partition 2]
Part3[Partition 3]
Part4[Partition 4]
end
subgraph ConsumerGroup
C1[Consumer 1]
C2[Consumer 2]
end
T1-.-Part1
T1-.-Part2
T1-.-Part3
T1-.-Part4
Part1 -->|Read| C1
Part2 -->|Read| C1
Part3 -->|Read| C2
Part4 -->|Read| C2
高扩展性
随着Partition变多,若Partition都在同一台机器上的话,就会导致单机负载过高,影响整体系统性能。
于是可以申请更多的机器,将Partition分散部署在多台机器上,这每一台机器就代表一个Broker。通过增加Broker,缓解机器负载过高带来的性能问题。
graph TD
subgraph Cluster
subgraph Broker3 [Broker 3]
P1_3["TopicUser-Part3"]
P2_1["TopicAudit-Part1"]
end
subgraph Broker2 [Broker 2]
P1_2["TopicUser-Part2"]
P2_3["TopicAudit-Part3"]
end
subgraph Broker1 [Broker 1]
P1_1["TopicUser-Part1"]
P2_2["TopicAudit-Part2"]
end
end
T1[TopicUser] -.-> P1_1
T1 -.-> P1_2
T1 -.-> P1_3
T2[TopicAudit] -.-> P2_1
T2 -.-> P2_2
T2 -.-> P2_3
classDef userTopic fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
classDef userPart fill:#e1f5fe,stroke-width:0px;
classDef auditTopic fill:#fff3e0,stroke:#e65100,stroke-width:2px;
classDef auditPart fill:#fff3e0,stroke-width:0px;
class T1 userTopic;
class P1_1,P1_2,P1_3 userPart;
class T2 auditTopic;
class P2_1,P2_2,P2_3 auditPart;
高可用
如果其中一个Partition所在的Broker挂了,那Broker所有Partition的消息都没了,如何优化?
可以给多加几个副本,他们统称Replica:
- Leader和Follower:将它们分为Leader和Follower。
- Leader负责应付生产者和消费者的读写请求,
- Follower只管同步Leader的消息。
- 分散部署:将Leader和Follower分散到不同的Broker上,这样Leader所在的Broker挂了,也不会影响Follower所在的Broker。(鸡蛋没有放在一个篮子内)
- 故障转移:还能从Follower中选举出一个新的Leader,Partition顶上,这样就保证了消息队列高可用。
graph TD
subgraph Broker3
L3[Partition 3 Leader]
F1[Partition 1 Follower]
end
subgraph Broker2
L2[Partition 2 Leader]
F3[Partition 3 Follower]
end
subgraph Broker1
L1[Partition 1 Leader]
F2[Partition 2 Follower]
end
P[Producer] --> L1
P --> L2
P --> L3
L1 -.->|Sync| F1
L2 -.->|Sync| F2
L3 -.->|Sync| F3
classDef p1Leader fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
classDef p1Follower fill:#e1f5fe,stroke-width:0px;
classDef p2Leader fill:#fff3e0,stroke:#e65100,stroke-width:2px;
classDef p2Follower fill:#fff3e0,stroke-width:0px;
classDef p3Leader fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px;
classDef p3Follower fill:#e8f5e9,stroke-width:0px;
class L1 p1Leader;
class F1 p1Follower;
class L2 p2Leader;
class F2 p2Follower;
class L3 p3Leader;
class F3 p3Follower;
ISR与ACKS:平衡性能和可用性
当Leader收到写入的消息时,面临一个两难的选择:
- 追求极致数据安全:Leader等待所有Follower都同步成功,再返回成功。
- 缺点:只要有一个Follower网络慢,整个写入就卡住了,性能极差。
- 追求极致性能:Leader只要自己收到消息,就立马返回成功。
- 缺点:如果Leader刚存完还没来得及同步给Follower就挂了,数据就丢了。
为了平衡这两者,又引入了ISR (In-Sync Replicas) 机制。
ISR是一个集合,只包含那些跟得上Leader同步进度的Follower。如果某个Follower同步太慢,就会被踢出ISR。
配合ISR,Producer可以使用ACKS参数来控制可靠性级别:
- acks=0(极致性能):Producer发完就走,不管Leader有没有收到。丢数据风险最大。
- acks=1(折中方案):只要Leader收到就认为成功。如果Leader挂了且Follower没同步,会丢数据。
- acks=all / -1(极致安全):必须等ISR里所有副本都同步成功,才算成功。
- 这里巧妙的地方在于:它不等待所有副本,只等待ISR里的副本。这样既保证了可靠性(ISR里至少有活着的备份),又不会被那些慢得要死的节点(已被踢出ISR)拖累性能。
graph TD
subgraph ISR ["ISR Set (In-Sync Replicas)"]
L1["Leader (Broker 1)"]
F1["Follower (Broker 2)"]
end
subgraph OSR [Out of Sync]
F2["Follower (Broker 3)"]
end
P[Producer] -->|Write| L1
L1 -->|Fast Sync| F1
L1 -.->|Slow/Stalled Sync| F2
style ISR fill:#e1f5fe,stroke:#01579b,stroke-width:2px
style OSR fill:#ffebee,stroke:#c62828,stroke-width:2px,stroke-dasharray: 5 5
classDef nodeStyle fill:#ffffff,stroke:#333,stroke-width:1px;
class L1,F1,F2 nodeStyle;
持久化和过期策略
假设所有Broker都挂了,那岂不是数据全丢了?
因此,不能光把数据放内存里,还要持久化到磁盘中,若全部Broker都挂了,重启服务后也能从磁盘里读出数据继续工作。
磁盘总是有限的,不可能一直往里写,还要指定保留和丢弃策略。比如磁盘数据超过一定大小,或消息放置超过一定时间就会被清理掉,这就是所谓的Retention Policy。
Consumer Group
到这里还有个问题,按现在的消费方式,每次新增的消费者只能跟着最新的Offset接着消费。如果想让新增的消费者,从某个Offset开始消费呢?
于是引入消费者组的概念,也就是Consumer Group。不同消费者组维护自己的消费进度,互不打搅。
另外消费组内的多个消费节点可以并行消费每个Partition,极大提升了消费的速率。
Rebalance
Partition的数量和Consumer的数量不一定总是相等的。这就涉及到一个核心机制:Rebalance(重平衡)。
- 1对1或1对多:一个Partition只能被同一个Consumer Group内的一个Consumer消费(避免并发顺序问题)。
- 动态调整:当消费者加入或退出时,Kafka会重新分配Partition的所有权,这个过程就是Rebalance。
graph LR
subgraph Before ["场景一:2个消费者,4个Partition"]
C1["Consumer A"]
C2["Consumer B"]
P1[Part 1] -.-> C1
P2[Part 2] -.-> C1
P3[Part 3] -.-> C2
P4[Part 4] -.-> C2
end
subgraph Action ["🚀 Consumer C 加入"]
direction TB
end
subgraph After ["场景二:触发Rebalance,3个消费者"]
C1_new["Consumer A"]
C2_new["Consumer B"]
C3_new["Consumer C"]
P1_new[Part 1] ==> C1_new
P2_new[Part 2] ==> C2_new
P3_new[Part 3] ==> C3_new
P4_new[Part 4] -.-> C3_new
end
Before --> Action
Action --> After
style Action fill:#fff3e0,stroke:#e65100,stroke-dasharray: 5 5
classDef part fill:#e1f5fe,stroke:#01579b;
class P1,P2,P3,P4,P1_new,P2_new,P3_new,P4_new part;
Rebalance 是如何触发和执行的?这里涉及到 Group Coordinator(运行在Broker端)和 Consumer Leader。
sequenceDiagram
box "Group Leader" #e1f5fe
participant C1 as Consumer A (Leader)
end
participant C2 as Consumer B (Member)
participant C3 as Consumer C (New Member)
participant GC as Group Coordinator (Broker)
Note over C1, GC: 阶段1: 触发 Rebalance (JoinGroup)
C3->>GC: 发送 JoinGroup 请求 (我想加入!)
GC->>C1: 心跳响应: RebalanceInProgress
GC->>C2: 心跳响应: RebalanceInProgress
C1->>GC: 发送 JoinGroup 请求
C2->>GC: 发送 JoinGroup 请求
Note over GC: 等待所有成员加入...
GC->>C1: JoinGroup 响应 (你是Leader, 这是当前成员列表)
GC->>C2: JoinGroup 响应 (你是Follower, 等着吧)
GC->>C3: JoinGroup 响应 (你是Follower, 等着吧)
Note over C1, GC: 阶段2: 制定方案 & 同步 (SyncGroup)
Note right of C1: 根据成员列表<br/>制定分区分配方案
C1->>GC: 发送 SyncGroup 请求 (这是分配方案!)
C2->>GC: 发送 SyncGroup 请求 (我的方案呢?)
C3->>GC: 发送 SyncGroup 请求 (我的方案呢?)
GC->>C1: SyncGroup 响应 (Part 1 -> A)
GC->>C2: SyncGroup 响应 (Part 2 -> B)
GC->>C3: SyncGroup 响应 (Part 3,4 -> C)
Note over C1, C3: Rebalance 完成,开始消费
Zookeeper
上述以及引入了很多不同功能的组件,每个组件都有自己的数据和状态,所以还需要有个中心组件去统一维护这些组件的状态信息。
于是引入了Zookeeper,它作为Kafka集群的“大管家”,负责管理集群元数据、选举Controller、管理Topic配置等。
graph TD
ZK[("Zookeeper Cluster")]
subgraph Brokers
B1[Broker 1]
B2[Broker 2]
B3[Broker 3]
end
subgraph Controller
C[Active Controller]
end
subgraph Clients
P[Producers]
Cons[Consumers]
end
%% Broker Registration & Health Check
B1 --"1. 注册 & 心跳"--> ZK
B2 --"1. 注册 & 心跳"--> ZK
B3 --"1. 注册 & 心跳"--> ZK
%% Controller Election
B1 -.->|"2. 竞选Controller"| ZK
C --"我是Controller"--> B1
C --"管理元数据"--> ZK
%% Topic & Partition State
ZK --"3. 存储Topic/Partition状态"--> ZK
%% ISR Management
C --"4. 更新ISR信息"--> ZK
%% Consumer Group (Old Kafka versions, now mostly in Broker)
Cons -.->|"5. 记录Offset (旧版)"| ZK
%% Notification
ZK --"6. 节点变动通知"--> C
ZK --"Topic变动通知"--> P
style ZK fill:#fff9c4,stroke:#fbc02d,stroke-width:2px
style C fill:#e1f5fe,stroke:#01579b,stroke-width:2px
Consumer Rebalance 归 Zookeeper 管吗?
- 早期版本 (0.9之前):是的。消费者直接依赖 Zookeeper 监听节点变化来触发 Rebalance。但这导致了严重的“羊群效应”(Herd Effect),Zookeeper 压力山大。
- 现有版本 (0.9之后):不是。Rebalance 逻辑被移交给了 Broker (Coordinator)。消费者通过和 Broker 通信来完成组内分配,Zookeeper 只负责最基础的元数据存储。这大大减轻了 Zookeeper 的负担,也提升了 Rebalance 的性能。
RocketMQ架构
RocketMQ是国产自研的消息队列,目前已经是Apache的顶级项目。
和其他消息队列一样,它接受来自生产者的消息,将消息分类,每一类是一个Topic,消费者根据需要订阅Topic,获取里面的消息。既然都是消息队列,那它们之间有什么区别呢?
RocketMQ的架构,其实参考了Kafka的设计思想,同时又在Kafka的基础上做了一些调整。用一句话总结就是:RocketMQ和Kafka相比,在架构上做了减法,在功能上做了加法。
在架构上做减法
简单回顾下消息队列Kafka的架构:
- Kafka通过多个Topic对消息进行分类
- 为了提升单个Topic的并发性能,将单个Topic拆分为多个Partition
- 为了提升系统扩展性,将多个Partition分别部署在不同Broker上
- 为了提升系统的可用性,为Partition加了多个副本
- 为了协调和管理Kafka集群的数据消息,引入Zookeeper作为协调节点
1. 简化协调节点
Zookeeper在Kafka架构中会和Broker通信,维护Kafka集群信息,一个新的Broker连上Zookeeper后,其他Broker就能立马感知到他的加入。
像这种能在分布式环境下,让多个实例同时获取到同一份信息的服务,就是所谓的分布式协调服务。
但Zookeeper作为一个通用的分布式协调服务,它不仅可以用于服务注册和发现,还可以用于分布式锁、配置管理等场景,但Kafka其实只用到了它的部分功能。
所以RocketMQ直接将Zookeeper去掉,换成了NameServer,用一种更轻量的方式管理消息队列的集群信息。
当然Kafka的开发者后来也意识到了Zookeeper过重的问题,从2.8.0版本就支持移除Zookeeper,通过Broker之间加入一致性算法Raft实现同样的效果,这就是所谓的KRaft或Quorum模式。
2. 简化分区
Kafka会将Topic拆分为多个Partition,用来提升并发性能。在RocketMQ里也一样,将Topic拆分成了多个分区,但换了个名字叫Queue,也就是队列。
- Kafka:Partition会存储完整的消息体。
- RocketMQ:Queue却只存一些简要信息,比如消息偏移Offset。而消息的完整数据,则放到一个叫CommitLog的文件上。通过Offset可以定位到CommitLog上的某条消息。
graph TD
subgraph Kafka ["Kafka Storage Model"]
subgraph K_Topic ["Topic: UserEvents"]
P1["Partition 1 (File)"]
P2["Partition 2 (File)"]
end
Msg1["Message A"] --> P1
Msg2["Message B"] --> P2
style P1 fill:#e1f5fe,stroke:#01579b
style P2 fill:#e1f5fe,stroke:#01579b
end
subgraph RocketMQ ["RocketMQ Storage Model"]
subgraph R_CommitLog ["CommitLog (All Data)"]
CL_Data["Msg A | Msg B | Msg C ..."]
end
subgraph R_Queues ["Consume Queues (Indices)"]
Q1[Queue 1]
Q2[Queue 2]
end
MsgA_In["Message A"] --> CL_Data
MsgB_In["Message B"] --> CL_Data
CL_Data -.->|Dispatch| Q1
CL_Data -.->|Dispatch| Q2
Q1 -->|Pointer| CL_Data
Q2 -->|Pointer| CL_Data
style R_CommitLog fill:#fff3e0,stroke:#e65100
style Q1 fill:#e8f5e9,stroke:#2e7d32
style Q2 fill:#e8f5e9,stroke:#2e7d32
end
- Kafka消费消息,Broker只需要直接从Partition读取消息返回就够了,也就是读一次就够了。
- 而在RocketMQ中,Broker则需要先从Queue上读取到Offset值,再跑到CommitLog上,将完整数据读出来,也就是需要读两次。
那么问题来了,看起来Kafka的设计更高效,为什么RocketMQ不采用Kafka的设计?这就得说一下底层存储了。
Kafka底层存储
Kafka的Partition分区,其实在底层有很多段,也就是Segment组成,每个Segment可以认为就是个小文件。将消息数据写入到Partition分区,本质上就是将数据写入到某个Segment文件下。
机械磁盘顺序写的性能会比随机写快很多,差距高达几十倍。
为了提升性能,Kafka对每个小文件都是顺序写。
- 如果只有一个Segment文件,那写文件的性能会很好。
- 但当Topic变多之后,Topic底下的Partition分区也会变多,对应Partition底下的Segment文件也会变多。
同时写多个Topic底下的Partition,就是同时写多个文件。每个文件内部都是顺序写,但多个Segment存放在磁盘的不同地方,原本顺序写磁盘就可能劣化,变成了随机写,于是写性能就降低了。
graph TD
P[Producer]
subgraph Brocker ["Brocker"]
subgraph File1 ["Partition 1 (内部顺序写)"]
direction LR
SGM1[segment1] -.-> SGM2[segment2] -.-> SGM3[segment3] -.-> SGMm[...]
end
subgraph File2 ["Partition 2 (内部顺序写)"]
direction LR
SGM4[segment1] -.-> SGM5[segment2] -.-> SGM6[segment3] -.-> SGMn[...]
end
subgraph File3 ["Partition ... (内部顺序写)"]
direction LR
SGM7[segment1] -.-> SGM8[segment2] -.-> SGM9[segment3] -.-> SGMx[...]
end
end
P-->|随机写|SGM1
P-->|随机写|SGM4
P-->|随机写|SGM7
RocketMQ的底层存储
为了缓解同时写多个文件带来的随机写问题,RocketMQ索性将单个Broker底下的多个Topic数据,全部写到一个逻辑文件CommitLog上。这就消除了随机写多文件的问题,将所有写操作都变成了顺序写,大大提升了RocketMQ在多Topic场景下的写性能。
flowchart TB
subgraph Queues [Queues]
subgraph Topic3 ["Topic3(Queue)"]
O7[7]
O8[8]
O9[9]
end
subgraph Topic2 ["Topic2(Queue)"]
O4[4]
O5[5]
O6[6]
end
subgraph Topic1 ["Topic1(Queue)"]
O1[1]
O2[2]
O3[3]
end
end
subgraph Disk[Disk]
Head[Disk Head]
subgraph CommitLog ["CommitLog (Single File)"]
S1[segment1]
S2[segment2]
S3[segment3]
Sn[...]
end
end
Producer --> Head
Head-->|全局顺序写|S1
S1-.-|ptr|O2
S2-.-|ptr|O5
S3-.-|ptr|O8
style Disk fill:#eceff1,stroke:#455a64
style CommitLog fill:#fff3e0,stroke:#e65100
style Head fill:#ffcc80,stroke:#ef6c00,stroke-width:2px
3. 简化备份模型
Kafka Partition主从 - 将Partition分散到多个Broker中,并为Partition配置副本,将Partition分为Leader和Follower,也就是主和从。 - 主从Partition之间会建立数据同步,本质上就是同步Partition底下的Segment的文件数据。
由于RocketMQ将Broker上的所有Topic数据写到CommitLog上,所以在备份时直接同步整个CommitLog文件,以Broker为单位区分主从。保持高可用的同时,也大大简化了备份模型。
在功能上做加法
虽然RocketMQ的架构比Kafka简单,但功能却比Kafka要更丰富
- 消息过滤:比如某一批消息,存储的是用户和订单数据,消费想根据VIP等级进一步分类
- Kafka支持通过Topic这一个维度,将数据进行分类,比如订单数据和用户数据是两个不同的Topic。但如果还想再进一步分类呢?在Kafka里,消费者需要消费Topic为用户数据的所有消息,再将VIP6的用户过滤出来
- 而RocketMQ支持对消息打上标记,也就是打Tag,这个是多维度的。消费者能根据Tag过滤所需要的数据,这样消费者就能只获取这部分数据,省下了消费者过滤数据时的资源消耗。
- 支持事务:
- Kafka支持事务,比如生产者发三条消息,ABC这三条消息要么同时发送成功,要么同时发送失败。这确实也叫事务,但跟我们要的不太一样。
- 写业务代码的时候,我们更想要的事务是执行一些自定义逻辑,和生产者发消息,这两件事要么同时成功,要么同时失败。而这正是RocketMQ支持的事务能力。
- 加入延时队列:如果希望消息被投递出去之后,消费者不能立马消费到,而是过一定时间后才消费,也就是所谓的延时消息。这可以用RocketMQ的延时队列实现,而Kafka就得自己实现类似功能,很费劲。
- 加入死信队列:消费消息是有可能失败的,失败后一般可以设置重试。如果多次重试失败,MQ会将消息放到一个专门的队列,方便我们后面单独处理。这种专门存放失败消息的队列,就是死信队列。Kafka原生不支持这个功能,需要我们自己实现消息回溯。
- 消息回溯:Kafka支持通过调整Offset,让消费者从某个地方开始消费。而RocketMQ除了可以调整Offset,还支持调整时间。
性能差异及底层原因
虽然RocketMQ参考了Kafka的设计思路,简化了架构,丰富了功能,看起来RocketMQ更能打。但Kafka却一直也很流行,RocketMQ必然有着不如Kafka的地方。是什么呢?
答案是性能。严格来说是吞吐量。数据显示RocketMQ每秒能处理10万量级的数据,Kafka则是17万。
但这就很奇怪了,为什么RocketMQ参考了Kafka的架构,却不能跟Kafka保持一样的性能呢?
零拷贝
为了防止消息队列的消息在进程崩溃后丢失,一般不会放内存里,而是放磁盘上。那么问题就来了,消息从消息队列的磁盘发送到消费者,过程是怎么样的呢?
消息的发送过程
操作系统分为用户空间和内核空间,程序处于用户空间,而磁盘属于硬件。操作系统本质上是程序和硬件设备的一个中间层。程序需要通过操作系统去调用硬件能力。如果用户想要将数据从磁盘发送到网络,那么就会发生下面这几件事:
- Read:程序发起系统调用read,将磁盘数据从设备👉内核空间的缓冲区。
- Kernel to User:从内核空间的缓冲区👉用户空间。
- Write:程序在发起系统调用write,将数据从用户空间👉Socket发送缓冲区。
- Socket to NIC:再从Socket发送缓冲区👉网卡。
- Send:最终数据就会经过网络到达消费者。
整个过程,本机内发生了两次系统调用,对应四次用户空间和内核空间的切换,以及四次数据拷贝。结果就是同样一份数据来回拷贝
Kafka通过零拷贝技术优化了这个复制的问题。常见的方案有两种,分别是mmap和sendfile。
sequenceDiagram
participant Disk
participant KernelBuf as Kernel Buffer
participant UserBuf as User Buffer
participant SocketBuf as Socket Buffer
participant NIC
Note over Disk, NIC: 传统 I/O 方式
Disk->>KernelBuf: Copy 1 (DMA)
KernelBuf->>UserBuf: Copy 2 (CPU)
UserBuf->>SocketBuf: Copy 3 (CPU)
SocketBuf->>NIC: Copy 4 (DMA)
rect rgb(255, 250, 240)
Note over Disk, NIC: 零拷贝 - sendfile (Kafka)
Disk->>KernelBuf: Copy 1 (DMA)
KernelBuf->>NIC: Copy 2 (DMA)
Note right of KernelBuf: SG-DMA (Gather Copy)
end
rect rgb(240, 248, 255)
Note over Disk, NIC: 零拷贝 - mmap (RocketMQ)
Disk->>KernelBuf: Copy 1 (DMA)
Note right of KernelBuf: 映射到用户空间 (No Copy)
KernelBuf->>SocketBuf: Copy 2 (CPU)
SocketBuf->>NIC: Copy 3 (DMA)
end
mmap是什么
mmap是操作系统内核提供的一个方法,可以将内核空间的缓冲区映射到用户空间。用了它整个发送流程就有了一些变化:
- Map:程序发起系统调用mmap,将磁盘数据从设备拷贝到内核空间的缓冲区。
- Mapping:内核空间的缓冲区映射到用户空间,这里不需要拷贝。
- Write:程序在发起系统调用write数据,从内核空间将缓冲区拷贝到Socket发送缓冲区。
- Copy:再从Socket发送缓冲区拷贝到网卡。
整个过程发生了两次系统调用,对应四次用户空间和内核空间的切换,以及三次数据拷贝。对比之前省下一次内核空间到用户空间的拷贝。
看到这里大家估计也懵了,不是说零拷贝吗,怎么还有三次拷贝?mmap作为一种零拷贝技术,指的是用户空间到内核空间,这个过程不需要拷贝,而不是指数据从磁盘发送到网卡这个过程。零拷贝,这该死的文字游戏。
sendfile是什么
sendfile也是内核提供的一个方法,从名字可以看出就是用来发送文件数据的。
- Sendfile:程序发起系统调用sendfile。
- DMA Copy:内核会将数据从磁盘设备拷贝到内核空间的缓冲区。
- DMA Copy:内核空间缓冲区里的数据可以直接拷贝到网卡。
整个过程发生了一次系统调用,对应两次用户空间和内核空间的切换,以及两次数据拷贝。说好的零拷贝,怎么还有两次拷贝?其实这里的零拷贝指的是0 CPU拷贝。sendfile场景下需要的两次拷贝,都不是CPU直接参与的拷贝,而是DMA控制器在干活,不耽误我们CPU跑程序。
sequenceDiagram
participant Disk
participant KernelBuf as Kernel Buffer
participant SocketBuf as Socket Buffer
participant NIC
Note over Disk, NIC: mmap 方式 (RocketMQ)
Disk->>KernelBuf: Copy (DMA)
Note over KernelBuf: Mapped to User Space
KernelBuf->>SocketBuf: Copy (CPU)
SocketBuf->>NIC: Copy (DMA)
Note over Disk, NIC: sendfile 方式 (Kafka)
Disk->>KernelBuf: Copy (DMA)
KernelBuf->>NIC: Copy (DMA)
Kafka性能为什么更好
RocketMQ使用的是mmap技术,而Kafka使用的是sendfile。Kafka以更少的拷贝次数以及系统内核切换次数,获得了更高的性能。
那为什么RocketMQ不使用sendfile?
- mmap返回的是数据的具体内容,应用层能获取到消息内容并进行一些逻辑处理。
- RocketMQ的一些功能却需要了解具体的消息内容,方便二次投递等,比如将消费失败的消息重新投递到死信队列中。
- 如果RocketMQ使用sendfile,那根本没机会获取到消息内容,也就没办法实现一些好用的功能了。
- sendfile返回的则是成功发送了几个字节数。具体发了什么内容,应用层根本不知道。
- Kafka却没有上述这些功能特性,追求极致性能,正好可以使用sendfile。
除了零拷贝以外,Kafka高性能的原因还有很多,比如批处理、数据压缩啥的,但那些优化手段,RocketMQ也都能借鉴一波,唯独这个零拷贝那是毫无办法。
没有一种架构是完美的,一种架构,往往用于适配某些场景,很难做到既要又要还要。当场景不同,就需要做一些定制化改造,通过牺牲一部分能力去换取另一部分能力。做架构,做到最后都是在做折中。
怎么选?
经常有人问Kafka和RocketMQ到底该怎么选,用哪个?官方点的回答是这个要看场景的。说了等于没说。其实标准只有一个:
- 如果是大数据或者日志场景,比如Spark、Flink、Log等这些关键词的场景,那就用Kafka
- 除此之外尽量用RocketMQ。