Netty-RPC

发布于:2025-07-04 ⋅ 阅读:(18) ⋅ 点赞:(0)

1. server

package com.ldj.demo.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Minimal Netty TCP server: every inbound chunk is decoded as a UTF-8 string,
 * printed to standard output and written back to the sender prefixed with
 * {@code "Server echo: "}.
 *
 * <p>The pipeline contains no frame decoder, so a "message" here is whatever the
 * string decoder produces for one inbound buffer; it is not guaranteed to match
 * exactly what the peer passed to a single write.
 *
 * @author ldj
 */
public class NettyServer {

    /** TCP port this server binds to. */
    private final int port;

    /**
     * Creates a server for the given port. Nothing is bound until {@link #start()} is called.
     *
     * @param port TCP port to listen on
     */
    public NettyServer(int port) {
        this.port = port;
    }

    /**
     * Binds the port and blocks the calling thread until the server channel is closed.
     * Both event loop groups are shut down before this method returns, whether it
     * returns normally or by throwing.
     *
     * @throws Exception if binding fails or the calling thread is interrupted while waiting
     */
    public void start() throws Exception {
        // Only one port is bound, and a server channel is served by a single event loop,
        // so the accepting group does not need the default (CPU-scaled) size.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Handles the traffic of accepted connections.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Pin the charset: the no-arg codec constructors fall back to the
                            // platform default, which can differ between client and server
                            // machines and garble non-ASCII text.
                            ch.pipeline().addLast(new StringDecoder(java.nio.charset.StandardCharsets.UTF_8));
                            ch.pipeline().addLast(new StringEncoder(java.nio.charset.StandardCharsets.UTF_8));
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, String msg) throws Exception {
                                    System.out.println("收到消息: " + msg);
                                    // Echo the message back to the client.
                                    ctx.writeAndFlush("Server echo: " + msg);
                                }

                                @Override
                                public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) {
                                    // Report the failure and drop the connection instead of
                                    // leaving it open in an unknown state.
                                    System.err.println("连接异常: " + cause);
                                    ctx.close();
                                }
                            });
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128) // option for the listening channel
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // option for each accepted channel

            // Bind and start accepting incoming connections.
            ChannelFuture f = b.bind(port).sync();

            System.out.println("服务端启动成功,监听端口:" + port);

            // Block until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on port 8080.
     *
     * @param args ignored
     * @throws Exception propagated from {@link #start()}
     */
    public static void main(String[] args) throws Exception {
        int port = 8080; // listening port
        new NettyServer(port).start();
    }
}

2. client

package com.ldj.demo.rpc.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * Interactive Netty TCP client: connects to a server, sends each line typed on
 * standard input as a UTF-8 string, and prints every string received back.
 * Typing {@code exit} (case-insensitive) or reaching end of input ends the session.
 *
 * @author ldj
 */
public class NettyClient {

    /** Host name or address of the server to connect to. */
    private final String host;
    /** TCP port of the server to connect to. */
    private final int port;

    /**
     * Creates a client for the given endpoint. No connection is made until
     * {@link #connect()} is called.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and runs the read-line/send loop on the calling thread
     * until the user types {@code exit}, standard input ends, or the connection is
     * found to be closed. The channel is then closed and the event loop group shut down.
     *
     * @throws Exception if connecting fails or the calling thread is interrupted while waiting
     */
    public void connect() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Pin the charset: the no-arg codec constructors fall back to the
                            // platform default, which can differ between client and server
                            // machines and garble non-ASCII text.
                            ch.pipeline().addLast(new StringDecoder(java.nio.charset.StandardCharsets.UTF_8));
                            ch.pipeline().addLast(new StringEncoder(java.nio.charset.StandardCharsets.UTF_8));
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                    System.out.println("收到回复: " + msg);
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                    // Report the failure and close; the input loop notices the
                                    // inactive channel on its next iteration.
                                    System.err.println("连接异常: " + cause);
                                    ctx.close();
                                }
                            });
                        }
                    });

            // Connect to the server.
            ChannelFuture f = b.connect(host, port).sync();
            System.out.println("已连接到服务器");

            Channel channel = f.channel();

            // Not closed on purpose: closing the Scanner would also close System.in.
            Scanner scanner = new Scanner(System.in);
            while (true) {
                System.out.print("请输入消息('exit'退出): ");
                if (!scanner.hasNextLine()) {
                    // End of input (e.g. piped stdin exhausted): stop instead of letting
                    // nextLine() throw NoSuchElementException.
                    break;
                }
                String message = scanner.nextLine();
                if ("exit".equalsIgnoreCase(message)) {
                    break;
                }
                if (!channel.isActive()) {
                    // The connection is gone; further writes could only fail.
                    System.out.println("连接已断开");
                    break;
                }
                channel.writeAndFlush(message);
            }

            // Actively close the connection. Waiting on closeFuture() alone would block
            // until the remote side closed, because nothing here had requested a close.
            channel.close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Connects to {@code localhost:8080} and starts the interactive session.
     *
     * @param args ignored
     * @throws Exception propagated from {@link #connect()}
     */
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080; // must match the port the server listens on
        new NettyClient(host, port).connect();
    }
}