ThreadLocal & ThreadPool
Reference: JavaGuide
ThreadLocal
The Thread class has a field `threadLocals` (of type `ThreadLocal.ThreadLocalMap`), which can be thought of as a HashMap that stores thread-private values.
The ThreadLocal class has an instance method `set`, which actually stores the value in `threadLocals`; the key is the ThreadLocal object itself.
A single thread can create multiple ThreadLocal objects to store different values, but all these values are stored in the same ThreadLocalMap.
Memory Leaks
In ThreadLocalMap, the key (ThreadLocal) is a weak reference, while the value (the value we store) is a strong reference.
If the thread stays alive, the key (being only weakly referenced) can be reclaimed at the next GC, but the value remains strongly referenced by the thread's ThreadLocalMap and cannot be collected, so it leaks.
To avoid memory leaks, call the remove() instance method after using a ThreadLocal.
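A minimal sketch of the set/remove pattern above (the class and variable names are illustrative):

```java
public class ThreadLocalDemo {
    // Each thread sees its own copy of this value.
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static String withContext(String value) {
        CONTEXT.set(value);       // stored in this thread's ThreadLocalMap
        try {
            return CONTEXT.get(); // reads the thread-private copy
        } finally {
            CONTEXT.remove();     // avoid leaking the value, especially on pooled threads
        }
    }

    public static void main(String[] args) {
        System.out.println(withContext("request-1"));
    }
}
```

The try/finally guarantees `remove()` runs even if the work in between throws.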
Thread Safety
ThreadLocalMap is thread-safe because it can only be accessed by the current thread.
Thread Pool
Creating a Thread Pool
- `new ThreadPoolExecutor(...)`
- The `Executors` utility class, e.g., `Executors.newFixedThreadPool(3)`
Core Parameters
`corePoolSize`, `maximumPoolSize`, `workQueue`, `keepAliveTime`, `unit`, `threadFactory`, `handler`
When a task arrives, if the number of working threads is less than corePoolSize, a thread is immediately assigned to execute the task.
If the core threads are all busy, the task is placed in the workQueue.
If the workQueue is full, the number of concurrently working threads can increase up to maximumPoolSize.
If the number of concurrently working threads has reached maximumPoolSize, the rejection policy is used to reject the task.
When the number of threads is greater than corePoolSize, idle threads can survive for at most keepAliveTime.
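The seven parameters map directly onto the `ThreadPoolExecutor` constructor; a sketch with illustrative values:

```java
import java.util.concurrent.*;

public class PoolDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                60L, TimeUnit.SECONDS,                 // keepAliveTime + unit for surplus threads
                new ArrayBlockingQueue<>(10),          // workQueue (bounded)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler (rejection policy)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println("task running"));
        pool.shutdown();
    }
}
```

With these numbers: tasks 1–2 get core threads, tasks 3–12 wait in the queue, tasks 13–14 get surplus threads, and task 15 is rejected.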
ThreadPoolExecutor Internals
A freshly created ThreadPoolExecutor does not start its core threads up front; to save resources, threads are created lazily as tasks are submitted.
While the number of threads is below the core pool size, a new thread is created for each arriving task, even if some existing threads are idle.
Why use a blocking queue? To keep the core threads alive. When a core thread finishes its current task, it retrieves a new task from the blocking queue. If the blocking queue is empty, the thread blocks on the queue.
Core and non-core threads are not fundamentally different: the pool does not remember which threads were created as "core". When idle threads beyond the core size are destroyed, any of the existing threads may be the one that exits.
How are idle threads destroyed?
Reference source code video
When the number of existing threads > core pool size, every worker calls `workQueue.poll(keepAliveTime, unit)` to fetch a task. If the queue holds fewer tasks than there are workers, some workers necessarily come back empty-handed; those workers are marked as timed out. On the next loop iteration, a timed-out surplus worker calls `compareAndDecrementWorkerCount()`, which uses CAS to decrement `workerCount`, then exits the loop and the thread is destroyed.
When the number of existing threads = core pool size, workers call `workQueue.take()`, which blocks indefinitely until a task arrives.
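A simplified sketch of the fetch loop described above — not the actual JDK source, just the poll-vs-take idea, with an illustrative timeout:

```java
import java.util.concurrent.*;

public class GetTaskSketch {
    // Simplified version of the worker's task-fetch loop.
    static Runnable getTask(BlockingQueue<Runnable> queue,
                            int workerCount, int corePoolSize) throws InterruptedException {
        boolean timedOut = false; // did the previous poll() time out?
        for (;;) {
            if (timedOut && workerCount > corePoolSize) {
                // In the real pool, compareAndDecrementWorkerCount() runs here
                // and the surplus worker exits.
                return null;
            }
            Runnable task = (workerCount > corePoolSize)
                    ? queue.poll(10, TimeUnit.MILLISECONDS) // surplus thread: wait at most keepAliveTime
                    : queue.take();                          // core thread: block indefinitely
            if (task != null) {
                return task;
            }
            timedOut = true; // poll() returned null: mark this worker as timed out
        }
    }

    // Demo: one queued task is fetched; the next fetch times out and returns null.
    static boolean demo() {
        try {
            BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
            Runnable r = () -> {};
            q.add(r);
            return getTask(q, 3, 2) == r && getTask(q, 3, 2) == null;
        } catch (InterruptedException e) {
            return false;
        }
    }
}
```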
Rejection Policies
- `CallerRunsPolicy`: the thread that calls `execute()` runs the task itself.
- `AbortPolicy`: rejects the new task and throws a `RejectedExecutionException`. This is the default.
- `DiscardPolicy`: silently discards the new task.
- `DiscardOldestPolicy`: discards the oldest unprocessed task in the queue.
If you want all tasks to be executed, choose CallerRunsPolicy.
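A small sketch showing the default `AbortPolicy` in action; the pool sizes and queue capacity are illustrative:

```java
import java.util.concurrent.*;

public class RejectDemo {
    static boolean isRejected() {
        // 1 thread, queue capacity 1, AbortPolicy: the third task must be rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch block = new CountDownLatch(1);
        pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) {} });
        pool.execute(() -> {}); // fills the queue
        boolean rejected = false;
        try {
            pool.execute(() -> {}); // no thread and no queue slot left
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        block.countDown();
        pool.shutdown();
        return rejected;
    }
}
```

Swapping in `CallerRunsPolicy` instead would make the submitting thread run the third task itself, so nothing is lost.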
Common Blocking Queues
- `ArrayBlockingQueue`: array-backed; the capacity cannot be changed after construction.
- `LinkedBlockingQueue`: linked-list-backed; unbounded by default, but supports being bounded. The head and tail of the queue are guarded by two separate locks, so enqueuing and dequeuing do not interfere with each other.
- `SynchronousQueue`: has no capacity and does not store tasks.
- `DelayedWorkQueue`: tasks inside are sorted by execution time. If the queue is full, it automatically expands, increasing its capacity by 50%.
Built-in Thread Pools
It is not recommended to use the built-in thread pools.
- `FixedThreadPool`: `corePoolSize` = `maximumPoolSize`; the blocking queue is a `LinkedBlockingQueue` with its default capacity of `Integer.MAX_VALUE`, i.e., effectively unbounded. If too many tasks arrive, they all pile up in the queue; the accumulated requests can eventually cause an OOM.
- `SingleThreadExecutor`: only one thread. The blocking queue is the same as in `FixedThreadPool`.
- `CachedThreadPool`: the blocking queue is a `SynchronousQueue`. An arriving task is handed to an idle thread if one exists; otherwise a new thread is created. The maximum number of threads is `Integer.MAX_VALUE`, which can also lead to an OOM.
- `ScheduledThreadPool`: accepts delayed/periodic tasks. At most `corePoolSize` threads are created, and the `DelayedWorkQueue` can expand up to `Integer.MAX_VALUE`.
Thread Exceptions
- If a task is submitted using `execute(Runnable)` and throws an exception that is not caught inside the task, the task terminates and the stack trace is printed to the console. The thread pool destroys the throwing thread and creates a new one to maintain the pool size.
- If a task is submitted using `submit(Runnable)` or `submit(Callable)`, any exception thrown in the task is captured in the `Future` returned by `submit` and rethrown, wrapped in an `ExecutionException`, when `Future.get()` is called.
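A sketch of the `submit` case: the task's exception only surfaces when `Future.get()` is called, wrapped in an `ExecutionException` (the names here are illustrative):

```java
import java.util.concurrent.*;

public class SubmitExceptionDemo {
    static String causeMessage() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        Future<?> future = pool.submit(failing); // nothing is printed, nothing thrown yet
        try {
            future.get();                        // the task's exception surfaces here...
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();    // ...wrapped in an ExecutionException
        } catch (InterruptedException e) {
            return null;
        } finally {
            pool.shutdown();
        }
    }
}
```

If `get()` is never called, the exception is silently swallowed, which is a common source of "disappearing" failures.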
Naming Thread Pools
- Use `ThreadPoolBuilder`.
- Implement `ThreadFactory` yourself and override the `newThread()` method.
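A typical hand-rolled `ThreadFactory` along the lines of the second option (the naming scheme is illustrative):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Give every pool thread a recognizable name for logs and thread dumps.
        return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }
}
```

Pass an instance of this as the `threadFactory` constructor argument of `ThreadPoolExecutor`.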
How to Set the Core Pool Size
- CPU-intensive tasks: complex calculations; threads beyond the number of CPU cores add little.
- I/O-intensive tasks: e.g., disk I/O, network I/O. More threads can be set for this type of task, because each thread spends much of its time waiting on I/O, and the CPU can switch to another thread during that wait.
Optimal number of threads = number of CPU cores × (1 + thread wait time / thread computation time)
You can use VisualVM to view the total execution time of a method and the time it actually occupies the CPU. The difference between the two is the wait time.
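The formula as a small worked example (the 8-core / 50 ms / 10 ms numbers are made up):

```java
public class PoolSizing {
    // Optimal threads = cores * (1 + waitTime / computeTime)
    static int optimalThreads(int cores, double waitMs, double computeMs) {
        return (int) Math.round(cores * (1 + waitMs / computeMs));
    }

    public static void main(String[] args) {
        // 8 cores, each task waits 50 ms on I/O and computes for 10 ms:
        System.out.println(optimalThreads(8, 50, 10)); // 8 * (1 + 5) = 48
    }
}
```

For a pure CPU-bound task the wait time is ~0, and the formula collapses to roughly one thread per core.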
Thread Pool Monitoring
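`ThreadPoolExecutor` exposes getters that can be polled periodically for monitoring; a minimal sketch:

```java
import java.util.concurrent.*;

public class PoolMonitor {
    // Snapshot of the live metrics ThreadPoolExecutor exposes.
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("pool=%d active=%d queued=%d completed=%d",
                pool.getPoolSize(),          // current number of threads
                pool.getActiveCount(),       // threads currently running tasks
                pool.getQueue().size(),      // tasks waiting in the work queue
                pool.getCompletedTaskCount()); // tasks finished so far
    }
}
```

A queue size that keeps growing is the usual early-warning sign that the pool is undersized.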
shutdown() and shutdownNow()
shutdown(): Graceful shutdown; stops accepting new tasks and waits for already-submitted tasks to complete.
shutdownNow(): Immediate shutdown, attempts to interrupt running tasks and returns a list of unexecuted tasks.
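A common pattern combines the two (the 5-second timeout is illustrative):

```java
import java.util.List;
import java.util.concurrent.*;

public class ShutdownDemo {
    // Graceful first, forceful after a timeout.
    static void close(ExecutorService pool) {
        pool.shutdown(); // stop accepting new tasks, let submitted ones finish
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                List<Runnable> dropped = pool.shutdownNow(); // interrupt stragglers
                System.out.println("unexecuted tasks: " + dropped.size());
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}
```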
FutureTask
TODO
AQS
- `private volatile int state;`
- The synchronization queue is actually a doubly linked list, a variant of the CLH queue. Each node in the list wraps a waiting thread. A node first tries to acquire the lock by spinning, and if that fails, it blocks.
Node States
- `CANCELLED` (1): the node is cancelled because its thread timed out or was interrupted while waiting. Cancelled nodes are removed from the queue.
- `SIGNAL` (-1): the successor of this node needs to be woken up; when this node releases its resource, it wakes the next node. This is the most common state in AQS.
- `CONDITION` (-2): the node is waiting on a condition and is placed in the condition queue.
AQS-ReentrantLock
- `state = 0` means no thread holds the lock; otherwise `state` is the number of times the holding thread has re-entered the lock.
- AQS is in exclusive mode here: only one thread can hold the `ReentrantLock` at a time.
- Threads that fail to acquire the lock enter the CLH blocking queue. A fair lock grants the lock strictly in the order threads entered the queue.
- AQS relies on CAS operations for thread-safe state updates and queue operations. Acquiring the lock tries to CAS `state` from 0 to 1, indicating the lock has been taken.
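A small demonstration of the reentrant state counting, observed through `ReentrantLock.getHoldCount()`:

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantDemo {
    static int holdCountAfterTwoLocks() {
        ReentrantLock lock = new ReentrantLock(); // non-fair by default; new ReentrantLock(true) is fair
        lock.lock();       // state: 0 -> 1 via CAS
        lock.lock();       // re-entrant: same thread, state -> 2
        int holds = lock.getHoldCount(); // mirrors the AQS state value
        lock.unlock();
        lock.unlock();     // state back to 0: lock fully released
        return holds;
    }
}
```

Every `lock()` must be paired with an `unlock()`; the lock is only free again when the count returns to 0.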
acquire Method
- Calls `tryAcquire` to get the lock. On failure, a `Node` is created containing the `Thread`, its `waitStatus`, and pointers to the previous and next nodes, and the `Node` is added to the CLH queue.
- The node continuously spins, checking whether its predecessor is the head. If so, it tries to acquire the lock again; on success it sets itself as the head and returns.
- If it cannot acquire the lock, or its predecessor is not the head, it blocks again, depending on the situation.
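Stripped of the CLH queue and thread parking, the CAS core of `tryAcquire` can be sketched as a one-bit lock (this toy omits reentrancy and queuing, which real AQS adds on top):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MiniLock {
    // state = 0 means free, 1 means held (no reentrancy in this sketch).
    private final AtomicInteger state = new AtomicInteger(0);

    boolean tryAcquire() {
        return state.compareAndSet(0, 1); // CAS 0 -> 1, like AQS tryAcquire
    }

    void release() {
        state.set(0); // in AQS this also wakes the SIGNAL-marked successor
    }
}
```

In real AQS, a failed `tryAcquire` enqueues the thread as a node and parks it instead of returning `false`.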