1. Spring框架进行Bean管理
为了方便进行对象管理,本文使用Spring框架进行Bean的管理。
ClassPathBeanDefinitionScanner
: 注册下面的类
package github.javaguide.spring;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import java.lang.annotation.Annotation;
/**
* custom package scanner
*
* @author shuang.kou
* @createTime 2020年08月10日 21:42:00
*/
public class CustomScanner extends ClassPathBeanDefinitionScanner {
public CustomScanner(BeanDefinitionRegistry registry, Class<? extends Annotation> annoType) {
super(registry);
super.addIncludeFilter(new AnnotationTypeFilter(annoType));
}
@Override
public int scan(String... basePackages) {
return super.scan(basePackages);
}
}
CustomScannerRegistrar
: 定义类的扫描路径
package github.javaguide.spring;
import github.javaguide.annotation.RpcScan;
import github.javaguide.annotation.RpcService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.StandardAnnotationMetadata;
import org.springframework.stereotype.Component;
/**
* scan and filter specified annotations
*
* @author shuang.kou
* @createTime 2020年08月10日 22:12:00
*/
@Slf4j
public class CustomScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {
private static final String SPRING_BEAN_BASE_PACKAGE = "github.javaguide";
private static final String BASE_PACKAGE_ATTRIBUTE_NAME = "basePackage";
private ResourceLoader resourceLoader;
@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
//get the attributes and values of RpcScan annotation
AnnotationAttributes rpcScanAnnotationAttributes = AnnotationAttributes.fromMap(annotationMetadata.getAnnotationAttributes(RpcScan.class.getName()));
String[] rpcScanBasePackages = new String[0];
if (rpcScanAnnotationAttributes != null) {
// get the value of the basePackage property
rpcScanBasePackages = rpcScanAnnotationAttributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);
}
if (rpcScanBasePackages.length == 0) {
rpcScanBasePackages = new String[]{((StandardAnnotationMetadata) annotationMetadata).getIntrospectedClass().getPackage().getName()};
}
// Scan the RpcService annotation
CustomScanner rpcServiceScanner = new CustomScanner(beanDefinitionRegistry, RpcService.class);
// Scan the Component annotation
CustomScanner springBeanScanner = new CustomScanner(beanDefinitionRegistry, Component.class);
if (resourceLoader != null) {
rpcServiceScanner.setResourceLoader(resourceLoader);
springBeanScanner.setResourceLoader(resourceLoader);
}
int springBeanAmount = springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);
log.info("springBeanScanner扫描的数量 [{}]", springBeanAmount);
int rpcServiceCount = rpcServiceScanner.scan(rpcScanBasePackages);
log.info("rpcServiceScanner扫描的数量 [{}]", rpcServiceCount);
}
}
2. 注解
本次RPC的代理,基于注解和拦截进行实现,这样方便定义服务接口的调用。
RpcService
:用来定义Rpc服务
package github.javaguide.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* RPC service annotation, marked on the service implementation class
*
* @author shuang.kou
* @createTime 2020年07月21日 13:11:00
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface RpcService {
/**
* Service version, default value is empty string
*/
String version() default "";
/**
* Service group, default value is empty string
*/
String group() default "";
}
RpcScan
:用来定义Rpc的扫描包地址
package github.javaguide.annotation;
import github.javaguide.spring.CustomScannerRegistrar;
import org.springframework.context.annotation.Import;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* scan custom annotations
*
* @author shuang.kou
* @createTime 2020年08月10日 21:42:00
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(CustomScannerRegistrar.class)
@Documented
public @interface RpcScan {
String[] basePackage();
}
- PpcReference:用来对Rpc服务进行引用
package github.javaguide.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* RPC reference annotation, autowire the service implementation class
*
* @author smile2coder
* @createTime 2020年09月16日 21:42:00
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface RpcReference {
/**
* Service version, default value is empty string
*/
String version() default "";
/**
* Service group, default value is empty string
*/
String group() default "";
}
3. 拦截
使用Spring的PostProcessor,解析RpcScan路径上,定义了RpcService的类,将拦截到的对象进行服务注册
package github.javaguide.spring;
import github.javaguide.annotation.RpcReference;
import github.javaguide.annotation.RpcService;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcRequestTransportEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
/**
* call this method before creating the bean to see if the class is annotated
*
* @author shuang.kou
* @createTime 2020年07月14日 16:42:00
*/
@Slf4j
@Component
public class SpringBeanPostProcessor implements BeanPostProcessor {
private final ServiceProvider serviceProvider;
private final RpcRequestTransport rpcClient;
public SpringBeanPostProcessor() {
this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension(RpcRequestTransportEnum.NETTY.getName());
}
/**
* 初始化之前,将有@RpcService注解的类发布到注册中心,并将对象缓存到ServiceProvider中
* */
@SneakyThrows
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean.getClass().isAnnotationPresent(RpcService.class)) {
log.info("[{}] is annotated with [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());
// get RpcService annotation
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
// build RpcServiceProperties
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group(rpcService.group())
.version(rpcService.version())
.service(bean).build();
serviceProvider.publishService(rpcServiceConfig);
}
return bean;
}
/**
* 初始化之后,将带有@RpcReference注解的属性注入代理对象
* */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = bean.getClass();
Field[] declaredFields = targetClass.getDeclaredFields();
for (Field declaredField : declaredFields) {
RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
if (rpcReference != null) {
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group(rpcReference.group())
.version(rpcReference.version()).build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
declaredField.set(bean, clientProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}
4. 服务注册
- 服务提供者接口,主要有两个核心方法:
- 发布服务,负责将服务发布到注册中心,并且缓存服务
- 获取服务,从缓存中拿到服务对象
package github.javaguide.provider;
import github.javaguide.config.RpcServiceConfig;
/**
* store and provide service object.
*
* @author shuang.kou
* @createTime 2020年05月31日 16:52:00
*/
public interface ServiceProvider {
/**
* @param rpcServiceConfig rpc service related attributes
*/
void addService(RpcServiceConfig rpcServiceConfig);
/**
* @param rpcServiceName rpc service name
* @return service object
*/
Object getService(String rpcServiceName);
/**
* @param rpcServiceConfig rpc service related attributes
*/
void publishService(RpcServiceConfig rpcServiceConfig);
}
- 实现类
package github.javaguide.provider.impl;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.ServiceRegistryEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.remoting.transport.netty.server.NettyRpcServer;
import lombok.extern.slf4j.Slf4j;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author shuang.kou
* @createTime 2020年05月13日 11:23:00
*/
@Slf4j
public class ZkServiceProviderImpl implements ServiceProvider {
/**
* key: rpc service name(interface name + version + group)
* value: service object
*/
private final Map<String, Object> serviceMap;
private final Set<String> registeredService;
private final ServiceRegistry serviceRegistry;
public ZkServiceProviderImpl() {
serviceMap = new ConcurrentHashMap<>();
registeredService = ConcurrentHashMap.newKeySet();
serviceRegistry = ExtensionLoader.getExtensionLoader(ServiceRegistry.class).getExtension(ServiceRegistryEnum.ZK.getName());
}
@Override
public void addService(RpcServiceConfig rpcServiceConfig) {
String rpcServiceName = rpcServiceConfig.getRpcServiceName();
if (registeredService.contains(rpcServiceName)) {
return;
}
registeredService.add(rpcServiceName);
serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
}
@Override
public Object getService(String rpcServiceName) {
Object service = serviceMap.get(rpcServiceName);
if (null == service) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
}
return service;
}
@Override
public void publishService(RpcServiceConfig rpcServiceConfig) {
try {
String host = InetAddress.getLocalHost().getHostAddress();
this.addService(rpcServiceConfig);
serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));
} catch (UnknownHostException e) {
log.error("occur exception when getHostAddress", e);
}
}
}
- 服务注册:将服务注册到注册中心
package github.javaguide.registry;
import github.javaguide.extension.SPI;
import java.net.InetSocketAddress;
/**
* service registration
*
* @author shuang.kou
* @createTime 2020年05月13日 08:39:00
*/
@SPI
public interface ServiceRegistry {
/**
* register service
*
* @param rpcServiceName rpc service name
* @param inetSocketAddress service address
*/
void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);
}
- ZK注册中心:使用CuratorFramework进行Java端的操作
package github.javaguide.registry.zk;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.zk.util.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import java.net.InetSocketAddress;
/**
* service registration based on zookeeper
*
* @author shuang.kou
* @createTime 2020年05月31日 10:56:00
*/
@Slf4j
public class ZkServiceRegistryImpl implements ServiceRegistry {
@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
CuratorFramework zkClient = CuratorUtils.getZkClient();
CuratorUtils.createPersistentNode(zkClient, servicePath);
}
}
5. 服务属性对象注入
解析RpcReference注解,对服务进行代理,将本地调用代理为远端的调用。
- 这个是上面使用Spring进行bean属性注入的方法,包括三步
- 检测到带有RpcReference注解的属性
- 创建一个服务的代理对象
- 将创建的对象,注入到属性中
/**
* 初始化之后,将带有@RpcReference注解的属性注入代理对象
* */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = bean.getClass();
Field[] declaredFields = targetClass.getDeclaredFields();
for (Field declaredField : declaredFields) {
RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
if (rpcReference != null) {
// 1. 服务的配置
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group(rpcReference.group())
.version(rpcReference.version()).build();
// 2. 创建服务的代理对象
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
// 3. 属性注入
declaredField.setAccessible(true);
try {
declaredField.set(bean, clientProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
- 其中,服务的代理对象,可以通过下面的方法进行创建
- getProxy创建一个代理对象,对象的invoker就是本类
- 本类的invoke,使用Netty进行通信,调用远端的服务,得到返回结果
package github.javaguide.proxy;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.RpcRequestTransport;
import github.javaguide.remoting.transport.netty.client.NettyRpcClient;
import github.javaguide.remoting.transport.socket.SocketRpcClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
* Dynamic proxy class.
* When a dynamic proxy object calls a method, it actually calls the following invoke method.
* It is precisely because of the dynamic proxy that the remote method called by the client is like calling the local method (the intermediate process is shielded)
*
* @author shuang.kou
* @createTime 2020年05月10日 19:01:00
*/
@Slf4j
public class RpcClientProxy implements InvocationHandler {
private static final String INTERFACE_NAME = "interfaceName";
/**
* Rpc通信
*/
private final RpcRequestTransport rpcRequestTransport;
/**
进行配置
*/
private final RpcServiceConfig rpcServiceConfig;
public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
this.rpcRequestTransport = rpcRequestTransport;
this.rpcServiceConfig = rpcServiceConfig;
}
public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {
this.rpcRequestTransport = rpcRequestTransport;
this.rpcServiceConfig = new RpcServiceConfig();
}
/**
* 使用jdk自带的Proxy进行代理
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
/**
* This method is actually called when you use a proxy object to call a method.
* The proxy object is the object you get through the getProxy method.
*/
@SneakyThrows
@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
log.info("invoked method: [{}]", method.getName());
// 1. 创建Rpc请求
RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
.parameters(args)
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group(rpcServiceConfig.getGroup())
.version(rpcServiceConfig.getVersion())
.build();
// 2. 网络通信,将请求发送到远端,得到响应结果
RpcResponse<Object> rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
// 3. 检查并返回Rpc
this.check(rpcResponse, rpcRequest);
return rpcResponse.getData();
}
private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
if (rpcResponse == null) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
}
}