Skip to content

** 初识MQ **

** ** 同步和异步通讯 微服务间通讯有同步和异步两种⽅式:

同步通讯:就像打电话,需要实时响应。

异步通讯:就像发邮件,不需要⻢上回复。** **

两种⽅式各有优劣,打电话可以⽴即得到响应,但是你却不能跟多个⼈同时通话。发送邮件可以同时与多个⼈收发邮 件,但是往往响应会有延迟。

** 同步通讯 **

我们之前学习的 Feign 调⽤就属于同步⽅式,虽然调⽤可以实时得到结果,但存在下⾯的问题: 微服务间基于 Feign 的调⽤就属于同步⽅式,存在⼀些问题。

** 总结:**

** 同步调⽤的优点: **

时效性较强,可以⽴即得到结果

**同步调⽤的缺点: **

耦合度⾼ 性能和吞吐能⼒下降

有额外的资源消耗 有级联失败问题

** 异步通讯 **

异步调⽤则可以避免上述问题:

我们以购买商品为例,⽤⼾⽀付后需要调⽤订单服务完成订单状态修改,调⽤物流服务,从仓库分配响应的库存并准 备发货。

在事件模式中,⽀付服务是事件发布者( publisher ),在⽀付完成后只需要发布⼀个⽀付成功的事件( event ),事 件中带上订单 id 。 订单服务和物流服务是事件订阅者( Consumer ),订阅⽀付成功的事件,监听到事件后完成⾃⼰业务即可。

为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,⽽是有⼀个中间⼈( Broker )。发布者发布事件到Broker ,不关⼼谁来订阅事件。订阅者从 Broker 订阅事件,不关⼼谁发来的消息。 异步调⽤常⻅实现就是事件驱动模式

优势⼀:服务解耦

优势⼆:性能提升,吞吐量提⾼

优势三:服务没有强依赖,不担⼼级联失败问题

优势四:流量削峰

好处:

吞吐量提升:⽆需等待订阅者处理完成,响应更快速

故障隔离:服务没有直接调⽤,不存在级联失败问题 调⽤间没有阻塞,不会造成⽆效的资源占⽤

耦合度极低,每个服务都可以灵活插拔,可替换

流量削峰:不管发布事件的流量波动多⼤,都由 Broker 接收,订阅者可以按照⾃⼰的速度去处理事件

缺点:

架构复杂了,业务没有明显的流程线,不好管理

需要依赖于 Broker 的可靠、安全、性能

好在现在开源软件或云平台上 Broker 的软件是⾮常成熟的,⽐较常⻅的⼀种就是我们今天要学习的 MQ 技术。

** 技术对⽐: **

** **MQ ,中⽂是消息队列( MessageQueue ),字⾯来看就是存放消息的队列。也就是事件驱动架构中的 Broker 。 ⽐较常⻅的 MQ 实现:

ActiveMQ

RabbitMQ

RocketMQ

Kafka

追求可⽤性: Kafka 、 RocketMQ 、 RabbitMQ 追求可靠性: RabbitMQ 、 RocketMQ 追求吞吐能⼒: RocketMQ 、 Kafka 追求消息低延迟: RabbitMQ 、 Kafka

** 安装RabbitMQ **

⽅式⼀:在线拉取

docker pull rabbitmq:3-management

⽅式⼆:从本地加载

在课前资料已经提供了镜像包:

上传到虚拟机中后,使⽤命令加载镜像即可:

docker load -i mq.tar

安装mq(命令如下)

docker run \

-e RABBITMQ_DEFAULT_USER=root \

-e RABBITMQ_DEFAULT_PASS=123456 \

-v mq-plugins:/plugins -

-name mq \

--hostname xxxx \

-p 15672:15672 \

-p 5672:5672 \

-d \ rabbitmq:3.9-management

访问:http://192.168.25.3:15672/#/queues (如下图表示配置成功)

创建一个名为init.test的(队列)

访问并对照 MQ 的基本结构:

RabbitMQ 中的⼏个概念:

channel :操作 MQ 的⼯具 ( 通道 )

exchange :路由消息到队列中(交换机)

queue :缓存消息 (队列)

virtual host :虚拟主机,是对 queue 、 exchange 等资源的逻辑分组(环境隔离)

** RabbitMQ消息模型 **

RabbitMQ 官⽅提供了 5个不同的 Demo ⽰例,对应了不同的消息模型:

(1)基本消息队列( BasicQueue )

(2) ⼯作消息队列( WorkQueue )

(3) 发布订阅( Publish 、 Subscribe ),⼜根据交换机类型不同分为三种:

3.1 Fanout Exchange :⼴播

3.2 Direct Exchange :路由

3.3 Topic Exchange:主题

实操:

(1)创建父工程名为mq-demo

(2)pom文件中引入依赖(如下)

plain
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>org.xja</groupId>
 <artifactId>mq-demo</artifactId>
 <version>1.0-SNAPSHOT</version>
 <packaging>pom</packaging>
 <name>Archetype - mq-demo</name>
 <url>http://maven.apache.org</url>
 <modules>
 <module>publisher</module>
 <module>consumer</module>
 </modules>
 <!-- springboot父类依赖 -->
 <parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.3.9.RELEASE</version>
 <relativePath/>
 </parent>
 <dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <!-- AMQP依赖,包含RabbitMQ -->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <!-- 单元测试 -->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 </dependency>
 <!-- 消息转化时候使用 -->
 <dependency>
 <groupId>com.fasterxml.jackson.dataformat</groupId>
 <artifactId>jackson-dataformat-xml</artifactId>
 <version>2.9.10</version>
 </dependency>
 </dependencies>
</project>

(3)创建名为 publisher (** 事件发布者 **)的子项目

  1. ,编写启动类

  1. ,编写application.yml文件(目前只设置端口号即可)

  1. ,编写一个名为InitDemoPublisher的发布者类

编写代码:

plain
package com.xja.application.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class InitDemoPublisher {
 public static void main(String[] args) throws IOException, TimeoutException {
 //创建连接
 ConnectionFactory connectionFactory = new ConnectionFactory();
 connectionFactory.setUsername("root");//配置连接相关参数
 connectionFactory.setPassword("123456");//密码
 connectionFactory.setVirtualHost("/");//设置当前连接要访问的虚拟主机为根路径 /。
 connectionFactory.setHost("192.168.25.3");//虚拟机IP地址
 connectionFactory.setPort(5672);//对内访问端口

 //获取连接
 Connection connection = connectionFactory.newConnection();
 //创建连接通道
 Channel channel = connection.createChannel();
 //发送消息 队列 消息属性(是否持久化) 消息体
 channel.basicPublish("", "init.test", null, ("Hello lizi and qianqian!").getBytes());

 //关闭
 channel.close();
 connection.close();
 }
}

(4)创建名为 consumer( 事件订阅者 ) 的子项目

  1. ,编写启动类

  1. ,编写application.yml文件(目前只设置端口号即可)

  1. ,编写一个名为InitDemoPublisher的事件订阅者类
plain
package com.xja.application.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class InitDemoConsumer {
 public static void main(String[] args) throws IOException, TimeoutException {
 //创建连接
 ConnectionFactory connectionFactory = new ConnectionFactory();
 //配置连接相关参数
 connectionFactory.setUsername("root");
 connectionFactory.setPassword("123456");
 connectionFactory.setVirtualHost("/");
 connectionFactory.setHost("192.168.25.3");
 connectionFactory.setPort(5672);
 //获取连接
 Connection connection = connectionFactory.newConnection();
 //创建连接通道
 Channel channel = connection.createChannel();
 //获取消息
 channel.basicConsume("init.test", true,new DefaultConsumer(channel){
 `@Override`
 public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body){
 System.out.println("接收到的消息 ——>" + new String(body));
 }
 });
 }
}

(5),首先启动名为 consumer( 事件订阅者 ) 的子项目,然后启动名为 publisher (** 事件发布者 **)的子项目,控制台显示如下

**介绍 SpringAMQP **

SpringAMQP 是基于 RabbitMQ 封装的⼀套模板,并且还利⽤ SpringBoot 对其实现了⾃动装配,使⽤起来⾮常⽅ 便。 SpringAmqp 的官⽅地址: https://spring.io/projects/spring-amqp

**SpringAMQP 提供了三个功能: **

⾃动声明队列、交换机及其绑定关系

基于注解的监听器模式,异步接收消息

封装了 RabbitTemplate ⼯具,⽤于发送消息

操作:

(1) 在⽗⼯程 mq-demo 中引⼊依赖

plain
<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)⾸先配置 MQ 地址,在 publisher 服务的 application.yml 中添加配置

plain
spring:
 rabbitmq:
 host: 192.168.25.3 # 主机名
 port: 5672 # 端口
 virtual-host: / # 虚拟主机
 username: root # 用户名
 password: 123456 # 密码

(3) 然后在 publisher 服务中编写测试类 BasicTest ,并利⽤ RabbitTemplate 实现消息发送:

plain
package com.xja.application;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

`@SpringBootTest`
public class BasicTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void basicTest() {
 rabbitTemplate.convertAndSend("mq.basic", "呵呵");
 }

}

(4)⾸先配置 MQ 地址,在 consumer 服务的 application.yml 中添加配置:

plain
spring:
 rabbitmq:
 host: 192.168.25.3 # 主机名
 port: 5672 # 端口
 virtual-host: / # 虚拟主机
 username: root # 用户名
 password: 123456 # 密码

(5)在application下的包basic中创建一个BasicReceive实体类以及BasicReceiveBind实体类

BasicReceive用@Configuration标注表示配置类主要用于⾃动声明队列

plain
package com.xja.application.basic;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

`@Component` //后期`@Configuration`
public class BasicReceive {
 `@Bean`
 public Queue queue() {
 return new Queue("mq.basic");
 }

}

BasicReceiveBind用于接收事件发布者的信息

plain
package com.xja.application.basic;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

`@Component`
public class BasicReceiveBind {

 `@RabbitListener`(queues = {"mq.basic"})
 public void receive(String message) {
 System.out.println("接收的message=" + message);
 }
}

(6)启动项目重新测试(控制台显示)

(7)对以上代码升级(主要升级地方不再创建两个类,使用注解方式对⾃动声明队列、交换机及其绑定关系)

在application下的包basic中创建实体类BasicAnnotionReceive

plain
package com.xja.application.basic;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

`@Component`
public class BasicAnnotionReceive {
 //在监听队列的同时,自动声明该队列(如果队列不存在)
 `@RabbitListener`(queuesToDeclare = `@Queue`(value = "mq.basic"))
 public void receive(String message) {
 System.out.println("接收到的message=" + message);
 }
}

(8)重新启动项目(如下图)

使用 SpringAMQP当有多个事件订阅者时,事件发布者发布多个事件会怎样处理???

(1)在Consumer( 事件订阅者 )的application下的task包下创建一个名为TaskAnnotionReceive的实体类,在类中写多个接受一个队列的方法

plain
package com.xja.application.task;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

`@Component`
public class TaskAnnotionReceive {

 //注解:当订阅队列有多个时候,会采用轮询()
 `@RabbitListener`(queuesToDeclare = `@Queue`(value = "mq.task"))
 public void receive1(String message) {
 System.out.println("receive1接收到的message=" + message);
 }

 `@RabbitListener`(queuesToDeclare = `@Queue`(value = "mq.task"))
 public void receive2(String message) {
 System.out.println("receive2接收到的message=" + message);
 }

}

(2)在publisher (** 事件发布者 )事件发布者的test包下创建一个名为TaskTest的测试类**

plain
package com.xja.application;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

`@SpringBootTest`
public class TaskTest {

 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void send() {
 for (int i = 0; i < 10; i++) {
 rabbitTemplate.convertAndSend("mq.task", "呵呵");
 }

 }
}

(3)重启两个项目(控制台如下显示)

总结:

当多个消费者(如你的 receive1receive2 方法)监听同一个队列(mq.task)时,RabbitMQ 默认采用 “轮询(Round-Robin)” 机制分发消息,具体表现为:

核心规则:

  1. 消息 “平均分配”:队列中的消息会按顺序交替分发给各个消费者,每个消息只会被一个消费者处理(不会重复消费)。 例如你发送 10 条消息,receive1 可能接收第 1、3、5、7、9 条,receive2 接收第 2、4、6、8、10 条,大致各分配一半。
  2. “推模式” 主动分发:RabbitMQ 会主动将消息推送给消费者,而非消费者主动拉取,且默认不考虑消费者的处理能力(即使某个消费者处理较慢,仍会按轮询分配)。

为什么这样设计?

这种机制属于 RabbitMQ 的 “工作队列(Work Queue)” 模式,核心目的是 “负载均衡”:通过多个消费者共同处理同一个队列的消息,提高消息处理效率(比如处理耗时任务时,避免单消费者压力过大)。

(一) 发布/订阅(交换机)

**发布订阅的模型如图: **

可以看到,在订阅模型中,多了⼀个 exchange ⻆⾊,⽽且过程略有变化:

Publisher :⽣产者,也就是要发送消息的程序,但是不再发送到队列中,⽽是发给 X (交换机) 3 种类型:

Exchange :交换机,图中的 X 。⼀⽅⾯,接收⽣产者发送的消息。另⼀⽅⾯,知道如何处理消息,例如递交给 某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。 Exchange 有以 下

Fanout :⼴播,将消息交给所有绑定到交换机的队列

Direct :定向,把消息交给符合指定 routing key 的队列

Topic :⽀持通配符,把消息交给符合 routing pattern (路由模式) 的队列

Consumer :消费者,与以前⼀样,订阅队列,没有变化 Queue :消息队列也与以前⼀样,接收消息、缓存消息。

Exchange(交换机)只负责处理消息,不能存储消息,因此如果没有任何队列与 Exchange 绑定,或者没有符合路 由规则的队列,那么消息会丢失!

Fanout ( 广播模式)

Fanout ,英⽂翻译是分裂,我觉得在 MQ 中叫⼴播更合适。

在⼴播模式下,消息发送流程是这样的:

  1. ) 可以有多个队列

  2. ) 每个队列都要绑定到 Exchange (交换机)

  3. ) ⽣产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,⽣产者⽆法决定

  4. ) 交换机把消息发送给绑定过的所有队列

  5. ) 订阅队列的消费者都能拿到消息


** 我们的计划是这样的:**

创建⼀个交换机 fanout ,类型是 Fanout

创建两个队列 fanout.queue1 和 fanout.queue2 ,绑定到交换机 fanout

代码演示:

1,在application目录下创建一个名为fanout的目录并创建三个类分别是FanoutBind(fanout绑定队列)与FanoutReceiveBind(fanout接收消息)这两个结合使用,以及FanoutAnnotionReceive(里用注解绑定交换机和队列和)

  1. ,在FanoutBind(fanout绑定)中编写
plain
package com.xja.application.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//加上交换机
//`@Configuration`
public class FanoutBind {

 `@Bean`//交换机
 public FanoutExchange exchange() {
 return new FanoutExchange("fanout");
 }

 `@Bean`//队列queue1
 public Queue queue1() {
 return new Queue("fanout.queue1");
 }

 `@Bean`//队列queue2
 public Queue queue2() {
 return new Queue("fanout.queue2");
 }

 `@Bean`//绑定队列queue1
 public Binding binding1(FanoutExchange exchange,Queue queue1){
 return BindingBuilder.bind(queue1).to(exchange);
 }

 `@Bean`//绑定队列queue2
 public Binding binding2(FanoutExchange exchange,Queue queue2){
 return BindingBuilder.bind(queue2).to(exchange);
 }
}
  1. ,在FanoutReceiveBind(fanout接收消息)中编写
plain
package com.xja.application.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//`@Component`
public class FanoutReceiveBind {

 `@RabbitListener`(queues = {"fanout.queue1"})
 public void receive1(String message) {
 System.out.println("接收的message=" + message);
 }

 `@RabbitListener`(queues = {"fanout.queue2"})
 public void receive2(String message) {
 System.out.println("接收的message=" + message);
 }
}
  1. ,在publisher项目中编写一个测试类FanoutTest
plain
package com.xja.application;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

`@SpringBootTest`
public class FanoutTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void basicTest() {
 rabbitTemplate.convertAndSend("fanout","", "呵呵");
 }
}
  1. ,启动项目运行测试(如下图)

  1. ,代码升级使用注解的方式FanoutAnnotionReceive(里用注解绑定交换机和队列和)
plain
package com.xja.application.fanout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;

//`@Component`
//`@Slf4j`
public class FanoutAnnotionReceive {

 `@RabbitListener`(
 bindings = `@QueueBinding`(
 value = `@Queue`(value = "queue1"),
 exchange =`@Exchange`(value = "fantout",type = ExchangeTypes.FANOUT),
 key = ""
 ))
 public void receive1(String message) throws InterruptedException {
 System.out.println("receive1接收message = " + message);
// log.debug("mq.fontout.queue1-receive1接收message = " + message);
 }

 `@RabbitListener`(
 bindings = `@QueueBinding`(
 value = `@Queue`(value = "queue2"),
 exchange =`@Exchange`(value = "fantout",type = ExchangeTypes.FANOUT),
 key = ""
 ))
 public void receive2(String message) throws InterruptedException {
 System.out.println("receive2接收message = " + message);
// log.debug("receive2接收message = " + message);
 }
}
  1. ,重新启动项目运行发送消息(如下图)

(二)Direct ( 声明队列和交换机 )

在 Fanout 模式中,⼀条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列 消费。这时就要⽤到 Direct 类型的 Exchange 。

在 Direct 模型下:

队列与交换机的绑定,不能是任意绑定了,⽽是要指定⼀个 RoutingKey (路由 key )

消息的发送⽅在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey 。

Exchange 不再把消息交给每⼀个绑定的队列,⽽是根据消息的 Routing Key 进⾏判断,只有队列的Routingkey 与消息的 Routing key 完全⼀致,才会接收到消息

代码演示:

  1. ,首先在application下创建一个名为Direct 目录,在本目录下创建三个实体类分别是DirectBind(消息分配者),DirectReceiveBind(消息接收者),DirectAnnotionReceive( 基于注解声明队列和交换机 )

  1. 在DirectBind(消息分配者)中编写
plain
package com.xja.application.direct;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.text.DecimalFormat;

//加上交换机
`@Configuration`
public class DirectBind {

 `@Bean`//交换机
 public DirectExchange exchange() {
 return new DirectExchange("direct");
 }

 `@Bean`//队列queue1
 public Queue queue1() {
 return new Queue("direct.queue1");
 }

 `@Bean`//队列queue2
 public Queue queue2() {
 return new Queue("direct.queue2");
 }

 `@Bean`//绑定队列queue1
 public Binding binding1(DirectExchange exchange,Queue queue1){
 return BindingBuilder.bind(queue1).to(exchange).with("A");//指定消息队列的Routingkey
 }

 `@Bean`//绑定队列queue2
 public Binding binding2(DirectExchange exchange,Queue queue2){
 return BindingBuilder.bind(queue2).to(exchange).with("B");
 }
}
  1. ,在DirectReceiveBind(消息接收者)类中编写
plain
package com.xja.application.direct;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

`@Component`
public class DirectReceiveBind {

 `@RabbitListener`(queues = {"direct.queue1"})
 public void receive1(String message) {
 System.out.println("receive1接收的message=" + message);
 }

 `@RabbitListener`(queues = {"direct.queue2"})
 public void receive2(String message) {
 System.out.println("receive2接收的message=" + message);
 }
}
  1. ,在publisher(消息发送者)项目中编写测试类DirectTest
plain
package com.xja.application;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

`@SpringBootTest`
public class DirectTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void basicTest() {
 rabbitTemplate.convertAndSend("direct","A", "呵呵");
 }
}
  1. ,启动项目测试(当指定不同的 Routingkey 时会被转换器转换到不要同的消息队列)

  1. ,代码升级(基于注解声明队列和交换机 )在编辑刚刚consumer项目下的创建的DirectAnnotionReceive( 基于注解声明队列和交换机 )
plain
package com.xja.application.direct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//`@Component`
//`@Slf4j`
public class DirectAnnotionReceive {

 `@RabbitListener`(
 bindings = `@QueueBinding`(
 value = `@Queue`(value = "queue1"),
 exchange =`@Exchange`(value = "direct",type = ExchangeTypes.DIRECT),
 key = "A"
 ))
 public void receive1(String message) throws InterruptedException {
 System.out.println("receive1接收message = " + message);
 }

 `@RabbitListener`(
 bindings = `@QueueBinding`(
 value = `@Queue`(value = "queue2"),
 exchange =`@Exchange`(value = "direct",type = ExchangeTypes.DIRECT),
 key = "B"
 ))
 public void receive2(String message) throws InterruptedException {
 System.out.println("receive2接收message = " + message);
 }
}
  1. ,重启项目进行测试

3.5.3.总结

描述下 Direct 交换机与 Fanout 交换机的差异?

Fanout 交换机将消息路由给每⼀个与之绑定的队列

Direct 交换机根据 RoutingKey 判断路由给哪个队列

如果多个队列具有相同的 RoutingKey ,则与 Fanout 功能类似

基于 @RabbitListener 注解声明队列和交换机有哪些常⻅注解?

@Queue

@Exchange

(三) Topic

说明 :

Topic 类型的 Exchange 与 Topic 类型 Direct 相⽐,都是可以根据 Exchange 可以让队列在绑定 RoutingKey 把消息路由到不同的队列。只不过 Routing key 的时候使⽤通配符!

Routingkey ⼀般都是有⼀个或多个单词组成,多个单词之间以 ”.” 分割,例如: item.insert

通配符规则:

:匹配⼀个或多个词

  • :匹配不多不少恰好 1 个词

举例:

item.# :能够匹配 item.spu.insert 或者 item.spu

item.* :只能匹配 item.spu

代码演示:

在项目consumer的application下创建topic文件夹在当前文件夹下创建三个实体类分别是TopicBind(声明交换机与消息队列),TopicReceiveBind(接收消息信息),TopicAnnotionReceive(全注解方式声明交换机以及消息队列,接收信息)

  1. ,在TopicBind(声明交换机与消息队列)编辑
plain
package com.xja.application.topic;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

//加上交换机
//`@Configuration`
public class TopicBind {

 `@Bean`//交换机
 public TopicExchange exchange() {
 return new TopicExchange("topic");
 }

 `@Bean`//队列queue1
 public Queue queue1() {
 return new Queue("topic.queue1");
 }

 `@Bean`//队列queue2
 public Queue queue2() {
 return new Queue("topic.queue2");
 }

 `@Bean`//队列queue2
 public Queue queue3() {
 return new Queue("topic.queue3");
 }
 `@Bean`//队列queue2
 public Queue queue4() {
 return new Queue("topic.queue4");
 }

 `@Bean`//绑定队列queue1
 public Binding binding1(TopicExchange exchange,Queue queue1){
 return BindingBuilder.bind(queue1).to(exchange).with("china.#");
 }

 `@Bean`//绑定队列queue2
 public Binding binding2(TopicExchange exchange,Queue queue2){
 return BindingBuilder.bind(queue2).to(exchange).with("#.news");
 }

 `@Bean`//绑定队列queue1
 public Binding binding3(TopicExchange exchange,Queue queue3){
 return BindingBuilder.bind(queue3).to(exchange).with("#.weather");
 }

 `@Bean`//绑定队列queue1
 public Binding binding4(TopicExchange exchange,Queue queue4){
 return BindingBuilder.bind(queue4).to(exchange).with("japan.#");
 }
}
  1. ,在TopicReceiveBind(接收消息信息)编辑
plain
package com.xja.application.topic;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

//`@Component`
public class TopicReceiveBind {

 `@RabbitListener`(queues = {"topic.queue1"})
 public void receive1(String message) {
 System.out.println("receive1接收的message=" + message);
 }

 `@RabbitListener`(queues = {"topic.queue2"})
 public void receive2(String message) {
 System.out.println("receive2接收的message=" + message);
 }

 `@RabbitListener`(queues = {"topic.queue3"})
 public void receive3(String message) {
 System.out.println("receive3接收的message=" + message);
 }

 `@RabbitListener`(queues = {"topic.queue4"})
 public void receive4(String message) {
 System.out.println("receive4接收的message=" + message);
 }
}
  1. ,在项目publisher创建实现类TopicTest
plain
package com.xja.application;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

`@SpringBootTest`
public class TopicTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void basicTest() {
 rabbitTemplate.convertAndSend("topic","china.news", "呵呵");
 }
}
  1. ,启动项目进行测试

  1. ,升级项目编辑TopicAnnotionReceive(全注解方式声明交换机以及消息队列,接收信息)

总结:

描述下 Direct 交换机与 Topic 交换机的差异?

Topic 交换机接收的消息 RoutingKey 必须是多个单词,以. 分割

Topic 交换机与队列绑定时的 bindingKey 可以指定通配符

:代表 0 个或多个词

  • :代表 1 个词

配置JSON格式形式消息

  1. ,思考首先测试在上述实例中我们传递的都是String类型的字符串信息,那么其他类型的消息是否可以传递呢????(比如数组,List集合以及Map集合,或者是一个对象形式)!!!!!!!!!!

代码示例:

  1. ,在项目consumer中创建一个conver的目录,在当前目录下方创建一个类名为ConvertAnnotionRecevie(以注解方式声明交换机以及消息队列接收)

plain
package com.xja.application.convert;

import com.xja.application.pojo.User;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

//`@Component`
public class ConvertAnnotionRecevie {
 //String类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(String message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }

 //int类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(int message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }

 //List集合类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(List message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }

// //Map集合类型
// `@RabbitListener`(
// bindings = `@QueueBinding`(
// value = `@Queue`(value = "queue1"),
// exchange =`@Exchange`(value = "convert",type = ExchangeTypes.TOPIC),
// key = "xx"
// ))
// public void receive1(Map message) throws InterruptedException {
// System.out.println("receive接收message = " + message);
// }

}
  1. ,在项目publisher下方编写一个名为ConvertTest的测试类
plain
package com.xja.application;

import com.xja.application.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

`@SpringBootTest`
public class ConvertTest {

 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void basicTest() {//String类型
 rabbitTemplate.convertAndSend("convert","xx", "呵呵");
 }


 `@Test`
 public void basicTest2() {//int类型
 rabbitTemplate.convertAndSend("convert","xx", 1);
 }

 `@Test`
 public void basicTest3() {//List集合类型
 List list = new ArrayList();
 list.add(1);
 list.add(2);
 rabbitTemplate.convertAndSend("convert","xx", list);
 }


 `@Test`
 public void basicTest4() {//List集合类型
 Map map = new HashMap();
 map.put("x","1");
 map.put("y","2");
 rabbitTemplate.convertAndSend("convert","xx", map);
 }



}
  1. ,测试以上类型的消息均可传递(但是对象类型的传递不了)

显然, JDK 序列化⽅式并不合适。我们希望消息体的体积更⼩、可读性更⾼,因此可以使⽤ JSON ⽅式来做序列化和反 序列化。

在 publisher 和 consumer 两个服务中都引⼊依赖:

plain
<!-- 消息转化时候使用 -->
<dependency>
 <groupId>com.fasterxml.jackson.dataformat</groupId>
 <artifactId>jackson-dataformat-xml</artifactId>
 <version>2.9.10</version>
</dependency>

配置消息转换器。 在连个项⽬的启动类中添加⼀个 Bean 即可

plain
`@Bean`
public Jackson2JsonMessageConverter jsonMessageConverter() {
 return new Jackson2JsonMessageConverter();
}

消息持久化

MQ 默认是内存存储消息,开启持久化功能可以确保缓存在 MQ 中的消息不丢失。那么就需要我们将信息持久化到磁盘以 确保安全
1,测试默认储存方式

将队列中⼿动添加⼀条消息 ( 关闭消费服务 )

此时我们重新启动 mq

此时发现系统默认的交换机存在、我们创建的队列存在,但是队列中的消息已经丢失。

2, 交换机持久化

我们观察⼀下交换机发现 Features 选中的,统⼀存在了⼀个单词, D (Durable 持久的 ) ,那么就意味着系统创建 的交换机因为指定了 Durable 所以不会丢失。

生产者确认机制

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。消息发送到 MQ 以后,会返回⼀个结果 给发送者,表⽰消息是否处理成功。结果有两种请求:

publisher-confirm ,发送者确认 :

(1)消息成功投递到交换机,返回 ack (acknowledge - 答谢、告知已经收到 )

(2)消息未投递到交换机,返回 nack

publisher-return ,发送者回执 :

(1)消息投递到交换机了,但是没有路由到队列。返回 ACK ,及路由失败原因。

  1. ,⾸先修改⽣产者服务配置⽂件添加:
plain
publisher-confirm-type: correlated #用于确认消息是否成功到达 交换机
publisher-returns: true #消息返回机制,用于处理消息到达交换机后,但 无法路由到任何队列 的情况。
template:
 mandatory: true #强制要求交换机在消息无法路由时,必须将消息返回给生产者(而不是直接丢弃)

配置说明:

publish-confirm-type :开启 publisher-confirm ,这⾥⽀持两种类型:

simple :同步等待 confirm 结果,直到超时

correlated :异步回调,定义 ConfirmCallback , MQ 返回结果时会回调这个 ConfirmCallback publish-returns :开启 publish-return 功能,同样是基于 callback 机制,不过是定义ReturnCallback template.mandatory :定义消息路由失败时的策略。

true ,则调⽤ ReturnCallback ;

false :则直接丢弃消息

配置全局的消息失败 Callback ⽅法 每个 RabbitTemplate 只能配置⼀个 ReturnCallback ,因此需要在项⽬启动过程中配置:

创建⼀个配置⽂件

plain
package com.xja.application.callback;

import com.rabbitmq.client.ReturnCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

`@Component`
`@Slf4j`
public class ReturnCallbackHandle implements ApplicationContextAware {
 `@Override`
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
 //获取mq对象信息
 RabbitTemplate rabbitTemplate =
 (RabbitTemplate)applicationContext.getBean("rabbitTemplate");
 //增加一个回调方法
 rabbitTemplate.setReturnsCallback(returnedMessage -> {
 // 判断是否是延迟消息
 Integer receivedDelay =
 returnedMessage.getMessage().getMessageProperties().getReceivedDelay();
 if (receivedDelay != null && receivedDelay > 0) {
 // 是一个延迟消息,忽略这个错误提示
 return;
 }
 // 记录日志
 log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
 returnedMessage.getReplyCode(),
// 修改发送的代码测试:
 returnedMessage.getReplyText(),
 returnedMessage.getExchange() ,
 returnedMessage.getRoutingKey(),
 returnedMessage.getMessage().toString());
 //todo 处理出问题的消息
 });
 }

}

在测试类中编写

plain
package com.xja.application;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.CorrelationDataPostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

`@Slf4j`
`@SpringBootTest`
public class PTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void contextLoads() {
 //1 .准备消息
 String message = "最美香山红叶,我们一起沐浴红色海洋!";
 //准备CorrelationData
 // 消息ID
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 //准备
 correlationData.getFuture().addCallback(result -> {
 if (result.isAck()){
 //ACK(成功)
 log.debug("信息发送成功到交换机!消息ID:{}", correlationData.getId());
 }else {
 //NACK(失败)
 log.debug("信息发送到到交换机失败!消息ID:{}", correlationData.getId());
 }
 },ex -> {
 //记录日志
 log.error("消息发送失败",ex);
 //重发消息
 });
 rabbitTemplate.convertAndSend("xx","",message,correlationData);
 }

}

死信交换机

1,认识死信交换机

当⼀个队列中的消息满⾜下列情况之⼀时,可以成为死信( dead letter ):

消费者使⽤ basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false

消息是⼀个过期消息,超时⽆⼈消费

要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了 dead-letter-exchange 属性,指定了⼀个交换机,那么队列中的死信就会投递到这个交换机 中,⽽这个交换机称为死信交换机( Dead Letter Exchange ,简称 DLX )

和异常处理交换机不同的是,异常处理交换机同样也是做兜底的功能,但是转发是有消费者控制准发的,⽽死信交换 机则是由队列转发的,⽽且死信交换机的处理能⼒更为健壮⼀些。

什么样的消息会成为死信?

消息被消费者 reject 或者返回 nack( 且不⼊队列 )

消息超时未消费

队列满了

如何给队列绑定死信交换机?

给队列设置 dead-letter-exchange 属性,指定⼀个交换机

给队列设置 dead-letter-routing-key 属性,设置死信交换机与死信队列的 RoutingKey

注解形式 (声明交换机以及消息队列)

plain
package com.xja.application.deadletter;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

`@Component`
`@Slf4j`
public class DeadLetter {
 //配置sms队列
 `@RabbitListener`(bindings = `@QueueBinding`(
 value = `@Queue`(value = "dl.queue",arguments = {
 `@Argument`(name = "x-dead-letter-exchange",value = "dl"),
 `@Argument`(name = "x-dead-letter-routing-key",value = "dl.key")
 }),
 exchange = `@Exchange`(value = "dl.exchange",type = ExchangeTypes.TOPIC),
 key = ""
 ))
 public void re1(String message){
// int a = 1/0;
 System.out.println("sms队列接收消息为{}" + message);
 }
}

测试代码

plain
package com.xja.application;

import org.springframework.amqp.core.Message;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.amqp.core.MessagePostProcessor;
import java.nio.charset.StandardCharsets;
import java.util.UUID;


`@SpringBootTest`
public class DeadLetterTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void dlTest() {
 rabbitTemplate.convertAndSend("dl.exchange","","666");
 }



}

死信交换机实际就是⼀个普通定义的交换机和队列。

测试:重试失败的时候消息是否投递到交换机。 (可以看出已经投递到dl交换机以及对应的消息队列)

TTL 交换机

TTL ,也就是 Time-To-Live( 存活时间 ) 。如果⼀个队列中的消息 TTL 结束仍未消费,则会变为死信, ttl 超时分为 两种情况:

消息所在的队列设置了存活时间

消息本⾝设置了存活时间

⼀般情况下,死信交换机就相当于⼀个垃圾桶,去做⼀些异常消息的存储⼯作,但是因为其特性, 我们还可以通过其 能特性,利⽤私信交换机实现⼀些我们特殊的功能,⽐如延迟队列。

接下来我们先通过代码实现⼀个死信交换机,先将其运⾏出来。

我们声明⼀组死信交换机和队列,基于注解⽅式 SpringRabbitListener :

设定⼀个队列超时时间

plain
package com.xja.application.deadletter;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

`@Component`
`@Slf4j`
public class DeadLetter {
 //配置sms队列
 `@RabbitListener`(bindings = `@QueueBinding`(
 value = `@Queue`(value = "dl.queue",arguments = {
 `@Argument`(name = "x-dead-letter-exchange",value = "dl"),
 `@Argument`(name = "x-dead-letter-routing-key",value = "dl.key")
// `@Argument`(name = "x-message-ttl",value = "10000",type = "java.lang.Integer") // 修改此处
 }),
 exchange = `@Exchange`(value = "dl.exchange",type = ExchangeTypes.TOPIC),
 key = ""
 ))
 public void re1(String message){
// int a = 1/0;
 System.out.println("sms队列接收消息为{}" + message);
 }
}

或者

plain
`@Test`
 public void testTTL() {
 //创建持久化消息
 Message message = MessageBuilder
 .withBody("6666".getBytes(StandardCharsets.UTF_8))
 .setExpiration("10000") //设置消息5s过期
 .build();
 //准备消息的唯一ID
// CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 //投递消息
 rabbitTemplate.convertAndSend("dl.exchange", "", message);
 }
将消息消费者服务停止掉进行测试:

延迟交换机

利⽤ TTL 结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列 ( Delay Queue )模式。

延迟队列的使⽤场景包括:

延迟发送短信

⽤⼾下单,如果⽤⼾在 15 分钟内未⽀付,则⾃动取消

预约⼯作会议, 20 分钟后⾃动通知所有参会⼈员

因为延迟队列的需求⾮常多,所以 RabbitMQ 的官⽅也推出了⼀个插件,原⽣⽀持延迟队列效果。

这个插件就是 DelayExchange 插件。参考 RabbitMQ 的插件列表⻚⾯: https://www.rabbitmq.com/communit/y-plugins.html

(1)首先产看相关

(2)查找mq的路径

(3)cd切换到上述路径下方

(4)将提前准备好的文件拖进去

(5)重启并且查看是否重启

DelayExchange原理

DelayExchange 需要将⼀个交换机声明为 delayed 类型。当我们发送消息到 delayExchange 时,流程如下:

接收消息

判断消息是否具备 x-delay 属性

如果有 x-delay 属性,说明是延迟消息,持久化到硬盘,读取 x-delay 值,作为延迟时间

返回 routing not found 结果给消息发送者

x-delay 时间到期后,重新投递消息到指定队列

使⽤DelayExchange

插件的使⽤也⾮常简单:声明⼀个交换机,交换机的类型可以是任意类型,只需要设定 delayed 属性为 true 即可, 然后声明队列与其绑定即可。

消息消费者代码

plain
package com.xja.application.delay;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

`@Component`
public class Delay {
 `@RabbitListener`(bindings = `@QueueBinding`(
 value = `@Queue`(value = "delayed"),
 exchange = `@Exchange`(value = "delayed",type = ExchangeTypes.TOPIC,delayed = "true")
 ))
 public void delayed(String msg) {
// msg.get
 System.out.println("msg" + msg);
 }
}

消息发布者代码

plain
package com.xja.application;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

`@SpringBootTest`
`@Slf4j`
public class DelayTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void testTTL() {
 //1 .准备消息
 String s = "最美香山红叶,我们一起沐浴红色海洋!";
 //准备CorrelationData
 // 消息ID
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 //准备
 correlationData.getFuture().addCallback(result -> {
 if (result.isAck()){
 //ACK(成功)
 log.debug("信息发送成功到交换机!消息ID:{}", correlationData.getId());
 }else {
 //NACK(失败)
 log.debug("信息发送到到交换机失败!消息ID:{}", correlationData.getId());
 }
 },ex -> {
 //记录日志
 log.error("消息发送失败",ex);
 //重发消息
 });
 //创建持久化消息
 Message message = MessageBuilder
 .withBody(s.getBytes(StandardCharsets.UTF_8))
 .setHeader("x-delay",10000) //设置消息5s过期
 .build();
 //投递消息
 rabbitTemplate.convertAndSend("delayed", "", message);
 }
}

惰性队列

从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的概念,也就是惰性队列。惰性队列的特征如下:

  1. , 接收到消息后直接存⼊磁盘⽽⾮内存

直接内存存储有预警 40% ,达到阈值就阻⽌消息投递了,此时 mq 的策略就是将内存中的⼀部分数据刷出到 内存 (page-out) 之后内存有空闲了再次接收消息,如果数据量⽐较⼤的情况下, mq 的并发就会出现波浪 曲线,性能忽⾼忽低,性能不稳定。

  1. ,消费者要消费消息时才会从磁盘中读取并加载到内存

  2. ,⽀持数百万条的消息存储

(二) 基于命令⾏设置lazy-queue

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

rabbitmqctl : RabbitMQ 的命令⾏⼯具

set_policy :添加⼀个策略

Lazy :策略名称,可以⾃定义

"^lazy-queue$" :⽤正则表达式匹配队列的名字

'{"queue-mode":"lazy"}' :设置队列模式为 lazy 模式

--apply-to queues :策略的作⽤对象,是所有的队

(二) 基于@Bean声明lazy-queue (定义一个惰性的方法和一个普通的)

(三)在consumer(消费者)项目中基于@RabbitListener声明LazyQueue

plain
package com.xja.application.lazy;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

`@Component`
`@Slf4j`
public class LazyQueueListener {

 /**
 * 监听惰性队列(Lazy Queue)
 * 特性:消息优先存储在磁盘,适合大量消息堆积场景
 */
 `@RabbitListener`(
 bindings = `@QueueBinding`(
 value = `@Queue`(
 value = "lazy.queue",
 durable = "true", // 队列持久化
 arguments = `@Argument`(name = "x-queue-mode", value = "lazy") // 声明为惰性队列
 ),
 exchange = `@Exchange`(value = "lazy.direct"), // 默认direct类型交换机,无需延迟功能
 key = "lazy" // 路由键
 )
 )
 public void listenLazyQueue(String message) {
 log.info("从惰性队列接收消息:{}", message);
 }

 /**
 * 监听普通队列(Normal Queue)
 * 特性:消息优先存储在内存,适合快速处理的场景
 */
 `@RabbitListener`(
 bindings = `@QueueBinding`(
 value = `@Queue`(
 value = "normal.queue",
 durable = "true" // 队列持久化
 // 普通队列无需x-queue-mode参数(默认是memory模式)
 ),
 exchange = `@Exchange`(value = "normal.direct"), // 普通direct交换机,移除delayed配置
 key = "normal" // 路由键
 )
 )
 public void listenNormalQueue(String message) throws InterruptedException {
 // 模拟业务处理耗时(实际场景中需避免长时间阻塞)
 Thread.sleep(5000);
 log.debug("从普通队列接收消息:{}", message);
 }
}

(四)在publisher(发布者)项目中编写测试代码 测试惰性队列性能

plain
package com.xja.application;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

`@SpringBootTest`
public class LazyTest {
 `@Autowired`
 private RabbitTemplate rabbitTemplate;

 `@Test`
 public void testLazy(){
 for (int i = 0; i < 100000; i++) {
 //创建持久化消息
 Message message = MessageBuilder
 .withBody("hello lazy queue!!! ".getBytes(StandardCharsets.UTF_8))
 .build();
 //准备消息的唯一ID
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 //投递消息
 rabbitTemplate.convertAndSend("lazy.direct","lazy",message,correlationData);
 }
 }


 `@Test`
 public void testNormal(){
 for (int i = 0; i < 100000; i++) {
 //创建持久化消息
 Message message = MessageBuilder
 .withBody("hello normal queue!!! ".getBytes(StandardCharsets.UTF_8))
 .build();
 //准备消息的唯一ID
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 //投递消息
 rabbitTemplate.convertAndSend("normal.direct","normal",message,correlationData);
 }
 }

}

(五)测试

惰性队列

普通队列

我们发现惰性队列的数据存在内存中的是⾮常少的,是 IO 出来准备消费的,其他的都在硬盘中存储,⽽普通队列所有 信息都在内存中,及其消耗内存。