RabbitMq入门
1.MQ
1.1同步调用
同步调用的优势是时效性强,等到结果后才返回。但也存在很多问题,以支付服务为例,如果扣减余额、更新交易流水、更新交易订单状态三步为同步调用,上一步执行完才能继续下一个业务。会存在以下问题:
- 拓展性差。耦合度高,后续如果要在当前的业务基础上增加功能,整个业务会越来越臃肿。
- 性能下降。同步调用,调用者需要等待整个支付业务执行完毕才能继续执行。并且随着业务功能的增加,整个支付业务的执行时间会越来越长。
- 级联失败。如果基于远程调用,调用用户服务、交易服务.......时,当其中某个服务出现故障时,整个支付服务都会回滚,交易失败。
要解决这些问题,需要使用异步调用的方式来替代同步调用。
1.2异步调用
异步调用方式就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,即原来的调用方
- 消息Broker:管理、暂存、转发消息
- 消息接收者:接收和处理消息的人,即原来的服务提供方
异步调用的发送方不用同步调用接收者的业务接口,而是发送一条消息到消息Broker。接收者根据需求从Broker中订阅消息。每当发送发发送消息后,接受者都能获取消息并处理。这样发送方和接收方就能完全解耦。
还是以支付业务为例,使用异步调用后,除了必要的用户服务(扣除余额,更新支付流水),其他的业务都可以通过订阅的方式,自行处理各自的业务逻辑。在这个基础上,如果有新的业务加进来,也只需要让新业务订阅消息即可。
不管后期增加了多少消费者,对于支付业务而言,都是执行扣除余额、更新支付流水、发送消息这三个步骤,业务耗时不变,且大大降低了业务之间的耦合性。另外,即使其它业务执行过程中出现了故障,也不会影响到支付业务。
异步调用的优势:
- 解除耦合,拓展性强
- 无需等待,性能好
- 故障隔离,避免级联失败
- 缓存消息,流量削峰填谷
异步调用的缺点:
-
不能立即得到调用结果,时效性差
-
不确定下游业务执行是否成功
-
业务安全依赖Broker的可靠性、安全性和性能
-
架构复杂,后期维护和调试麻烦
1.3MQ技术选型
MQ(MessageQueue),中文是消息队列,也就是异步调用中的Broker。
目前比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好。
2.RabbitMQ
2.1安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:Messaging that just works — RabbitMQ
这里以3.8版本为例
- 将下载的mq.tar包上传到Linux服务器上,使用docker load命令导入mq镜像:
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# ll
-rw-r--r-- 1 root root 243M 2月 19 21:25 mq.tar
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker load -i mq.tar
a70daca533d0: Loading layer 75.16MB/75.16MB
1d76618777b7: Loading layer 2.494MB/2.494MB
3c4641134cb2: Loading layer 122MB/122MB
925d7d7de53e: Loading layer 345.1kB/345.1kB
c7a6248661bc: Loading layer 19.84MB/19.84MB
6ba57a07380e: Loading layer 4.608kB/4.608kB
57de3e20948c: Loading layer 1.536kB/1.536kB
56d9e5402fdd: Loading layer 17.41kB/17.41kB
ce6be100ab60: Loading layer 34.02MB/34.02MB
Loaded image: rabbitmq:3.8-management
- 运行镜像
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker network create testnet
767e702a12a708c64bafb2bb7f9b4feeb693d620a74a9cc5c52c2c869bb0110a
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker run \
> -e RABBITMQ_DEFAULT_USER=root \ #设置用户名
> -e RABBITMQ_DEFAULT_PASS=123456 \ #设置密码
> -v mq-plugins:/plugins \ #挂载数据卷mq-plugins到/plugins目录下
> --name mq \ #容器名
> --hostname mq \ #主机名
> -p 15672:15672 \ #端口映射
> -p 5672:5672 \ #端口映射
> --network testnet \ #设置网桥
> -d \
> rabbitmq:3.8-management
46f6028a906ba3376d3eb416af5ec954e1872179691d08cd9320b5aab28f22dd
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
46f6028a906b rabbitmq:3.8-management "docker-entrypoint.s…" 3 minutes ago Up 3 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp mq
可以看到在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
3.浏览器访问 http://ip:15672 ,输入用户账号密码即可登录RabbitMQ的管理页面
RabbitMQ对应的架构图:
- publisher:生产者,即发送消息的一方
- consumer:消费者,接收消息的一方
- queue:队列,负责存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责路由和转发消息,没有存储消息的能力。生产者发送的消息由交换机决定投递到哪个队列。
- virtual-host:虚拟主机,起到隔离数据的作用。每个虚拟主机相互独立,有各自不同的交换机、队列
以上这些东西都可以在RabbitMQ的管理控制台操作。
2.2收发消息(交换机、队列)
例子:在RabbitMQ控制台完成收发消息
- 新建队列hello.queue1和hello.queue2
- 向默认的amp.fanout交换机发送一条消息
- 查看消息是否到达两个队列
(1)在控制台Queues页面创建指定的队列hello.queue1和hello.queue2
(2)创建之后如图显示
(3)交换机和队列需要进行绑定,发送到交换机的消息才能理由到对应的队列
点击Exchanges
选项卡,点击amq.fanout
交换机,进入交换机详情页
(4)然后点击Bindings
菜单,在表单中填写要绑定的队列名称:
(5)绑定后显示如下
(6)在绑定的队列中也可以看到绑定交换机信息
(7)在交换机中发送消息测试
(8)消息分别成功到达绑定的两个队列中
(9)进入队列中,可以查看接收的消息
这个时候如果有消费者监听了MQ的hello.queue1
或hello.queue2
队列,自然就能接收到消息了。
2.3数据隔离
2.3.1用户管理
点击Admin选项卡,可以看到RabbitMQ控制台的用户管理页面。在这里可以创建用户、创建虚拟主机等:
- Name:用户名
- Tags:权限,administrator代表有超级管理员权限
- Can access virtual hosts:可以访问的virtual host,这里默认是 "/"
- Has password:是否有密码
为了节约成本,一般来说通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。为了避免相互干扰,会利用virtual host
的隔离特性,将不同的项目隔离,一般会做两件事:
- 给每个项目创建独立的运维账号(用户),将管理权限分离
- 给每个项目创建不同的
virtual host
,将每个项目的数据隔离
2.3.2Virtual Host
点击右边的Virtual Hosts选项,可以管理,创建MQ的虚拟主机,或者设置用户对虚拟主机的访问权限。
例子:新建用户,为该用户创建一个虚拟主机,测试不同的虚拟主机之间的数据隔离现象
(1)在Users页面创建新用户test
(2)退出并登录新创建的用户test,在admin的Virtual Host页面创建虚拟主机
创建完后如图:
由于是登录test用户后创建的/test-host,因此回到Users菜单,会发现当前用户已经具备了对这个虚拟主机的访问权限:
(3)点击右上角的Virtual host下拉菜单,选择刚才创建的虚拟主机。然后再次查看queues选项卡,可以发现之间创建的队列已经看不到了:
这就是基于virtual host
的隔离效果。
3.SpringAMQP
AMQP:高级消息队列协议(Advanced Message Queuing Protocol),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,不受客户端中间件的不同产品、不同开发语言等的条件限制。
RabbitMQ采用了AMQP协议,因此具备了跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以和RabbitMQ交互,并且RabbitMQ官方也提供了各种不同语言的客户端。
Spring AMQP:Spring AMQP是基于AMQP的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
由于RabbitMQ提供的Java客户端编码相对复杂(https://www.rabbitmq.com/getstarted.html),一般生产环境下会更多结合Spring来使用。而Spring的官方基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP的官方地址
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
3.1创建项目
创建一个项目,项目结构如下,包括三部分:
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者
在mq-demo这个父工程中,已经配置好了SpringAMQP的相关的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--Jackson-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
</dependencies>
</project>
因此,子工程中就可以直接使用SpringAMQP了。
3.2快速入门
例子:利用控制台创建队列simple.queue
- 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
- 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
正常的情况下都是通过交换机发送消息到队列,这里为了测试方便,直接向队列发送消息,跳过交换机。这种模式一般测试使用,很少在生产中使用。
首先在控制台创建simple.queue队列
3.2.1消息发送
首先配置MQ地址,在publisher服务的application.yml中添加配置
spring:
rabbitmq:
host: xxx.xxx.xxx.xxx #主机IP地址
port: 5672 #端口
virtual-host: /test-host #虚拟主机
username: test #用户名
password: 123456 #密码
在publisher服务中编写测试类SpringAMQPTest,利用SpringAMQP提供的RabbitTemplate工具类测试发送消息:
package com.publisher;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
//队列名称
String queueName = "simple.queue";
//消息
String message = "Hello,Spring AMQP!";
//发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
可以在控制台看到已发送的消息:
3.2.2消息接收
SpringAMQP提供声明式的消息监听,只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法.
(1)在consumer服务的配置文件中同样配置RabbitMQ
(2)编写监听类
package com.consumer.listeners;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MQListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到simple.queue消息===>【" + msg + "】");
}
}
启动项目,日志显示:
3.2.3总结
SpringAMQP收发消息:
- 引入spring-boot-starter-amqp依赖
- 配置rabbitmq服务端信息
- 利用RabbitTemplate发送消息
- 利用@RabbitListener注解声明要监听的队列,监听消息
3.3WorkQueues模型
Work Queues,任务模型,简单地说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
创建一个队列work.queue,在publisher服务一秒发送50条消息到work.queue,在consumer服务中定义两个消息监听者,都监听work.queue队列。让消费者1每秒可以处理50条消息,消费者每秒处理5条消息。
(1)publisher服务定义测试方法
@Test
public void testWorkQueue() throws InterruptedException {
//队列名称
String queueName = "work.queue";
for (int i = 0; i < 50; i++) {
//消息
String message = "Hello, worker, message_" + i;
//发送消息
rabbitTemplate.convertAndSend(queueName, message);
Thread.sleep(20);
}
}
(2)consumer服务提供两个消费者,分别有不同的消费速度
package com.consumer.listeners;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MQListener {
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到 work.queue消息===>【" + msg + "】");
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2 收到 work.queue消息......【" + msg + "】");
Thread.sleep(200);
}
}
测试结果:
消费者2 收到 work.queue消息......【Hello, worker, message_0】
消费者1 收到 work.queue消息===>【Hello, worker, message_1】
消费者1 收到 work.queue消息===>【Hello, worker, message_3】
消费者1 收到 work.queue消息===>【Hello, worker, message_5】
消费者2 收到 work.queue消息......【Hello, worker, message_2】
消费者1 收到 work.queue消息===>【Hello, worker, message_7】
消费者1 收到 work.queue消息===>【Hello, worker, message_9】
消费者1 收到 work.queue消息===>【Hello, worker, message_11】
消费者1 收到 work.queue消息===>【Hello, worker, message_13】
消费者2 收到 work.queue消息......【Hello, worker, message_4】
消费者1 收到 work.queue消息===>【Hello, worker, message_15】
消费者1 收到 work.queue消息===>【Hello, worker, message_17】
消费者1 收到 work.queue消息===>【Hello, worker, message_19】
消费者2 收到 work.queue消息......【Hello, worker, message_6】
消费者1 收到 work.queue消息===>【Hello, worker, message_21】
消费者1 收到 work.queue消息===>【Hello, worker, message_23】
消费者1 收到 work.queue消息===>【Hello, worker, message_25】
消费者2 收到 work.queue消息......【Hello, worker, message_8】
消费者1 收到 work.queue消息===>【Hello, worker, message_27】
消费者1 收到 work.queue消息===>【Hello, worker, message_29】
消费者1 收到 work.queue消息===>【Hello, worker, message_31】
消费者2 收到 work.queue消息......【Hello, worker, message_10】
消费者1 收到 work.queue消息===>【Hello, worker, message_33】
消费者1 收到 work.queue消息===>【Hello, worker, message_35】
消费者1 收到 work.queue消息===>【Hello, worker, message_37】
消费者1 收到 work.queue消息===>【Hello, worker, message_39】
消费者2 收到 work.queue消息......【Hello, worker, message_12】
消费者1 收到 work.queue消息===>【Hello, worker, message_41】
消费者1 收到 work.queue消息===>【Hello, worker, message_43】
消费者1 收到 work.queue消息===>【Hello, worker, message_45】
消费者2 收到 work.queue消息......【Hello, worker, message_14】
消费者1 收到 work.queue消息===>【Hello, worker, message_47】
消费者1 收到 work.queue消息===>【Hello, worker, message_49】
消费者2 收到 work.queue消息......【Hello, worker, message_16】
消费者2 收到 work.queue消息......【Hello, worker, message_18】
消费者2 收到 work.queue消息......【Hello, worker, message_20】
消费者2 收到 work.queue消息......【Hello, worker, message_22】
消费者2 收到 work.queue消息......【Hello, worker, message_24】
消费者2 收到 work.queue消息......【Hello, worker, message_26】
消费者2 收到 work.queue消息......【Hello, worker, message_28】
消费者2 收到 work.queue消息......【Hello, worker, message_30】
消费者2 收到 work.queue消息......【Hello, worker, message_32】
消费者2 收到 work.queue消息......【Hello, worker, message_34】
消费者2 收到 work.queue消息......【Hello, worker, message_36】
消费者2 收到 work.queue消息......【Hello, worker, message_38】
消费者2 收到 work.queue消息......【Hello, worker, message_40】
消费者2 收到 work.queue消息......【Hello, worker, message_42】
消费者2 收到 work.queue消息......【Hello, worker, message_44】
消费者2 收到 work.queue消息......【Hello, worker, message_46】
消费者2 收到 work.queue消息......【Hello, worker, message_48】
可以看到虽然两个消费者的速度不一样,但是仍然每人都消费了25条消息:
- 消费者1很快处理完25条消息
- 消费者2却在缓慢处理自己的25条消息
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致一个消费者空闲,另一个消费者忙得飞起。这显然是不合理的。
3.3.1消费者消息推送限制
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能会出现消息堆积。
我们可以通过修改consumer的配置文件,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息。
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一消息
再次测试,结果如下:
消费者1 收到 work.queue消息===>【Hello, worker, message_0】
消费者2 收到 work.queue消息......【Hello, worker, message_1】
消费者1 收到 work.queue消息===>【Hello, worker, message_2】
消费者1 收到 work.queue消息===>【Hello, worker, message_3】
消费者1 收到 work.queue消息===>【Hello, worker, message_4】
消费者1 收到 work.queue消息===>【Hello, worker, message_5】
消费者1 收到 work.queue消息===>【Hello, worker, message_6】
消费者1 收到 work.queue消息===>【Hello, worker, message_7】
消费者1 收到 work.queue消息===>【Hello, worker, message_8】
消费者2 收到 work.queue消息......【Hello, worker, message_9】
消费者1 收到 work.queue消息===>【Hello, worker, message_10】
消费者1 收到 work.queue消息===>【Hello, worker, message_11】
消费者1 收到 work.queue消息===>【Hello, worker, message_12】
消费者1 收到 work.queue消息===>【Hello, worker, message_13】
消费者1 收到 work.queue消息===>【Hello, worker, message_14】
消费者1 收到 work.queue消息===>【Hello, worker, message_15】
消费者1 收到 work.queue消息===>【Hello, worker, message_16】
消费者1 收到 work.queue消息===>【Hello, worker, message_17】
消费者2 收到 work.queue消息......【Hello, worker, message_18】
消费者1 收到 work.queue消息===>【Hello, worker, message_19】
消费者1 收到 work.queue消息===>【Hello, worker, message_20】
消费者1 收到 work.queue消息===>【Hello, worker, message_21】
消费者1 收到 work.queue消息===>【Hello, worker, message_22】
消费者1 收到 work.queue消息===>【Hello, worker, message_23】
消费者1 收到 work.queue消息===>【Hello, worker, message_24】
消费者1 收到 work.queue消息===>【Hello, worker, message_25】
消费者1 收到 work.queue消息===>【Hello, worker, message_26】
消费者2 收到 work.queue消息......【Hello, worker, message_27】
消费者1 收到 work.queue消息===>【Hello, worker, message_28】
消费者1 收到 work.queue消息===>【Hello, worker, message_29】
消费者1 收到 work.queue消息===>【Hello, worker, message_30】
消费者1 收到 work.queue消息===>【Hello, worker, message_31】
消费者1 收到 work.queue消息===>【Hello, worker, message_32】
消费者1 收到 work.queue消息===>【Hello, worker, message_33】
消费者2 收到 work.queue消息......【Hello, worker, message_34】
消费者1 收到 work.queue消息===>【Hello, worker, message_35】
消费者1 收到 work.queue消息===>【Hello, worker, message_36】
消费者1 收到 work.queue消息===>【Hello, worker, message_37】
消费者1 收到 work.queue消息===>【Hello, worker, message_38】
消费者1 收到 work.queue消息===>【Hello, worker, message_39】
消费者1 收到 work.queue消息===>【Hello, worker, message_40】
消费者1 收到 work.queue消息===>【Hello, worker, message_41】
消费者2 收到 work.queue消息......【Hello, worker, message_42】
消费者1 收到 work.queue消息===>【Hello, worker, message_43】
消费者1 收到 work.queue消息===>【Hello, worker, message_44】
消费者1 收到 work.queue消息===>【Hello, worker, message_45】
消费者1 收到 work.queue消息===>【Hello, worker, message_46】
消费者1 收到 work.queue消息===>【Hello, worker, message_47】
消费者1 收到 work.queue消息===>【Hello, worker, message_48】
消费者2 收到 work.queue消息......【Hello, worker, message_49】
3.3.2总结
Work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,实现能者多劳
3.4交换机
真正生产环境都会经过exchange(交换机)来发送消息,而不是直接发送到队列,交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.4.1Fanout交换机
Fanout,英文翻译是扇出。Fanout Exchange 会将接受到的消息广播到每一个跟其绑定的queue,所以也叫广播模式。
(1)可以有多个队列
(2)每个队列都要绑定到 Exchange
(3)生产者发送的消息,只能发送到交换机
(4)交换机把消息发送给绑定过的所有队列
(5)订阅队列的消费者都能拿到消息
例子
在RabbitMQ控制台中声明队列fanout.queue1和fanout.queue2,声明交换机test.fanout,将两个队列与该交换机绑定。在consumer服务中,编写两个消费者方法,分别监听两个队列,在publisher中写测试方法,向test.fanout发送消息
- 创建队列和交换机
- 绑定队列到交换机
3.消费者consumer服务中创建监听方法,分别监听两个队列
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
log.info("消费者 1 收到 fanout.queue1 消息 ===>{}", msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
log.info("消费者 2 收到 fanout.queue2 消息 ===>{}", msg);
}
4.生产者服务向test.fanout交换机发送一条消息
@Test
public void testSendFanout() {
String exchangeName = "test.fanout";
String message = "hello,every body!!!";
rabbitTemplate.convertAndSend(exchangeName, null, message);
}
监听的两个队列都收到了test.fanout交换机的同一条消息:
总结
交换机的作用:
- 接受publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- Fanout类型的交换机会将消息路由到每个绑定的队列
3.4.2Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。Direct Exchange会将接收到的消息根据规则路由到指定的Queue,由此称为定向路由
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Bindingkey
与消息的Routing key
完全一致,才会接收到消息
例子
在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2,声明交换机test.direct,将两队列和其绑定。在consumer服务中,编写两个消费方法,分别监听两队列。在publisher中编写测试方法,利用不用的RoutingKey向test.direct交换机发送消息
1.创建队列和交换机
2.交换机绑定队列,指定RoutingKey
3.consumer服务创建两个监听方法,分别监听两个队列
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者 1 收到 direct.queue1 消息 ===>【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.err.println("消费者 2 收到 direct.queue1 消息 ===>【" + msg + "】");
}
4.publisher服务向test.direct交换机发送消息
@Test
public void testSendDirect() {
String exchangeName = "test.direct";
String redMsg = "红色警报!!";
String blueMsg = "蓝色警报!!";
String yellowMsg = "黄色警报!!";
rabbitTemplate.convertAndSend(exchangeName, "red", redMsg);
rabbitTemplate.convertAndSend(exchangeName, "blue", blueMsg);
rabbitTemplate.convertAndSend(exchangeName, "yellow", yellowMsg);
}
两个队列都指定了bindingkey=red,因此都能收到routingkey=red的消息,其他的key则只有各自对应bindingkey的队列才能收到
3.4.3Topic交换机
Topic类型的交换机和Direct交换机相比,相同之处是它们都可以根据 RoutingKey
把消息路由到不同的队列。只不过Topic类型的交换机可以让队列在绑定 Bindingkey
的时候使用通配符。
BindingKey
一般都是由多个或者一个单词组成,多个单词之间以.
分割,例如item.insert
通配符规则:
#
匹配0个或者多个单词*
只匹配一个单词
例如:
item.#
可以匹配item.spu.insert
或者item.spu
item.*
只能匹配item.spu
例子
在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2,声明交换机test.topic,将两队列和其绑定。在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2,在publisher中编写测试方法,利用不同的RoutingKey向test.topic交换机发送消息。
1.创建队列和交换机,并绑定队列到交换机
2.consumer服务创建两个方法,分别监听两队列
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
System.out.println("消费者 1 收到 topic.queue1 消息 ===>【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
System.err.println("消费者 2 收到 topic.queue1 消息 ===>【" + msg + "】");
}
3.publisher服务向test.topict交换机发送消息
@Test
public void testSendTopic() {
String exchangeName = "test.topic";
String msg = "华尔街金融风暴!";
String msg2 = "今天多云转晴";
String msg3 = "生育率再创历史新低!";
rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
rabbitTemplate.convertAndSend(exchangeName, "USA.news", msg2);
rabbitTemplate.convertAndSend(exchangeName, "china.news", msg3);
}
总结
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
.
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
3.5声明队列和交换机
在实际开发中,队列和交换机不可能都在控制台上手动创建,一但业务需要的队列有数十条甚至几百条,手工创建管理的方式极可能出错。
推荐的做法是由程序启动的时候检查队列和交换机是否存在,如果不存在则自动创建。
3.5.1基本API
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
3.5.2例子
一般来说,交换机、队列以及其关系的声明都在consumer服务创建。
1.consumer服务中编写一个config类,声明交换机、队列,并将两个进行绑定
package com.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
//创建一个Fanout类型的交换机,命名为test.fanout
return ExchangeBuilder.fanoutExchange("test.fanout2").build();
// 或者 return new FanoutExchange("test.fanout2");
}
@Bean
public Queue fanoutQueue3() {
//创建一个队列,命名为fanout.queue3
return QueueBuilder.durable("fanout.queue3").build();
//或者 return new Queue("fanout.queue3");
}
@Bean
public Binding fanoutBinding() {
//将fanout.queue3队列绑定到test.fanout交换机中
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
//当代码执行fanoutQueue3()方法时候,spring首先会去查看容器中是否已经存在该Bean,
//如果不存在才会调用方法创建。fanoutExchange()同理
}
//和下面的写法同理
// @Bean
// public Binding fanoutBinding(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
// return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
// }
}
重启consumer服务,可以看到自动创建了指定的队列和交换机,并将其进行了绑定:
3.5.3基于注解声明
@Bean的方式创建比较麻烦,如果要创建的是direct类型的交换机,要绑定多个RoutingKey,只能通过如下的方式一个一个绑定,比较麻烦:
@Bean
public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
//指定RoutingKey
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
@Bean
public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
//指定RoutingKey
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
...
除了这种基于@Bean的方式来创建队列交换机,SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
//direct模式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3", durable = "true"),
exchange = @Exchange(name = "teset.direct2", type = ExchangeTypes.DIRECT),
key = {"red", "blue", "yellow"}
))
public void listenDirectQueue3(String msg) {
System.out.println("消费者3收到了direct.queue3的消息 ===>" + msg);
}
总结
- 声明队列、交换机、绑定关系的Bean:
- Queue
- FanoutExchange,DirectExchange,TopicExchange
- Binding
- 基于@RabbitListener注解声明队列和交换机有哪些常见注解:
- @Queue
- @Exchange
3.6消息转换器
3.6.1测试默认转换器
创建一个队列 queue.object,并往该队列发送Object类型的消息:
@Test
public void testSendObject() {
Map<String, Object> msg = new HashMap<>();
msg.put("name", "jack");
msg.put("age", 21);
rabbitTemplate.convertAndSend("queue.object", msg);
}
可以看到队列中存储的内容是jdk自动序列化后的字节码:
Spring对消息对象的处理是由org.spirngframework.amqp.support.converter.MessageConverter来处理的。而默认实现的是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
存在如下问题:
- JDK的序列化有安全风险
- JDK序列化的消息太大
- JDK序列化的消息可读性差
3.6.2配置JSON转换器
建议采用JSON序列化代替默认的JDK序列化,要做两件事:
(1)在publisher、consumer服务中都引入Jackson依赖
<!--Jackson-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意:如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
(2)配置消息转换器,在publisher
和consumer
两个服务的启动类中各自添加一个Bean即可:
//import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
//import org.springframework.amqp.support.converter.MessageConverter;
@Bean
public MessageConverter jacksonMessageConvertor() {
return new Jackson2JsonMessageConverter();
}
3.6.3消费者接收Object
再次向queue.object队列发送消息,可以看到Object类型的消息成功转成json类型:
在消费者服务中接收,可以指定接收的消息类型:
@RabbitListener(queues = "queue.object")
public void listenQueueObject(Map<String, Object> msg) {
System.out.println("消费者收到了queue.object的消息===>" + msg);
}
评论区