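By default, a Spring Batch Step processes data in chunks synchronously. With some configuration, reading can stay synchronous while processing and writing are made asynchronous.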
1. Synchronous Step processing
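A Spring Batch Step usually consists of an ItemReader, an ItemProcessor and an ItemWriter, where the ItemProcessor is optional. The design works like this: the ItemReader reads one item at a time and adds it to the chunk's inputs. Once chunkSize items have been collected, the ItemProcessor processes them and the ItemWriter then writes the results.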
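To make the read/process/write cycle concrete, here is a minimal plain-Java sketch of the same loop. It is not Spring Batch's code: `ChunkLoop` is a hypothetical name, `Supplier`/`Function`/`Consumer` stand in for ItemReader/ItemProcessor/ItemWriter, and the "null means end of input" and "null result filters the item" conventions are borrowed from the Spring Batch interfaces. Transactions, retries and skips are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java sketch of chunk-oriented processing (not the Spring Batch implementation).
final class ChunkLoop {

    /** Runs read -> process -> write until the reader returns null; returns the number of write calls. */
    static <I, O> int run(Supplier<I> reader,
                          Function<I, O> processor,
                          Consumer<List<O>> writer,
                          int chunkSize) {
        int writes = 0;
        boolean exhausted = false;
        while (!exhausted) {
            // 1. Read items one at a time into the inputs buffer until chunkSize is reached
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize) {
                I item = reader.get();
                if (item == null) {        // null signals the end of the input
                    exhausted = true;
                    break;
                }
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                break;                      // nothing left to process or write
            }
            // 2. Process every item of the chunk; a null result drops the item
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Write the whole chunk in a single call
            writer.accept(outputs);
            writes++;
        }
        return writes;
    }
}
```

With seven input items and a chunk size of 3, the writer is called three times, with chunks of 3, 3 and 1 items.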
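All of this happens synchronously; nothing runs asynchronously. In practice the data usually comes from a database, and fetching a single row per query is inefficient. Efficiency improves if the reader fetches rows in batches but still returns them one at a time. For example, DataReader fetches rows in batches using a cursor (the rowId of the last row read):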
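```java
import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class DataReader implements ItemReader<InputType> {
    // Cursor: rowId of the last row read in the previous batch
    private long lastRowId = 0;
    // Number of rows fetched from the database per query
    private final int batchSize;
    // Iterator over the currently cached batch
    private Iterator<InputType> cacheIterator;

    public DataReader(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public InputType read() throws Exception {
        if (cacheIterator == null || !cacheIterator.hasNext()) {
            // Fetch the next batch with the cursor (query elided); it must return rows
            // with rowId > lastRowId, ordered by rowId, at most batchSize rows
            List<InputType> batchList = ....;
            if (batchList == null || batchList.isEmpty()) {
                return null; // returning null tells Spring Batch the input is exhausted
            }
            // Advance the cursor to the rowId of the last row in this batch
            lastRowId = batchList.get(batchList.size() - 1).getRowId();
            cacheIterator = batchList.iterator();
        }
        // Hand out one cached item per read() call
        return cacheIterator.next();
    }
}
```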
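The cursor logic can be exercised without a database. The sketch below is hypothetical: `Row`, `KeysetReader` and `queryBatch` are made-up names, and an in-memory list sorted by rowId stands in for a query of the form `WHERE row_id > ? ORDER BY row_id LIMIT ?`. It follows DataReader's control flow and counts the round trips.

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical row type standing in for InputType; only rowId matters here.
final class Row {
    final long rowId;
    final String payload;

    Row(long rowId, String payload) {
        this.rowId = rowId;
        this.payload = payload;
    }
}

// Sketch of DataReader's keyset ("cursor") paging over an in-memory table.
final class KeysetReader {
    private final List<Row> table;   // assumed sorted by rowId, like ORDER BY row_id
    private final int batchSize;
    private long lastRowId = 0;
    private Iterator<Row> cacheIterator;
    int queryCount = 0;              // number of simulated database round trips

    KeysetReader(List<Row> table, int batchSize) {
        this.table = table;
        this.batchSize = batchSize;
    }

    // Simulates: SELECT ... WHERE row_id > afterRowId ORDER BY row_id LIMIT limit
    private List<Row> queryBatch(long afterRowId, int limit) {
        queryCount++;
        return table.stream()
                .filter(r -> r.rowId > afterRowId)
                .limit(limit)
                .collect(Collectors.toList());
    }

    Row read() {
        if (cacheIterator == null || !cacheIterator.hasNext()) {
            List<Row> batch = queryBatch(lastRowId, batchSize);
            if (batch.isEmpty()) {
                return null;                              // end of data
            }
            lastRowId = batch.get(batch.size() - 1).rowId; // move the cursor forward
            cacheIterator = batch.iterator();
        }
        return cacheIterator.next();
    }
}
```

With 25 rows and a batch size of 10, the reader returns all 25 rows in order and makes 4 queries: three non-empty batches plus the final empty one that ends the read. Paging by the last rowId rather than by OFFSET means each query typically starts from an index seek instead of skipping over rows already read.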
DataProcessor applies the business logic to each item read and converts it to an OutputType, which is passed on to the ItemWriter:
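```java
import org.springframework.batch.item.ItemProcessor;

public class DataProcessor implements ItemProcessor<InputType, OutputType> {
    @Override
    public OutputType process(InputType item) throws Exception {
        // Business logic: convert item into an OutputType (omitted)
        OutputType output = ...;
        // Returning null instead would filter this item out of the chunk
        return output;
    }
}
```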
DataWriter writes the OutputType items to the database:
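```java
import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class DataWriter implements ItemWriter<OutputType> {
    // Spring Batch 4.x signature; in 5.x write() receives a Chunk<? extends OutputType>
    @Override
    public void write(List<? extends OutputType> items) throws Exception {
        // Write the items of the chunk to the database...
    }
}
```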
Configuring the synchronous Step:
```java
return stepBuilderFactory.get("step1")
        .<InputType, OutputType> chunk(100) // every 100 items form one chunk (one transaction) for processor and writer
        .reader(new DataReader(1000))       // each database query fetches 1000 rows
        .processor(new DataProcessor())
        .writer(new DataWriter())
        .build();
```
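The two numbers in this configuration are independent: `chunk(100)` sets how many items go through the processor and writer per transaction, while `DataReader(1000)` sets how many rows each query fetches. A rough calculation for N rows, assuming no retries, skips or filtered items (`StepCounts` is a hypothetical helper):

```java
// Back-of-the-envelope counts for the step above: N rows, reader fetch size B, chunk size C.
final class StepCounts {
    // The reader queries again each time its cached batch is used up, and one final
    // query returns an empty batch that ends the read: ceil(N / B) + 1.
    static long dbQueries(long rows, long fetchSize) {
        return ceilDiv(rows, fetchSize) + 1;
    }

    // Each non-empty chunk is handed to the writer once: ceil(N / C).
    static long writeCalls(long rows, long chunkSize) {
        return ceilDiv(rows, chunkSize);
    }

    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }
}
```

For a hypothetical 2,500-row table, that is 4 queries (1000 + 1000 + 500 rows, then an empty result) and 25 write calls.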
2. Asynchronous Step processing
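In batch systems that handle large data volumes, throughput can be improved by running the ItemProcessor and ItemWriter stages with a thread pool. To do this, wrap the ItemProcessor and ItemWriter in AsyncItemProcessor and AsyncItemWriter respectively (both from the spring-batch-integration module): processing then runs on the pool, while the writer collects the asynchronous results. The following methods perform the wrapping: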
```java
private <I, O> AsyncItemProcessor<I, O> wrapAsyncProcessor(ItemProcessor<I, O> processor,
                                                           TaskExecutor taskExecutor) {
    AsyncItemProcessor<I, O> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(processor);
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    return asyncItemProcessor;
}

private <O> AsyncItemWriter<O> wrapAsyncWriter(ItemWriter<O> writer) {
    AsyncItemWriter<O> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(writer);
    return asyncItemWriter;
}
```
Configuring the asynchronous Step:
```java
private Step step2() {
    // Processing runs on the "TestJobPool" pool; the processor now emits Future<OutputType>
    AsyncItemProcessor<InputType, OutputType> asyncItemProcessor =
            wrapAsyncProcessor(new DataProcessor(), getAsyncExecutor("TestJobPool"));
    AsyncItemWriter<OutputType> asyncItemWriter = wrapAsyncWriter(new DataWriter());
    return stepBuilderFactory.get("step2")
            // The chunk's output type is Future<OutputType>, matching the async processor
            .<InputType, Future<OutputType>> chunk(500)
            .reader(new DataReader(1000))
            .processor(asyncItemProcessor)
            .writer(asyncItemWriter)
            .build();
}

private TaskExecutor getAsyncExecutor(String threadPoolName) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(200);
    executor.setKeepAliveSeconds(60);
    executor.setThreadNamePrefix(threadPoolName + "-");
    // When all threads are busy and the queue is full, the submitting thread runs the task itself
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setAllowCoreThreadTimeOut(true);
    executor.initialize();
    return executor;
}
```
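`ThreadPoolTaskExecutor` is configured on top of a `java.util.concurrent.ThreadPoolExecutor`, so the JDK's queueing rules apply: threads beyond the 4 core threads are only created once the 200-slot queue is full, and once all 8 threads are busy and the queue is full, `CallerRunsPolicy` runs the task on the submitting thread. In this step that is the step thread, which also calls the reader, so reading is throttled instead of items being dropped. The JDK-only sketch below reproduces the rejection path with a deliberately tiny pool (1 thread, queue of 1); `CallerRunsDemo` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Tiny pool (1 thread, queue of 1) so that saturation is easy to reproduce.
final class CallerRunsDemo {
    /** Returns the name of the thread that ran the third submitted task. */
    static String threadOfThirdTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            // Task 1 occupies the only worker thread until released
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Task 2 waits in the queue, which is now full
            pool.execute(() -> { });
            // Task 3 is rejected; CallerRunsPolicy runs it synchronously on this thread
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();   // let tasks 1 and 2 finish
            pool.shutdown();
        }
        return ranOn[0];
    }
}
```

Calling `CallerRunsDemo.threadOfThirdTask()` returns the caller's own thread name rather than a pool thread name.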
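AsyncItemProcessor uses the delegate (proxy) pattern: it forwards each item to the wrapped ItemProcessor, which does the actual processing, and runs that call asynchronously on the taskExecutor thread pool. A simplified excerpt of its source: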
```java
// Simplified excerpt of Spring Batch's AsyncItemProcessor (setters and getStepExecution() omitted)
public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean {
    // The wrapped ItemProcessor that does the real work
    private ItemProcessor<I, O> delegate;
    // Defaults to SyncTaskExecutor, i.e. no concurrency unless a pool is set
    private TaskExecutor taskExecutor = new SyncTaskExecutor();

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "The delegate must be set.");
    }

    @Nullable
    public Future<O> process(final I item) throws Exception {
        // Read on the calling (step) thread from StepSynchronizationManager
        final StepExecution stepExecution = getStepExecution();
        FutureTask<O> task = new FutureTask<>(new Callable<O>() {
            public O call() throws Exception {
                // Make the step context available on the worker thread
                if (stepExecution != null) {
                    StepSynchronizationManager.register(stepExecution);
                }
                try {
                    // The delegate processor does the actual processing
                    return delegate.process(item);
                }
                finally {
                    if (stepExecution != null) {
                        StepSynchronizationManager.close();
                    }
                }
            }
        });
        // Submit the task for asynchronous execution
        taskExecutor.execute(task);
        return task;
    }
}
```
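The processing works as follows:
- Create a `FutureTask` whose callable invokes the delegate's `process(item)` on a worker thread.
- Execute the `FutureTask` asynchronously through the `TaskExecutor`.
- Return the `Future` to the writer, which uses it to track the asynchronous result.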
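The same three steps can be reproduced with the JDK alone. In this hypothetical sketch, `AsyncProcessorSketch` stands in for AsyncItemProcessor, a `Function` for the delegate ItemProcessor and a `java.util.concurrent.Executor` for the TaskExecutor; the StepSynchronizationManager registration is left out.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

// JDK-only analogue of AsyncItemProcessor.process(): wrap the delegate call in a
// FutureTask, hand it to an Executor, and return the Future immediately.
final class AsyncProcessorSketch<I, O> {
    private final Function<I, O> delegate;
    private final Executor executor;

    AsyncProcessorSketch(Function<I, O> delegate, Executor executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    Future<O> process(I item) {
        // 1. Create a FutureTask that will call the delegate
        FutureTask<O> task = new FutureTask<>(() -> delegate.apply(item));
        // 2. Run it through the executor (a pool thread, or inline for a synchronous executor)
        executor.execute(task);
        // 3. Return the Future; the caller collects the result later
        return task;
    }
}
```

Passing `Runnable::run` as the executor runs each task inline on the caller's thread, mirroring the default `SyncTaskExecutor`; with a real pool, `process` returns before the delegate has finished.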
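AsyncItemWriter also uses the delegate pattern, forwarding to the ItemWriter that actually writes the data. The write itself still happens on the step thread; it works in two steps:
1. Wait for and collect the asynchronous results produced by the processor stage.
2. Pass the collected results to the delegate ItemWriter, which writes them.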
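```java
// Simplified excerpt of Spring Batch's AsyncItemWriter (ItemStream methods and setters omitted)
public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean {
    private static final Log logger = LogFactory.getLog(AsyncItemWriter.class);
    // The wrapped ItemWriter that does the real writing
    private ItemWriter<T> delegate;

    public void write(List<? extends Future<T>> items) throws Exception {
        // Holds the unwrapped results
        List<T> list = new ArrayList<>();
        // Wait for each asynchronous result
        for (Future<T> future : items) {
            try {
                T item = future.get();
                // null means the delegate processor filtered the item out
                if (item != null) {
                    list.add(item);
                }
            }
            catch (ExecutionException e) {
                // Rethrow the processor's original exception rather than the wrapper
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    logger.debug("An exception was thrown while processing an item", e);
                    throw (Exception) cause;
                }
                else {
                    throw e;
                }
            }
        }
        // Delegate to the real ItemWriter to write the data
        delegate.write(list);
    }
}
```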
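A JDK-only sketch of the writer side, again with hypothetical names (`AsyncWriterSketch`, with a `Consumer` as the delegate writer): it blocks on each `Future` in order, drops null (filtered) results, and rethrows the processor's own exception instead of the `ExecutionException` wrapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// JDK-only analogue of AsyncItemWriter.write(): unwrap the Futures, then delegate.
final class AsyncWriterSketch<T> {
    private final Consumer<List<T>> delegate;

    AsyncWriterSketch(Consumer<List<T>> delegate) {
        this.delegate = delegate;
    }

    void write(List<? extends Future<T>> items) throws Exception {
        List<T> results = new ArrayList<>();
        for (Future<T> future : items) {
            try {
                T item = future.get();          // blocks until this item has been processed
                if (item != null) {             // null = filtered by the processor
                    results.add(item);
                }
            } catch (ExecutionException e) {
                // Surface the processor's own exception rather than the wrapper
                if (e.getCause() instanceof Exception) {
                    throw (Exception) e.getCause();
                }
                throw e;
            }
        }
        // The delegate runs only after every Future in the chunk has completed
        delegate.accept(results);
    }
}
```

Because `future.get()` blocks, the delegate writer is called only after every item in the chunk has finished processing, and a single failed item stops the whole chunk from being written.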