Netty 实现dubbo rpc

发布于:2024-05-07 ⋅ 阅读:(29) ⋅ 点赞:(0)

一、RPC 的基本介绍

  RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。

常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.

若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云

二、RPC 调用的过程

在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。

调用流程说明

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 接收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给server stub
  • server stub 将返回导入结果进行编码并发送给消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client) 得到结果
  • 其中,RPC 框架的目标就是把2-8 这些步骤封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

三、dubbo RPC

1.需求说明

dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。

模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。

2.设计说明

创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:

3.代码实现

netty用的包:4.1.20.Final。pom.xml如下:

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.20.Final</version>
</dependency>

1)公共接口

/**
 * @author: fqtang
 * @date: 2024/05/05/21:51
 * @description: 服务提供方和服务消费方都需要
 */
public interface HelloService {

	String say(String mes);
}

2)公共接口实现类

import org.springframework.util.StringUtils;
import com.tfq.netty.dubborpc.publicinterface.HelloService;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:53
 * @description: 描述
 */
public class HelloServiceImpl implements HelloService {

	private static int count = 0;

	/**
	 * 当有消费方调用该方法时就返回一个结果
	 *
	 * @param mes 传入消息
	 * @return 返回结果
	 */
	@Override
	public String say(String mes) {
		System.out.println("收到客户端消息=" + mes);
		if(StringUtils.isEmpty(mes)) {
			return "你好客户端,我已经收到你的消息 ";
		}else{
			return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";
		}
	}
}

3)服务提供者

import com.tfq.netty.dubborpc.netty.NettyServer;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:57
 * @description: 启动服务提供者,就是NettyServer
 */
public class ServerBootstrap {

	public static void main(String[] args) {

		String hostName="127.0.0.1";
		int port = 8001;
		NettyServer.startServer(hostName,port);
	}

}



import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:59
 * @description: 描述
 */
public class NettyServer {

	public static void startServer(String hostName,int port){
		startServer0(hostName,port);
	}

	/**
	 * 编写一个方法,完成对Netty Server的初始化工作和启动
	 * @param hostName
	 * @param port
	 */
	private static void startServer0(String hostName,int port){
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try{
			ServerBootstrap serverBootstrap = new ServerBootstrap();

			serverBootstrap.group(bossGroup,workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new StringDecoder());
						pipeline.addLast(new StringEncoder());
						pipeline.addLast(new NettyServerHandler());
					}
				});

			ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();
			System.out.println("服务提供方开始提供服务~~~");
			channelFuture.channel().closeFuture().sync();
		}catch(Exception e){
			e.printStackTrace();
		}finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}

	}
}



import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
import com.tfq.netty.dubborpc.provider.HelloServiceImpl;

/**
 * @author: fqtang
 * @date: 2024/05/05/22:03
 * @description: 描述
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//获取客户端调用的消息,并调用服务
		System.out.println("msg = " + msg);
		//客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头
		//比如:dubboserver#hello#xxxx
		if(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {
			String res = new HelloServiceImpl().say(msg.toString()
				.substring(msg.toString()
					.lastIndexOf("#") + 1));
			ctx.writeAndFlush(res);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
}


4)消费者

import com.tfq.netty.dubborpc.netty.NettyClient;
import com.tfq.netty.dubborpc.publicinterface.HelloService;

/**
 * @author: fqtang
 * @date: 2024/05/05/23:26
 * @description: 消费者
 */
public class ClientBootstrap {

	/**
	 * 这里定义协议头
	 */
	public static final String ProtocolHeader = "dubboserver#say#";

	public static void main(String[] args) throws InterruptedException {
		//创建一个消费者
		NettyClient customer = new NettyClient();
		//创建代理对象
		HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);
		while(true) {
			Thread.sleep(10 * 1000);
			//通过代理对象调用提供者的方法(服务)
			String res = helloService.say("你好 dubbo~");
			System.out.println("调用的结果 res = " + res);
		}
	}
}



import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: fqtang
 * @date: 2024/05/05/23:04
 * @description: 描述
 */
public class NettyClient {

	//创建一个线程池
	private static ExecutorService executorService = Executors.newFixedThreadPool(2);

	private static NettyClientHandler clientHandler;

	/**
	 * 编写方法使用代理模式,获取一个代理对象
	 * @param serviceClass
	 * @param protocolHeader
	 * @return
	 */
	public Object getBean(final Class<?> serviceClass, final String protocolHeader) {

		return Proxy.newProxyInstance(Thread.currentThread()
				.getContextClassLoader(),
			new Class<?>[]{serviceClass}, (proxy, method, args) -> {
				if(clientHandler == null) {
					initClient("127.0.0.1", 8001);
				}
				//设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],
				//args[0] 就是客户端调用api say(???),参数
				clientHandler.setParam(protocolHeader + args[0]);
				return executorService.submit(clientHandler).get();
			});
	}

	private static void initClient(String hostName, int port) {
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			clientHandler = new NettyClientHandler();
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(worker)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline channelPipeline = ch.pipeline();
						channelPipeline.addLast(new StringDecoder());
						channelPipeline.addLast(new StringEncoder());
						channelPipeline.addLast(clientHandler);
					}
				});

			ChannelFuture channelFuture = bootstrap.connect(hostName, port)
				.sync();
			/*channelFuture.channel()
				.closeFuture()
				.sync();*/
		} catch(InterruptedException e) {
			e.printStackTrace();
		} /*finally {
			worker.shutdownGracefully();
		}*/
	}
}



package com.tfq.netty.dubborpc.netty;

import java.util.concurrent.Callable;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: fqtang
 * @date: 2024/05/05/22:48
 * @description: 描述
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

	private ChannelHandlerContext context;
	/**
	 * 返回的结果
	 */
	private String result;
	/**
	 * 客户端调用方法返回的参数
	 */
	private String param;

	/**
	 * 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)
	 * @param ctx
	 * @throws Exception
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		//因为在其他方法会使用到这个ctx
		context = ctx;
		System.out.println("调用(1) channelActive--->连接到服务器");
	}

	/**
	 *  被调用(4)
	 * 收到服务器的数据后,调用方法
	 * @param ctx
	 * @param msg
	 * @throws Exception
	 */
	@Override
	public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		result = (String) msg;
		System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);
		//唤醒等待的线程
		notify();
		System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}

	/**
	 * 被调用(3), 被调用(5)
	 * 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果
	 * @return
	 * @throws Exception
	 */
	@Override
	public synchronized Object call() throws Exception {
		context.writeAndFlush(param);
		System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");
		//进行wait,等待channelRead 方法获取到服务器的结果后,唤醒
		wait();
		System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");
		return result;
	}

	/**
	 * 被调用(2)
	 * @param param
	 */
	void setParam(String param){
		System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");
		this.param = param;
	}
}

若有问题请留言。