Java 中间件:RocketMQ 顺序消息(全局/分区顺序)

Java 中间件:RocketMQ 顺序消息(全局顺序 vs 分区顺序)详解

RocketMQ 的顺序消息(Ordered Message)是解决“消息必须按发送顺序被消费”这一经典问题的核心特性。它严格保证FIFO(First In First Out),广泛应用于电商订单流程(创建→支付→发货)、金融交易撮合、任务调度等场景。

RocketMQ 支持两种顺序消息:

  • 全局顺序消息(Global Order)
  • 分区顺序消息(Partition Order,也称局部顺序消息)

核心原理
生产端通过 MessageQueueSelector 将同一业务标识(Sharding Key,如订单 ID)的消息路由到同一个队列(Queue);消费端使用 MessageListenerOrderly 单线程加锁消费该队列,从而实现严格顺序。

1. 全局顺序 vs 分区顺序对比

维度全局顺序消息分区顺序消息(推荐)
队列数量Topic 只创建 1 个 QueueTopic 可创建多个 Queue(默认推荐 8~16 个)
顺序范围整个 Topic 内所有消息严格 FIFO同一 Sharding Key(分区)内严格 FIFO,不同分区可并行
吞吐量/性能极低(单队列,单线程消费)高(多队列并行消费)
适用场景性能要求不高、全局严格顺序(如证券全市场撮合)绝大多数业务(如同一订单/同一用户消息顺序)
高可用差(单队列单点风险)好(多队列分布在不同 Broker)
实现方式Topic 设置 1 个读写队列使用 sendOrderly() + Sharding Key

生产环境强烈推荐分区顺序。全局顺序仅在极少数极端场景使用,且许多云厂商(如阿里云 TDMQ)已不再推荐或支持单队列全局顺序(高可用考虑)。

2. 生产端:如何发送顺序消息

关键点:

  • 使用 send(Message msg, MessageQueueSelector selector, Object arg) 方法。
  • arg 通常传入业务唯一标识(如 orderId),作为 Sharding Key。
  • Selector 负责将相同 Key 的消息路由到同一 Queue。

Maven 依赖(RocketMQ 5.x / 4.x 通用,推荐最新 5.x)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.0.0</version>  <!-- 或最新版本 -->
</dependency>

完整发送示例(分区顺序)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
        producer.setNamesrvAddr("localhost:9876");  // 或你的 NameServer 地址
        producer.start();

        String[] tags = {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 10; i++) {
            int orderId = i % 3;  // 模拟 3 个订单
            Message msg = new Message("OrderTopic", tags[i % tags.length],
                    ("Order " + orderId + " Step " + i).getBytes());

            // 关键:使用 sendOrderly + MessageQueueSelector
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;           // Sharding Key = orderId
                    int index = id % mqs.size();          // 取模路由到同一 Queue
                    return mqs.get(index);
                }
            }, orderId);   // arg = orderId

            System.out.printf("Send OrderId=%d, Queue=%d, Result=%s%n",
                    orderId, sendResult.getMessageQueue().getQueueId(), sendResult);
        }

        producer.shutdown();
    }
}

说明

  • 相同 orderId 的消息一定会进入同一个 Queue。
  • 不同 orderId 可分布在不同 Queue,实现并行。

3. 消费端:如何实现顺序消费

关键点:

  • 使用 MessageListenerOrderly(而不是 MessageListenerConcurrently)。
  • RocketMQ 会对每个 Queue 加锁,确保单线程消费。
  • 消费失败时推荐返回 SUSPEND_CURRENT_QUEUE_A_MOMENT(暂停当前队列一段时间),避免阻塞整个队列。

消费示例

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("OrderTopic", "*");  // 订阅 Topic

        // 关键:使用 MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                MessageExt msg = msgs.get(0);  // 顺序消费时通常一次只取一条
                System.out.printf("Thread=%s, QueueId=%d, Msg=%s%n",
                        Thread.currentThread().getName(),
                        msg.getQueueId(),
                        new String(msg.getBody()));

                try {
                    // 业务处理(必须同步、顺序执行)
                    processOrder(new String(msg.getBody()));
                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    // 失败时暂停当前 Queue,重试
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });

        consumer.start();
        System.out.println("Order Consumer Started.");
    }

    private static void processOrder(String content) {
        // 模拟业务处理
        System.out.println("Processing: " + content);
    }
}

4. 注意事项与最佳实践(避坑指南)

  1. Topic 与 Queue 配置
  • 分区顺序:Topic 默认多 Queue(推荐根据业务并发度设置 8~32 个)。
  • 全局顺序:手动创建 Topic 时设置读写队列数 = 1(不推荐生产使用)。
  1. 生产者
  • 必须使用 sendOrderly() 方法。
  • Sharding Key 选择最细粒度(订单ID > 用户ID),避免热点队列。
  • 单一生产者实例保证发送顺序(多实例可能乱序)。
  1. 消费者
  • 必须使用 MessageListenerOrderly
  • 业务处理必须同步,不能异步提交线程池。
  • 消费失败不要抛异常,推荐返回 SUSPEND_CURRENT_QUEUE_A_MOMENT(默认暂停 1 秒,可配置)。
  • 同一个 Consumer Group 只能用于顺序消息或普通消息,不能混用。
  1. 性能与高可用
  • 分区顺序可通过增加 Queue 数提升并发。
  • Broker 故障时可能短暂失序(客户端重连后恢复)。
  • 严格顺序场景建议结合重试次数设置,避免无限阻塞。
  1. RocketMQ 4.x vs 5.x
  • 5.x 进一步优化了顺序消费并发度(gRPC SDK 支持更好)。
  • 原理基本一致,SDK 兼容性良好。
  1. 常见问题
  • 消费乱序 → 检查是否用了 MessageListenerConcurrently
  • 队列热点 → Sharding Key 粒度太粗。
  • 性能差 → 优先用分区顺序而非全局。

5. 适用场景总结

  • 推荐:电商订单全流程、用户行为序列、任务依赖链(分区顺序)。
  • 谨慎:全局严格撮合交易(性能瓶颈大,可考虑其他方案)。
  • 不适合:高吞吐、无顺序要求的场景(用普通消息)。

通过以上代码,你可以快速在 Java 项目中实现 RocketMQ 顺序消息。建议结合 Spring Boot + RocketMQ Starter 进一步封装(@RocketMQMessageListener + consumeMode = ConsumeMode.ORDERLY)。

需要Spring Boot 集成完整示例多线程消费优化与事务消息结合RocketMQ 5.x gRPC 版本,或者Docker 部署测试环境,随时告诉我,我继续给你详细代码和配置!

掌握顺序消息后,你的 RocketMQ 使用将更加精准和可靠。🚀

文章已创建 5103

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部