手写—— netty 实现 rabbitMq客户端

发布于:2024-12-18 ⋅ 阅读:(56) ⋅ 点赞:(0)

要使用 Netty 实现一个 RabbitMQ 客户端,你可以将 RabbitMQ 协议封装在 Netty 中,通过自定义编码和解码来实现与 RabbitMQ 的通信。RabbitMQ 使用 AMQP (Advanced Message Queuing Protocol) 协议,因此我们需要创建合适的协议封装和处理逻辑。

在这篇博客中,我们将实现一个简单的 Netty 客户端,连接到 RabbitMQ 服务器并发送消息。由于 RabbitMQ 的 AMQP 协议比较复杂,我们将专注于通过 Netty 建立连接并进行简单的消息传递。你可以通过这个框架进一步实现完整的 AMQP 客户端。

环境准备

  1. 添加依赖:
    pom.xml 中添加所需的依赖,主要包括 Netty 和 RabbitMQ 的 AMQP 客户端支持库。
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.75.Final</version>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.13.0</version>
    </dependency>
</dependencies>
  1. 启动 RabbitMQ:
    确保你已经启动了 RabbitMQ 服务,可以在本地或云端使用 RabbitMQ,默认端口是 5672

Netty 客户端实现

我们将实现一个基于 Netty 的 RabbitMQ 客户端,简化流程:首先建立连接,发送一个简单的消息。

1. 创建 Netty 客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;

public class RabbitMQNettyClient {
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 5672; // RabbitMQ 默认端口

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             ChannelPipeline pipeline = ch.pipeline();
                             pipeline.addLast(new ByteArrayDecoder()); // 处理 byte 数组的解码器
                             pipeline.addLast(new ByteArrayEncoder()); // 处理 byte 数组的编码器
                             pipeline.addLast(new RabbitMQClientHandler()); // 自定义处理逻辑
                         }
                     });

            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
2. 自定义 Handler:RabbitMQClientHandler

我们需要定义一个 RabbitMQClientHandler 来处理与 RabbitMQ 的通信。为了简化,在这个例子中,我们模拟一个简单的连接请求和发送消息的过程。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.rabbitmq.client.*;

import java.io.IOException;

public class RabbitMQClientHandler extends ChannelInboundHandlerAdapter {
    private final ConnectionFactory factory;
    private Connection connection;
    private Channel channel;

    public RabbitMQClientHandler() {
        factory = new ConnectionFactory();
        factory.setHost("localhost"); // 设置 RabbitMQ 主机
        factory.setPort(5672); // 设置 RabbitMQ 默认端口
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("hello", false, false, false, null);

            String message = "Hello, RabbitMQ!";
            channel.basicPublish("", "hello", null, message.getBytes());

            System.out.println("Sent message: " + message);
        } catch (Exception e) {
            e.printStackTrace();
            ctx.close();
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 处理从 RabbitMQ 返回的消息
        byte[] messageBytes = (byte[]) msg;
        String message = new String(messageBytes);
        System.out.println("Received: " + message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3. 启动客户端

你可以运行 RabbitMQNettyClient 类,它将连接到本地的 RabbitMQ 服务器并尝试发送一个简单的消息 "Hello, RabbitMQ!"

在实际的场景中,你可能会需要实现更复杂的 AMQP 协议操作,例如队列的声明、交换机绑定、消息发布和接收等。你可以通过继承和扩展 Netty 客户端 Handler 来处理这些操作。

总结

通过这篇博客,我们了解了如何使用 Netty 实现一个简单的 RabbitMQ 客户端,并通过自定义 ChannelHandler 实现与 RabbitMQ 的通信。虽然我们简化了 AMQP 协议,但这个框架为你进一步实现更复杂的 RabbitMQ 客户端奠定了基础。

后续可以深入探讨 AMQP 协议细节,例如如何处理消息的确认、消费者、交换机等。通过结合 Netty,我们能够处理高并发的网络请求,构建高效的消息队列系统。


网站公告

今日签到

点亮在社区的每一天
去签到