外观
** 初识MQ **
** ** 同步和异步通讯 微服务间通讯有同步和异步两种⽅式:
同步通讯:就像打电话,需要实时响应。
异步通讯:就像发邮件,不需要⻢上回复。** **

两种⽅式各有优劣,打电话可以⽴即得到响应,但是你却不能跟多个⼈同时通话。发送邮件可以同时与多个⼈收发邮 件,但是往往响应会有延迟。
** 同步通讯 **
我们之前学习的 Feign 调⽤就属于同步⽅式,虽然调⽤可以实时得到结果,但存在下⾯的问题: 微服务间基于 Feign 的调⽤就属于同步⽅式,存在⼀些问题。

** 总结:**
** 同步调⽤的优点: **
时效性较强,可以⽴即得到结果
**同步调⽤的缺点: **
耦合度⾼ 性能和吞吐能⼒下降
有额外的资源消耗 有级联失败问题
** 异步通讯 **
异步调⽤则可以避免上述问题:
我们以购买商品为例,⽤⼾⽀付后需要调⽤订单服务完成订单状态修改,调⽤物流服务,从仓库分配响应的库存并准 备发货。
在事件模式中,⽀付服务是事件发布者( publisher ),在⽀付完成后只需要发布⼀个⽀付成功的事件( event ),事 件中带上订单 id 。 订单服务和物流服务是事件订阅者( Consumer ),订阅⽀付成功的事件,监听到事件后完成⾃⼰业务即可。
为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,⽽是有⼀个中间⼈( Broker )。发布者发布事件到Broker ,不关⼼谁来订阅事件。订阅者从 Broker 订阅事件,不关⼼谁发来的消息。 异步调⽤常⻅实现就是事件驱动模式

优势⼀:服务解耦
优势⼆:性能提升,吞吐量提⾼
优势三:服务没有强依赖,不担⼼级联失败问题
优势四:流量削峰

好处:
吞吐量提升:⽆需等待订阅者处理完成,响应更快速
故障隔离:服务没有直接调⽤,不存在级联失败问题 调⽤间没有阻塞,不会造成⽆效的资源占⽤
耦合度极低,每个服务都可以灵活插拔,可替换
流量削峰:不管发布事件的流量波动多⼤,都由 Broker 接收,订阅者可以按照⾃⼰的速度去处理事件
缺点:
架构复杂了,业务没有明显的流程线,不好管理
需要依赖于 Broker 的可靠、安全、性能
好在现在开源软件或云平台上 Broker 的软件是⾮常成熟的,⽐较常⻅的⼀种就是我们今天要学习的 MQ 技术。
** 技术对⽐: **
** **MQ ,中⽂是消息队列( MessageQueue ),字⾯来看就是存放消息的队列。也就是事件驱动架构中的 Broker 。 ⽐较常⻅的 MQ 实现:
ActiveMQ
RabbitMQ
RocketMQ
Kafka

追求可⽤性: Kafka 、 RocketMQ 、 RabbitMQ 追求可靠性: RabbitMQ 、 RocketMQ 追求吞吐能⼒: RocketMQ 、 Kafka 追求消息低延迟: RabbitMQ 、 Kafka
** 安装RabbitMQ **
⽅式⼀:在线拉取
docker pull rabbitmq:3-management
⽅式⼆:从本地加载
在课前资料已经提供了镜像包:

上传到虚拟机中后,使⽤命令加载镜像即可:
docker load -i mq.tar
安装mq(命令如下)
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins -
-name mq \
--hostname xxxx \
-p 15672:15672 \
-p 5672:5672 \
-d \ rabbitmq:3.9-management

访问:http://192.168.25.3:15672/#/queues (如下图表示配置成功)

创建一个名为init.test的(队列)

访问并对照 MQ 的基本结构:

RabbitMQ 中的⼏个概念:
channel :操作 MQ 的⼯具 ( 通道 )
exchange :路由消息到队列中(交换机)
queue :缓存消息 (队列)
virtual host :虚拟主机,是对 queue 、 exchange 等资源的逻辑分组(环境隔离)
** RabbitMQ消息模型 **
RabbitMQ 官⽅提供了 5个不同的 Demo ⽰例,对应了不同的消息模型:
(1)基本消息队列( BasicQueue )

(2) ⼯作消息队列( WorkQueue )

(3) 发布订阅( Publish 、 Subscribe ),⼜根据交换机类型不同分为三种:
3.1 Fanout Exchange :⼴播

3.2 Direct Exchange :路由

3.3 Topic Exchange:主题

实操:
(1)创建父工程名为mq-demo
(2)pom文件中引入依赖(如下)
plain
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.xja</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Archetype - mq-demo</name>
<url>http://maven.apache.org</url>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<!-- springboot父类依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- 消息转化时候使用 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
</dependencies>
</project>(3)创建名为 publisher (** 事件发布者 **)的子项目
- ,编写启动类

- ,编写application.yml文件(目前只设置端口号即可)

- ,编写一个名为InitDemoPublisher的发布者类
编写代码:
plain
package com.xja.application.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class InitDemoPublisher {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("root");//配置连接相关参数
connectionFactory.setPassword("123456");//密码
connectionFactory.setVirtualHost("/");//设置当前连接要访问的虚拟主机为根路径 /。
connectionFactory.setHost("192.168.25.3");//虚拟机IP地址
connectionFactory.setPort(5672);//对内访问端口
//获取连接
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//发送消息 队列 消息属性(是否持久化) 消息体
channel.basicPublish("", "init.test", null, ("Hello lizi and qianqian!").getBytes());
//关闭
channel.close();
connection.close();
}
}(4)创建名为 consumer( 事件订阅者 ) 的子项目
- ,编写启动类

- ,编写application.yml文件(目前只设置端口号即可)

- ,编写一个名为InitDemoPublisher的事件订阅者类
plain
package com.xja.application.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class InitDemoConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置连接相关参数
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("192.168.25.3");
connectionFactory.setPort(5672);
//获取连接
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//获取消息
channel.basicConsume("init.test", true,new DefaultConsumer(channel){
`@Override`
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body){
System.out.println("接收到的消息 ——>" + new String(body));
}
});
}
}(5),首先启动名为 consumer( 事件订阅者 ) 的子项目,然后启动名为 publisher (** 事件发布者 **)的子项目,控制台显示如下

**介绍 SpringAMQP **
SpringAMQP 是基于 RabbitMQ 封装的⼀套模板,并且还利⽤ SpringBoot 对其实现了⾃动装配,使⽤起来⾮常⽅ 便。 SpringAmqp 的官⽅地址: https://spring.io/projects/spring-amqp


**SpringAMQP 提供了三个功能: **
⾃动声明队列、交换机及其绑定关系
基于注解的监听器模式,异步接收消息
封装了 RabbitTemplate ⼯具,⽤于发送消息
操作:
(1) 在⽗⼯程 mq-demo 中引⼊依赖
plain
<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>(2)⾸先配置 MQ 地址,在 publisher 服务的 application.yml 中添加配置
plain
spring:
rabbitmq:
host: 192.168.25.3 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: 123456 # 密码(3) 然后在 publisher 服务中编写测试类 BasicTest ,并利⽤ RabbitTemplate 实现消息发送:
plain
package com.xja.application;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
`@SpringBootTest`
public class BasicTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void basicTest() {
rabbitTemplate.convertAndSend("mq.basic", "呵呵");
}
}(4)⾸先配置 MQ 地址,在 consumer 服务的 application.yml 中添加配置:
plain
spring:
rabbitmq:
host: 192.168.25.3 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: 123456 # 密码(5)在application下的包basic中创建一个BasicReceive实体类以及BasicReceiveBind实体类

BasicReceive用@Configuration标注表示配置类主要用于⾃动声明队列
plain
package com.xja.application.basic;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
`@Component` //后期`@Configuration`
public class BasicReceive {
`@Bean`
public Queue queue() {
return new Queue("mq.basic");
}
}BasicReceiveBind用于接收事件发布者的信息
plain
package com.xja.application.basic;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
`@Component`
public class BasicReceiveBind {
`@RabbitListener`(queues = {"mq.basic"})
public void receive(String message) {
System.out.println("接收的message=" + message);
}
}(6)启动项目重新测试(控制台显示)

(7)对以上代码升级(主要升级地方不再创建两个类,使用注解方式对⾃动声明队列、交换机及其绑定关系)
在application下的包basic中创建实体类BasicAnnotionReceive
plain
package com.xja.application.basic;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
`@Component`
public class BasicAnnotionReceive {
//在监听队列的同时,自动声明该队列(如果队列不存在)
`@RabbitListener`(queuesToDeclare = `@Queue`(value = "mq.basic"))
public void receive(String message) {
System.out.println("接收到的message=" + message);
}
}(8)重新启动项目(如下图)

使用 SpringAMQP当有多个事件订阅者时,事件发布者发布多个事件会怎样处理???
(1)在Consumer( 事件订阅者 )的application下的task包下创建一个名为TaskAnnotionReceive的实体类,在类中写多个接受一个队列的方法
plain
package com.xja.application.task;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
`@Component`
public class TaskAnnotionReceive {
//注解:当订阅队列有多个时候,会采用轮询()
`@RabbitListener`(queuesToDeclare = `@Queue`(value = "mq.task"))
public void receive1(String message) {
System.out.println("receive1接收到的message=" + message);
}
`@RabbitListener`(queuesToDeclare = `@Queue`(value = "mq.task"))
public void receive2(String message) {
System.out.println("receive2接收到的message=" + message);
}
}(2)在publisher (** 事件发布者 )事件发布者的test包下创建一个名为TaskTest的测试类**

plain
package com.xja.application;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
`@SpringBootTest`
public class TaskTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void send() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("mq.task", "呵呵");
}
}
}(3)重启两个项目(控制台如下显示)

总结:
当多个消费者(如你的 receive1 和 receive2 方法)监听同一个队列(mq.task)时,RabbitMQ 默认采用 “轮询(Round-Robin)” 机制分发消息,具体表现为:
核心规则:
- 消息 “平均分配”:队列中的消息会按顺序交替分发给各个消费者,每个消息只会被一个消费者处理(不会重复消费)。 例如你发送 10 条消息,
receive1可能接收第 1、3、5、7、9 条,receive2接收第 2、4、6、8、10 条,大致各分配一半。 - “推模式” 主动分发:RabbitMQ 会主动将消息推送给消费者,而非消费者主动拉取,且默认不考虑消费者的处理能力(即使某个消费者处理较慢,仍会按轮询分配)。
为什么这样设计?
这种机制属于 RabbitMQ 的 “工作队列(Work Queue)” 模式,核心目的是 “负载均衡”:通过多个消费者共同处理同一个队列的消息,提高消息处理效率(比如处理耗时任务时,避免单消费者压力过大)。
(一) 发布/订阅(交换机)
**发布订阅的模型如图: **

可以看到,在订阅模型中,多了⼀个 exchange ⻆⾊,⽽且过程略有变化:
Publisher :⽣产者,也就是要发送消息的程序,但是不再发送到队列中,⽽是发给 X (交换机) 3 种类型:
Exchange :交换机,图中的 X 。⼀⽅⾯,接收⽣产者发送的消息。另⼀⽅⾯,知道如何处理消息,例如递交给 某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。 Exchange 有以 下
Fanout :⼴播,将消息交给所有绑定到交换机的队列
Direct :定向,把消息交给符合指定 routing key 的队列
Topic :⽀持通配符,把消息交给符合 routing pattern (路由模式) 的队列
Consumer :消费者,与以前⼀样,订阅队列,没有变化 Queue :消息队列也与以前⼀样,接收消息、缓存消息。
Exchange(交换机)只负责处理消息,不能存储消息,因此如果没有任何队列与 Exchange 绑定,或者没有符合路 由规则的队列,那么消息会丢失!
Fanout ( 广播模式)
Fanout ,英⽂翻译是分裂,我觉得在 MQ 中叫⼴播更合适。

在⼴播模式下,消息发送流程是这样的:
) 可以有多个队列
) 每个队列都要绑定到 Exchange (交换机)
) ⽣产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,⽣产者⽆法决定
) 交换机把消息发送给绑定过的所有队列
) 订阅队列的消费者都能拿到消息
** 我们的计划是这样的:**
创建⼀个交换机 fanout ,类型是 Fanout
创建两个队列 fanout.queue1 和 fanout.queue2 ,绑定到交换机 fanout

代码演示:
1,在application目录下创建一个名为fanout的目录并创建三个类分别是FanoutBind(fanout绑定队列)与FanoutReceiveBind(fanout接收消息)这两个结合使用,以及FanoutAnnotionReceive(里用注解绑定交换机和队列和)

- ,在FanoutBind(fanout绑定)中编写
plain
package com.xja.application.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//加上交换机
//`@Configuration`
public class FanoutBind {
`@Bean`//交换机
public FanoutExchange exchange() {
return new FanoutExchange("fanout");
}
`@Bean`//队列queue1
public Queue queue1() {
return new Queue("fanout.queue1");
}
`@Bean`//队列queue2
public Queue queue2() {
return new Queue("fanout.queue2");
}
`@Bean`//绑定队列queue1
public Binding binding1(FanoutExchange exchange,Queue queue1){
return BindingBuilder.bind(queue1).to(exchange);
}
`@Bean`//绑定队列queue2
public Binding binding2(FanoutExchange exchange,Queue queue2){
return BindingBuilder.bind(queue2).to(exchange);
}
}- ,在FanoutReceiveBind(fanout接收消息)中编写
plain
package com.xja.application.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//`@Component`
public class FanoutReceiveBind {
`@RabbitListener`(queues = {"fanout.queue1"})
public void receive1(String message) {
System.out.println("接收的message=" + message);
}
`@RabbitListener`(queues = {"fanout.queue2"})
public void receive2(String message) {
System.out.println("接收的message=" + message);
}
}- ,在publisher项目中编写一个测试类FanoutTest
plain
package com.xja.application;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
`@SpringBootTest`
public class FanoutTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void basicTest() {
rabbitTemplate.convertAndSend("fanout","", "呵呵");
}
}- ,启动项目运行测试(如下图)

- ,代码升级使用注解的方式FanoutAnnotionReceive(里用注解绑定交换机和队列和)
plain
package com.xja.application.fanout;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;
//`@Component`
//`@Slf4j`
public class FanoutAnnotionReceive {
`@RabbitListener`(
bindings = `@QueueBinding`(
value = `@Queue`(value = "queue1"),
exchange =`@Exchange`(value = "fantout",type = ExchangeTypes.FANOUT),
key = ""
))
public void receive1(String message) throws InterruptedException {
System.out.println("receive1接收message = " + message);
// log.debug("mq.fontout.queue1-receive1接收message = " + message);
}
`@RabbitListener`(
bindings = `@QueueBinding`(
value = `@Queue`(value = "queue2"),
exchange =`@Exchange`(value = "fantout",type = ExchangeTypes.FANOUT),
key = ""
))
public void receive2(String message) throws InterruptedException {
System.out.println("receive2接收message = " + message);
// log.debug("receive2接收message = " + message);
}
}- ,重新启动项目运行发送消息(如下图)

(二)Direct ( 声明队列和交换机 )
在 Fanout 模式中,⼀条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列 消费。这时就要⽤到 Direct 类型的 Exchange 。

在 Direct 模型下:
队列与交换机的绑定,不能是任意绑定了,⽽是要指定⼀个 RoutingKey (路由 key )
消息的发送⽅在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey 。
Exchange 不再把消息交给每⼀个绑定的队列,⽽是根据消息的 Routing Key 进⾏判断,只有队列的Routingkey 与消息的 Routing key 完全⼀致,才会接收到消息
代码演示:
- ,首先在application下创建一个名为Direct 目录,在本目录下创建三个实体类分别是DirectBind(消息分配者),DirectReceiveBind(消息接收者),DirectAnnotionReceive( 基于注解声明队列和交换机 )

- 在DirectBind(消息分配者)中编写
plain
package com.xja.application.direct;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.text.DecimalFormat;
//加上交换机
`@Configuration`
public class DirectBind {
`@Bean`//交换机
public DirectExchange exchange() {
return new DirectExchange("direct");
}
`@Bean`//队列queue1
public Queue queue1() {
return new Queue("direct.queue1");
}
`@Bean`//队列queue2
public Queue queue2() {
return new Queue("direct.queue2");
}
`@Bean`//绑定队列queue1
public Binding binding1(DirectExchange exchange,Queue queue1){
return BindingBuilder.bind(queue1).to(exchange).with("A");//指定消息队列的Routingkey
}
`@Bean`//绑定队列queue2
public Binding binding2(DirectExchange exchange,Queue queue2){
return BindingBuilder.bind(queue2).to(exchange).with("B");
}
}- ,在DirectReceiveBind(消息接收者)类中编写
plain
package com.xja.application.direct;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
`@Component`
public class DirectReceiveBind {
`@RabbitListener`(queues = {"direct.queue1"})
public void receive1(String message) {
System.out.println("receive1接收的message=" + message);
}
`@RabbitListener`(queues = {"direct.queue2"})
public void receive2(String message) {
System.out.println("receive2接收的message=" + message);
}
}- ,在publisher(消息发送者)项目中编写测试类DirectTest
plain
package com.xja.application;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
`@SpringBootTest`
public class DirectTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void basicTest() {
rabbitTemplate.convertAndSend("direct","A", "呵呵");
}
}- ,启动项目测试(当指定不同的 Routingkey 时会被转换器转换到不要同的消息队列)


- ,代码升级(基于注解声明队列和交换机 )在编辑刚刚consumer项目下的创建的DirectAnnotionReceive( 基于注解声明队列和交换机 )
plain
package com.xja.application.direct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//`@Component`
//`@Slf4j`
public class DirectAnnotionReceive {
`@RabbitListener`(
bindings = `@QueueBinding`(
value = `@Queue`(value = "queue1"),
exchange =`@Exchange`(value = "direct",type = ExchangeTypes.DIRECT),
key = "A"
))
public void receive1(String message) throws InterruptedException {
System.out.println("receive1接收message = " + message);
}
`@RabbitListener`(
bindings = `@QueueBinding`(
value = `@Queue`(value = "queue2"),
exchange =`@Exchange`(value = "direct",type = ExchangeTypes.DIRECT),
key = "B"
))
public void receive2(String message) throws InterruptedException {
System.out.println("receive2接收message = " + message);
}
}- ,重启项目进行测试


3.5.3.总结
描述下 Direct 交换机与 Fanout 交换机的差异?
Fanout 交换机将消息路由给每⼀个与之绑定的队列
Direct 交换机根据 RoutingKey 判断路由给哪个队列
如果多个队列具有相同的 RoutingKey ,则与 Fanout 功能类似
基于 @RabbitListener 注解声明队列和交换机有哪些常⻅注解?
@Queue
@Exchange
(三) Topic
说明 :
Topic 类型的 Exchange 与 Topic 类型 Direct 相⽐,都是可以根据 Exchange 可以让队列在绑定 RoutingKey 把消息路由到不同的队列。只不过 Routing key 的时候使⽤通配符!
Routingkey ⼀般都是有⼀个或多个单词组成,多个单词之间以 ”.” 分割,例如: item.insert
通配符规则:
:匹配⼀个或多个词
- :匹配不多不少恰好 1 个词
举例:
item.# :能够匹配 item.spu.insert 或者 item.spu
item.* :只能匹配 item.spu
代码演示:
在项目consumer的application下创建topic文件夹在当前文件夹下创建三个实体类分别是TopicBind(声明交换机与消息队列),TopicReceiveBind(接收消息信息),TopicAnnotionReceive(全注解方式声明交换机以及消息队列,接收信息)
- ,在TopicBind(声明交换机与消息队列)编辑
plain
package com.xja.application.topic;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
//加上交换机
//`@Configuration`
public class TopicBind {
`@Bean`//交换机
public TopicExchange exchange() {
return new TopicExchange("topic");
}
`@Bean`//队列queue1
public Queue queue1() {
return new Queue("topic.queue1");
}
`@Bean`//队列queue2
public Queue queue2() {
return new Queue("topic.queue2");
}
`@Bean`//队列queue2
public Queue queue3() {
return new Queue("topic.queue3");
}
`@Bean`//队列queue2
public Queue queue4() {
return new Queue("topic.queue4");
}
`@Bean`//绑定队列queue1
public Binding binding1(TopicExchange exchange,Queue queue1){
return BindingBuilder.bind(queue1).to(exchange).with("china.#");
}
`@Bean`//绑定队列queue2
public Binding binding2(TopicExchange exchange,Queue queue2){
return BindingBuilder.bind(queue2).to(exchange).with("#.news");
}
`@Bean`//绑定队列queue1
public Binding binding3(TopicExchange exchange,Queue queue3){
return BindingBuilder.bind(queue3).to(exchange).with("#.weather");
}
`@Bean`//绑定队列queue1
public Binding binding4(TopicExchange exchange,Queue queue4){
return BindingBuilder.bind(queue4).to(exchange).with("japan.#");
}
}- ,在TopicReceiveBind(接收消息信息)编辑
plain
package com.xja.application.topic;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
//`@Component`
public class TopicReceiveBind {
`@RabbitListener`(queues = {"topic.queue1"})
public void receive1(String message) {
System.out.println("receive1接收的message=" + message);
}
`@RabbitListener`(queues = {"topic.queue2"})
public void receive2(String message) {
System.out.println("receive2接收的message=" + message);
}
`@RabbitListener`(queues = {"topic.queue3"})
public void receive3(String message) {
System.out.println("receive3接收的message=" + message);
}
`@RabbitListener`(queues = {"topic.queue4"})
public void receive4(String message) {
System.out.println("receive4接收的message=" + message);
}
}- ,在项目publisher创建实现类TopicTest
plain
package com.xja.application;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
`@SpringBootTest`
public class TopicTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void basicTest() {
rabbitTemplate.convertAndSend("topic","china.news", "呵呵");
}
}- ,启动项目进行测试

- ,升级项目编辑TopicAnnotionReceive(全注解方式声明交换机以及消息队列,接收信息)

总结:
描述下 Direct 交换机与 Topic 交换机的差异?
Topic 交换机接收的消息 RoutingKey 必须是多个单词,以. 分割
Topic 交换机与队列绑定时的 bindingKey 可以指定通配符
:代表 0 个或多个词
- :代表 1 个词
配置JSON格式形式消息
- ,思考首先测试在上述实例中我们传递的都是String类型的字符串信息,那么其他类型的消息是否可以传递呢????(比如数组,List集合以及Map集合,或者是一个对象形式)!!!!!!!!!!
代码示例:
- ,在项目consumer中创建一个conver的目录,在当前目录下方创建一个类名为ConvertAnnotionRecevie(以注解方式声明交换机以及消息队列接收)

plain
package com.xja.application.convert;
import com.xja.application.pojo.User;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
//`@Component`
public class ConvertAnnotionRecevie {
//String类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(String message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }
//int类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(int message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }
//List集合类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(List message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }
// //Map集合类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(Map message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }
}- ,在项目publisher下方编写一个名为ConvertTest的测试类
plain
package com.xja.application;
import com.xja.application.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
`@SpringBootTest`
public class ConvertTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void basicTest() {//String类型
rabbitTemplate.convertAndSend("convert","xx", "呵呵");
}
`@Test`
public void basicTest2() {//int类型
rabbitTemplate.convertAndSend("convert","xx", 1);
}
`@Test`
public void basicTest3() {//List集合类型
List list = new ArrayList();
list.add(1);
list.add(2);
rabbitTemplate.convertAndSend("convert","xx", list);
}
`@Test`
public void basicTest4() {//List集合类型
Map map = new HashMap();
map.put("x","1");
map.put("y","2");
rabbitTemplate.convertAndSend("convert","xx", map);
}
}- ,测试以上类型的消息均可传递(但是对象类型的传递不了)
显然, JDK 序列化⽅式并不合适。我们希望消息体的体积更⼩、可读性更⾼,因此可以使⽤ JSON ⽅式来做序列化和反 序列化。
在 publisher 和 consumer 两个服务中都引⼊依赖:
plain
<!-- 消息转化时候使用 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>配置消息转换器。 在连个项⽬的启动类中添加⼀个 Bean 即可
plain
`@Bean`
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}消息持久化
MQ 默认是内存存储消息,开启持久化功能可以确保缓存在 MQ 中的消息不丢失。那么就需要我们将信息持久化到磁盘以 确保安全
1,测试默认储存方式
将队列中⼿动添加⼀条消息 ( 关闭消费服务 )

此时我们重新启动 mq

此时发现系统默认的交换机存在、我们创建的队列存在,但是队列中的消息已经丢失。
2, 交换机持久化

我们观察⼀下交换机发现 Features 选中的,统⼀存在了⼀个单词, D (Durable 持久的 ) ,那么就意味着系统创建 的交换机因为指定了 Durable 所以不会丢失。
生产者确认机制

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。消息发送到 MQ 以后,会返回⼀个结果 给发送者,表⽰消息是否处理成功。结果有两种请求:
publisher-confirm ,发送者确认 :
(1)消息成功投递到交换机,返回 ack (acknowledge - 答谢、告知已经收到 )
(2)消息未投递到交换机,返回 nack
publisher-return ,发送者回执 :
(1)消息投递到交换机了,但是没有路由到队列。返回 ACK ,及路由失败原因。

- ,⾸先修改⽣产者服务配置⽂件添加:
plain
publisher-confirm-type: correlated #用于确认消息是否成功到达 交换机
publisher-returns: true #消息返回机制,用于处理消息到达交换机后,但 无法路由到任何队列 的情况。
template:
mandatory: true #强制要求交换机在消息无法路由时,必须将消息返回给生产者(而不是直接丢弃)
配置说明:
publish-confirm-type :开启 publisher-confirm ,这⾥⽀持两种类型:
simple :同步等待 confirm 结果,直到超时
correlated :异步回调,定义 ConfirmCallback , MQ 返回结果时会回调这个 ConfirmCallback publish-returns :开启 publish-return 功能,同样是基于 callback 机制,不过是定义ReturnCallback template.mandatory :定义消息路由失败时的策略。
true ,则调⽤ ReturnCallback ;
false :则直接丢弃消息
配置全局的消息失败 Callback ⽅法 每个 RabbitTemplate 只能配置⼀个 ReturnCallback ,因此需要在项⽬启动过程中配置:
创建⼀个配置⽂件
plain
package com.xja.application.callback;
import com.rabbitmq.client.ReturnCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
`@Component`
`@Slf4j`
public class ReturnCallbackHandle implements ApplicationContextAware {
`@Override`
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取mq对象信息
RabbitTemplate rabbitTemplate =
(RabbitTemplate)applicationContext.getBean("rabbitTemplate");
//增加一个回调方法
rabbitTemplate.setReturnsCallback(returnedMessage -> {
// 判断是否是延迟消息
Integer receivedDelay =
returnedMessage.getMessage().getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一个延迟消息,忽略这个错误提示
return;
}
// 记录日志
log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
returnedMessage.getReplyCode(),
// 修改发送的代码测试:
returnedMessage.getReplyText(),
returnedMessage.getExchange() ,
returnedMessage.getRoutingKey(),
returnedMessage.getMessage().toString());
//todo 处理出问题的消息
});
}
}在测试类中编写
plain
package com.xja.application;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.CorrelationDataPostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
`@Slf4j`
`@SpringBootTest`
public class PTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void contextLoads() {
//1 .准备消息
String message = "最美香山红叶,我们一起沐浴红色海洋!";
//准备CorrelationData
// 消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//准备
correlationData.getFuture().addCallback(result -> {
if (result.isAck()){
//ACK(成功)
log.debug("信息发送成功到交换机!消息ID:{}", correlationData.getId());
}else {
//NACK(失败)
log.debug("信息发送到到交换机失败!消息ID:{}", correlationData.getId());
}
},ex -> {
//记录日志
log.error("消息发送失败",ex);
//重发消息
});
rabbitTemplate.convertAndSend("xx","",message,correlationData);
}
}死信交换机
1,认识死信交换机
当⼀个队列中的消息满⾜下列情况之⼀时,可以成为死信( dead letter ):
消费者使⽤ basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
消息是⼀个过期消息,超时⽆⼈消费
要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了 dead-letter-exchange 属性,指定了⼀个交换机,那么队列中的死信就会投递到这个交换机 中,⽽这个交换机称为死信交换机( Dead Letter Exchange ,简称 DLX )

和异常处理交换机不同的是,异常处理交换机同样也是做兜底的功能,但是转发是有消费者控制准发的,⽽死信交换 机则是由队列转发的,⽽且死信交换机的处理能⼒更为健壮⼀些。
什么样的消息会成为死信?
消息被消费者 reject 或者返回 nack( 且不⼊队列 )
消息超时未消费
队列满了
如何给队列绑定死信交换机?
给队列设置 dead-letter-exchange 属性,指定⼀个交换机
给队列设置 dead-letter-routing-key 属性,设置死信交换机与死信队列的 RoutingKey
注解形式 (声明交换机以及消息队列)
plain
package com.xja.application.deadletter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
`@Component`
`@Slf4j`
public class DeadLetter {
//配置sms队列
`@RabbitListener`(bindings = `@QueueBinding`(
value = `@Queue`(value = "dl.queue",arguments = {
`@Argument`(name = "x-dead-letter-exchange",value = "dl"),
`@Argument`(name = "x-dead-letter-routing-key",value = "dl.key")
}),
exchange = `@Exchange`(value = "dl.exchange",type = ExchangeTypes.TOPIC),
key = ""
))
public void re1(String message){
// int a = 1/0;
System.out.println("sms队列接收消息为{}" + message);
}
}测试代码
plain
package com.xja.application;
import org.springframework.amqp.core.Message;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.amqp.core.MessagePostProcessor;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
`@SpringBootTest`
public class DeadLetterTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void dlTest() {
rabbitTemplate.convertAndSend("dl.exchange","","666");
}
}死信交换机实际就是⼀个普通定义的交换机和队列。
测试:重试失败的时候消息是否投递到交换机。 (可以看出已经投递到dl交换机以及对应的消息队列)

TTL 交换机
TTL ,也就是 Time-To-Live( 存活时间 ) 。如果⼀个队列中的消息 TTL 结束仍未消费,则会变为死信, ttl 超时分为 两种情况:
消息所在的队列设置了存活时间
消息本⾝设置了存活时间

⼀般情况下,死信交换机就相当于⼀个垃圾桶,去做⼀些异常消息的存储⼯作,但是因为其特性, 我们还可以通过其 能特性,利⽤私信交换机实现⼀些我们特殊的功能,⽐如延迟队列。
接下来我们先通过代码实现⼀个死信交换机,先将其运⾏出来。
我们声明⼀组死信交换机和队列,基于注解⽅式 SpringRabbitListener :
设定⼀个队列超时时间
plain
package com.xja.application.deadletter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
`@Component`
`@Slf4j`
public class DeadLetter {
//配置sms队列
`@RabbitListener`(bindings = `@QueueBinding`(
value = `@Queue`(value = "dl.queue",arguments = {
`@Argument`(name = "x-dead-letter-exchange",value = "dl"),
`@Argument`(name = "x-dead-letter-routing-key",value = "dl.key")
// `@Argument`(name = "x-message-ttl",value = "10000",type = "java.lang.Integer") // 修改此处
}),
exchange = `@Exchange`(value = "dl.exchange",type = ExchangeTypes.TOPIC),
key = ""
))
public void re1(String message){
// int a = 1/0;
System.out.println("sms队列接收消息为{}" + message);
}
}或者
plain
`@Test`
public void testTTL() {
//创建持久化消息
Message message = MessageBuilder
.withBody("6666".getBytes(StandardCharsets.UTF_8))
.setExpiration("10000") //设置消息5s过期
.build();
//准备消息的唯一ID
// CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//投递消息
rabbitTemplate.convertAndSend("dl.exchange", "", message);
}将消息消费者服务停止掉进行测试:


延迟交换机
利⽤ TTL 结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列 ( Delay Queue )模式。
延迟队列的使⽤场景包括:
延迟发送短信
⽤⼾下单,如果⽤⼾在 15 分钟内未⽀付,则⾃动取消
预约⼯作会议, 20 分钟后⾃动通知所有参会⼈员
因为延迟队列的需求⾮常多,所以 RabbitMQ 的官⽅也推出了⼀个插件,原⽣⽀持延迟队列效果。
这个插件就是 DelayExchange 插件。参考 RabbitMQ 的插件列表⻚⾯: https://www.rabbitmq.com/communit/y-plugins.html
(1)首先产看相关

(2)查找mq的路径

(3)cd切换到上述路径下方

(4)将提前准备好的文件拖进去

(5)重启并且查看是否重启

DelayExchange原理
DelayExchange 需要将⼀个交换机声明为 delayed 类型。当我们发送消息到 delayExchange 时,流程如下:
接收消息
判断消息是否具备 x-delay 属性
如果有 x-delay 属性,说明是延迟消息,持久化到硬盘,读取 x-delay 值,作为延迟时间
返回 routing not found 结果给消息发送者
x-delay 时间到期后,重新投递消息到指定队列
使⽤DelayExchange
插件的使⽤也⾮常简单:声明⼀个交换机,交换机的类型可以是任意类型,只需要设定 delayed 属性为 true 即可, 然后声明队列与其绑定即可。
消息消费者代码
plain
package com.xja.application.delay;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
`@Component`
public class Delay {
`@RabbitListener`(bindings = `@QueueBinding`(
value = `@Queue`(value = "delayed"),
exchange = `@Exchange`(value = "delayed",type = ExchangeTypes.TOPIC,delayed = "true")
))
public void delayed(String msg) {
// msg.get
System.out.println("msg" + msg);
}
}消息发布者代码
plain
package com.xja.application;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
`@SpringBootTest`
`@Slf4j`
public class DelayTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void testTTL() {
//1 .准备消息
String s = "最美香山红叶,我们一起沐浴红色海洋!";
//准备CorrelationData
// 消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//准备
correlationData.getFuture().addCallback(result -> {
if (result.isAck()){
//ACK(成功)
log.debug("信息发送成功到交换机!消息ID:{}", correlationData.getId());
}else {
//NACK(失败)
log.debug("信息发送到到交换机失败!消息ID:{}", correlationData.getId());
}
},ex -> {
//记录日志
log.error("消息发送失败",ex);
//重发消息
});
//创建持久化消息
Message message = MessageBuilder
.withBody(s.getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",10000) //设置消息5s过期
.build();
//投递消息
rabbitTemplate.convertAndSend("delayed", "", message);
}
}惰性队列
从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的概念,也就是惰性队列。惰性队列的特征如下:
- , 接收到消息后直接存⼊磁盘⽽⾮内存
直接内存存储有预警 40% ,达到阈值就阻⽌消息投递了,此时 mq 的策略就是将内存中的⼀部分数据刷出到 内存 (page-out) 之后内存有空闲了再次接收消息,如果数据量⽐较⼤的情况下, mq 的并发就会出现波浪 曲线,性能忽⾼忽低,性能不稳定。
,消费者要消费消息时才会从磁盘中读取并加载到内存
,⽀持数百万条的消息存储
(二) 基于命令⾏设置lazy-queue
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
rabbitmqctl : RabbitMQ 的命令⾏⼯具
set_policy :添加⼀个策略
Lazy :策略名称,可以⾃定义
"^lazy-queue$" :⽤正则表达式匹配队列的名字
'{"queue-mode":"lazy"}' :设置队列模式为 lazy 模式
--apply-to queues :策略的作⽤对象,是所有的队
(二) 基于@Bean声明lazy-queue (定义一个惰性的方法和一个普通的)

(三)在consumer(消费者)项目中基于@RabbitListener声明LazyQueue
plain
package com.xja.application.lazy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
`@Component`
`@Slf4j`
public class LazyQueueListener {
/**
* 监听惰性队列(Lazy Queue)
* 特性:消息优先存储在磁盘,适合大量消息堆积场景
*/
`@RabbitListener`(
bindings = `@QueueBinding`(
value = `@Queue`(
value = "lazy.queue",
durable = "true", // 队列持久化
arguments = `@Argument`(name = "x-queue-mode", value = "lazy") // 声明为惰性队列
),
exchange = `@Exchange`(value = "lazy.direct"), // 默认direct类型交换机,无需延迟功能
key = "lazy" // 路由键
)
)
public void listenLazyQueue(String message) {
log.info("从惰性队列接收消息:{}", message);
}
/**
* 监听普通队列(Normal Queue)
* 特性:消息优先存储在内存,适合快速处理的场景
*/
`@RabbitListener`(
bindings = `@QueueBinding`(
value = `@Queue`(
value = "normal.queue",
durable = "true" // 队列持久化
// 普通队列无需x-queue-mode参数(默认是memory模式)
),
exchange = `@Exchange`(value = "normal.direct"), // 普通direct交换机,移除delayed配置
key = "normal" // 路由键
)
)
public void listenNormalQueue(String message) throws InterruptedException {
// 模拟业务处理耗时(实际场景中需避免长时间阻塞)
Thread.sleep(5000);
log.debug("从普通队列接收消息:{}", message);
}
}(四)在publisher(发布者)项目中编写测试代码 测试惰性队列性能
plain
package com.xja.application;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
`@SpringBootTest`
public class LazyTest {
`@Autowired`
private RabbitTemplate rabbitTemplate;
`@Test`
public void testLazy(){
for (int i = 0; i < 100000; i++) {
//创建持久化消息
Message message = MessageBuilder
.withBody("hello lazy queue!!! ".getBytes(StandardCharsets.UTF_8))
.build();
//准备消息的唯一ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//投递消息
rabbitTemplate.convertAndSend("lazy.direct","lazy",message,correlationData);
}
}
`@Test`
public void testNormal(){
for (int i = 0; i < 100000; i++) {
//创建持久化消息
Message message = MessageBuilder
.withBody("hello normal queue!!! ".getBytes(StandardCharsets.UTF_8))
.build();
//准备消息的唯一ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//投递消息
rabbitTemplate.convertAndSend("normal.direct","normal",message,correlationData);
}
}
}(五)测试
惰性队列

普通队列

我们发现惰性队列的数据存在内存中的是⾮常少的,是 IO 出来准备消费的,其他的都在硬盘中存储,⽽普通队列所有 信息都在内存中,及其消耗内存。