并发编程——14 线程池参数动态化

发布于:2025-09-04 ⋅ 阅读:(20) ⋅ 点赞:(0)

1 线程池在业务中的实践

1.1 业务场景

  • 在当今的互联网业界,为了最大程度利用 CPU 的多核性能,并行运算的能力是不可或缺的。通过线程池管理线程获取并发性是一个非常基础的操作,下面看两个典型的使用线程池获取并发性的场景。

  • 场景一:快速响应用户请求

    • 描述:当用户发起实时请求(如查看商品信息,需聚合价格、优惠、库存、图片等信息)时,服务追求短响应时间;

    • 分析:从用户体验看,响应越快越好。而商品信息聚合这类功能复杂,存在调用级联等情况。此时用线程池,把各调用封装成任务并行执行,能缩短总体响应时间。为了最大程度提升响应速度,不设置队列缓冲并发任务,调高corePoolSize(核心线程数)和maxPoolSize(最大线程数),尽可能创建多的线程快速执行任务。如下图所示,串行执行时获取价格、库存、图片是依次进行,耗时久;并行执行时这些操作同时开展,能更快返回结果;

    在这里插入图片描述

  • 场景2:快速处理批量任务

    • 描述:像统计报表(计算全国各门店有某属性的商品以辅助营销策略)这类离线大量计算任务,需要快速执行;

    • 分析:这类场景任务量大,虽也希望快,但不需要瞬时完成,更关注单位时间内处理更多任务(吞吐量优先)。所以要用多线程并行计算,但要设置队列缓冲并发任务,调整合适的corePoolSize。因为线程数过多会引发线程上下文切换频繁,降低处理速度和吞吐量。如下图所示,串行执行批量任务时,任务依次进行,总耗时是各任务耗时之和;并行执行时,任务先缓存到队列,然后线程并行处理队列里的任务,能更高效完成,提升吞吐量;

    在这里插入图片描述

1.2 实际问题及方案思考

  • 线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面其运行机制难理解,合理配置需要强依赖开发人员的经验与知识;另一方面,线程池执行情况和任务类型(如IO密集型、CPU密集型)关联性强,不同任务运行表现差异大,业界缺乏成熟普适的经验策略;

  • 关于线程池配置不合理引发的故障,下面举一些例子:

    • Case1:XX页面展示接口大量调用降级

      • 事故描述:XX页面展示接口出现大量调用降级情况,数量级达到几十到上百;
      • 事故原因:该服务展示接口内部逻辑采用线程池进行并行计算,由于没有预先估算好调用的流量,使得最大线程数设置偏小。当任务量超过线程池承载能力时,大量抛出RejectedExecutionException异常,从而触发接口降级条件。如下图:请求进入展示接口后生成多个任务,线程池因容量不足,只能处理部分任务,其余任务被拒绝;

      在这里插入图片描述

    • Case2:XX业务服务不可用S2级故障

      • 事故描述:XX业务提供的服务执行时间过长,作为上游服务整体超时,进而导致大量下游服务调用失败;
      • 事故原因:该服务处理请求的内部逻辑使用线程池做资源隔离,但队列设置过长,且最大线程数设置失效。当请求数量增加时,大量任务堆积在队列中,任务执行时间被大幅拉长,最终造成下游服务大量调用超时。如下图:请求生成任务后,任务都在队列中缓冲等待执行,而核心线程数设置过小,导致任务执行速率低;

      在这里插入图片描述

  • 业务中要使用线程池,而线程池使用不当又会导致故障,那么我们怎样才能更好地使用线程池呢?

2 线程池参数动态化

2.1 引出

  • 在日常项目开发中,线程池用于处理并发场景以提升任务处理效率。但传统方式下,线程池参数难以精准预设,只能在服务运行时不断调整参数,且每次调整都需重启服务,这会对服务可用性造成影响,因此需要一种不重启服务就能动态调整参数的方案;

  • 那如何实现在不重启服务的前提下,动态调整线程池参数呢?

    在这里插入图片描述

    • 旧流程:修改线程池参数 → 重新发布服务 → 查看服务是否运行正常 → 结束。流程繁琐,且重新发布意味着服务会中断或重启

    • 新流程:仅需修改线程池参数即可完成,无需重启服务,大幅简化了参数调整的成本,提升了服务的灵活性;

2.2 线程池可调整的参数

  • 线程池构造参数有8个,但最核心的3个是corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、workQueue(任务队列)。这3个参数直接决定了线程池的任务分配和线程分配策略,结合不同业务场景,又衍生出两种典型队列选择逻辑:
    • 场景1:并行执行子任务,提高响应速度。这类场景追求即时响应(如前文快速响应用户请求场景),应使用同步队列。同步队列不缓存任务,任务到达后会立即尝试执行,避免任务在队列中等待的时间开销,保障响应速度;

    • 场景2:并行执行大批量任务,提升吞吐量。这类场景追求单位时间处理更多任务(如前文快速处理批量任务场景),应使用有界队列。有界队列会缓存大批量任务,通过队列缓冲来削峰填谷,同时要声明队列容量,防止任务无限制堆积导致内存溢出等问题。

2.3 实现思路

  • JDK提供的ThreadPoolExecutor类,通过多个publicsetter方法(如setCorePoolSizesetMaximumPoolSizesetKeepAliveTime等),支持在运行时动态修改线程池核心参数

    在这里插入图片描述

  • setCorePoolSize为例,修改后线程池会根据新核心线程数当前工作线程数原始值的对比,采取不同策略:

    • 若新核心线程数 < 当前工作线程数:说明有多余的worker线程,会尝试将它们中断,回收多余线程;

    • 若新核心线程数 > 原始值,且任务队列中有等待任务:会创建新的Worker线程,加快任务执行;

    在这里插入图片描述

  • 示例代码:

    public class DynamicThreadPool {
        
        // 初始化ThreadPoolExecutor
        private ThreadPoolExecutor executor;
        // 指定核心线程数、最大线程数、存活时间、任务队列等基础参数
        public DynamicThreadPool(int corePoolSize, 
                                 int maximumPoolSize, 
                                 long keepAliveTime, 
                                 TimeUnit unit, 
                                 BlockingQueue<Runnable> workQueue) {
            this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        // 调用ThreadPoolExecutor的setCorePoolSize和setMaximumPoolSize,动态修改核心线程数和最大线程数,无需重启服务
        public void adjustThreadPool(int newCorePoolSize, int newMaximumPoolSize) {
            this.executor.setCorePoolSize(newCorePoolSize);
            this.executor.setMaximumPoolSize(newMaximumPoolSize);
        }
    
        // 向线程池提交任务(调用execute方法)
        public void submitTask(Runnable task) {
            this.executor.execute(task);
        }
    
        // 打印线程池当前状态(核心线程数、最大线程数、活跃线程数),用于观察参数调整效果
        public void print(){
            System.out.println("核心线程数:" + executor.getCorePoolSize()
                    + " " +"最大线程数:" + executor.getMaximumPoolSize()
                    +" " + "活跃线程数:" + executor.getActiveCount());
        }
    
        // 关闭线程池
        public void shutdown() {
            this.executor.shutdown();
        }
    
        // 测试
        public static void main(String[] args) throws InterruptedException {
            // 初始化任务队列和DynamicThreadPool,核心 / 最大线程数均为 10
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
            DynamicThreadPool dynamicThreadPool = new DynamicThreadPool(10, 10, 10, TimeUnit.SECONDS, workQueue);
    
            // 提交 30 个任务(每个任务休眠 10 秒,模拟耗时操作)
            for (int i = 0; i < 30; i++) {
                dynamicThreadPool.submitTask(() -> {
                    log.info(Thread.currentThread().getName() + "开始执行任务");
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
    
            // 休眠 1 秒后,打印“修改前”的线程池状态
            Thread.sleep(1000);
            System.out.println("======修改前=======");
            dynamicThreadPool.print();
    
            // 动态调整线程池参数。调用adjustThreadPool,将核心线程数改为 5、最大线程数改为 8
            dynamicThreadPool.adjustThreadPool(5, 8);
    
            System.out.println("======修改后=======");
            dynamicThreadPool.print();
    
            // 打印 “修改后” 的线程池状态,再休眠 10 秒后再次打印,观察参数调整对线程池运行的影响
            Thread.sleep(10000);
            dynamicThreadPool.print();
            
            // 关闭线程池
            dynamicThreadPool.shutdown();
        }
    }
    

2.4 实现方案

2.4.1 基于 Nacos 配置中心动态调整线程池参数

  • 借助Nacos的Listener机制,在Spring Bean初始化时,开启对Nacos中线程池配置文件(如threadPool.yml的监听。当Nacos中的配置发生变更时,通过监听逻辑实时更新线程池的核心参数(无需重启服务);

    nacosConfigManager.getConfigService().addListener("threadPool.yml", nacosConfigProperties.getGroup(),
                    new Listener() {
                        @Override
                        public Executor getExecutor() {
                            return null;
                        }
                        @Override
                        public void receiveConfigInfo(String configInfo) {
                            
                        }
                    });
    
    • nacosConfigManager.getConfigService():获取Nacos的配置服务实例,用于与Nacos服务器交互(获取配置、监听配置等);

    • .addListener(...):为指定配置添加监听器,当配置发生变更时触发回调逻辑;

      • 第一个参数"threadPool.yml":要监听的配置文件名(Nacos中存储的配置标识);
      • 第二个参数nacosConfigProperties.getGroup():配置所属的分组(Nacos中配置的组织方式,默认分组为DEFAULT_GROUP);
      • 第三个参数new Listener(){...}:匿名内部类实现的监听器,包含配置变更时的处理逻辑;
    • Listener接口有两个核心方法,用于定义配置变更的处理规则:

      • getExecutor()方法

        • 返回值:Executor线程池实例(可为null);
        • 作用:指定执行receiveConfigInfo回调方法的线程池;
          • 若返回null:默认使用Nacos客户端内部的线程池执行回调;
          • 若返回自定义线程池:则在该线程池中执行回调,避免回调逻辑阻塞Nacos内部线程;
      • receiveConfigInfo(String configInfo)方法

        • 参数configInfo:变更后的配置内容(字符串形式,如threadPool.yml的最新文本内容);
        • 作用:配置变更时的核心处理逻辑。在实际业务中,这里会编写解析configInfo的代码(如将字符串转为YAML/JSON对象),然后提取新的线程池参数(如corePoolSizemaxPoolSize),最后调用线程池的setter方法完成动态更新;
  • 示例代码:

    @Configuration
    @Data
    public class MyDynamicThreadPool implements InitializingBean {
    
        // 通过@Value注解,从配置中注入线程池的初始参数
        @Value("${threadPool.corePoolSize}")
        private int corePoolSize; // 核心线程数
        @Value("${threadPool.maxPoolSize}")
        private int maxPoolSize; // 最大线程数
        @Value("${threadPool.queueCapacity}")
        private int queueCapacity; // 队列容量
        @Value("${threadPool.keepAliveSeconds}")
        private int keepAliveSeconds; // 线程存活时间
    
        private static ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
        @Autowired
        private NacosConfigManager nacosConfigManager;
    
        @Autowired
        private NacosConfigProperties nacosConfigProperties;
    
        // 实现InitializingBean接口,在 Bean 初始化时:
        @Override
        public void afterPropertiesSet() throws Exception {
            // 创建ThreadPoolTaskExecutor实例
            threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            // 配置初始线程池参数
            threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
            threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
            threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
            threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
            // 设置线程名前缀
            threadPoolTaskExecutor.setThreadNamePrefix( "SHISAN--");
            // 设置拒绝策略
            threadPoolTaskExecutor.setRejectedExecutionHandler(
                    new RejectedExecutionHandler() {
                          @Override
                          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                 System.out.println("队列已满,丢弃任务");
                           }
                    });
            threadPoolTaskExecutor.initialize();
            
            // 监听 Nacos 中threadPool.yml配置的变更
            nacosConfigManager.getConfigService().addListener("threadPool.yml", nacosConfigProperties.getGroup(),
                    new Listener() {
                        
                        @Override
                        public Executor getExecutor() {
                            return null;
                        }
                        
                        // 当配置变更时,receiveConfigInfo方法会被触发
                        @Override
                        public void receiveConfigInfo(String configInfo) {
                            System.out.println("动态修改前-->");
                            print();
                            // 解析新的配置内容(通过 YAML 解析工具将配置字符串转为Map)
                            Yaml yaml = new Yaml();
                            InputStream inputStream = new ByteArrayInputStream(configInfo.getBytes());
                            Map<String, Object> dataMap = yaml.load(inputStream);
                            JSONObject pool =  new JSONObject(dataMap).getJSONObject("threadPool");
                            // 调用ThreadPoolTaskExecutor的setter方法,动态更新线程池参数
                            threadPoolTaskExecutor.setCorePoolSize(pool.getInteger("corePoolSize"));
                            threadPoolTaskExecutor.setMaxPoolSize(pool.getInteger("maxPoolSize"));
                            threadPoolTaskExecutor.setQueueCapacity(pool.getInteger("keepAliveSeconds"));
                            threadPoolTaskExecutor.setQueueCapacity(pool.getInteger("queueCapacity"));
                            System.out.println("动态修改后-->");
                            print();
                        }
                    });
        }
        
        // 提交任务到线程池执行
        public void execute(Runnable runnable){
            threadPoolTaskExecutor.execute(runnable);
        }
    
        // 打印线程池当前状态(核心线程数、最大线程数、阻塞队列已用 / 总容量、活跃线程数),用于观察参数调整效果
        public void print(){
            System.out.println("核心线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getCorePoolSize()
                    + " " +"最大线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getMaximumPoolSize()
                    +" " + "阻塞队列数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size()
                    + "/" + queueCapacity
                    +" " + "活跃线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getActiveCount());
        }
    }
    

2.4.2 使用DynamicTp——基于配置中心的轻量级动态可监控线程池

  • DynamicTp 是基于配置中心实现的轻量级动态线程池管理工具,核心功能涵盖:

    • 动态调参:支持不重启服务,动态调整线程池参数(如核心线程数、最大线程数等);

    • 通知报警:当线程池运行状态异常(如队列积压、线程活跃度过高等)时,通过企微、钉钉、 Lark、邮件等渠道发送告警;

    • 运行监控:对线程池的运行状态(如活跃线程数、队列长度等)进行监控,结合监控系统(Prometheus、InfluxDB + Grafana 等)可视化展示;

    • 三方包线程池管理:能管理第三方中间件(如 Tomcat、Dubbo、RocketMQ 等)的线程池;

  • 官网:首页 | dynamictp

  • 架构分为三大核心模块,相互协作实现线程池的动态化、可监控管理:

    • 配置中心:支持多种主流配置中心(Nacos、Apollo、Zookeeper、Consul 等),用于存储线程池的配置(如创建/删除线程池配置、修改参数、告警相关配置等),是“动态调参”的配置来源;

    • SpringBoot 服务:承载业务自定义线程池(如 DtpExecutor、OrderedDtpExecutor 等)和第三方中间件线程池(如 Tomcat、Dubbo 等的线程池),并基于配置中心的配置,实现任务增强(对任务执行的扩展)、运行监控(采集线程池运行指标)、通知告警(异常时触发告警)、动态调参(应用配置中心的参数变更);

    • 监控模块:通过指标采集(如 JsonLog、MicroMeter、Endpoint 等方式),将线程池运行指标对接监控系统(Prometheus、InfluxDB、Grafana 等),实现线程池状态的可视化监控;

    在这里插入图片描述

  • 核心配置:

    spring:
      dynamic:
        tp:
          enabled: true                            # 是否启用 dynamictp,默认true
          executors:                               # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量
            - threadPoolName: dtpExecutor1         # 线程池名称,必填
              threadPoolAliasName: 测试线程池        # 线程池别名,可选
              executorType: common                 # 线程池类型 common、eager、ordered、scheduled、priority,默认 common
              corePoolSize: 5                      # 核心线程数,默认1
              maximumPoolSize: 8                   # 最大线程数,默认cpu核数
              queueCapacity: 2000                  # 队列容量,默认1024
              queueType: VariableLinkedBlockingQueue      # 任务队列,查看源码QueueTypeEnum枚举类,默认VariableLinkedBlockingQueue
              rejectedHandlerType: CallerRunsPolicy       # 拒绝策略,查看RejectedTypeEnum枚举类,默认AbortPolicy
              keepAliveTime: 10                           # 空闲线程等待超时时间,默认60
              threadNamePrefix: Fox                       # 线程名前缀,默认dtp
              allowCoreThreadTimeOut: false               # 是否允许核心线程池超时,默认false
              waitForTasksToCompleteOnShutdown: true      # 参考spring线程池设计,优雅关闭线程池,默认true
              awaitTerminationSeconds: 5                  # 优雅关闭线程池时,阻塞等待线程池中任务执行时间,默认3,单位(s)
              preStartAllCoreThreads: false               # 是否预热所有核心线程,默认false