RabbitMQ快速入门
字数: 0 字 时长: 0 分钟
笔记源码:https://github.com/kerwim/RabbitMQ-test-framework
1、前言
字数: 0 字 时长: 0 分钟
1.1 为什么需要消息队列?
举个生活中的场景:
假设你网购了很多东西,快递小哥要把这些包裹一个个送到你家门口,但是你很忙,你在开会没空处理这些包裹。
没有丰巢(没有消息队列)
- 高频敲门:快递小哥每送一个包裹,都要去你家敲门。如果你不在家,快递小哥只能等着你回来,或者多次跑到你家尝试送达。这就会让快递小哥浪费大量时间,无法及时去送其他人的包裹。
- 混乱和延迟:如果很多快递小哥同时来你家送货,你可能要频繁接收包裹,这会打扰到你的开会进度。而且,快递公司也很难协调这些快递员的工作,可能导致包裹延迟送达。
有了丰巢(有消息队列)
- 集中投递:快递小哥可以先把所有包裹统一送到丰巢。这样,无论你家有没有人,快递小哥都可以继续去送其他人的包裹,不用等你,也不会频繁打扰你。
- 有序处理:你可以根据自己的时间,去丰巢一次性取走所有的包裹。这样既节省了快递小哥的时间,也让你能够在方便的时候集中处理这些包裹。
解释:
- 丰巢就像消息队列,负责暂时存放快递(消息),然后你可以在方便的时候去处理这些快递(消费消息)。
- 快递小哥就像生产者,他负责产生包裹(消息),并把它们送到巢取(消息队列)。
- 你就像消费者,当你有时间时,去丰巢取包裹(消费消息)。
通过消息队列,系统中的各个部分可以更高效地工作,不需要彼此等待,减少了资源浪费和延迟。
1.1 消息队列优势
比如平时我们经常解除到的业务下单功能
同步下单
异步下单:
你会发现,存在消息队列后,马上可以给客户返回响应了,后续的更新购物车、库存、会员积分等等等等都可以异步完成。
1.1.1 解耦
如果同步下单,功能的耦合度就非常高了
如果异步下单,就可以做到业务解耦。
1.1.2 快速响应
同步下单响应时间较长,用户下单体验不太好。
异步下单响应时间较短,用户体验相对友好。
1.1.3 削峰限流
同步下单的情况下,一旦人数过多会造成并发压力传递。
异步下单可以起到削峰限流的效果。
1.1.4 增加系统弹性
比如说我现在有 4 个功能分别是保存订单、更新购物车、更新库存、更新积分,但是我想新增一个功能数据统计。
如果是同步的话、那就需要修改代码了。
那如果是异步的呢?我本来有四个功能
那我是不是只需要新增一个模块叫数据统计,就 ok 了,这样做保证了其他模块的代码不被入侵。
1.1.5 小结
同步:
系统耦合度高
并发压力持续向后续服务传导
系统结构缺乏弹性,可扩展性差
响应时间长
异步:
参与的各功能模块相对独立,耦合度低
借助消息队列实现流量削峰填谷
各功能模块对接消息队列,系统功能扩展方便
快速响应
注意:
并不是把所有交互方式都改成异步
强关联调用还是通过OpenFeign进行同步调用
弱关联、可独立拆分出来的功能使用消息队列进行异步调用
1.2 什么是消息队列?
消息队列是实现应用程序和应用程序之间通信的中间件产品
假如我们有程序A和程序B需要通信,发送消息的叫生产者,接受消息的叫消费者(后面会在体系里介绍)。你可能会问,消息传递也可以靠 https://请求的啊,是的你说的没错,但是网络请求就是我们上面说的了同步。消息队列就是上面说到的菜鸟驿站。
1.3 消息队列底层实现的两大主流方式
• 由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要
• 目前主流的消息队列通信协议标准包括:
• AMQP(Advanced Message Queuing Protocol):通用协议,IBM公司研发
• JMS(Java Message Service):专门为Java语言服务,SUN公司研发,一组由Java接口组成的Java标准
JMS | AMQP | |
---|---|---|
七层网络模型 | JMS对应应用层 | AMQP对应传输层与会话层 |
消息模型 | JMS主要支持点对点和发布/订阅两种消息模型。 | ●AMQP支持多种消息模型,包括点对点(P2P)和发布/订阅(Pub/Sub)。 |
支 持 的 编 程 语 言 和 平 台 | JMS主要针对Java平台,因此在其 他编程语言和平台上的支持相对较少 | AMQP支持多种编程语言和平台, 包括Java、C++、Python等 |
可靠性 | JMS也支持消息持久化和事务性消息,但具体实现取决于消息传递系统的提供者。 | AMQP提供了强大的消息可靠性保证,包括消息持久化、事务性消息和消息确认机制。 |
传输协议 | JMS使用一种面向文本的协议(如HTTP 或TCP),消息的传输效率可能较低。 | AMQP使用二进制协议进行消息传递,提 供了高效、可靠的消息投递机制。 |
拓展性和兼容性 | JMS在Java环境中有较好的拓展性和兼容性,但在与非Java环境集成时受到限制。 | AMQP具有很好的拓展性和兼容性,可以在不同的消息代理之间交互操作。 |
综上对比 ,AMQP是大多数开发者更好的选择!
各主流MQ产品对比
1.4 RabbitMQ介绍
官网地址:https://www.rabbitmq.com/
RabbitMQ是一款基于AMQP、由Erlang语言开发的消息队列产品,2007年Rabbit技术公司发布了它的1.0版本
1.4.1:RabbitMQ体系结构介绍
1.4.1.1 生产者和消费者
- 重要:对体系结构的理解直接关系到后续的操作和使用
现在有两个程序程序 A 和 程序 B
假如程序A是消息的发送端,也可以称为消息的生产者 (Producer)
假如程序B是消息的接收端,也可以称为消息的消费者 (Consumer)
一般情况下 生产者和消费者都可以是微服务中的一个单独的小服务
Connection:消息发送端或消息消费端到消息队列主体服务器之间的TCP连接
大家学过计算机网络都知道,建立TCP连接需要三次握手,反复确认。
所以如果每一次访问RabbitMQ服务器都建立一个Connection开销会极大,效率低下。
所以Channel就是在一个已经建立的Connection中建立的逻辑连接。
1.4.1.2 Channel(信道)
Channel是一个重要的概念,中文名通常被称为“通道”或“信道”。它是客户端与 RabbitMQ 服务器之间的一个轻量级连接。简单来说,Channel是执行所有操作的地方,包括声明交换机(Exchange)、队列(Queue)以及绑定(Binding)(后面会介绍这三个),以及发布消息和接收消息等。而 Channel允许客户端在一个连接上创建多个 Channel从而减少了频繁建立和关闭网络连接的成本。每个 Channel都可以在相同的 TCP 连接上复用,提高了网络通信的效率。
交换机是消息的入口点,它根据预设的路由规则将消息分发到对应的队列。生产者通过 `Channel` 向交换机发布消息。
其实每一次发送消息,都是使用Connection 的 Channel 来完成的
1.4.1.3 Broker
Broker 是 RabbitMQ 的核心组件,它负责接收、存储和分发消息。在图中,Broker 包括多个虚拟主机(Virtual Hosts)。
大白话:Broker 就是RabbitMQ的主体服务器本身,负责接收和分发消息,如果后面搭建 MQ 集群,可以创建很个Broker。
在Broker里面可以根据逻辑和项目的需要划分很多个分组,每一个分组称之为Virtual Host(虚拟机)
1.4.1.4 Virtual Host 虚拟机
Virtual Host 是一个逻辑隔离的区域,类似于传统意义上的物理主机。每个 Virtual Host 内部包含交换机(Exchanges)、队列(Queues)和绑定(Bindings)。多个 Virtual Host 可以在同一台 Broker 上存在,提供更好的组织和安全性。
每一个Broker 中可以有很多个Virtual Host,下面介绍Virtual Host 的第一个组件Exchange (交换机)
1.4.1.5 Exchange (交换机)
中文翻译叫交换机,它是消息到达的第一站。消息在交换机中转、交换机不会存储消息
同理,每个虚拟机中可以有多个交换机
消息到达交换机后,下一站就发往队列。
1.4.1.6 Queue (队列)
队列是消息的实际存储位置,消费者可以从队列中接收消息。队列通过绑定(Binding)与交换机相连,决定哪些消息会被放入队列。
消息被消费端消费后、这个消息也会在队列中删除。
同理、一个虚拟机中可以有多个 Exchange 和 多个 Queue
消费端同样需要建立连接,从队列中取走消息。
消费端建立建立也是同一种方式需要用到Channel
那么问题来了,交换机怎么知道我们要发送到那个队列呢?
那就看他们之间的 Binding 了
1.4.1.7 Binding
绑定是连接交换机和队列的关系,它定义了消息如何从交换机流入队列。不同类型的交换机会有不同的绑定方式。后续会在代码中有体验,通过路由键的方式。交换机可以绑定一个队列,也可以绑定多个队列。
1.4.1.8 总结
- Producer - 生产者是发送消息的应用程序。生产者通过
Connection
连接到 Broker,并使用Channel
来发布消息。 - Broker - Broker 是 RabbitMQ 的核心组件,它负责接收、存储和分发消息。Broker 包括多个虚拟主机(Virtual Hosts)。
- Virtual Host - Virtual Host 是一个逻辑隔离的区域,类似于传统意义上的物理主机。每个 Virtual Host 内部包含交换机(Exchanges)、队列(Queues)和绑定(Bindings)。多个 Virtual Host 可以在同一台 Broker 上存在,提供更好的组织和安全性。
- Exchange - 交换机是消息的入口点,它根据预设的路由规则将消息分发到对应的队列。生产者通过
Channel
向交换机发布消息。 - Queue - 队列是消息的实际存储位置,消费者可以从队列中接收消息。队列通过绑定(Binding)与交换机相连,决定哪些消息会被放入队列。
- Consumer - 消费者是从队列中接收消息的应用程序。消费者通过
Connection
连接到 Broker 并使用Channel
来订阅队列。 - Binding - 绑定是连接交换机和队列的关系,它定义了消息如何从交换机流入队列。不同类型的交换机会有不同的绑定方式。
- Connection - Connection 表示应用程序与 Broker 之间的网络连接。每个 Connection 可以包含多个
Channel
。
假设有一个生产者想要发送一条消息,消息传递的过程如下:
- 生产者 -> Connection: 生产者与 RabbitMQ 建立连接。
- Connection -> Channel: 生产者创建一个信道。
- Channel -> Exchange: 生产者通过信道将消息发布到一个特定的交换机上。
- Exchange -> Queue: 交换机根据绑定规则将消息发送到一个或多个队列。
- Queue -> Channel: 消费者通过信道订阅队列。
- Channel -> Consumer: 当队列中有新消息时,消费者通过信道接收到这些消息。
简要在总结一次,这个真的很重要。
- 生产者 (Producer): 生成消息。
- 信道 (Channel): 用于执行消息发布操作。
- 交换机 (Exchange): 决定消息的流向。
- 队列 (Queue): 存储消息,等待消费者消费。
- 消费者 (Consumer): 接收并处理队列中的消息。
2、安装
字数: 0 字 时长: 0 分钟
2.1 安装
前提:需要具备 Docker 知识.
# 拉取镜像
docker pull rabbitmq:3.13-management
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management
2.2 验证
访问后台管理界面:http://你的 ip 地址:15672
使用上面创建Docker容器时指定的默认用户名、密码登录:
看到这个界面,安装成功
遇到其他的安装中的问题,可以问度娘。
3、基础篇
字数: 0 字 时长: 0 分钟
3.1 RabbitMQ的7种用法
3.1.1 Hello World
这是最简单的入门模式,展示了如何发送和接收一条消息。生产者发送一条消息到队列,消费者从队列中读取消息。这种模式非常适合初学者快速上手。
流程:
- 生产者创建一个队列并将消息发送到队列。
- 消费者监听队列并接收消息。
3.1.2 Work Queues(工作队列)
工作队列模式适用于需要处理大量任务的情况。在这种模式下,多个消费者共享一个队列,每个任务只会被分配给一个消费者。
流程:
- 生产者将任务消息发送到队列。
- 多个消费者竞争从队列中获取任务。
- 完成任务后,消费者确认任务已完成。
3.1.3 Publish/Subscribe (发布/订阅模式)
发布/订阅模式允许一个生产者将消息发布到一个交换机,然后所有订阅了该交换机的队列都能接收到这条消息。这种模式适用于广播消息。
流程:
- 生产者将消息发布到一个 Fanout 类型的交换机。
- 多个队列绑定到该交换机。
- 消费者监听各自的队列并接收消息。
3.1.4 Routing (路由模式)
路由模式允许根据消息的路由键(routing key)将消息发送到特定的队列。这种模式可以实现更精确的消息分发。
流程:
- 生产者将消息发送到一个 Direct 类型的交换机,并附带一个路由键。
- 队列绑定到交换机,并指定绑定键。
- 交换机根据绑定键将消息发送到相应的队列。
3.1.5 Topics (主题模式)
主题模式扩展了路由模式,允许根据消息的主题(topic)将消息发送到队列。消费者可以订阅多个主题。
流程:
- 生产者将消息发送到一个 Topic 类型的交换机,并附带一个主题。
- 队列绑定到交换机,并指定一个或多个绑定主题。
- 交换机根据绑定主题将消息发送到相应的队列。
3.1.6 Remote Procedure Call (RPC) (远程过程调用模式)
远程过程调用模式允许一个服务通过消息队列调用另一个服务中的方法,并获得响应。这种模式可以用于实现分布式服务调用。
流程:
- 客户端发送一个请求到队列。
- 服务端从队列中读取请求,并处理后将结果发送回客户端。
- 客户端接收服务端的响应。
注:RPC本质上是同步调用,不符合我们消息队列的一个定位,平时用的很少很少
3.1.7 Publisher Confirms (发布确认模式)
发布确认模式确保消息被正确发送到队列。如果消息没有被正确发送,生产者可以重新发送消息。
流程:
- 生产者发送消息到交换机,并开启发布确认模式。
- 如果消息成功发送到队列,交换机会发送确认给生产者。
- 如果消息未能发送到队列,生产者会收到失败的通知,并可以采取相应的措施。
3.1.8 总结
以上七种模式分别适用于不同的场景:
- Hello World 适合快速入门。
- Work Queues 用于任务分配和负载均衡。
- Publish/Subscribe 用于广播消息。
- Routing 用于基于键值的精确消息分发。
- Topics 用于基于主题的消息分发。
- Remote Procedure Call (RPC) 用于实现分布式服务调用。
- Publisher Confirms 用于确保消息的可靠发送。
重点是四种模式: Work Queues 、Publish/Subscribe 、Routing 、Topics
3.2 Hello World
生产者发送消息,消费者接收消息,用最简单的方式实现
官网说明参见下面超链接:
RabbitMQ tutorial - "Hello World!" — RabbitMQ
3.2.1 创建Java工程
引入依赖
3.2.2 发送消息
代码
其实很简单,你都学到了消息队列,那你肯定学过 JDBC Redis 缓存等等,都是创建工厂主机地址端口号吧啦吧啦........一样的。但是注意把connectionFactory.setHost换成你的
package com.Noctifloroused;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.115.191");
// 设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为 /
connectionFactory.setVirtualHost("/");
// 设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
// 设置连接密码;默认为guest
connectionFactory.setPassword("123456");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建Channel(信道)
Channel channel = connection.createChannel();
// 声明(创建)Queue 队列
// queue 参数1:队列名称
// durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
// exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
// autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
// arguments 参数5:队列其它参数
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = " First RabbitMQ Message";
// 参数1:交换机名称,如果没有指定则使用默认Default Exchange
// 参数2:路由key,简单模式可以传递队列名称
// 参数3:配置信息
// 参数4:消息内容
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
执行后控制台会
3.2.3 验证
打开网页控制台
点到队列内部:
内部如下图
在点击获取信息:
拿到你刚刚发送的信息
3.3.4 接受消息
package com.Noctifloroused;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("192.168.200.100");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建Channel
Channel channel = connection.createChannel();
// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
// 参数1. consumerTag:标识
// 参数2. envelope:获取一些信息,交换机,路由key...
// 参数3. properties:配置信息
// 参数4. body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
// 参数1. queue:队列名称
// 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// 参数3. callback:回调对象
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue",true,consumer);
}
}
你就会接收到
3.3.5 验证
全是0,因为消息被消费掉了,所以RabbitMQ服务器上没有。
3.3 Work Queues
官网地址 https://www.rabbitmq.com/tutorials/tutorial-two-java
有多个消费者,一个发布者,多个消费端监听同一个队列的时候,这种情况下,消费者是竞争关系。也就是谁抢到是谁的。
3.3.1 封装工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static final String HOST_ADDRESS = "192.168.200.100";
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(HOST_ADDRESS);
// 端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection con = ConnectionUtil.getConnection();
// amqp://guest@192.168.200.100:5672/
System.out.println(con);
con.close();
}
}
3.3.2 生产者代码
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/**
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
* exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
* arguments 参数5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
/**
* 参数1:使用默认交换机(""
* 参数2:路由键(也就是队列名)为 QUEUE_NAME
* 参数3:消息属性使用默认值(null)
* 参数4:消息体为 body 字符串的字节数组形式
*/
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
System.out.println("已发送消息:" + body);
}
channel.close();
connection.close();
}
}
验证:
可以看到,是有的。
3.3.3 消费者代码
创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:" +new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
/**
*
* @param consumerTag 标识消费者的标签,可以用来取消消费
* @param envelope 包含了消息的元信息,比如交换机名称、路由键等
* @param properties 包含了消息的属性,例如消息的优先级、内容类型等
* @param body 消息的实际内容,是一个字节数组
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 body:" +new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
效果如上图所示。
3.4 Publish/Subscribe
终于要引入我们的新角色:交换机了
生产者不是把消息直接发送到队列,而是发送到交换机
交换机接收消息,而如何处理消息取决于交换机的类型
交换机有如下3种常见类型
Fanout:广播,将消息发送给所有绑定到交换机的队列(本次发布订阅就是用的 Fanout 交换机)
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.4.1 模式说明
理解概念
Publish:发布,这里就是把消息发送到交换机上
Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系
工作机制:
消息发送到交换机上,就会以广播的形式发送给所有已绑定队列
监听队列的消费者,如果有多个,这些消费者还是竞争关系。
3.4.2 生产者端代码
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @projectName: RabbitMQ-test-framework
* @package: com.Noctifloroused.producer
* @className: Producer
* @author: kerwim
* @description: 生产者代码
* @date: 2024/9/6 20:32
* @version: 1.0
*/
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//定义交换机名称
String exchangeName = "test_fanout";
// 3、创建交换机
// 参数1. exchange:交换机名称
// 参数2. type:交换机类型
// DIRECT("direct"):定向
// FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"):通配符的方式
// HEADERS("headers"):参数匹配
// 参数3. durable:是否持久化
// 参数4. autoDelete:自动删除
// 参数5. internal:内部使用。一般false
// 参数6. arguments:其它参数
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 4、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 5、绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout,routingKey设置为""
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 6、发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 7、释放资源
channel.close();
connection.close();
}
}
可以看到代码创建的交换机
看到代码创建的队列
交换机可以看到绑定关系,Routing Key 是空是因为没用到
3.4.3 消费者代码
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
/**
* 参数 1 队列名
* 参数 2 是否持久化
* 参数 3 是否自动删除
* 参数 4 没有额外的声明参数
*/
channel.queueDeclare(queue2Name,true,false,false,null);
/**
* 定义了一个Consumer实例,用于处理消息队列中的消息
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
效果
6.4 总结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
- 工作队列模式本质上是绑定默认交换机
- 发布订阅模式绑定指定交换机
- 监听同一个队列的消费端程序彼此之间是竞争关系
- 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
3.5 Routing
3.5.1 模式说明
前面学到的 发布订阅是广播,就是一个交换机发送消息到所有队列。但有时候我们不想把这条消息,发送给该交换机下绑定的所有队列中,那么路由模式就能够解决我们的需求。
交换机的类型= direct 路由模式,队列1 与 该交换机之间绑定的key=email,而队列2与该交换机绑定的key=phone,当生产者投递消息时,可以指定一个Routing key,用来识别这条消息最终存储到哪个队列中,只有消息的 Routing key 与Binding key 相同时,交换机才会把消息发给该队列。
direct路由方式
Direct类型交换机的路由算法是:要想一个消息能到达这个队列,需要BindingKey和RoutingKey正好能匹配得上。
如下图所示,虽然交换机绑定了两个队列,但投递消息时指定RoutingKey 为 email ,只有队列1与交换机的BindingKey匹配上了,队列2与交换机的BindingKey=phone,因此队列 2 是收不到的.
若消息的RoutingKey是其他字符串(不是phone 和 email),那么这条消息会被交换机直接遗弃,不会投递到任何队列中
同时,交换机也支持多重绑定。不同的队列可以用相同的Binding key与同一交换机绑定。如下图,当消息的Routing key为black时,消息将进入 Q1 和 Q2。
3.5.2 生产者代码
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// 队列1绑定error (routingkey 为error)
channel.queueBind(queue1Name,exchangeName,"error");
// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
//如果两个队列都绑定了error,则都会收到消息,你发送到的是 routing key,相同的队列
String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
/**
* 发送消息
* 交换机名称
* 路由key
* 消息属性
*/
channel.basicPublish(exchangeName,"warning",null,message.getBytes());
System.out.println(message);
// 释放资源
channel.close();
connection.close();
}
}
运行后发现确实多了这个交换机
绑定了俩队列,其中队列 2 有三个 routing key
只要在你发送的代码里把 waning 改成 error,两个队列都能收到
3.5.3 消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer1 将日志信息打印到控制台.....");
}
};
//启动消费
channel.basicConsume(queue1Name,true,consumer);
}
}
import com.Noctifloroused.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
/**
* queue2Name :队列名
* 参数2 持久化
* 参数3 排他性
* 参数4 自动删除
* 参数5 其他参数
*/
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer2 将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
3.5.4 总结
现在发送 warning
可以看到
只有消费者 2 收到了。
如果改成 error
可以看到两个都都收到了
3.6 Topic
3.6.1 模式说明
topics 模式支持模糊匹配RoutingKey,就像是sql 中的 like子句模糊查询,而路由模式等同于sql中的where子句等值查询
topic
交换机背后的路由算法类似于 direct
交换,使用特定路由键发送的消息将被传递到使用匹配绑定键绑定的所有队列。
如上图,主题模式不能具有任意的 routingKey,必须由一个英文句点“.”分隔的字符串(分割符),比如 “ fruit.orange.mango ”。
通配符规则:
#:匹配零个或多个词,
*:匹配一个词
- 例子 1
可以保证的是虽然两条规则都匹配,但是消息并不会发送两次。
- 例子 2
- 例子 3
3.6.2 生产者代码
package com.Noctifloroused.Producer;
import com.Noctifloroused.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
// routing key 常用格式:系统的名称.日志的级别。
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
// key1
channel.queueBind(queue1Name,exchangeName,"#.error");
// key2
channel.queueBind(queue1Name,exchangeName,"order.*");
// key3
channel.queueBind(queue2Name,exchangeName,"*.*");
// 发送消息到队列:order.info、 匹配 key2 和 key3
String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
// channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
// 发送消息到队列:goods.info、 匹配 key3
// body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
// channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());
// 发送消息到队列:goods.info、 匹配 key1 key3
body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
channel.close();
connection.close();
}
}
可以看到有这个交换机
也可以看到绑定得2 个队列 3 个 key 和他的匹配规则
3.6.3 消费者代码
package com.Noctifloroused.Consumer;
import com.Noctifloroused.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue1";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
package com.Noctifloroused.Consumer;
import com.Noctifloroused.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue2";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
3.6.4 总结
执行上面的 3 次测试后
队列 1 和 2 分别打印
队列 1
队列 2
再看一张官方的图
usa.news 和 usa weather 都只想 usa.#
也就是说美国的新闻和天气 都匹配 usa.#
#.news 是欧洲的新闻和美国的欣慰都匹配 #.news
....以此类推
3.7 RPC
官方图
远程过程调用,本质上是同步调用,和我们使用OpenFeign调用远程接口一样,所以这不是典型的消息队列工作方式,暂时就不展开说明。
3.8 工作模式小结
- 直接发送到队列:
- 底层使用了默认交换机
- 经过交换机发送到队列
Fanout:没有Routing key直接绑定队列
Direct:通过Routing key绑定队列,消息发送到绑定的队列上。
一个交换机绑定一个队列:定点发送,
一个交换机绑定多个队列:广播发送
Topic:针对Routing key使用通配符
4、进阶篇
字数: 0 字 时长: 0 分钟
4.1 客户端整合 Springboot
4.1.1 创建工程
4.1.2 配置POM
我实在 module06-integration-springboot 的 pom 中写
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
4.1.3 配置消费者工程的 YML 和 主启动类
消费者
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
logging:
level:
com.atguigu.mq.listener.MyMessageListener: info
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQConsumerMainType {
public static void main(String[] args) {
SpringApplication.run(RabbitMQConsumerMainType.class, args);
}
}
生产者
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQProducerMainType {
public static void main(String[] args) {
SpringApplication.run(RabbitMQProducerMainType.class, args);
}
}
4.1.4 消费者端代码
package com.Noctifloroused.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
//写法 1 创建队列 交换机 监听
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue(value = QUEUE_NAME,declare = "true"), //指定队列并持久化
// exchange = @Exchange(value = EXCHANGE_DIRECT), //指定交换机
// key = {ROUTING_KEY} //指定 routing key
// ))
//写法 2 监听
@RabbitListener(queues = {QUEUE_NAME})
public void onMessage(String dataString,
Message message,
Channel channel) {
System.out.println("接收到消息:" + dataString);
}
}
因为是消费者端,其实一般只需要写法2,监听队列就可以了。
但是因为我运行过写法 1
点击 queue.order发现里面是绑定好了的
写法 1代码解释:
此Java注解配置RabbitMQ消息监听: @RabbitListener:标记方法为RabbitMQ消息消费者。 @QueueBinding:定义队列与交换机的绑定关系。 value = @Queue(declare = "true"):声明队列,declare=true表示自动创建队列。 exchange:指定关联的交换机名称。 key:设置路由键,用于匹配消息路由。
4.1.5 生产者端代码
其实 springboot 有一个非常好用的工具就是 rabbitTemplate,有点类似 redisTemplate,是 springboot 帮你写好的一个发送消息的工具类,直接@Autowired 引入就可以使用。
在测试类里面测试一下
package com.atguigu.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
//实际项目中肯定是要抽出来的
//交换机名称
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
//routing key
public static final String ROUTING_KEY = "order";
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 该函数testSendMessage()使用rabbitTemplate对象向RabbitMQ消息队列发送消息:
* EXCHANGE_DIRECT:指定直连交换机名称。
* ROUTING_KEY:设置路由键。
* "Hello rabbit!":作为消息内容发送给RabbitMQ。
*/
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello rabbit");
}
}
为什么这里没有队列名称?
因为知道那个 routing key 和交换机,不就知道队列了?
测试结果
发送端:
接收端
4.2 可靠性投递
4.2.1故障情况
1、故障(一)消息没有发送到MQ 服务器
这样走没问题,但是万一出现
那就是没有发送成功,
解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消
息队列服务器上,那就可以尝试重新发送
解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机
2、故障(二)消息队列服务器宕机
消息存进去消息队列了,但是还没消费当前消息,队列 MQ 服务器直接宕机
解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
3、故障(三)消费端宕机
但是消费端出现问题,例如:宕机、抛异常等等
解决方案
• 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
• 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)
4.2.2 解决故障一方案(一)
生产者端消息确认机制
4.2.2.1 创建 module
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
主启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQProducerMainType {
public static void main(String[] args) {
SpringApplication.run(RabbitMQProducerMainType.class, args);
}
}
4.2.2.2 yml 代码
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info
注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效
4.2.2.3 创建配置类
在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:
方法名 | 方法功能 | 所属接口 | 接口所属类 |
---|---|---|---|
confirm() | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
returnedMessage() | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate |
然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。对,就是对 springboot 封装的那个RabbitTemplate进行增强。
原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。
而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:
设置组件调用的方法 | 所需对象类型 |
---|---|
setConfirmCallback() | ConfirmCallback接口类型 |
setReturnCallback() | ReturnCallback接口类型 |
API说明
①ConfirmCallback接口
这是RabbitTemplate内部的一个接口,源代码如下:
/**
* A callback for publisher confirmations.
*
*/
@FunctionalInterface
public interface ConfirmCallback {
/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
生产者端发送消息之后,回调confirm()方法
- ack参数值为true:表示消息成功发送到了交换机
- ack参数值为false:表示消息没有发送到交换机
②ReturnCallback接口
同样也RabbitTemplate内部的一个接口,源代码如下:
/**
* A callback for returned messages.
*
* @since 2.3
*/
@FunctionalInterface
public interface ReturnsCallback {
/**
* Returned message callback.
* @param returned the returned message and metadata.
*/
void returnedMessage(ReturnedMessage returned);
}
接口中的returnedMessage()方法仅仅在消息没有发送到队列时调用
ReturnedMessage类中主要属性含义如下:
属性名 | 类型 | 含义 |
---|---|---|
message | org.springframework.amqp.core.Message | 消息以及消息相关数据 |
replyCode | int | 应答码,类似于HTTP响应状态码 |
replyText | String | 应答码说明 |
exchange | String | 交换机名称 |
routingKey | String | 路由键名称 |
配置类代码
- 要点 1
加@Configuration注解,加入IOC容器、Component也行
- 要点 2
配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。
操作封装到了一个专门的void init()方法中。
为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。
关于@PostConstruct注解大家可以参照以下说明:
@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。
使用@PostConstruct注解的方法必须满足以下条件:
- 方法不能有任何参数。
- 方法必须是非静态的。
- 方法不能返回任何值。
当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* PostConstruct 注解的方法会在依赖注入完成后自动执行,用于初始化一些配置
* 这个类构造后 -> 配置好回调函数 ->再把配置好的回调函数注入到rabbitTemplate中,
* 这样当消息发送成功或失败时,就会调用对应的回调函数进行处理。也就是rabbitTemplate得到增强!
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 方法在消息发送成功时被调用,记录日志信息
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData={},ack={},cause={}",correlationData,ack,cause);
}
/**
* 方法在消息未被队列接收时调用,记录未投递成功消息的详细信息。
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息主体: " + new String(returnedMessage.getMessage().getBody()));
log.info("应答码: " + returnedMessage.getReplyCode());
log.info("描述:" + returnedMessage.getReplyText());
log.info("消息使用的交换器 exchange : " + returnedMessage.getExchange());
log.info("消息使用的路由键 routing : " + returnedMessage.getRoutingKey());
}
}
发送消息代码
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello ~~~~~~");
}
}
通过调整代码,测试如下三种情况:
交换机正确、路由键正确
交换机正确、路由键不正确,无法发送到队列
把它故意写乱
结果:
他会告诉你 没有这个 route ,注意,这个是returnedMessage()方法执行的
交换机不正确,无法发送到交换机
把交换机改了
结果 ack 是 false,然后也很明确的告诉你,404 交换机不存在(不要纠结那个消息发送成功,那个我忘了改了)
4.2.3 解决故障一方案(二)
生产者端消息确认机制
这个方法不是很常用
- 生产者发送消息:
- 生产者发送消息到目标交换机。
- 消息路由:
- 目标交换机将消息路由到队列 A。
- 消费者处理消息:
- 消费者(业务)从队列 A 接收消息并处理。
- 发现状态值为 2:
- 当消费者处理消息时,发现状态值为 2,意味着消息处理失败。
- 更新本地消息表:
- 更新本地消息表的状态为 "消费失败"。
- 定时任务轮询检查消息发送状态:
- 定时任务定期查询本地消息表,查找状态为 "消费失败" 的消息。
- 发送备份消息:
- 如果找到未处理成功的消息,将消息发送到备份交换机。
- 消息路由到队列 B:
- 备份交换机将消息路由到队列 B。
- 消费者记录消息:
- 另一个消费者(记录)从队列 B 接收消息并记录日志或其他相关信息。
- 更新本地消息表:
- 更新本地消息表的状态为 "消费成功"。
这个流程的主要目的是确保消息得到妥善处理,即使在某些情况下出现故障也能保证消息不丢失。当消息处理失败时,系统会将消息发送到备份交换机,以便后续进行记录或重试。同时,本地消息表用于跟踪消息的状态,确保消息最终被正确处理。
4.2.3.1 创建备份交换机
- 创建交换机
注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键
- 创建备份交换机要绑定的队列
创建队列
- 绑定交换机
注意:这里是要和备份交换机绑定
针对备份队列创建消费端监听器
public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
public static final String QUEUE_NAME_BACKUP = "queue.order.backup";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),
exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),
key = {""}
))
public void processMessageBackup(String dateString,
Message message,
Channel channel) {
log.info("BackUp: " + dateString);
}
4.2.3.2 设定备份关系
- 原交换机删除
为什么要删除原交换机?
因为交换机一旦创建不能修改只能删除重建,给原交换机绑定备份交换机必须得删掉重新建。
- 重新创建原交换机
- 原交换机重新绑定原队列
4.2.3.4 测试
- 启动消费者端
- 修改路由键为不存在的路由键、发送消息,但是路由键不对,于是转入备份交换机
4.2.4 解决故障二方案 (一)
消息队列服务器宕机导致内存中消息丢失
解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
其实我们能想到这个问题,MQ 工程师也是能想到的,默认就是开启了持久化的
论证:
点击这个注解看源码
点击这个 Queue
翻译:
Specifies if this queue should be durable. By default if queue name is provided it is durable.
指定此队列是否应持久。默认情况下,如果提供了队列名称,则它是持久的。
Specifies if this queue should be exclusive. By default if queue name is provided it is not exclusive.
指定此队列是否应为独占队列。默认情况下,如果提供了队列名称,则它不是独占的。
在点击这个:QueueBinding
点击交换机
我们可以看到
durable 默认是 true ,autodelete 默认是 false
4.2.5 解决故障三方案(一)
消费端消息确认
消费端宕机或抛异常导致消息没有成功被消费
消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)
4.2.6.1 消费端消息确认
ACK是acknowledge的缩写,表示已确认
默认情况下,消费端取回消息后,默认会自动返回ACK确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ得到ACK确认信息就会删除消息
但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了
所以还是要修改成手动确认
创建消费端module
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQConsumerMainType {
public static void main(String[] args) {
SpringApplication.run(RabbitMQConsumerMainType.class, args);
}
}
4.2.6.2 消费端监听器
- 创建监听器类
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
@Component
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
public void processMessage(String dataString, Message message, Channel channel) {
}
}
在接收消息的方法上应用注解
// 修饰监听方法
@RabbitListener(
// 设置绑定关系
bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息
key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) {
}
如果队列已创建
@RabbitListener(queues = {QUEUE_NAME})
public void processMessage(String dataString, Message message, Channel channel) {
}
接收消息方法内部逻辑
业务处理成功:手动返回ACK信息,表示消息成功消费
业务处理失败:手动返回NACK信息,表示消息消费失败。此时有两种后续操作供选择:
- 把消息重新放回消息队列,RabbitMQ会重新投递这条消息,那么消费端将重新消费这条消息——从而让业务代码再执行一遍
- 不把消息放回消息队列,返回reject信息表示拒绝,那么这条消息的处理就到此为止
了解交付标签机制
先理解“deliveryTag:交付标签机制”
这个唯一标识就是deliveryTag(交付标签)
deliveryTag是一个64位整数
消息往消费端投递时,会携带交付标签
交付标签有啥用?
消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。
如果交换机是Fanout模式,同一个消息广播到了不同队列,deliveryTag会重复吗?
不会,deliveryTag在Broker(MQ服务器)范围内唯一
- 相关API
下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel接口
①basicAck()方法
- 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
②basicNack()方法
- 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
③basicReject()方法
- 方法功能:根据指定的deliveryTag,对该消息表示拒绝
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
basicNack()和basicReject()有啥区别?
- basicNack()有批量操作
- basicReject()没有批量操作
完整代码实例
javaimport com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @Slf4j public class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; // 修饰监听方法 @RabbitListener( // 设置绑定关系 bindings = @QueueBinding( // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"), // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"), // 配置路由键信息 key = {ROUTING_KEY} )) public void listen(String msg, Message message, Channel channel) throws IOException { // 1、获取当前消息的 deliveryTag 值备用 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 2、正常业务操作 log.info("消费端接收到消息内容:" + msg); // System.out.println(10 / 0); // 3、给 RabbitMQ 服务器返回 ACK 确认信息 channel.basicAck(deliveryTag, false); } catch (Exception e) { // 4、获取信息,看当前消息是否曾经被投递过 //redelivered 为true:说明当前消息已经重复投递过一次了 // redelivered 为false:说明当前消息是第一次投递 Boolean redelivered = message.getMessageProperties().getRedelivered(); if (!redelivered) { // 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次 //对指定的消息标签deliveryTag执行拒绝操作(basicNack)。 //不重新入队被拒绝的消息(false)。 //确认自动回复到服务器,无需等待应答(true) channel.basicNack(deliveryTag, false, true); } else { // 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列 channel.basicReject(deliveryTag, false); } // reject 表示拒绝 // 辨析:basicNack()和 basicReject()方法区别 // basicNack()能控制是否批量操作 // basicReject()不能控制是否批量操作 // channel.basicReject(deliveryTag, true); } } }
要点总结
要点1:把消息确认模式改为手动确认
要点2:调用Channel对象的方法返回信息
- ACK:Acknowledgement,表示消息处理成功
- NACK:Negative Acknowledgement,表示消息处理失败
- Reject:拒绝,同样表示消息处理失败
要点3:后续操作
- requeue为true:重新放回队列,重新投递,再次尝试
- requeue为false:不放回队列,不重新投递
要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据
流程梳理
4.3 消费端限流
4.3.1 削峰限流的好处
其实就是设置消费端一次取走多少个消息,这样就不会被并发压力冲倒消费端程序导致程序崩溃
4.3.2设置方式
非常简单,就是一个参数:prefetch
就是消费端配置文件(yaml)加一个属性,是不是很简单,我那时候还没学的时候听到什么微服务,什么削峰填谷,感觉好厉害的名词,其实就是MQ的削峰填谷就是在消费端配置一个属性。当然原理肯定是需要跟深层次的理解了。
spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual
prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息
假设我们先不加上prefetch属性
生产者代码
@Test
public void testSendMessage2() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello world" + i);
}
}
4.3.4 测试
消费端代码
@RabbitListener(queues = {QUEUE_NAME})
public void listen2(String msg, Message message, Channel channel) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("消费端接收到消息内容:" + msg);
TimeUnit.SECONDS.sleep(1);
// 给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);
}
启动后
可以看到代码发送的是 100 条消息
然后瞬间Ready清零,之所以慢慢返回 ack 是因为代码里做了睡眠。
当我们加上prefetch
可以看到属性是慢慢取的,这样就能够做到削峰填谷的效果
4.4 消息超时
4.4.1什么是消息超时
给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除
我们可以从两个层面来给消息设定过期时间:
• 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
• 消息本身:给具体的某个消息设定过期时间
• 如果两个层面都做了设置,那么哪个时间短,哪个生效
4.4.2 队列层面设置
就是整个队列所有的消息固定时间内没有被消费直接删除
在 UI 上新建一个交换机机
新建一个队列 然后设置 x-message-ttl = 5000ms
代码
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT2,
ROUTING_KEY2,
"Hello ~~~~~~");
}
然后绑定他们.
测试:
不启动消费端程序
向设置了过期时间的队列中发送1条消息
等5秒后,看是否全部被过期删除
可以看到这里是有发送过来的,5s 没人消费后就直接归零了
4.4.3 消息层面设置
就是单独一条消息固定时间内没有被消费自动删除
新建一个队列,没有固定时间过期的队列,就是上文队列层面创建队列不添加x-message-ttl
- 生产端代码
@Test
public void testSendMessageTTL() {
// 1、创建消息后置处理器对象
MessagePostProcessor messagePostProcessor = (Message message) -> {
// 设定 TTL 时间,以毫秒为单位
message.getMessageProperties().setExpiration("5000");
return message;
};
// 2、发送消息
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello world!", messagePostProcessor);
}
这里说明一下:MessagePostProcessor
MessagePostProcessor 是一个函数式接口,通常用在消息中间件(如 RabbitMQ)的场景中,允许用户自定义对消息进行最后的加工或修改。
上述代码中的实现主要用于: 修改消息属性,特别设置了消息的过期时间(TTL)为 5000 毫秒。 接受一个 Message 对象并返回同一个对象,使该对象携带自定义的属性变化
这里需要 Lambda 表达式和函数式接口的理解。
测试
不启动消费端程序
向设置了过期时间的队列中发送1条消息
等5秒后,看是否全部被过期删除
4.5 死信和死信队列
4.5.1 概念
概念:当一个消息无法被消费,它就变成了死信
死信产生的原因大致有下面三种:
拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false
溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信
超时:消息到达超时时间未被消费
- 死信的处理方式大致有下面三种:
丢弃:对不重要的消息直接丢弃,不做处理
入库:把死信写入数据库,日后处理
监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)
这里我们演示死信队列
4.5.2 测试准备
4.5.2.1 创建死信交换机和死信队列
跟常规队列一样的,常规设定即可,没有特殊设置:
- 死信交换机:exchange.dead.letter.video
- 死信队列:queue.dead.letter.video
- 死信路由键:routing.key.dead.letter.video
创建后记得绑定交换机和路由键。
4.5.2.2 创建正常交换机和正常队列
正常交换机:exchange.normal.video
正常队列:queue.normal.video
正常路由键:routing.key.normal.video
注意创建正常队里的时候需要绑定死信队列的交换机,路由等等,交换机路由是必须绑定的。
参数key不用自己写,点一个就行了。
全部完成后点击正常队列可以看到相关属性
4.5.3 Java 代码常量声明
public static final String EXCHANGE_NORMAL = "exchange.normal.video";
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";
public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";
public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
4.5.4 测试消费端拒收
- 生产端代码:
@Test
public void testSendMessageButReject() {
rabbitTemplate
.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况1:消息被拒绝");
}
- 消费端代码:
public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
// 监听正常队列,但是拒绝消息
log.info("★[normal]消息接收到,但我拒绝。");
//拒绝消息,并且不重新放回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
// 监听死信队列
log.info("★[dead letter]dataString = " + dataString);
log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
- 测试结果
先启动消费端,在启动生产端
能看到正常队列拒绝后进入死信交换机,死信交换机接收到了
4.5.5 测试消息数量超过队列上限
在前面有设置队列上限是十个。
- 生产者代码
@Test
public void testSendMultiMessage() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况2:消息数量超过队列的最大容量" + i);
}
}
- 消费者代码
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
// 监听死信队列
log.info("★[dead letter]dataString = " + dataString);
log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal2(Message message, Channel channel) throws IOException {
// 监听正常队列
log.info("★[normal]消息接收到。");
//收到消息不在拒绝
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
测试结果
注意:你先启动生产者发送消息,然后在启动消费端消费消息
不然先启动消费端的话,你发送一条那边消费一条,就没有超出队列限制了。
4.5.6 测试超时未消费
前面创建队列的时候已经设置好了超时时间
- 消费端代码
正常发送一条消息即可,所以使用第一个例子的代码。
@Test
public void testSendMessageTimeout() {
rabbitTemplate
.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况3:消息超时");
}
然后注释掉监听正常队列的,只留一个监听死信队列的
等待10s后