什么是RabbitMQ
RabbitMQ拥有数万用户,是最流行的开源消息代理之一。从T-Mobile到Runtastic,RabbitMQ在全球的小型初创企业和大型企业中都有使用。
RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联邦配置中,以满足高规模、高可用性的需求。
RabbitMQ运行在许多操作系统和云环境中,为大多数流行语言提供了广泛的开发工具。
以上内容翻译自官网 官网地址:www.rabbitmq.com/
什么是延时队列
延迟队列也是一个消息队列,只是它是一个带延迟功能的消息队列。简单说,在实际开发中,对于某些业务场景,我们需要让队列中的消息在指定时间时候才被消费者消费,例如:
- 场景一:订单下单之后一定时间未支付会自动取消订单
- 场景二:涉及到T+d(工作日延迟)或者D+d(自然日延迟)等延迟交付的场景
- 场景三:新用户注册之后一个月没有下单,发个短信勾引一波
可能有人会问,以上情况,我起个定时任务轮询处理,也能达到目的。是的,确实能达到目的,但是如果在该类业务数据量大的情况,处理起来就会十分麻烦,对服务器造成较大压力,并且轮询会有较大误差产生。如果使用延时队列来完成可以避免此类问题。
rabbitmq本身是不直接支持延时队列的,RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现:
1、 TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间
2、 DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息
以此达到延时效果。
安装RabbitMQ
本文以linux环境安装为例:
因为rabbitmq使用erlang编写,所以需要erlang环境
安装erlang环境
1、 ##### 安装GCC GCC-C++ Openssl #####
> yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
2、 ##### 安装ncurses #####
> yum -y install ncurses-devel
3、 安装erlang
> wget http://erlang.org/download/otp_src_18.2.1.tar.gz
> tar xvfz otp_src_18.2.1.tar.gz
> ./configure
> make install
>
安装RabbitMQ
这种方式安装之后文件为xz格式,需要对xz解压
xz -d rabbitmq-server-generic-unix-3.8.3.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.8.3.tar
如果没有xz解压工具,需要先安装xz工具
yum install xz
将解压后的文件复制到/usr/local/下面
cp -r rabbitmq_server-3.8.3 /usr/local/rabbitmq
编辑profile文件vim /etc/profile
添加如下内容
export PATH=/usr/local/rabbitmq/sbin:$PATH
执行source /etc/profile
使更改生效
开启rabbitmq管理方式
rabbitmq-plugins enable rabbitmq_management #启动后台管理界面
rabbitmq-server -detached #后台运行rabbitmq
guest用户默认不能访问,需要修改权限
添加用户: rabbitmqctl add_user admin admin
添加权限: rabbitmqctl set_permissions -p “/” pwd123 “.” “.” “.*”
修改用户角色: rabbitmqctl set_user_tags admin administrator
到此使用访问 ip:15672可以看到如下界面
使用新建的用户登录成功。到此rabbitmq安装完成。如果使用阿里云服务器,记得开放安全组端口信息。
安装延时插件
cd /usr/local/rabbitmq/plugins
进入插件目录
下载rabbiimq延时插件
找到rabbitmq_delayed_message_exchange插件选择版本
右键.ez格式复制链接地址获得下载地址
在当前目录执行
wget github.com/rabbitmq/ra… bbitmq_delayed_message_exchange-3.8.0.ez
启用延时插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
验证插件安装结果
看到type有可选为x-delayed-message类型,即表明延时插件安装成功。
springboot2.0+使用RabbitMQ
进入spring官网,通过官网生成springboot工程结构。ps:使用idea也可以,本文介绍spring官网形式
本文已经选好了后面将要使用到的依赖,点击下方generate生成工程,解压到java工作环境,使用idea打开即可。
简单队列
1、 添加rabbitmq配置文件信息
spring.application.name=rabbitmq-delay-demo
server.port=8081
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=用户
spring.rabbitmq.password=连接密码
2、 编写rabbitmq配置类
package com.yezi.rabbitmqdelaydemo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
/**
* 初始化一个测试队列
*
* @return
*/
@Bean
public Queue helloQueue() {
return new Queue("test.mq");
}
}
3、 编写消息发送类
package com.yezi.rabbitmqdelaydemo.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "测试发送消息 " + LocalDateTime.now();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("test.mq", context);
}
}
4、 编写消息接收类
package com.yezi.rabbitmqdelaydemo.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "test.mq")
public class Receiver {
@RabbitHandler
public void process(String context) {
System.out.println("Receiver : " + context);
}
}
5、 测试发送
package com.yezi.rabbitmqdelaydemo;
import com.yezi.rabbitmqdelaydemo.mq.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitmqDelayDemoApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
sender.send();
}
}
6、 测试结果
以上简单消息测试成功。
延时队列
1、 编写配置类
package com.yezi.rabbitmqdelaydemo.config;
import com.yezi.rabbitmqdelaydemo.mq.delayed.DelayedTopic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqDelayedConfig {
/**
* 初始化延时队列
*
* @return
*/
@Bean
public Queue delayedQueue() {
return new Queue(DelayedTopic.DELAYED_QUEUE_NAME);
}
/**
* 定义一个延迟交换机
*
* @return
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DelayedTopic.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
/**
* 绑定队列到这个延迟交换机上
*
* @param queue
* @param customExchange
* @return
*/
@Bean
public Binding bindingNotify(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DelayedTopic.DELAYED_ROUTING_KEY).noargs();
}
}
编写关键字定义类
package com.yezi.rabbitmqdelaydemo.mq.delayed;
public interface DelayedTopic {
String DELAYED_EXCHANGE_NAME = "delay_exchange";
String DELAYED_QUEUE_NAME = "delayed.queue";
String DELAYED_ROUTING_KEY = "delayed.queue.routingkey";
}
2、 编写消息发送者
package com.yezi.rabbitmqdelaydemo.mq.delayed;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void delayedMessage() {
String context = "test delay message";
System.out.println("Send time: " + LocalDateTime.now() + " Send: " + context);
//延时时间6秒
rabbitTemplate.convertAndSend(DelayedTopic.DELAYED_EXCHANGE_NAME, DelayedTopic.DELAYED_ROUTING_KEY, context, a -> {
a.getMessageProperties().setDelay(6000);
return a;
});
}
}
3、 编写消息消费者
package com.yezi.rabbitmqdelaydemo.mq.delayed;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
@Component
public class DelayedReceiver {
@RabbitListener(queues = DelayedTopic.DELAYED_QUEUE_NAME)
public void receive(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
System.out.println("Received time: " + LocalDateTime.now() + " Received: " + s);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
4、 测试发送
package com.yezi.rabbitmqdelaydemo.controller;
import com.yezi.rabbitmqdelaydemo.mq.delayed.DelayedSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@Autowired
private DelayedSender delayedSender;
/**
* 测试发送延时消息
*
* @return
*/
@GetMapping("/delayedSender")
public String delayedSender() {
delayedSender.delayedMessage();
return "ojbk";
}
}
1、 验证结果
至此,可以看到,6秒之后接收到消息,延时消息发送成功与接收成功。
SCS方式实现延时队列
什么是SCS
SCS是Spring Cloud Stream的缩写,Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
SCS延时队列
1、 编写通道配置
package com.yezi.scsdemo.mq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface DelayedSink {
String OUTPUT = "delayed-topic-output";
String INPUT = "delayed-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
2、 编写消息发送者
package com.yezi.scsdemo.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class DelayedSender {
@Autowired
private DelayedSink delayedSink;
public void delayedMessage() {
String context = "test delay message";
System.out.println("Send time: " + LocalDateTime.now() + " Send: " + context);
//延时时间20秒
delayedSink.output().send(MessageBuilder.withPayload(context).setHeader("x-delay", 20000).build());
}
}
3、 编写消息消费者
package com.yezi.scsdemo.mq;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@EnableBinding(DelayedSink.class)
@Component
public class DelayedReceiver {
@StreamListener(DelayedSink.INPUT)
public void receive(String message) {
System.out.println("Received time: " + LocalDateTime.now() + " Received: " + message);
}
}
这里面涉及到几个SCS的核心注解,具体的内容可以去这篇博客查阅,这里不做展开
> [blog.didispace.com/spring-clou…][blog.didispace.com_spring-clou] 作者:程序猿DD
4、 修改配置文件application.properties
添加如下信息:
server.port=8080
spring.application.name=rabbitmq-delay-scs-demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=账号
spring.rabbitmq.password=密码
#以下为生产者端配置
#将发送者队列绑定到指定交换机
spring.cloud.stream.bindings.delayed-topic-output.destination=delayed-topic-demo
#开启延时,生产者和消费者端都需要开启这个配置
spring.cloud.stream.rabbit.bindings.delayed-topic-output.producer.delayed-exchange=true
#以下为消费者端配置
#将消费者队列绑定到指定交换机
spring.cloud.stream.bindings.delayed-topic-input.destination=delayed-topic-demo
#消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息
spring.cloud.stream.bindings.delayed-topic-input.group=group-1
#开启延时,生产者和消费者端都需要开启这个配置
spring.cloud.stream.rabbit.bindings.delayed-topic-input.consumer.delayed-exchange=true
5、 这里贴一个pom文件依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
6、 测试发送
package com.yezi.scsdemo.controller;
import com.yezi.scsdemo.mq.DelayedSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@Autowired
private DelayedSender delayedSender;
/**
* 测试发送延时消息
*
* @return
*/
@GetMapping("/scs/delayedSender")
public String delayedSender() {
delayedSender.delayedMessage();
return "scs ojbk";
}
}
1、 验证结果
这里看到已经有一条消息进来了,由于我们发送时设置延迟时间为20秒,等待20秒之后,在idea控制台查看信息
可以看到20秒之后,消息被消费了,以上SCS延时消息测试成功。
小结
综合来看,SCS相比使用配置类的形式实现延时消息,代码更加简洁,无需做交换机的绑定操作,只需要在properties或者yml文件中通过配置的形式实现队列与交换机的绑定,开发者在使用SCS的时候可以将更多的精力放到业务上!