一、Java中实现多线程有几种方法?
1.继承Thread
2.实现Runnable
3.实现Callable(配合FutureTask)
4.线程池创建
/** * @author 寇申海 */ public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { //方式一 MyThread m1 = new MyThread(); m1.start(); //方式二 MyRunnable m2 = new MyRunnable(); Thread t1 = new Thread(m2); t1.start(); //方式三 MyCallable m3 = new MyCallable(); FutureTask<String> ft = new FutureTask<>(m3); Thread t2 = new Thread(ft); t2.start(); System.out.println(ft.get()); } static class MyThread extends Thread { @Override public void run() { System.out.println("继承Thread"); } } static class MyRunnable implements Runnable { @Override public void run() { System.out.println("实现Runnable"); } } static class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "实现Callable"; } } } /** * 继承Thread * 实现Runnable * 实现Callable */
二、线程池
线程池好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
this逃逸
this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor结构
创建线程池有几种方式?
线程池创建方式总共有7种,总体来说分为2类(6种通过Executors创建,1种通过ThreadPoolExecutor创建)
1.通过 ThreadPoolExecutor 创建线程池
2.通过 Executors 创建线程池
1.Executors.newFixedThreadPool(固定数量的线程池):创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待
/** * @author 寇申海 */ public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); //方式一 Future<String> submit = executorService.submit(() -> Thread.currentThread().getName()); System.out.println(submit.get()); //方式二 executorService.execute(() -> System.out.println(Thread.currentThread().getName())); executorService.shutdown(); /** * execute与submit区别 * 1.execute和submit都属于线程池的方法。execute只能提交Runnable类型的任务,submit既能提交Runnable类型任务也能提交Callable类型任务 * 2.execute没有返回值,submit有返回值 */ } } /** * pool-1-thread-1 * pool-1-thread-2 */
/** * @author 寇申海 * ⾃定义线程池名称或优先级 */ public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建线程工厂 ThreadFactory threadFactory = r -> { //注意:要把任务Runnable设置给新创建的线程 Thread thread = new Thread(r); //设置线程命名规则 thread.setName("线程-" + r.hashCode()); //设置线程优先级 thread.setPriority(Thread.MAX_PRIORITY); return thread; }; //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(2,threadFactory); Future<Integer> submit = executorService.submit(() -> { int num = new Random().nextInt(10); System.out.println("线程优先级:" + Thread.currentThread().getPriority()); System.out.println("随机数:" + num); return num; }); System.out.println("返回结果:" + submit.get()); executorService.shutdown(); } } /** * 线程优先级:10 * 随机数:2 * 返回结果:2 */
2.Executors.newCachedThreadPool(带缓存的线程池):创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程
/** * @author 寇申海 */ public class Test { public static void main(String[] args) { //创建线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 3; i++) { int finalI = i; executorService.submit(() -> System.out.println("线程名称:" + Thread.currentThread().getName() + ",i = " + finalI)); } executorService.shutdown(); /** * 优点:根据任务数量创建线程池,并且一定时间内可以重复使用这些线程,产生相应的线程池 * 缺点:适用短时间有大量任务的场景,它的缺点是可能会占用很多的资源 */ } } /** * 线程名称:pool-1-thread-1,i = 0 * 线程名称:pool-1-thread-2,i = 1 * 线程名称:pool-1-thread-3,i = 2 */
3.Executors.newSingleThreadExecutor(单线程的线程池):创建单个线程数的线程池,它可以保证先进先出的执行顺序
/** * @author 寇申海 */ public class Test { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { executorService.submit(() -> System.out.println("线程名称:" + Thread.currentThread().getName())); } executorService.shutdown(); /** * 单线程的线程池意义 * 1.服用线程 * 2.单线程的线程池提供任务队列和拒绝策略(当任务队列满了之后(Integer.MAX_VALUE),新来的任务就会拒绝策略) */ } } /** * 线程名称:pool-1-thread-1 * 线程名称:pool-1-thread-1 * 线程名称:pool-1-thread-1 * 线程名称:pool-1-thread-1 * 线程名称:pool-1-thread-1 */
4.Executors.newScheduledThreadPool(执行定时任务的线程池):创建一个可以执行延迟任务的线程池
/** * @author 寇申海 * 延迟执行(一次) */ public class Test { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); System.out.println("添加任务的时间:" + LocalDateTime.now()); scheduledExecutorService.schedule(() -> System.out.println("执行子任务:" + LocalDateTime.now()),3, TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } } /** * 添加任务的时间:2022-10-18T18:09:08.895300600 * 执行子任务:2022-10-18T18:09:11.899567 */
/** * @author 寇申海 * 固定频率执行 */ public class Test { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); System.out.println("添加任务时间:" + LocalDateTime.now()); //第一次2秒后执行,定时任务每隔4秒执行一次 scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("执行子任务:" + LocalDateTime.now()), 2,4, TimeUnit.SECONDS); } } /** * 添加任务时间:2022-10-19T06:26:02.416476300 * 执行子任务:2022-10-19T06:26:04.424423 * 执行子任务:2022-10-19T06:26:08.425506900 * 执行子任务:2022-10-19T06:26:12.423395600 */
/** * @author 寇申海 * 固定频率执行 */ public class Test { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); System.out.println("添加任务时间:" + LocalDateTime.now()); //第一次2秒后执行,定时任务每隔5秒执行一次 //哪个值以哪个值作为定时任务的执行周期 scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println("执行子任务时间:" + LocalDateTime.now()); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }, 2, 4, TimeUnit.SECONDS); } } /** * 添加任务时间:2022-10-19T06:34:09.394032400 * 执行子任务时间:2022-10-19T06:34:11.405311900 * 执行子任务时间:2022-10-19T06:34:16.409683500 * 执行子任务时间:2022-10-19T06:34:21.417926800 */
/** * @author 寇申海 * 固定频率执行 */ public class Test { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); System.out.println("添加任务时间:" + LocalDateTime.now()); scheduledExecutorService.scheduleWithFixedDelay(() -> { System.out.println(Thread.currentThread().getName() + "执行子任务时间:" + LocalDateTime.now()); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } },2,4, TimeUnit.SECONDS); } } /** * 添加任务时间:2022-10-19T06:44:36.281818700 * pool-1-thread-1执行子任务时间:2022-10-19T06:44:38.293835500 * pool-1-thread-1执行子任务时间:2022-10-19T06:44:43.322202300 * pool-1-thread-2执行子任务时间:2022-10-19T06:44:48.340703400 */
1.scheduleAtFixedRate:以上一次任务的开始时间,作为下次定时任务的参考时间
2.scheduleWithFixedDelay:以上一次任务的结束时间,作为下次定时任务的参考时间
5.Executors.newSingleThreadScheduledExecutor(定时任务单线程的线程池):创建一个单线程的可以执行延迟任务的线程池
/** * @author 寇申海 */ public class Test { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); System.out.println("添加任务时间:" + LocalDateTime.now()); scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("执行子任务时间:" + LocalDateTime.now()),2,4, TimeUnit.SECONDS); } } /** * 添加任务时间:2022-10-19T06:53:17.957962800 * 执行子任务时间:2022-10-19T06:53:19.976263 * 执行子任务时间:2022-10-19T06:53:23.973588 */
6.Executors.newWorkStealingPool(根据当前CPU生成线程池):创建一个抢占式执行的线程池(任务执行顺序不确定)【jdk1.8 添加】
/** * @author 寇申海 */ public class Test { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newWorkStealingPool(); CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { int finalI = i; executorService.submit(() -> { System.out.println("线程名称:" + Thread.currentThread().getName() + ",i = " + finalI); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); } } /** * 线程名称:ForkJoinPool-1-worker-5,i = 4 * 线程名称:ForkJoinPool-1-worker-1,i = 0 * 线程名称:ForkJoinPool-1-worker-2,i = 1 * 线程名称:ForkJoinPool-1-worker-4,i = 3 * 线程名称:ForkJoinPool-1-worker-3,i = 2 */
7.ThreadPoolExecutor:最原始的创建线程池的方式,它包含了7个参数可供设置
/** * @author 寇申海 * 注意:使用Executors直接创建线程池,允许的请求或创建队列长度为Integer.MAX_VALUE,可能会堆积大量请求,导致OOM,通过ThreadPoolExecutor方式来明确线程池的运行规则,规避资源耗尽的风险 * ThreadPoolExecutor继承AbstractExecutorService * AbstractExecutorService实现ExecutorService接口 * 建议将线程池对象设置为静态变量或单例,业务每接收一个请求,业务代码就会创建一个线程池,并且用完之后没有shutdown,线程一直存活,线程对象和线程池对象没有被GC掉,导致系统出现OOM */ public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, //线程池的核心线程数(偶数),即使这些线程是空闲的也不会被销毁,除非设置 allowCoreThreadTimeOut(正常情况下创建工作的线程数,是一种常驻线程,除非设置allowCoreThreadTimeOut,默认为false,核心线程在空闲时间也会保持活动状态,设为true,核心线程将使用keepAliveTime时间超时等待后被销毁) //当提交一个任务时,如果线程池的线程数小于corePoolSize时,则会创建一个线程来提交任务 int maximumPoolSize, //线程池的最大线程数(此值>=1) long keepAliveTime, //线程数超过核心线程数时,将在终止前等待新任务的最长时间(多余的空闲线程存活时间,超过核心线程数的多余空闲线程,当空闲时间达到keepAliveTime时,多余空闲线程被销毁直接只剩下corePoolSize个线程为止) TimeUnit unit, //keepAliveTime时间单位,天、小时、分钟、秒 BlockingQueue<Runnable> workQueue, //任务执行前,保存任务的阻塞队列,此队列仅保存由execute方法提交的Runnable任务(被提交但尚未提交的任务) ThreadFactory threadFactory, //创建新线程使用的工厂 RejectedExecutionHandler handler //到达线程边界和队列容量而采取的拒绝策略(队列满了并且工作线程数大于等于线程池的最大线程数采取何种拒绝执行runnable策略) ) { } }
public static final ThreadPoolExecutor executorService = new ThreadPoolExecutor( 8, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(512), ThreadUtil.newNamedThreadFactory("laokou-oss-service",true), new ThreadPoolExecutor.CallerRunsPolicy() );
线程池各个参数相互关系
BlockingQueue详解
ArrayBlockingQueue:由数组结构组成的有界阻塞队列
SynchronousQueue:不存储任何元素的阻塞队列,即单个元素的队列,任何一次插入操作都要等待相应的删除/读取操作
LinkedBlockingQueue:由链表结构组成无界阻塞队列(默认值为Integer.MAX_VALUE)
DelayedWorkQueue:使用优先级队列实现的延迟无界阻塞队列
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
RejectedExecutionHandler详解(4种拒绝策略,都是RejectedExecutionHandler内部类,当队列满了并且线程数大于等于最大线程数所采取的拒绝策略)
AbortPolicy:线程池队列满了,丢掉这个任务并且抛出RejectedExecutionException异常
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() {} public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy:线程池队列满了,直接丢掉这个任务,什么也不做
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() {} public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy:线程池队列满了,丢弃最旧未处理的请求,然后重试execute,除非执行器关闭,在这种情况下,该任务将被丢弃
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() {} public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
CallerRunsPolicy:线程池队列满了,直接在execute方法调用线程中运行被拒绝任务(调用执行自己的线程运行任务),除非执行器关闭,在这种情况下,该任务被丢弃(因此这种策略会降低新任务提交速度,影响程序的整体性能,如果您的应用程序可以承受延迟并且你要求任何一个任务都要被执行,可以选择这个策略)
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() {} public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)
线程池源码解析
// 存放线程池的运行状态(runState)和线程池有效线程的数量(workCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int workerCountOf(int c) { return c & COUNT_MASK; } //任务队列 private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 如果任务为null,则抛出异常 if (command == null) throw new NullPointerException(); // ctl中保存的线程池当前的一些状态信息 int c = ctl.get(); // 下面涉及到3步操作 // 1.首先判断当前线程池中运行的任务数量是否小于corePoolSize // 如果小于的话,通过addWorker(command,true)新建一个线程,并将任务(command)添加到该线程中,然后,启动该线程从而执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前之行的任务数量大于等于 corePoolSize的时候会走这里 // 通过isRunning方法判断线程池状态,线程池处于RUNNING 状态并且队列可以加入任务,该任务才会被加入 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次获取线程池状态,如果线程池状态不是RUNNING状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕,同时执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果当前线程池为空创建一个线程并执行 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 通过addWorker(command,false)新建一个线程,并将任务(command)添加到线程中,然后,启动该线程从而执行任务 // 如果addWorker(command,false)执行失败,则通过reject()执行相应的拒绝策略的内容 else if (!addWorker(command, false)) reject(command); }
addWorker 这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则的话返回false
// 全局锁,并发操作必备 private final ReentrantLock mainLock = new ReentrantLock(); // 跟踪线程池的最大大小 private int largestPoolSize; // 工作线程集合,存放线程池中的所有的(活跃的)工作线程,只有在持有全局锁mainLock前提下才能访问该集合 private final HashSet<Worker> workers = new HashSet<>(); // 获取线程池状态 private static int runStateOf(int c) { return c & ~COUNT_MASK; } // 判断线程池的状态是否为Running private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * 添加新的工作线程到线程池 * @param firstTask 要执行 * @param core 参数为true,表示线程池基本大小,为false使用线程池最大大小 * @return 添加成功返回true否则返回false */ private boolean addWorker(Runnable firstTask, boolean core) { retry: // 这两句用来获取线程池的状态 for (int c = ctl.get();;) { if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { // 获取线程池中工作的线程数 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 原子操作将workCount数量加1 if (compareAndIncrementWorkerCount(c)) break retry; // 如果线程状态改变了就再次执行上述操作 c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN)) continue retry; } } // 标记工作线程是否启动成功 boolean workerStarted = false; // 标记工作线程是否创建成功 boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); // 是否是Running,线程状态是存活的话,就会将工作线程添加到工作线程集合中 // 如果线程状态小于STOP,也就是RUNNING或SHUTDOWN状态下,同时传入任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker // firstTask == null 证明只新建线程而不执行任务 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); workers.add(w); // 工作线程是否启动成功 workerAdded = true; // 更新当前工作线程最大容量 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { // 释放锁 mainLock.unlock(); } // 如果工作线程添加成功,则调用worker内部线程实例t的Thread#start()方法启动真实线程实例 if (workerAdded) { t.start(); // 标记线程启动成功 workerStarted = true; } } } finally { // 线程启动失败,需要从工作线程中移除对应worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
ThreadPoolExecutor执行execute方法提交任务时,线程池处理流程
1.判断线程池的线程数量是否已经达到核心线程数(corePoolSize),如果否,则创建一个新的线程(核心)来执行任务,且任务结束后该线程保留在线程池种,不做销毁处理,如果是,则进入下个流程
2.判断阻塞队列(workQueue)是否已满,如果否,则添加任务到阻塞队列,等待执行;如果是,则进入下个流程
3.判断线程池线程数是否达到最大线程(maxumunPoolSize),如果否,创建一个线程(非核心)来执行任务;如果是,则执行对应的拒绝策略
注意:线程池的线程数量超过corePoolSize时,每当有线程的空闲时间超过keepAliveTime,线程会被终止,直到线程池的数量不大于corePoolSize为止
ThreadPoolExecutor的BlockingQueue与Thread执行流程
1.新创建一个线程时,会让这个线程执行当前任务
2.线程执行完后,会一直循环从阻塞队列BlockingQueue中获取任务来执行,当队列为空时,线程被阻塞,当有新任务添加到阻塞队列时,阻塞的线程会被唤醒继续执行任务
IO密集型任务:文件读写、DB读写、网络请求
CPU密集型任务:计算型代码、Gson转换
IO密集型最佳线程数 = CPU核数*[(IO耗时 / CPU耗时)+ 1] ≈ CPU核数*2
CPU密集型最佳线程数 = CPU核数 + 1
三、ThreadLocal详解
多线程访问同一个共享变量很容易出现并发问题,特别是当多个线程对同一个共享变量进行写入操作时。一般为了避免这种情况,我们会使用 synchronized 或 Lock 关键字对代码块加锁,但这种方式,一是会让没获取到锁的线程进行阻塞等待,二是需要使用者对锁有一定的了解,无疑提高了编程难度。TheadLocal可以做这件事,虽然ThreadLocal并不是为了解决这个问题而出现的。
ThreadLocal提供线程本地的实例,它与普通变量的区别在于,每个使用该变量的线程都会初始化一个完全独立的本地实例副本(线程隔离,避免并发场景下的线程安全问题),ThreadLocal变量通常被 private static 修饰,当一个线程结束时,它所使用的所有ThreadLocal相对的实例副本都会被回收
ThreadLocal非常适用于这样的场景:每个线程需要自己独立的实例且该实例需要在多个方法中使用
/** * @author 寇申海 */ public class Test { public static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { executorService.execute(() -> { try { System.out.println(simpleDateFormat.parse("2022-01-01 01:00:00")); } catch (ParseException e) { throw new RuntimeException(e); } }); } executorService.shutdown(); } } /** * Exception in thread "pool-1-thread-2" Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-3" java.lang.NumberFormatException: empty String * at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842) * at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110) * at java.base/java.lang.Double.parseDouble(Double.java:651) * at java.base/java.text.DigitList.getDouble(DigitList.java:169) * at java.base/java.text.DecimalFormat.parse(DecimalFormat.java:2202) * at java.base/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1937) * at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1545) * at java.base/java.text.DateFormat.parse(DateFormat.java:397) * at org.demo.juc.Test11.lambda$main$0(Test11.java:16) * at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) * at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) * at java.base/java.lang.Thread.run(Thread.java:833) */
/** * @author 寇申海 */ public class Test { public static ThreadLocal<DateFormat> tl = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { executorService.execute(() -> { try { System.out.println(tl.get().parse("2022-01-01 01:00:00") + ",线程名称:" + Thread.currentThread().getName()); } catch (ParseException e) { throw new RuntimeException(e); } }); } executorService.shutdown(); } } /** *Sat Jan 01 01:00:00 CST 2022,线程名称:pool-1-thread-2 *Sat Jan 01 01:00:00 CST 2022,线程名称:pool-1-thread-3 *Sat Jan 01 01:00:00 CST 2022,线程名称:pool-1-thread-1 */
1.Thread类中,有个ThreadLocal.ThreadLocalMap的成员变量
2.ThreadLocalMap内部维护Entry数组,每个Entry代表一个完整的对象,key是ThreadLocal本身,value是ThreadLocal的泛型对象值
static class ThreadLocalMap { static class Entry extends WeakReference<ThreadLocal<?>> { Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } //Entry数组 private Entry[] table; // ThreadLocalMap构造器,ThreadLocal作为key ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } ThreadLocalMap getMap(Thread t) { //返回Thread对象的ThreadLocalMap return t.threadLocals; } }
public class ThreadLocal<T> { public void set(T value) { //获取当前线程t Thread t = Thread.currentThread(); //根据当前线程获取ThreadLocalMap ThreadLocalMap map = getMap(t); //如果获取ThreadLocalMap对象为空 if (map != null) { //按K,V设置到ThreadLocalMap中 map.set(this, value); } else { //创建一个性的ThreadLocalMap createMap(t, value); } } //调用ThreadLocalMap构造函数 void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } public T get() { //获取当前线程 Thread t = Thread.currentThread(); //根据当前线程获取到ThreadLocalMap ThreadLocalMap map = getMap(t); //如果获取ThreadLocalMap对象不为空 if (map != null) { //由this(即ThreadLocal对象)得到对应的value,即ThreadLocal泛型值 ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } //初始化ThreadLocal成员变量的值 return setInitialValue(); } //初始化value的值 protected T initialValue() { return null; } private T setInitialValue() { //初始化value的值 T value = initialValue(); //获取当前线程 Thread t = Thread.currentThread(); //以当前线程为key,获取ThreadLocalMap成员变量,它是一个ThreadLocalMap ThreadLocalMap map = getMap(t); //如果获取ThreadLocalMap不为空 if (map != null) { //K,V设置到ThreadLocalMap中 map.set(this, value); } else { //实例化ThreadLocalMap成员变量 createMap(t, value); } //ThreadLocal的又一个扩展,该ThreadLocal关联的值在线程结束前会被特殊处理,处理方法取决与回调方法threadTerminated(T value) if (this instanceof TerminatingThreadLocal) { TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this); } return value; } public void remove() { //获取ThreadLocalMap对象 ThreadLocalMap m = getMap(Thread.currentThread()); //判断ThreadLocalMap对象不为空 if (m != null) { m.remove(this); } } }
1.Thread线程类有一个类型为ThreadLocal.ThreadLocalMap的实例变量threadLocals,即每个线程都有一个属于自己ThreadLocalMap
2.ThreadLocalMap内部维护着E你try数组,每个Entry代表一个完整的对象,key是ThreadLocal本身,value是ThreadLocal的泛型值
3.并发多线程场景下,每个线程Thread,在往ThreadLocal设置值的时候,都是往自己的ThreadLocalMap里面存,读也是以某个ThreadLocal作为引用,在自己的map里找到对应的key (int index = key.nextHashCode & (len - 1))(线性探测再散列的开放寻址法,解决哈希冲突的算法),从而可以实现线程隔离
线程Id不能直接作为ThreadLocalMap的key(使用thread id无法确定是哪个ThreadLocal)
ThreadLocal导致内存泄露
ThreadLocalMap使用ThreadLocal的弱引用作为key,当ThreadLocal变量被手动设置为null,即一个ThreadLocal没有外部强引用来引用它,当系统发生GC时,ThreadLocal一定会被回收,这样的话,ThreadLocal就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话(比如线程池的核心线程),这些key为null的Entry的value就会一直存在一条强引用链:Thread变量 -> Thread对象 -> ThreadLocalMap -> Entry -> value -> Object 永远无法回收,造成内存泄露
ThreadLocalMap设计中已经考虑到这种情况。所以也加上了一些防护措施:ThreadLocal的get、set、remove方法,都会清除线程ThreadLocalMap里所有key为null的value
private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.refersTo(key)) { e.value = value; return; } //如果key等于null,则说明索引之前放的key(threadLocal)被回收了,这通常是因为外部将threadLocal变量置为null //又因为entry对threadLocal持有的弱引用,一轮GC过后,对象被回收 //这种情况下,既然用户代码都已经将ThreadLocal置为null,那么也就没打算再通过该对象作为key去取到之前放threadLocalMap的value,因此ThreadLocalMap中会直接替换掉这种不新鲜entry if (e.refersTo(null)) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; //触发一次Log(N)复杂度的扫描,目的是清除过期entry if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }
private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.refersTo(key)) return e; else return getEntryAfterMiss(key, i, e); } private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { if (e.refersTo(key)) return e; //Entry的key为null,则表明没有外部引用,且被GC回收,是一个过期Entry if (e.refersTo(null)) //删除过期Entry expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; }
key是弱引用,GC回收不会影响ThreadLocal正常工作
ThreadLocal的key是弱引用,ThreadLocal变量引用着它,是不会被GC回收的,除非手动把ThreadLocal变量设置为null
弱引用:具有弱引用的对象拥有更短暂的生命周期,如果一个对象只有弱引用存在,则下次GC会回收掉该对象(不管当前内存空间足够与否)
/** * @author 寇申海 */ public class Test { public static void main(String[] args) { Object obj = new Object(); WeakReference<Object> wr = new WeakReference<>(obj); System.out.println("GC回收前,弱引用:" + wr.get()); //触发系统垃圾回收 System.gc(); System.out.println("GC回收之后,弱引用:" + wr.get()); //手动设置为object对象为null obj = null; System.gc(); System.out.println("对象object设置为null,GC回收之后,弱引用:" + wr.get()); } } /** * GC回收前,弱引用:java.lang.Object@404b9385 * GC回收之后,弱引用:java.lang.Object@404b9385 * 对象object设置为null,GC回收之后,弱引用:null */
/** * @author 寇申海 */ public class Test { private static ThreadLocal<Object> tl = new ThreadLocal<>(); public static void main(String[] args) throws InterruptedException { //创建线程工厂 ThreadFactory threadFactory = r -> { //注意:要把任务Runnable设置给新创建的线程 Thread thread = new Thread(r); //设置线程命名规则 thread.setName("线程-" + r.hashCode()); //设置线程优先级 thread.setPriority(Thread.MAX_PRIORITY); return thread; }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { threadPoolExecutor.execute(() -> { System.out.println("创建对象..."); Custom cus = new Custom(); tl.set(cus); //对象置为null,表示此对象不在使用了 cus = null; //tl.remove(); }); Thread.sleep(1000); } threadPoolExecutor.shutdown(); } static class Custom { private byte[] buffer = new byte[1024 * 1024 * 2000]; } } /** * 创建对象... * 创建对象... * Exception in thread "线程-1867083167" java.lang.OutOfMemoryError: Java heap space */
手动设置cus= null ,但还是会内存泄露,因为我们使用线程池池,线程池有很长的生命周期,线程池一直持有cus对象的value值,即使设置cus=null,引用还是存在(一个个object对象放在list的列表,单独设置为null,对象还是存在)
private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.refersTo(key)) { //清除entry e.clear(); expungeStaleEntry(i); return; } } }
Entry的key设置为弱引用
强引用:平时new了一个对象就是强引用,比如Object obj = new Object();即使内存不足情况下,JVM宁愿抛出OutOfMemory错误也不会被回收
软引用:一个对象只具有软引用,内存空间足够,垃圾回收器就不会回收它,内存不够时,则会回收这些对象的内存
弱引用:具有弱引用对象拥有短暂的生命周期,如果一个对象只有弱引用存在,则下次GC将会回收掉该对象(不管当前内存空间足够与否)
虚引用:如果一个对象仅仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收,虚引用主要用来跟踪对象被垃圾回收器回收的活动
为了应对非常大或长时间的用户,哈希表使用弱引用的key
1.如果key使用强引用,当ThreadLocal要被回收时,ThreadLocalMap还持有ThreadLocal的强引用,没有手动删除,ThreadLocal就不会被回收,出现Entry出现内存泄露
2.如果key使用弱引用,当ThreadLocal被回收时,因为ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收,value则在下一次ThreadLocalMap调用set、get、remove时候会被清除
注意:内存泄露根本原因是不再使用Entry,没有从线程ThreadLocalMap中删除
删除不再使用的Entry有两种方式:
1.使用完ThreadLocal手动调用remove,把Entry从ThreadLocalMap删除
2.ThreadLocalMap的自动清除机制去清除过期的Entry(ThreadLocalMap set()、get()都会触发对过期Entry的清除)
InheritableThreadLocal保证父子线程间的共享数据
/** * @author 寇申海 */ public class Test { public static void main(String[] args) { ThreadLocal<String> threadLocal = new ThreadLocal<>(); InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>(); threadLocal.set("123"); inheritableThreadLocal.set("123"); new Thread(() -> { System.out.println("ThreadLocal value:" + threadLocal.get()); System.out.println("InheritableThreadLocal value:" + inheritableThreadLocal.get()); }).start(); } } /** * ThreadLocal value:null * InheritableThreadLocal value:123 */
子线程可以获取父线程的InheritableThreadLocal类型变量值,不能获取ThreadLocal(线程隔离)变量的值
应用场景
1.使用日期工具类,当用到SimpleDateFormat,使用ThreadLocal保证线程安全
2.保证同一个线程获取的数据库连接Connect是同一个,使用ThreadLocal解决线程安全问题
3.使用mdc保存日志
四、JMM(内存模型)详解
CPU缓存模型
CPU高速缓存是为了解决CPU处理速度和内存处理速度不对等的问题
内存可以看作外存的高速缓存
CPU Cache缓存的是内存数据用于解决CPU处理速度和内存不匹配的问题,内存缓存的是硬盘数据用于解决硬盘访问速度过慢的问题
CPU Cache工作方式:复制一份数据到CPU Cache中,当CPU需要用到的时候就可以直接从CPU Cache中读取数据,当运算完之后,再将运算得到的数据写回Main Memory中,但是,这样存在内存缓存不一致性的问题(执行i++操作,两个线程同时执行的话,假设两个线程从CPU Cache读取i = 1,两个线程做了1++之后再写入Main Memory之后 i = 2,正确结果应该是i = 3)
CPU为了解决内存缓存不一致性问题可以通过定制缓存一致性协议(MESI协议)或者其他手段来解决
指令重排序
指令重排序:系统在执行代码的时候并不一定按照你写的代码顺序依次执行
常见的指令重排序有2种情况:
1.编译器优化重排:编译器(包括JVM、JIT编译器等)在不改变单线程程序语义的前提下,重新安排语句的执行顺序
2.指令并行重排:现代处理器采用了指令并行技术来将多条指令重叠执行,如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序
Java源码会经历 编译器优化重排 -> 指令并行重排 -> 内存系统重排,最终变成操作系统可执行的指令序列
指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致
编译器和处理器的指令重排序的处理方式不一样,对于编译器,通过禁止特定类型的编译器重排序的方式来禁止重排序,对于处理器,通过插入内存屏障的方式来禁止特定类型的处理器重排序,指令并行重排和内存系统重排都属于处理器级别的指令重排序
JMM抽象线程与主内存的关系
1.线程之间的共享变量必须存储在主内存中
2.线程可以把变量保存到本地内存(机器寄存器)中,而不是直接在主内存中进行读写(这就可能造成一个线程在主内存修改一个变量的值,而另外一个线程还继续使用它在寄存器中变量值的拷贝,造成数据不一致)
主内存、本地内存
主内存:所有线程创建的实例都放在主内存中,不管该实例对象是成员变量还是方法中的本地变量(局部变量)
本地内存:每个线程都有一个私有的本地内存来存储共享变量的副本,并且,每个线程只能访问自己的本地内存,无法访问其他线程的本地内存,本地内存是JMM抽象出来的一个概念,存储了主内存中的共享变量的副本
从图上来看,线程1和线程2之间进行通信,必须经历下面2个步骤:
1.线程1把本地内存中修改过的共享变量副本的值同步到主内存中去
2.线程2到主内存中读取对应的共享变量的值
也就是说:JMM为共享变量提供了可见性的保证
Java内存模型定义8种同步操作
- 锁定(lock): 作用于主内存中的变量,将他标记为一个线程独享变量。
- 解锁(unlock): 作用于主内存中的变量,解除变量的锁定状态,被解除锁定状态的变量才能被其他线程锁定。
- read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的 load 动作使用。
- load(载入):把 read 操作从主内存中得到的变量值放入工作内存的变量的副本中。
- use(使用):把工作内存中的一个变量的值传给执行引擎,每当虚拟机遇到一个使用到变量的指令时都会使用该指令。
- assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
- store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后的 write 操作使用。
- write(写入):作用于主内存的变量,它把 store 操作从工作内存中得到的变量的值放入主内存的变量中。
除了8种同步操作之外,还规定了如下
- 不允许一个线程无原因地(没有发生过任何 assign 操作)把数据从线程的工作内存同步回主内存中。
- 一个新的变量只能在主内存中 “诞生”,不允许在工作内存中直接使用一个未被初始化(load 或 assign)的变量,换句话说就是对一个变量实施 use 和 store 操作之前,必须先执行过了 assign 和 load 操作。
- 一个变量在同一个时刻只允许一条线程对其进行 lock 操作,但 lock 操作可以被同一条线程重复执行多次,多次执行 lock 后,只有执行相同次数的 unlock 操作,变量才会被解锁。
- 如果对一个变量执行 lock 操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行 load 或 assign 操作初始化变量的值。
- 如果一个变量事先没有被 lock 操作锁定,则不允许对它执行 unlock 操作,也不允许去 unlock 一个被其他线程锁定住的变量。
Java内存区域和JMM区别
JVM内存结构和Java虚拟机的运行时区相关,定义了JVM在运行时如何分区存储数据结构,就比如说堆主要用于存放对象实例
Java内存模型和Java并发编程相关,抽象了线程和主内存之间的关系,就比如说线程之间的共享变量就必须存储在主内存种,规定了从Java源代码到CPU可执行指令的这个转化过程要遵循哪些和并发相关的原则和规范,其主要目的是为了简化多线程编程,增加程序可移植性
happens-before原则
- 为了对编译器和处理器的约束尽可能少,只要不改变程序的执行结果(单线程程序和正确执行的多线程程序),编译器和处理器怎么进行重排序优化都行。
- 对于会改变程序执行结果的重排序,JMM 要求编译器和处理器必须禁止这种重排序。
happens-before原则定义
- 如果一个操作 happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,并且第一个操作的执行顺序排在第二个操作之前。
- 两个操作之间存在 happens-before 关系,并不意味着 Java 平台的具体实现必须要按照 happens-before 关系指定的顺序来执行。如果重排序之后的执行结果,与按 happens-before 关系来执行的结果一致,那么 JMM 也允许这样的重排序。
happens-before 原则表达的意义其实并不是一个操作发生在另外一个操作的前面,虽然这从程序员的角度上来说也并无大碍。更准确地来说,它更想表达的意义是前一个操作的结果对于后一个操作是可见的,无论这两个操作是否在同一个线程里。
happens-before常见原则
- 程序顺序规则 :一个线程内,按照代码顺序,书写在前面的操作 happens-before 于书写在后面的操作;
- 解锁规则 :解锁 happens-before 于加锁;
- volatile 变量规则 :对一个 volatile 变量的写操作 happens-before 于后面对这个 volatile 变量的读操作。说白了就是对 volatile 变量的写操作的结果对于发生于其后的任何操作都是可见的。
- 传递规则 :如果 A happens-before B,且 B happens-before C,那么 A happens-before C;
- 线程启动规则 :Thread 对象的 start()方法 happens-before 于此线程的每一个动作。
如果两个操作不满足上述任意一个 happens-before 规则,那么这两个操作就没有顺序的保障,JVM 可以对这两个操作进行重排序
happens-before 和 JMM关系
并发编程三个重要特性
原子性
一次操作或者多次操作,要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断,要么都不执行。
在 Java 中,可以借助synchronized 、各种 Lock 以及各种原子类实现原子性。
synchronized 和各种 Lock 可以保证任一时刻只有一个线程访问该代码块,因此可以保障原子性。各种原子类是利用 CAS (compare and swap) 操作(可能也会用到 volatile或者final关键字)来保证原子操作。
可见性
当一个线程对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。
在 Java 中,可以借助synchronized 、volatile 以及各种 Lock 实现可见性。
如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。
有序性
由于指令重排序问题,代码的执行顺序未必就是编写代码时候的顺序。
我们上面讲重排序的时候也提到过:
指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。
在 Java 中,volatile 关键字可以禁止指令进行重排序优化。
- Java 是最早尝试提供内存模型的语言,其主要目的是为了简化多线程编程,增强程序可移植性的。
- CPU 可以通过制定缓存一致协议(比如 MESI 协议open in new window)来解决内存缓存不一致性问题。
- 为了提升执行速度/性能,计算机在执行程序代码的时候,会对指令进行重排序。 简单来说就是系统在执行代码的时候并不一定是按照你写的代码的顺序依次执行。指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。
- 你可以把 JMM 看作是 Java 定义的并发编程相关的一组规范,除了抽象了线程和主内存之间的关系之外,其还规定了从 Java 源代码到 CPU 可执行指令的这个转化过程要遵守哪些和并发相关的原则和规范,其主要目的是为了简化多线程编程,增强程序可移植性的。
- JSR 133 引入了 happens-before 这个概念来描述两个操作之间的内存可见性。