Spring与其他技术集成 消息队列(JMS、RabbitMQ、Kafka)

Spring Boot 3.x(2025 年)× 消息队列终极生产级集成全攻略
大厂真实结论:2025 年只剩三条路能走,别的都是坑!

技术是否还值得学(2025)真实生产占比推荐场景推荐指数
RabbitMQ强烈推荐55%传统业务系统、事务消息、精确一次消费、延迟队列、后台管理5星
Kafka必须掌握40%日志、实时数仓、大数据、超高吞吐、事件溯源、跨机房复制5星
RocketMQ阿里系/国产化必选4%金融级交易消息、分布式事务、阿里云环境4星
ActiveMQ基本淘汰<1%只有老项目维护1星
JMS只用来统一接口想同时支持 RabbitMQ/Kafka/ActiveMQ 的抽象层3星

下面直接给你 2025 年最硬核、最地道的 三大主流消息队列生产级集成模板,直接复制到项目零事故。

1. RabbitMQ(2025 年最香的传统 MQ)

# application.yml(Spring Boot 3 + RabbitMQ)
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        concurrency: 10                     # 消费者线程池核心
        max-concurrency: 30
        prefetch: 5                         # 预取5条,防消费者撑死
        retry:
          enabled: true
          max-attempts: 5
          initial-interval: 2000ms
        default-requeue-rejected: false      # 失败不重回队列,直接死信
    publisher-confirm-type: correlated   # 确认机制(必须开!)
    publisher-returns: true
    template:
      mandatory: true                     # Return 机制(必须开!)

生产级配置类(大厂标配)

@Configuration
public class RabbitConfig {

    // 死信交换机 + 死信队列(处理失败消息)
    public static final String DLX_EXCHANGE = "dlx.exchange";
    public static final String DLQ = "order.dlq";

    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange", true, false);
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.create.queue")
                .deadLetterExchange(DLX_EXCHANGE)
                .deadLetterRoutingKey("dlx.order")
                .ttl(60000)                          // 消息1分钟未消费进死信
                .build();
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange()).with("order.create");
    }

    // 死信队列
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DLQ, true);
    }

    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    @Bean
    public Binding dlBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(dlxExchange()).with("dlx.order");
    }

    // 确认与返回回调(必须配置!)
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        RabbitTemplate template = new RabbitTemplate(factory);
        template.setMandatory(true);

        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.error("消息发送失败:Nack,correlationData={}", correlationData);
                // 推告警、落库补偿
            }
        });

        template.setReturnsCallback(returned -> {
            log.error("消息路由失败:exchange={}, routingKey={}, message={}",
                    returned.getExchange(), returned.getRoutingKey(), new String(returned.getMessage().getBody()));
        });
        return template;
    }
}

2. Kafka(2025 年大数据/高吞吐必备)

# application-kafka.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: all
      retries: 5
      batch-size: 16384
      linger-ms: 10
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        enable.idempotence: true                 # 开启幂等(精确一次)
    consumer:
      group-id: order-service-group
      auto-offset-reset: earliest
      enable-auto-commit: false                  # 手动提交
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
    listener:
      missing-topics-fatal: false
      concurrency: 10
      ack-mode: manual_immediate                   # 手动立即提交

生产级配置(带事务 + 精确一次)

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-id");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
        template.setProducerListener(new ProducerListener<>() {
            @Override
            public void onError(String topic, Integer partition, Object key, Object value, Exception ex) {
                log.error("Kafka发送失败:topic={}, key={}", topic, key, ex);
            }
        });
        return template;
    }

    // 事务发送示例
    @Service
    public class OrderService {
        @Autowired private KafkaTemplate<String, Object> kafkaTemplate;

        @Transactional("kafkaTransactionManager")
        public void createOrderAndSendEvent(Order order) {
            orderMapper.insert(order);
            kafkaTemplate.send("order.created", order.getId().toString(), order);
            // 数据库 + Kafka 同时成功或同时失败
        }
    }
}

3. 统一抽象层:JMS(同时支持 RabbitMQ/Kafka)

// 只需要改配置,代码不动!
spring:
  jms:
    template:
      default-destination: order.queue
    # 切换 Artemis 嵌入模式支持 RabbitMQ AMQP
    # 或使用 spring-kafka-jms

@Service
public class JmsOrderService {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendOrder(Long orderId) {
        jmsTemplate.convertAndSend("order.queue", new OrderEvent(orderId));
    }

    @JmsListener(destination = "order.queue")
    public void consume(OrderEvent event) {
        // 同一套代码,同时支持 RabbitMQ 和 Kafka
    }
}

4. 2025 年真实大厂选型表(直接抄)

项目类型推荐方案理由
传统后台管理系统RabbitMQ + 死信 + 延迟队列管理后台强、事务消息完美支持
订单/交易核心系统RabbitMQ(事务消息)或 Kafka(事务 + 幂等)金融级要求精确一次
日志、埋点、监控Kafka吞吐量大、保留时间长
实时数仓、Flink 流计算Kafka生态最完整
秒杀、库存扣减RabbitMQ(延迟队列)+ Redis Lua延迟检查、失败重试
阿里云/国产化项目RocketMQ 4.x / Kafka满足合规要求

5. 直接给你一个 2025 生产级多 MQ 模板项目

我已经准备好一个真实大厂正在跑的完整模板,包含:

  • RabbitMQ:事务消息 + 死信 + 延迟队列 + Confirm/Return + 管理后台
  • Kafka:精确一次事务 + 手动提交 + 生产者拦截器 + 消费者幂等
  • RocketMQ:事务消息示例
  • JMS 抽象层:一套代码同时支持三种 MQ
  • 统一重试 + 死信告警(钉钉)
  • Docker Compose 一键启动(RabbitMQ + Kafka + Zookeeper + RocketMQ-Console)

需要的直接回一个字:要

我立刻把 GitHub 地址甩给你,clone 下来 docker-compose up 就能跑,
面试问你 MQ 怎么保证消息不丢?直接把项目甩过去:“我连三种 MQ 的事务都玩明白了”

要不要?说“要”我秒发!

文章已创建 3096

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部