Spring Boot 集成 Spring Batch 批处理组件

发布于:2025-06-30 ⋅ 阅读:(17) ⋅ 点赞:(0)

1.Spring Batch 简介

Spring Batch 是 Spring 生态系统中的企业级批处理框架,专门设计用于处理大规模数据作业。它提供了批处理应用所需的核心功能,解决了传统批处理应用开发中的重复性问题,使开发人员能够专注于业务逻辑而非基础设施。

  • 核心价值与定位

    问题解决:自动化处理周期性的、数据密集型的任务(如报表生成、数据迁移、对账结算)
    典型场景:
    - 每月财务报表生成
    - 银行日终批量交易处理
    - 电商平台每日用户行为分析
    - 百万级数据迁移(如旧系统到新系统)

2.批处理工具架构和示例

核心接口
读取 ItemReader
处理 ItemProcessor
写入 ItemWriter

项目结构

在这里插入图片描述

依赖包

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Batch sample application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringBatcher</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Java 21 source/target; spring-boot.version is consumed by the BOM import below. -->
    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-boot.version>3.5.3</spring-boot.version>
    </properties>

    <!-- Imports the Spring Boot dependency BOM, which is why the Spring Boot starters
         and H2 below are declared without a version. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- NOTE(review): none of the sample classes shown with this POM reference Redis;
             confirm whether this starter is still needed. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Lombok and EasyExcel are pinned to explicit versions here. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>4.0.3</version>
        </dependency>


        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope> <!-- usually only needed as a runtime dependency -->
        </dependency>

    </dependencies>

</project>

启动类

package org.example;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the Spring Batch sample application.
 *
 * <p>NOTE(review): on Spring Boot 3.x, {@code @EnableBatchProcessing} is reported to switch
 * off Boot's own batch auto-configuration (including {@code spring.batch.*} properties) —
 * confirm this is intended before relying on those properties.
 */
@EnableBatchProcessing
@SpringBootApplication
public class BatchApp {

    /**
     * Boots the Spring application context with this class as the primary source.
     *
     * @param args command-line arguments handed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(BatchApp.class);
        application.run(args);
    }
}

2.1 批处理任务持久化控制

示例代码基于 H2 存储

package org.example.config;

import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;

/**
 * Batch infrastructure configuration that exposes the {@link JobRepository} bean.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-20 PM 4:21
 */
@Configuration
public class BatchJobConfig {

    /**
     * Creates the {@link JobRepository} through a {@link JobRepositoryFactoryBean}.
     *
     * <p>The factory is given the supplied data source, the fixed database type
     * {@code "H2"}, and a new {@link DataSourceTransactionManager} built on that
     * same data source.
     *
     * @param dataSource data source used both for the repository and for its transaction manager
     * @return the repository produced by the factory bean
     * @throws Exception if the factory bean fails to initialise or to produce its object
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource) throws Exception {
        DataSourceTransactionManager repositoryTxManager = new DataSourceTransactionManager(dataSource);

        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDatabaseType("H2");
        factory.setTransactionManager(repositoryTxManager);
        factory.setDataSource(dataSource);

        // The factory is created by hand rather than as a bean, so its init callback is invoked explicitly.
        factory.afterPropertiesSet();
        return factory.getObject();
    }

}

2.2 实现一个读取器

以 Excel 文件读取为例

package org.example.job.common;

import com.alibaba.excel.EasyExcel;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;

import java.io.File;
import java.util.List;

/**
 * {@link ItemReader} that serves the rows of an Excel file parsed with EasyExcel.
 *
 * <p>All rows are loaded into memory with one synchronous read, so this reader is meant
 * for small and medium-sized files. Rows are then handed out one at a time. Once the
 * last row has been returned, {@link #read()} returns {@code null} and rewinds its
 * cursor, so the next pass serves the same cached rows again from the start.
 *
 * <p>The cursor is a plain {@code int} and no synchronisation is performed, so an
 * instance must not be read from several threads at once.
 *
 * @param <T> row type the Excel columns are mapped onto
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:16
 */
public class EasyExcelItemReader<T> implements ItemReader<T>, InitializingBean {

    private final Class<T> clazz;
    private final String filePath;
    /** Rows loaded from the file; {@code null} until the first load. */
    private List<T> cacheList;
    /** Position of the next row to return from {@link #cacheList}. */
    private int index = 0;

    /**
     * @param clazz    row type passed to EasyExcel as the head class
     * @param filePath path of the Excel file to read
     */
    public EasyExcelItemReader(Class<T> clazz, String filePath) {
        this.clazz = clazz;
        this.filePath = filePath;
    }

    /**
     * Loads the file eagerly when the bean is initialised.
     *
     * @throws RuntimeException if reading the Excel file fails
     */
    @Override
    public void afterPropertiesSet() {
        cacheList = loadRows();
    }

    /**
     * Returns the next cached row, or {@code null} when all rows have been served.
     *
     * <p>If the rows have not been loaded yet (the reader was created without the
     * {@link #afterPropertiesSet()} callback being invoked), they are loaded on the first
     * call instead of failing with a {@code NullPointerException}.
     *
     * @return the next row, or {@code null} at end of data
     * @throws RuntimeException if a lazy load of the Excel file fails
     */
    @Override
    public T read() {
        if (cacheList == null) {
            cacheList = loadRows();
        }
        if (index < cacheList.size()) {
            return cacheList.get(index++);
        }
        // Rewind so the next pass over this reader starts again at the first row.
        index = 0;
        return null;
    }

    /** Reads every data row of the sheet in one go, treating the first row as the header. */
    private List<T> loadRows() {
        try {
            return EasyExcel.read(new File(filePath))
                    .head(clazz)
                    .sheet()
                    .headRowNumber(1) // one header row precedes the data
                    .doReadSync();
        } catch (Exception e) {
            throw new RuntimeException("read excel failed ", e);
        }
    }
}

2.3 定义批处理JOB

package org.example.job;

import org.example.entity.User;
import org.example.job.common.EasyExcelItemReader;
import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Declares the beans of the Excel import batch job: reader, processor, writer, step and job.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:08
 */
@Component
public class SVCJob {

    /**
     * Excel reader bean.
     *
     * @return a reader that maps the rows of the workbook at the fixed path onto {@link User}
     */
    @Bean("easyExcelItemReader")
    public EasyExcelItemReader<User> easyExcelItemReader() {
        String workbookPath = "C:\\Users\\Administrator\\Desktop\\Test.xlsx";
        return new EasyExcelItemReader<>(User.class, workbookPath);
    }

    /**
     * Processor bean that reduces each {@link User} to its name.
     *
     * @return a processor returning {@code getName()} of the item it receives
     */
    @Bean("getNameProcessors")
    public ItemProcessor<User, String> getNameProcessors() {
        return User::getName;
    }

    /**
     * Writer bean that prints every processed name to standard output.
     *
     * @return a writer emitting one {@code "User Name: ..."} line per item
     */
    @Bean("nameWriter")
    public ItemWriter<String> nameWriter() {
        return items -> items.forEach(name -> System.out.println("User Name: " + name));
    }

    /**
     * Chunk-oriented step wiring reader, processor and writer together.
     *
     * <p>Items are handled in chunks of 100. The step is fault tolerant and skips
     * {@link IllegalArgumentException}, with a skip limit of 1. A listener prints a
     * message before the step starts.
     *
     * @param jobRepository      repository the step is registered against
     * @param transactionManager transaction manager used for the chunk transactions
     * @param reader             the {@code easyExcelItemReader} bean
     * @param processor          the {@code getNameProcessors} bean
     * @param writer             the {@code nameWriter} bean
     * @return the configured step
     */
    @Bean("easyExcelStep")
    public Step easyExcelStep(JobRepository jobRepository,
                           PlatformTransactionManager transactionManager,
                           @Qualifier("easyExcelItemReader") EasyExcelItemReader<User> reader,
                           @Qualifier("getNameProcessors") ItemProcessor<User, String> processor,
                           @Qualifier("nameWriter") ItemWriter<String> writer) {

        StepExecutionListener startAnnouncer = new StepExecutionListener() {
            @Override
            public void beforeStep(StepExecution stepExecution) {
                System.out.println("start to processor data ...");
            }
        };

        StepBuilder stepBuilder = new StepBuilder("easyExcelStep", jobRepository);
        return stepBuilder
                .<User, String>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skipLimit(1)
                .skip(IllegalArgumentException.class)
                .listener(startAnnouncer)
                .build();
    }


    /**
     * Batch job consisting of the single import step.
     *
     * <p>A {@link RunIdIncrementer} is registered as the incrementer, and a listener
     * prints the job status after the job ends.
     *
     * @param jobRepository repository the job is registered against
     * @param importStep    the {@code easyExcelStep} bean
     * @return the configured job
     */
    @Bean("easyExcelImportJobs")
    public Job customerImportJob(JobRepository jobRepository, @Qualifier("easyExcelStep") Step importStep) {

        JobExecutionListener completionReporter = new JobExecutionListener() {
            @Override
            public void afterJob(JobExecution jobExecution) {
                System.out.println("Job Finished!State: " + jobExecution.getStatus());
            }
        };

        JobBuilder jobBuilder = new JobBuilder("easyExcelImportJobs", jobRepository);
        return jobBuilder
                .incrementer(new RunIdIncrementer())
                .start(importStep)
                .listener(completionReporter)
                .build();
    }

}

2.4 数据实体

package org.example.entity;

import com.alibaba.excel.annotation.ExcelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Row model for the imported Excel sheet; each field is tied to a column through
 * {@code @ExcelProperty}.
 *
 * <p>Accessors and constructors come from the Lombok annotations. The field order below
 * is also the parameter order of the generated all-args constructor, so do not reorder
 * the fields casually.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:20
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

    // Column headed "姓名" (name).
    @ExcelProperty("姓名")
    private String name;

    // Column headed "编号" (number / ID); kept as text.
    @ExcelProperty("编号")
    private String employeeId;

    // Column headed "年龄" (age).
    @ExcelProperty("年龄")
    private Integer age;

}

2.5 接口类

package org.example.controller;

import jakarta.annotation.Resource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that launches the Excel import job on demand.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-23 PM 4:40
 */
@RestController
@RequestMapping("/job")
public class JobManage {

    /** Launcher used to start the job. */
    @Autowired
    private JobLauncher jobLauncher;

    /** The job registered under the bean name {@code easyExcelImportJobs}. */
    @Resource(name = "easyExcelImportJobs")
    Job job;

    /**
     * Handles {@code GET /job/start}: runs the import job with a single {@code uniqueId}
     * parameter taken from {@link System#nanoTime()}, so that separate requests
     * normally carry different parameters.
     *
     * @throws RuntimeException wrapping any exception raised while building the
     *                          parameters or launching the job
     */
    @GetMapping("/start")
    public void start() {
        try {
            JobParametersBuilder parametersBuilder = new JobParametersBuilder();
            parametersBuilder.addLong("uniqueId", System.nanoTime());
            JobParameters launchParameters = parametersBuilder.toJobParameters();
            jobLauncher.run(job, launchParameters);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

2.6 H2 配置

# Spring Boot settings for the sample: H2 data source, H2 web console and batch schema setup.
spring:
  datasource:
    # File-based H2 database at a fixed local path; the trailing comment keeps the
    # alternative TCP and in-memory URLs.
    url: jdbc:h2:file:Z:/IdeaProjects/SpringBatcher/SpringBatcher/springbatchdb  #jdbc:h2:tcp://localhost/mem:springbatchdb;DB_CLOSE_DELAY=-1 #jdbc:h2:mem:springbatchdb
    driver-class-name: org.h2.Driver
    username: sa
    password: sa
  h2:
    console:
      # Enables the H2 web console under /h2/db-console.
      enabled: true
      path: /h2/db-console
      settings:
        # NOTE(review): allows console access from other hosts; presumably demo-only,
        # confirm before using outside a local environment.
        web-allow-others: true
  batch:
    jdbc:
      # NOTE(review): requests schema creation on every start; confirm it takes effect
      # when the application class carries @EnableBatchProcessing.
      initialize-schema: always




2.7 H2 数据库脚本

-- Autogenerated: do not edit this file

-- Job instances; the combination (JOB_NAME, JOB_KEY) must be unique.
CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

-- Job executions: timestamps, status and exit information; each row references a job instance.
CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP(9) NOT NULL,
	START_TIME TIMESTAMP(9) DEFAULT NULL ,
	END_TIME TIMESTAMP(9) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP(9),
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

-- Parameters (name, type, value, identifying flag) stored per job execution.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	PARAMETER_NAME VARCHAR(100) NOT NULL ,
	PARAMETER_TYPE VARCHAR(100) NOT NULL ,
	PARAMETER_VALUE VARCHAR(2500) ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

-- Step executions with their commit/read/filter/write/skip/rollback counters;
-- each row references the job execution it belongs to.
CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP(9) NOT NULL,
	START_TIME TIMESTAMP(9) DEFAULT NULL ,
	END_TIME TIMESTAMP(9) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP(9),
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

-- Context data (short and serialized form) stored per step execution.
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT LONGVARCHAR ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

-- Context data (short and serialized form) stored per job execution.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT LONGVARCHAR ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

-- Sequences for step executions, job executions and jobs
-- (presumably used for id generation; confirm against the Spring Batch version in use).
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;

3.测试

启动服务

在这里插入图片描述

测试 H2 连接

在这里插入图片描述

测试数据

在这里插入图片描述

触发 JOB

在这里插入图片描述

JOB 执行记录

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到