概述
有一批企业要处理,线程池10个线程并发处理企业。同时每个企业要异步请求6个接口。
用两个线程池,一个处理企业,一个处理每个企业的异步请求。
代码
异步线程池
package com.zou.metabox.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class AsyncConfig {
private static final int CORE_POOL_SIZE = 60;
private static final int MAX_POOL_SIZE = 100;
private static final int QUEUE_CAPACITY = 100;
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(QUEUE_CAPACITY);
executor.setThreadNamePrefix("asyncExecutor-");
executor.initialize();
return executor;
}
}
测试的controller接口
@GetMapping("/start")
public String startProcessing(@RequestParam int size) {
return "Processing spend " + enterpriseService.processEnterprises(size);
}
核心逻辑
并发处理企业,以及异步请求6个接口
package com.zou.metabox.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.*;
@Slf4j
@Service
public class EnterpriseService {
@Resource
private EnterpriseApiService enterpriseApiService;
// 使用懒加载方式创建线程池,避免在shutdown后无法重新使用
private volatile ExecutorService executorService;
private ExecutorService getExecutorService() {
if (executorService == null || executorService.isShutdown()) {
synchronized (this) {
if (executorService == null || executorService.isShutdown()) {
executorService = Executors.newFixedThreadPool(10);
}
}
}
return executorService;
}
public String processEnterprises(int size) {
long batchStartTime = System.currentTimeMillis(); // 记录批次开始时间
// 使用线程安全的队列
ConcurrentLinkedQueue<Map<String, String>> list1 = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Map<String, String>> list2 = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Map<String, String>> list3 = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Map<String, String>> list4 = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Map<String, String>> list5 = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Map<String, String>> list6 = new ConcurrentLinkedQueue<>();
CountDownLatch countDownLatch = new CountDownLatch(size);
ExecutorService executor = getExecutorService(); // 获取线程池实例
for (int i = 1; i <= size; i++) {
final int enterpriseId = i;
executor.submit(() -> {
long enterpriseStartTime = System.currentTimeMillis(); // 记录单个企业开始时间
try {
CompletableFuture<Map<String, String>> future1 = enterpriseApiService.callApi1(enterpriseId);
CompletableFuture<Map<String, String>> future2 = enterpriseApiService.callApi2(enterpriseId);
CompletableFuture<Map<String, String>> future3 = enterpriseApiService.callApi3(enterpriseId);
CompletableFuture<Map<String, String>> future4 = enterpriseApiService.callApi4(enterpriseId);
CompletableFuture<Map<String, String>> future5 = enterpriseApiService.callApi5(enterpriseId);
CompletableFuture<Map<String, String>> future6 = enterpriseApiService.callApi6(enterpriseId);
// 并发调用6个API,等待所有的api调用完成
CompletableFuture.allOf(future1, future2, future3, future4, future5, future6).join();
// 在所有异步请求完成后,再写入列表
list1.add(future1.join());
list2.add(future2.join());
list3.add(future3.join());
list4.add(future4.join());
list5.add(future5.join());
list6.add(future6.join());
long enterpriseEndTime = System.currentTimeMillis();
log.info("Finished processing enterprise: {}, Time taken: {}ms", enterpriseId, (enterpriseEndTime - enterpriseStartTime));
} catch (InterruptedException e) {
log.error("出错:{}", e.getMessage(), e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("出错:{}", e.getMessage(), e);
}
// log打印每个列表的数量
log.info("list1 size: {}", list1.size());
log.info("list2 size: {}", list2.size());
log.info("list3 size: {}", list3.size());
log.info("list4 size: {}", list4.size());
log.info("list5 size: {}", list5.size());
log.info("list6 size: {}", list6.size());
long batchEndTime = System.currentTimeMillis();
long time = batchEndTime - batchStartTime;
log.info("Batch processing completed. Total time: {} ms", time);
return time/1000 + " s";
}
}
异步接口
package com.zou.metabox.service;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Service
public class EnterpriseApiService {
@Async("asyncExecutor")
public CompletableFuture<Map<String, String>> callApi1(int enterpriseId) throws InterruptedException {
// 模拟API调用
Thread.sleep(1000); // 假设每个API需要1秒响应时间
Map<String, String> map = new HashMap<>();
map.put("function name", enterpriseId + " callApi1");
return CompletableFuture.completedFuture(map);
}
@Async("asyncExecutor")
public CompletableFuture<Map<String, String>> callApi2(int enterpriseId) throws InterruptedException {
// 模拟API调用
Thread.sleep(1000); // 假设每个API需要1秒响应时间
Map<String, String> map = new HashMap<>();
map.put("function name", enterpriseId + " callApi2");
return CompletableFuture.completedFuture(map);
}
@Async("asyncExecutor")
public CompletableFuture<Map<String, String>> callApi3(int enterpriseId) throws InterruptedException {
// 模拟API调用
Thread.sleep(1000); // 假设每个API需要1秒响应时间
Map<String, String> map = new HashMap<>();
map.put("function name", enterpriseId + " callApi3");
return CompletableFuture.completedFuture(map);
}
@Async("asyncExecutor")
public CompletableFuture<Map<String, String>> callApi4(int enterpriseId) throws InterruptedException {
// 模拟API调用
Thread.sleep(1000); // 假设每个API需要1秒响应时间
Map<String, String> map = new HashMap<>();
map.put("function name", enterpriseId + " callApi4");
return CompletableFuture.completedFuture(map);
}
@Async("asyncExecutor")
public CompletableFuture<Map<String, String>> callApi5(int enterpriseId) throws InterruptedException {
// 模拟API调用
Thread.sleep(1000); // 假设每个API需要1秒响应时间
Map<String, String> map = new HashMap<>();
map.put("function name", enterpriseId + " callApi5");
return CompletableFuture.completedFuture(map);
}
@Async("asyncExecutor")
public CompletableFuture<Map<String, String>> callApi6(int enterpriseId) throws InterruptedException {
// 模拟API调用
Thread.sleep(1000); // 假设每个API需要1秒响应时间
Map<String, String> map = new HashMap<>();
map.put("function name", enterpriseId + " callApi6");
return CompletableFuture.completedFuture(map);
}
}
测试接口
传入参数10
传入参数100