前言
1、什么是管道模式
管道模式不属于我们常说的23种设计模式中的一种,它可以看成是责任链模式的一种变体。所谓的管道模式,用技术的话来说,就是把数据传递给一个任务队列,由任务队列按次序依次对数据进行加工处理。
2、什么样的场景适合用管道模式
当业务流程比较复杂时,需要拆分成多个子步骤,且每个子步骤可以自由组合,替换,新增,删除的场景
实现管道的一般套路
1、封装管道数据透传上下文
/**
 * Key/value context handed from one pipeline handler to the next.
 *
 * <p>The context is itself a {@link ConcurrentHashMap}; one instance per thread is kept in a
 * {@code TransmittableThreadLocal} and created lazily by reflection from {@link #contextClass}.
 */
public class ChannelHandlerContext extends ConcurrentHashMap<String, Object> {

    /** Key under which the current {@code ChannelHandlerRequest} is stored in this context. */
    public static final String CHANNEL_HANDLER_REQUEST_KEY = "channelHandlerRequest";

    /** Concrete context type instantiated for each thread; replaceable via {@link #setContextClass}. */
    protected static Class<? extends ChannelHandlerContext> contextClass = ChannelHandlerContext.class;

    /** Per-thread holder of the current context; the initial value is built from {@link #contextClass}. */
    protected static final TransmittableThreadLocal<? extends ChannelHandlerContext> CHAIN_CONTEXT =
            new TransmittableThreadLocal<ChannelHandlerContext>() {
                @Override
                protected ChannelHandlerContext initialValue() {
                    try {
                        // Uses the no-arg constructor of whatever class is configured at this moment.
                        return contextClass.getDeclaredConstructor().newInstance();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                }
            };

    /**
     * Overrides the default pipeline context type.
     *
     * @param clazz context class to instantiate for subsequently created contexts
     */
    public static void setContextClass(Class<? extends ChannelHandlerContext> clazz) {
        contextClass = clazz;
    }

    /**
     * Returns the pipeline context bound to the current thread.
     *
     * @return the value held by {@link #CHAIN_CONTEXT} for the calling thread
     */
    public static final ChannelHandlerContext getCurrentContext() {
        return CHAIN_CONTEXT.get();
    }

    /**
     * Releases the context: removes every entry from this map and drops the thread-bound value.
     */
    public void release() {
        this.clear();
        CHAIN_CONTEXT.remove();
    }

    /**
     * Looks up a value, falling back to a default when the key has no mapping.
     *
     * @param key          key to look up
     * @param defaultValue value returned when nothing is stored under {@code key}
     * @return the stored value, or {@code defaultValue} if there is none
     */
    public Object getDefault(String key, Object defaultValue) {
        Object stored = get(key);
        if (stored != null) {
            return stored;
        }
        return defaultValue;
    }

    /**
     * Returns the request stored under {@link #CHANNEL_HANDLER_REQUEST_KEY}.
     *
     * @return the stored request, or a freshly built empty request when none is stored
     */
    public ChannelHandlerRequest getChannelHandlerRequest() {
        ChannelHandlerRequest emptyRequest = ChannelHandlerRequest.builder().build();
        return (ChannelHandlerRequest) this.getDefault(CHANNEL_HANDLER_REQUEST_KEY, emptyRequest);
    }
}
2、定义管道抽象执行器
/**
 * Base class for one processing step of a pipeline.
 *
 * <p>Subclasses implement {@link #handler(ChannelHandlerContext)}; the optional name is only
 * stored here and exposed through its getter and setter.
 */
public abstract class AbstactChannelHandler {

    /** Optional handler name; stays {@code null} until {@link #setChannelHandlerName} is called. */
    private String channelHandlerName;

    /**
     * Executes this step against the shared context.
     *
     * @param chx context shared by the handlers of one pipeline run
     * @return {@code true} if the step succeeded, {@code false} otherwise
     */
    public abstract boolean handler(ChannelHandlerContext chx);

    /**
     * @param channelHandlerName name to assign to this handler
     */
    public void setChannelHandlerName(String channelHandlerName) {
        this.channelHandlerName = channelHandlerName;
    }

    /**
     * @return the name assigned to this handler, or {@code null} if none was set
     */
    public String getChannelHandlerName() {
        return this.channelHandlerName;
    }
}
3、定义管道
@Slf4j public class ChannelPipeline { private LinkedBlockingDeque<AbstactChannelHandler> channelHandlers = new LinkedBlockingDeque(); private ChannelHandlerContext handlerContext; public ChannelPipeline addFirst(AbstactChannelHandler channelHandler){ return addFirst(null,channelHandler); } public ChannelPipeline addLast(AbstactChannelHandler channelHandler){ return addLast(null,channelHandler); } public ChannelPipeline addFirst(String channelHandlerName,AbstactChannelHandler channelHandler){ if(StringUtils.isNotBlank(channelHandlerName)){ channelHandler.setChannelHandlerName(channelHandlerName); } channelHandlers.addFirst(channelHandler); return this; } public ChannelPipeline addLast(String channelHandlerName,AbstactChannelHandler channelHandler){ if(org.apache.commons.lang3.StringUtils.isNotBlank(channelHandlerName)){ channelHandler.setChannelHandlerName(channelHandlerName); } channelHandlers.addLast(channelHandler); return this; } public void setChannelHandlers(LinkedBlockingDeque<AbstactChannelHandler> channelHandlers) { this.channelHandlers = channelHandlers; } public ChannelHandlerContext getHandlerContext() { return handlerContext; } public void setHandlerContext(ChannelHandlerContext handlerContext) { this.handlerContext = handlerContext; } public boolean start(ChannelHandlerRequest channelHandlerRequest){ if(channelHandlers.isEmpty()){ log.warn("channelHandlers is empty"); return false; } return handler(channelHandlerRequest); } private boolean handler(ChannelHandlerRequest channelHandlerRequest) { if(StringUtils.isBlank(channelHandlerRequest.getRequestId())){ channelHandlerRequest.setRequestId(String.valueOf(SnowflakeUtils.getNextId())); } handlerContext.put(ChannelHandlerContext.CHANNEL_HANDLER_REQUEST_KEY,channelHandlerRequest); boolean isSuccess = true; try { for (AbstactChannelHandler channelHandler : channelHandlers) { isSuccess = channelHandler.handler(handlerContext); if(!isSuccess){ break; } } if(!isSuccess){ channelHandlers.clear(); } } catch 
(Exception e) { log.error("{}",e.getMessage()); isSuccess = false; } finally { handlerContext.release(); } return isSuccess; } }
4、根据业务的复杂度拆分不同子任务管道执行器
@Slf4j public class UserCheckChannelHandler extends AbstactChannelHandler { @Override public boolean handler(ChannelHandlerContext chx) { ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest(); System.out.println("------------------------------------步骤一:用户数据校验【"+channelHandlerRequest.getRequestId()+"】"); Object params = channelHandlerRequest.getParams(); if(params instanceof User){ User user = (User)params; if(StringUtils.isBlank(user.getFullname())){ log.error("用户名不能为空"); return false; } return true; } return false; } }
/**
 * Step two: derives the username and e-mail address from the user's full name by converting
 * it to pinyin without tone marks.
 */
@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx pipeline context holding the current request
     * @return {@code true} when the request params are a {@code User} and its username and
     *         e-mail were filled in; {@code false} when the params are of another type
     */
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【" + request.getRequestId() + "】");

        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }
        User user = (User) payload;

        // Tone marks are dropped from the pinyin output.
        HanyuPinyinOutputFormat pinyinFormat = new HanyuPinyinOutputFormat();
        pinyinFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
        String pinyinName = PinyinHelper.toHanYuPinyinString(user.getFullname(), pinyinFormat);

        user.setUsername(pinyinName);
        user.setEmail(pinyinName + "@qq.com");
        return true;
    }
}
/**
 * Step three: replaces the user's plain-text password with its SHA-256 hex digest.
 */
public class UserPwdEncryptChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx pipeline context holding the current request
     * @return {@code true} when the request params are a {@code User} and its password was
     *         replaced; {@code false} when the params are of another type
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤三:用户密码明文转密文【" + request.getRequestId() + "】");

        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }

        User user = (User) payload;
        // NOTE(review): a bare, unsalted SHA-256 digest is weak for password storage; a real
        // implementation should use a dedicated password hash (bcrypt/scrypt/argon2).
        String hashedPassword = DigestUtil.sha256Hex(user.getPassword());
        user.setPassword(hashedPassword);
        return true;
    }
}
/**
 * Step four: simulates persisting the user by placing it, keyed by username, into a map that
 * is stored in the context under {@code "userMap"}.
 */
public class UserMockSaveChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx pipeline context holding the current request; receives the {@code "userMap"} entry
     * @return {@code true} when the request params are a {@code User} and it was stored;
     *         {@code false} when the params are of another type
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤四:模拟用户数据落库【" + request.getRequestId() + "】");

        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }

        User user = (User) payload;
        // A fresh single-entry map stands in for the database.
        Map<String, User> savedUsers = new HashMap<>();
        savedUsers.put(user.getUsername(), user);
        chx.put("userMap", savedUsers);
        return true;
    }
}
/**
 * Step five: prints the user found in the context's {@code "userMap"} entry under the
 * request user's username.
 */
public class UserPrintChannleHandler extends AbstactChannelHandler {

    /**
     * @param chx pipeline context holding the current request and the {@code "userMap"} entry
     * @return {@code true} when the request params are a {@code User}, the context holds a map
     *         under {@code "userMap"}, and that map contains the user's username;
     *         {@code false} otherwise
     */
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------步骤五:打印用户数据【" + request.getRequestId() + "】");

        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }

        Object stored = chx.get("userMap");
        if (!(stored instanceof Map)) {
            return false;
        }

        Map<?, ?> savedUsers = (Map<?, ?>) stored;
        String username = ((User) payload).getUsername();
        if (!savedUsers.containsKey(username)) {
            return false;
        }
        System.out.println(savedUsers.get(username));
        return true;
    }
}
5、对各个子任务进行编排组合
/**
 * {@code UserService} implementation that assembles the user-saving pipeline by hand.
 */
@Service
public class UserServiceImpl implements UserService {

    /**
     * Runs the five user handlers in a fixed order: check, fill username/e-mail, encrypt
     * password, mock save, print.
     *
     * @param user user to process; passed to the handlers as the request params
     * @return the result of starting the pipeline
     */
    @Override
    public boolean save(User user) {
        return ChannelPipelineExecutor.pipeline()
                .addLast(new UserCheckChannelHandler())
                .addLast(new UserFillUsernameAndEmailChannelHandler())
                .addLast(new UserPwdEncryptChannelHandler())
                .addLast(new UserMockSaveChannelHandler())
                .addLast(new UserPrintChannleHandler())
                .start(requestFor(user));
    }

    /** Wraps the user in a pipeline request. */
    private static ChannelHandlerRequest requestFor(User user) {
        return ChannelHandlerRequest.builder()
                .params(user)
                .build();
    }
}
6、测试
// Generate a full name and a phone number with a CHINA-locale Faker, build a user from them,
// and send the user through the service.
Faker generator = Faker.instance(Locale.CHINA);
String fullname = generator.name().fullName();
String mobile = generator.phoneNumber().phoneNumber();

User user = User.builder()
        .age(20)
        .fullname(fullname)
        .mobile(mobile)
        .password("123456")
        .build();

userService.save(user);
查看控制台
思考一下:上述实现的管道模式,有没有优化的空间?
在步骤5对各个子任务进行编排组合,假设子业务存在N个步骤,我们需要addLast N次,感觉有点硬编码了。因此我们可以做如下改造
改造
1、定义管道注解
/**
 * Marks a handler class as a step of the pipeline that backs one service method.
 *
 * <p>The annotation is meta-annotated with {@code @Component}, so annotated classes are picked
 * up as Spring components, with the infrastructure bean role.
 */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public @interface Pipeline {

    /** Service type whose method this handler belongs to. */
    Class consumePipelinesService();

    /** Name of the method on {@link #consumePipelinesService()} that this handler belongs to. */
    String consumePipelinesMethod();

    /** Parameter types of that method; empty by default. */
    Class[] args() default {};

    /**
     * Ordering value of this handler among the handlers of the same method.
     * NOTE(review): presumably lower values run first — confirm against the pipeline proxy.
     */
    int order();
}
2、定义管道扫描器
/**
 * Classpath scanner that registers scanned interfaces as {@code ComsumePipelineFactoryBean}
 * bean definitions.
 */
public class PipelineClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {

    /**
     * @param registry registry that receives the scanned bean definitions
     */
    public PipelineClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    /**
     * Performs the regular scan and then rewrites every resulting definition so that it is
     * created through the factory bean.
     *
     * @param basePackages packages to scan
     * @return the holders returned by the parent scan, with rewritten definitions
     */
    @Override
    protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> scanned = super.doScan(basePackages);
        for (BeanDefinitionHolder holder : scanned) {
            redirectToFactoryBean(holder);
        }
        return scanned;
    }

    /**
     * Accepts only interfaces as candidates.
     *
     * @param beanDefinition definition under consideration
     * @return whether the definition's metadata describes an interface
     */
    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        return beanDefinition.getMetadata().isInterface();
    }

    /**
     * Records the scanned class name in the {@code pipelineServiceClz} property and swaps the
     * bean class for the factory bean.
     */
    private void redirectToFactoryBean(BeanDefinitionHolder holder) {
        GenericBeanDefinition definition = (GenericBeanDefinition) holder.getBeanDefinition();
        // Read the original class name before the bean class is replaced below.
        String scannedClassName = definition.getBeanClassName();
        definition.getPropertyValues().addPropertyValue("pipelineServiceClz", scannedClassName);
        definition.setBeanClass(ComsumePipelineFactoryBean.class);
    }
}
3、定义管道注册器
/**
 * Registrar that scans the configured base packages for interfaces annotated with
 * {@link FunctionalInterface} and registers them through
 * {@code PipelineClassPathBeanDefinitionScanner}.
 */
public class PipelineImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {

    /**
     * Scans the base packages resolved from the importing class.
     *
     * @param importingClassMetadata metadata of the class carrying the import
     * @param registry               registry that receives the scanned definitions
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        PipelineClassPathBeanDefinitionScanner scanner = new PipelineClassPathBeanDefinitionScanner(registry);
        scanner.addIncludeFilter(new AnnotationTypeFilter(FunctionalInterface.class));
        Set<String> basePackages = getBasePackages(importingClassMetadata);
        scanner.scan(basePackages.toArray(new String[0]));
    }

    /**
     * Resolves the packages to scan from the {@code basePackages} attribute of
     * {@code EnabledPipeline}; falls back to the package of the importing class when the
     * attribute is missing or contains no non-blank entry.
     *
     * @param importingClassMetadata metadata of the class carrying the import
     * @return a non-empty set of package names
     */
    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes =
                importingClassMetadata.getAnnotationAttributes(EnabledPipeline.class.getCanonicalName());

        Set<String> basePackages = new HashSet<>();
        // getAnnotationAttributes returns null when the annotation is not found on the importing
        // class; guard against that instead of failing with a NullPointerException.
        Object configured = attributes == null ? null : attributes.get("basePackages");
        if (configured instanceof String[]) {
            for (String pkg : (String[]) configured) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
        }

        if (basePackages.isEmpty()) {
            basePackages.add(ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }
        return basePackages;
    }
}
4、定义EnableXXX注解
/**
 * Enables pipeline scanning by importing {@code PipelineImportBeanDefinitionRegistrar}.
 */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(PipelineImportBeanDefinitionRegistrar.class)
public @interface EnabledPipeline {

    /** Packages to scan; empty by default. */
    String[] basePackages() default {};
}
注:此外还需定义管道代理和管道factoryBean,因为篇幅原因就不贴了。感兴趣的朋友可以查看文末的demo链接。
5、将原有的管道任务执行器,改造成如下
@Slf4j @Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 1) public class UserCheckChannelHandler extends AbstactChannelHandler { @Override public boolean handler(ChannelHandlerContext chx) { ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest(); System.out.println("------------------------------------步骤一:用户数据校验【"+channelHandlerRequest.getRequestId()+"】"); String json = JSON.toJSONString(channelHandlerRequest.getParams()); List<User> users = JSON.parseArray(json,User.class); if(CollectionUtil.isEmpty(users) || StringUtils.isBlank(users.get(0).getFullname())){ log.error("用户名不能为空"); return false; } return true; } }
@Slf4j @Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 2) public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler { @SneakyThrows @Override public boolean handler(ChannelHandlerContext chx) { ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest(); System.out.println("------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【"+channelHandlerRequest.getRequestId()+"】"); String json = JSON.toJSONString(channelHandlerRequest.getParams()); List<User> users = JSON.parseArray(json,User.class); if(CollectionUtil.isNotEmpty(users)){ User user = users.get(0); String fullname = user.getFullname(); HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat(); hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE); String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat); user.setUsername(username); user.setEmail(username + "@qq.com"); return true; } return false; } }
。。。省略剩余管道任务执行器
6、原来的步骤编排,仅需写接口即可
/**
 * User service contract. It is annotated with {@link FunctionalInterface}, which is the
 * annotation the pipeline registrar's include filter looks for.
 */
@FunctionalInterface
public interface UserService {

    /**
     * Saves the given user.
     *
     * @param user user to save
     * @return whether the save succeeded
     */
    boolean save(User user);
}
仅需这样即可进行编排
7、测试
在启动类上加上@EnabledPipeline注解。示例如下
/**
 * Spring Boot entry point of the pipeline demo; enables pipeline scanning for the test package.
 */
@SpringBootApplication
@EnabledPipeline(basePackages = "com.github.lybgeek.pipeline.spring.test")
public class SpringPipelineApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Forward args so command-line options (e.g. --server.port) are not silently dropped.
        SpringApplication.run(SpringPipelineApplication.class, args);
    }
}
/** Verifies that saving the user through the service reports success. */
@Test
public void testPipeline() {
    Assert.assertTrue(userService.save(user));
}
编排的效果和之前的一样
总结
本文主要实现了2种不同形式的管道模式:一种基于注解,编排步骤通过注解直接写在了执行器上,通过执行器去定位业务执行方法;另外一种是业务方法里面自己组合调用执行器。通过注解这种方式虽然避免了业务方法自己去编排执行器,但也存在一个问题:当执行器一多,就需要翻看每个执行器类,查看它的执行顺序,这样可能会出现执行器因为顺序问题而达不到我们想要的组合效果。基于这个问题,我将在下篇文章再介绍其他2种实现方式。
本文含有隐藏内容,请 开通VIP 后查看