Java多线程批量处理数据

发布于:2024-04-26 ⋅ 阅读:(24) ⋅ 点赞:(0)
package com.demo.studydemo.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
 * 批量数据处理示例类。
 * 该类演示了如何通过多线程批量处理数据,将数据集分割成多个部分,每个部分由一个单独的线程进行处理。
 * @author lhj
 * @date 2024/04/26
 */
public class BatchDataProcessingExample {

    /**
     * 程序的主入口函数。
     * @param args 命令行参数(未使用)
     * @throws InterruptedException 如果线程在等待时被中断,则抛出此异常。
     */
    public static void main(String[] args) throws InterruptedException {
        // 初始化数据处理参数
        int dataSize = 1_000_000; // 需要处理的数据量
        int batchSize = 100_000; // 每个线程处理的数据量
        int threadCount = (dataSize + batchSize - 1) / batchSize; // 根据数据量和批量大小计算所需线程数
        System.out.println("需要启动" + threadCount + "个线程处理数据");

        // 模拟数据集
        List<Integer> data = new ArrayList<>(dataSize);
        for (int i = 0; i < dataSize; i++) {
            data.add(i);
        }

        // 使用CountDownLatch来同步线程
        CountDownLatch latch = new CountDownLatch(threadCount);

        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        // 分配任务到线程池
        for (int i = 0; i < threadCount; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, dataSize);
            List<Integer> subList = data.subList(start, end);
            executor.submit(new DataProcessor(subList, latch));
        }

        // 等待所有任务完成
        latch.await();
        System.out.println("所有数据处理完成");

        // 关闭线程池
        executor.shutdown();
    }

    /**
     * 数据处理任务类,实现了Runnable接口。
     */
    static class DataProcessor implements Runnable {
        private final List<Integer> dataList; // 待处理的数据列表
        private final CountDownLatch latch; // 线程同步的计数器

        /**
         * DataProcessor 构造函数。
         * @param dataList 待处理的数据列表。
         * @param latch 线程同步的计数器。
         */
        public DataProcessor(List<Integer> dataList, CountDownLatch latch) {
            this.dataList = dataList;
            this.latch = latch;
        }

        /**
         * 执行数据处理的方法。
         */
        @Override
        public void run() {
            processData(dataList);
            // 任务完成,计数器减一
            latch.countDown();
            System.out.println("当前线程"+ Thread.currentThread().getName() +"完成数据处理");
        }

        /**
         * 示例数据处理方法。
         * @param sublist 待处理的数据子列表。
         */
        private void processData(List<Integer> sublist) {
            // 简化处理逻辑,仅打印处理的数据范围
            System.out.println("线程 " + Thread.currentThread().getName() + " 正在处理数据范围: " + sublist.get(0) + " 到 " + sublist.get(sublist.size() - 1));
            // 可以在此处添加实际的数据处理逻辑
        }
    }
}


Java中,通过`ExecutorService`接口创建的线程池提供了两种主要的方法来提交任务:`submit()`和`execute()`。这两种方法都可以用来执行`Runnable`或`Callable`任务,但它们之间存在一些差异:

1. **execute()**- **参数**:接受一个`Runnable`类型的任务。
   - **返回值**:`void`,也就是说它不返回任何结果。
   - **异常处理**:它不提供直接获取任务执行时抛出异常的机制。如果任务执行期间抛出了异常,它会被传递给线程池的`uncaughtExceptionHandler`处理,而不是直接暴露给调用者。
   - **用途**:通常用于不需要关注任务执行结果的场景,或者通过其他方式(如Future、回调等)来处理结果或异常。

2. **submit()**- **参数**:可以接受`Runnable`或`Callable`类型的任务。如果传入的是`Callable`,那么`submit()`会返回一个`Future`对象。
   - **返回值**:对于`Runnable`,返回一个表示任务执行完成的`Future<Void>`;对于`Callable`,返回一个`Future<T>`,其中`T`是`Callable`任务返回的结果类型。
   - **异常处理**:如果任务执行期间抛出了异常,这个异常会被封装进返回的`Future`对象中,调用者可以通过`Future.get()`方法来获取异常或结果,从而可以更精细地控制异常处理逻辑。
   - **用途**:适用于需要获取任务执行结果或需要处理任务执行过程中可能抛出异常的场景。`Future`提供了检查任务是否完成、取消任务以及获取结果的方法。

**总结**- 如果你不需要关心任务的执行结果,或者任务没有结果需要返回,可以使用`execute()`。
- 如果你需要获得任务的执行结果,或者需要能够取消任务、检查任务状态等高级功能,应该使用`submit()`,尤其是当你提交的是`Callable`任务时。通过`submit()`得到的`Future`对象为你提供了更多的灵活性和控制力。