RabbitMQ 最新版 安装,配置,java接入使用(详细教程)

发布于:2024-08-23 ⋅ 阅读:(143) ⋅ 点赞:(0)

一 RabbitMQ下载

RabbitMQ 官网最新版下载:

RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ依赖erlang-26.2.5.2-1.el7.x86_64.rpm下载:

https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm

二 RabbitMQ安装

   1 安装erlang环境

  安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开发的

  执行安装指令如下:

rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm

   执行后如下图:

  验证 erlang 安装是否成功,执行erl可以查看版本,说明安装成功如下图:

 

2 安装RabbitMQ

  执行安装RabbitMQ指令如下:

rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm 

  执行安装中,如下图: 

 注意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依赖版本,如下图: 

如果出现上图的错误,请参考上一步重新安装erlang环境即可。

 安装结束后,消息队列数据保存在哪?日记在哪?想了解更多的信息?

只需一条指令可查询当前状态信息:

rabbitmq-diagnostics status

执行后如下图:

从上图状态中可以看出目前没有使用任何配置文件,以可以看到以下有用的信息:

  • 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
  • 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log

 上图信息很详细,可以说开发者开发这个工具非常的细心,对软件有足够了解使用也安心!

3 配置RabbitMQ(可选项

  安装好后RabbitMQ没有使用任何的配置文件(也没有默认配置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站配置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个配置文件:

vi /etc/rabbitmq/rabbitmq.config

配置文件内容:

[
  {rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
  {rabbitmq_management, [
  {listener, [{port,59876}, {ssl, false}]}
  ]}
].

  通过配置配置文件实现变更:

  1.   客户端 51091 用于消费或生产端连接,IP 0.0.0.0 代表绑定服务器内外网IP。
  2.   管理端口 59876 用于RabbitMQ的Web管理。

再次执行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下图:

 上图可以看到刚刚创建的配置文件已被引用状态。

4 RabbitMQ 启动与关闭

   RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:

#启动
systemctl start rabbitmq-server

#停止关闭
systemctl stop rabbitmq-server

#重启
systemctl restart rabbitmq-server

#开机启动
systemctl enable rabbitmq-server

#查看状态
systemctl status rabbitmq-server

  操作如下图:

5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启

RabbitMQ的安装后自带Web管理界面,但是需要执行以下指令开启:

rabbitmq-plugins enable rabbitmq_management

 我们平时只需要一名管员即可,后面要增加用户或设置权限直接在Web操作即可。

   新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.

#新增人员
rabbitmqctl add_user hua abc123uuPP

#设置权限
rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"

#设置为管理员
rabbitmqctl set_user_tags hua administrator

* 表示授予该用户对该虚拟主机上所有队列和交换机的 configurewriteread 权限。

  • 第一个 ".*" 表示用户可以配置任意队列和交换机。
  • 第二个 ".*" 表示用户可以向任意队列和交换机发送消息。
  • 第三个 ".*" 表示用户可以从任意队列中消费消息。

 执行过程如图:

执行上面命令增加一个Web管理员:

  • 用户名称:hua
  • 密码:abc123uuPP
  • 权限 :管理员

  如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:

  • 默认用户:guest
  • 默认密码:guest

三 RabbitMQ Web 管理

1 RabbitMQ Web 登陆

 进入RabbitMQ Web 登陆页面如下:

首先我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:

使用上面新建的账号hua登陆,登陆成功如下图:

2 用户管理

 用户管理,用户增加操作简单,如下图:

3 用户权限设置 

   用户管理,用户权限设置操作简单,如下图:

四 java代码接入

  方式一 java通用:

  1 引入mvn依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>

  JAVA 连接RabbitMQ生产消息与接收消费测试代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author hua
 * @date 2024-08-21 18:01
 */
public class TestRabbitMQ {

    private final static String QUEUE_NAME = "hello";

    public static void main1(String[] args) {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(51091);
        factory.setUsername("java_producer");
        factory.setPassword("java_producer");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(51091);
        factory.setUsername("java_consumer");
        factory.setPassword("java_consumer");
        // 连接到 RabbitMQ 服务器
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列(确保队列存在)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            // 定义回调函数,当有消息送达时执行
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };

            // 消费消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}

 测试运行发送消息,发送成功。如下图:

 

 测试运行接收消息,消费成功。如下图:

 上面测试通过后,改成服务类方便生产环境使用来发送消息代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author hua
 * @date 2024-08-22
 */
@Service
public class RabbitMqServiceImpl {

    private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);

    private static final String QUEUE_NAME = "test";
    private Connection connection;
    private Channel channel;

    public RabbitMqServiceImpl() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(51091);
        factory.setUsername("java_producer");
        factory.setPassword("java_producer");
        //如果不指定虚拟机默认会使用/
        factory.setVirtualHost("test");


        try {
            this.connection = factory.newConnection();
            this.channel = connection.createChannel();
            this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            logger.info("RabbitMqServiceImpl initialized successfully.");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
            throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
        }
    }

    public void sendMessage(String message) {
        try {
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (IOException e) {
            e.printStackTrace();
            logger.error("Failed to send message: {}", e.getMessage());
        }
    }

    public void close() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

方式二 SpringBoot框架使用

mvn依赖包:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

spring配置文件:

Spring: 
 rabbitmq:
    host: xx.xx.xx.xx
    port: 51091
    username: java_consumer
    password: java_consumer
    virtual-host: hellow
    connection-timeout: 6000

JAVA代码:

  发送消息java代码:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author hua
 * @date 2024-08-22
 */
@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(queue.getName(), message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

  接收消息java代码:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author hua
 * @date 2024-08-22
 */
@Component
public class RabbitListener {

    private static final Logger logger = LogManager.getLogger(RabbitListener.class);

    @RabbitListener(queues = "test")
    public void receiveMessage(String message) {

        try {
            System.out.println("rabbit rev <- "+message);
            //具体业务
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("rabbit err= ", e);
        }

    }
}

   上面代码在生产发送消息时通过编码方式更灵活,接收直接使用注解更简单。


网站公告

今日签到

点亮在社区的每一天
去签到