List ToMap优化优化再优化到极致

发布于:2025-06-14 ⋅ 阅读:(20) ⋅ 点赞:(0)

请对以下代码进行优化!

看着是不是很简单?你是不是日常也会这么写?

当前使用的是java21,

//1-统计blockName出现的次数
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        stockList.parallelStream().forEach(st -> {
            String blockName = st.getBlock_name();
            if (map.containsKey(blockName)) {
                map.put(blockName, map.get(blockName) + 1);
            } else {
                map.put(blockName, 1);
            }
        });
//2-保存blockName出现的次数
stockList.parallelStream().forEach(st -> {
    String blockName = st.getBlock_name();
   st.setFinance_type(String.valueOf(map.get(blockName)));
});

优化一

ConcurrentHashMapparallelStream 来统计 stockList 中每个 blockName 的出现次数,并将统计结果设置到每个 stock 对象的 finance_type 属性中。虽然代码功能正确,但存在以下优化空间:

  1. 重复遍历问题:代码使用了两个 parallelStream().forEach 分别处理计数和赋值,增加了开销。可以合并为一次遍历。

  2. 线程安全改进ConcurrentHashMapcontainsKeyget 组合不是原子操作,可能导致并发问题。可以使用 computemerge 方法来确保线程安全。

  3. 性能优化parallelStream 在小数据集上可能引入不必要的线程开销,建议根据数据规模选择是否并行。

  4. 代码简洁性:可以使用 Collectors.groupingBy 简化计数逻辑。

以下是优化后的代码:

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 统计 blockName 出现次数
stockList.parallelStream()
    .collect(Collectors.groupingBy(
        Stock::getBlock_name,
        Collectors.counting()))
    .forEach((blockName, count) -> 
        map.put(blockName, count.intValue()));

// 设置 finance_type
stockList.parallelStream().forEach(st -> 
    st.setFinance_type(String.valueOf(map.get(st.getBlock_name()))));

优化点说明:

  1. 单次统计:使用 Collectors.groupingByCollectors.counting 一次性完成 blockName 的计数,替代手动 forEachput 操作。

  2. 线程安全collect 操作是线程安全的,生成的 Map 结果直接写入 ConcurrentHashMap,避免了 containsKeyget 的非原子操作。

  3. 代码更简洁groupingBy函数式编程风格,逻辑更清晰,减少了手动维护 map 的代码。

  4. 保留并行处理:继续使用 parallelStream 保持并发性能,适合大数据量场景。

进一步优化建议:

  • 并行与串行选择:如果 stockList 数据量较小(例如几百条),parallelStream 的线程切换成本可能高于单线程处理,建议测试切换为 stream() 的性能。

  • 合并操作:如果 finance_type 只依赖于计数结果,可以进一步合并计数和赋值逻辑为单次遍历,但需要权衡代码可读性。例如:

    Map<String, Long> countMap = stockList.parallelStream()
        .collect(Collectors.groupingBy(
            Stock::getBlock_name,
            Collectors.counting()));
    stockList.forEach(st -> 
        st.setFinance_type(String.valueOf(countMap.get(st.getBlock_name()))));

      这里去掉了 ConcurrentHashMap,因为计数后的 Map 只读不写,无需并发支持。

    • 异常处理:确保 getBlock_name 不会返回 null,否则需添加空值检查以避免 NullPointerException

    • 性能测试:建议对优化后的代码进行性能测试,确认在你的实际数据规模下是否提升了效率。

    注意事项:

    • 如果 stockList 非常大,parallelStream 能有效利用多核 CPU,但需确保 Stock 对象的 getBlock_namesetFinance_type 方法线程安全。

    • 如果 finance_type 的值仅为字符串化的计数,考虑直接在 Stock 类中维护计数逻辑,减少 Map 的使用(视业务需求而定)。

    优化二

    针对上百万条数据的场景,性能和线程安全是关键。以下是针对你的需求(ConcurrentHashMap 统计 blockName 出现次数并设置 finance_type)进一步优化的代码,重点提升性能和确保绝对线程安全,同时保持简洁性。

    优化目标

    1. 性能最佳:减少遍历次数,优化并行处理,降低锁竞争。

    2. 线程安全:确保所有操作在高并发下无竞争或数据不一致。

    3. 适用大数量:针对百万级数据,充分利用多核 CPU,减少内存开销。

    优化后的代码

     ConcurrentMap<String, Long> collect1 = stockList.parallelStream().filter(st -> st.getBlock_name() != null).collect(Collectors.groupingByConcurrent(Stock::getBlock_name, Collectors.counting()));
            stockList.parallelStream().forEach(st -> {
                st.setFinance_type(String.valueOf(map.get(st.getBlock_name())));
            });

    优化点详解

    1. 使用 **groupingByConcurrent**

      1. Collectors.groupingByConcurrent 专为并行流设计,内部使用 ConcurrentHashMap 存储结果,确保计数过程线程安全。

      2. 相比普通 groupingBy,它减少了合并阶段的开销,适合百万级数据的高并发场景。

      3. 直接生成计数结果,省去手动维护 ConcurrentHashMap 的步骤。

    2. 单次计数 + 单次赋值

      1. 第一次 parallelStream 完成所有 blockName 的计数,生成 countMap

      2. 第二次 parallelStream 遍历 stockList,将计数结果设置到 finance_type

      3. 虽然仍是两次遍历,但每一步都是高度并行的,且避免了在计数阶段对 ConcurrentHashMap 的竞争性写入。

    3. 线程安全保证

      1. groupingByConcurrent 确保计数过程无竞争。

      2. countMap.get 是只读操作,ConcurrentHashMap 的读操作天然线程安全,无需额外同步。

      3. 假设 Stock.setFinance_type 是简单的字段赋值(无复杂逻辑),则线程安全无问题。如果 setFinance_type 包含复杂逻辑,需确保其线程安全。

    4. 性能优化

      1. 并行流parallelStream 充分利用多核 CPU,适合百万级数据。Java 21 的 Fork/Join 框架会根据 CPU 核心数动态分配任务。

      2. 避免锁竞争groupingByConcurrent 内部优化了 ConcurrentHashMap 的分段锁,减少并发写入的瓶颈。

      3. 内存效率countMap 只存储每个 blockName 的计数,内存占用可控。

    终极优化

    针对你的需求(百万级数据、所有 finance_type 基于最终计数、性能最佳且线程安全),且考虑到系统支持异步框架(如 Java 21 的虚拟线程或 Reactor),以下是使用 CompletableFuture 和虚拟线程优化的最终代码方案。相比之前的 groupingByConcurrent 方案,异步框架可以进一步提升吞吐量,尤其在高并发场景下。代码确保 finance_type 基于最终计数值

    最终优化代码(基于 CompletableFuture 和虚拟线程)

    ·        

    //使用虚拟线程执行器
            Executor virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
            //异步计数
            CompletableFuture<ConcurrentMap<String, Long>> concurrentMapCompletableFuture = CompletableFuture.supplyAsync(() -> stockList.parallelStream().filter(st -> st.getBlock_name() != null)
                            .collect(Collectors.groupingByConcurrent(
                                    Stock::getBlock_name,
                                    Collectors.counting()
                            ))
                    , virtualThreadExecutor);
            //异步赋值
            CompletableFuture<Void> voidCompletableFuture = concurrentMapCompletableFuture.thenAcceptAsync(countMap -> stockList.parallelStream().filter(st -> st.getBlock_name() != null)
                    .forEach(st -> st.setFinance_type(String.valueOf(countMap.get(st.getBlock_name())))), virtualThreadExecutor);
    
            //wait for all
            voidCompletableFuture.join();

    优化说明

    1. 虚拟线程(Java 21)

      1. 使用 Executors.newVirtualThreadPerTaskExecutor() 创建虚拟线程执行器。虚拟线程是 Java 21 的轻量级线程,创建和切换成本极低,适合高并发任务。

      2. 虚拟线程允许为每个任务分配独立线程,最大化并发性,特别适合 I/O 密集或阻塞操作较少的场景(如你的计数和赋值逻辑)。

    2. CompletableFuture 异步处理

      1. 将计数和赋值分为两个异步阶段:

        • countFuture:异步执行 groupingByConcurrent,生成最终计数 Map

        • assignFuture:在计数完成后异步执行 finance_type 赋值。

      2. thenAcceptAsync 确保赋值阶段等待计数完成,保证 finance_type 基于最终计数值。

      3. 异步执行减少主线程阻塞,提高吞吐量。

    3. 线程安全

      1. groupingByConcurrent 使用 ConcurrentHashMap,计数过程线程安全。

      2. 赋值阶段只读 countMap,无竞争,Stock.setFinance_type 假设为简单字段赋值(线程安全)。

      3. 虚拟线程隔离任务,降低线程切换开销。

    4. 性能优化

      1. 并行流 + 虚拟线程parallelStream 结合虚拟线程充分利用多核 CPU,适合百万级数据。

      2. 异步流水线CompletableFuture 将计数和赋值解耦,允许系统在等待计数完成时调度其他任务。

      3. 低开销:虚拟线程的轻量级特性减少上下文切换成本,相比传统线程池更高效。

    5. 空值处理

      1. 添加 filter(st -> st.getBlock_name() != null) 防止 blockNamenull 导致异常。

    为什么优于之前的方案

    • 相比 **groupingByConcurrent****(同步并行流)**:

      • 虚拟线程减少线程池管理的开销,适合高并发任务。

      • CompletableFuture 的异步执行允许更灵活的任务调度,可能在多任务环境中提升整体吞吐量。

    • 相比 AtomicInteger** 单次遍历**:

      • 确保 finance_type 基于最终计数值(如 "44444"),符合你的业务需求。

      • 避免了 AtomicInteger 的 CAS 竞争问题,性能更稳定。

    最后执行结果比较

    怎么样,结果是不是出乎意料。

    java8如何切换到java21?

    已经都设置java21了,但是还报错

    java: 找不到符号   符号:   方法 newVirtualThreadPerTaskExecutor()   位置: 类 java.util.concurrent

    原因 没有完全设置

    1-idea里设置。

    2-pom.xml里再继续设置。