Optimizing Data Processing Performance in Spring Batch

Published: 2025-06-29

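By default, a Spring Batch Step processes data in chunks synchronously. It can also be configured so that reading stays synchronous while processing and writing run asynchronously.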

1. Synchronous Step

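A Spring Batch Step usually consists of an ItemReader, an ItemProcessor, and an ItemWriter, of which the ItemProcessor is optional. The design is as follows: the ItemReader reads one item at a time and the items are accumulated in inputs; once chunkSize items have been collected, the ItemProcessor processes them and the ItemWriter then writes them out.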
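
The chunk-oriented flow described above can be sketched in plain Java, without Spring. This is only a simplified illustration: the names ChunkLoopSketch and runStep are made up for this example, and the real Step also handles transactions, skip/retry, and listeners, all of which are left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class ChunkLoopSketch {

    /**
     * Runs a simplified chunk-oriented loop and returns the number of chunks written.
     * The reader returns null when the input is exhausted, like ItemReader.read().
     */
    static <I, O> int runStep(Supplier<I> reader, Function<I, O> processor,
                              Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        boolean exhausted = false;
        while (!exhausted) {
            // 1. Read items one at a time until the chunk is full or the input ends.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize) {
                I item = reader.get();
                if (item == null) {
                    exhausted = true;
                    break;
                }
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                break;
            }
            // 2. Process every item of the chunk; a null result means "filtered out".
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Write the whole chunk with a single call.
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5).iterator();
        List<List<String>> written = new ArrayList<>();
        int chunks = ChunkLoopSketch.<Integer, String>runStep(
                () -> source.hasNext() ? source.next() : null,
                n -> "item-" + n,
                written::add,
                2);
        // prints: 3 chunks: [[item-1, item-2], [item-3, item-4], [item-5]]
        System.out.println(chunks + " chunks: " + written);
    }
}
```

Every stage runs on the calling thread, so a slow processor directly delays the next read and the next write.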

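The whole flow is synchronous; nothing in it runs asynchronously. In real workloads the data usually comes from a database, and fetching a single row per query is inefficient. A reader can instead fetch rows in batches and hand them out one at a time. For example, DataReader below reads in batches using a cursor (the rowId of the last row read):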

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class DataReader implements ItemReader<InputType> {
    // Cursor: rowId of the last row read in the previous batch
    private long lastRowId = 0;

    // Number of rows fetched per database query
    private final int batchSize;

    // Iterator over the cached batch
    private Iterator<InputType> cacheIterator;

    public DataReader(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public InputType read() throws Exception {
        if (cacheIterator == null || !cacheIterator.hasNext()) {
            // Fetch the next batch from the database using the cursor
            List<InputType> batchList = ....

            if (batchList == null || batchList.isEmpty()) {
                return null; // end of input
            }

            // Advance lastRowId to the rowId of the last row in this batch
            lastRowId = batchList.get(batchList.size() - 1).getRowId();
            cacheIterator = batchList.iterator();
        }
        // Return one item from the cached batch
        return cacheIterator.next();
    }
}
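
To make the batching behaviour concrete, here is a minimal JDK-only sketch of the same idea. The Row class, the fetcher function, and the in-memory fetchFromTable helper are all hypothetical stand-ins for the elided database query (something along the lines of "rows with rowId greater than lastRowId, ordered by rowId, limited to batchSize"); the query counter exists only to show how many round trips are made.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class BatchedReaderSketch {

    /** Hypothetical row type; only the ID matters for paging. */
    static class Row {
        final long rowId;
        final String payload;

        Row(long rowId, String payload) {
            this.rowId = rowId;
            this.payload = payload;
        }
    }

    /** Fetches rows in batches but hands them out one at a time. */
    static class BatchedReader {
        private final int batchSize;
        // Hypothetical fetcher: (lastRowId, limit) -> rows with rowId > lastRowId, ordered by rowId
        private final BiFunction<Long, Integer, List<Row>> fetcher;
        private long lastRowId = 0;
        private Iterator<Row> cache;
        int queries = 0; // number of fetcher calls, for illustration only

        BatchedReader(int batchSize, BiFunction<Long, Integer, List<Row>> fetcher) {
            this.batchSize = batchSize;
            this.fetcher = fetcher;
        }

        Row read() {
            if (cache == null || !cache.hasNext()) {
                List<Row> batch = fetcher.apply(lastRowId, batchSize);
                queries++;
                if (batch == null || batch.isEmpty()) {
                    return null; // end of input
                }
                lastRowId = batch.get(batch.size() - 1).rowId;
                cache = batch.iterator();
            }
            return cache.next();
        }
    }

    /** In-memory stand-in for the database query; assumes the table is sorted by rowId. */
    static List<Row> fetchFromTable(List<Row> table, long lastRowId, int limit) {
        List<Row> out = new ArrayList<>();
        for (Row r : table) {
            if (r.rowId > lastRowId && out.size() < limit) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        for (long id = 1; id <= 5; id++) {
            table.add(new Row(id, "row-" + id));
        }
        BatchedReader reader =
                new BatchedReader(2, (last, limit) -> fetchFromTable(table, last, limit));
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        // prints: 5 rows in 4 queries (three batches with data plus one empty batch)
        System.out.println(count + " rows in " + reader.queries + " queries");
    }
}
```

With a batch size of 1000, the same reader would issue roughly one query per 1000 rows instead of one per row.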

DataProcessor applies the business logic to each item read and converts it to OutputType, which is then passed to the ItemWriter:

public class DataProcessor implements ItemProcessor<InputType, OutputType> {

    @Override
    public OutputType process(InputType item) throws Exception {
        // Process the item and convert it to OutputType
        return output;
    }
}

DataWriter writes the OutputType items to the database:

public class DataWriter implements ItemWriter<OutputType> {
    @Override
    public void write(List<? extends OutputType> items) throws Exception {
        // Write the items to the database ...
    }
}

Configuring the synchronous Step:

return stepBuilderFactory.get("step1")
    .<InputType, OutputType> chunk(100)  // each chunk of 100 items goes through the processor and writer
    .reader(new DataReader(1000)) // fetch 1000 rows per database query
    .processor(new DataProcessor())
    .writer(new DataWriter())
    .build();

2. Asynchronous Step

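In batch systems that handle large data volumes, where throughput should be as high as possible, the ItemProcessor and ItemWriter stages can be optimized with asynchronous, multithreaded execution. To do this, wrap the ItemProcessor and the ItemWriter in an AsyncItemProcessor and an AsyncItemWriter respectively (both come from the spring-batch-integration module). The following helper methods do the wrapping: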

private <I, O> AsyncItemProcessor<I, O> wrapAsyncProcessor(ItemProcessor<I, O> processor,
                                                           TaskExecutor taskExecutor) {
    AsyncItemProcessor<I, O> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(processor);
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    return asyncItemProcessor;
}

private <O> AsyncItemWriter<O> wrapAsyncWriter(ItemWriter<O> writer) {
    AsyncItemWriter<O> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(writer);
    return asyncItemWriter;
}

Configuring the asynchronous Step:

private Step step2() {
    AsyncItemProcessor<InputType, OutputType> asyncItemProcessor =
        wrapAsyncProcessor(new DataProcessor(), getAsyncExecutor("TestJobPool"));
    AsyncItemWriter<OutputType> asyncItemWriter = wrapAsyncWriter(new DataWriter());
    return stepBuilderFactory.get("step2")
        .<InputType, Future<OutputType>> chunk(500)
        .reader(new DataReader(1000))
        .processor(asyncItemProcessor)
        .writer(asyncItemWriter)
        .build();
}

private TaskExecutor getAsyncExecutor(String threadPoolName) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(200);
    executor.setKeepAliveSeconds(60);
    executor.setThreadNamePrefix(threadPoolName + "-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setAllowCoreThreadTimeOut(true);
    executor.initialize();
    return executor;
}

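AsyncItemProcessor uses the delegate (proxy) pattern: it hands the actual processing to the wrapped ItemProcessor and runs it on the taskExecutor thread pool, so the items of a chunk are processed concurrently instead of one after another.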

public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean {
	// The wrapped ItemProcessor that does the real work
	private ItemProcessor<I, O> delegate;

	private TaskExecutor taskExecutor = new SyncTaskExecutor();

	public void afterPropertiesSet() throws Exception {
		Assert.notNull(delegate, "The delegate must be set.");
	}


	@Nullable
	public Future<O> process(final I item) throws Exception {
		final StepExecution stepExecution = getStepExecution();
		FutureTask<O> task = new FutureTask<>(new Callable<O>() {
			public O call() throws Exception {
				if (stepExecution != null) {
					StepSynchronizationManager.register(stepExecution);
				}
				try {
					// The delegate processor does the actual processing
					return delegate.process(item);
				}
				finally {
					if (stepExecution != null) {
						StepSynchronizationManager.close();
					}
				}
			}
		});
		// Submit the task for asynchronous execution
		taskExecutor.execute(task);
		return task;
	}
}

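The processing flow is as follows:

  1. Create a FutureTask whose body calls the delegate's process(item) method to do the actual work.
  2. Submit the FutureTask to the TaskExecutor for asynchronous execution.
  3. Return the Future to the writer, which uses it to collect the asynchronous result.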
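
The three steps can be reproduced with the JDK alone. The sketch below is not the Spring class; AsyncProcessSketch and processAsync are hypothetical names, a plain Function stands in for the delegate ItemProcessor, and a fixed thread pool stands in for the TaskExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

class AsyncProcessSketch {

    /** Wraps one delegate call in a FutureTask, submits it, and returns the Future immediately. */
    static <I, O> Future<O> processAsync(I item, Function<I, O> delegate, Executor executor) {
        // Step 1: the task body performs the real processing
        FutureTask<O> task = new FutureTask<O>(() -> delegate.apply(item));
        // Step 2: hand the task to the executor; this call does not wait for the result
        executor.execute(task);
        // Step 3: the caller (the writer, in Spring Batch) resolves the Future later
        return task;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                futures.add(AsyncProcessSketch.<Integer, Integer>processAsync(i, n -> n * n, pool));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until that item is done
            }
            // prints: [1, 4, 9, 16, 25]
            System.out.println(results);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the futures are resolved in submission order, the results keep the order of the input items even though the work itself may finish in any order.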

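AsyncItemWriter uses the same delegate pattern and forwards to the ItemWriter that actually writes the data. It works in two steps:

  1. Retrieve the asynchronous results produced by the processor stage.
  2. Pass the collected results to the actual ItemWriter, which writes them.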

public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean {
	// The wrapped ItemWriter that does the real writing
	private ItemWriter<T> delegate;

	public void write(List<? extends Future<T>> items) throws Exception {
		// Holds the resolved asynchronous results
		List<T> list = new ArrayList<>();
		// Resolve each Future, blocking until its result is available
		for (Future<T> future : items) {
			try {
				T item = future.get();

				if(item != null) {
					list.add(future.get());
				}
			}
			catch (ExecutionException e) {
				Throwable cause = e.getCause();

				if(cause != null && cause instanceof Exception) {
					logger.debug("An exception was thrown while processing an item", e);

					throw (Exception) cause;
				}
				else {
					throw e;
				}
			}
		}
		
		// Delegate to the actual ItemWriter to write the data
		delegate.write(list);
	}
}
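
The result-collection logic can be tried out in isolation. The following is a JDK-only sketch under stated assumptions: FutureCollectSketch and collect are hypothetical names, and already-completed CompletableFuture instances stand in for the futures that the processor stage would return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureCollectSketch {

    /**
     * Resolves the futures in order, drops null results (filtered items),
     * and rethrows the original processing exception instead of the wrapper.
     */
    static <T> List<T> collect(List<? extends Future<T>> futures) throws Exception {
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            try {
                T item = future.get();
                if (item != null) {
                    results.add(item);
                }
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw e;
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<Future<String>> ok = new ArrayList<>();
        ok.add(CompletableFuture.completedFuture("a"));
        ok.add(CompletableFuture.<String>completedFuture(null)); // item filtered by the processor
        ok.add(CompletableFuture.completedFuture("c"));
        // prints: [a, c]
        System.out.println(collect(ok));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("bad item"));
        List<Future<String>> bad = new ArrayList<>();
        bad.add(failed);
        try {
            collect(bad);
        } catch (IllegalStateException e) {
            // prints: rethrown: bad item
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

Note that the write only starts after every future in the chunk has been resolved, so the writer thread waits for the slowest item of each chunk.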
