延迟消息

延迟等级

官方默认设置了 18 哥延迟等级

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

发送延迟消息:按照默认顺序 1-18 数字就对应上面的延迟时间

Message msg = new Message (TOPIC, TAG, "OrderID199", "ok", getBytes(StandardCharsets.UTF_8));
//设置延迟等级
msg.setDelayTimeLevel(3);
producer.send(msg);

基本原理

延迟消息都会被存储到 RocketMQ 的一个内部 Topic:SCHEDULE_TOPIC_XXXX 中

SCHEDULE_TOPIC_XXXX 共有 18 个 MessageQueue:

  • 对应延迟消息的 18 个等级,根据指定的 DelayTimeLevel 来决定选择哪个 MessageQueue
  • 有一个定时任务,每 100 ms 执行一次判断 SCHEDULE_TOPIC_XXXX Topic 中的 MessageQueue 的消息是否到达延迟时间
  • 若到达延迟时间,将 SCHEDULE_TOPIC_XXXX 中的消息投递到消息最初需要投递的 Topic 之中

为什么不支持任意时间?

RocketMQ 并不支持任意时间的延迟,可能的主要原因就是如果提供任意时间,就会涉及到消息的排序,会有一定的性能损耗

事务消息

RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或失败的消息

image-20241006131642263

基本流程

第一阶段:

  • 发送 Message,Half Message ,即半事务消息
  • 此类型的 Message 是不会被 Consumer 消费的

第二阶段:如果半事务消息投递成功,则会开始执行本地事务

分为如下三种 Case:

  • 本地事务执行成功:会为 Broker 发送 commit 消息,被 commit 过后的 Message 才能被 Consumer 消费
  • 本地事务执行失败:
    • 会为 Broker 发送 rollback 消息,Broker 则会将刚刚投递的半事务消息删除,从而保证上下游数据的一致性
    • 如果 Producer 实例或者网络出现问题,Producer 没能及时 de 将本地事务执行的结果通知 Broker,Broker 会通过扫描发现某条 Message 长时间处于半事务状态,Broker 会主动 de 给 Producer 询问此 Message 对应的事务状态

基本设计

采用 2PC 两阶段设计:

image-20241006133353449


将 Message 原本真实的 Topic 和 MessageQueue 进行备份

  • 存入到 PROPERTY_REALTOPC、PROPERTY_REAL_QUEUE_ID 中

将消息投递到一个内部 Topic 中 RMQ_SYS_TRANS_HALF_TOPIC,该队列专门存储事务消息

所有的 Half Message 全部都写入到 queueId 为 0 的 MessageQueue,因为一个 Topic 下只有 1 个 MessageQueue:

  • 这个 Topic 下的所有 Message 就是全局有序的,ta 们会按照先来后到的顺序被消费

如果本地事务执行成功进行 Commit,则将 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的 Topic 中,供后续流程执行

  • 并删除这条 Half Message,但删除也是假删除,只是给 Message 打上一个删除的 tag

如果本地事务执行失败进行 rollback,则直接删除这条 Half Message,但删除也是假删除

如果本地事务吃吃没有返回结果(默认时间 6s),则会触发事务回查机制

  • 执行回查之前需要校验检查次数是否达到了最大值(需要手动设置,没有默认值)
  • 或者是当前 Half Message 存在是否超过了 Message 保存的上限,即 3 天
  • 如果满足上面条件中的一种 Half Message 会被放进 TRANS_CHECK_MAX_TOPIC Topic 中
  • 一旦判定为需要执行事务回查逻辑,那么当前这条 Half Message 就算已经被消费了
  • 在没有达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行 Check 逻辑
  • 如果回查成功,则删除投递的 Half Message

消息重试

重试时间

消息消费失败后,并不会立即重试,而是一个递增的时间间隔来进行重试的,重试次数默认为 16 次

只比延迟消息的时间间隔等级少了前两个,延迟消息总共有 18 个等级,而消息重试使用了延迟消息的第 3-18 等级

10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

基本原理

重试的 Message,RocketMQ 的做法并不是将其投递回原来的 Topic,而是重试队列

每个 ConsumerGroup 都有自己的重试队列:

  • 其名称是由特定的前缀拼接上 ConsumerGroup 所组成,默认 %RETR% + 消费者组名称
  • 所有在 Consumer 启动时,就会同时消费其 ConsumerGroup 对应的重试队列和普通队列

消费失败的 Message,Consumer 会将其投回 Broker:

  • 相当于这条 Message 已经被消费掉了,之后重试的只是内容相同,但实际不是同一个的 Message
  • 然后会校验重试的次数,如果达到 16 次,则会进入死信队列,组成为 %DLQ% + 消费者组名称
  • 未达到最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列 SCHEDULE_TOPIC_XXXX 中
  • 然后等到了延迟等级对应的时间后,在投递到 ConsumerGroup 所对应的重试队列当中,供后续消费

消息存储

整体架构

RocketMQ 的混合性存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog中)

针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构

Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中

image-20241006191349416


核心步骤:

  1. 首先,生产者根据 topic 发送消息,消息存储在 commitLog中,1 G一个文件,当文件满了,写入下一个文件
  2. 其次,ReputMessageService 重写消息服务执行 2 个分发操作:
    • 创建 ConsumerQueue 逻辑消费队列:
      • 参数:commitLogOffset 物理偏移量、msgSize 消息长度、tagsCode tag 哈希
    • 创建 IndexFile 索引文件:
      • 以创建时的时间戳命名
  3. 最后,消费者根据 topic、tag 拉取消息消费,根据 key 查询消息

重要文件

commitLog 消息日志:

  • 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容

consumequeue 逻辑消费队列:

  • 存储了 commitLog 的起始物理 offset,目的是提高消费的性能

indexFile 索引文件:

  • 提供了一种可以通过 key 或者时间区间来查询消息的方法

consumequeue 文件:

consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为:

  • 8 字节的 commmitLog 物理偏移量
  • 4 字节 的消息长度
  • 8 字节 tag hashcode

单个文件由 30w 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M

  • 默认一个 topic 对应 4 个 queueId,即 4 个 messageQueue

每个 messageQueue 文件夹下有多个 consumeQueue,所以:messageQueue 1 :N consumeQueue

image-20241006192406854

通信机制

通信架构图

image-20241006192645688

基本通讯流程如下:

  • Broker 启动后需要完成一次将自己注册至 NameServer 的操作,随后每隔 30s 时间定时向 NameServer 上报 Topic 路由信息
  • 消息生成者 Producer 作为客户端发送消息时,需要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息
    • 如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息
  • 消息生产者 Producer 根据获取的路由信息选择一个队列(MessageQueue) 进行消息发送
    • Broker 作为消息的接收者接收消息并落盘存储
  • 消息消费者 Consumer 根据获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费

为了实现客户端与服务器之间高效的数据请求与接收:

  • RocketMQ-Remoting 包自定义了通信协议并在 Netty 的基础之上扩展了通信模块