ThreadLocal & ThreadPool

Reference: JavaGuide

ThreadLocal

The Thread class has a variable threadLocals, which can be considered a HashMap used to store thread-private content.

/* ThreadLocal values pertaining to this thread. This map is maintained
    * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

The ThreadLocal class has an instance method set, which actually stores the content in threadLocals. The key is the ThreadLocal object itself.

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); // Get the thread's private threadLocals object
    if (map != null) {
        map.set(this, value);
    } else {
        createMap(t, value);
    }
}

A single thread can create multiple ThreadLocal objects to store different values, but all these values are stored in the same ThreadLocalMap.
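As a minimal sketch (class and variable names are illustrative), two independent ThreadLocal objects used by the same thread both store their values in that thread's single ThreadLocalMap, keyed by the ThreadLocal instance itself:

```java
// Sketch: two independent ThreadLocal objects used by the same thread.
// Both values live in the single ThreadLocalMap attached to the current
// thread, keyed by the ThreadLocal instance itself.
public class TwoLocalsDemo {
    private static final ThreadLocal<String> NAME = ThreadLocal.withInitial(() -> "default");
    private static final ThreadLocal<Integer> COUNT = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        NAME.set("worker-1");
        COUNT.set(42);
        // Both lookups hit the same per-thread map with different keys.
        System.out.println(NAME.get() + " " + COUNT.get()); // prints "worker-1 42"
    }
}
```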

Memory Leaks

In ThreadLocalMap, the key (the ThreadLocal object) is held by a weak reference, while the value (the value we store) is held by a strong reference. If the ThreadLocal object has no other strong references, the key is reclaimed at the next GC; but as long as the thread stays alive, the value remains reachable through the chain Thread → ThreadLocalMap → Entry → value and is never collected, even though an entry with a null key can no longer be accessed. That orphaned value is the memory leak.

To avoid memory leaks, call the remove() instance method after using a ThreadLocal.
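A minimal usage sketch (CONTEXT and handleRequest are illustrative names): pair set() with remove() in a finally block so that a pooled thread does not carry a stale value into its next task:

```java
// Sketch: always pair set() with remove() in a finally block, especially in
// pooled threads, so a stale value does not leak into the next task that
// reuses the same thread. CONTEXT is an assumed name for illustration.
public class RemoveDemo {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static void handleRequest(String user) {
        CONTEXT.set(user);
        try {
            // ... business logic that reads CONTEXT.get() ...
        } finally {
            CONTEXT.remove(); // clears both key and value from ThreadLocalMap
        }
    }
}
```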

Thread Safety

ThreadLocalMap is thread-safe because it can only be accessed by the current thread.

Thread Pool

Creating a Thread Pool

  1. new ThreadPoolExecutor()
  2. The Executors utility class, e.g., Executors.newFixedThreadPool(3);

Core Parameters

  1. corePoolSize
  2. maximumPoolSize
  3. workQueue
  4. keepAliveTime
  5. unit
  6. threadFactory
  7. handler

When a task arrives:

  1. If the number of threads is less than corePoolSize, a new thread is created to execute it immediately (even if other threads are idle).
  2. If the core threads are all busy, the task is placed in the workQueue.
  3. If the workQueue is full, additional threads are created to run tasks, up to maximumPoolSize.
  4. If the thread count has already reached maximumPoolSize, the task is rejected according to the handler.

When the number of threads exceeds corePoolSize, an idle thread survives for at most keepAliveTime before it is terminated.
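A minimal sketch of wiring these parameters together (the numbers are arbitrary illustration values):

```java
import java.util.concurrent.*;

// Sketch: a pool wired with every core parameter; the numbers are arbitrary
// illustration values. With these settings, tasks 1-2 run on core threads,
// 3-4 wait in the queue, 5-6 trigger extra threads up to maximumPoolSize,
// and a 7th concurrent task would be rejected by the handler.
public class PoolParamsDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                60L, TimeUnit.SECONDS,                 // keepAliveTime + unit for idle non-core threads
                new ArrayBlockingQueue<>(2),           // bounded workQueue
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler (rejection policy)
        pool.shutdown();
    }
}
```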

ThreadPoolExecutor Internals

When a ThreadPoolExecutor is first created, it does not start its core threads up front; this lazy creation saves resources if the pool ends up lightly used. While the number of threads is below corePoolSize, each newly submitted task causes a new thread to be created, even if existing threads are idle.

Why use a blocking queue? To keep the core threads alive. When a core thread finishes its current task, it retrieves a new task from the blocking queue. If the blocking queue is empty, the thread blocks on the queue.

Core and non-core threads are not fundamentally different. When destroying idle non-core threads, any thread is arbitrarily chosen from the existing threads.

How are idle threads destroyed? When the number of existing threads exceeds the core pool size, every worker fetches its next task with workQueue.poll(keepAliveTime, unit). If there are fewer queued tasks than workers, some polls inevitably time out and return null; those workers are marked as timed out. On the next loop iteration, a timed-out worker calls compareAndDecrementWorkerCount(), a CAS that decrements workerCount, then exits its run loop and the thread terminates.

When the number of existing threads equals the core pool size, workers call workQueue.take() instead, which blocks indefinitely until a task is available.
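The two code paths can be sketched as a simplified worker loop (an illustration of the idea, not the actual JDK getTask() code; names are assumptions):

```java
import java.util.concurrent.*;

// Simplified sketch of the idea behind ThreadPoolExecutor.getTask():
// workers beyond the core size poll with a timeout and exit when it expires,
// while core threads block indefinitely on take(). Names are illustrative.
class WorkerLoopSketch {
    private final BlockingQueue<Runnable> workQueue;
    private final int corePoolSize;
    private final long keepAliveNanos;

    WorkerLoopSketch(BlockingQueue<Runnable> q, int core, long keepAliveNanos) {
        this.workQueue = q;
        this.corePoolSize = core;
        this.keepAliveNanos = keepAliveNanos;
    }

    Runnable getTask(int currentWorkerCount) throws InterruptedException {
        boolean timed = currentWorkerCount > corePoolSize;
        return timed
                ? workQueue.poll(keepAliveNanos, TimeUnit.NANOSECONDS) // null on timeout -> worker exits
                : workQueue.take();                                    // core thread blocks until a task arrives
    }
}
```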

Rejection Policies

  1. CallerRunsPolicy: The thread that calls the execute() method runs the task.
  2. AbortPolicy: Rejects the new task and throws a RejectedExecutionException. Default policy.
  3. DiscardPolicy: Directly discards the new task.
  4. DiscardOldestPolicy: Discards the oldest unprocessed task.
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // Executed directly by the calling thread, not a pool thread
            r.run();
        }
    }
}

If you want all tasks to be executed, choose CallerRunsPolicy.
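A small demonstration of CallerRunsPolicy under assumed pool settings (one worker, a queue of capacity one): the overflowing task runs synchronously on the thread that called execute():

```java
import java.util.concurrent.*;

// Sketch: with CallerRunsPolicy, a task that cannot be queued runs on the
// submitting thread, so no task is lost (at the cost of slowing the caller).
public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Fill the single worker and the single queue slot, then overflow.
        pool.execute(() -> sleep(500)); // occupies the worker
        pool.execute(() -> {});         // waits in the queue
        pool.execute(() ->              // rejected -> runs on the calling thread
                System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdownNow();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }
}
```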

Common Blocking Queues

  1. ArrayBlockingQueue: Implemented with an array, the capacity cannot be changed after initialization.
  2. LinkedBlockingQueue: Implemented with a linked list. Unbounded by default, but a capacity can be specified. The head and tail of the queue are guarded by two separate locks, so enqueues and dequeues do not block each other.
  3. SynchronousQueue: No capacity, does not store tasks.
  4. DelayedWorkQueue: Tasks inside are sorted by execution time. If the queue is full, it automatically expands, increasing its capacity by 50%.

Built-in Thread Pools

It is not recommended to use the built-in thread pools.

  1. FixedThreadPool: corePoolSize = maximumPoolSize; the blocking queue is an unbounded LinkedBlockingQueue with a capacity of Integer.MAX_VALUE. If tasks arrive faster than they are processed, they accumulate in the queue without limit, which can exhaust memory and cause an OOM.
  2. SingleThreadExecutor: Only one thread. The blocking queue is the same as in FixedThreadPool.
  3. CachedThreadPool: The blocking queue is a SynchronousQueue. When a task arrives, an idle thread is used to execute it; if there are no idle threads, a new thread is created to execute it. The maximum number of threads is Integer.MAX_VALUE. This can also lead to an OOM.
  4. ScheduledThreadPool: Can submit timed tasks/periodic tasks. At most, only core pool size threads can be created, and the blocking queue can expand to Integer.MAX_VALUE.
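For illustration, newFixedThreadPool(n) is roughly equivalent to the following construction (per the JDK source), which makes the unbounded-queue risk explicit:

```java
import java.util.concurrent.*;

// Sketch: what Executors.newFixedThreadPool(n) builds internally, per the JDK
// source. The unbounded LinkedBlockingQueue (capacity Integer.MAX_VALUE) is
// why a pile-up of requests can lead to an OOM.
public class FixedPoolEquivalent {
    static ExecutorService fixed(int n) {
        return new ThreadPoolExecutor(
                n, n,                         // core == max: no extra threads, ever
                0L, TimeUnit.MILLISECONDS,    // keepAliveTime is irrelevant here
                new LinkedBlockingQueue<>()); // unbounded queue
    }
}
```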

Thread Exceptions

  1. If a task is submitted using execute(Runnable), and an exception is thrown within the task and not caught, the task terminates, and the exception information is printed to the console. The thread pool will destroy this thread and create a new one to maintain the core pool size.
  2. If a task is submitted using submit(Runnable), any exception thrown in the task will be wrapped in the Future object returned by submit.
try {
    // Get the result of the task submitted by submit(), and catch exceptions
    future.get();
} catch (ExecutionException e) {
    System.out.println("Caught exception from Task 2: " + e.getCause().getMessage());
} catch (InterruptedException e) {
    System.out.println("Task 2 was interrupted");
}

Naming Thread Pools

  1. Use Guava's ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
  2. Implement ThreadFactory yourself and override the newThread() method.
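A hand-rolled ThreadFactory might look like this (the prefix handling and daemon flag are illustrative choices):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a custom ThreadFactory that names threads with a prefix and a
// per-factory counter. Pass it to the ThreadPoolExecutor constructor.
public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true); // optional: daemon threads do not block JVM exit
        return t;
    }
}
```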

How to Set the Core Pool Size

  1. CPU-intensive tasks (complex calculations): use roughly as many threads as CPU cores; a common rule of thumb is N + 1.
  2. I/O-intensive tasks: e.g., disk I/O, network I/O. More threads can be set for this type of task because each thread needs to wait after completing a task, and this time can be used to switch to another thread for execution.

Optimal number of threads = Number of CPU cores × (1 + Thread wait time / Thread computation time)

You can use VisualVM to view the total execution time of a method and the time it actually occupies the CPU. The difference between the two is the wait time.
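A worked example of the formula with assumed measurements (8 cores, 100 ms of waiting and 50 ms of computation per task):

```java
// Worked example of the sizing formula above, with assumed measurements:
// 8 cores, each task waits 100 ms (I/O) and computes for 50 ms.
public class PoolSizing {
    static int optimalThreads(int cores, double waitMs, double computeMs) {
        return (int) (cores * (1 + waitMs / computeMs));
    }

    public static void main(String[] args) {
        // 8 * (1 + 100/50) = 8 * 3 = 24 threads
        System.out.println(optimalThreads(8, 100, 50)); // prints 24
    }
}
```

A pure CPU-bound task (zero wait time) collapses the formula to one thread per core.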

Thread Pool Monitoring

executor.getActiveCount();      // Current number of active threads
executor.getQueue().size();     // Number of tasks waiting in the queue
executor.getCompletedTaskCount(); // Number of completed tasks

shutdown() and shutdownNow()

shutdown(): Graceful shutdown, waits for submitted tasks to complete.

shutdownNow(): Immediate shutdown, attempts to interrupt running tasks and returns a list of unexecuted tasks.
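A small sketch contrasting the two methods (sleep durations are arbitrary): shutdown() drains the queue, while shutdownNow() hands the queued tasks back:

```java
import java.util.List;
import java.util.concurrent.*;

// Sketch: shutdown() lets the queued task finish; shutdownNow() interrupts
// the running task and returns the still-queued ones unexecuted.
public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(() -> {});
        pool.shutdown();                 // stop accepting new tasks, drain the queue
        pool.awaitTermination(1, TimeUnit.SECONDS);

        ExecutorService pool2 = Executors.newFixedThreadPool(1);
        pool2.execute(() -> sleep(500)); // occupies the single worker
        pool2.execute(() -> {});         // still waiting in the queue
        List<Runnable> pending = pool2.shutdownNow(); // interrupt + return queued tasks
        System.out.println(pending.size()); // the queued task comes back: prints 1
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }
}
```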

FutureTask

TODO

AQS

  1. private volatile int state;
  2. The synchronization queue is a doubly linked list, a variant of the CLH queue. Each node holds a reference to a waiting thread. A newly enqueued node first spins briefly, retrying the acquisition, and blocks (parks) if that fails.

Node States

  1. CANCELLED (1): Indicates that the node is cancelled, the thread timed out, or was interrupted while waiting. Cancelled nodes are removed from the queue.
  2. SIGNAL (-1): The successor of the current node needs to be awakened. When the current node releases its resource, it wakes up the subsequent node. This is the most common state in AQS.
  3. CONDITION (-2): The node is waiting on a condition and is placed in the condition queue.

AQS-ReentrantLock

  1. state = 0 means no thread holds the lock; a positive state counts how many times the holding thread has reentrantly acquired it.
  2. AQS at this point is in exclusive mode, only one thread can hold the ReentrantLock.
  3. Threads that fail to acquire the lock will enter the CLH blocking queue. A fair lock strictly acquires the lock in the order that threads entered the blocking queue.
  4. AQS relies on CAS operations to achieve thread-safe state updates and queue operations. When acquiring a lock, it tries to update the state from 0 to 1 using CAS, indicating that the lock has been acquired.

acquire Method

  1. Calls tryAcquire to get the lock. If it fails, a Node is created containing the Thread, waitStatus, and pointers to the previous and next nodes. This Node is added to the CLH queue.
  2. It continuously spins, checking if its predecessor is the head. If so, it tries to acquire the lock again. If successful, it sets itself as the head and returns.
  3. If it cannot acquire the lock or its predecessor is not the head, it will block again depending on the situation.
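To see this acquire path in action, here is the classic minimal non-reentrant exclusive lock built on AQS (a sketch, not ReentrantLock's actual implementation): we supply only the CAS-based tryAcquire/tryRelease, and acquire(1) runs the enqueue/spin/park sequence described above:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch: a minimal non-reentrant exclusive lock built on AQS. Calling
// sync.acquire(1) runs AQS's tryAcquire / enqueue / spin / park sequence;
// we only provide the CAS-based state transitions.
public class SimpleLock {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS state 0 -> 1: success means we now hold the lock
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0); // release: AQS then unparks the successor in the CLH queue
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }
    public void unlock() { sync.release(1); }
}
```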

CountDownLatch