14-netty基础-手写rpc-提供方(服务端)-06

发布于:2025-08-08 ⋅ 阅读:(21) ⋅ 点赞:(0)

 netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1、功能逻辑

服务端启动时,spring容器每初始化完一个bean,都会拿到该bean的信息,判断其类上是否标注了BonnieRemoteService注解;如果标注了,则获取该类声明的所有Method,并将这些Method信息缓存到容器中,以供后续rpc反射调用。
缓存容器使用Map, key:该类实现的第一个接口的全限定名 + "." + 方法名称  value:封装了bean与Method的BeanMethod对象

2、核心代码

bean初始化完成之后的扩展逻辑,可以通过实现BeanPostProcessor接口中的postProcessAfterInitialization方法来完成

同时实现InitializingBean接口中的afterPropertiesSet方法,在该方法中启动nettyServer服务

2.1 收集BonnieRemoteService修饰的类

package com.bonnie.protocol.spring.service;

import com.bonnie.protocol.annotation.BonnieRemoteService;
import com.bonnie.protocol.netty.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * Spring hook that publishes RPC services and boots the Netty server.
 *
 * <p>As a {@link BeanPostProcessor} it inspects every initialized bean and, for
 * those whose class carries {@link BonnieRemoteService}, registers each declared
 * method in {@code Mediator.beanMethodMap}. As an {@link InitializingBean} it
 * starts the {@link NettyServer} on a background thread.
 *
 * <p>NOTE(review): Spring instantiates BeanPostProcessors before ordinary
 * singletons, so the server is presumably started before the service beans
 * have been registered — confirm whether early requests can arrive.
 */
@Component
@Slf4j
public class SpringRpcProviderBean implements BeanPostProcessor, InitializingBean {

    private String serverAddress = "127.0.0.1";
    private Integer serverPort = 48081;

//    public SpringRpcProviderBean(String serverAddress, Integer serverPort) {
//        this.serverAddress = serverAddress;
//        this.serverPort = serverPort;
//    }

    /**
     * Runs after a bean has been initialized and registers its methods for RPC
     * dispatch when the bean class is annotated with {@link BonnieRemoteService}.
     *
     * <p>The cache key is {@code <first implemented interface name>.<method name>}.
     * Parameter types are not part of the key, so overloaded methods share one
     * entry and the last one registered wins.
     *
     * @param bean     the initialized bean instance
     * @param beanName the name of the bean in the container
     * @return the same bean instance, never wrapped or replaced
     * @throws BeansException declared by the interface; not thrown here
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        log.debug("SpringRpcProviderBean===={}", beanName);

        Class<?> beanClass = bean.getClass();
        // Only beans whose class declares BonnieRemoteService are published.
        if (!beanClass.isAnnotationPresent(BonnieRemoteService.class)) {
            return bean;
        }

        Class<?>[] interfaces = beanClass.getInterfaces();
        if (interfaces.length == 0) {
            // Without an interface there is no service name to build the key from;
            // skip instead of failing container startup with an index error.
            log.warn("Bean {} ({}) is annotated with BonnieRemoteService but implements no interface; skipped",
                    beanName, beanClass.getName());
            return bean;
        }

        // The service name is the same for every method, so resolve it once.
        String serviceName = interfaces[0].getName();
        for (Method method : beanClass.getDeclaredMethods()) {
            // Compiler-generated methods (bridges, lambda bodies) are not part of
            // the service contract and a bridge would collide with the real method's key.
            if (method.isBridge() || method.isSynthetic()) {
                continue;
            }

            BeanMethod beanMethod = new BeanMethod();
            beanMethod.setBean(bean);
            beanMethod.setMethod(method);

            // Cache for later reflective invocation.
            Mediator.beanMethodMap.put(serviceName + "." + method.getName(), beanMethod);
        }

        return bean;
    }

    /**
     * Starts the Netty server on its own thread so that the blocking
     * {@code startNettyServer()} call does not stall container startup.
     *
     * @throws Exception declared by the interface; not thrown here
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("启动Netty服务端======{}", serverPort);
        Thread serverThread = new Thread(
                () -> new NettyServer(serverAddress, serverPort).startNettyServer(),
                "bonnie-netty-server");
        serverThread.start();
    }
}

2.2 NettyServer实现

启动服务,设置编解码的方式

package com.bonnie.protocol.netty;

import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {

    private String serverAddress;
    private Integer serverPort;

    public NettyServer(String serverAddress, Integer serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }

    public void startNettyServer() {
        log.info("begin start Netty server");
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                // 长度域解码器
                                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 12, 4, 0, 0))
                                .addLast(new BonnieEncoder())
                                .addLast(new BonnieDecoder())
                                .addLast(new RpcServerHandler());
                    }
                });
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(this.serverAddress, this.serverPort).sync();
            log.info("Server started Success on serverAddress {} Port,{}",this.serverAddress, this.serverPort);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
           e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }

}

2.3 NettyServer接收客户端请求数据

package com.bonnie.protocol.netty;

import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.spring.service.Mediator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inbound handler that receives a decoded RPC request, dispatches it through
 * {@link Mediator} and writes the response back on the same channel.
 */
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    private static final Logger log = LoggerFactory.getLogger(RpcServerHandler.class);

    /**
     * Handles one client request and always answers it, including when the
     * invocation fails, so the caller is not left waiting on its request id.
     *
     * @param ctx channel context used to write the response
     * @param msg decoded request: protocol header plus {@link RpcRequest} body
     * @throws Exception declared by the superclass
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        Header header = msg.getHeader();
        long requestId = header.getRequestId();
        log.info("接收到客户端的消息: requestId {} {}", requestId, JSONObject.toJSONString(msg));

        // The request header is reused for the response; only the message type changes.
        header.setReqType(ReqTypeEnum.RESPONSE.getCode());

        RpcResponse rpcResponse = new RpcResponse();
        try {
            // Invoke the target service method and capture its return value.
            Object result = Mediator.getInstance().processor(msg.getContent());
            rpcResponse.setMsg("success");
            rpcResponse.setData(result);
        } catch (Exception e) {
            log.error("RPC invocation failed: requestId {}", requestId, e);
            // NOTE(review): assumes the client treats a non-"success" msg as a
            // failure — confirm against the consumer-side handler.
            rpcResponse.setMsg("error: " + e);
        }

        RpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol<>();
        responseRpcProtocol.setHeader(header);
        responseRpcProtocol.setContent(rpcResponse);

        // Send the response back to the client.
        ctx.writeAndFlush(responseRpcProtocol);
    }

}


网站公告

今日签到

点亮在社区的每一天
去签到