ThreadLocal&线程池

参考:JavaGuide

ThreadLocal

Thread类里面有一个变量threadLocals,可以视为一个 HashMap,用来存放线程私有的内容。

1
2
3
/* ThreadLocal values pertaining to this thread. This map is maintained
    * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal类里面有一个instance 方法 set,实际上是把内容存在threadLocals里面。key是该ThreadLocal对象自己

1
2
3
4
5
6
7
8
9
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);//拿到这个线程所私有的threadLocals对象
    if (map != null) {
        map.set(this, value);
    } else {
        createMap(t, value);
    }
}

一个线程可以创建多个ThreadLocal对象,存放不同的值,但是这些值都是存在同一个ThreadLocalMap里面的。

内存泄露

ThreadLocalMap里面,对于key(ThreadLocal)是弱引用,对于 value(我们存储的值)是强引用。 如果一个线程持续存活,那么下一次垃圾回收时,会回收掉 key, 但不会回收掉 value。

避免内存泄露:使用完 ThreadLocal之后,调用它的instance方法remove().

线程安全

ThreadLocalMap是线程安全的,因为他只能被当前线程访问

线程池

创建线程池

  1. new ThreadPoolExecutor()
  2. Executors工具类。比如Executors.newFixedThreadPool(3);

核心参数

  1. corePoolSize
  2. maximumPoolSize
  3. workQueue
  4. keepAliveTime
  5. unit
  6. threadFactory
  7. handler

当有任务到来时,如果正在工作的线程数不到corePoolSize,则直接安排线程执行该任务;
如果核心线程已满,则将该任务放入workQueue。
如果workQueue已满,则此时可以同时工作的线程数量变成maximumPoolSize。
如果同时工作的线程已经达到maximumPoolSize,则采用拒绝策略去拒绝该任务。
当线程数量大于corePoolSize时,空闲线程最多可以存活keepAliveTime。

ThreadPoolExecutor底层原理

在新建一个ThreadPoolExecutor的时候,不会默认创建所有核心线程。每次有一个任务提交,就会创建一个线程,让它去执行任务。这样的目的是节省资源
当线程数量不足核心线程数时,即使有空闲线程,新任务来的时候,也会去创建一个新线程。

为什么一定要用阻塞队列?为了保活核心线程。当某个核心线程执行完当前任务,它从阻塞队列中获取新的任务,如果阻塞队列此时为空,则该线程阻塞在队列。

核心线程和非核心线程本质上没有区别,在销毁空闲非核心线程时,在现有线程中任意选择

怎么实现销毁空闲线程呢? 参考源码视频
现有线程数>核心线程数的时候,所有线程都去执行 workQueue.poll(keepAliveTime, unit),尝试拿到某个任务。如果阻塞队列中的任务数<现有线程数时,必然有某几个线程拿不到任务,拿不到任务的线程A,B标记为 timeout. 这几个线程再次执行循环尝试拿到任务,但由于已经被标记 timeout,它们执行compareAndDecrementWorkerCount(), 其中调用 CAS去减少workerCount,然后退出循环,该线程被销毁。

现有线程数=核心线程数时,它们去执行 workQueue.take(),也就是无限阻塞。

拒绝策略

  1. CallerRunsPolicy:调用execute()方法的线程去运行该任务。
  2. AbortPolicy: 拒绝新任务并抛出RejectedExecutionException异常。默认策略。
  3. DiscardPolicy: 直接丢弃新任务。
  4. DiscardOldestPolicy: 丢弃最早未处理任务。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {

                r.run();// 直接主线程执行,而不是线程池中的线程执行
            }
        }
    }

如果希望所有任务都被运行,就选择CallerRunsPolicy.

常见阻塞队列

  1. ArrayBlockingQueue: 数组实现,容量一旦初始化,就无法修改
  2. LinkedBlockingQueue: 链表实现。默认无界,支持有界。队头和队尾由两把锁负责,因此入队和出队互不影响。
  3. SynchronousQueue: 没有容量,不存储任务。
  4. DelayedWorkQueue: 里面的任务按照执行时间排序。如果任务满了,就自动扩容,增加原先容量的 50%。

内置线程池

不推荐使用内置线程池。

  1. FixedThreadPool:核心线程数=最大线程数,阻塞队列为LinkedBlockingQueue,支持动态扩容。也就是说阻塞队列的默认大小为 Integer.MAX_VALUE, 如果有太多任务到达,都会放进阻塞队列。这样可能堆积太多请求,导致 OOM。
  2. SingleThreadPool: 只有一个线程。阻塞队列实现与FixedThreadPool相同。
  3. CachedThreadPool: 阻塞队列为SynchronousQueue。当有任务到来时,用空闲线程去执行;如果没有空闲线程,则创建一个新线程去执行。最大线程数为Integer.MAX_VALUE。也可能导致 OOM。
  4. ScheduledThreadPool: 可以提交定时任务/周期性任务。最多只能创建核心线程数的线程,阻塞队列可以扩容至Integer.MAX_VALUE。

线程异常

  1. 如果是使用 execute(Runnable)提交任务,那么如果这个任务里面抛出了异常,并且没有在任务中被捕获,那么任务终止,异常信息打印在控制台。线程池会销毁这个线程,并且创建一个新的线程来保证核心线程数不变。
  2. 如果使用 submit(Runnable)提交任务,那么任务中抛出的异常会包装在submit返回的Future对象中。
1
2
3
4
5
6
7
8
try {
    // 获取submit()提交的任务的结果,捕获异常
    future.get();
} catch (ExecutionException e) {
    System.out.println("Caught exception from Task 2: " + e.getCause().getMessage());
} catch (InterruptedException e) {
    System.out.println("Task 2 was interrupted");
}

给线程池命名

  1. 使用ThreadPoolBuilder
1
2
3
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
  1. 自己实现ThreadFactory,override newThread()方法。

如何设置核心线程数

  1. CPU密集型任务:复杂的计算。
  2. I/O型任务:比如磁盘I/P,网络I/O。这类任务可以设置较多的线程,因为每个线程执行完任务后需要等待,这个时间切换至其它线程执行。

最佳线程数=CPU核心数(1+线程等待时间)/线程计算时间

可以使用visualVM查看某个方法执行的总时间,和它实际占用CPU的时间。两个时间的差值即为等待时间

线程池监控

1
2
3
executor.getActiveCount();      // 当前活跃线程数
executor.getQueue().size();     // 队列中等待的任务数
executor.getCompletedTaskCount(); // 已完成任务数

shutdown() 和 shutdownNow()

shutdown():平滑关闭,等待已提交任务执行完成。

shutdownNow():立即关闭,尝试中断正在执行的任务,并返回未执行的任务列表。

FutureTask

TODO

AQS

  1. private volatile int state;
  2. 同步队列,其实是一个双向链表,CLH 队列的变体。链表的每个节点是一个线程。节点先通过自旋获取锁,不行的话就阻塞

节点状态

  1. CANCELLED(1):表示节点被取消,线程超时/线程在等待过程中被中断。被取消的节点会从队列中被移除。
  2. SIGNAL(-1):当前节点的后继节点需要被唤醒。当前节点释放资源时,唤醒后续节点,这是AQS中最常见的状态
  3. CONDITION(-2):节点正在等待条件,被放入条件队列。

AQS-ReentrantLock

  1. state=0 表示没有线程持有该锁;state代表某个线程对于这个锁的重入次数。
  2. AQS 在此时体现为独占模式,只有一个线程能占有RreentrantLock.
  3. 没有获取锁的线程会进入阻塞队列CLH队列。公平锁是严格按照线程进入阻塞队列的顺序来获取锁。
  4. AQS 依赖 CAS操作来实现线程安全的状态更新队列操作。获取锁时,通过 CAS 尝试把 state 从 0 更新为 1,表示获取到锁。

acquire 方法

  1. 调用 tryAcquire去获取锁,如果失败,则创建一个 Node,里面放 Thread, waitStatus,和指向前后节点的指针。这个 Node 被加入到 CLH 队列中。
  2. 不断自旋检查前驱节点是不是头节点,是的话就再次尝试获取锁,成功就把自己设为头节点并返回。
  3. 要是获取不到或者前驱节点不是头节点,就会根据情况再次阻塞等待。

CountDownLatch