netty系列文章:
1 功能逻辑
在服务端启动的时候,在spring容器中bean已经被初始化好之后,拿到当前bean的信息,判断是否被BonnieRemoteService修饰,如果被修饰则获取到当前类下的所有Method,然后将这个Method信息缓存到容器中。以供后续rpc反射调用。
缓存容器使用Map, key:类的全路径+方法名称 value:Method即可
2、核心代码
spring容器中bean已经被初始化好,可以实现BeanPostProcessor接口中的postProcessAfterInitialization方法实现初始化后扩展功能
实现InitializingBean接口中afterPropertiesSet方法,将nettyServer的服务放在这块启动
2.1 收集BonnieRemoteService修饰的类
package com.bonnie.protocol.spring.service;
import com.bonnie.protocol.annotation.BonnieRemoteService;
import com.bonnie.protocol.netty.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Component
@Slf4j
public class SpringRpcProviderBean implements BeanPostProcessor, InitializingBean {
private String serverAddress = "127.0.0.1";
private Integer serverPort = 48081;
// public SpringRpcProviderBean(String serverAddress, Integer serverPort) {
// this.serverAddress = serverAddress;
// this.serverPort = serverPort;
// }
/**
* bean初始化完成后,执行该逻辑
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
System.out.println("SpringRpcProviderBean===="+beanName);
// 只要bean声明了BonnieRemoteService注解,则需要把该服务发布到网络上
// 判断当前类上是否有注解BonnieRemoteService
boolean flag = bean.getClass().isAnnotationPresent(BonnieRemoteService.class);
if (flag) {
Method[] declaredMethods = bean.getClass().getDeclaredMethods();
for (Method method : declaredMethods) {
String serviceName = bean.getClass().getInterfaces()[0].getName();
String key = serviceName + "." + method.getName();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
// 缓存到map容器中
Mediator.beanMethodMap.put(key, beanMethod);
}
}
return bean;
}
@Override
public void afterPropertiesSet() throws Exception {
log.info("启动Netty服务端======48081");
new Thread(()->{
new NettyServer(serverAddress, serverPort).startNettyServer();
}).start();
}
}
2.2 NettyServer实现
启动服务,设置编解码的方式
package com.bonnie.protocol.netty;
import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServer {
private String serverAddress;
private Integer serverPort;
public NettyServer(String serverAddress, Integer serverPort) {
this.serverAddress = serverAddress;
this.serverPort = serverPort;
}
public void startNettyServer() {
log.info("begin start Netty server");
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 长度域解码器
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 12, 4, 0, 0))
.addLast(new BonnieEncoder())
.addLast(new BonnieDecoder())
.addLast(new RpcServerHandler());
}
});
try {
ChannelFuture channelFuture = serverBootstrap.bind(this.serverAddress, this.serverPort).sync();
log.info("Server started Success on serverAddress {} Port,{}",this.serverAddress, this.serverPort);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
2.3 NettyServer接收客户端请求数据
package com.bonnie.protocol.netty;
import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.spring.service.Mediator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
private static final Logger log = LoggerFactory.getLogger(RpcServerHandler.class);
/**
* 服务端接收客户端消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
long requestId = msg.getHeader().getRequestId();
log.info("接收到客户端的消息: requestId {} {}", requestId, JSONObject.toJSONString(msg));
// 构建返回消息ReqResponse
RpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol<>();
Header header = msg.getHeader();
// 设置返回的消息类型
header.setReqType(ReqTypeEnum.RESPONSE.getCode());
// 通过调用获取到方法的返回数据
Object result = Mediator.getInstance().processor(msg.getContent());
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMsg("success");
rpcResponse.setData(result);
responseRpcProtocol.setHeader(header);
responseRpcProtocol.setContent(rpcResponse);
// 数据写入到客户端
ctx.writeAndFlush(responseRpcProtocol);
}
}