标签搜索

目 录CONTENT

文章目录

『聚合』 RabbitMQ入门

沙漠渔
2024-03-17 02:20:23 / 0 评论 / 0 点赞 / 102 阅读 / 25,985 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2024-03-17,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RabbitMq入门

image-20240218215925296

1.MQ

1.1同步调用

同步调用的优势是时效性强,等到结果后才返回。但也存在很多问题,以支付服务为例,如果扣减余额、更新交易流水、更新交易订单状态三步为同步调用,上一步执行完才能继续下一个业务。会存在以下问题:

  1. 拓展性差。耦合度高,后续如果要在当前的业务基础上增加功能,整个业务会越来越臃肿。
  2. 性能下降。同步调用,调用者需要等待整个支付业务执行完毕才能继续执行。并且随着业务功能的增加,整个支付业务的执行时间会越来越长。
  3. 级联失败。如果基于远程调用,调用用户服务、交易服务.......时,当其中某个服务出现故障时,整个支付服务都会回滚,交易失败。

要解决这些问题,需要使用异步调用的方式来替代同步调用。

1.2异步调用

异步调用方式就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,即原来的调用方
  • 消息Broker:管理、暂存、转发消息
  • 消息接收者:接收和处理消息的人,即原来的服务提供方

异步调用的发送方不用同步调用接收者的业务接口,而是发送一条消息到消息Broker。接收者根据需求从Broker中订阅消息。每当发送发发送消息后,接受者都能获取消息并处理。这样发送方和接收方就能完全解耦。

还是以支付业务为例,使用异步调用后,除了必要的用户服务(扣除余额,更新支付流水),其他的业务都可以通过订阅的方式,自行处理各自的业务逻辑。在这个基础上,如果有新的业务加进来,也只需要让新业务订阅消息即可。

不管后期增加了多少消费者,对于支付业务而言,都是执行扣除余额、更新支付流水、发送消息这三个步骤,业务耗时不变,且大大降低了业务之间的耦合性。另外,即使其它业务执行过程中出现了故障,也不会影响到支付业务。

异步调用的优势:

  • 解除耦合,拓展性强
  • 无需等待,性能好
  • 故障隔离,避免级联失败
  • 缓存消息,流量削峰填谷

异步调用的缺点:

  • 不能立即得到调用结果,时效性差

  • 不确定下游业务执行是否成功

  • 业务安全依赖Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

1.3MQ技术选型

MQ(MessageQueue),中文是消息队列,也就是异步调用中的Broker。

目前比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka
RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka

据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好。

2.RabbitMQ

2.1安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:Messaging that just works — RabbitMQ

这里以3.8版本为例

  1. 将下载的mq.tar包上传到Linux服务器上,使用docker load命令导入mq镜像:
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# ll
-rw-r--r-- 1 root root 243M 2月  19 21:25 mq.tar
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker load -i mq.tar
a70daca533d0: Loading layer  75.16MB/75.16MB
1d76618777b7: Loading layer  2.494MB/2.494MB
3c4641134cb2: Loading layer    122MB/122MB
925d7d7de53e: Loading layer  345.1kB/345.1kB
c7a6248661bc: Loading layer  19.84MB/19.84MB
6ba57a07380e: Loading layer  4.608kB/4.608kB
57de3e20948c: Loading layer  1.536kB/1.536kB
56d9e5402fdd: Loading layer  17.41kB/17.41kB
ce6be100ab60: Loading layer  34.02MB/34.02MB
Loaded image: rabbitmq:3.8-management
  1. 运行镜像
[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker network create testnet
767e702a12a708c64bafb2bb7f9b4feeb693d620a74a9cc5c52c2c869bb0110a

[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker run \
> -e RABBITMQ_DEFAULT_USER=root \			#设置用户名
> -e RABBITMQ_DEFAULT_PASS=123456 \			#设置密码
> -v mq-plugins:/plugins \					#挂载数据卷mq-plugins到/plugins目录下
> --name mq \								#容器名
> --hostname mq \							#主机名
> -p 15672:15672 \							#端口映射
> -p 5672:5672 \							#端口映射
> --network testnet \						#设置网桥
> -d \
> rabbitmq:3.8-management
46f6028a906ba3376d3eb416af5ec954e1872179691d08cd9320b5aab28f22dd

[root@iZ7xvgwzig5m0o0rnf19bkZ ~]# docker ps
CONTAINER ID   IMAGE                     COMMAND                   CREATED         STATUS         PORTS                                                                                                                                                 NAMES
46f6028a906b   rabbitmq:3.8-management   "docker-entrypoint.s…"   3 minutes ago   Up 3 minutes   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   mq

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

3.浏览器访问 http://ip:15672 ,输入用户账号密码即可登录RabbitMQ的管理页面

image-20240219220435572

image.png

RabbitMQ对应的架构图:

image.png

  • publisher:生产者,即发送消息的一方
  • consumer:消费者,接收消息的一方
  • queue:队列,负责存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责路由和转发消息,没有存储消息的能力。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual-host:虚拟主机,起到隔离数据的作用。每个虚拟主机相互独立,有各自不同的交换机、队列

以上这些东西都可以在RabbitMQ的管理控制台操作。

2.2收发消息(交换机、队列)

例子:在RabbitMQ控制台完成收发消息

  • 新建队列hello.queue1和hello.queue2
  • 向默认的amp.fanout交换机发送一条消息
  • 查看消息是否到达两个队列

(1)在控制台Queues页面创建指定的队列hello.queue1和hello.queue2

image-20240219223206319

(2)创建之后如图显示

image-20240219223120269

(3)交换机和队列需要进行绑定,发送到交换机的消息才能理由到对应的队列

点击Exchanges选项卡,点击amq.fanout交换机,进入交换机详情页

image-20240219224016202

(4)然后点击Bindings菜单,在表单中填写要绑定的队列名称:

image-20240219224205633

(5)绑定后显示如下

image-20240219224202601

(6)在绑定的队列中也可以看到绑定交换机信息

image-20240219224419722

(7)在交换机中发送消息测试

image-20240219224640378

(8)消息分别成功到达绑定的两个队列中

image-20240219224824199

(9)进入队列中,可以查看接收的消息

image-20240219225005089

这个时候如果有消费者监听了MQ的hello.queue1hello.queue2队列,自然就能接收到消息了。

2.3数据隔离

2.3.1用户管理

点击Admin选项卡,可以看到RabbitMQ控制台的用户管理页面。在这里可以创建用户、创建虚拟主机等:

image-20240220212805086

  • Name:用户名
  • Tags:权限,administrator代表有超级管理员权限
  • Can access virtual hosts:可以访问的virtual host,这里默认是 "/"
  • Has password:是否有密码

为了节约成本,一般来说通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。为了避免相互干扰,会利用virtual host的隔离特性,将不同的项目隔离,一般会做两件事:

  1. 给每个项目创建独立的运维账号(用户),将管理权限分离
  2. 给每个项目创建不同的virtual host,将每个项目的数据隔离

2.3.2Virtual Host

点击右边的Virtual Hosts选项,可以管理,创建MQ的虚拟主机,或者设置用户对虚拟主机的访问权限。

image-20240220214136999

例子:新建用户,为该用户创建一个虚拟主机,测试不同的虚拟主机之间的数据隔离现象

(1)在Users页面创建新用户test

image-20240220214734935

image-20240220214815343

(2)退出并登录新创建的用户test,在admin的Virtual Host页面创建虚拟主机

image-20240220215414353

创建完后如图:

image-20240220215438624

由于是登录test用户后创建的/test-host,因此回到Users菜单,会发现当前用户已经具备了对这个虚拟主机的访问权限:

image-20240220215920209

(3)点击右上角的Virtual host下拉菜单,选择刚才创建的虚拟主机。然后再次查看queues选项卡,可以发现之间创建的队列已经看不到了:

image-20240220220312820

这就是基于virtual host的隔离效果。

3.SpringAMQP

AMQP:高级消息队列协议(Advanced Message Queuing Protocol),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,不受客户端中间件的不同产品、不同开发语言等的条件限制。

RabbitMQ采用了AMQP协议,因此具备了跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以和RabbitMQ交互,并且RabbitMQ官方也提供了各种不同语言的客户端。

Spring AMQP:Spring AMQP是基于AMQP的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

由于RabbitMQ提供的Java客户端编码相对复杂(https://www.rabbitmq.com/getstarted.html),一般生产环境下会更多结合Spring来使用。而Spring的官方基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP的官方地址

SpringAMQP提供了三个功能:

  1. 自动声明队列、交换机及其绑定关系
  2. 基于注解的监听器模式,异步接收消息
  3. 封装了RabbitTemplate工具,用于发送消息

3.1创建项目

创建一个项目,项目结构如下,包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者
image-20240220224031434

在mq-demo这个父工程中,已经配置好了SpringAMQP的相关的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--Jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
    </dependencies>
</project>

因此,子工程中就可以直接使用SpringAMQP了。

3.2快速入门

例子:利用控制台创建队列simple.queue

  1. 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
  2. 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

正常的情况下都是通过交换机发送消息到队列,这里为了测试方便,直接向队列发送消息,跳过交换机。这种模式一般测试使用,很少在生产中使用。

首先在控制台创建simple.queue队列

image-20240220225220765

3.2.1消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx 		#主机IP地址
    port: 5672					#端口			
    virtual-host: /test-host	#虚拟主机
    username: test				#用户名
    password: 123456			#密码

在publisher服务中编写测试类SpringAMQPTest,利用SpringAMQP提供的RabbitTemplate工具类测试发送消息:

package com.publisher;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        //队列名称
        String queueName = "simple.queue";
        //消息
        String message = "Hello,Spring AMQP!";
        //发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

可以在控制台看到已发送的消息:

image-20240222214830474

3.2.2消息接收

SpringAMQP提供声明式的消息监听,只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法.

(1)在consumer服务的配置文件中同样配置RabbitMQ

(2)编写监听类

package com.consumer.listeners;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MQListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者收到simple.queue消息===>【" + msg + "】");
    }
}

启动项目,日志显示:

image-20240222220321867

3.2.3总结

SpringAMQP收发消息:

  • 引入spring-boot-starter-amqp依赖
  • 配置rabbitmq服务端信息
  • 利用RabbitTemplate发送消息
  • 利用@RabbitListener注解声明要监听的队列,监听消息

3.3WorkQueues模型

Work Queues,任务模型,简单地说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

创建一个队列work.queue,在publisher服务一秒发送50条消息到work.queue,在consumer服务中定义两个消息监听者,都监听work.queue队列。让消费者1每秒可以处理50条消息,消费者每秒处理5条消息。

image-20240222223553901

(1)publisher服务定义测试方法

@Test
public void testWorkQueue() throws InterruptedException {
    //队列名称
    String queueName = "work.queue";
    for (int i = 0; i < 50; i++) {
        //消息
        String message = "Hello, worker, message_" + i;
        //发送消息
        rabbitTemplate.convertAndSend(queueName, message);
        Thread.sleep(20);
    }
}

(2)consumer服务提供两个消费者,分别有不同的消费速度

package com.consumer.listeners;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MQListener {
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到 work.queue消息===>【" + msg + "】");
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2 收到 work.queue消息......【" + msg + "】");
        Thread.sleep(200);
    }
}

测试结果:

消费者2 收到 work.queue消息......【Hello, worker, message_0】
消费者1 收到 work.queue消息===>【Hello, worker, message_1】
消费者1 收到 work.queue消息===>【Hello, worker, message_3】
消费者1 收到 work.queue消息===>【Hello, worker, message_5】
消费者2 收到 work.queue消息......【Hello, worker, message_2】
消费者1 收到 work.queue消息===>【Hello, worker, message_7】
消费者1 收到 work.queue消息===>【Hello, worker, message_9】
消费者1 收到 work.queue消息===>【Hello, worker, message_11】
消费者1 收到 work.queue消息===>【Hello, worker, message_13】
消费者2 收到 work.queue消息......【Hello, worker, message_4】
消费者1 收到 work.queue消息===>【Hello, worker, message_15】
消费者1 收到 work.queue消息===>【Hello, worker, message_17】
消费者1 收到 work.queue消息===>【Hello, worker, message_19】
消费者2 收到 work.queue消息......【Hello, worker, message_6】
消费者1 收到 work.queue消息===>【Hello, worker, message_21】
消费者1 收到 work.queue消息===>【Hello, worker, message_23】
消费者1 收到 work.queue消息===>【Hello, worker, message_25】
消费者2 收到 work.queue消息......【Hello, worker, message_8】
消费者1 收到 work.queue消息===>【Hello, worker, message_27】
消费者1 收到 work.queue消息===>【Hello, worker, message_29】
消费者1 收到 work.queue消息===>【Hello, worker, message_31】
消费者2 收到 work.queue消息......【Hello, worker, message_10】
消费者1 收到 work.queue消息===>【Hello, worker, message_33】
消费者1 收到 work.queue消息===>【Hello, worker, message_35】
消费者1 收到 work.queue消息===>【Hello, worker, message_37】
消费者1 收到 work.queue消息===>【Hello, worker, message_39】
消费者2 收到 work.queue消息......【Hello, worker, message_12】
消费者1 收到 work.queue消息===>【Hello, worker, message_41】
消费者1 收到 work.queue消息===>【Hello, worker, message_43】
消费者1 收到 work.queue消息===>【Hello, worker, message_45】
消费者2 收到 work.queue消息......【Hello, worker, message_14】
消费者1 收到 work.queue消息===>【Hello, worker, message_47】
消费者1 收到 work.queue消息===>【Hello, worker, message_49】
消费者2 收到 work.queue消息......【Hello, worker, message_16】
消费者2 收到 work.queue消息......【Hello, worker, message_18】
消费者2 收到 work.queue消息......【Hello, worker, message_20】
消费者2 收到 work.queue消息......【Hello, worker, message_22】
消费者2 收到 work.queue消息......【Hello, worker, message_24】
消费者2 收到 work.queue消息......【Hello, worker, message_26】
消费者2 收到 work.queue消息......【Hello, worker, message_28】
消费者2 收到 work.queue消息......【Hello, worker, message_30】
消费者2 收到 work.queue消息......【Hello, worker, message_32】
消费者2 收到 work.queue消息......【Hello, worker, message_34】
消费者2 收到 work.queue消息......【Hello, worker, message_36】
消费者2 收到 work.queue消息......【Hello, worker, message_38】
消费者2 收到 work.queue消息......【Hello, worker, message_40】
消费者2 收到 work.queue消息......【Hello, worker, message_42】
消费者2 收到 work.queue消息......【Hello, worker, message_44】
消费者2 收到 work.queue消息......【Hello, worker, message_46】
消费者2 收到 work.queue消息......【Hello, worker, message_48】

可以看到虽然两个消费者的速度不一样,但是仍然每人都消费了25条消息:

  • 消费者1很快处理完25条消息
  • 消费者2却在缓慢处理自己的25条消息

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致一个消费者空闲,另一个消费者忙得飞起。这显然是不合理的。

3.3.1消费者消息推送限制

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能会出现消息堆积。

我们可以通过修改consumer的配置文件,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一消息

再次测试,结果如下:

消费者1 收到 work.queue消息===>【Hello, worker, message_0】
消费者2 收到 work.queue消息......【Hello, worker, message_1】
消费者1 收到 work.queue消息===>【Hello, worker, message_2】
消费者1 收到 work.queue消息===>【Hello, worker, message_3】
消费者1 收到 work.queue消息===>【Hello, worker, message_4】
消费者1 收到 work.queue消息===>【Hello, worker, message_5】
消费者1 收到 work.queue消息===>【Hello, worker, message_6】
消费者1 收到 work.queue消息===>【Hello, worker, message_7】
消费者1 收到 work.queue消息===>【Hello, worker, message_8】
消费者2 收到 work.queue消息......【Hello, worker, message_9】
消费者1 收到 work.queue消息===>【Hello, worker, message_10】
消费者1 收到 work.queue消息===>【Hello, worker, message_11】
消费者1 收到 work.queue消息===>【Hello, worker, message_12】
消费者1 收到 work.queue消息===>【Hello, worker, message_13】
消费者1 收到 work.queue消息===>【Hello, worker, message_14】
消费者1 收到 work.queue消息===>【Hello, worker, message_15】
消费者1 收到 work.queue消息===>【Hello, worker, message_16】
消费者1 收到 work.queue消息===>【Hello, worker, message_17】
消费者2 收到 work.queue消息......【Hello, worker, message_18】
消费者1 收到 work.queue消息===>【Hello, worker, message_19】
消费者1 收到 work.queue消息===>【Hello, worker, message_20】
消费者1 收到 work.queue消息===>【Hello, worker, message_21】
消费者1 收到 work.queue消息===>【Hello, worker, message_22】
消费者1 收到 work.queue消息===>【Hello, worker, message_23】
消费者1 收到 work.queue消息===>【Hello, worker, message_24】
消费者1 收到 work.queue消息===>【Hello, worker, message_25】
消费者1 收到 work.queue消息===>【Hello, worker, message_26】
消费者2 收到 work.queue消息......【Hello, worker, message_27】
消费者1 收到 work.queue消息===>【Hello, worker, message_28】
消费者1 收到 work.queue消息===>【Hello, worker, message_29】
消费者1 收到 work.queue消息===>【Hello, worker, message_30】
消费者1 收到 work.queue消息===>【Hello, worker, message_31】
消费者1 收到 work.queue消息===>【Hello, worker, message_32】
消费者1 收到 work.queue消息===>【Hello, worker, message_33】
消费者2 收到 work.queue消息......【Hello, worker, message_34】
消费者1 收到 work.queue消息===>【Hello, worker, message_35】
消费者1 收到 work.queue消息===>【Hello, worker, message_36】
消费者1 收到 work.queue消息===>【Hello, worker, message_37】
消费者1 收到 work.queue消息===>【Hello, worker, message_38】
消费者1 收到 work.queue消息===>【Hello, worker, message_39】
消费者1 收到 work.queue消息===>【Hello, worker, message_40】
消费者1 收到 work.queue消息===>【Hello, worker, message_41】
消费者2 收到 work.queue消息......【Hello, worker, message_42】
消费者1 收到 work.queue消息===>【Hello, worker, message_43】
消费者1 收到 work.queue消息===>【Hello, worker, message_44】
消费者1 收到 work.queue消息===>【Hello, worker, message_45】
消费者1 收到 work.queue消息===>【Hello, worker, message_46】
消费者1 收到 work.queue消息===>【Hello, worker, message_47】
消费者1 收到 work.queue消息===>【Hello, worker, message_48】
消费者2 收到 work.queue消息......【Hello, worker, message_49】

3.3.2总结

Work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,实现能者多劳

3.4交换机

真正生产环境都会经过exchange(交换机)来发送消息,而不是直接发送到队列,交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

3.4.1Fanout交换机

Fanout,英文翻译是扇出。Fanout Exchange 会将接受到的消息广播到每一个跟其绑定的queue,所以也叫广播模式。

image-20240306220843741

(1)可以有多个队列

(2)每个队列都要绑定到 Exchange

(3)生产者发送的消息,只能发送到交换机

(4)交换机把消息发送给绑定过的所有队列

(5)订阅队列的消费者都能拿到消息

例子

在RabbitMQ控制台中声明队列fanout.queue1和fanout.queue2,声明交换机test.fanout,将两个队列与该交换机绑定。在consumer服务中,编写两个消费者方法,分别监听两个队列,在publisher中写测试方法,向test.fanout发送消息

  1. 创建队列和交换机
image-20240306221811470 image-20240306222035351
  1. 绑定队列到交换机
image-20240306222329689 image-20240306222335709

3.消费者consumer服务中创建监听方法,分别监听两个队列

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    log.info("消费者 1 收到 fanout.queue1 消息 ===>{}", msg);
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    log.info("消费者 2 收到 fanout.queue2 消息 ===>{}", msg);
}

4.生产者服务向test.fanout交换机发送一条消息

@Test
public void testSendFanout() {
    String exchangeName = "test.fanout";
    String message = "hello,every body!!!";
    rabbitTemplate.convertAndSend(exchangeName, null, message);
}

监听的两个队列都收到了test.fanout交换机的同一条消息:

image-20240306223658563

总结

交换机的作用:

  • 接受publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • Fanout类型的交换机会将消息路由到每个绑定的队列

3.4.2Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。Direct Exchange会将接收到的消息根据规则路由到指定的Queue,由此称为定向路由
image.png
在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Bindingkey与消息的 Routing key完全一致,才会接收到消息

例子

在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2,声明交换机test.direct,将两队列和其绑定。在consumer服务中,编写两个消费方法,分别监听两队列。在publisher中编写测试方法,利用不用的RoutingKey向test.direct交换机发送消息

1.创建队列和交换机

image-20240306225441172 image-20240306225509996

2.交换机绑定队列,指定RoutingKey

image-20240306225737077 image-20240306225837416

3.consumer服务创建两个监听方法,分别监听两个队列

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者 1 收到 direct.queue1 消息 ===>【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.err.println("消费者 2 收到 direct.queue1 消息 ===>【" + msg + "】");
}

4.publisher服务向test.direct交换机发送消息

@Test
public void testSendDirect() {
    String exchangeName = "test.direct";
    String redMsg = "红色警报!!";
    String blueMsg = "蓝色警报!!";
    String yellowMsg = "黄色警报!!";
    rabbitTemplate.convertAndSend(exchangeName, "red", redMsg);
    rabbitTemplate.convertAndSend(exchangeName, "blue", blueMsg);
    rabbitTemplate.convertAndSend(exchangeName, "yellow", yellowMsg);
}

两个队列都指定了bindingkey=red,因此都能收到routingkey=red的消息,其他的key则只有各自对应bindingkey的队列才能收到

image-20240306230858944

3.4.3Topic交换机

Topic类型的交换机和Direct交换机相比,相同之处是它们都可以根据 RoutingKey 把消息路由到不同的队列。只不过Topic类型的交换机可以让队列在绑定 Bindingkey 的时候使用通配符

BindingKey 一般都是由多个或者一个单词组成,多个单词之间以.分割,例如item.insert

通配符规则:

  • # 匹配0个或者多个单词
  • * 只匹配一个单词

例如:

  • item.# 可以匹配 item.spu.insert 或者 item.spu
  • item.* 只能匹配 item.spu

image.png

例子

在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2,声明交换机test.topic,将两队列和其绑定。在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2,在publisher中编写测试方法,利用不同的RoutingKey向test.topic交换机发送消息。

1.创建队列和交换机,并绑定队列到交换机

image-20240307215257965 image-20240307215329435 image-20240307215551078

2.consumer服务创建两个方法,分别监听两队列

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
    System.out.println("消费者 1 收到 topic.queue1 消息 ===>【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
    System.err.println("消费者 2 收到 topic.queue1 消息 ===>【" + msg + "】");
}

3.publisher服务向test.topict交换机发送消息

@Test
public void testSendTopic() {
    String exchangeName = "test.topic";
    String msg = "华尔街金融风暴!";
    String msg2 = "今天多云转晴";
    String msg3 = "生育率再创历史新低!";
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
    rabbitTemplate.convertAndSend(exchangeName, "USA.news", msg2);
    rabbitTemplate.convertAndSend(exchangeName, "china.news", msg3);
}
image-20240307221143940

总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

3.5声明队列和交换机

在实际开发中,队列和交换机不可能都在控制台上手动创建,一但业务需要的队列有数十条甚至几百条,手工创建管理的方式极可能出错。

推荐的做法是由程序启动的时候检查队列和交换机是否存在,如果不存在则自动创建。

3.5.1基本API

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

image-20240307222728767

3.5.2例子

一般来说,交换机、队列以及其关系的声明都在consumer服务创建。

1.consumer服务中编写一个config类,声明交换机、队列,并将两个进行绑定

package com.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {

    @Bean
    public FanoutExchange fanoutExchange() {
        //创建一个Fanout类型的交换机,命名为test.fanout
        return ExchangeBuilder.fanoutExchange("test.fanout2").build();
        // 或者 return new FanoutExchange("test.fanout2");
    }

    @Bean
    public Queue fanoutQueue3() {
        //创建一个队列,命名为fanout.queue3
        return QueueBuilder.durable("fanout.queue3").build();
        //或者 return new Queue("fanout.queue3");
    }

    @Bean
    public Binding fanoutBinding() {
        //将fanout.queue3队列绑定到test.fanout交换机中
        return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
        //当代码执行fanoutQueue3()方法时候,spring首先会去查看容器中是否已经存在该Bean,
        //如果不存在才会调用方法创建。fanoutExchange()同理
    }

//和下面的写法同理
//   @Bean
//   public Binding fanoutBinding(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
//      return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
//   }
}

重启consumer服务,可以看到自动创建了指定的队列和交换机,并将其进行了绑定:

image-20240307230012804

3.5.3基于注解声明

@Bean的方式创建比较麻烦,如果要创建的是direct类型的交换机,要绑定多个RoutingKey,只能通过如下的方式一个一个绑定,比较麻烦:

@Bean
public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
    //指定RoutingKey
    return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

@Bean
public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
    //指定RoutingKey
    return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
...

除了这种基于@Bean的方式来创建队列交换机,SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

//direct模式
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue3", durable = "true"),
        exchange = @Exchange(name = "teset.direct2", type = ExchangeTypes.DIRECT),
        key = {"red", "blue", "yellow"}
))
public void listenDirectQueue3(String msg) {
    System.out.println("消费者3收到了direct.queue3的消息 ===>" + msg);
}
image-20240312221614112

总结

  1. 声明队列、交换机、绑定关系的Bean:
    • Queue
    • FanoutExchange,DirectExchange,TopicExchange
    • Binding
  2. 基于@RabbitListener注解声明队列和交换机有哪些常见注解:
    • @Queue
    • @Exchange

3.6消息转换器

3.6.1测试默认转换器

创建一个队列 queue.object,并往该队列发送Object类型的消息:

@Test
public void testSendObject() {
    Map<String, Object> msg = new HashMap<>();
    msg.put("name", "jack");
    msg.put("age", 21);
    rabbitTemplate.convertAndSend("queue.object", msg);
}

可以看到队列中存储的内容是jdk自动序列化后的字节码:

image-20240312223248854

Spring对消息对象的处理是由org.spirngframework.amqp.support.converter.MessageConverter来处理的。而默认实现的是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

存在如下问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

3.6.2配置JSON转换器

建议采用JSON序列化代替默认的JDK序列化,要做两件事:

(1)在publisher、consumer服务中都引入Jackson依赖

<!--Jackson-->
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

(2)配置消息转换器,在publisherconsumer两个服务的启动类中各自添加一个Bean即可:

//import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
//import org.springframework.amqp.support.converter.MessageConverter;

@Bean
public MessageConverter jacksonMessageConvertor() {
    return new Jackson2JsonMessageConverter();
}

3.6.3消费者接收Object

再次向queue.object队列发送消息,可以看到Object类型的消息成功转成json类型:

image-20240312231305704

在消费者服务中接收,可以指定接收的消息类型:

@RabbitListener(queues = "queue.object")
public void listenQueueObject(Map<String, Object> msg) {
    System.out.println("消费者收到了queue.object的消息===>" + msg);
}
image-20240312231830414

⚠ 文章源地址: https://www.cnblogs.com/liyuelian/p/18074171.html 转载请注明出处
0
广告 广告

评论区