专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

RabbitMQ学习笔记(3)-Spring集成RabbitMQ

前言

在Spring项目中访问RabbitMQ主要有两种方式。

  • 基于原生的rabbitmq-client;
  • 借助Spring AMQP;

接下来分别介绍如何用这两种方式实现RabbitMQ生产者和消费者。

1. 原生rabbitmq-client实现

使用原生rabbitmq-client访问RabbitMQ,一定程度更容易理解连接、读写RabbitMQ的过程,但是有许多业务无关的代码,需要对Connection/Channel进行创建、关闭和管理,不建议实际生产中使用这种方式。
下面简单介绍原生rabbitmq-client实现:

1.1 引入maven jar包

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>

1.2 生产者代码

1.2.1 核心API

生产和消费消息都是通过Channel执行操作,生产者的核心API是Channel.basicPublish()。下面展示了最基础的API(还有些其它封装的API),一般情况下只需要设定exchangeroutingKeybody

    public void basicPublish(String exchange, String routingKey,
                             boolean mandatory, boolean immediate,
                             BasicProperties props, byte[] body)

1.2.2 代码示例

生产者代码示例如下,连接RabbitMQ Broker时需要设置hostportuserNamepassword等属性,其生产消息过程主要包括5步:创建Connection创建Channel创建队列和交换机等生产消息关闭资源

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);

try {
    // 创建connection和channel
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列和交换机,并将队列绑定到交换机
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct", true, false, false, null);
    channel.queueBind(QUEUE_NAME, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null);

    // 发送消息
    String message = "Hello World!";
    channel.basicPublish(DIRECT_EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
    System.out.println(String.format(" [Producer] [Exchange=%s] [RoutingKey=%s] Sent '%s'", DIRECT_EXCHANGE_NAME, ROUTING_KEY, message));

    // 关闭资源
    channel.close();
    connection.close();
} catch (IOException ex) {
    ex.printStackTrace();
} catch (TimeoutException ex) {
    ex.printStackTrace();
}

1.3 消费者代码

1.3.1 核心API

消费者有拉模式推模式两种,推模式需要通过回调的形式处理消息,推模式的核心API是Channel.basicConsume(),下面展示了最基础的API,一般情况下只需要设置queueautoAckexclusivecallback

public String basicConsume(String queue, final boolean autoAck, String consumerTag,
                           boolean noLocal, boolean exclusive, Map<String, Object> arguments,
                           final Consumer callback)

1.3.2 代码示例

消费者代码示例如下,消费消息的过程主要包括6步:创建Connection创建Channel创建队列和交换机等设置消费者属性消费队列并设置回调关闭资源

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);

try {
    // 创建connection和channel
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列和交换机,并将队列绑定到交换机
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct", true, false, false, null);
    channel.queueBind(QUEUE_NAME, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null);

    // 设置消费者属性,比如Qos,客户端最多接收不ack的消息数目,当未ack消息达到这个数目后,消费者将阻塞,直至未ack消息数目减少
    channel.basicQos(100);
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag,
                                      Envelope envelope,
                                      AMQP.BasicProperties properties,
                                      byte[] body) throws IOException {
            String message = new String(body);
            System.out.println(" [Consumer] ConsumerTag: " + consumerTag);
            System.out.println(" [Consumer] Received '" + message + "'");
            // 这里是自行实现的JsonUtil
            System.out.println(" [Consumer] Envelope: " + JsonUtil.objectToString(envelope));
            System.out.println(" [Consumer] Properties: " + JsonUtil.objectToString(properties));

            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };
    // autoAck=false
    channel.basicConsume(QUEUE_NAME, false, consumer);

    Thread.sleep(30000);
    channel.close();
    connection.close();
} catch (IOException ex) {
    ex.printStackTrace();
} catch (TimeoutException ex) {
    ex.printStackTrace();
} catch (InterruptedException ex) {
    ex.printStackTrace();
}

2. Spring AMQP知识

Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。其包括两部分spring-amqp是基础抽象,spring-rabbit是RabbitMQ实现(目前唯一实现)。

2.1 特点

Spring AMQP有如下特点:

  • 使用RabbitTemplate来发送/接收消息;
  • 使用Listener Container来异步消费消息;
  • 使用RabbitAdmin来创建队列、交换机和绑定等;
  • 使用CachingConnectionFactory来创建、缓存和管理Connection/Channel
  • 拥有重试断开重连等能力;

2.2 重要概念

2.2.1 ConnectionFactory

ConnectionFactory接口用于管理Client与Broker之间的Connection和Channel,具体实现类是CachingConnectionFactory
CachingConnectionFactory有两种缓存模式,分别是Connection模式Channel模式

Channel模式

Channel模式是CachingConnectionFactory默认模式,该模式下会对channel进行缓存。

  • Channel模式下只会创建一个Connection
  • 可以通过setChannelCacheSize来调整缓存的channel数目,默认为25;(需要注意缓存大小不代表最大数目)
  • 该模式下createConnection()返回同一个Connection,Connection.close()方法无效。
Connection模式

Connection模式下Connection和Channel都会进行缓存;

  • 可以通过setConnectionCacheSize来调整缓存的Connection数目,默认为1;(缓存大小不代表最大数目)
  • 可以通过setConnectionLimit设置最大Connection数目;
  • 执行createConnection()会新建或者从缓存中获取一个Connection;

connection分离

publisher与consumer最好使用不同的connection,防止消费者阻塞导致生产者也阻塞。

2.2.2 RabbitTemplate

AmqpTemplate接口是关于消息发送/接收的高级别抽象,具体实现类是RabbitTemplate,也有AsyncRabbitTemplate。

  • RabbitTemplate在发送消息时,会创建或从缓存中获取Channel。
  • 支持重试,可以根据需求设置不同重试机制。

2.2.3 自动恢复机制

Rabbitmq客户端从4.0.x版本开始默认支持自动恢复,但Spring AMQP有自己的恢复机制,并且两者间有一些冲突,Spring官方建议禁用amqp-client自动恢复机制。

3. Spring AMQP实现

使用Spring AMQP来访问RabbitMQ也可以根据使用方式不同,分为以下几种方式:

  • 原生API;
  • 使用bean来管理;
  • 使用@EnableRabbit注解来自动注入。

在实际项目中第一种方式基本是不会使用的,一般都会使用Spring特性来读取配置文件、管理Bean等。接下来主要介绍第二种方式,为了便于理解,对原生API形式也进行简单介绍。

3.1 原生API实现

3.1.1 引入maven jar包

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.0.RELEASE</version>
</dependency>

3.1.2 生产者

生产者代码示例如下主要包括创建CachingConnectionFactory使用RabbitAdmin创建队列等使用RabbitTemplate发送消息

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(25);

// 创建队列、交换机、绑定队列到交换机
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue(QUEUE_NAME, true, false, false, null));
admin.declareExchange(new DirectExchange(DIRECT_EXCHANGE_NAME, true, false, null));
admin.declareBinding(new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null));

// 发送消息,使用convertAndSend可直接传入String类型消息
AmqpTemplate template = new RabbitTemplate(connectionFactory);
String message = "Hello World!";
template.convertAndSend(DIRECT_EXCHANGE_NAME, ROUTING_KEY, message);
System.out.println(String.format(" [Producer] [Exchange=%s] [RoutingKey=%s] Sent '%s'", DIRECT_EXCHANGE_NAME, ROUTING_KEY, message));

3.1.3 消费者

Spring AMQP中可以使用RabbitTemplate.receive()来消费消息,但这种方式对应拉模式,一般采用对应推模式的listener container方式来消费消息,对应的类是SimpleMessageListenerContainer
消费者代码示例如下,主要包括创建CachingConnectionFactory使用RabbitAdmin创建队列等使用容器设置消费消息回调启动消费者

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASSWORD);

// 创建队列和交换机、绑定队列到交换机
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue(QUEUE_NAME, true, false, false, null));
admin.declareExchange(new DirectExchange(DIRECT_EXCHANGE_NAME, true, false, null));
admin.declareBinding(new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null));

// 使用容器的方式消费消息
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(QUEUE_NAME);
container.setPrefetchCount(300);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 使用ChannelAwareMessageListener是为了调用channel对象的ack方法
container.setMessageListener(new ChannelAwareMessageListener() {
    @Override
    public void onMessage(Message message, Channel channel) {
        try {
            System.out.println(" [Consumer] ConsumerTag: " + message.getMessageProperties().getConsumerTag());
            System.out.println(" [Consumer] Received '" + message.getBody() + "'");
            System.out.println(" [Consumer] Properties: " + JsonUtil.objectToString(message.getMessageProperties()));

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
});
container.setMissingQueuesFatal(false);
// 启动消费者
container.start();

3.2 Bean管理方式实现

(1)修改配置文件,禁用Rabbit自动配置

# 较新版本的springboot中会自动注入rabbitTemplate等bean,通过禁用来使用自定义的bean
spring.autoconfigure.exclude: org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration

(2)建立RabbitmqConfiguration类,并添加@Configuration注解,用于注册各种bean

出于方便,使用@ConfigurationProperties@Setter从配置文件读取配置。

@Slf4j
@Configuration
@ConfigurationProperties("rabbitmq")
@Setter
public class RabbitmqConfiguration {
    // 这些配置值从配置文件读取
    private String host;
    private Integer port;
    private String virtualHost;
    private String userName;
    private String password;

    private String queueName;
    private String directExchangeName;
}

(3)注册ConnectionFactory

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.getRabbitConnectionFactory().setUsername(userName);
    cachingConnectionFactory.getRabbitConnectionFactory().setPassword(password);
    cachingConnectionFactory.getRabbitConnectionFactory().setVirtualHost(virtualHost);
    cachingConnectionFactory.getRabbitConnectionFactory().setHost(host);
    cachingConnectionFactory.getRabbitConnectionFactory().setPort(port);
    cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
    cachingConnectionFactory.setChannelCacheSize(25);
    // 还支持 设置心跳、连接超时时间、connection/channel监听器(连接建立、断开等)、异常日志自定义、connection/channel命名策略等,可根据业务需求合理配置

    return cachingConnectionFactory;
}

(4)注册独立的publisher connection factory

避免消费者阻塞导致生产者也阻塞,生成者消费者使用不同connection。
新版Spring AMQP支持getPublisherConnectionFactory()来获取独立ConnectionFactory。

@Bean
public ConnectionFactory publisherConnectionFactory() {
    // 实际测试publisherConnectionFactory()与connectionFactory()执行createConnection()返回的是不同connection对象,
    // 可见publisherConnectionFactory是独立开来的
    return connectionFactory().getPublisherConnectionFactory();
}

(5)注册RabbitAdmin

@Bean
public AmqpAdmin rabbitAdmin() {
    return new RabbitAdmin(connectionFactory());
}

(6)注册RabbitTemplate,并设置重试机制

这里使用了官方文档上默认的重试机制

@Bean
public AmqpTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(publisherConnectionFactory());

    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);

    // 猜测如果RabbitTemplate构造器直接传入参数connectionFactory(),同时此处设为true,效果应该与传入publisherConnectionFactory()一致
    // template.setUsePublisherConnection(true);

    return template;
}

(7)注册消费者容器,并设置消费回调

@Autowired
private RabbitmqListener rabbitmqListener;

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(queueName);
    container.setMessageListener(rabbitmqListener);
    // 手动ack
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    // 建议该字段设置为false,防止因为不能访问队列导致消费者启动失败。不能访问原因举例:新connection创建是队列被其它连接占用,且设置了exclusive
    container.setMissingQueuesFatal(false);
    return container;
}

(8)使用rabbitAdmin创建队列、交换机等

@PostConstruct
public void init() {
    AmqpAdmin rabbitAdmin = rabbitAdmin();
    // 队列、交换机是否有必要注册为bean,看业务需要。还可以根据需要对rabbitAdmin再进行一次封装,使得能够直接传入队列名进行创建队列
    rabbitAdmin.declareQueue(queue());
    rabbitAdmin.declareExchange(directExchange());
    rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, directExchangeName, queueName, null));
}

@PreDestroy
public void destroy() {
    AmqpAdmin rabbitAdmin = rabbitAdmin();
    rabbitAdmin.deleteQueue(queueName);
    rabbitAdmin.deleteExchange(directExchangeName);
}

@Bean
public Queue queue() {
    return new Queue(queueName, true, false, false, null);
}

@Bean(name = "directExchange")
public Exchange directExchange() {
    return new DirectExchange(directExchangeName, true, false, null);
}

(9)编写生产者Service

可以根据业务需求封装成具备特定功能的Service,这里出于方便使用了定时任务来发送消息。

@Slf4j
@Service
public class RabbitmqProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Autowired
    @Qualifier("directExchange")
    private Exchange directExchange;

    @Autowired
    private Queue queue;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(directExchange.getName(), queue.getName(), message);
    }

    // 需要开启@EnableScheduling
    @Scheduled(initialDelay = 3000, fixedDelay = 3000)
    public void scheduledSendingMessage() {
        String message = "[" + System.currentTimeMillis() + "] Hello, world!";
        sendMessage(message);
        log.info("[Producer] [Exchange={}] [RoutingKey={}] Sent message={}", directExchange.getName(), queue.getName(), message);
    }
}

(10)编写消费者回调函数

为了能够手动ack,继承ChannelAwareMessageListener,并重写onMessage方法。

@Slf4j
@Service
// 实现ChannelAwareMessageListener接口,使用channel对象进行ack
public class RabbitmqListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) {
        Long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("[Consumer] Receive message={}", new String(message.getBody()));
            log.info("[Consumer] Properties={}", message.getMessageProperties());
            channel.basicAck(deliveryTag, false);
        } catch (IOException ex) {
            log.info("Fail to ack mq message, deliveryTag={}", deliveryTag);
        }
    }
}

小结

本文主要介绍了Spring AMQP的一些基本知识和用法,并给出了常见的代码实现。

参考网址

https://docs.spring.io/spring-amqp/reference/html/#_preface

文章永久链接:https://tech.souyunku.com/36629

未经允许不得转载:搜云库技术团队 » RabbitMQ学习笔记(3)-Spring集成RabbitMQ

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们