Flowable 高级功能
重点说明: 文章篇幅较长,请耐心观看。重点在文末,尤其是关于流程的本土化功能。
一、自定义扩展实现
1、自定义ProcessEngineConfiguration
扩展
详见类:
cn.blnp.net.flowable.boot.config.flowable.FlowableEngineConfiguration
2、自定义流程元素属性
单元测试用例详见:
cn.blnp.net.flowable.boot.custom.element.CusFlowElementTest#test
3、自定义流程活动行为
单元测试用例详见:
cn.blnp.net.flowable.boot.custom.activity.CusFlowActivityTest#test1
4、自定义流程事件(催办)
测试案例详见:cn.blnp.net.flowable.boot.custom.event.CusFlowEventTest#test
完整的催办业务案例实现,可参考以下文章:
- doc/1、流程催办案例实现.md
5、自定义流程校验规则
单元测试案例:cn.blnp.net.flowable.boot.custom.verify.CusValidatorTest#test
二、多租户与多数据源管理实现
Flowable在企业中最常见的用法是一个Flowable引擎对应一个数据库,为了实现系统/业务的数据隔离,还会使用Flowable提供的多租户功能(不同租户通过数据库表中的TenantId进行区别)。随着数据量的不断增加,这种多租户单数据源的模式可能会导致数据库负载过重,出现存储容量不足、读写性能下降等问题。为了解决这些问题,可以考虑多租户多数据源的模式,它允许在一个工作流引擎实例中处理多个租户的数据,并将不同租户的数据存储在不同的数据源中,以提高系统的容量和性能,并进一步提高数据的隔离性和安全性。
1、Flowable 提供的支持
对于 Flowable 的多数据库多租户工作流引擎配置类MultiSchemaMultiTenantProcessEngineConfiguration
,它提供了一些特定的方法和属性,用于配置工作流引擎以支持多租户和多数据源模式的数据存储。当工作流引擎需要连接到多个数据源进行操作时,此类通过自动路由机制自动选择需要操作的数据源,数据库的操作对客户端来说是透明的。
org.flowable.engine.impl.cfg.multitenant.MultiSchemaMultiTenantProcessEngineConfiguration#MultiSchemaMultiTenantProcessEngineConfiguration
1.1、TenantInfoHolder 接口
TenantInfoHolder是一个接口,主要的作用是持有Flowable的多租户信息。TenantInfoHolder提供了一种方便的方式来获取和设置当前租户的信息,以便在多租户环境中进行数据隔离和管理。通过TenantInfoHolder,用户可以轻松地在不同的租户之间切换,并且确保在处理数据时始终使用正确的租户信息。
package org.flowable.common.engine.impl.cfg.multitenant;
import java.util.Collection;
public interface TenantInfoHolder {
//获取所有租户列表
Collection<String> getAllTenants();
//设置当前租户ID
void setCurrentTenantId(String var1);
//获取当前租户ID
String getCurrentTenantId();
//清空当前租户ID
void clearCurrentTenantId();
}
1.2、TenantAwareDataSource 数据源封装类
Flowable中,多租户数据源的管理对于支持多租户的业务流程和数据访问非常重要。通过TenantAwareDataSource,可以更灵活地管理不同租户的数据源,从而更好地支持多租户的应用场景。TenantAwareDataSource是Flowable提供的一个数据源的封装类,该类实现了DataSource接口,用于支持多租户的数据源管理,负责路由数据库请求到不同的目标数据源,可以实现多数据源的动态切换和路由,根据租户的身份来动态地切换数据源,以确保每个租户只能访问属于己的数据。这种数据源可以帮助开发人员在多租户应用程序中有效地管理数据,并确保数据的安全性和隔离性。TenantAwareDataSource中定义了两个成变量:
public class TenantAwareDataSource implements DataSource {
protected TenantInfoHolder tenantInfoHolder;
protected Map<Object, DataSource> dataSources = new ConcurrentHashMap();
//.....
}
其中,tenantInfoHolder持有多租户信息,dataSources通过ConcurrentHashMap存储租户ID与数据源的映射关系,这个映射关系通过TenantAwareDataSource 类的addDataSource建立:
public void addDataSource(Object key, DataSource dataSource) {
this.dataSources.put(key, dataSource);
}
获取当前的租户ID,进而返回真正的数据源的核心方法是getCurrentDataSource(),其内容如下:
protected DataSource getCurrentDataSource() {
String tenantId = this.tenantInfoHolder.getCurrentTenantId();
DataSource dataSource = (DataSource)this.dataSources.get(tenantId);
if (dataSource == null) {
throw new FlowableException("Could not find a dataSource for tenant " + tenantId);
} else {
return dataSource;
}
}
以上代码的核心逻辑是首先从tenantInfoHolder中通过getCurrentTenantId()方法获取当前租户,然后根据租户从dataSources中查询得到该租户的数据源,从而实现多租户数据访问的需求。因此,当Flowable执行数据库操作之前,需要先获取数据源链接,此时会调用TenantAwareDataSource的getConnection()方法:
public Connection getConnection(String username, String password) throws SQLException {
return this.getCurrentDataSource().getConnection(username, password);
}
2、对多租户多数据源模式的实现
2.1、实现TenantInfoHolder接口
实现该接口的目的是,为了保存租户信息的上下文对象,以便在Flowable的各个组件能访问和管理租户信息。
package cn.blnp.net.flowable.boot.customer.flowable.holder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.multitenant.TenantInfoHolder;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* <h3>多租户管理扩展支持</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/9 19:26
*/
@Slf4j
public class MultiTenantInfoHolder implements TenantInfoHolder {
/**
* 线程安全的租户列表
**/
@Setter
@Getter
private CopyOnWriteArrayList<String> tenantList = new CopyOnWriteArrayList<>();
/**
* 用于记录当前持有的租户ID信息
**/
private static ThreadLocal<String> tenantThreadLocal = new ThreadLocal<>();
@Override
public Collection<String> getAllTenants() {
return tenantList;
}
@Override
public void setCurrentTenantId(String tenantId) {
tenantThreadLocal.set(tenantId);
}
@Override
public String getCurrentTenantId() {
return tenantThreadLocal.get();
}
@Override
public void clearCurrentTenantId() {
tenantThreadLocal.remove();
}
}
与之相对应的核心配置类如下所示:
package cn.blnp.net.flowable.boot.config;
import cn.blnp.net.flowable.boot.bean.entity.HcTenant;
import cn.blnp.net.flowable.boot.config.flowable.MultiTenantDataSourceProcessConfig;
import cn.blnp.net.flowable.boot.customer.flowable.holder.MultiTenantInfoHolder;
import cn.blnp.net.flowable.boot.service.ITenantService;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.druid.pool.DruidDataSource;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* <h3>自定义项目相关配置</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/9 19:35
*/
@Slf4j
@Configuration
@AllArgsConstructor
public class CusItemConfiguration {
private final ITenantService tenantService;
private final DataSource dataSource;
@Bean
public MultiTenantInfoHolder multiTenantInfoHolder() {
MultiTenantInfoHolder tenantInfoHolder = new MultiTenantInfoHolder();
//查询平台内的所有租户信息
List<HcTenant> tenantListInfo = tenantService.lambdaQuery()
.list();
CopyOnWriteArrayList<String> tenantList = new CopyOnWriteArrayList<>();
for (HcTenant hcTenant : tenantListInfo) {
if (StringUtils.isBlank(hcTenant.getDataSource())) {
continue;
}
tenantList.add(hcTenant.getId());
}
tenantInfoHolder.setTenantList(tenantList);
return tenantInfoHolder;
}
@Bean
@DependsOn("multiTenantInfoHolder")
public MultiTenantDataSourceProcessConfig multiTenantDataSourceProcessConfig(MultiTenantInfoHolder multiTenantInfoHolder) {
MultiTenantDataSourceProcessConfig tenantDataSource = new MultiTenantDataSourceProcessConfig(multiTenantInfoHolder);
tenantDataSource.setDatabaseType("mysql");
//平台租户列表信息
List<HcTenant> tenantListInfo = tenantService.lambdaQuery()
.list();
//租户与对应数据源关系
Map<String, DataSource> tenantRelationMap = new ConcurrentHashMap<>();
for (HcTenant tenant : tenantListInfo) {
if (StringUtils.isNotBlank(tenant.getDataSource())) {
JSONObject dataSourceConfig = JSONUtil.parseObj(tenant.getDataSource());
//对应租户的数据源配置信息
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(dataSourceConfig.getStr("url"));
dataSource.setUsername(dataSourceConfig.getStr("username"));
dataSource.setPassword(dataSourceConfig.getStr("password"));
dataSource.setDriverClassName(dataSourceConfig.getStr("driverClassName"));
dataSource.setInitialSize(dataSourceConfig.getInt("initialSize",5));
tenantRelationMap.put(tenant.getId(), dataSource);
}
}
//数据库的执行策略
tenantDataSource.setDatabaseSchemaUpdate("true");
tenantDataSource.setDisableEventRegistry(true);
tenantDataSource.setTenantDataSourceRelationMap(tenantRelationMap);
return tenantDataSource;
}
}
2.2、自定义MultiSchemaMultiTenantProcessEngineConfiguration配置类
package cn.blnp.net.flowable.boot.config.flowable;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.impl.cfg.multitenant.TenantInfoHolder;
import org.flowable.engine.ProcessEngine;
import org.flowable.engine.impl.cfg.multitenant.MultiSchemaMultiTenantProcessEngineConfiguration;
import javax.sql.DataSource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <h3>多数据源与多租户的关系管理配置</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/9 19:39
*/
@Getter
@Slf4j
public class MultiTenantDataSourceProcessConfig extends MultiSchemaMultiTenantProcessEngineConfiguration {
@Setter
private Map<String, DataSource> tenantDataSourceRelationMap = new ConcurrentHashMap<>();
public MultiTenantDataSourceProcessConfig(TenantInfoHolder tenantInfoHolder) {
super(tenantInfoHolder);
}
@Override
public ProcessEngine buildProcessEngine() {
log.warn("===开始构建多数据源与多租户关系===");
//将租户与对应数据源进行关系绑定
for (String tenantId : tenantInfoHolder.getAllTenants()) {
DataSource source = tenantDataSourceRelationMap.get(tenantId);
if (null == source) {
continue;
}
super.registerTenant(tenantId, source);
}
return super.buildProcessEngine();
}
}
2.3、自定义租户注解&AOP切面
自定义租户注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TenantAnnotation {
/**
* 声明租户ID值
**/
String tenantId() default "default";
}
AOP切面处理:
package cn.blnp.net.flowable.boot.ext.aspect;
import cn.blnp.net.flowable.boot.annotations.TenantAnnotation;
import cn.blnp.net.flowable.boot.customer.flowable.holder.MultiTenantInfoHolder;
import cn.blnp.net.flowable.boot.utils.SpELKeyGeneratorUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* <h3>自定义租户ID切面实现</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/9 19:49
*/
@Slf4j
@Aspect
@Component
@AllArgsConstructor
public class TenantAspect {
private final MultiTenantInfoHolder multiTenantInfoHolder;
@Pointcut("@annotation(cn.blnp.net.flowable.boot.annotations.TenantAnnotation)")
public void tenantPoint() {
}
@Around("tenantPoint()")
public Object tenantPointCut(ProceedingJoinPoint point) throws Throwable{
//获取方法签名
MethodSignature signature = (MethodSignature) point.getSignature();
//获取切入方法的对象
Method method = signature.getMethod();
//获取方法上的AOP注解
TenantAnnotation tenantAnnotation = method.getAnnotation(TenantAnnotation.class);
//获取注解上的租户ID值
String tenantIdKey = tenantAnnotation.tenantId();
String tenantId = SpELKeyGeneratorUtil.generateKeyBySpEL(tenantIdKey, point.getArgs(), signature.getParameterNames());
log.warn("租户已切换,当前租户ID为:{}", tenantId);
//切换租户
multiTenantInfoHolder.setCurrentTenantId(tenantId);
try {
return point.proceed();
} finally {
//清空租户
multiTenantInfoHolder.clearCurrentTenantId();
log.warn("租户已重置,当前租户ID为:{}", multiTenantInfoHolder.getCurrentTenantId());
}
}
}
2.4、案例演示
单元测试案例详见:cn.blnp.net.flowable.boot.tenant.MultiTenantTest#test
三、自定义身份管理引擎
1、自定义身份认证实体与数据管理服务
1.1、自定义用户数据管理服务
详见:cn.blnp.net.flowable.boot.customer.flowable.idm.CustomUserDataManager
1.2、自定义身份认证实体管理服务
详见:cn.blnp.net.flowable.boot.customer.flowable.idm.CustomUserEntityManager
2、自定义身份管理引擎与配置器
2.1、自定义身份管理引擎配置
IdmEngineConfiguration 是用于配置和初始化IdmEngine的类,提供一系列方法来配置IdmEngine的各种属性,比如数据库连接、密码策略、实体管理器和数据管理器等。另外,还可以通过调用它的buildIdmEngine()方法来构建和初始化IdmEngine实例。
自定义身份管理引擎配置,这里选择继承org.flowable.idm.engine.IdmEngineConfiguration,重写它的buildIdmEngine()和init()方法,内容如下:
public class CustomIdmEngineConfiguration extends IdmEngineConfiguration {
@Override
public IdmEngine buildIdmEngine() {
this.init();
return new IdmEngineImpl(this);
}
@Override
protected void init() {
//初始化引擎配置
super.initEngineConfigurations();
//初始化命令上下文工厂
super.initCommandContextFactory();
//初始化命令执行器
super.initCommandExecutors();
//初始化身份管理相关服务
super.initServices();
}
}
2.2、自定义身份管理引擎配置器
IdmEngineConfigurator是用于创建和管理IdmEngineConfiguration对象的工具类,提供创建及修改IdmEngineConfiguration对象的方法。IdmEngineConfigurator实现了ProcessEngineConfigurator接口,会在Flowable工作流引擎启动时被加载,并通过调用IdmEngineConfiguration的buildIdmEngine()方法来构建和初始化IdmEngine实例。
自定义身份管理引擎配置,这里选择继承org.flowable.idm.engine.IdmEngineConfiguration,重写它的buildIdmEngine()和init()方法,内容如下:
public class CustomIdmEngineConfigurator extends IdmEngineConfigurator {
@Override
public void configure(AbstractEngineConfiguration engineConfiguration) {
//初始化引擎配置
initEngineConfigurations(engineConfiguration, idmEngineConfiguration);
//启动身份管理引擎
idmEngineConfiguration.buildIdmEngine();
//初始化服务配置
initServiceConfigurations(engineConfiguration, idmEngineConfiguration);
}
}
四、自定义流程定义缓存容器
不建议实现该功能,原因是实现过程比较复杂。实现过程中产生的问题也比较多,主要有以下几个:
- 底层流程定义相关的对象未实现序列化接口,执行对象序列化与反序列化时无法执行
- 在流程定义初始化前,通过 Javassist 动态实现接口对指定底层对象进行修改编译。但是产生的问题:
attempted duplicate class definition
无法解决 - 如在当前工程中,对底层对象进行拷贝复制并手动实现接口则可以解决上述问题;但存在较多问题,一是改动的对象类比较多;二则是对象
BpmnModel.java
类的方法getEventSupport()
源码中就自带了@JsonIgnore
注解。该注解用意是阻止第三方使用者对对象进行序列化操作。
解决方案:
通过对项目进行 kryo 的集成,使用Redis时指定序列化助手为 kryo 的即可。
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.6.0</version>
</dependency>
@Bean("kryoRedisTemplate")
public RedisTemplate<String, Object> kryoRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer); // key
KryoRedisSerializer redisSerializer = new KryoRedisSerializer();
redisTemplate.setValueSerializer(redisSerializer); //value
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
五、自定义任务作业处理器
单元案例详见:cn.blnp.net.flowable.boot.custom.timer.CusWorkHandlerTest#test1
注意事项:
- 记得启用异步执行配置(
setAsyncExecutorActivate(true)
),否则作业任务不会执行
六、自定义业务日历
1、什么是业务日历?
在实际业务中,工作时间的计算往往不是按照自然日来进行的:通常会区分工作日和非工作日,比如非工作日一般是周末和法定节假日;另外还会定义特定的时间段,如每天的上午9点到下午6点为工作时间。这样就需要根据不同的业务需求和工作规则定义各种用于计算工作时间和非工作时间的日历,即业务日历。
2、自定义业务日历的实现
2.1、创建自定义业务日历
package cn.blnp.net.flowable.boot.customer.flowable.handler;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.impl.calendar.BusinessCalendarImpl;
import org.flowable.common.engine.impl.runtime.ClockReader;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import java.util.Date;
/**
* <h3>自定义业务日历</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/13 11:48
*/
@Slf4j
public class CusBusinessCalendar extends BusinessCalendarImpl {
public static final String NAME = "custom";
public CusBusinessCalendar(ClockReader clockReader) {
super(clockReader);
}
@Override
public Date resolveDuedate(String dueDate, int maxIterations) {
try {
if (dueDate.startsWith("P")) {
//获取当前时间
LocalDateTime currentTime = LocalDateTime.now();
LocalDateTime startTime = null;
LocalDateTime endTime = null;
//周六处理
if (currentTime.getDayOfWeek() == 6) {
//声明任务当天处理的时间是早上9点到18点
startTime = getLocalDateTime(currentTime.plusDays(2), 9, 0, 0);
endTime = getLocalDateTime(currentTime.plusDays(2), 18, 0, 0);
}
//周天处理
else if (currentTime.getDayOfWeek() == 7) {
startTime = getLocalDateTime(currentTime.plusDays(1), 9, 0, 0);
endTime = getLocalDateTime(currentTime.plusDays(1), 18, 0, 0);
} else {
//工作日计算
startTime = getLocalDateTime(currentTime, 9, 0, 0);
endTime = getLocalDateTime(currentTime, 18, 0, 0);
}
LocalDateTime effectiveStartTime = null;
if (currentTime.toDate().before(startTime.toDate())) {
effectiveStartTime = startTime;
} else if (currentTime.toDate().after(startTime.toDate())
&& currentTime.toDate().before(endTime.toDate())) {
effectiveStartTime = currentTime;
} else if (currentTime.toDate().after(endTime.toDate())) {
effectiveStartTime = startTime.plusDays(1);
while (effectiveStartTime.getDayOfWeek() == 6
|| effectiveStartTime.getDayOfWeek() == 7) {
effectiveStartTime = effectiveStartTime.plusDays(1);
}
}
Duration totalDuration =
new Duration(effectiveStartTime.toDateTime(),
(getLocalDateTime(effectiveStartTime, 18, 0, 0)).toDateTime());
Duration duedateDuration = Period.parse(dueDate).toStandardDuration();
if (totalDuration.isLongerThan(duedateDuration)) {
return effectiveStartTime.plus(Period.parse(dueDate)).toDate();
} else {
LocalDateTime nextDay = effectiveStartTime;
while(true) {
nextDay = nextDay.plusDays(1);
if (nextDay.getDayOfWeek() == 6 || nextDay.getDayOfWeek() == 7) {
continue;
}
Duration nextDayDuration = new Duration(
getLocalDateTime(nextDay, 9, 0, 0).toDateTime(),
(getLocalDateTime(nextDay, 18, 0,0)).toDateTime());
if (totalDuration.plus(nextDayDuration)
.isShorterThan(duedateDuration)) {
totalDuration = totalDuration.plus(nextDayDuration);
} else {
return getLocalDateTime(nextDay, 9, 0, 0).plus(
duedateDuration.minus(totalDuration)).toDate();
}
}
}
}
return DateTime.parse(dueDate).toDate();
} catch (Exception e) {
throw new FlowableException("couldn't resolve dueDate: " + e.getMessage(), e);
}
}
private LocalDateTime getLocalDateTime(LocalDateTime dateTime, int hourOfDay, int minuteOfHour, int secondOfMinute) {
return new LocalDateTime(dateTime.getYear(), dateTime.getMonthOfYear(),
dateTime.getDayOfMonth(), hourOfDay, minuteOfHour, secondOfMinute);
}
}
2.2、在工作流引擎中配置
/** 自定义业务日历配置--start **/
@Bean
public DefaultClockImpl defaultClock() {
//初始化时钟
return new DefaultClockImpl();
}
@Bean
@DependsOn("defaultClock")
public CusBusinessCalendar cusBusinessCalendar(DefaultClockImpl clock) {
//自定义业务日历时钟解析
return new CusBusinessCalendar(clock);
}
@Bean
@DependsOn("cusBusinessCalendar")
public MapBusinessCalendarManager mapBusinessCalendarManager(CusBusinessCalendar cusBusinessCalendar) {
//初始化业务日历管理器
Map<String, BusinessCalendar> businessCalendars = new HashMap<>();
//将自定义日历解析器注册至工作流引擎中
businessCalendars.put(CusBusinessCalendar.NAME, cusBusinessCalendar);
businessCalendars.put("cycle", cusBusinessCalendar);
return new MapBusinessCalendarManager(businessCalendars);
}
/** 自定义业务日历配置-- End **/
完整的配置详见类:cn/blnp/net/flowable/boot/config/flowable/FlowableEngineConfiguration.java
2.3、使用案例
详见测试用例:cn.blnp.net.flowable.boot.custom.calendar.CusBusinessCalendarTest#test2
七、本土化流程功能的支持
1、动态跳转
1.1、基础应用案例
**单元测试用例详见:**cn.blnp.net.flowable.boot.localize.dynamic.jump.base.DynamicJumpFlowTest#test1
数据库对应流程实例的历史任务(
act_hi_taskinst
)记录:
1.2、与网关结合案例
**单元测试用例详见:**cn.blnp.net.flowable.boot.localize.dynamic.jump.gateway.DynamicJumpGatewayTest#test
历史任务数据库记录:
在这里插入图片描述
由于案例流程相对较为复杂,这里对每一次的跳转做加以说明:
- 第一次跳转是使用
moveSingleActivityIdToActivityIds
接口实现从用户任务UserTask2跳转到并行网关后的用户任务UserTask6、UserTask7,需要注意的,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定了流程实例编号; - 第二次跳转是使用
moveExecutionToActivityId
接口实现从用户任务UserTask3跳转到排他网关ExclusiveGateway1后的UserTask10; - 第三次跳转是使用
moveActivityIdsToSingleActivityId
接口实现从用户任务UserTask4、UserTask10跳转到UserTask12,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定了流程实例编号 - 第四次跳转是使用
moveSingleExecutionToActivityIds
接口实现从用户任务UserTask12跳转到包容网关InclusiveGateway2前的UserTask3、UserTask4; - 第五次跳转是使用
moveExecutionsToSingleActivityId
接口实现从用户任务UserTask3、UserTask4跳转到包容网关InclusiveGateway2,经网关汇聚后流转到用户任务UserTask12; - 第六次跳转是使用
moveActivityIdTo
接口实现从用户任务UserTask13跳转到用户任务UserTask1,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定了流程实例编号
1.3、与子流程结合案例
**单元测试用例详情:**cn.blnp.net.flowable.boot.localize.dynamic.jump.subflow.DynamicJumpSubFlowTest#test
- 第一次跳转是使用
moveSingleActivityIdToActivityIds
接口实现在事件子流程EventSubProcess1内部从StartEvent4跳转到UserTask8、UserTask9,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定了流程实例编号; - 第二次跳转是使用
moveExecutionsToSingleActivityId
接口实现在事件子流程EventSubProcess1内部从UserTask8、UserTask9跳转到UserTask10; - 第三次跳转是使用
moveSingleExecutionToActivityIds
接口实现从事件子流程EventSubProcess1的UserTask10跳转到子流程SubProcess1的UserTask3、SubProcess2的UserTask6; - 第四次跳转是使用
moveActivityIdsToSingleActivityId
接口实现从子流程SubProcess1的UserTask3、SubProcess2的UserTask6跳转到主流程的ParallelGateway2,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定了流程实例编号; - 第五次跳转是使用
moveActivityIdTo
接口实现从主流程的UserTask1跳转到主流程的UserTask2,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定了流程实例编号
1.4、与调用活动结合案例
**单元测试用例详见:**cn.blnp.net.flowable.boot.localize.dynamic.jump.activity.DynamicJumpCallFlowTest#test
历史任务数据表如下所示:
- 第一次跳转是使用
moveActivityIdToSubProcessInstanceActivityId
接口实现从主流程的UserTask1跳转到子流程的UserTask5: - 第二次跳转是使用
moveActivityIdToParentActivityId
接口实现从子流程的UserTask5跳转到主流程的UserTask4,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定的是子流程的流程实例编号; - 第三次跳转是使用
moveSingleActivityIdToSubProcessInstanceActivityIds
接口实现从主流程的UserTask4跳转到子流程的UserTask7、UserTask8 - 第四次跳转是使用
moveActivityIdsToParentActivityId
接口实现从子流程的UserTask7、UserTask8跳转到主流程的UserTask1,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定的是子流程的流程实例编号; - 第五次跳转是使用
moveSingleActivityIdToActivityIds
接口实现从主流程的UserTask1跳转到主流程的UserTask2、UserTask3; - 第六次跳转是使用
moveActivityIdsToSubProcessInstanceActivityId
接口实现从主流程的UserTask2、UserTask3跳转到子流程的UserTask10; - 第七次跳转是使用
moveSingleActivityIdToParentActivityIds
接口实现从子流程的UserTask10流转到主流程的UserTask2、UserTask3,需要注意的是,这次跳转中调用ChangeActivityStateBuilder
的processInstanceId()
方法指定的是子流程的流程实例编号。
重点提示:
项目中若没有启用工作流引擎的实体链接服务配置,在执行该案例时将会抛出以下异常
2025-06-18 17:28:35.002 ERROR 30456 --- [ main] o.f.c.e.impl.interceptor.CommandContext : Error while closing command context
java.lang.NullPointerException: null
at org.flowable.engine.impl.util.EntityLinkUtil.createEntityLinks(EntityLinkUtil.java:41) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.dynamic.AbstractDynamicStateManager.createCallActivityInstance(AbstractDynamicStateManager.java:843) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.dynamic.AbstractDynamicStateManager.doMoveExecutionState(AbstractDynamicStateManager.java:403) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.dynamic.DefaultDynamicStateManager.moveExecutionState(DefaultDynamicStateManager.java:47) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.cmd.ChangeActivityStateCmd.execute(ChangeActivityStateCmd.java:44) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.cmd.ChangeActivityStateCmd.execute(ChangeActivityStateCmd.java:26) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.interceptor.CommandInvoker$1.run(CommandInvoker.java:67) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.interceptor.CommandInvoker.executeOperation(CommandInvoker.java:140) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.interceptor.CommandInvoker.executeOperations(CommandInvoker.java:114) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.interceptor.CommandInvoker.execute(CommandInvoker.java:72) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.interceptor.BpmnOverrideContextInterceptor.execute(BpmnOverrideContextInterceptor.java:26) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.common.engine.impl.interceptor.TransactionContextInterceptor.execute(TransactionContextInterceptor.java:53) ~[flowable-engine-common-6.8.0.jar:6.8.0]
at org.flowable.common.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:105) ~[flowable-engine-common-6.8.0.jar:6.8.0]
at org.flowable.common.spring.SpringTransactionInterceptor.lambda$execute$0(SpringTransactionInterceptor.java:57) [flowable-spring-common-6.8.0.jar:6.8.0]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.23.jar:5.3.23]
at org.flowable.common.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:57) [flowable-spring-common-6.8.0.jar:6.8.0]
at org.flowable.common.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30) ~[flowable-engine-common-6.8.0.jar:6.8.0]
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:56) ~[flowable-engine-common-6.8.0.jar:6.8.0]
at org.flowable.common.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:51) ~[flowable-engine-common-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.RuntimeServiceImpl.changeActivityState(RuntimeServiceImpl.java:809) ~[flowable-engine-6.8.0.jar:6.8.0]
at org.flowable.engine.impl.runtime.ChangeActivityStateBuilderImpl.changeState(ChangeActivityStateBuilderImpl.java:226) ~[flowable-engine-6.8.0.jar:6.8.0]
解决办法:
启用上述提到的配置即可。配置根据方式的不同来区分:
- XML 配置
<property name="enableEntityLinks" value="true"/>
SpringBoot
配置
ProcessEngineConfigurationImpl config = new StandaloneProcessEngineConfiguration()
.setEnableEntityLinks(true);
ProcessEngine engine = config.buildProcessEngine();
2、任务撤回
任务撤回功能是指发起人发起流程或办理人办理任务后,如果发现错误或者需要重新办理,可以在后续用户任务的办理人(或候选人)办理该任务前将任务撤回,以便进行修正或再次办理。
任务撤回是一个很常见的场景,如申请人发起流程后发现提交材料内容有误,但是此时流程已经流转到下一个审批用户任务,这种情况下如果让申请人沟通下一节点的办理人驳回流程重新编辑,会增加很多工作量,并且大大延长流程的审批时间,而任务撤回非常适用于这种场景。任务撤回功能能够提高工作效率,避免出错的流程继续执行,确保流程的正确性。
2.1、扩展实现
2.1.1、任务撤回命令
package cn.blnp.net.flowable.boot.ext.cmd;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.*;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.common.engine.api.FlowableObjectNotFoundException;
import org.flowable.common.engine.impl.interceptor.Command;
import org.flowable.common.engine.impl.interceptor.CommandContext;
import org.flowable.engine.HistoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.impl.persistence.entity.HistoricActivityInstanceEntityManager;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.flowable.engine.impl.util.ProcessDefinitionUtil;
import org.flowable.engine.runtime.Execution;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.history.HistoricTaskInstance;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* <h3>任务撤回扩展命令实现</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/18 19:10
*/
@Slf4j
@AllArgsConstructor
public class TaskRecallCmd implements Command<Void> {
/**
* 任务ID
**/
protected final String taskId;
@Override
public Void execute(CommandContext commandContext) {
//taskId参数不能为空
if (this.taskId == null) {
throw new FlowableIllegalArgumentException("Task id is required");
}
ProcessEngineConfigurationImpl procEngineConf = CommandContextUtil.getProcessEngineConfiguration(commandContext);
RuntimeService runtimeService = procEngineConf.getRuntimeService();
//获取历史服务
HistoryService historyService = procEngineConf.getHistoryService();
//根据taskId查询历史任务
HistoricTaskInstance task = historyService.createHistoricTaskInstanceQuery()
.taskId(this.taskId)
.singleResult();
//对任务做一系列的检测
basicCheck(runtimeService, task);
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(task.getProcessDefinitionId());
FlowElement flowElement = bpmnModel.getFlowElement(task.getTaskDefinitionKey());
List<String> nextElementIdList = new ArrayList();
List<UserTask> nextUserTaskList = new ArrayList();
//先获取后续节点信息
getNextElementInfo(bpmnModel, flowElement, nextElementIdList, nextUserTaskList);
//再校验后续节点任务是否已经办理完成
existNextFinishedTaskCheck(historyService, task, nextUserTaskList);
//清理节点历史
deleteHistoricActivityInstance(procEngineConf, historyService, task);
//执行跳转
List<String> recallElementIdList = getRecallElementIdList(runtimeService, task, nextElementIdList);
runtimeService.createChangeActivityStateBuilder()
.processInstanceId(task.getProcessInstanceId())
.moveActivityIdsToSingleActivityId(recallElementIdList, task.getTaskDefinitionKey())
.changeState();
return null;
}
/**
* <p><b>用途:任务校验<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 19:50 2025/6/18
* @params [runtimeService, task]
* @param runtimeService
* @param task
* @return void
**/
private void basicCheck(RuntimeService runtimeService, HistoricTaskInstance task) {
if (task == null) {
String msg = "任务不存在";
throw new FlowableObjectNotFoundException(msg);
}
if (task.getEndTime() == null) {
String msg = "任务正在执行,不需要回退";
throw new FlowableException(msg);
}
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(task.getProcessInstanceId()).singleResult();
if (processInstance == null) {
String msg = "该流程已经结束,无法进行任务回退。";
throw new FlowableException(msg);
}
}
/**
* <p><b>用途:获取后续节点信息<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 19:50 2025/6/18
* @params [bpmnModel, currentNode, nextNodeIdList, nextUserTaskList]
* @param bpmnModel 流程模型
* @param currentNode 当前节点
* @param nextNodeIdList 后续节点ID列表
* @param nextUserTaskList 后续用户任务节点列表
* @return void
**/
private void getNextElementInfo(BpmnModel bpmnModel, FlowElement currentNode,
List<String> nextNodeIdList,
List<UserTask> nextUserTaskList) {
//查询当前节点所有流出顺序流
List<SequenceFlow> outgoingFlows = ((FlowNode) currentNode).getOutgoingFlows();
for (SequenceFlow flow : outgoingFlows) {
//后续节点
FlowElement targetNode = bpmnModel.getFlowElement(flow.getTargetRef());
nextNodeIdList.add(targetNode.getId());
if (targetNode instanceof UserTask) {
nextUserTaskList.add((UserTask) targetNode);
} else if (targetNode instanceof Gateway) {
Gateway gateway = ((Gateway) targetNode);
//网关节点执行递归操作
getNextElementInfo(bpmnModel, gateway, nextNodeIdList, nextUserTaskList);
} else {
//其他类型节点拓展实现
}
}
}
/**
* <p><b>用途:校验后续节点任务是否已办理完成<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 19:55 2025/6/18
* @params [historyService, currentTaskInstance, nextUserTaskList]
* @param historyService 历史服务
* @param currentTaskInstance 当前任务实例
* @param nextUserTaskList 后续用户
* @return void
**/
private void existNextFinishedTaskCheck(HistoryService historyService,
HistoricTaskInstance currentTaskInstance,
List<UserTask> nextUserTaskList) {
List<HistoricTaskInstance> hisTaskList = historyService
.createHistoricTaskInstanceQuery()
.processInstanceId(currentTaskInstance.getProcessInstanceId())
.taskCompletedAfter(currentTaskInstance.getEndTime())
.list();
List<String> nextUserTaskIdList = nextUserTaskList.stream().map(UserTask::getId)
.collect(Collectors.toList());
if (!hisTaskList.isEmpty()) {
hisTaskList.forEach(obj -> {
if (nextUserTaskIdList.contains(obj.getTaskDefinitionKey())) {
String msg = "存在已完成下一节点任务";
throw new FlowableException(msg);
}
});
}
}
/**
* <p><b>用途:获取可撤回的节点列表<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 19:56 2025/6/18
* @params [runtimeService, currentTaskInstance, nextElementIdList]
* @param runtimeService 工作流引擎配置
* @param currentTaskInstance 任务实例
* @param nextElementIdList 后续节点列表
* @return java.util.List<java.lang.String>
**/
private List<String> getRecallElementIdList(RuntimeService runtimeService,
HistoricTaskInstance currentTaskInstance,
List<String> nextElementIdList) {
List<String> recallElementIdList = new ArrayList();
List<Execution> executions = runtimeService.createExecutionQuery()
.processInstanceId(currentTaskInstance.getProcessInstanceId())
.onlyChildExecutions().list();
if (!executions.isEmpty()) {
executions.forEach(obj -> {
if (nextElementIdList.contains(obj.getActivityId())) {
recallElementIdList.add(obj.getActivityId());
}
});
}
return recallElementIdList;
}
/**
* <p><b>用途:清理节点历史<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 19:57 2025/6/18
* @params [procEngineConf, historyService, task]
* @param procEngineConf 工作流引擎配置
* @param historyService 历史服务
* @param task 任务实例
* @return void
**/
private void deleteHistoricActivityInstance(ProcessEngineConfigurationImpl procEngineConf,
HistoryService historyService,
HistoricTaskInstance task) {
//删除要撤回的节点的历史
List<HistoricActivityInstance> allHisActivityList = historyService
.createHistoricActivityInstanceQuery()
.processInstanceId(task.getProcessInstanceId())
.activityId(task.getTaskDefinitionKey()).list();
HistoricActivityInstance hisActivity = allHisActivityList
.stream().filter(obj -> task.getId().equals(obj.getTaskId()))
.findFirst().get();
HistoricActivityInstanceEntityManager hisActivityEntityManager = procEngineConf
.getHistoricActivityInstanceEntityManager();
hisActivityEntityManager.delete(hisActivity.getId());
//删除被撤回的节点的历史
List<HistoricActivityInstance> hisActivityList = historyService
.createHistoricActivityInstanceQuery()
.processInstanceId(task.getProcessInstanceId())
.startedAfter(task.getEndTime())
.orderByHistoricActivityInstanceStartTime()
.asc().list();
List<String> deleteHisActivityIdList = new ArrayList();
if (!CollectionUtils.isEmpty(hisActivityList)) {
hisActivityList.forEach(obj -> {
if (!deleteHisActivityIdList.contains(obj.getActivityId())) {
deleteHisActivityIdList.add(obj.getId());
hisActivityEntityManager.delete(obj.getId());
}
});
}
}
}
2.1.2、任务撤回服务类
@AllArgsConstructor
public class TaskRecallService {
protected ManagementService managementService;
public void executeRecall(String taskId) {
//实例化撤回Command类
TaskRecallCmd taskRecallCmd = new TaskRecallCmd(taskId);
//通过ManagementService管理服务执行撤回Command类
managementService.executeCommand(taskRecallCmd);
}
}
2.2、案例演示
单元测试用例详见:
cn.blnp.net.flowable.boot.localize.task.recall.TaskRecallTest#test
历史任务数据表:
3、流程撤销
3.1、扩展实现
3.1.1、前言
这里以“特殊借款申请流程”为例,申请人发起流程提交“特殊借款申请”用户任务后,流程将流转到“直属上级审批”用户任务和“财务经理审批”用户任务,两个并行分支分别执行。如果流程已经流转到“部门经理审批”用户任务和“财务总监审批”用户任务时,申请人发现提交的申请信息有误,那么申请人可以执行流程撤销操作,使流程重新回到“特殊借款申请”用户任务,由申请人修改信息后再次提交。下面介绍如何扩展Flowable实现流程撤销操作。
流程撤销的应用场景一般有以下几个特点:
1.只能由流程发起人操作;
2.清除流程执行历史
3.流程恢复到流程发起时的状态
3.1.2、自定义撤销 Command 类
因为Flowable的持久化机制是数据集中提交,即所有的insert、update和delete操作在org.flowable.common.engine.impl.db.DbSqlSession
的flush()方法中集中提交,并且顺序是先insert,再update,最后delete。流程撤销操作中涉及删除原有流程实例再创建新的流程实例,因此为避免新产生的数据在数据集提交机制中被删除,为两个操作分别创建Command类。流程删除Command命令类的代码如下:
自定义流程删除命令:
/**
* <h3>自定义流程删除命令实现</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/19 10:27
*/
@Slf4j
@AllArgsConstructor
public class DeleteProcessInstanceCmd implements Command<Map<String, Object>> {
//流程实例编号
protected String processInstanceId;
@Override
public Map<String, Object> execute(CommandContext commandContext) {
ProcessEngineConfigurationImpl procEngineConf = CommandContextUtil.getProcessEngineConfiguration(commandContext);
RuntimeService runtimeService = procEngineConf.getRuntimeService();
//根据processInstanceId查询流程实例
ProcessInstance procInst = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (procInst == null) {
throw new FlowableObjectNotFoundException("编号为 " + processInstanceId + " 的流程实例不存在。", ProcessInstance.class);
}
//不是流程发起者不能撤销流程
String authenticatedUserId = Authentication.getAuthenticatedUserId();
if (!procInst.getStartUserId().equals(authenticatedUserId)) {
throw new FlowableException("非流程发起者不能撤销流程。");
}
//查询流程变量
Map<String, Object> varMap = runtimeService.getVariables(processInstanceId);
//删除流程实例
runtimeService.deleteProcessInstance(processInstanceId, "流程撤销删除流程");
//删除历史流程实例
HistoryService historyService = procEngineConf.getHistoryService();
historyService.deleteHistoricProcessInstance(processInstanceId);
Map<String, Object> procInstMap = new HashMap<>();
procInstMap.put("processInstanceId", processInstanceId);
procInstMap.put("processDefinitionId", procInst.getProcessDefinitionId());
procInstMap.put("processDefinitionKey", procInst.getProcessDefinitionKey());
procInstMap.put("businessKey", procInst.getBusinessKey());
procInstMap.put("tenantId", procInst.getTenantId());
procInstMap.put("variables", varMap);
return procInstMap;
}
}
以上代码实现了org.flowable.common.engine.impl.interceptor.Command
接口类,传入参数为流程实例编号processInstanceId
,并重写了该接口类的execute()方法。其核心逻辑是先根据processInstanceId
查询流程实例,进行一系列校验,包括校验流程实例是否存在、当前操作人是否为流程发起人;然后查询流程变量备用,删除流程实例和历史流程实例,最后将流程实例的相关信息放入一个Map作为结果返回,供后续重建流程时使用。
自定义流程重建命令:
@Slf4j
@AllArgsConstructor
public class ReCreateProcessInstanceCmd implements Command<ProcessInstance> {
protected Map<String, Object> procInstMap;
@Override
public ProcessInstance execute(CommandContext commandContext) {
RuntimeService runtimeService = CommandContextUtil
.getProcessEngineConfiguration(commandContext).getRuntimeService();
//重建流程
ProcessInstanceBuilder processInstanceBuilder = runtimeService
.createProcessInstanceBuilder();
ProcessInstance newProcessInstance = processInstanceBuilder
.processDefinitionId((String)procInstMap.get("processDefinitionId"))
.processDefinitionKey((String)procInstMap.get("processDefinitionKey"))
.predefineProcessInstanceId((String)procInstMap.get("processInstanceId"))
.businessKey((String)procInstMap.get("businessKey"))
.variables((Map)procInstMap.get("variables"))
.tenantId((String)procInstMap.get("tenantId"))
.start();
return newProcessInstance;
}
}
3.1.3、流程撤销服务类
/**
* <h3>自定义流程撤销服务实现</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/19 10:45
*/
@Slf4j
@Service
@AllArgsConstructor
public class RevokeProcessInstanceServiceImpl implements RevokeProcessInstanceService {
protected ManagementService managementService;
@Override
public ProcessInstance executeRevoke(String processInstanceId) {
//实例化删除流程实例Command类
DeleteProcessInstanceCmd deleteProcessInstanceCmd = new DeleteProcessInstanceCmd(processInstanceId);
//通过ManagementService管理服务执行删除流程实例的Command类
Map<String, Object> procInstMap = managementService.executeCommand(deleteProcessInstanceCmd);
//实例化重建流程实例Command类
ReCreateProcessInstanceCmd reCreateProcessInstanceCmd = new ReCreateProcessInstanceCmd(procInstMap);
//通过ManagementService管理服务执行重建流程实例Command类
ProcessInstance procInst = managementService.executeCommand(reCreateProcessInstanceCmd);
return procInst;
}
}
3.2、案例演示
单元测试用例详见:
cn.blnp.net.flowable.boot.localize.flow.recall.FlowRecallTest#test
历史任务数据表:
4、动态创建流程模型
Flowable流程图的核心对象是BpmnModel
对象,它是BPMN 2.0 XML流程定义的Java表现形式,所有流程定义的信息都可以通过BpmnModel
获取。BpmnModel
由流程文档的BPMN 2.0 XML文件转换得到,其中定义的元素含义在BpmnModel
中都有对应的元素属性承载类,如用户任务节点的元素属性承载类为UserTask
、开始事件的元素承载类为StartEvent
等。在进行流程部署时,工作流引擎会对流程文档的BPMN 2.0 XML文件进行解析,将其中的所有元素都解析为对应的承载类,从而组装成一个BmpnModel
对象。BmpnModel
与示例流程元素如图所示:
4.1、工具类实现
具体实现详见类:
cn.blnp.net.flowable.boot.utils.DynamicProcessCreateUtil
4.2、案例演示
单元测试用例详见:
cn.blnp.net.flowable.boot.localize.dynamic.flow.DynamicFlowCreateTest#test
4.3、历史任务数据库表
4.4、动态生成流程设计文件导入官方设计器
5、动态增加临时节点
5.1、基础
为运行时流程实例动态增加临时节点的需求在本土化业务流程场景中是比较常见的。以借款申请流程为例,正常的执行过程是“借款申请”用户任务完成后,依次执行“财务经理审批”用户任务和“总经理审批”用户任务。但是因为某种原因,需要在“借款申请”用户任务和“财务经理审批”用户任务之间临时动态增加一个“部门经理审批”用户任务节点,同时要求该审批节点只对当前流程实例生效,不能影响到同一流程定义下的其他流程实例。
新增临时节点因为涉及BpmnModel
的调整,Flowable的DynamicBpmnService
中提供了如下动态调整API:
void injectUserTaskInProcessInstance(String processInstanceId, DynamicUserTaskBuilder dynamicUserTaskBuilder);
void injectParallelUserTask(String taskId, DynamicUserTaskBuilder dynamicUserTaskBuilder);
void injectEmbeddedSubProcessInProcessInstance(String processInstanceId, DynamicEmbeddedSubProcessBuilder dynamicEmbeddedSubProcessBuilder);
void injectParallelEmbeddedSubProcess(String taskId, DynamicEmbeddedSubProcessBuilder dynamicEmbeddedSubProcessBuilder)
1、injectUserTaskInProcessInstance():
以上4个API中,第一个API的作用是在流程的开始节点和连接到的第一个节点间添加一个并行网关,并行网关的另一条分支连接到一个增加的用户任务上,该用户任务直接连接到一个结束节点。调用该API执行前后的效果如图所示:
2、injectParallelUserTask():
第二个API的作用是将原用户任务的位置换为嵌入式子流程,该子流程内部由开始节点连接到一个并行网关,网关分发至原用户任务和一个新增的用户任务,再汇聚到一个并行网关,然后连接到结束节点。调用该API执行前后的效果如图所示:
3、injectEmbeddedSubProcessInProcessInstance():
第三个API的作用是在流程的开始节点和连接到的第一个节点间加一个并行网关,并行网关的另一条分支连接到一个增加的子流程上,子流程内部嵌入一个外部的流程定义,该子流程直接连接到一个结束节点。调用该API执行前后的效果如图所示:
4、injectParallelEmbeddedSubProcess():
第四个API的作用是将原用户任务的位置换为嵌入式子流程,该子流程的名称默认与原节点相同,子流程内部由开始节点连接到一个并行网关,网关分发至原用户任务和一个新增的子流程,该新增子流程内部嵌入一个外部的流程定义,再汇聚到一个并行网关,然后连接到结束节点。调用该API执行前后的效果如图所示:
5.2、扩展实现
5.2.1、动态增加临时节点Command类
Flowable提供了一个用于在运行时动态修改流程模型的抽象类org.flowable.engine.impl.cmd. AbstractDynamicInjectionCmd
,提供了一些通用的方法,方便开发人员实现自定义的动态修改逻辑。这里选择继承该抽象类开发动态增加临时节点的Command类。
完整代码详见:
cn.blnp.net.flowable.boot.ext.cmd.InjectUserTaskInProcessInstanceCmd
1.通过流程的实例编号获取流程定义包括BpmnModel在内的各种信息
2.根据需求修改BpmnModel,通过Flowable接口增加新节点,移除旧顺序流并创建新顺序流,得到新的BpmnModel;
3.重新生成流程布局
4.部署该BpmnModel得到新的流程定义
5.更新该流程实例各种运行时与历史数据,将旧流程定义编号换为新流程定义编号
5.2.2、动态增加临时节点服务类
/** * <h3>动态增加临时节点接口实现</h3> * * @author <a href="mailto:blnp.yibin@qq.com">lyb</a> * @version 1.0 * @since 2025/6/19 15:58 */ @Slf4j @Service @AllArgsConstructor public class InjectUserTaskInProcessInstanceServiceImpl implements InjectUserTaskInProcessInstanceService { protected ManagementService managementService; @Override public void executeInjectUserTaskInProcessInstance(String currentTaskId, String newTaskKey, String newTaskName, String newTaskAssignee) { //初始化DynamicUserTaskBuilder DynamicUserTaskBuilder dynamicBuilder = new DynamicUserTaskBuilder(); dynamicBuilder.setId(newTaskKey); dynamicBuilder.setName(newTaskName); dynamicBuilder.setAssignee(newTaskAssignee); //初始化Command类 InjectUserTaskInProcessInstanceCmd injectUserTaskInProcessInstanceCmd = new InjectUserTaskInProcessInstanceCmd(currentTaskId, dynamicBuilder); //通过ManagementService管理服务执行动态增加临时节点Command类 managementService.executeCommand(injectUserTaskInProcessInstanceCmd); } }
5.3、案例演示
单元测试用例详见:
cn.blnp.net.flowable.boot.localize.dynamic.temp.DynamicAddTempNodeTest#test
动态新增临时节点流程定义:
6、会签加签、减签
6.1、基础知识
加签、减签的场景一般用在需要多人同时处理一个任务的场景(会签),在Flowable中的RuntimeService默认提供了以下两个操作多实例任务的API接口:
Execution addMultiInstanceExecution(String activityId, String parentExecutionId, Map<String, Object> executionVariables);
void deleteMultiInstanceExecution(String executionId, boolean executionIsCompleted)
其中,addMultiInstanceExecution()
方法用来添加一个多实例执行的子执行实例,借助它可以实现加签。它有3个参数,activityId
是待加签用户任务的key;parentExecutionId
可以是流程实例编号,也可以是当前多实例任务的父执行实例;executionVariables
是在新创建的多实例执行上设置为局部变量的变量。
deleteMultiInstanceExecution()
方法用于删除多实例执行的一个子执行实例,借助它可以实现减签。它有两个参数,executionId
是待减签的子执行实例编号,executionIsCompleted
表示是否将删除的这个实例标记为完成。
完整的助手类,详见:
cn.blnp.net.flowable.boot.utils.MultiInstanceExecutionUtil
这里对工具类的代码做加以说明。addSign()
方法用于实现加签功能,其传入参数中collectionName
是多实例UserTask
的multiInstanceLoopCharacteristics
子元素中flowable:collection
属性配置的集合变量名,elementVariable
是flowable:elementVariable
属性配置的变量名。参数procInstId
是流程实例编号,currentTask
是当前进行加签操作的运行时任务实例,userIds
是需要加签的用户列表。该方法的逻辑是先进行各种必要的参数校验,然后查询并重设多实例办理人集合的变量。接着在查询多实例的根执行实例后,根据根执行实例查询并重设多实例的总实例数。接下来查询多实例的类型,如果是并行多实例,遍历要加签的用户列表,为每个用户查询并重设多实例的激活实例数,最后调用addMultiInstanceExecution()
方法完成创建多实例的执行实例和相关的对象。
deleteSign()
方法用于实现减签功能,其传入参数中collectionName
是多实例UserTask
的multiInstanceLoopCharacteristics
子元素中flowable:collection
属性配置的集合变量名,参数procInstId
是流程实例编号,currentTask
是当前进行减签操作的运行时任务实例,userId
是需要减签的用户。该方法的逻辑是先进行各种必要的参数校验,然后在查询多实例的根执行实例后,根据根执行实例查询并重设多实例的总实例数。接着,查询多实例的类型,如果是串行多实例,则校验被减签人是否在会签后续办理人中;如果是并行多实例,则先校验被减签人是否是会签的运行时任务的办理人,再查询并重设多实例的激活实例数,接着调用deleteMultiInstanceExecution()
方法删除多实例的执行实例和相关的对象。最后,重设多实例办理人集合的变量。
**特别提示:**由于串行多实例在执行过程中,多实例的激活实例数始终为1,多实例任务的执行实例始终不变,在
addSign()
方法进行加签操作时,既没有更新多实例的激活实例数,也没有调用addMultiInstanceExecution()
方法创建新的执行实例。同样的道理,在deleteSign()
方法中,既没有更新多实例的激活实例数,也没有调用deleteMultiInstanceExecution()
方法删除执行实例。
6.2、案例演示
单元测试用例详见:
cn.blnp.net.flowable.boot.localize.dynamic.sign.DynamicAddSignTest#test
历史任务数据表:
流程关联用户表:
7、流程复活
7.1、前言
流程复活是一种比较常见的本土化业务流程场景。在业务流程中,有时候会遇到一些特殊情况或者错误操作导致流程提前结束,这时候需要将已经结束的流程复活到流程中的指定环节,重新激活流程进行处理。在实际应用中,流程复活常常出现在需要重新审批、修改或补充信息的情况下。
例如,当某个流程在某个环节结束后,发现需要重新审批或修改某些关键信息时,可以选择将该流程复活,使其回到之前的环节,以便重新进行处理。流程复活提高了业务流程的灵活性和效率,可以避免因为一些细微的变动而重新创建新的流程实例,从而节省了时间和资源。同时,流程复活也提供了更好的错误纠正和修改机制,使得在流程执行过程中出现的问题能够及时得到解决。
7.2、扩展实现
7.2.1、自定义流程复活Command类
完整实现类详见:
cn.blnp.net.flowable.boot.ext.cmd.RestartProcessInstanceCmd
核心逻辑是先对传入的参数processInstanceId
、activityIds
进行一系列的校验,包括校验是否为空、流程实例是否存在、流程实例是否结束、复活节点是否存在,以及流程定义是否存在等。如果所有的校验均通过,则继续后面的操作。一切就绪后,获取流程启动节点,然后基于它重建运行时流程实例的主执行实例、创建子执行实例,接下来收集运行时流程变量,设置历史流程实例结束节点和结束时间为空,最后执行流程跳转操作将重建的流程跳转到待复活的节点。
7.2.2、流程复活Service类
/**
* <h3>流程复活接口实现</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/20 8:48
*/
@Slf4j
@AllArgsConstructor
public class RestartProcessInstanceServiceImpl implements RestartProcessInstanceService {
protected ManagementService managementService;
@Override
public void executeRestart(String processInstanceId, List<String> activityIds) {
//实例化流程复活Command类
RestartProcessInstanceCmd restartProcessInstance = new RestartProcessInstanceCmd(processInstanceId, activityIds);
//通过ManagementService管理服务执行流程复活Command类
managementService.executeCommand(restartProcessInstance);
}
}
7.3、案例演示
演示案例是一个员工物料领用流程,员工提交申请后首先经并行网关到达“财务经理审批”“直属上级审批”和“物料管理员审批”任务节点,“财务经理审批”和“直属上级审批”用户任务完成后经并行网关汇聚到“部门主管审批”任务节点,部门主管审批和物料管理员审批完成后经并行网关汇聚后到“物料发放”任务节点,办理完成后流程结束。
单元测试用例详见:
cn.blnp.net.flowable.boot.localize.flow.resurgence.ProcessRevivalTest#test
历史任务数据表:
8、任务知会(抄送)
8.1、前言
任务知会通常也称作流程抄送,指的是将流程的某个任务发送给其他人,以便他们了解流程进展或参与流程决策。与候选人和办理人不同的是,知会的任务只能查看、不能办理。通过任务知会功能可以实现信息的共享和协作,提高工作协作效率,减少信息传递的时间和成本。同时,抄送功能还能够增加流程的透明度和可追溯性,确保流程的合规性和质量。
被知会的人与任务之间存在一个关联关系。我们知道,在Flowable中还存在另外一种候选人与任务之间的关联关系,它们存储在ACT_RU_IDENTITYLINK和ACT_HI_IDENTITYLINK表中,分别对应IdentityLinkEntity
和HistoricIdentityLinkEntity
对象,它们的type值为candidate。因此,构建被知会的人与任务之间的关系也可以借鉴这种思路,同样使用IdentityLinkEntity
和HistoricIdentityLinkEntity
对象来表示,而type值设置为carbonCopy
。另外,新构建的知会关系不能影响Flowable的默认逻辑,比如不能干扰查询待办任务、候选任务和已办任务等接口。
8.2、扩展实现
8.2.1、任务知会Command类
完整实现详见:
cn.blnp.net.flowable.boot.ext.cmd.TaskCarbonCopyCmd
简单说下,其核心逻辑是先对传入的参数taskId
、userIds
进行一系列的校验,包括校验是否为空、任务实例是否存在等。如果所有的校验均通过,再根据当前任务是否已完成来分别做不同的操作:对于未结束的任务,通过IdentityLinkService
为每个知会人创建type值为carbonCopy
的IdentityLinkEntity
对象,同时通过HistoryManager
的recordIdentityLinkCreated(IdentityLinkEntity identityLink)
方法根据IdentityLinkEntity
对象创建HistoricIdentityLinkEntity
对象;对于已结束的任务,通过HistoricIdentityLinkService
为每个知会人创建type值为carbonCopy
的HistoricIdentityLinkEntity
对象。
8.2.2、任务知会Service类
/**
* <h3>任务知会(抄送)接口实现</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/20 10:22
*/
@Slf4j
@Service
@AllArgsConstructor
public class TaskCarbonCopyServiceImpl implements TaskCarbonCopyService {
private ManagementService managementService;
private TaskService taskService;
private HistoryService historyService;
@Override
public void executeTaskCarbonCopy(String taskId, List<String> userIds) {
//实例化任务知会Command类
TaskCarbonCopyCmd taskCarbonCopyCmd = new TaskCarbonCopyCmd(taskId, userIds);
//通过ManagementService管理服务执行任务知会Command类
managementService.executeCommand(taskCarbonCopyCmd);
}
/**
* <p><b>用途:查询运行时知会任务列表<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 10:24 2025/6/20
* @params [userId]
* @param userId
* @return java.util.List<org.flowable.task.api.Task>
**/
@Override
public List<Task> getCarbonCopyTasks(String userId) {
List<Task> tasks = taskService.createNativeTaskQuery()
.sql("select t1.* from ACT_RU_TASK t1 join ACT_RU_IDENTITYLINK t2 on "
+ "t2.TASK_ID_=t1.ID_ and t2.TYPE_='carbonCopy' and "
+ "t2.USER_ID_=#{userId}")
.parameter("userId", userId).list();
return tasks;
}
/**
* <p><b>用途:查询历史知会任务列表<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 10:25 2025/6/20
* @params [userId]
* @param userId
* @return java.util.List<org.flowable.task.api.history.HistoricTaskInstance>
**/
@Override
public List<HistoricTaskInstance> getHistoricCarbonCopyTasks(String userId) {
List<HistoricTaskInstance> tasks = historyService
.createNativeHistoricTaskInstanceQuery()
.sql("select * from ACT_HI_TASKINST t1 join ACT_HI_IDENTITYLINK t2 on "
+ "t2.TASK_ID_=t1.ID_ and t2.TYPE_='carbonCopy' and t2.USER_ID_="
+ "#{userId}")
.parameter("userId", userId).list();
return tasks;
}
}
8.3、案例演示
完整用例详见:
cn.blnp.net.flowable.boot.localize.task.notice.FlowTaskNoticeTest#test
历史任务数据表:
9、流程节点自动跳过
9.1、前言
流程节点自动跳过是指在流程运行过程中,能够根据一定的条件或规则自动跳过某些节点而不做任何操作,流程继续向下流转,从而提高流程的执行效率和减少人工操作。
例如,若某个用户任务的办理人正好也是流程发起人,则可以不用办理该任务而直接跳到下一个用户任务;又如,若流程中正好有两个用户任务的办理人是同一人,则可以不办理后一个任务而直接跳到接下来的用户任务。流程节点自动跳过它可以用于简化流程、优化流程执行路径,减少不必要的环节,加快流程执行速度。通过自动跳过,可以实现流程的自动化和智能化,提高工作效率和精确度。如下图所示,对于“部门经理审批”用户任务,如果其办理人为“直属上级审批”用户任务的办理人,它将被自动跳过。
9.2、案例演示
完整案例详见:
cn.blnp.net.flowable.boot.localize.task.skip.AutoSkipProcessTest#test
在以上流程定义中,“部门经理审批”用户任务设置了flowable:skipExpression
属性,用于配置跳过表达式。加粗部分的代码设置flowable:skipExpression
属性为表达式${manager == leader}。表示当该表达式的执行结果为true时,该用户任务就会自动跳过。
历史任务数据表:
特别提醒:
从代码运行结果可知,“直属上级审批”用户任务办理完成后,流程流转到“人力总监审批”用户任务,成功地跳过了“部门经理审批”用户任务。需要注意的是,在代码中配置流程变量时,加入了一个额外的变量_FLOWABLE_SKIP_EXPRESSION_ENABLED
并设置其值为true。必须加入该变量,否则该节点自动跳过功能将不可用。
10、流程实例跨版本迁移
10.1、前言
Flowable支持流程多版本部署,可以方便地管理和使用不同版本的流程定义。在多版本部署的情况下,Flowable会保留所有已经部署的流程定义。当启动一个流程实例时,可以指定要使用的流程定义版本。这样,就可以同时运行不同版本的流程。流程实例跨版本迁移支持将运行中的一个流程实例从所在的流程定义版本迁移到其他版本。该功能通常用于在业务流程发生变化,更新流程发布新版后,让已经运行的流程实例按照新版本的流程定义来流转。
10.2、提供的支持
Flowable中提供了ProcessMigrationService
可用于流程实例的跨版本迁移。该服务提供了一系列API,可以创建迁移计划、验证迁移逻辑、执行迁移计划、批处理迁移实例以及获取批处理执行结果等。通过使用ProcessMigrationService
,可以将一个流程实例从一个流程定义迁移到另一个流程定义。流程实例跨版本迁移可以用于多种场景,比如升级流程定义、修复错误的流程定义、重新设计流程等。
一般可以把这些方法大致分为以下5类:
1)、以createProcessInstanceMigration
开头的2个方法用于创建迁移计划ProcessInstanceMigrationBuilder
,该实例对象中可以设置一系列的迁移逻辑,比如将哪个版本下的流程实例迁移到其他版本的模板中。
2)、以validateMigration
开头的3个方法用于在进行流程迁移操作之前,对迁移操作进行验证,如果验证通过,则进行迁移操作;如果验证不通过,则不能进行迁移操作。比如两个版本间流程差异过大会被判定为无法迁移,这样就可以避免迁移之后流程无法正常运行,确保了迁移操作的有效性和安全性。
3)、以migrateProcessInstance
开头的3个方法用于同步执行迁移计划,调用该API之后,工作流引擎立即开始执行迁移工作。
4)、以batchMigrateProcessInstances
开头的2个方法用于批量迁移流程实例版本,通常在迁移数量较多的流程实例的情况下使用。需要注意的是,调用该API之前需要开启工作流引擎的异步执行器,否则批量迁移不生效
5)、getResultsOfBatchProcessInstanceMigration
用于查询批量迁移流程实例版本的执行结果。
10.3、案例演示
下图是旧版的员工物品领用流程,员工提交申请后,首先经并行网关到达“直属上级审批”和“物料管理员审批”任务节点,“直属上级审批”和“物料管理员审批”用户任务完成后经并行网关汇聚到“物料发放”任务节点,办理完成后流程结束。
现如今业务流程发生变化后的新版流程如图所示,不同之处在于,“直属上级审批”后增加了一个“部门负责人审批”任务节点。
10.3.1、旧版迁移新版
假设旧版流程下的一个流程实例已经流转到了“直属上级审批”和“物料管理员审批”任务节点,这个时候将它迁移到新版。加载两个流程模型并执行相应流程控制:
完整用例代码详见:
cn.blnp.net.flowable.boot.localize.flow.migration.FlowMigrationTest#testOldToNew
以上代码先初始化工作流引擎并部署两个流程,使用旧版流程定义发起流程后完成第一个用户任务,然后先进行迁移校验:通过createProcessInstanceMigrationBuilder()
方法创建ProcessInstanceMigrationBuilder
,它采用链式编程的方式,使用migrateToProcessDefinition(String processDefinitionId)
方法指定要迁移到的流程定义,使用validateMigration(String processInstanceId)
方法指定待迁移的流程实例编号,调用isMigrationValid()
方法返回校验结果。
如果校验不通过,则抛出异常;如果校验通过,则接下来进行迁移操作:通过createProcessInstanceMigrationBuilder()
方法创建ProcessInstanceMigrationBuilder
,它采用链式编程的方式,使用migrateToProcessDefinition(String processDefinitionId)
方法指定要迁移到的流程定义,使用migrate(String processInstanceId)
方法指定待迁移的流程实例编号并执行流程迁移操作。最后查询迁移后的流程实例。在流程迁移前后输出了流程实例所在的流程定义编号。有两个细节需要注意下:
- 进行流程校验时,如果将
validateMigration(String processInstanceId)
方法改为validateMigrationOfProcessInstances(String processDefinitionId)
方法指定一个流程定义编号,表示将校验这个流程定义下所有的流程实例 - 行流程迁移时,如果将
migrate(String processInstanceId)
方法改为migrateProcessInstances(String processDefinitionId)
方法指定一个流程定义编号,工作流引擎将迁移这个流程定义下所有的流程实例。
在跨版本迁移时,Flowable还支持指定节点间的映射关系,比如可以把A1、A2节点对应到B1、B2节点上进行迁移,示例如下:
//指定节点映射
ActivityMigrationMapping.OneToOneMapping mappingFromA1ToB1 = ActivityMigrationMapping.createMappingFor("A1", "B1");
ActivityMigrationMapping.OneToOneMapping mappingFromA1ToB2 = ActivityMigrationMapping.createMappingFor("A2", "B2");
//校验通过后执行流程迁移
processMigrationService.createProcessInstanceMigrationBuilder()
.migrateToProcessDefinition(newProcDef.getId())
.addActivityMigrationMapping(mappingFromA1ToB1)
.addActivityMigrationMapping(mappingFromA1ToB2)
.migrate(procInst.getId());
上述代码指定了A1→B1、A2→B2节点间的映射关系,通过addActivityMigrationMapping()
方法将映射关系加入迁移过程中。以上代码执行的结果是将A1节点迁移到B1节点,A2节点迁移到B2节点。
10.3.2、新版迁移旧版
完整案例详见:
cn.blnp.net.flowable.boot.localize.flow.migration.FlowMigrationTest#testNewToOld
从执行结果可以看出,流程迁移校验不通过。因为流程实例当前所在的“部门负责人审批”任务节点在旧版流程定义中不存在,如果迁移过去,流程将无法运行。
11、动态修改流程定义元素属性
在实际业务场景中,流程调整是常见的需求。随着业务的变化和发展,流程节点的属性需要动态修改,以满足不同的业务需求,比如流程节点名称、用户任务办理人和处理时限等属性。常见的做法是修改流程模型,然后部署发布新的流程定义。流程部署本身是一个比较重的操作,很多情况下,我们希望能够不通过流程部署操作,而采用更轻的操作动态地对流程定义中的元素属性进行修改。
11.1、实现思路
以用户任务为例,在其行为类UserTaskActivityBehavior的execute()方法中,有以下代码:
String activeTaskName = null;
String activeTaskDescription = null;
String activeTaskDueDate = null;
//…
if (processEngineConfiguration.isEnableProcessDefinitionInfoCache()) {
ObjectNode taskElementProperties = BpmnOverrideContext.getBpmnOverrideElementProperties
(userTask.getId(), execution.getProcessDefinitionId());
activeTaskName = DynamicPropertyUtil.getActiveValue(userTask.getName(), DynamicBpmnConstants.
USER_TASK_NAME, taskElementProperties);
activeTaskDescription = DynamicPropertyUtil.getActiveValue(userTask.getDocumentation(),
DynamicBpmnConstants.USER_TASK_DESCRIPTION, taskElementProperties);
activeTaskDueDate = DynamicPropertyUtil.getActiveValue(userTask.getDueDate(),
DynamicBpmnConstants.USER_TASK_DUEDATE, taskElementProperties);
//…
} else {
activeTaskName = userTask.getName();
activeTaskDescription = userTask.getDocumentation();
activeTaskDueDate = userTask.getDueDate();
//…
}
以上代码的逻辑是,当工作流引擎配置的isEnableProcessDefinitionInfoCache()
值为true时,给用户任务属性(名称、描述、过期时间等)赋值的时候就会先从ACT_PROCDEF_INFO表中读取是否有对应的记录(对应ProcessDefinitionInfoCacheObject
对象):如果没有,则直接用XML中配置的属性;如果有,则使用ProcessDefinitionInfoCacheObject
的infoNode
属性(jackson
的ObjectNode
对象)中配置的值。
由此可以看出,通过设置ProcessDefinitionInfoCacheObject
的值,可以实现不通过部署对流程定义中的各个元素(如任务、节点等)进行属性修改,包括名称、描述、候选人、候选组等。在Flowable中,可以借助DynamicBpmnService
提供的API来设置ProcessDefinitionInfoCacheObject
的值,如表所示:
用途 | 方法名 |
---|---|
修改服务任务的className 属性 |
void changeServiceTaskClassName(String id,String className,ObjectNode infoNode) |
修改服务任务的expression 属性 |
void changeServiceTaskExpression(String id,Stringexpression,ObjectNode infoNode) |
修改服务任务的delegateExpression 属性 |
void changeServiceTaskDelegateExpression(String id, String delegateExpression,ObjectNode infoNode) |
修改脚本任务的script属性 | void changeScriptTaskScript(String id,String script,ObjectNode infoNode) |
修改节点的跳过表达式 | void changeSkipExpression(String id,String skipExpression,ObjectNode infoNode) |
修改用户任务的名称 | void changeUserTaskName(String id,String name,ObjectNode infoNode) |
修改用户任务的描述 | void changeUserTaskDescription(String id,String description,ObjectNode infoNode) |
修改用户任务的过期时间 | void changeUserTaskDueDate(String id,String dueDate,ObjectNode infoNode) |
修改用户任务的级别 | void changeUserTaskPriority(String id,String priority,ObjectNode infoNode) |
修改用户任务的分类 | void changeUserTaskCategory(String id,String category,ObjectNode infoNode) |
修改用户任务的表单key | void changeUserTaskFormKey(String id,String formKey,ObjectNode infoNode) |
修改用户任务的办理人 | void changeUserTaskAssignee(String id,String assignee,ObjectNode infoNode) |
修改用户任务的所属人 | void changeUserTaskOwner(String id,String owner,ObjectNode infoNode) |
修改用户任务的单个候选人 | void changeUserTaskCandidateUser(String id,String candidateUser, boolean overwriteOtherChangedEntries,ObjectNode infoNode) |
修改用户任务的单个候选组 | void changeUserTaskCandidateGroup(String id,String candidateGroup,boolean overwriteOtherChangedEntries,ObjectNode infoNode) |
修改用户任务的多个候选人 | void changeUserTaskCandidateUsers(String id,List<String> candidateUsers,ObjectNode infoNode) |
修改用户任务的多个候选组 | void changeUserTaskCandidateGroups(String id,List<String> candidateGroups,ObjectNode infoNode) |
修改多实例的结束条件 | void changeMultiInstanceCompletionCondition(String id,String completionCondition,ObjectNode infoNode) |
修改DMN任务的决策表key | void changeDmnTaskDecisionTableKey(String id,String decisionTableKey,ObjectNode infoNode) |
修改条件顺序流的条件 | void changeSequenceFlowCondition(String id,String condition,ObjectNode infoNode) |
修改调用活动的calledElement 属性 |
void changeCallActivityCalledElement(String id,String calledElement,ObjectNode infoNode) |
通过DynamicBpmnService
提供的API动态修改流程定义元素属性的步骤如下:
1.使用DynamicBpmnService
的getProcessDefinitionInfo(String processDefinitionId)
根据流程定义ID查询ObjectNode
对象(ProcessDefinitionInfoCacheObject
的infoNode
属性值)
2.使用DynamicBpmnService
的“change…”系列方法修改一个或多个属性值
3.使用DynamicBpmnService
的saveProcessDefinitionInfo(String processDefinitionId, ObjectNode infoNode)
保存动态修改的设置。
注意,使用这种方式动态修改流程定义元素属性,需要设置流程定义配置的enableProcessDefinitionInfoCache
属性值为true。
11.2、案例演示
完整案例详见:
cn.blnp.net.flowable.boot.localize.dynamic.modify.DynamicModifyFlowTest#test
特别注意:
我这里使用的版本是Flowable 6.8,有几个问题需要特别注意下:
- 一是,如果执行修改后发起新流程未生效的话,需要检查下流程引擎是否启用了流程定义缓存功能
- 二是,上述的动态修改是基于流程定义缓存来操作的;因此如果缓存被清除或者重启则会失效
12、多语种支持
Flowable支持流程定义、用户任务、子流程的名称和描述的多语种,既可以在流程定义文件中进行多语种的配置,也支持通过API进行多语种的设置。
12.1、在流程定义XML中设置
可以在流程定义的元素中通过extensionElements子元素的flowable:localization子元素来定义多语种,以用户任务配置多语种为例:
<userTask id="userTask1" name="用户任务">
<documentation>用户任务描述信息</documentation>
<extensionElements>
<flowable:localization locale="语种1标识" name="语种1翻译的名称">
<flowable:documentation>语种1翻译的描述</flowable:documentation>
</flowable:localization>
<flowable:localization locale="语种2标识" name="语种2翻译的名称">
<flowable:documentation>语种2翻译的描述</flowable:documentation>
</flowable:localization>
</extensionElements>
</userTask>
在以上配置中,用户任务userTask1配置了名称和描述信息,然后在它的extensionElements
子元素中通过flowable:localization
配置了多语种的名称和描述信息,其中locale表示语种标识,name是用该语种表示的名称,子元素flowable:documentation
表示用该语种表示的描述信息。Flowable支持process
、userTask
和subProcess
通过flowable:localization
子元素来定义多语种。
12.2、通过API设置
在Flowable中,可以借助DynamicBpmnService
提供的API来设置ProcessDefinitionInfoCacheObject
的值,从而实现多语种的设置。
用途 | 方法 |
---|---|
设置多语种的名称 | void changeLocalizationName(String language, String id, String value,ObjectNode infoNode) |
设置多语种的描述 | void changeLocalizationDescription(String language, String id, String value,ObjectNode infoNode) |
通过DynamicBpmnService提供的API设置多语种的步骤如下:
1)、使用DynamicBpmnService
的getProcessDefinitionInfo(String processDefinitionId)
根据流程定义ID查询ObjectNode
对象(ProcessDefinitionInfoCacheObject
的infoNode
属性值)
2)、使用DynamicBpmnService
的changeLocalizationName()
方法设置多语种的名称,使用changeLocalizationDescription()
方法设置多语种的描述
3)、使用DynamicBpmnService
的saveProcessDefinitionInfo(String processDefinitionId, ObjectNode infoNode)
保存多语种的设置
通过以上两种方式设置的多语种信息,可以通过API进行查询。ProcessDefinitionQuery
、ProcessInstanceQuery
、ExecutionQuery
、TaskQuery
、HistoricProcessInstanceQuery
和HistoricTaskInstanceQuery
中均提供locale(String locale)方法根据语种标识查询对应对象,对象的name属性值为该语种设置的名称,description属性值为该语种设置的描述。如果没有该语种标识下设置的值,则使用默认值。
12.3、使用案例
完整案例详见:
cn.blnp.net.flowable.boot.localize.i18n.I18nFlowTest#test
八、架构优化
1、ID 生成器优化
Flowable中实体的ID(即数据库中主键字段ID_)需要由专门的生成器生成。Flowable ID生成器需要配置IdGenerator
接口,该接口只有一个方法,即获取下一个ID。Flowable自带的ID生成器有两个:数据库ID生成器DbIdGenerator
和UUID生成器StrongUuidGenerator
。
public interface IdGenerator {
String getNextId();
}
1.1、数据库ID生成器 DbIdGenerator
通过数据库生成ID的实现方式是在数据库中创建一个表保存一个数字作为当前ID,并在获取当前ID后,在原来的基础上递增生成下一个ID。但是如果这样做,就要在每次生成一个ID时都查询和更新数据库,因此数据库的压力会比较大。此外,因为操作的都是同一张表的同一行数据,所以并发量比较大的情况下,容易产生数据库行锁冲突,效率会比较低。因此,为了提高生成ID的效率,降低数据库压力,通常采用分段生成ID的方式,即每次从数据库中获取的ID不是一个,而是一批,当一批ID用完后再从数据库中获取下一批ID。DbIdGenerator
的源码如下所示:
public class DbIdGenerator implements IdGenerator {
//分段大小
protected int idBlockSize;
//下一个ID的值
protected long nextId;
//分段的最后一个值
protected long lastId = -1;
protected CommandExecutor commandExecutor;
protected CommandConfig commandConfig;
//获取ID
@Override
public synchronized String getNextId() {
//判断当前分段是否还有值
if (lastId < nextId) {
getNewBlock();
}
long _nextId = nextId++;
return Long.toString(_nextId);
}
//获取下一个分段
protected synchronized void getNewBlock() {
IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));
this.nextId = idBlock.getNextId();
this.lastId = idBlock.getLastId();
}
}
在获取ID时,首先判断当前分段是否有剩余,如果有则直接返回ID,否则直接从数据库获取下一个分段。这里用nextId存储该分段下一个ID(每次递增1),用lastId
存储该分段的最后一个ID。Flowable通过GetNextIdBlockCmd
获取下一个分段的ID,其实现源码如下:
详见:
org.flowable.engine.impl.cmd.GetNextIdBlockCmd
public class GetNextIdBlockCmd implements Command<IdBlock> {
private static final long serialVersionUID = 1L;
protected int idBlockSize;
public GetNextIdBlockCmd(int idBlockSize) {
this.idBlockSize = idBlockSize;
}
@Override
public IdBlock execute(CommandContext commandContext) {
PropertyEntity property = (PropertyEntity) CommandContextUtil.getPropertyEntityManager(commandContext).findById("next.dbid");
long oldValue = Long.parseLong(property.getValue());
long newValue = oldValue + idBlockSize;
property.setValue(Long.toString(newValue));
return new IdBlock(oldValue, newValue - 1);
}
}
从execute()方法可知,这里先从ACT_GE_PROPERTY
表中查询name为next.dbid
的记录,并将其value作为下一个ID,然后将value加上分段大小idBlockSize
的值更新到数据库。由于数据库中存储的值为下一个分段的开始值,当前分段值的最后一个值需要用数据库中的值减1。需要注意的是,虽然这里没有主动调用更新数据库操作,但是因为这里是在Command中修改PropertyEntity
实体类的值,所以最后DbSqlSession
会判断缓存中的对象是否有变化。如果有变化,则自动同步到数据库中。这里修改了PropertyEntity
实体对象的value属性,所以对象最终会自动同步到数据库中。
基于数据库的ID生成器的优点是有序且长度较短,对于使用InnoDB
作为存储引擎的数据库来说,采用这种方式插入效率会比较高。其缺点是依赖数据库,并发压力比较大。此外,在分布式环境下,如果要生成全局唯一的ID,需要将ID生成器作为独立服务进行部署和维护,复杂度和运维成本会大幅度增加。
1.2、UUID生成器StrongUuidGenerator
详见:
org.flowable.common.engine.impl.persistence.StrongUuidGenerator
public class StrongUuidGenerator implements IdGenerator {
protected static volatile TimeBasedGenerator timeBasedGenerator;
public StrongUuidGenerator() {
this.ensureGeneratorInitialized();
}
protected void ensureGeneratorInitialized() {
if (timeBasedGenerator == null) {
Class var1 = StrongUuidGenerator.class;
synchronized(StrongUuidGenerator.class) {
if (timeBasedGenerator == null) {
timeBasedGenerator = Generators.timeBasedGenerator(EthernetAddress.fromInterface());
}
}
}
}
public String getNextId() {
return timeBasedGenerator.generate().toString();
}
}
从上述代码可知,Flowable的UUID是通过com.fasterxml.uuid.impl.TimeBasedGenerator
生成的。UUID生成器的优点是不依赖其他服务,并发效率高,分布式情况下也能生成全局唯一ID。其缺点一是无序,对于与InnoDB类似的存储引擎,插入时容易导致索引页分裂,影响插入的性能;二是UUID的长度比较长,占用的空间相对较大。
上述两种ID生成器都有各自的优缺点,但在数据量大、并发和性能要求高的场景下都难以满足业务要求,需要采用更优的ID生成器来实现。更优的生成器需要既满足性能上的要求,又保证全局有序和唯一。这里采用流行ID生成算法——“雪花算法”来实现。
1.3、自定义ID生成器
雪花算法是Twitter推出的开源分布式ID生成算法,将一个64位长整型数字作为全局ID,共分为以下4个部分。
- 第1位:固定值0。
- 第2位~第42位:41位时间戳。
- 第43位~第52位:10位机器ID。
- 第53位~第64位:12位序列号,从0开始递增,最大为4095。
雪花算法每毫秒最多生成4096个ID,也就是每秒能生成多达400万个ID,能够满足绝大部分应用场景的业务需求。雪花算法由Scala实现,需要使用Java重写算法。这里直接使用Hutool工具包即可。
public class SnowFlakeIdGenerator implements IdGenerator {
@Override
public String getNextId() {
Snowflake snowflake = IdUtil.getSnowflake(1, 1);
return snowflake.nextIdStr();
}
}
2、定时器优化
Flowable支持定时器功能,但是Flowable内置的定时器在数据量较大、并发度较高时,存在一定的性能问题。
2.1、执行过程
1、创建TimerJobEntity
。当流程执行到定时器中间事件或者定时器边界事件时,会创建TimerJobEntity
,在数据库表ACT_RU_TIMER_JOB
中生成一条记录。
2、获取到期的TimerJobEntity
。工作流引擎启动时会启动一个线程循环扫描到期的定时器任务,该线程调用AcquireTimerJobsRunnable
的run()方法,查询ACT_RU_TIMER_JOB表中到期的TimerJobEntity
。需要注意的是,默认情况下每次会从数据库查询512条数据,如果要改变批次的大小,可以通过AsyncJobExecutorConfiguration
配置类的setMaxTimerJobsPerAcquisition()
方法来设置。
3、将TimerJobEntity
转换为JobEntity
。这一步包含两个操作,一是根据TimerJobEntity
生成一个新的JobEntity
,二是删除TimerJobEntity
。从数据库的角来看,即在ACT_RU_JOB表中生成一条新记录,然后删除ACT_RU_TIMER_JOB表中的记录。这一步是通过线程池来完成的,线程池的设置同样通过异步任务配置类AsyncJobExecutorConfiguration
来实现,比如可以通过setMoveTimerExecutorPoolSize()
方法设置线程池的核心线程数和最大线程数。
4、获取JobEntity
并执行。工作流引擎启动时会启动一个线程扫描需要执行的JobEntity
。该线程执行AcquireAsyncJobsDueRunnable
类的run()方法,查询ACT_RU_JOB表中的数据。获取JobEntity
后,通过异步执行器AsyncExecutor
的executeAsyncJob()
方法异步执行任务。如果任务执行失败,就激活重试逻辑。默认重试3次仍然失败,则将数据转移到ACT_RU_DEADLETTER_JOB表中。需要注意的是,查询JobEntity
是单线程的,但是执行JobEntity
则是由线程池异步完成的。任务执行效率与两个参数有关。一个是每次从数据库获取JobEntity
的数量,默认值为512,可以通过异步任务配置类AsyncJobExecutorConfiguration
的setMaxAsyncJobsDuePerAcquisition()
方法进行设置。另一个参数是任务的执行线程数,由线程池控制,包括核心线程数和最大线程数两个参数,默认值都为8,可以通过工作流引擎配置类ProcessEngineConfigurationImpl
的setAsyncExecutorCorePoolSize()
方法和setAsyncExecutorMaxPoolSize()
方法分别设置核心线程数和最大线程数。
2.2、优化
2.2.1、TimerJobEntity
转 JobEntity
Flowable首先会查询到期的TimerJobEntity
,并锁定对应的记录,即设置ACT_RU_TIMER_JOB表中的LOCK_EXP_TIME_和LOCK_OWNER_字段。这一步的逻辑比较简单,通过单线程完成,性能通常足以满足需求,可以通过调节每批次查询的数量来进一步优化性能。这里的主要问题是,生产环境下,通常采用多服务器部署模式,可能导致同一TimerJobEntity
会被多台机器获取,从而引起并发冲突。Flowable提供全局锁的模式来获取到期的定时器任务,其配置如下:
@Bean
public ProcessEngine createProcessEngine() {
SpringProcessEngineConfiguration engineConf = new SpringProcessEngineConfiguration();
engineConf.setDataSource(dataSource);
engineConf.setTransactionManager(transactionManager);
//开启异步任务全局锁模式
AsyncJobExecutorConfiguration jobConf = new AsyncJobExecutorConfiguration();
jobConf.setGlobalAcquireLockEnabled(true);
engineConf.setAsyncExecutorConfiguration(jobConf);
ProcessEngine engine = engineConf.buildProcessEngine();
return engine;
}
以上代码段中,注释部分的代码表示开启异步任务全局锁模式,Flowable的全局锁本质上是通过数据库实现的分布式锁。Flowable会在数据库表ACT_GE_PROPERTY中插入指定名称的记录,其值是当前时间以及所在服务器的机器名和IP地址。下列3种情况表示获取锁成功:
- 数据库中没有指定名称的记录;
- 数据库中有指定名称的记录,但是其值为空
- 数据库中有指定名称的记录,其值也不为空,但是锁已经超时
锁超时的计算是用锁设置的时间加上指定的时长与当前时间做比较,如果早于当前时间,则表示锁已超时。Flowable的全局锁由LockCmd实现,其核心逻辑如下:
详见:
org.flowable.common.engine.impl.cmd.LockCmd#execute
public Boolean execute(CommandContext commandContext) {
AbstractEngineConfiguration engineConfiguration = (AbstractEngineConfiguration)commandContext.getEngineConfigurations().get(this.engineType);
PropertyEntityManager propertyEntityManager = engineConfiguration.getPropertyEntityManager();
PropertyEntity property = (PropertyEntity)propertyEntityManager.findById(this.lockName);
//情况一
if (property == null) {
property = (PropertyEntity)propertyEntityManager.create();
property.setName(this.lockName);
property.setValue(Instant.now().toString() + hostLockDescription);
propertyEntityManager.insert(property);
return true;
}
//情况二
else if (property.getValue() == null) {
property.setValue(Instant.now().toString() + hostLockDescription);
return true;
}
//情况三
else if (this.forceAcquireAfter != null) {
String value = property.getValue();
Instant lockAcquireTime = Instant.parse(value.substring(0, value.indexOf(90) + 1));
if (lockAcquireTime.plus(this.forceAcquireAfter).isBefore(Instant.now())) {
property.setValue(Instant.now().toString() + hostLockDescription);
return true;
} else {
return false;
}
} else {
return false;
}
}
锁的释放通过ReleaseLockCmd
来实现,释放锁有两种方式:一种是直接删除ACT_GE_PROPERTY中的对应记录;另一种是将对应记录的值设置为空。其实现如下:
详见:
org.flowable.common.engine.impl.cmd.ReleaseLockCmd#execute
public Void execute(CommandContext commandContext) {
PropertyEntityManager propertyEntityManager = ((AbstractEngineConfiguration)commandContext.getEngineConfigurations().get(this.engineType)).getPropertyEntityManager();
PropertyEntity property = (PropertyEntity)propertyEntityManager.findById(this.lockName);
if (property != null) {
property.setValue((String)null);
if (this.delete) {
propertyEntityManager.delete(property);
}
return null;
} else {
throw new FlowableObjectNotFoundException("Lock with name " + this.lockName + " does not exist");
}
}
全局锁模式下,通过AcquireTimerJobsWithGlobalAcquireLockCmd
命令来获取并锁定TimerJobEntity
,而非全局锁模式下,则通过AcquireTimerJobsCmd
命令来实现。其源码如下所示:
if (globalAcquireLockEnabled) {
try {
timerJobs = lockManager.waitForLockRunAndRelease(
configuration.getLockWaitTime(), () -> {
return commandExecutor.execute(new AcquireTimerJobsWithGlobalAcquireLockCmd(asyncExecutor));
});
} catch (Exception e) {
//记录日志
}
} else {
timerJobs = commandExecutor.execute(new AcquireTimerJobsCmd(asyncExecutor));
}
AcquireTimerJobsWithGlobalAcquireLockCmd
与AcquireTimerJobsCmd
的区别在于,前者在锁定TimerJobEntity
时是批量更新数据库的,不检查版本号。这是因为已经有全局锁的保护,不会存在乐观锁冲突的情况。而后者则是一条记录一条记录地更新,且更新时要进行版本号的检查,以防止并发更新导致数据冲突。
成功获取到TimerJobEntity
后,下一步需要将TimerJobEntity
转移到JobEntity
。同样,Flowable也分为全局锁模式和非全局锁模式,但是这个过程是通过线程池来执行的。
在全局锁模式下,会通过BulkMoveTimerJobsToExecutableJobsCmd
将TimerJobEntity
转换为JobEntity
,而非全局锁模式则通过MoveTimerJobsToExecutableJobsCmd
来实现。其区别在于,前者删除TimerJobEntity
时是批量删除的,且不做版本检测,而后者则是一条一条删除的,删除时需要检测版本号。这一过程影响性能最主要的因素是线程池的线程数,默认值为4,可以通过异步任务配置类AsyncJobExecutorConfiguration
的setMoveTimerExecutorPoolSize()
方法设置。
2.2.2、查询JobEntity并执行
将TimerJobEntity
转换为JobEntity
后,Flowable会通过另外一个线程查询所有未被锁定的JobEntity
。查询成功后,需要锁定对应的记录,即设置表ACT_RU_JOB中的LOCK_EXP_TIME_和LOCK_OWNER_字段值。与获取TimerJobEntity
类似,在多服务器部署的情况下,同一JobEntity
可能会被多台机器获取,导致并发冲突。同理,可以通过开启全局锁模式解决。
if (globalAcquireLockEnabled) {
acquiredJobs = commandExecutor.execute(new
AcquireJobsWithGlobalAcquireLockCmd(asyncExecutor, remainingCapacity,
jobEntityManager));
} else {
acquiredJobs = commandExecutor.execute(new
AcquireJobsCmd(asyncExecutor, remainingCapacity, jobEntityManager));
}
全局锁模式和非全局锁模式下,分别通过命令AcquireJobsWithGlobalAcquireLockCmd和命令AcquireJobsCmd来查询JobEntity数据,两者主要区别在于,前者在锁定JobEntity时是批量操作的,更新时不会对数据库中记录进行版本校验,而后者则是单条数据操作的,更新时需要校验版本号。
查询到JobEntity后,开始执行JobEntity,该阶段是真正执行业务逻辑的阶段,是整个定时器任务生命周期中最耗费资源与时间的阶段。Flowable采用异步多线程方式执行JobEntity。查询JobEntity时需要通过全局锁来解决多台机器获取相同数据的问题, 因此执行JobEntity前需要先获取全局锁,这容易导致在某一台机器执行的任务数量较多,压力偏大,出现负载不均衡的现象。因此,我们采用MQ来优化JobEntity的执行机制。可以把任务ID发送到MQ中,所有机器通过消费MQ中的消息来执行JobEntity。
这里使用RabbitMQ实现该功能。可以在Spring的IoC容器中获取RabbitMQTemplate实例,并通过RabbitMQTemplate实例发送定时器任务消息。为了支持MQ模式,Flowable提供了可扩展抽象类AbstractMessageBasedJobManager,可以继承该类来实现将任务消息发送到MQ的功能。代码实现如下:
/**
* <h3>任务消息发送助手</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/23 16:16
*/
@Slf4j
@Component
public class RocketMqMessageBasedHobManager extends AbstractMessageBasedJobManager {
@Resource
private RocketMQTemplate rocketMQTemplate;
public final static String JOB_TOPIC = "job_topic";
public static final String JOB_CONSUMER_GROUP = "job_consumer_group";
@Override
protected void sendMessage(JobInfo job) {
if (job instanceof JobEntity) {
JobEntity jobEntity = (JobEntity) job;
//采用同步顺序推送
rocketMQTemplate.syncSendOrderly(JOB_TOPIC,
//自定义推送消息对象
new CusJobMessage(jobEntity.getProcessInstanceId(), jobEntity.getId()),
jobEntity.getProcessInstanceId());
}
}
}
除了继承AbstractMessageBasedJobManager
类,还需在工作流引擎配置类中设置JobManager
,代码如下:
/**
* 注册自定义工作任务分发配置
**/
//设置异步任务执行模式为消息队列模式
configuration.setAsyncExecutorMessageQueueMode( true);
configuration.setJobManager(jobManager);
上述代码用于设置Flowable工作流引擎的JobManager
,并设置异步任务执行模式为消息队列模式。这样TimerJobEntity
在转换为JobEntity
后,会直接调用RocketMQMessageBasedJobManager
的sendMessage()
方法,将异步任务信息发送到RocketMQ
。此外,开启消息队列模式后,扫描ACT_RU_JOB的线程将不会被启动。
将异步任务信息发送到MQ后,接下来就是消费MQ中的消息,并通过Flowable提供的ManagementService调用executeJob()方法执行异步任务。其代码实现如下:
/**
* <h3>自定义工作流任务消费者</h3>
*
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @version 1.0
* @since 2025/6/23 17:38
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqMessageBasedHobManager.JOB_TOPIC,
consumerGroup = RocketMqMessageBasedHobManager.JOB_CONSUMER_GROUP,
consumeMode = ConsumeMode.ORDERLY
)
public class CusJobConsumer implements RocketMQListener<CusJobMessage> {
@Resource
private ManagementService managementService;
@Override
public void onMessage(CusJobMessage jobMessage) {
log.info("Consumer job message {}", jobMessage);
Job job = managementService.createJobQuery()
.jobId(jobMessage.getJobId())
.singleResult();
if (Optional.ofNullable(job).isPresent()) {
managementService.executeJob(jobMessage.getJobId());
}
}
}
该消费者类实现RocketMQListener
接口,并通过注解@RocketMQMessageListener
配置消费的Topic为对应任务生产者的Topic。此外,这里将消费模式配置为ConsumeMode.ORDERLY
,确保了同一流程实例的消息能够顺序消费,防止同一流程实例的定时器任务出现乱序执行的问题。获取定时器任务消息后,通过Flowable工作流引擎API执行对应ID的定时器任务。
3、历史数据异步化
为了提高流程执行的效率,Flowable将流程数据分为运行时数据和历史数据,以避免历史数据积累影响工作流引擎性能。如果业务本身不需要历史数据,工作流引擎可以不保存历史数据,这样引擎性能会有进一步的提升。
同时保存运行时数据和历史数据的要求会导致流程执行效率下降,因为历史数据虽然不会影响流程的执行,但会使数据量随着时间累积。因此,对于历史数据,一方面要解决其对工作流引擎性能的影响,另一方面则要解决大数据量的存储问题。基于这两个目标,对于历史数据,可采用异步分布式存储方式存储。
首先通过异步方式存储历史数据,然后结合RocketMQ
和分布式数据库MongoDB
解决海量数据的存储问题。
3.1、Flowable 异步历史机制
Flowable自身已经支持异步历史。开启Flowable的异步历史机制,需要在工作流引擎配置类中指定asyncHistoryExecutorActivate
和isAsyncHistoryEnabled
两个配置项的值为true。其配置类代码如下:
@Configuration
public class FlowableConfig {
@Autowired
private DataSource dataSource;
@Autowired
private PlatformTransactionManager transactionManager;
@Bean
public ProcessEngine createProcessEngine() {
SpringProcessEngineConfiguration engineConf = new
SpringProcessEngineConfiguration();
engineConf.setDataSource(dataSource);
engineConf.setTransactionManager(transactionManager);
engineConf.setAsyncHistoryExecutorActivate(true);
engineConf.setAsyncHistoryEnabled(true);
ProcessEngine engine = engineConf.buildProcessEngine();
return engine;
}
}
Flowable工作流引擎默认使用DefaultHistoryManager来处理历史数据,如果启用异步历史,则使用AsyncHistoryManager来处理历史数据,其源码实现如下:
详见:
org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl#initHistoryManager
public void initHistoryManager() {
if (historyManager == null) {
if (isAsyncHistoryEnabled) {
historyManager = new AsyncHistoryManager(this);
} else {
historyManager = new DefaultHistoryManager(this);
}
}
}
异步历史管理器AsyncHistoryManager
对历史数据的处理逻辑是将数据转换为JSON
格式后存入数据库表ACT_GE_BYTEARRAY
,并在ACT_RU_HISTORY_JOB
表中创建一个历史任务。以流程实例历史记录为例,源码处理逻辑如下:
详见:
org.flowable.engine.impl.history.DefaultHistoryManager#recordProcessInstanceEnd
public void recordProcessInstanceStart(ExecutionEntity processInstance) {
if (getHistoryConfigurationSettings()
.isHistoryEnabledForProcessInstance(processInstance)) {
ObjectNode data = processEngineConfiguration.getObjectMapper()
.createObjectNode();
addCommonProcessInstanceFields(processInstance, data);
getAsyncHistorySession().addHistoricData(getJobServiceConfiguration(),
HistoryJsonConstants.TYPE_PROCESS_INSTANCE_START, data,
processInstance.getTenantId());
}
}
上述代码段中,先通过jackson
的ObjectMapper
创建ObjectNode
对象,再依次将流程实例的属性添加到ObjectNode
对象中,最后调用addHistoricData()
方法将JSON数据写入AsyncHistorySession
中。注意,这时候数据并没有写入数据库,最终数据的写入是在命令执行完的时候,CommandContext
的close()方法会调用AsyncHistorySessionCommandContextCloseListener
的closing()
方法。
在该方法中,调用DefaultAsyncHistoryJobProducer
的historyDataGenerated()
方法实现数据库写入和更新。这里生成的数据包括两部分:一部分是异步历史任务数据,即表ACT_RU_HISTORY_JOB
中的数据;另一部分是历史流程实例的JSON格式数据,这部分数据会写入ACT_GE_BYTEARRAY
表中。ACT_RU_HISTORY_JOB
表通过ADV_ HANDLER_CFG_ID
_字段与ACT_GE_BYTEARRAY
表的ID_字段关联**。**
这里数据的写入可以是单条写入,也可以是批量写入,即将多条历史数据信息放在一个大的JSON中一起存储。其源码实现如下:
详见:
org.flowable.job.service.impl.history.async.DefaultAsyncHistoryJobProducer#createJobsWithHistoricalData
从上述代码可以看到,在批量模式下,多条历史数据写入同一个ArrayNode中,且只会生成一个异步历史任务。而在非批量模式下,每一条历史数据都会生成一条记录以及一个异步历史任务。此外,从上述源码还能看到,批量模式需要满足两个条件:一是工作流引擎配置类中启用批量模式,二是历史据的数量大于指定的阈值,该值默认是10条,可以通过如下方式修改该值:
//初始化工作流引擎配置
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
/**
* 异步历史任务配置
**/
//启用异步历史任务模式
configuration.setAsyncHistoryExecutorActivate( true);
configuration.setAsyncHistoryEnabled( true);
configuration.setAsyncHistoryJsonGroupingEnabled( true);
configuration.setAsyncHistoryJsonGroupingThreshold(3);
上述代码片段,表示启用Flowable的异步历史任务模式,并在异步历史数据量大于或等于3条时,将其组合在一起插入数据库。需要注意的是,这时数据仅仅存在于ACT_GE_BYTEARRAY表中,并没有插入相应的历史数据表中。最终还需要通过线程读取异步历史任务数据,分别插入不同的历史数据表中,其过程如下:
1、首先将工作流引擎配置项asyncHistoryExecutorActivate
设置为true,此时该工作流引擎会启动异步历史任务扫描线程,读取ACT_RU_HISTORY_JOB表中的数据;
2、通过线程池执行异步历史任务;
3、调用各个历史数据对应的Transformer将JSON格式数据转换为对应的实体对象,如活动节点开始时产生的历史数据,会通过ActivityStartHistoryJsonTransformer
类的transformJson()
方法,将JSON格式数据转换为HistoricActivityInstanceEntity
实体对象,并通过historicActivityInstanceEntityManager
的insert()方法将数据写入数据库的ACT_HI_ACTINST表中。其核心实现源码(这里省略了部分代码)如下:
详见源代码:
org.flowable.engine.impl.history.async.json.transformer.ActivityStartHistoryJsonTransformer#transformJson
public class ActivityStartHistoryJsonTransformer extends AbstractHistoryJsonTransformer {
public ActivityStartHistoryJsonTransformer(ProcessEngineConfigurationImpl processEngineConfiguration) {
super(processEngineConfiguration);
}
@Override
public List<String> getTypes() {
return Collections.singletonList(HistoryJsonConstants.TYPE_ACTIVITY_START);
}
@Override
public boolean isApplicable(ObjectNode historicalData, CommandContext commandContext) {
return true;
}
@Override
public void transformJson(HistoryJobEntity job, ObjectNode historicalData, CommandContext commandContext) {
HistoricActivityInstanceEntityManager historicActivityInstanceEntityManager = processEngineConfiguration.getHistoricActivityInstanceEntityManager();
HistoricActivityInstanceEntity historicActivityInstanceEntity = createHistoricActivityInstanceEntity(historicalData, commandContext,
historicActivityInstanceEntityManager);
historicActivityInstanceEntity.setProcessDefinitionId(getStringFromJson(historicalData, HistoryJsonConstants.PROCESS_DEFINITION_ID));
historicActivityInstanceEntity.setProcessInstanceId(getStringFromJson(historicalData, HistoryJsonConstants.PROCESS_INSTANCE_ID));
historicActivityInstanceEntity.setExecutionId(getStringFromJson(historicalData, HistoryJsonConstants.EXECUTION_ID));
historicActivityInstanceEntity.setActivityId(getStringFromJson(historicalData, HistoryJsonConstants.ACTIVITY_ID));
historicActivityInstanceEntity.setActivityName(getStringFromJson(historicalData, HistoryJsonConstants.ACTIVITY_NAME));
historicActivityInstanceEntity.setActivityType(getStringFromJson(historicalData, HistoryJsonConstants.ACTIVITY_TYPE));
historicActivityInstanceEntity.setAssignee(getStringFromJson(historicalData, HistoryJsonConstants.ASSIGNEE));
historicActivityInstanceEntity.setStartTime(getDateFromJson(historicalData, HistoryJsonConstants.START_TIME));
historicActivityInstanceEntity.setTransactionOrder(getIntegerFromJson(historicalData, HistoryJsonConstants.TRANSACTION_ORDER));
historicActivityInstanceEntity.setTenantId(getStringFromJson(historicalData, HistoryJsonConstants.TENANT_ID));
historicActivityInstanceEntityManager.insert(historicActivityInstanceEntity);
dispatchEvent(commandContext, FlowableEventBuilder.createEntityEvent(
FlowableEngineEventType.HISTORIC_ACTIVITY_INSTANCE_CREATED, historicActivityInstanceEntity));
}
}
上述代码先创建了HistoricActivityInstanceEntity
实体对象,然后依次将JSON中的数据赋值给该对象,最终通过HistoricActivityInstanceEntityManager
将数据插入数据库中。影响异步历史存储性能的主要因素包括如下4个:
1、每次读取异步历史任务的数量,默认值为512,可以通过异步任务配置类AsyncJobExecutorConfiguration
的setMaxAsyncJobsDuePerAcquisition()
方法进行修改;
2、异步历史任务全局锁是否开启,通过AsyncJobExecutorConfiguration
的setGlobalAcquireLockEnabled()
方法设置;
3、执行异步任务的线程数量,即异步线程池的核心线程数和最大线程数,默认值都为8,可以通过工作流引擎配置类ProcessEngineConfigurationImpl
的setAsyncHistoryExecutorCorePoolSize()
方法和setAsyncHistoryExecutorMaxPoolSize()
方法分别设置核心线程数和最大线程数;
4、批量异步历史的设置以及批次的大小。
综上所述,完整的异步历史配置大致如下所示:
@Configuration
public class FlowableConfig {
@Autowired
private DataSource dataSource;
@Autowired
private PlatformTransactionManager transactionManager;
@Bean
public ProcessEngine createProcessEngine() {
SpringProcessEngineConfiguration engineConf = new SpringProcessEngineConfiguration();
engineConf.setDataSource(dataSource);
engineConf.setTransactionManager(transactionManager);
//启动异步历史
engineConf.setAsyncHistoryExecutorActivate(true);
engineConf.setAsyncHistoryEnabled(true);
AsyncJobExecutorConfiguration jobConf = new AsyncJobExecutorConfiguration();
//设置每批次获取的异步历史任务数
jobConf.setMaxAsyncJobsDuePerAcquisition(100);
jobConf.setGlobalAcquireLockEnabled(true);
engineConf.setAsyncHistoryExecutorConfiguration(jobConf);
//启用批量异步历史存储,阈值为3
engineConf.setAsyncHistoryJsonGroupingEnabled(true);
engineConf.setAsyncHistoryJsonGroupingThreshold(3);
//设置异步历史执行线程池的核心线程数和最大线程数
engineConf.setAsyncHistoryExecutorCorePoolSize(20);
engineConf.setAsyncHistoryExecutorMaxPoolSize(20);
ProcessEngine engine = engineConf.buildProcessEngine();
return engine;
}
}
3.2、基于RocketMQ
的历史数据异步化
与异步任务类似,基于RocketMQ
的异步历史化,也需要将历史异步任务的ID发送到消息队列中,然后通过RocketMQ
的消费端来消费对应消息,获取消息的异步历史任务ID,再由Flowable来执行对应的异步任务。因此,这里也需要继承AbstractMessageBasedJobManager
,并实现sendMessage()
方法。基于RocketMQ
对异步历史的处理如下:
@Slf4j
@Component
public class RocketMqHisMsgJobManager extends AbstractMessageBasedJobManager {
public final static String JOB_HISTORY_TOPIC = "job_history_topic";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private ProcessEngineConfiguration configuration;
@Override
protected void sendMessage(JobInfo job) {
if (job instanceof HistoryJobEntityImpl) {
HistoryJobEntityImpl historyJob = (HistoryJobEntityImpl) job;
String historicalData = historyJob.getAdvancedJobHandlerConfiguration();
ObjectMapper objectMapper = configuration.getObjectMapper();
JsonNode historyNode;
String processInstanceId = "";
try {
historyNode = objectMapper.readTree(historicalData);
if (historyNode instanceof ArrayNode) {
ArrayNode arrayNode = (ArrayNode) historyNode;
for (JsonNode jsonNode : arrayNode) {
processInstanceId = getProcessInstanceId(jsonNode);
if (StringUtils.isNotEmpty(processInstanceId)) {
break;
}
}
} else {
processInstanceId = getProcessInstanceId(historyNode);
}
} catch (Exception e) {
throw new FlowableException("Could not deserialize async " +
"history json for job (id=" + job.getId() + ")", e);
}
log.info("Send history data to MQ: {}", historicalData);
CusJobMessage jobMessage = new CusJobMessage(processInstanceId, historyJob.getId());
rocketMQTemplate.syncSendOrderly(JOB_HISTORY_TOPIC, jobMessage, processInstanceId);
}
}
/**
* <p><b>用途:解析获取流程实例ID<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 9:01 2025/6/24
* @params [jsonNode]
* @param jsonNode
* @return java.lang.String
**/
private String getProcessInstanceId(JsonNode jsonNode) {
ObjectNode jsonData = (ObjectNode) jsonNode
.get(HistoryJsonTransformer.FIELD_NAME_DATA);
String processInstanceId = AsyncHistoryJsonUtil.getStringFromJson(jsonData,
HistoryJsonConstants.PROCESS_INSTANCE_ID);
return processInstanceId == null ? "" : processInstanceId;
}
}
对于异步历史数据发送MQ来说,同一流程实例下需要保证有序性,否则容易出现数据错乱。但是历史数据是以JSON格式字符串的形式转换为二进制数据后存储在数据库中的,因此这里需要先将二进制数据转回JSON格式数据,再从JSON数据中获取流程实例ID作为有序的依据。
还有一点需要注意,并不是所有的历史数据记录中都有流程实例ID,因此如果历史数据中不存在流程实例ID,则可以自己继承AsyncHistoryManager
类,重写相应的方法,补充流程实例ID字段。例如,更新活动实例的历史数据中缺少流程实例ID字段,可以通过如下方式处理:
@Slf4j
public class CustomAsyncHistoryManager extends AsyncHistoryManager {
public CustomAsyncHistoryManager(ProcessEngineConfigurationImpl processEngineConfiguration) {
super(processEngineConfiguration);
}
@Override
public void updateHistoricActivityInstance(ActivityInstance activityInstance) {
if (getHistoryConfigurationSettings().
isHistoryEnabledForActivity(activityInstance)) {
if (activityInstance.getExecutionId() != null) {
ObjectNode data = processEngineConfiguration.getObjectMapper()
.createObjectNode();
putIfNotNull(data, HistoryJsonConstants.PROCESS_INSTANCE_ID, activityInstance.getProcessInstanceId());
putIfNotNull(data, HistoryJsonConstants.RUNTIME_ACTIVITY_INSTANCE_ID, activityInstance.getId());
putIfNotNull(data, HistoryJsonConstants.TASK_ID, activityInstance.getTaskId());
putIfNotNull(data, HistoryJsonConstants.ASSIGNEE, activityInstance.getAssignee());
putIfNotNull(data, HistoryJsonConstants.CALLED_PROCESS_INSTANCE_ID, activityInstance.getCalledProcessInstanceId());
getAsyncHistorySession().addHistoricData(getJobServiceConfiguration(),
HistoryJsonConstants.TYPE_UPDATE_HISTORIC_ACTIVITY_INSTANCE,
data);
}
}
}
/**
* <p><b>用途:相关字段补充<b></p>
* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>
* @since 9:11 2025/6/24
* @params [node, key, value]
* @param node
* @param key
* @param value
* @return void
**/
public static void putIfNotNull(ObjectNode node, String key, String value) {
if (StringUtils.isNotBlank(value)) {
node.put(key, value);
}
}
}
@Bean
public ProcessEngine processEngine() {
SpringProcessEngineConfiguration engineConf = new SpringProcessEngineConfiguration();
engineConf.setDataSource(dataSource);
engineConf.setDatabaseSchemaUpdate(
ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
engineConf.setTransactionManager(transactionManager);
engineConf.setHistoryManager(new CustomAsyncHistoryManager(engineConf));
//省略部分代码
}
通过上述的配置,实现了将异步任务ID有序发送到RocketMQ。接下来,实现对RocketMQ相关数据的消费,并调用Flowable自身的API来执行异步历史任务。其实现如下:
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqHisMsgJobManager.JOB_HISTORY_TOPIC,
consumerGroup = RocketMqHisMsgJobManager.JOB_HISTORY_CONSUMER_GROUP,
consumeMode = ConsumeMode.ORDERLY
)
public class CusHistoricConsumer implements RocketMQListener<CusJobMessage> {
@Resource
private ManagementService managementService;
@Override
public void onMessage(CusJobMessage jobMessage) {
log.info("【History】Consumer job message {}", jobMessage);
Job job = managementService.createJobQuery()
.jobId(jobMessage.getJobId())
.singleResult();
if (Optional.ofNullable(job).isPresent()) {
managementService.executeHistoryJob(jobMessage.getJobId());
}
}
}
上述代码首先指定了消费的Topic,该值需要与异步历史数据发送的Topic保持一致,其次指定按顺序消费,最后调用ManagementService
的executeHistoryJob()
方法执行异步历史任务。此时发起一个流程实例,可以看到以下日志信息:
**特别说明:**这里如果使用单元测试案例进行检查时,要记得让程序休眠等待一段时间,否则单元测试用例结束了。异步线程处理的任务还没完成,就会导致历史数据没有生成。
3.3、基于MongoDB的历史数据异步化
对于上述优化,数据最终都还是与Flowable工作流引擎数据存放在同一个库中。在面对海量历史数据的场景下,历史数据对传统关系型数据库的压力会比较大,这时可以采用分布式数据存储机制来进行处理。这里将介绍基于MongoDB的历史数据异步存储方案。
3.3.1、SpringBoot集成
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
做相应配置:
spring:
data:
mongodb:
host: 127.0.0.1
port: 27017
username: admin
password: 123456
database: flowable_history
3.3.2、自定义DataManager
Flowable对数据的操作都是通过对应的DataManager
实现的。以历史流程实例数据为例,其数据操作默认由MybatisHistoricProcessInstanceDataManager
来完成,该类最终会通过MyBatis
来实现对历史流程实例数据的增、删、改、查。因为需要采用MongoDB来存储异步历史数据,所以这里需要自定义对应的DataManager
来实现基于MongoDB的数据操作。
为了方便操作,可以在相应的实体类中增加注解,所以我们自定义实体类HistoricProcessInstanceDoc
,该类实现HistoricProcessInstanceEntity
接口,其实现如下:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collation = "act_hi_procinst_mongo")
public class HistoricProcessInstanceDoc implements HistoricProcessInstanceEntity {
private static final long serialVersionUID = 1L;
@MongoId
private String id;
private int revision = 1;
@Transient
private boolean isInserted;
@Transient
private boolean isUpdated;
@Transient
private boolean isDeleted;
@Transient
private Object originalPersistentState;
@Indexed
private String processInstanceId;
@Indexed
private String processDefinitionId;
@Indexed
private Date startTime;
private Date endTime;
private ExecutionEntity executionEntity;
public HistoricProcessInstanceDoc(ExecutionEntity processInstanceExecutionEntity) {
this.executionEntity = processInstanceExecutionEntity;
this.processInstanceId = processInstanceExecutionEntity.getProcessInstanceId();
this.processDefinitionId = processInstanceExecutionEntity.getProcessDefinitionId();
this.startTime = processInstanceExecutionEntity.getStartTime();
}
}
在上述代码中,类名上方增加了注解@Document,并指定属性collection的值为act_hi_procinst_mongo
,表示将该实体类存入MongoDB的act_hi_procinst_mongo
表中;id属性上方增加了注解@MonogoId
,表示用id字段作为数据库中文档的ID,该字段值不能重复;属性processInstanceId
、processDefinitionId
和startTime
等字段上方增加了@Indexed注解,表示需要在表中的这些字段上添加索引;isInserted
、isUpdated
等属性上的@Transient注解表示该字段无须保存到MongoDB中。
依照该继承体系,自定义DataManager
也需要实现DataManger
接口。但抽象类AbstractDataManger
和AbstractProcessDataManager
实现了许多通用的方法,如提供了工作流引擎配置类实例及执行过程中的上下文等信息,因此自定义DataManger
可以直接继承自抽象类AbstractProcessDataManager
。具体实现如下:
@Slf4j
public class MongoHistoricPrecessInstanceDataManager extends AbstractProcessDataManager<HistoricProcessInstanceEntity> implements HistoricProcessInstanceDataManager {
private MongoTemplate mongoTemplate;
public MongoHistoricPrecessInstanceDataManager(ProcessEngineConfigurationImpl processEngineConfiguration,
MongoTemplate mongoTemplate) {
super(processEngineConfiguration);
this.mongoTemplate = mongoTemplate;
}
//根据ID查询历史流程实例数据
@Override
public HistoricProcessInstanceEntity findById(String entityId) {
HistoricProcessInstanceDoc entity = mongoTemplate
.findById(entityId, HistoricProcessInstanceDoc.class);
return entity;
}
//插入历史流程实例
public void insert(HistoricProcessInstanceEntity entity) {
mongoTemplate.insert(entity);
}
//更新历史流程实例
public HistoricProcessInstanceEntity update(HistoricProcessInstanceEntity entity) {
Query query = new Query();
query.addCriteria(Criteria.where("id").is(entity.getId()));
Document document = (Document) mongoTemplate.getConverter()
.convertToMongoType(entity);
Update update = Update.fromDocument(document);
mongoTemplate.updateFirst(query, update,
HistoricProcessInstanceDoc.class);
return entity;
}
//根据ID删除流程历史
public void delete(String id) {
Query query = new Query();
query.addCriteria(Criteria.where("id").is(id));
mongoTemplate.remove(query, HistoricProcessInstanceDoc.class);
}
//根据实体对象删除流程历史
public void delete(HistoricProcessInstanceEntity entity) {
Query query = new Query();
query.addCriteria(Criteria.where("id").is(entity.getId()));
mongoTemplate.remove(query, HistoricProcessInstanceDoc.class);
}
//根据流程定义ID查询历史流程实例
@Override
public List<String> findHistoricProcessInstanceIdsByProcessDefinitionId(String processDefinitionId) {
Query query = new Query();
query.addCriteria(Criteria.where("processDefinitionId")
.is(processDefinitionId));
query.fields().include("processInstanceId");
List<HistoricProcessInstanceDoc> list = mongoTemplate.find(
query, HistoricProcessInstanceDoc.class);
return list.stream().map(entity -> entity.getProcessInstanceId())
.collect(Collectors.toList());
}
//根据主流程实例ID查询历史流程实例
@Override
public List<HistoricProcessInstance> findHistoricProcessInstancesBySuperProcessInstanceId(String
superProcessInstanceId) {
Query query = new Query();
query.addCriteria(Criteria.where("superProcessInstanceId")
.is(superProcessInstanceId));
List<HistoricProcessInstanceDoc> list = mongoTemplate.find(
query, HistoricProcessInstanceDoc.class);
List<HistoricProcessInstance> ret = new ArrayList<>(list);
return ret;
}
//根据主流程实例ID批量查询历史流程实例
@Override
public List<String> findHistoricProcessInstanceIdsBySuperProcessInstanceIds(
Collection<String> superProcessInstanceIds) {
Query query = new Query();
query.addCriteria(Criteria.where("superProcessInstanceId")
.in(superProcessInstanceIds));
List<HistoricProcessInstanceDoc> list = mongoTemplate.find(
query, HistoricProcessInstanceDoc.class);
return list.stream().map(entity -> entity.getProcessInstanceId())
.collect(Collectors.toList());
}
//根据条件查询历史流程实例数量
@Override
public long findHistoricProcessInstanceCountByQueryCriteria(HistoricProcessInstanceQueryImpl
historicProcessInstanceQuery) {
Query query = convertQuery(historicProcessInstanceQuery);
return mongoTemplate.count(query, HistoricProcessInstanceDoc.class);
}
//根据条件查询历史流程实例
@Override
public List<HistoricProcessInstance> findHistoricProcessInstancesByQueryCriteria(HistoricProcessInstanceQueryImpl historicProcessInstanceQuery) {
Query query = convertQuery(historicProcessInstanceQuery);
return new ArrayList<>(mongoTemplate.find(query,
HistoricProcessInstanceDoc.class));
}
@Override
public List<HistoricProcessInstance> findHistoricProcessInstancesAndVariablesByQueryCriteria
(HistoricProcessInstanceQueryImpl historicProcessInstanceQuery) {
throw new RuntimeException("不支持的操作");
}
@Override
public List<HistoricProcessInstance> findHistoricProcessInstancesByNativeQuery(Map<String,
Object> parameterMap) {
throw new RuntimeException("不支持的操作");
}
@Override
public long findHistoricProcessInstanceCountByNativeQuery(Map<String, Object> parameterMap) {
throw new RuntimeException("不支持的操作");
}
//根据条件删除历史流程实例数据
@Override
public void deleteHistoricProcessInstances(HistoricProcessInstanceQueryImpl
historicProcessInstanceQuery) {
Query query = convertQuery(historicProcessInstanceQuery);
mongoTemplate.remove(query, HistoricProcessInstanceDoc.class);
}
//根据流程实例ID批量删除历史流程实例数据
@Override
public void bulkDeleteHistoricProcessInstances(Collection<String>
processInstanceIds) {
Query query = new Query();
query.addCriteria(Criteria.where("processInstanceId").in(processInstanceIds));
mongoTemplate.remove(query, HistoricProcessInstanceDoc.class);
}
//部分查询条件实现
private Query convertQuery(HistoricProcessInstanceQueryImpl instanceQuery) {
Query query = new Query();
if (StringUtils.isNotBlank(instanceQuery.getId())) {
query.addCriteria(Criteria.where("id").is(instanceQuery.getId()));
}
if (StringUtils.isNotBlank(instanceQuery.getProcessInstanceId())) {
query.addCriteria(Criteria.where("processInstanceId")
.is(instanceQuery.getProcessInstanceId()));
}
if (StringUtils.isNotBlank(instanceQuery.getProcessDefinitionId())) {
query.addCriteria(Criteria.where("processDefinitionId")
.is(instanceQuery.getProcessDefinitionId()));
}
if (StringUtils.isNotBlank(instanceQuery.getBusinessKey())) {
query.addCriteria(Criteria.where("businessKey").is(instanceQuery.getBusinessKey()));
}
return query;
}
@Override
public Class<? extends HistoricProcessInstanceEntity> getManagedEntityClass() {
return HistoricProcessInstanceDoc.class;
}
@Override
public HistoricProcessInstanceEntity create() {
return new HistoricProcessInstanceDoc();
}
@Override
public HistoricProcessInstanceEntity create(ExecutionEntity processInstanceExecutionEntity) {
return new HistoricProcessInstanceDoc(processInstanceExecutionEntity);
}
}
上面创建了自定义历史流程实例的数据管理类MongoHistoricProcessInstanceDataManager
,接下来需在工作流引擎配置类ProcessEngineConfigurationImpl
中使用该类替换Flowable默认的MybatisHistoricProcessInstanceDataManager
,其配置如下:
//指定历史流程实例数据处理类
configuration.setHistoricProcessInstanceDataManager(new MongoHistoricPrecessInstanceDataManager(configuration, mongoTemplate));
通过以上方式,我们实现了基于MongoDB的自定义历史流程实例DataManager
。最后,重新发起一个流程,可以看到如下日志:
3.4、数据一致性保证
我们可以通过MongoDB来分离运行时数据和历史数据,但同时也引入了一个新的问题,即运行时数据和历史数据一致性问题:如果在数据提交的过程中发生了异常,就可能会出现数据不一致的现象。
在单一数据源的场景中,可以通过数据库事务保证数据的一致性,但在分布式场景下,则需要更复杂的机制来保证数据的一致性。分布式场景下的数据一致性又可以分为强一制性、弱一致性和最终一致性3种情况。
- **强一致性:**最严格的一致性保障,要求所有节点在任意时刻都拥有相同的数据。分布式场景下实现强一致性的复杂度比较高,同时对性能的影响也比较大。
- **弱一致性:**弱一致性不要求所有节点在任何时候数据都相同,也不承诺多久以后能达到相同的状态,但会尽可能地在某个时间级别下达到数据一致性。
- **最终一致性:**是弱一致性的特殊情况,系统会保证在一定时间内达到数据一致的状态。
在历史数据应用场景中,往往无须保证数据的强一致性。因此,我们主要讨论历史数据的最终一致性问题,即允许短时间内数据状态不一致。而历史数据写入MongoDB又分为两种情况,即同步写入和异步写入。异步写入,也就是Flowable中开启了异步历史,在流程的执行过程中,Flowable已经将历史数据以JSON的形式写入了ACT_GE_BYTEARRAY
表,并创建了对应的异步历史任务。这时,只要能保证这些异步历史任务正常执行,就能实现数据的最终一致性。
同步历史数据写入MongoDB即在同一线程中完成运行时数据和历史数据的增、删、改操作。在Flowable中,如果通过MongoDB重写了对应DataManager,但是不开启异步历史,即这种场景。在该场景下写入数据时,如果MongoDB数据写入成功,但由于网络抖动导致返回失败或超时等异常,则会导致整个事务回滚,从而出现数据不一致的情况。
这时,可以将数据发送到消息队列,等待一段时间再进行重试。因此,这里使用RocketMQ延时消息来实现数据一致性保证。RocketMQ延时消息仅支持特定级别的延时,总共分为18个级别,分别对应1秒、5秒、10秒、30秒、1分、2分、3分、4分、5分、6分、7分、8分、9分、10分、20分、30分、1小时和2小时。这里将级别设置为4,生产者类的具体实现如下所示:
完整代码详见:
cn.blnp.net.flowable.boot.ext.producer.HistoricDataCompensationProducer
@Slf4j
@Component
public class HistoricDataCompensationProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public static final String HISTORIC_TOPIC = "historic_mq_topic";
public void sendJobMessage(OpType opType, HistoricProcessInstanceEntity entity) {
log.info("Producer historic variable message,processInstanceId={},variable={}", entity);
HistoricProcessInstanceEntityMessage entityMessage = new HistoricProcessInstanceEntityMessage(opType, entity, System.currentTimeMillis());
Message<HistoricProcessInstanceEntityMessage> message = MessageBuilder.withPayload(entityMessage).build();
rocketMQTemplate.syncSend(HISTORIC_TOPIC, message, 1000, 4);
}
}
以上代码段中,封装了HistoricProcessInstanceEntityMessage
消息类,该类包含了历史流程实例实体、操作类型和发送的时间戳。操作类型主要分为3种:插入(INSERT)、更新(UPDATE)和删除(DELETE)。发送MQ消息的延时级别为4,表示延时30秒开始消费。接下来,在MongoHistoricProcessInstanceDataManager
的增、删、改方法中调用HistoricDataCompensationProducer
发送操作异常的历史流程实例数据。其实现如下:
完整代码详见:
cn.blnp.net.flowable.boot.ext.manager.MongoHistoricPrecessInstanceDataManager#insert
//插入历史流程实例
@Override
public void insert(HistoricProcessInstanceEntity entity) {
log.info("Insert HistoricProcessInstanceEntity {}", entity);
executeWithCompensation(() -> mongoTemplate.insert(entity), entity, OpType.INSERT);
}
//更新历史流程实例
@Override
public HistoricProcessInstanceEntity update(HistoricProcessInstanceEntity entity) {
executeWithCompensation(() -> {
Query query = new Query();
query.addCriteria(Criteria.where("id").is(entity.getId()));
Document document = (Document) mongoTemplate.getConverter()
.convertToMongoType(entity);
Update update = Update.fromDocument(document);
mongoTemplate.updateFirst(query, update,
HistoricProcessInstanceDoc.class);
}, entity, OpType.UPDATE);
return entity;
}
//根据实体对象删除流程历史
@Override
public void delete(HistoricProcessInstanceEntity entity) {
executeWithCompensation(() -> {
Query query = new Query();
query.addCriteria(Criteria.where("id").is(entity.getId()));
mongoTemplate.remove(query, HistoricProcessInstanceDoc.class);
}, entity, OpType.DELETE);
}
private void executeWithCompensation(Runnable runnable,
HistoricProcessInstanceEntity entity,
OpType opType) {
try {
runnable.run();
} catch (Exception ex) {
historicDataProducer.sendJobMessage(opType, entity);
}
}
以上代码段中,表示对MongoDB数据进行增、删、改操作时,调用executeWithCompensation()
方法来执行。在该方法中,如果发生异常,则将历史流程实例数据和对应的操作发送到RocketMQ
。
最后,再消费RocketMQ
中的数据对异常数据进行补偿。需要注意的是,历史流程实例数据中存在版本号,补偿时只需补偿MongoDB
中版本号比RocketMQ
低的数据,其实现如下:
@Slf4j
@Component
@RocketMQMessageListener(
topic = HistoricDataCompensationProducer.HISTORIC_TOPIC,
consumerGroup = "historic_compensation_consumer_group"
)
public class HistoricDataCompensationConsumer implements RocketMQListener<HistoricProcessInstanceEntityMessage> {
@Resource
private ProcessEngineConfigurationImpl configuration;
@Override
public void onMessage(HistoricProcessInstanceEntityMessage entityMessage) {
HistoricProcessInstanceDataManager processInstanceDataManager = configuration.getHistoricProcessInstanceDataManager();
HistoricProcessInstanceEntity entity = processInstanceDataManager.findById(entityMessage.getEntity().getId());
switch (entityMessage.getOpType()) {
case INSERT:
if (entity == null) {
processInstanceDataManager.insert(entityMessage.getEntity());
}
break;
case UPDATE:
if (entity != null && entity.getRevision() <= entityMessage.getEntity().
getRevision()) {
processInstanceDataManager.update(entityMessage.getEntity());
}
break;
case DELETE:
if (entity != null && entity.getRevision() <= entityMessage.getEntity().
getRevision()) {
processInstanceDataManager.delete(entityMessage.getEntity());
}
break;
default:
log.error("Unsupported opType.message={}", entityMessage);
}
}
}