前言
在Spring项目中访问RabbitMQ主要有两种方式。
- 基于原生的rabbitmq-client;
- 借助Spring AMQP;
接下来分别介绍如何用这两种方式实现RabbitMQ生产者和消费者。
1. 原生rabbitmq-client实现
使用原生rabbitmq-client访问RabbitMQ,一定程度更容易理解连接、读写RabbitMQ的过程,但是有许多业务无关的代码
,需要对Connection/Channel进行创建、关闭和管理,不建议实际生产中使用这种方式。
下面简单介绍原生rabbitmq-client实现:
1.1 引入maven jar包
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
1.2 生产者代码
1.2.1 核心API
生产和消费消息都是通过Channel执行操作,生产者的核心API是Channel.basicPublish()
。下面展示了最基础的API(还有些其它封装的API),一般情况下只需要设定exchange
、routingKey
和body
。
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
1.2.2 代码示例
生产者代码示例如下,连接RabbitMQ Broker时需要设置host
、port
、userName
、password
等属性,其生产消息过程主要包括5步:创建Connection
、创建Channel
、创建队列和交换机等
、生产消息
、关闭资源
。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
try {
// 创建connection和channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列和交换机,并将队列绑定到交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct", true, false, false, null);
channel.queueBind(QUEUE_NAME, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish(DIRECT_EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println(String.format(" [Producer] [Exchange=%s] [RoutingKey=%s] Sent '%s'", DIRECT_EXCHANGE_NAME, ROUTING_KEY, message));
// 关闭资源
channel.close();
connection.close();
} catch (IOException ex) {
ex.printStackTrace();
} catch (TimeoutException ex) {
ex.printStackTrace();
}
1.3 消费者代码
1.3.1 核心API
消费者有拉模式
和推模式
两种,推模式
需要通过回调的形式处理消息,推模式
的核心API是Channel.basicConsume()
,下面展示了最基础的API,一般情况下只需要设置queue
、autoAck
、exclusive
和callback
。
public String basicConsume(String queue, final boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
final Consumer callback)
1.3.2 代码示例
消费者代码示例如下,消费消息的过程主要包括6步:创建Connection
、创建Channel
、创建队列和交换机等
、设置消费者属性
、消费队列并设置回调
、关闭资源
。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
try {
// 创建connection和channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列和交换机,并将队列绑定到交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct", true, false, false, null);
channel.queueBind(QUEUE_NAME, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null);
// 设置消费者属性,比如Qos,客户端最多接收不ack的消息数目,当未ack消息达到这个数目后,消费者将阻塞,直至未ack消息数目减少
channel.basicQos(100);
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body);
System.out.println(" [Consumer] ConsumerTag: " + consumerTag);
System.out.println(" [Consumer] Received '" + message + "'");
// 这里是自行实现的JsonUtil
System.out.println(" [Consumer] Envelope: " + JsonUtil.objectToString(envelope));
System.out.println(" [Consumer] Properties: " + JsonUtil.objectToString(properties));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// autoAck=false
channel.basicConsume(QUEUE_NAME, false, consumer);
Thread.sleep(30000);
channel.close();
connection.close();
} catch (IOException ex) {
ex.printStackTrace();
} catch (TimeoutException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
2. Spring AMQP知识
Spring AMQP项目将核心Spring概念
应用于基于AMQP
的消息传递解决方案的开发。其包括两部分spring-amqp
是基础抽象,spring-rabbit
是RabbitMQ实现(目前唯一实现)。
2.1 特点
Spring AMQP有如下特点:
- 使用
RabbitTemplate
来发送/接收消息; - 使用
Listener Container
来异步消费消息; - 使用
RabbitAdmin
来创建队列、交换机和绑定等; - 使用CachingConnectionFactory来
创建、缓存和管理Connection/Channel
; - 拥有
重试
、断开重连
等能力;
2.2 重要概念
2.2.1 ConnectionFactory
ConnectionFactory接口用于管理Client与Broker之间的Connection和Channel,具体实现类是CachingConnectionFactory
。
CachingConnectionFactory有两种缓存模式,分别是Connection模式
和Channel模式
。
Channel模式
Channel模式
是CachingConnectionFactory默认模式
,该模式下会对channel进行缓存。
- Channel模式下
只会创建一个Connection
。 - 可以通过
setChannelCacheSize
来调整缓存的channel数目,默认为25;(需要注意缓存大小不代表最大数目) - 该模式下createConnection()返回同一个Connection,Connection.close()方法无效。
Connection模式
Connection模式下Connection和Channel都会进行缓存;
- 可以通过
setConnectionCacheSize
来调整缓存的Connection数目,默认为1;(缓存大小不代表最大数目) - 可以通过
setConnectionLimit
设置最大Connection数目; - 执行createConnection()会新建或者从缓存中获取一个Connection;
connection分离
publisher与consumer最好使用不同的connection,防止消费者阻塞导致生产者也阻塞。
2.2.2 RabbitTemplate
AmqpTemplate接口是关于消息发送/接收的高级别抽象,具体实现类是RabbitTemplate
,也有AsyncRabbitTemplate。
- RabbitTemplate在发送消息时,会创建或从
缓存
中获取Channel。 支持重试
,可以根据需求设置不同重试机制。
2.2.3 自动恢复机制
Rabbitmq客户端从4.0.x版本开始默认支持自动恢复,但Spring AMQP有自己的恢复机制,并且两者间有一些冲突,Spring官方建议禁用amqp-client自动恢复机制。
3. Spring AMQP实现
使用Spring AMQP来访问RabbitMQ也可以根据使用方式不同,分为以下几种方式:
- 原生API;
- 使用bean来管理;
- 使用@EnableRabbit注解来自动注入。
在实际项目中第一种方式基本是不会使用的,一般都会使用Spring特性来读取配置文件、管理Bean等。接下来主要介绍第二种方式,为了便于理解,对原生API形式也进行简单介绍。
3.1 原生API实现
3.1.1 引入maven jar包
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
3.1.2 生产者
生产者代码示例如下主要包括创建CachingConnectionFactory
、使用RabbitAdmin创建队列等
、使用RabbitTemplate发送消息
。
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(25);
// 创建队列、交换机、绑定队列到交换机
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue(QUEUE_NAME, true, false, false, null));
admin.declareExchange(new DirectExchange(DIRECT_EXCHANGE_NAME, true, false, null));
admin.declareBinding(new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null));
// 发送消息,使用convertAndSend可直接传入String类型消息
AmqpTemplate template = new RabbitTemplate(connectionFactory);
String message = "Hello World!";
template.convertAndSend(DIRECT_EXCHANGE_NAME, ROUTING_KEY, message);
System.out.println(String.format(" [Producer] [Exchange=%s] [RoutingKey=%s] Sent '%s'", DIRECT_EXCHANGE_NAME, ROUTING_KEY, message));
3.1.3 消费者
Spring AMQP中可以使用RabbitTemplate.receive()
来消费消息,但这种方式对应拉模式,一般采用对应推模式的listener container
方式来消费消息,对应的类是SimpleMessageListenerContainer
。
消费者代码示例如下,主要包括创建CachingConnectionFactory
、使用RabbitAdmin创建队列等
、使用容器设置消费消息回调
、启动消费者
。
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASSWORD);
// 创建队列和交换机、绑定队列到交换机
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue(QUEUE_NAME, true, false, false, null));
admin.declareExchange(new DirectExchange(DIRECT_EXCHANGE_NAME, true, false, null));
admin.declareBinding(new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, DIRECT_EXCHANGE_NAME, ROUTING_KEY, null));
// 使用容器的方式消费消息
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(QUEUE_NAME);
container.setPrefetchCount(300);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 使用ChannelAwareMessageListener是为了调用channel对象的ack方法
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
try {
System.out.println(" [Consumer] ConsumerTag: " + message.getMessageProperties().getConsumerTag());
System.out.println(" [Consumer] Received '" + message.getBody() + "'");
System.out.println(" [Consumer] Properties: " + JsonUtil.objectToString(message.getMessageProperties()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
});
container.setMissingQueuesFatal(false);
// 启动消费者
container.start();
3.2 Bean管理方式实现
(1)修改配置文件,禁用Rabbit自动配置
# 较新版本的springboot中会自动注入rabbitTemplate等bean,通过禁用来使用自定义的bean
spring.autoconfigure.exclude: org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
(2)建立RabbitmqConfiguration类,并添加@Configuration注解,用于注册各种bean
出于方便,使用@ConfigurationProperties
和@Setter
从配置文件读取配置。
@Slf4j
@Configuration
@ConfigurationProperties("rabbitmq")
@Setter
public class RabbitmqConfiguration {
// 这些配置值从配置文件读取
private String host;
private Integer port;
private String virtualHost;
private String userName;
private String password;
private String queueName;
private String directExchangeName;
}
(3)注册ConnectionFactory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.getRabbitConnectionFactory().setUsername(userName);
cachingConnectionFactory.getRabbitConnectionFactory().setPassword(password);
cachingConnectionFactory.getRabbitConnectionFactory().setVirtualHost(virtualHost);
cachingConnectionFactory.getRabbitConnectionFactory().setHost(host);
cachingConnectionFactory.getRabbitConnectionFactory().setPort(port);
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
cachingConnectionFactory.setChannelCacheSize(25);
// 还支持 设置心跳、连接超时时间、connection/channel监听器(连接建立、断开等)、异常日志自定义、connection/channel命名策略等,可根据业务需求合理配置
return cachingConnectionFactory;
}
(4)注册独立的publisher connection factory
避免消费者阻塞导致生产者也阻塞,生成者消费者使用不同connection。
新版Spring AMQP支持getPublisherConnectionFactory()来获取独立ConnectionFactory。
@Bean
public ConnectionFactory publisherConnectionFactory() {
// 实际测试publisherConnectionFactory()与connectionFactory()执行createConnection()返回的是不同connection对象,
// 可见publisherConnectionFactory是独立开来的
return connectionFactory().getPublisherConnectionFactory();
}
(5)注册RabbitAdmin
@Bean
public AmqpAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
(6)注册RabbitTemplate,并设置重试机制
这里使用了官方文档上默认的重试机制
@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(publisherConnectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
// 猜测如果RabbitTemplate构造器直接传入参数connectionFactory(),同时此处设为true,效果应该与传入publisherConnectionFactory()一致
// template.setUsePublisherConnection(true);
return template;
}
(7)注册消费者容器,并设置消费回调
@Autowired
private RabbitmqListener rabbitmqListener;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(queueName);
container.setMessageListener(rabbitmqListener);
// 手动ack
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 建议该字段设置为false,防止因为不能访问队列导致消费者启动失败。不能访问原因举例:新connection创建是队列被其它连接占用,且设置了exclusive
container.setMissingQueuesFatal(false);
return container;
}
(8)使用rabbitAdmin创建队列、交换机等
@PostConstruct
public void init() {
AmqpAdmin rabbitAdmin = rabbitAdmin();
// 队列、交换机是否有必要注册为bean,看业务需要。还可以根据需要对rabbitAdmin再进行一次封装,使得能够直接传入队列名进行创建队列
rabbitAdmin.declareQueue(queue());
rabbitAdmin.declareExchange(directExchange());
rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, directExchangeName, queueName, null));
}
@PreDestroy
public void destroy() {
AmqpAdmin rabbitAdmin = rabbitAdmin();
rabbitAdmin.deleteQueue(queueName);
rabbitAdmin.deleteExchange(directExchangeName);
}
@Bean
public Queue queue() {
return new Queue(queueName, true, false, false, null);
}
@Bean(name = "directExchange")
public Exchange directExchange() {
return new DirectExchange(directExchangeName, true, false, null);
}
(9)编写生产者Service
可以根据业务需求封装成具备特定功能的Service,这里出于方便使用了定时任务来发送消息。
@Slf4j
@Service
public class RabbitmqProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
@Qualifier("directExchange")
private Exchange directExchange;
@Autowired
private Queue queue;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(directExchange.getName(), queue.getName(), message);
}
// 需要开启@EnableScheduling
@Scheduled(initialDelay = 3000, fixedDelay = 3000)
public void scheduledSendingMessage() {
String message = "[" + System.currentTimeMillis() + "] Hello, world!";
sendMessage(message);
log.info("[Producer] [Exchange={}] [RoutingKey={}] Sent message={}", directExchange.getName(), queue.getName(), message);
}
}
(10)编写消费者回调函数
为了能够手动ack,继承ChannelAwareMessageListener,并重写onMessage方法。
@Slf4j
@Service
// 实现ChannelAwareMessageListener接口,使用channel对象进行ack
public class RabbitmqListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) {
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("[Consumer] Receive message={}", new String(message.getBody()));
log.info("[Consumer] Properties={}", message.getMessageProperties());
channel.basicAck(deliveryTag, false);
} catch (IOException ex) {
log.info("Fail to ack mq message, deliveryTag={}", deliveryTag);
}
}
}
小结
本文主要介绍了Spring AMQP的一些基本知识和用法,并给出了常见的代码实现。
参考网址
https://docs.spring.io/spring-amqp/reference/html/#_preface