源码解析(基于版本0.4.8)
1.任务执行启动阶段
1.1 定时执行任务
QuerySyncTaskTimer实现了CommandLineRunner接口,在项目启动完后,会启动定时任务,每3秒从数据库查询任务列表,并执行相应的同步或删除逻辑。
1.2 CommandLineRunner(延伸)
1.用处:在使用SpringBoot构建项目时,我们通常有一些预先数据的加载。那么SpringBoot提供了一个简单的方式来实现–CommandLineRunner。CommandLineRunner是一个接口,我们需要时,只需实现该接口就行。如果存在多个加载的数据,我们也可以使用@Order注解来排序。
2.案例:分别定义了一个数据加载类MyStartupRunner1,排序为2;以及另一个数据加载类MyStartupRunner2,排序为1。
@Component
@Order(value = 2)
public class MyStartupRunner1 implements CommandLineRunner{
@Override
public void run(String... strings) throws Exception {
System.out.println(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 MyStartupRunner1 order 2 <<<<<<<<<<<<<");
}
}
@Component
@Order(value = 1)
public class MyStartupRunner2 implements CommandLineRunner {
@Override
public void run(String... strings) throws Exception {
System.out.println(">>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 MyStartupRunner2 order 1 <<<<<<<<<<<<<");
}
}
执行结果如下:
>>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 MyStartupRunner2 order 1 <<<<<<<<<<<<<
>>>>>>>>>>>>>>>服务启动执行,执行加载数据等操作 MyStartupRunner1 order 2 <<<<<<<<<<<<<
1.3任务执行逻辑
2.同步任务执行过程
2.1 发布订阅机制EventBus
在从数据库查询到一个同步任务后,会将同步任务进行发布。此处发布订阅使用的是Guava在guava-libraries中为我们提供的事件总线EventBus库。
EventBus是什么: EventBus是 基于 订阅/发布 模式实现的 基于事件的异步分发处理系统。 好处就是能够解耦 订阅者 和 发布者,简化代码。
EventBus注意点:Guava发布的事件默认不会处理线程安全的,但我们可以标注@AllowConcurrentEvents来保证其线程安全
EventBus 项目中的使用:
订阅:首先EventBus为我们提供了register方法来订阅事件,Guava在这里的实现很友好,我们不需要实现任何的额外接口或者base类,只需要在订阅方法上标注上@Subscribe和保证只有一个输入参数的方法就可以搞定。
发布:对于事件源,则可以通过post方法发布事件。 这里Guava对于事件的发布,是依据上例中订阅方法的方法参数类型决定的,换而言之就是post传入的类型和其基类类型可以收到此事件。
2.2 同步流程
在上述EventBus发出一条同步事件后,EventListener会订阅此事件并消费,随后调用SyncManagerService的sync方法(最终调用的是EurekaSyncToNacosServiceImpl类的sync方法)进行同步任务的处理:
- 详细同步流程:
相关组件介绍:
NamingService: NamingService是nacos提供用来实现服务注册、服务订阅、服务发现等功能的api,由NacosNamingService唯一实现,通过这个api就可以跟nacos服务端实现通信。
同步流程:
同步核心代码:
同步有效实例时,构建的实例信息:
3.同步任务执行完后
3.1 同步任务存于缓存
1.将同步信息,存放在specialSyncEventRegistry中,specialSyncEventRegistry是个ConcurrentHashMap,里面存放的信息,key是同步任务id,值是 同步任务信息以及执行同步的方法信息。方便另外一个线程,执行后续循环同步逻辑。
1.上述EurekaSyncToNacosServiceImpl中的同步方法sync执行完毕后,将继续执行2.1中的订阅部分代码。
2.由于返回的值为true,将调用skyWalkerCacheServices.addFinishedTask方法,此方法将往finishedTaskMap中存放key为操作id,值为FinishedTask的键值对信息。主要用于1.3中,线程循环扫描数据库表中的任务时,用作判断是否需要发布同步事件的依据。如果是首次已经执行过同步逻辑,则缓存中有对应数据,后续将不再执行同步事件的发出。
3.2 循环执行同步任务
1.SpecialSyncEventTimer与上述提到的QuerySyncTaskTimer一样,实现了CommandLineRunner接口,在项目启动完后,会启动定时任务。在3.1中分析过,将任务信息,存放在了specialSyncEventRegistry中。定时任务每隔3秒执行一次,会将specialSyncEventRegistry中的任务取出,过滤出任务的状态为同步的任务,并依次遍历,使用eventBus执行发布同步事件逻辑。
2.SpecialSyncEventListener中,订阅上述eventBus发布的同步事件,进行事件消费,并调用EurekaSyncToNacosServiceImpl中的sync方法。具体逻辑参见2.2。按照定时任务设定的频率,循环执行。
4.删除任务执行过程
4.1 删除任务事件发布
与上述2.1对同步任务的处理是类似的,使用eventBus通过post方法发布删除任务事件。
4.2 删除流程
1.在上述EventBus发出一条删除事件后,EventListener会订阅此事件并消费,随后调用SyncManagerService的delete方法(最终调用的是EurekaSyncToNacosServiceImpl类的delete方法)进行删除任务的处理:
2. 详细删除流程:
从specialSyncEventRegistry 这个map中移除对应的任务,则在3.2中循环执行同步任务时,不再循环同步此任务。
根据配置的同步任务的服务名,在eureka中找到此服务下的所有实例。
根据上述eureka中的实例信息,删除nacos中的对应实例。buildSyncInstance是构建实例的方法,和上述2.2中调用的方法是一样的,在此不再赘述。
5.删除任务执行完后
5.1 删除任务存于缓存
1.上述EurekaSyncToNacosServiceImpl中的删除方法delete执行完毕后,将继续执行4.2中的订阅部分代码。由于返回的值为true,将调用skyWalkerCacheServices.addFinishedTask方法,此方法将往finishedTaskMap中存放key为操作id,值为FinishedTask的键值对信息。作用和同步任务类似,在3.1中已经描述过,不再赘述。
此处结尾,再献上公司附近的绝美秋景图~