为了让用户使用方便,封装Spring Boot Starter基于注解驱动的RPC框架。
新增Spring Boot Starter module
starlink-spring-boot-starter
添加rpc-core核心包
<dependency>
<groupId>com.starlink</groupId>
<artifactId>starlink-rpc-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
定义注解
@EnableRpc
标识项目开启RPC功能、执行RPC框架初始化方法
/**
* 启用 Rpc 注解
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcInitBootstrap.class, RpcProviderBootstrap.class, RpcConsumerBootstrap.class})
public @interface EnableRpc {
/**
* 需要启动 server
*
* @return
*/
boolean needServer() default true;
}
@RpcService
服务提供者注解,在需要注册和提供的服务类上使用
RpcService注解中,需要指定服务注册所需信息,如服务接口实现类、版本号等,通过注解属性来定义。
/**
* 服务提供者注解(用于注册服务)
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
/**
* 服务接口类
*/
Class<?> interfaceClass() default void.class;
/**
* 版本
*/
String serviceVersion() default RpcConstants.DEFAULT_SERVICE_VERSION;
}
@RpcReference
服务消费者注解,在需要注入服务代理对象的属性上使用。类似Spring中的@Resource注解。
/**
* 服务消费者注解(用于注入服务)
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface RpcReference {
/**
* 服务接口类
*/
Class<?> interfaceClass() default void.class;
/**
* 版本
*/
String serviceVersion() default RpcConstants.DEFAULT_SERVICE_VERSION;
/**
* 负载均衡器
*/
String loadBalancer() default LoadBalancerKeys.ROUND_ROBIN;
/**
* 重试策略
*/
String retryStrategy() default RetryStrategyKeys.NO;
/**
* 容错策略
*/
String tolerantStrategy() default TolerantStrategyKeys.FAIL_FAST;
/**
* 模拟调用
*/
boolean mock() default false;
}
注解驱动
/**
* Rpc 框架启动
*/
@Slf4j
public class RpcInitBootstrap implements ImportBeanDefinitionRegistrar {
/**
* Spring 初始化时执行,初始化 RPC 框架
*
* @param importingClassMetadata
* @param registry
*/
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// 获取 EnableRpc 注解的属性值
boolean needServer = (boolean) importingClassMetadata.getAnnotationAttributes(EnableRpc.class.getName())
.get("needServer");
// RPC 框架初始化(配置和注册中心)
RpcApplication.init();
// 全局配置
final RpcConfig rpcConfig = RpcApplication.getRpcConfig();
if (needServer) {
// 启动服务器
VertxServer server = VertxServerFactory.getInstance(rpcConfig.getProtocol());
server.doStart(rpcConfig.getServerPort());
} else {
log.info("不启动 server");
}
}
}
当项目开启EnableRpc注解,才启动服务器。
/**
* Rpc 服务提供者启动
*/
@Slf4j
public class RpcProviderBootstrap implements BeanPostProcessor {
/**
* Bean 初始化后执行,注册服务
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
RpcService rpcService = beanClass.getAnnotation(RpcService.class);
if (rpcService != null) {
// 需要注册服务
// 1. 获取服务基本信息
Class<?> interfaceClass = rpcService.interfaceClass();
// 默认值处理
if (interfaceClass == void.class) {
interfaceClass = beanClass.getInterfaces()[0];
}
String serviceName = interfaceClass.getName();
String serviceVersion = rpcService.serviceVersion();
// 2. 注册服务
// 本地注册
LocalRegistry.register(serviceName, beanClass);
// 全局配置
final RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 注册服务到注册中心
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(serviceVersion);
serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
serviceMetaInfo.setServicePort(rpcConfig.getServerPort());
try {
registry.register(serviceMetaInfo);
} catch (Exception e) {
throw new RuntimeException(serviceName + " 服务注册失败", e);
}
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
}
通过注解和反射机制,注册服务信息。
/**
* Rpc 服务消费者启动
*/
@Slf4j
public class RpcConsumerBootstrap implements BeanPostProcessor {
/**
* Bean 初始化后执行,注入服务
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
// 遍历对象的所有属性
Field[] declaredFields = beanClass.getDeclaredFields();
for (Field field : declaredFields) {
RpcReference rpcReference = field.getAnnotation(RpcReference.class);
if (rpcReference != null) {
// 为属性生成代理对象
Class<?> interfaceClass = rpcReference.interfaceClass();
if (interfaceClass == void.class) {
interfaceClass = field.getType();
}
field.setAccessible(true);
Object proxyObject = ServiceProxyFactory.getProxy(interfaceClass);
try {
field.set(bean, proxyObject);
field.setAccessible(false);
} catch (IllegalAccessException e) {
throw new RuntimeException("为字段注入代理对象失败", e);
}
}
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
}
注入代理对象。
最后,给EnableRpc添加@Import
注解,注册自定义启动类,实现灵活可选加载。
/**
* 启用 Rpc 注解
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcInitBootstrap.class, RpcProviderBootstrap.class, RpcConsumerBootstrap.class})
public @interface EnableRpc {
/**
* 需要启动 server
*
* @return
*/
boolean needServer() default true;
}