Java 中间件:RocketMQ 顺序消息(全局顺序 vs 分区顺序)详解
RocketMQ 的顺序消息(Ordered Message)是解决“消息必须按发送顺序被消费”这一经典问题的核心特性。它严格保证FIFO(First In First Out),广泛应用于电商订单流程(创建→支付→发货)、金融交易撮合、任务调度等场景。
RocketMQ 支持两种顺序消息:
- 全局顺序消息(Global Order)
- 分区顺序消息(Partition Order,也称局部顺序消息)
核心原理:
生产端通过 MessageQueueSelector 将同一业务标识(Sharding Key,如订单 ID)的消息路由到同一个队列(Queue);消费端使用 MessageListenerOrderly 单线程加锁消费该队列,从而实现严格顺序。
1. 全局顺序 vs 分区顺序对比
| 维度 | 全局顺序消息 | 分区顺序消息(推荐) |
|---|---|---|
| 队列数量 | Topic 只创建 1 个 Queue | Topic 可创建多个 Queue(默认推荐 8~16 个) |
| 顺序范围 | 整个 Topic 内所有消息严格 FIFO | 同一 Sharding Key(分区)内严格 FIFO,不同分区可并行 |
| 吞吐量/性能 | 极低(单队列,单线程消费) | 高(多队列并行消费) |
| 适用场景 | 性能要求不高、全局严格顺序(如证券全市场撮合) | 绝大多数业务(如同一订单/同一用户消息顺序) |
| 高可用 | 差(单队列单点风险) | 好(多队列分布在不同 Broker) |
| 实现方式 | Topic 设置 1 个读写队列 | 使用 sendOrderly() + Sharding Key |
生产环境强烈推荐分区顺序。全局顺序仅在极少数极端场景使用,且许多云厂商(如阿里云 TDMQ)已不再推荐或支持单队列全局顺序(高可用考虑)。
2. 生产端:如何发送顺序消息
关键点:
- 使用
send(Message msg, MessageQueueSelector selector, Object arg)方法。 arg通常传入业务唯一标识(如 orderId),作为 Sharding Key。- Selector 负责将相同 Key 的消息路由到同一 Queue。
Maven 依赖(RocketMQ 5.x / 4.x 通用,推荐最新 5.x)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version> <!-- 或最新版本 -->
</dependency>
完整发送示例(分区顺序)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
producer.setNamesrvAddr("localhost:9876"); // 或你的 NameServer 地址
producer.start();
String[] tags = {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
int orderId = i % 3; // 模拟 3 个订单
Message msg = new Message("OrderTopic", tags[i % tags.length],
("Order " + orderId + " Step " + i).getBytes());
// 关键:使用 sendOrderly + MessageQueueSelector
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg; // Sharding Key = orderId
int index = id % mqs.size(); // 取模路由到同一 Queue
return mqs.get(index);
}
}, orderId); // arg = orderId
System.out.printf("Send OrderId=%d, Queue=%d, Result=%s%n",
orderId, sendResult.getMessageQueue().getQueueId(), sendResult);
}
producer.shutdown();
}
}
说明:
- 相同
orderId的消息一定会进入同一个 Queue。 - 不同
orderId可分布在不同 Queue,实现并行。
3. 消费端:如何实现顺序消费
关键点:
- 使用
MessageListenerOrderly(而不是MessageListenerConcurrently)。 - RocketMQ 会对每个 Queue 加锁,确保单线程消费。
- 消费失败时推荐返回
SUSPEND_CURRENT_QUEUE_A_MOMENT(暂停当前队列一段时间),避免阻塞整个队列。
消费示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderTopic", "*"); // 订阅 Topic
// 关键:使用 MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
MessageExt msg = msgs.get(0); // 顺序消费时通常一次只取一条
System.out.printf("Thread=%s, QueueId=%d, Msg=%s%n",
Thread.currentThread().getName(),
msg.getQueueId(),
new String(msg.getBody()));
try {
// 业务处理(必须同步、顺序执行)
processOrder(new String(msg.getBody()));
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
// 失败时暂停当前 Queue,重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start();
System.out.println("Order Consumer Started.");
}
private static void processOrder(String content) {
// 模拟业务处理
System.out.println("Processing: " + content);
}
}
4. 注意事项与最佳实践(避坑指南)
- Topic 与 Queue 配置:
- 分区顺序:Topic 默认多 Queue(推荐根据业务并发度设置 8~32 个)。
- 全局顺序:手动创建 Topic 时设置读写队列数 = 1(不推荐生产使用)。
- 生产者:
- 必须使用
sendOrderly()方法。 - Sharding Key 选择最细粒度(订单ID > 用户ID),避免热点队列。
- 单一生产者实例保证发送顺序(多实例可能乱序)。
- 消费者:
- 必须使用
MessageListenerOrderly。 - 业务处理必须同步,不能异步提交线程池。
- 消费失败不要抛异常,推荐返回
SUSPEND_CURRENT_QUEUE_A_MOMENT(默认暂停 1 秒,可配置)。 - 同一个 Consumer Group 只能用于顺序消息或普通消息,不能混用。
- 性能与高可用:
- 分区顺序可通过增加 Queue 数提升并发。
- Broker 故障时可能短暂失序(客户端重连后恢复)。
- 严格顺序场景建议结合重试次数设置,避免无限阻塞。
- RocketMQ 4.x vs 5.x:
- 5.x 进一步优化了顺序消费并发度(gRPC SDK 支持更好)。
- 原理基本一致,SDK 兼容性良好。
- 常见问题:
- 消费乱序 → 检查是否用了
MessageListenerConcurrently。 - 队列热点 → Sharding Key 粒度太粗。
- 性能差 → 优先用分区顺序而非全局。
5. 适用场景总结
- 推荐:电商订单全流程、用户行为序列、任务依赖链(分区顺序)。
- 谨慎:全局严格撮合交易(性能瓶颈大,可考虑其他方案)。
- 不适合:高吞吐、无顺序要求的场景(用普通消息)。
通过以上代码,你可以快速在 Java 项目中实现 RocketMQ 顺序消息。建议结合 Spring Boot + RocketMQ Starter 进一步封装(@RocketMQMessageListener + consumeMode = ConsumeMode.ORDERLY)。
需要Spring Boot 集成完整示例、多线程消费优化、与事务消息结合、RocketMQ 5.x gRPC 版本,或者Docker 部署测试环境,随时告诉我,我继续给你详细代码和配置!
掌握顺序消息后,你的 RocketMQ 使用将更加精准和可靠。🚀