初识RabbitMQ-概念、安装和使用

发布于:2024-04-26 ⋅ 阅读:(25) ⋅ 点赞:(0)

 摘要:整理列下RabbitMQ相关的内容,包括基本概念、安装(Linux和MacOS)和使用,记录下...

        消息队列看作是一个按照顺序存放消息的容器,当使用消息的时候,直接从容器中按照顺序取出消息使用即可。对系统而言,消息队列的作用主要体现为如下三点

  1. 通过异步处理提高系统性能(减少响应所需时间)

    用户请求数据存入消息队列之后,立即返回结果,异步的对消息进行消费。需要注意的是后续的处理可能会失败。

  2. 削峰/限流

    将短时间高并发产生的事务消息存储在消息队列中,避免对系统产生的冲击

  3. 降低系统耦合性。

    消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息,生产者和消费者之间不存在直接耦合。

消息队列基础知识总结 | JavaGuide「Java学习 + 面试指南」一份涵盖大部分 Java 程序员所需要掌握的核心知识。准备 Java 面试,首选 JavaGuide!icon-default.png?t=N7T8https://javaguide.cn/high-performance/message-queue/message-queue.html

1. RabbitMQ初识

        RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

        RabbitMQ具有特点如下:可靠性、灵活的路由、扩展性、高可用性、支持多种协议、多语言客户端、易用的管理界面和插件机制

具体介绍可参阅官网介绍:

RabbitMQ: One broker to queue them all | RabbitMQicon-default.png?t=N7T8https://www.rabbitmq.com/

                                                 RabbitMQ 的整体模型架构如下

        RabbitMQ的核心概念

  • Producer: 消息生产者,就是投递消息的一方;
  • Message:消息一般分成消息体(playLoad)和消息头(Label)两部分;
  • Exchange:交换机用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者),或许会被直接丢弃掉。RabbitMQ交换机有4中类型对应不同的路由策略direct(默认),fanout, topic, 和 headers,后面介绍。
  • Binding:绑定交换机和队列,在绑定的时候一般会指定一个 BindingKey(绑定建)。注意BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,fanout类型就会无视BindingKey。
  • Routing key: 路由键,决定路由规则
  • Queue:队列,存储消息直到发送给消费者。RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。注意多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理。
  • Connection:连接服务端
  • Channel:信道,读写数据
  • Consumer:消费者,也就是接收消息的一方;其连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
  • Broker:服务实例/节点,大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
  • Virtual host:虚拟主机,用于区分不同的服务,类似于不同域名,不会相互影响

2. RabbitMQ安装

2.1 Linux下安装RabbitMQ

        建议在一个新建的阿里云的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败。实际使用CentOS 7.9 依然OK

  • 设置编码为UTF-8
## 命令解析
## echo命令用于输出文本到标准输出。
## "export LC_ALL=en_US.UTF-8" 是要输出的文本,这里设置了LC_ALL环境变量的值为en_US.UTF-8,表示使用UTF-8编码作为默认的语言环境。
## >> /etc/profile将输出内容追加到/etc/profile文件的末尾。在这里,/etc/profile是一个系统级的shell配置文件,它在用户登录时被执行。
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# echo "export LC_ALL=en_US.UTF-8"  >>  /etc/profile
## 命令解析
## source命令用于在当前shell环境中**执行指定的脚本文件**,这里是执行/etc/profile文件。
## 执行source /etc/profile可以立即使新设置的环境变量生效,而不需要重新登录系统。
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# source /etc/profile
  • 下载安装 RabbitMQ 的脚本(script.rpm.sh)并通过bash执行
## 命令解析
## curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh: curl命令是一个用于在命令行下传输数据的工具。-s 参数表示静默模式,不输出进度或错误信息。https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh是一个URL,指向了一个脚本文件。
## |: 管道符号将前一个命令的输出作为后一个命令的输入。
## sudo bash: 使用了sudo命令以超级用户的权限执行后面的命令。bash 是一个Unix shell,也是一个命令行解释器,它用于执行脚本
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
Detected operating system as centos/7.
Checking for curl...
Detected curl...
Downloading repository file: https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/config_file.repo?os=centos&dist=7&source=script
done.
Installing pygpgme to verify GPG signatures...
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.aliyun.com
 * extras: mirrors.aliyun.com
 * updates: mirrors.aliyun.com
base                                                                                                                                                                         | 3.6 kB  00:00:00     
docker-ce-stable                                                                                                                                                             | 3.5 kB  00:00:00     
epel                                                                                                                                                                         | 4.7 kB  00:00:00     
extras                                                                                                                                                                       | 2.9 kB  00:00:00     
mysql-connectors-community                                                                                                                                                   | 2.6 kB  00:00:00     
mysql-tools-community                                                                                                                                                        | 2.6 kB  00:00:00     
mysql57-community                                                                                                                                                            | 2.6 kB  00:00:00     
nginx-stable                                                                                                                                                                 | 2.9 kB  00:00:00     
rabbitmq_rabbitmq-server-source/signature                                                                                                                                    |  836 B  00:00:00     
Retrieving key from https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
Importing GPG key 0x4D206F89:
 Userid     : "https://packagecloud.io/rabbitmq/rabbitmq-server (https://packagecloud.io/docs#gpg_signing) <support@packagecloud.io>"
 Fingerprint: 8c69 5b02 19af deb0 4a05 8ed8 f4e7 8920 4d20 6f89
 From       : https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
rabbitmq_rabbitmq-server-source/signature                                                                                                                                    | 1.0 kB  00:00:00 !!! 
updates                                                                                                                                                                      | 2.9 kB  00:00:00     
(1/3): epel/x86_64/updateinfo                                                                                                                                                | 1.0 MB  00:00:00     
(2/3): epel/x86_64/primary_db                                                                                                                                                | 7.0 MB  00:00:00     
(3/3): docker-ce-stable/7/x86_64/primary_db                                                                                                                                  | 142 kB  00:00:00     
rabbitmq_rabbitmq-server-source/primary                                                                                                                                      |  175 B  00:00:02     
Package pygpgme-0.3-9.el7.x86_64 already installed and latest version
Nothing to do
Installing yum-utils...
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.aliyun.com
 * extras: mirrors.aliyun.com
 * updates: mirrors.aliyun.com
Package yum-utils-1.1.31-54.el7_8.noarch already installed and latest version
Nothing to do
Generating yum cache for rabbitmq_rabbitmq-server...
Importing GPG key 0x4D206F89:
 Userid     : "https://packagecloud.io/rabbitmq/rabbitmq-server (https://packagecloud.io/docs#gpg_signing) <support@packagecloud.io>"
 Fingerprint: 8c69 5b02 19af deb0 4a05 8ed8 f4e7 8920 4d20 6f89
 From       : https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
Generating yum cache for rabbitmq_rabbitmq-server-source...

The repository is setup! You can now install packages.
## 命令解析
## curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh: 这部分使用了curl命令,同样是用于从指定的URL下载数据。-s参数表示静默模式,不输出进度或错误信息。https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh是一个URL,指向了一个脚本文件。
## |: 管道符号,它的作用是将前一个命令的输出作为后一个命令的输入。
## sudo bash: sudo命令以超级用户的权限执行后面的命令。bash则是用于执行脚本的命令行解释器。
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
Detected operating system as centos/7.
Checking for curl...
Detected curl...
Downloading repository file: https://packagecloud.io/install/repositories/rabbitmq/erlang/config_file.repo?os=centos&dist=7&source=script
done.
Installing pygpgme to verify GPG signatures...
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.aliyun.com
 * extras: mirrors.aliyun.com
 * updates: mirrors.aliyun.com
rabbitmq_erlang-source/signature                                                                                                                                             |  819 B  00:00:00     
Retrieving key from https://packagecloud.io/rabbitmq/erlang/gpgkey
Importing GPG key 0xDF309A0B:
 Userid     : "https://packagecloud.io/rabbitmq/erlang (https://packagecloud.io/docs#gpg_signing) <support@packagecloud.io>"
 Fingerprint: 2ebd e413 d3ce 5d35 bcd1 5b7c 71c6 3471 df30 9a0b
 From       : https://packagecloud.io/rabbitmq/erlang/gpgkey
rabbitmq_erlang-source/signature                                                                                                                                             |  951 B  00:00:00 !!! 
rabbitmq_rabbitmq-server/x86_64/signature                                                                                                                                    |  833 B  00:00:00     
rabbitmq_rabbitmq-server/x86_64/signature                                                                                                                                    | 1.8 kB  00:00:00 !!! 
rabbitmq_erlang-source/primary                                                                                                                                               |  175 B  00:00:02     
Package pygpgme-0.3-9.el7.x86_64 already installed and latest version
Nothing to do
Installing yum-utils...
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.aliyun.com
 * extras: mirrors.aliyun.com
 * updates: mirrors.aliyun.com
rabbitmq_rabbitmq-server-source/signature                                                                                                                                    |  836 B  00:00:00     
rabbitmq_rabbitmq-server-source/signature                                                                                                                                    | 1.0 kB  00:00:00 !!! 
Package yum-utils-1.1.31-54.el7_8.noarch already installed and latest version
Nothing to do
Generating yum cache for rabbitmq_erlang...
Importing GPG key 0xDF309A0B:
 Userid     : "https://packagecloud.io/rabbitmq/erlang (https://packagecloud.io/docs#gpg_signing) <support@packagecloud.io>"
 Fingerprint: 2ebd e413 d3ce 5d35 bcd1 5b7c 71c6 3471 df30 9a0b
 From       : https://packagecloud.io/rabbitmq/erlang/gpgkey
Generating yum cache for rabbitmq_erlang-source...

The repository is setup! You can now install packages.
  • 安装指定版本的 RabbitMQ 软件包
## 命令解析
## rabbitmq-server-3.8.2-1.el7.noarch: 这是要安装的软件包的名称和版本。在这里,rabbitmq-server是软件包的名称,3.8.2-1.el7.noarch是软件包的版本。3.8.2是RabbitMQ的版本号,1.el7表示适用于CentOS 7版本的软件包,noarch表示这个软件包是与特定架构无关的,可以在任何架构的系统上运行
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.aliyun.com
 * extras: mirrors.aliyun.com
 * updates: mirrors.aliyun.com
Resolving Dependencies
--> Running transaction check
---> Package rabbitmq-server.noarch 0:3.8.2-1.el7 will be installed
--> Processing Dependency: erlang >= 21.3 for package: rabbitmq-server-3.8.2-1.el7.noarch
--> Running transaction check
---> Package erlang.x86_64 0:23.3.4.11-1.el7 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

====================================================================================================================================================================================================
 Package                                        Arch                                  Version                                         Repository                                               Size
====================================================================================================================================================================================================
Installing:
 rabbitmq-server                                noarch                                3.8.2-1.el7                                     rabbitmq_rabbitmq-server                                 12 M
Installing for dependencies:
 erlang                                         x86_64                                23.3.4.11-1.el7                                 rabbitmq_erlang                                          19 M

Transaction Summary
====================================================================================================================================================================================================
Install  1 Package (+1 Dependent package)

Total download size: 31 M
Installed size: 47 M
**Is this ok [y/d/N]: y**
Downloading packages:
(1/2): erlang-23.3.4.11-1.el7.x86_64.rpm                                                                                                                                     |  19 MB  00:00:03     
(2/2): rabbitmq-server-3.8.2-1.el7.noarch.rpm                                                                                                                                |  12 MB  00:00:04     
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Total                                                                                                                                                               7.6 MB/s |  31 MB  00:00:04     
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : erlang-23.3.4.11-1.el7.x86_64                                                                                                                                                    1/2 
  Installing : rabbitmq-server-3.8.2-1.el7.noarch                                                                                                                                               2/2 
  Verifying  : erlang-23.3.4.11-1.el7.x86_64                                                                                                                                                    1/2 
  Verifying  : rabbitmq-server-3.8.2-1.el7.noarch                                                                                                                                               2/2 

Installed:
  rabbitmq-server.noarch 0:3.8.2-1.el7                                                                                                                                                              

Dependency Installed:
  erlang.x86_64 0:23.3.4.11-1.el7                                                                                                                                                                   

Complete!
  • 安装完成后启动
## 启动RabbitMQ服务
## systemctl: 是一个系统服务管理工具,用于管理系统的服务。它可以启动、停止、重启和管理系统服务的状态;
## start: systemctl命令的一个子命令,用于启动指定的系统服务;
## rabbitmq-server: 这是要启动的服务的名称,即RabbitMQ。
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# systemctl start rabbitmq-server
## 检查RabbitMQ的状态
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl status
Status of node rabbit@iZuf6eqvrwmawrgxqo5kw7Z ...
Runtime

OS PID: 14601
OS: Linux
Uptime (seconds): 24
RabbitMQ version: 3.8.2
Node name: rabbit@iZuf6eqvrwmawrgxqo5kw7Z
Erlang configuration: Erlang/OTP 23 [erts-11.2.2.10] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:64] [hipe]
Erlang processes: 264 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:


Data directory

Node data directory: /var/lib/rabbitmq/mnesia/rabbit@iZuf6eqvrwmawrgxqo5kw7Z

Config files
2.2 MacOS下安装RabbitMQ
  • 首先,使用更新 brew update命令Homebrew
## brew update 是Homebrew(一个 macOS 上的包管理器)中用于更新本地Homebrew软件包索引的命令
## 检查远程仓库以查看是否有可用的更新版本或者新的软件包,并将本地的软件包索引与远程仓库同步**。
chenzh12@chenzh12deiMac ~ % brew update
==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/bottles-portable-ruby/portable-ruby-3.1.4.arm64_big_sur.bottle.tar.gz
####################################################################################################################################################### 100.0%
==> Pouring portable-ruby-3.1.4.arm64_big_sur.bottle.tar.gz
==> Homebrew collects anonymous analytics.
Read the analytics documentation (and how to opt-out) here:
  https://docs.brew.sh/Analytics
No analytics have been recorded yet (nor will be during this `brew` run).

==> homebrew/core is old and unneeded, untapping to save space...
Untapping homebrew/core...
Untapped 2 commands and 7039 formulae (7,050 files, 931.5MB).
==> homebrew/cask is old and unneeded, untapping to save space...
Untapping homebrew/cask...
Untapped 4432 casks (4,214 files, 511.3MB).
==> Downloading https://formulae.brew.sh/api/formula_tap_migrations.jws.json
####################################################################################################################################################### 100.0%
Updated 3 taps (homebrew/services, homebrew/core and homebrew/cask).
==> New Formulae
c-blosc2              ingress2gateway       libscfg               msieve                policy_sentry         redict                valkey
dissent               jnv                   logdy                 navidrome             protoc-gen-js         rtabmap               vfox
ffmpeg@6              jtbl                  manim                 overarch              rage                  rustcat               whisperkit-cli
gitu                  liblc3                mantra                parsedmarc            ratchet               tartufo               yo
==> New Casks
arctic                    ente-auth                 hhkb-studio               loungy                    starnet2                  yandex-music
boltai                    fujifilm-x-raw-studio     irpf2024                  outfox                    toneprint
capcut                    godspeed                  juxtacode                 phoenix-code              viable
clearvpn                  halloy                    lookaway                  requestly                 xcodepilot
==> Outdated Formulae
ca-certificates     gettext             icu4c               libtiff             libxdmcp            maven               pcre2               xz
cairo               glib                jpeg-turbo          libx11              libxext             openjdk             redis               zstd
fontconfig          harfbuzz            libpng              libxcb              little-cms2         openssl@1.1         xorgproto

You have 23 outdated formulae installed.
You can upgrade them with brew upgrade
or list them with brew outdated.
  • 安装Erlang语言环境: brew install erlang

        MacOS安装erlang语言环境可能会出现多次失败,需要网络状况良好,此外还需要运行xcode-select --install,提前安装Command Line Tools

chenzh12@chenzh12deiMac ~ % brew install erlang
Warning: You are using macOS 11.
We (and Apple) do not provide support for this old version.
It is expected behaviour that some formulae will fail to build in this old version.
It is expected behaviour that Homebrew will be buggy and slow.
Do not create any issues about this on Homebrew's GitHub repositories.
Do not create any issues even if you think this message is unrelated.
Any opened issues will be immediately closed without response.
Do not ask for help from Homebrew or its maintainers on social media.
You may ask for help in Homebrew's discussions but are unlikely to receive a response.
Try to figure out the problem yourself and submit a fix as a pull request.
We will review it but may or may not accept it.

==> Fetching dependencies for erlang: ca-certificates, openssl@3, m4, libtool, unixodbc, cmake, jpeg-turbo, libpng, xz, zstd, libtiff, pcre2, pkg-config and wxwidgets
==> Fetching ca-certificates
==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/ca-certificates-2024-03-11.all.bottle.tar.gz
Already downloaded: /Users/chenzh12/Library/Caches/Homebrew/downloads/17b42fcbbed5f1d5a5f959ad26b218ab108748f29913580d4bbe585be13af894--ca-certificates-2024-03-11.all.bottle.tar.gz
==> Fetching openssl@3
==> Downloading https://raw.githubusercontent.com/Homebrew/homebrew-core/47f48c875570ca368d8e1ad0e2c7035403e8db90/Formula/o/openssl@3.rb
####################################################################################################################################################### 100.0%
==> Downloading https://github.com/openssl/openssl/commit/e9d7083e241670332e0443da0f0d4ffb52829f08.patch?full_index=1
####################################################################################################################################################### 100.0%
==> Downloading https://github.com/openssl/openssl/releases/download/openssl-3.2.1/openssl-3.2.1.tar.gz
==> Downloading from https://objects.githubusercontent.com/github-production-release-asset-2e65be/7634677/43abcd22-856f-405a-9126-4942d3d35f1f?X-Amz-Algorithm
####################################################################################################################################################### 100.0%
==> Fetching m4
==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/m4-1.4.19.arm64_big_sur.bottle.tar.gz
####################################################################################################################################################### 100.0%
==> Fetching libtool
==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/libtool-2.4.7.arm64_big_sur.bottle.1.tar.gz
####################################################################################################################################################### 100.0%
==> Fetching unixodbc
==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/unixodbc-2.3.12.arm64_big_sur.bottle.tar.gz
####################################################################################################################################################### 100.0%
==> Fetching cmake
==> Downloading https://raw.githubusercontent.com/Homebrew/homebrew-core/47f48c875570ca368d8e1ad0e2c7035403e8db90/Formula/c/cmake.rb
####################################################################################################################################################### 100.0%
==> Downloading https://github.com/Kitware/CMake/releases/download/v3.29.2/cmake-3.29.2.tar.gz
  • 安装RabbitMQ Server:brew install rabbitmq
chenzh12@chenzh12deiMac ~ % brew install rabbitmq
==> Downloading https://formulae.brew.sh/api/cask.jws.json
#=O#-     #       #                                                                                                                                                                                                                                
Warning: You are using macOS 11.
We (and Apple) do not provide support for this old version.
It is expected behaviour that some formulae will fail to build in this old version.
It is expected behaviour that Homebrew will be buggy and slow.
Do not create any issues about this on Homebrew's GitHub repositories.
Do not create any issues even if you think this message is unrelated.
Any opened issues will be immediately closed without response.
Do not ask for help from Homebrew or its maintainers on social media.
You may ask for help in Homebrew's discussions but are unlikely to receive a response.
Try to figure out the problem yourself and submit a fix as a pull request.
We will review it but may or may not accept it.

==> Fetching rabbitmq
==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/rabbitmq-3.13.1.all.bottle.tar.gz
############################################################################################################################################################################################################################################# 100.0%
==> Pouring rabbitmq-3.13.1.all.bottle.tar.gz
==> Caveats
Management UI: http://localhost:15672
Homebrew-specific docs: https://rabbitmq.com/install-homebrew.html

To start rabbitmq now and restart at login:
  brew services start rabbitmq
Or, if you don't want/need a background service you can just run:
  CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
==> Summary
🍺  /opt/homebrew/Cellar/rabbitmq/3.13.1: 1,523 files, 35.9MB
==> Running `brew cleanup rabbitmq`...
Disable this behaviour by setting HOMEBREW_NO_INSTALL_CLEANUP.
Hide these hints with HOMEBREW_NO_ENV_HINTS (see `man brew`).
  • 默认并不会将RabbitMQ加到环境变量中,所以要进行小幅的配置
## 进入当前用户的home目录输入 cd ~
## 编辑.bash_profile文件 vim .bash_profile
## 添加 export PATH=$PATH:/usr/local/opt/rabbitmq/sbin 保存并关闭文件
## 更新刚配置的环境变量,输入source .bash_profile
## 启动RabbitMQ的服务端,输入rabbitmq-server
chenzh12@chenzh12deiMac ~ % cd ~
chenzh12@chenzh12deiMac ~ % vim .bash_profile
chenzh12@chenzh12deiMac ~ % source .bash_profile
chenzh12@chenzh12deiMac ~ % rabbitmq-server
2024-04-17 11:13:58.161801+08:00 [notice] <0.44.0> Application syslog exited with reason: stopped
2024-04-17 11:13:58.164286+08:00 [notice] <0.247.0> Logging: switching to configured handler(s); following messages may not be visible in this log output

  ##  ##      RabbitMQ 3.13.1
  ##  ##
  ##########  Copyright (c) 2007-2024 Broadcom Inc and/or its subsidiaries
  ######  ##
  ##########  Licensed under the MPL 2.0. Website: https://rabbitmq.com

  Erlang:      26.2.4 [jit]
  TLS Library: OpenSSL - OpenSSL 3.2.1 30 Jan 2024
  Release series support status: supported

  Doc guides:  https://www.rabbitmq.com/docs/documentation
  Support:     https://www.rabbitmq.com/docs/contact
  Tutorials:   https://www.rabbitmq.com/tutorials
  Monitoring:  https://www.rabbitmq.com/docs/monitoring

  Logs: /opt/homebrew/var/log/rabbitmq/rabbit@localhost.log
        <stdout>

  Config file(s): (none)

  Starting broker... completed with 7 plugins.

  • 关闭rabbitmq: rabbitmqctl stop
chenzh12@chenzh12deiMac ~ % rabbitmqctl stop
Stopping and halting node rabbit@localhost ...
2.3 Windows安装

不推荐,因为要求系统用户名和计算机名必须是英文,而Win10改名比较麻烦,而且可能会有其他坑,而且和未来的实际工作场景严重不符,没有Windows作为服务器的。

3. RabbitMQ应用

3.1 RabbitMQ的管理后台

        开启Web管理后台:rabbitmq-plugins enable rabbitmq_management

[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@iZuf6eqvrwmawrgxqo5kw7Z:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@iZuf6eqvrwmawrgxqo5kw7Z...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.

        添加admin用户

[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl add_user admin password
Adding user "admin" ...
[root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

注:默认用户只有guest,远端无法访问

添加admin用户后,需要在ECS安全组中打开端口15672,然后在登录页面(IP:15672)使用admin的密码进行登录

        Admin→Virtual Hosts→Set permission在虚拟主机需要给新创建的admin添加权限

        使用Java创建生产者和消费者连接RabbitMQ服务,生产者Send向名为'hello'的消息队列发送消息,消费者Recv从指定的队列接收消息并进行处理。

/**
 * 描述: 生产者连接到RabbitMQ服务端,然后发送一条消息,然后退出。
 */
public class Send {
    private final static String QUEUE_NAME = "hello";   // 队列名

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂,用于配置RabbitMQ连接。
        ConnectionFactory factory = new ConnectionFactory();
        // 配置RabbitMQ连接,服务端地址用于连接
        factory.setHost("IP");  //运行RabbitMQ的阿里云实例ip
        factory.setUsername("admin");  // 1.需要给ECS实例开端口5672;2.默认创建的guest用户权限虽高,但不支持远端登录,此处使用自己创建的admin;3.Virtual Hosts添加admin
        factory.setPassword("password");
        // 建立连接
        Connection connection = factory.newConnection();
        // 获得信道
        Channel channel = connection.createChannel();
        // 声明队列,参数解释:队列名称、是否持久化、是否独占、是否自动删除、其他参数(如消息 TTL、最大长度等)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发布消息
        String message = "Hello World!7";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); // 参数解释:交换机名称(默认交换机为空字符串)、队列名称、消息属性、消息内容(字节数组)
        System.out.println("发送消息:" + message);
        // 关闭连接
        channel.close();
        connection.close();
    }
}

/**
 * 描述:消费者接收消息,并打印,持续运行
 */
public class Recv {
    private final static String QUEUE_NAME = "hello"; // 队列名

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂,用于配置 RabbitMQ 连接的参数
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("IP");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //接收消息并消费,开始接收指定队列中的消息并消费。参数包括队列名称、是否自动确认消息、消费者对象
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){  // 使用了匿名内部类DefaultConsumer,并重写了其handleDelivery方法,用于处理接收到的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        });
    }
}

        当生产者发送消息并被消费这消费之后,可以在RabbitMQ管理后台中直观看到队列中的消息变化:

3.2 多个消费者

        当存在多个消费者时,RabbitMQ如何分配队列中的的消息?本节介绍循环调度和公平派遣两种策略。

注:IDEA设置多实例并行运行,Run/Debug Configurations→Modify options→Allow mutiple instances开启

RabbitMQ循环调度

        循环调度是指当多个消费者同时订阅同一个队列,并且队列中有多条消息时,RabbitMQ 将以循环的方式将消息分发给各个消费者,即逐条将消息轮流发送给不同的消费者。这种行为是 RabbitMQ 的默认行为,称为"轮询"分发(Round-Robin),用于在多个消费者之间均匀地分配消息负载。

        在循环调度中,消息将平均分布到每个消费者上,每个消费者一次处理一个消息,直到队列中的所有消息都被处理完毕

RabbitMQ公平派遣(Fair Dispatch)

        公平派遣(Fair Dispatch)是一种消息分发策略,旨在确保每个消费者都能平等地获得工作,而不会出现某些消费者繁忙而其他消费者闲置的情况。在工作队列模式中,通常使用公平派遣来提高系统的整体性能和公平性。公平派遣的关键在于使用消息预取(Prefetch Count)机制,需要注意公平派遣需要增加消息确认机制。消费者可以使用channel.basicQos(number)设置每次从队列中取出的消息数,并使用方法channel.basicAck()手动确认消息被消费,这样就不会出现有的消费者繁忙有的限制的情况。

/**
 * 描述:消费者,接收前面的批量消息
 */
public class Worker {

    private final static String TASK_QUEUE_NAME = "task_queue";  // 存储队列的名称

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("localhost");  // 启用本地RabbitMQ客户端,使用默认guest账户登录
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("开始接收消息");
        **channel.basicQos(1);**  // 设置每次从队列中获取的消息数量为1,这样可以确保每个消费者在处理完一条消息之前不会接收到新的消息,这也称为"公平派遣"。
        // 从队列中消费消息。参数解释:队列名称、是否自动确认消息、消费者对象(这里使用了 DefaultConsumer,重写了其 handleDelivery 方法来处理消息)
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException { // 方法参数解释:消费者标签、信封(包含消息元数据,如交换机、路由键等信息)、消息属性、消息内容
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息:" + message);
                try {
                    doWork(message);
                }finally {
                    System.out.println("消息处理完成");
                    **channel.basicAck(envelope.getDeliveryTag(), false);  // 手动确认消息已经被消费,参数解释:消息标识符、是否批量确认**
                }
            }
        });
    }

    private static void doWork(String task) {  //根据字符(消息内容)处理任务,含有'.'则等待一秒
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

4. 交换机的工作模式

        RabbitMQ 常用的 Exchange Type 有 fanoutdirecttopicheaders 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与 自定义,这里不予以描述)

        fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的;fanout 类型的交换机会把所有发送到该交换机的消息路由到所有与它绑定的队列中,不需要做任何判断操作,所以也是速度最快的。

/**
 * 描述:发送日志信息
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";  // 交换机名称
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****FANOUT**
        **channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);  **
        String message = "info:Hello World4";  // 发送内容
        // **发布消息到指定的交换机。参数依次为交换机名称、路由键(对于fanout类型的交换机,路由键为空)、消息属性、消息内容的字节数组。字符串消息内容转换为UTF-8编码的字节数组进行发送
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));**
        System.out.println("发送了消息:" + message);
        channel.close();
        connection.close();
    }
}
/**
 * 描述:接收日志消息
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****FANOUT**
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); 
        String queueName = channel.queueDeclare().getQueue();  // 声明了临时队列,并获取到队列的名称。临时队列是在没有指定队列名称的情况下创建的,每次连接时都会生成一个不同的队列名。
        channel.queueBind(queueName, EXCHANGE_NAME, "");  // 完成交换机和队列的绑定**
        System.out.println("开始接收消息");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer); // 消费指定队列中的消息。参数依次为队列名称、是否自动确认消息、消费者对象
    }
}

        direct:根据RoutingKey匹配消息路由到指定队列;把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。例如,channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"))中的路由键"Info"与交换机绑定队列的channel.queueBind(queueName, EXCHANGE_NAME, "info")**方法的绑定键"Info"完全匹配。

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";  // 交换机名称
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // **声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****DIRECT**
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 发布消息到指定的交换机。参数依次为交换机名称、路由键(对于fanout类型的交换机,路由键为空)、消息属性、消息内容的字节数组。字符串消息内容转换为UTF-8编码的字节数组进行发送
        String message = "info:Hello World!";

        **channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));**
        System.out.println("发送了消息," + "等级为info,消息内容:" + message);

        message = "warning:Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
        System.out.println("发送了消息," + "等级为warning,消息内容:" + message);

        message = "error:Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
        System.out.println("发送了消息," + "等级为error,消息内容:" + message);
        channel.close();
        connection.close();
    }
}
/**
 * 描述:接收info,warning,error3个等级的日志
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        //一个交换机同时绑定3个queue
        channel.queueBind(queueName, EXCHANGE_NAME, "info");  // 绑定Routingkey
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");**

        System.out.println("开始接收消息");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
/**
 * 描述:接收error等级的日志
 */
public class ReceiveLogsDirect2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        //一个交换机绑定1个queue
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println("开始接收消息");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

        topic:生产者指定RoutingKey消息根据消费端指定的队列,通过模糊匹配的方式进行相应转发;匹配规则:'*'可以代替一个单词,'#'可以替代零个或者多个单词

/**
 * 描述:topic模式交换机,发送消息
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 交换机类型TOPIC

        String message = "Animal World";

        String[] routingKeys = new String[9];
        routingKeys[0] = "quick.orange.rabbit";
        routingKeys[1] = "lazy.orange.elephant";
        routingKeys[2] = "quick.orange.fox";
        routingKeys[3] = "lazy.brown.fox";
        routingKeys[4] = "lazy.pink.rabbit";
        routingKeys[5] = "quick.brown.fox";
        routingKeys[6] = ".orange.";  // **匹配到ReceiveLogsTopic1,则**
        routingKeys[7] = "quick.orange.male.rabbit";
        routingKeys[8] = "lazy.orange.male.rabbit";

        for (int i = 0; i < routingKeys.length; i++) {
            channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, message.getBytes("UTF-8"));
            System.out.println("发送了:" + message+" routingKey:"+routingKeys[i]);
        }

        channel.close();
        connection.close();
    }
}
开始接收消息
收到消息:Animal World routingKey: quick.orange.rabbit
收到消息:Animal World routingKey: lazy.orange.elephant
收到消息:Animal World routingKey: quick.orange.fox
收到消息:Animal World routingKey: quick.orange.rabbit
收到消息:Animal World routingKey: lazy.orange.elephant
收到消息:Animal World routingKey: quick.orange.fox
收到消息:Animal World routingKey: .orange.
/**
 * 描述: 特定路由键
 */
public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "*.orange.*";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

        System.out.println("开始接收消息");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message + " routingKey: " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
/**
 * 描述:特定路由键2
 */
public class ReceiveLogsTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "*.*.rabbit";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        String routingKey2 = "lazy.#";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);

        System.out.println("开始接收消息");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message + " routingKey: " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

        headers:不推荐,根据发送消息内容中的headers属性来匹配。

5. Spring Boot 整合RabbitMQ

  • 首先,安装依赖,并配置RabbitMQ
**pom.xml**
<!--依赖声明指定了项目所需的Spring Boot AMQP(高级消息队列协议)的启动器-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

**aplication.properties**
## 配置RabbitMQ的属性
server.port=8080
spring.application.name=producer

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
  • 然后,生产者发送消息到指定的交换机,并根据指定的路由键将消息路由到相应的队列中。消息被发送到 RabbitMQ 服务器上,并等待消费者来处理。

        Spring AMQP 提供的一个方法,用于向 RabbitMQ 发送消息方法如下:

rabbitmqTemplate.convertAndSend(exchange, routingKey, message);
其中:
exchange 是要发送消息到的交换机的名称。
routingKey 是消息的路由键,用于将消息路由到匹配的队列。
message 是要发送的消息内容。
/**
 * 描述: rabbitmq配置类
 */
@Configuration  //标识这是一个配置类
public class TopicRabbitConfig {

    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    } // 定义了一个名为队列"queue1"

    @Bean
    public Queue queue2() {
        return new Queue("queue2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("bootExchange");
    } //定义了一个名为bootExchange的Topic交换机
    **// 绑定到交换机上,并指定路由键**
    @Bean
    Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) {  // queue1队列绑定到交换机上,并指定路由键为"dog.red"
        return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
    }

    @Bean
    Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) { // queue2队列绑定到交换机上,并指定路由键为"dog.#"
        return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
    }
}
/**
 * 描述:发送消息
 */
@Component
public class MsgSender {
    @Autowired
    private AmqpTemplate rabbitmqTemplate; // @Autowired注解将AmqpTemplate接口的实例注入到rabbitmqTemplate字段中,用于发送消息到 abbitMQ

    public void send1() {
        String message = "This is message1, routing key is dog.red";
        System.out.println("发送了:" + message);
        this.**rabbitmqTemplate.convertAndSend**("bootExchange","dog.red",message);  // 将消息发送到名为"bootExchange"的交换机,并使用"dog.red"作为路由键
    }

    public void send2() {
        String message = "This is message2, routing key is dog.black";
        System.out.println("发送了:" + message);
        this.**rabbitmqTemplate.convertAndSend**("bootExchange","dog.black",message);
    }
}
  • 最后,消费者接收消息,并执行相应操作。
/**
 * 描述:消费者1
 */
@Component
@RabbitListener(queues = "queue1")  // **标记一个方法或类是RabbitMQ消息的监听器**,并指定监听的队列名称
public class Receiver1 {

    @RabbitHandler   // **标记RabbitMQ消息处理方法**。当消费者接收到消息时,Spring将调用该方法来处理消息。方法的参数类型为String,表示接收到的消息内容。
    public void process(String message) {
        System.out.println("Receiver1: " + message);
    }
}
/**
 * 描述:消费者2
 */
@Component
@RabbitListener(queues = "queue2")
public class Receiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Receiver2: " + message);
    }
}

当send1和send2发送消息后,消费者收到的结果如下:
*Receiver1: This is message1, routing key is dog.red
Receiver2: This is message1, routing key is dog.red
Receiver2: This is message2, routing key is dog.black*