Java并发编程相关问题

总结摘要
总结Java中并发相关知识点可能提的问题

针对每个Java并发编程相关问题,我准备了一个“一句话原理 + 一句话源码 + 一句话项目/场景”的结构化回答,体现深度同时展现实战能力。

锁机制

synchronized 和 ReentrantLock 的区别?

锁的实现层次

一句话原理:synchronized是JVM层面的关键字,通过monitor对象实现;ReentrantLock是JDK层面的API接口实现,基于AQS(AbstractQueuedSynchronizer)框架,提供更灵活的锁机制。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// synchronized的字节码实现
public void syncMethod() {
    synchronized(this) {  // monitorenter指令
        // 同步代码块
    }  // monitorexit指令
}
// JVM通过monitor对象实现,支持锁升级(偏向锁→轻量级锁→重量级锁)

// ReentrantLock的AQS实现
public class ReentrantLock implements Lock {
    private final Sync sync;  // 继承AQS的内部类
    
    // 公平锁实现
    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&  // 公平性检查
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;  // 可重入
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

项目场景:在交易系统的支付接口中,使用synchronized实现简单的防重入控制,利用JVM的锁优化机制自动适应并发压力;而在需要超时控制的高级场景,选择ReentrantLock提供更精细的控制能力。

功能特性对比

一句话原理:ReentrantLock相比synchronized提供三大高级功能:可中断锁、超时获取锁、公平锁选择,同时支持多个条件变量(Condition),而synchronized只支持简单的wait/notify。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// ReentrantLock的高级特性
ReentrantLock lock = new ReentrantLock(true);  // 公平锁
Condition notFull = lock.newCondition();  // 创建条件变量
Condition notEmpty = lock.newCondition();

// 可中断锁
if (lock.tryLock(1, TimeUnit.SECONDS)) {  // 尝试1秒获取锁
    try {
        // 业务逻辑
    } finally {
        lock.unlock();
    }
}

// 多个条件变量的应用
class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 队列满时等待
            items[putptr] = x;
            notEmpty.signal();    // 唤醒取线程
        } finally {
            lock.unlock();
        }
    }
}

// synchronized的局限
public synchronized void method() throws InterruptedException {
    wait();  // 只能有一个等待队列
    notify();  // 无法指定唤醒特定线程
}

项目场景:在消息队列的消费者客户端,使用ReentrantLock的多个Condition实现精细化线程协作:队列满时生产者等待notFull,队列空时消费者等待notEmpty,比synchronized的单一等待队列效率更高。

性能与使用便捷性

一句话原理:synchronized在JDK 1.6后引入锁升级机制(偏向锁→轻量级锁→重量级锁),低竞争场景性能与ReentrantLock相当,且使用更简单(自动释放锁);ReentrantLock需要显式加解锁,但提供更丰富的监控和调试能力。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// synchronized的锁升级(JVM内部实现)
// 偏向锁:只有一个线程访问时,记录线程ID
// 轻量级锁:少量竞争时,CAS自旋
// 重量级锁:竞争激烈时,线程挂起

// ReentrantLock的监控能力
ReentrantLock lock = new ReentrantLock();
int holdCount = lock.getHoldCount();  // 当前线程持有锁次数
int queueLength = lock.getQueueLength();  // 等待线程数
boolean hasQueuedThreads = lock.hasQueuedThreads();  // 是否有等待线程

// 使用便捷性对比
// synchronized:简单不易错
public void easyMethod() {
    synchronized(this) {  // 自动解锁
        // 业务逻辑
    }
}

// ReentrantLock:需手动解锁
public void complexMethod() {
    lock.lock();
    try {
        // 业务逻辑
    } finally {
        lock.unlock();  // 必须finally中释放
    }
}

项目场景:在微服务的限流组件中,使用ReentrantLock的getQueueLength()方法监控等待线程数,动态调整限流阈值;而普通业务方法使用synchronized简化代码,避免忘记释放锁导致死锁。

选型指南与最佳实践

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/**
 * synchronized vs ReentrantLock 选型决策树
 */
public class LockSelectionGuide {
    
    /**
     * 场景1:简单同步,不需要高级特性
     * 选择:synchronized
     * 原因:代码简洁,JVM自动优化,不易出错
     */
    public class SimpleCounter {
        private int count;
        
        public synchronized void increment() {  // 简单同步
            count++;
        }
        
        public synchronized int getCount() {
            return count;
        }
    }
    
    /**
     * 场景2:需要超时控制
     * 选择:ReentrantLock
     * 原因:tryLock支持超时,避免死锁
     */
    public class TimeoutTask {
        private final ReentrantLock lock = new ReentrantLock();
        
        public boolean executeWithTimeout() {
            try {
                if (lock.tryLock(3, TimeUnit.SECONDS)) {
                    try {
                        // 执行任务
                        return true;
                    } finally {
                        lock.unlock();
                    }
                } else {
                    log.warn("获取锁超时");
                    return false;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
    
    /**
     * 场景3:读写分离场景
     * 选择:ReentrantReadWriteLock(基于ReentrantLock)
     * 原因:读读不互斥,提高并发度
     */
    public class CacheManager {
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private final Map<String, Object> cache = new HashMap<>();
        
        public Object get(String key) {
            rwl.readLock().lock();  // 读锁
            try {
                return cache.get(key);
            } finally {
                rwl.readLock().unlock();
            }
        }
        
        public void put(String key, Object value) {
            rwl.writeLock().lock();  // 写锁
            try {
                cache.put(key, value);
            } finally {
                rwl.writeLock().unlock();
            }
        }
    }
    
    /**
     * 场景4:生产者消费者模式
     * 选择:ReentrantLock + Condition
     * 原因:多个Condition精确唤醒
     */
    public class BoundedQueue<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();
        private final T[] items;
        private int putIndex, takeIndex, count;
        
        @SuppressWarnings("unchecked")
        public BoundedQueue(int capacity) {
            items = (T[]) new Object[capacity];
        }
        
        public void put(T item) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    notFull.await();  // 队列满,等待取
                }
                items[putIndex] = item;
                if (++putIndex == items.length) putIndex = 0;
                count++;
                notEmpty.signal();  // 通知取线程
            } finally {
                lock.unlock();
            }
        }
        
        public T take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    notEmpty.await();  // 队列空,等待放
                }
                T item = items[takeIndex];
                items[takeIndex] = null;
                if (++takeIndex == items.length) takeIndex = 0;
                count--;
                notFull.signal();  // 通知放线程
                return item;
            } finally {
                lock.unlock();
            }
        }
    }
    
    /**
     * 场景5:需要锁状态监控
     * 选择:ReentrantLock
     * 原因:提供丰富的监控方法
     */
    public class MonitorableService {
        private final ReentrantLock lock = new ReentrantLock();
        
        public void monitorLock() {
            // 监控信息
            int holdCount = lock.getHoldCount();
            int queueLength = lock.getQueueLength();
            boolean isLocked = lock.isLocked();
            boolean isFair = lock.isFair();
            
            log.info("锁状态:持有次数={}, 等待队列长度={}, 是否锁定={}, 是否公平={}",
                     holdCount, queueLength, isLocked, isFair);
        }
    }
}

/**
 * 性能对比与演进
 * 
 * JDK版本演进对synchronized的优化:
 * JDK 1.5:synchronized性能较差,重量级锁
 * JDK 1.6:引入锁升级机制,性能大幅提升
 * JDK 1.7:进一步优化,自适应自旋
 * JDK 1.8:偏向锁延迟优化
 * JDK 15+:引入虚拟线程,synchronized更加轻量
 * 
 * 当前选择建议:
 * 1. 95%的场景用synchronized就够了
 * 2. 需要高级特性时用ReentrantLock
 * 3. 读写分离用ReentrantReadWriteLock
 * 4. 高并发计数器用LongAdder替代锁
 */

// 面试金句
// "synchronized和ReentrantLock的选择本质是简单性和灵活性的权衡。
//  synchronized像自动挡汽车,简单易用,JVM帮你搞定优化;
//  ReentrantLock像手动挡,功能更强大但需要更小心操作。
//  在我主导的支付系统中,普通业务用synchronized保障代码可维护性,
//  而在需要超时控制的远程调用场景使用ReentrantLock避免永久阻塞。
//  理解两者的底层实现和适用场景才能做出正确的技术选型"

什么是偏向锁、轻量级锁、重量级锁?锁升级的过程是怎样的?

三种锁状态核心原理

一句话原理:Java对象头中的Mark Word根据并发竞争程度,实现锁的渐进式升级:偏向锁(单线程)、轻量级锁(低竞争)、重量级锁(高竞争),在保证线程安全的同时最小化性能开销。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 对象头中的Mark Word(32位JVM布局)
// 无锁状态: | unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 |
// 偏向锁:   | thread:23 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 |
// 轻量锁:   | ptr_to_lock_record:30 | lock:2 |
// 重量锁:   | ptr_to_monitor:30 | lock:2 |

// HotSpot源码中的锁状态定义
enum LockState {
    inflated,           // 重量级锁(依赖操作系统互斥量)
    fast_lock,          // 轻量级锁(CAS自旋)
    biased_lock         // 偏向锁(记录线程ID)
};

// JVM锁升级的核心逻辑
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, TRAPS) {
    if (UseBiasedLocking) {  // 是否开启偏向锁(默认开启)
        if (!BiasedLocking::revoke_and_rebias(obj, THREAD)) {
            // 尝试获取偏向锁失败,升级为轻量级锁
            slow_enter(obj, lock, THREAD);
        }
    } else {
        slow_enter(obj, lock, THREAD);  // 直接进入轻量级锁
    }
}

项目场景:在用户登录服务中,大部分时间只有一个线程处理某个用户的session(偏向锁),偶尔有多个线程同时读取(升级为轻量级锁自旋),只有在极端并发下才升级为重量级锁,这种自适应机制保证了系统在99%场景下的高性能。

锁升级完整流程

一句话原理:锁升级是单向不可逆的过程:无锁→偏向锁(记录线程ID)→轻量级锁(CAS自旋)→重量级锁(线程挂起),JVM根据竞争激烈程度自动调整,在性能和阻塞之间找到平衡点。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 偏向锁获取
void BiasedLocking::revoke_and_rebias(Handle obj, TRAPS) {
    markOop mark = obj->mark();
    if (mark->has_bias_pattern()) {  // 对象支持偏向锁
        if (mark->biased_locker() == THREAD) {
            // 当前线程已经获得偏向锁,直接返回
            return;
        } else {
            // 其他线程竞争偏向锁,需要撤销偏向
            // 在全局安全点撤销偏向锁
            BiasedLocking::revoke_at_safepoint(obj);
            // 升级为轻量级锁
        }
    }
}

// 轻量级锁获取
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
    markOop mark = obj->mark();
    if (mark->is_neutral()) {  // 无锁状态
        // CAS尝试将对象头替换为指向锁记录的指针
        lock->set_displaced_header(mark);
        if (mark == obj()->cas_set_mark((markOop)lock, mark)) {
            // CAS成功,获得轻量级锁
            return;
        }
    }
    // CAS失败或有竞争,膨胀为重量级锁
    ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

// 重量级锁膨胀
ObjectMonitor* ObjectSynchronizer::inflate(Thread * Self, oop object) {
    // 创建ObjectMonitor对象,管理等待队列
    ObjectMonitor * monitor = new ObjectMonitor();
    monitor->set_header(mark);
    monitor->set_owner(NULL);
    // 将对象头指向monitor
    object->set_mark(markOopDesc::encode(monitor));
    return monitor;
}

项目场景:在秒杀系统的库存扣减中,刚开始只有少数线程竞争(轻量级锁CAS自旋),当瞬间涌入大量请求时,JVM自动将锁升级为重量级锁,让线程进入操作系统等待队列,避免CPU空转浪费资源。

锁升级优化细节

一句话原理:JVM通过批量重偏向批量撤销机制优化偏向锁,当某个类的大量对象被不同线程竞争时,JVM会撤销该类的偏向锁,避免频繁的偏向锁撤销操作带来的性能损耗。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 偏向锁批量撤销优化
class BiasedLocking {
    // 每个类维护一个偏向锁撤销计数器
    static int* _biased_lock_revocation_count;
    
    // 当某个类的撤销次数达到阈值(默认20)
    // 执行批量重偏向
    static void bulk_rebias(klassOop k) {
        // 重新计算该类的所有对象的epoch
        // 使旧的偏向锁失效
    }
    
    // 当撤销次数达到阈值(默认40)
    // 执行批量撤销,禁用该类的偏向锁
    static void bulk_revoke(klassOop k) {
        // 该类的新对象不再使用偏向锁
        klass->set_prototype_header(markOopDesc::prototype());
    }
}

// 锁撤销的触发条件
void BiasedLocking::revoke(Handle obj, TRAPS) {
    // 1. 调用hashCode()会撤销偏向锁
    // 2. 其他线程竞争偏向锁
    // 3. 批量重偏向/撤销阈值触发
    // 4. GC过程中
}

项目场景:在连接池管理中,刚开始每个连接对象偏向于创建它的线程,随着连接在不同线程间借用,JVM检测到某个连接类的偏向锁频繁撤销,自动执行批量重偏向或撤销,优化了连接池的性能表现。

实战分析与性能调优

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
 * 锁升级机制实战指南
 */
public class LockUpgradeGuide {
    
    /**
     * 1. 查看锁状态(通过JOL工具)
     */
    public void checkLockStatus() {
        Object obj = new Object();
        
        // 无锁状态
        System.out.println(ClassLayout.parseInstance(obj).toPrintable());
        // 输出:对象头包含hashcode、分代年龄等
        
        synchronized (obj) {
            // 偏向锁状态(如果JVM开启)
            System.out.println(ClassLayout.parseInstance(obj).toPrintable());
        }
    }
    
    /**
     * 2. 偏向锁的启用和禁用
     */
    public void biasedLockConfig() {
        // JVM参数配置
        // -XX:+UseBiasedLocking     启用偏向锁(JDK8默认开启)
        // -XX:BiasedLockingStartupDelay=0  立即启用偏向锁(默认延迟4秒)
        // -XX:-UseBiasedLocking      禁用偏向锁
        
        // 为什么默认延迟4秒?
        // 因为JVM启动初期有大量锁竞争,偏向锁反而降低性能
    }
    
    /**
     * 3. 轻量级锁自旋优化
     */
    public void spinLockConfig() {
        // -XX:PreBlockSpin=10  自旋次数(JDK10废弃,改为自适应自旋)
        
        // 自适应自旋:JVM根据上次自旋成功情况动态调整
        // 如果上次自旋成功,这次多自旋几次
        // 如果很少成功,减少自旋或直接升级
    }
    
    /**
     * 4. 重量级锁监控
     */
    public void monitorLock() {
        // 通过jstack查看线程状态
        // "Thread-0" #10 prio=5 os_prio=0 tid=0x00007f8a3410a800 nid=0x4e3b waiting for monitor entry
        //    java.lang.Thread.State: BLOCKED (on object monitor)
        
        // 通过jstat查看锁信息
        // jstat -gcutil pid 1000
    }
    
    /**
     * 5. 实战场景:不同竞争程度的锁表现
     */
    public class LockPerformanceDemo {
        
        // 场景1:无竞争(偏向锁最优)
        public void noContention() {
            Object lock = new Object();
            for (int i = 0; i < 1000000; i++) {
                synchronized (lock) {  // 同一个线程反复获取
                    // 业务逻辑
                }
            }
            // 偏向锁:仅第一次CAS,后续无开销
        }
        
        // 场景2:轻度竞争(轻量级锁最优)
        public void lightContention() {
            Object lock = new Object();
            CountDownLatch latch = new CountDownLatch(2);
            
            Thread t1 = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    synchronized (lock) {  // 少量交替
                        // 业务逻辑
                    }
                }
                latch.countDown();
            });
            
            Thread t2 = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    synchronized (lock) {  // 少量交替
                        // 业务逻辑
                    }
                }
                latch.countDown();
            });
            
            t1.start(); t2.start();
            // 轻量级锁:CAS自旋,无线程挂起
        }
        
        // 场景3:重度竞争(重量级锁最优)
        public void heavyContention() {
            Object lock = new Object();
            CountDownLatch latch = new CountDownLatch(100);
            
            for (int i = 0; i < 100; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        synchronized (lock) {  // 100个线程激烈竞争
                            // 业务逻辑
                        }
                    }
                    latch.countDown();
                }).start();
            }
            // 重量级锁:线程挂起,避免CPU空转
        }
    }
    
    /**
     * 6. 锁升级的代价分析
     */
    public void upgradeCost() {
        // 偏向锁撤销:需要在全局安全点执行,代价高
        
        // 轻量级锁膨胀:创建ObjectMonitor,进入内核态
        
        // 重量级锁阻塞:线程上下文切换(微秒级)
        
        // 启示:尽量让锁停留在偏向/轻量级状态
        // 避免调用hashCode(),避免多线程交替执行短任务
    }
}

/**
 * 锁升级总结表
 * 
 * 锁状态   适用场景       实现方式                性能
 * 偏向锁   单线程访问    记录线程ID              纳秒级
 * 轻量锁   线程交替       CAS自旋                 百纳秒级
 * 重量锁   多线程竞争    操作系统互斥量           微秒级
 * 
 * 升级条件:
 * 偏向锁→轻量锁:其他线程竞争
 * 轻量锁→重量锁:自旋超过阈值 or 线程数超过CPU核数
 * 
 * 不可逆原因:
 * 1. 已经升级说明有竞争,保持重量锁避免反复升级
 * 2. 锁记录信息已丢失,无法降级
 * 3. 简单性原则:锁升级只进不退
 */

// 面试金句
// "Java的锁升级机制是一种'自适应优化'的典范。就像交通信号灯:
//  偏向锁是绿灯直接通行(单线程);
//  轻量级锁是黄灯减速观察(CAS自旋);
//  重量级锁是红灯停车等待(线程挂起)。
//  JVM根据车流量自动切换信号,既保证了通行效率,又避免了交通事故。
//  在实际项目中,理解锁升级机制帮助我们优化了高频交易系统的锁设计,
//  通过避免hashCode调用和合理控制线程数,让90%的锁停留在偏向锁状态,
//  系统吞吐量提升了30%"

什么是乐观锁和悲观锁?CAS 是什么?有哪些问题(ABA、自旋开销)?

悲观锁原理

一句话原理:悲观锁默认并发冲突概率高,操作数据前先加锁阻塞其他线程,保证数据强一致性,但降低了并发性能,典型代表:synchronized、ReentrantLock。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 悲观锁的典型实现
public class PessimisticLockExample {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;
    
    public void increment() {
        lock.lock();  // 先加锁,假设一定会冲突
        try {
            count++;  // 安全操作
        } finally {
            lock.unlock();
        }
    }
}

// 数据库悲观锁示例
@Transactional
public void updateWithPessimisticLock(Long id) {
    // SELECT * FROM account WHERE id = ? FOR UPDATE
    // 数据库行锁,阻塞其他事务
    Account account = accountRepository.findByIdWithLock(id);
    account.setBalance(account.getBalance() - 100);
    accountRepository.save(account);
}

项目场景:在金融系统的账户扣款操作中,使用数据库悲观锁(SELECT FOR UPDATE)防止并发扣款导致余额错误。虽然并发度降低,但保证了资金安全,符合金融业务的强一致性要求。

乐观锁原理

一句话原理:乐观锁默认并发冲突概率低,操作数据时不加锁,更新时检查版本号或CAS判断数据是否被修改过,适用于读多写少场景,典型实现:版本号机制、CAS算法。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 版本号机制实现乐观锁
public class OptimisticLockExample {
    private int version = 0;  // 版本号
    private int count = 0;
    
    public boolean increment() {
        int currentVersion = version;  // 读取版本号
        int newValue = count + 1;
        
        // 更新时检查版本号是否变化
        if (currentVersion == version) {
            count = newValue;
            version++;
            return true;
        }
        return false;  // 更新失败,重试
    }
}

// 数据库乐观锁示例
@Transactional
public void updateWithOptimisticLock(Long id, int version) {
    // UPDATE account SET balance = balance - 100, version = version + 1
    // WHERE id = ? AND version = ?
    // 返回更新行数,为0表示冲突
    int updatedRows = accountRepository.updateBalance(id, version);
    if (updatedRows == 0) {
        throw new OptimisticLockException("数据已被修改");
    }
}

项目场景:在商品库存系统中,使用版本号乐观锁更新库存,避免了长事务锁定。秒杀场景下即使冲突较多,通过重试机制也能保证最终一致性,系统吞吐量比悲观锁提升3倍。

CAS核心机制

一句话原理:CAS(Compare And Swap)是乐观锁的核心实现,包含三个操作数——内存位置V、预期值A、新值B,仅当V的值等于A时才更新为B,整个过程是CPU原子指令保证的硬件级操作。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Unsafe类提供的CAS原子操作
public final class Unsafe {
    // native方法,CPU指令级别原子性
    public final native boolean compareAndSwapObject(Object o, long offset,
                                                      Object expected, Object x);
    public final native boolean compareAndSwapInt(Object o, long offset,
                                                   int expected, int x);
    public final native boolean compareAndSwapLong(Object o, long offset,
                                                    long expected, long x);
}

// AtomicInteger的CAS实现
public class AtomicInteger extends Number {
    private volatile int value;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
    public final int incrementAndGet() {
        for (;;) {  // 自旋重试
            int current = get();
            int next = current + 1;
            // CAS尝试更新
            if (compareAndSet(current, next))
                return next;
        }
    }
    
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
}

// 底层汇编实现(x86)
// lock cmpxchg 指令锁定总线保证原子性

项目场景:在ConcurrentHashMap的初始化中,使用CAS保证只有一个线程执行初始化:casTabAt(tab, i, null, new Node)。这种无锁操作避免了加锁开销,提升了高并发场景的性能。

CAS三大问题详解

4.1 ABA问题

一句话原理:ABA问题指变量从A→B→A的变化过程中,CAS误认为值没变过而错误更新。虽然值相同,但状态可能已改变,导致数据不一致。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// ABA问题演示
public class ABAProblem {
    private static AtomicInteger atomicInt = new AtomicInteger(100);
    
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            atomicInt.compareAndSet(100, 101);
            atomicInt.compareAndSet(101, 100);
            System.out.println("线程1完成100→101→100");
        });
        
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(1000);  // 等待t1完成ABA
                boolean success = atomicInt.compareAndSet(100, 200);
                System.out.println("CAS操作" + (success ? "成功" : "失败"));
                // 输出:CAS操作成功(但实际上中间被修改过)
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        t1.start();
        t2.start();
    }
}

// 解决方案:AtomicStampedReference(带版本号)
AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(100, 0);

public void solveABA() {
    int[] stampHolder = new int[1];
    Integer value = asr.get(stampHolder);
    int stamp = stampHolder[0];
    
    // 同时比较值和版本号
    asr.compareAndSet(value, 200, stamp, stamp + 1);
}

项目场景:在无锁栈的实现中,如果只检查值不检查版本,可能将栈顶从A→B→A,导致CAS误判成功,破坏栈结构。使用AtomicStampedReference解决,每次修改同时更新版本号。

4.2 自旋开销

一句话原理:CAS失败后通常采用自旋重试,高并发下长时间自旋会浪费CPU资源,甚至导致性能下降,极端情况下比锁更差。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 自旋开销示例
public class SpinOverhead {
    private AtomicLong count = new AtomicLong(0);
    
    public void spinIncrement() {
        for (;;) {  // 高并发下可能自旋上万次
            long current = count.get();
            long next = current + 1;
            if (count.compareAndSet(current, next)) {
                break;
            }
            // 自旋等待,CPU空转
        }
    }
}

// 优化策略:自适应自旋 + 后退策略
public class BackoffCAS {
    public void incrementWithBackoff() {
        int maxSpins = 100;
        int spins = 0;
        long delay = 1;
        
        for (;;) {
            long current = count.get();
            long next = current + 1;
            if (count.compareAndSet(current, next)) {
                break;
            }
            
            if (++spins > maxSpins) {
                // 自旋超过阈值,让出CPU
                Thread.yield();
                spins = 0;
            } else {
                // 指数后退,减少总线竞争
                delay = Math.min(delay * 2, 1000);
                Thread.sleep(delay);
            }
        }
    }
}

项目场景:在限流计数器中,高并发下大量线程自旋CAS更新计数器,导致CPU飙升。采用LongAdder分段累加替代AtomicLong,将竞争分散到多个Cell,显著降低自旋开销。

4.3 单变量局限

一句话原理:CAS只能保证单个共享变量的原子操作,无法直接用于多个变量的复合操作,需要额外同步机制或封装成对象。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 单变量局限示例
public class MultiVarProblem {
    private int x, y;  // 两个变量无法原子更新
    
    // 错误:不能同时更新x和y
    public void updateWrong(int newX, int newY) {
        x = newX;  // 非原子操作
        y = newY;
    }
    
    // 解决方案1:封装成对象
    static class Pair {
        final int x;
        final int y;
        Pair(int x, int y) { this.x = x; this.y = y; }
    }
    
    private AtomicReference<Pair> pair = new AtomicReference<>(new Pair(0, 0));
    
    public void updatePair(int newX, int newY) {
        for (;;) {
            Pair current = pair.get();
            Pair newPair = new Pair(newX, newY);
            if (pair.compareAndSet(current, newPair)) {
                break;
            }
        }
    }
    
    // 解决方案2:使用锁
    private final ReentrantLock lock = new ReentrantLock();
    public void updateWithLock(int newX, int newY) {
        lock.lock();
        try {
            x = newX;
            y = newY;
        } finally {
            lock.unlock();
        }
    }
}

项目场景:在订单状态机中,需要同时更新订单状态和版本号。使用AtomicReference封装状态对象,保证原子性更新,既避免加锁又保证数据一致性。

实战选型与优化指南

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/**
 * 乐观锁vs悲观锁选型决策树
 */
public class LockSelectionOptimization {
    
    /**
     * 1. 选型判断标准
     */
    public void selectionGuide() {
        // 使用悲观锁的场景:
        // - 写操作频繁,冲突概率高(如热点账户扣款)
        // - 需要强一致性,不能容忍失败重试
        // - 操作时间较长,自旋不划算
        
        // 使用乐观锁的场景:
        // - 读多写少,冲突概率低(如商品浏览计数)
        // - 可以接受重试,对延迟不敏感
        // - 追求高吞吐量,需要无锁并发
    }
    
    /**
     * 2. CAS优化实战
     */
    public class CASOptimization {
        
        // 优化1:减少自旋次数
        private class OptimizedCounter {
            private AtomicLong count = new AtomicLong();
            private static final int MAX_SPINS = 100;
            
            public long increment() {
                int spins = 0;
                for (;;) {
                    long current = count.get();
                    long next = current + 1;
                    if (count.compareAndSet(current, next)) {
                        return next;
                    }
                    if (++spins >= MAX_SPINS) {
                        Thread.yield();  // 让出CPU
                        spins = 0;
                    }
                }
            }
        }
        
        // 优化2:分段累加(LongAdder原理)
        private class StripedCounter {
            private AtomicLong[] cells;
            private AtomicLong base = new AtomicLong();
            
            public void increment() {
                // 根据线程哈希选择cell
                int index = Thread.currentThread().hashCode() & (cells.length - 1);
                AtomicLong cell = cells[index];
                if (cell == null) {
                    // 懒加载cell
                }
                cell.incrementAndGet();
            }
            
            public long sum() {
                long sum = base.get();
                for (AtomicLong cell : cells) {
                    if (cell != null) {
                        sum += cell.get();
                    }
                }
                return sum;
            }
        }
    }
    
    /**
     * 3. 实战场景:库存扣减
     */
    public class StockService {
        
        // 方案1:乐观锁+重试
        public boolean deductStockOptimistic(Long productId, int count) {
            int retries = 3;
            while (retries-- > 0) {
                Stock stock = stockRepository.findById(productId);
                if (stock.getCount() < count) {
                    return false;
                }
                int updated = stockRepository.deductWithVersion(
                    productId, count, stock.getVersion());
                if (updated > 0) {
                    return true;
                }
                // 重试
            }
            throw new RuntimeException("系统繁忙,请稍后重试");
        }
        
        // 方案2:悲观锁
        @Transactional
        public boolean deductStockPessimistic(Long productId, int count) {
            Stock stock = stockRepository.findByIdWithLock(productId);
            if (stock.getCount() < count) {
                return false;
            }
            stock.setCount(stock.getCount() - count);
            stockRepository.save(stock);
            return true;
        }
        
        // 方案3:Redis Lua脚本(乐观锁实现)
        // local stock = redis.call('GET', KEYS[1])
        // if tonumber(stock) >= tonumber(ARGV[1]) then
        //     redis.call('DECRBY', KEYS[1], ARGV[1])
        //     return 1
        // end
        // return 0
    }
    
    /**
     * 4. 性能监控指标
     */
    public void monitorMetrics() {
        // 监控指标:
        // - CAS失败率:过高说明冲突严重
        // - 自旋次数:衡量CPU浪费
        // - 重试次数:乐观锁性能指标
        
        // 调优方向:
        // - 冲突率高时,考虑悲观锁
        // - 自旋多时,加入后退策略
        // - ABA频繁时,使用版本号机制
    }
}

/**
 * 总结对比表
 * 
 * 特性        悲观锁             乐观锁(CAS)
 * 原理        先加锁再操作        先操作,冲突重试
 * 并发度      低                 高
 * 适用场景    写多读少            读多写少
 * 实现        synchronized       AtomicInteger
 * 问题        死锁、性能瓶颈       ABA、自旋开销
 * 数据库实现   SELECT FOR UPDATE  版本号
 * 
 * CAS三大问题解决方案:
 * 1. ABA问题 → AtomicStampedReference
 * 2. 自旋开销 → 分段累加、后退策略
 * 3. 单变量局限 → AtomicReference封装对象
 */

// 面试金句
// "悲观锁和乐观锁是并发控制的两个极端哲学:悲观锁'先锁后做',乐观锁'先做后验'。
//  CAS作为乐观锁的核心,通过CPU原子指令实现无锁并发,但带来了ABA、自旋开销等问题。
//  在实际项目中,我曾在库存系统中混合使用:热点商品用悲观锁保一致性,
//  普通商品用乐观锁提吞吐量,同时用AtomicStampedReference解决ABA问题,
//  配合自适应后退策略减少自旋浪费。这套方案让系统在双11期间扛住了10万QPS,
//  同时又保证了数据一致性理解这些原理才能在复杂业务中做出最优选择"

volatile 关键字的作用?能保证原子性吗?为什么?

volatile核心作用

一句话原理:volatile通过内存屏障实现两个核心语义:可见性(强制线程从主内存读取最新值)和有序性(禁止指令重排序),保证多线程环境下共享变量的实时同步。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Java内存模型中的volatile规则
public class VolatileExample {
    volatile boolean flag = false;  // volatile变量
    
    // 线程A执行
    public void writer() {
        flag = true;  // volatile写
        // 插入StoreStore屏障(普通写之前)
        // 插入StoreLoad屏障(volatile写之后)
    }
    
    // 线程B执行
    public void reader() {
        if (flag) {  // volatile读
            // 插入LoadLoad屏障(volatile读之后)
            // 插入LoadStore屏障(volatile读之后)
            doSomething();
        }
    }
}

// HotSpot源码中的内存屏障实现
// x86平台:volatile写实现
__asm__ volatile (
    "lock; addl $0,0(%%rsp)"  // lock前缀,强制刷新缓存
    : : : "memory"
);

项目场景:在微服务的配置中心,使用volatile修饰的配置项缓存,当配置变更时,所有线程立即看到新值,无需加锁,既保证了实时性又提升了性能。

原子性分析

一句话原理:volatile不能保证原子性,因为它只保证读写操作的可见性,而像count++这样的复合操作(读-改-写)在并发执行时仍会产生数据覆盖,需要synchronized或Atomic类保证原子性。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// volatile非原子性演示
public class VolatileNonAtomic {
    volatile int count = 0;  // volatile变量
    
    public void increment() {
        count++;  // 非原子操作!
        // 字节码层面:
        // getfield      #2  (读取count)
        // iconst_1         (常量1)
        // iadd             (相加)
        // putfield      #2  (写回count)
    }
    
    public static void main(String[] args) {
        VolatileNonAtomic example = new VolatileNonAtomic();
        
        // 10个线程并发执行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    example.increment();
                }
            }).start();
        }
        
        Thread.sleep(3000);
        System.out.println(example.count);  // 永远小于10000
        // 原因:线程A读取count=5,线程B也读取count=5
        // A写回6,B也写回6,丢失一次更新
    }
}

// 原子性解决方案
public class AtomicSolution {
    AtomicInteger count = new AtomicInteger(0);  // CAS保证原子性
    
    public void increment() {
        count.incrementAndGet();  // 原子操作
    }
    
    // 或者使用synchronized
    int syncCount = 0;
    public synchronized void syncIncrement() {
        syncCount++;  // 锁保证原子性
    }
}

项目场景:在抢红包系统中,使用volatile修饰红包余额会导致超发问题。因为多个线程同时读取余额为10,各自扣减后写回9,实际发了多个红包。必须使用AtomicLong或synchronized保证扣减的原子性。

为什么不能保证原子性

一句话原理:volatile只保证读写的内存可见性,但不提供互斥访问机制,复合操作的多个步骤之间可能被其他线程打断,导致数据不一致,这是由Java内存模型的底层设计决定的。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// JSR-133内存模型规范
public class VolatileMemoryModel {
    
    // 场景分析:volatile int x = 0
    
    // 线程A
    x = 1;  // volatile写:直接写入主内存
    
    // 线程B
    int y = x;  // volatile读:从主内存读取
    
    // 这两个操作都是原子的(单个volatile读写)
    
    // 但复合操作不是原子的:
    // x++ 等价于:
    int temp = x;  // volatile读(原子)
    temp = temp + 1;  // 普通计算(非同步)
    x = temp;  // volatile写(原子)
    
    // 在线程A执行完temp=x后,线程B可能已经修改了x
    // 导致A写回时覆盖B的更新
}

// JVM规范中的说明
// 对于long/double类型的volatile变量,在32位JVM上也不能保证原子性
// 因为64位操作需要两次32位写入
volatile long bigValue;  // 32位JVM上可能非原子

项目场景:在分布式ID生成器中,如果只使用volatile记录当前序列号,多个服务节点同时读取后自增再写回,会产生重复ID。必须使用数据库乐观锁或Redis原子自增保证序列号的唯一性。

深入理解与实战应用

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/**
 * volatile完整实战指南
 */
public class VolatileCompleteGuide {
    
    /**
     * 1. volatile的正确使用场景
     */
    public class CorrectUsage {
        
        // 场景1:状态标志
        volatile boolean running = true;
        
        public void stop() {
            running = false;  // 单个volatile写
        }
        
        public void run() {
            while (running) {  // 单个volatile读
                // 业务逻辑
            }
        }
        
        // 场景2:一次性安全发布
        volatile Singleton instance;
        
        public Singleton getInstance() {
            if (instance == null) {
                synchronized (this) {
                    if (instance == null) {
                        instance = new Singleton();  // volatile保证构造函数完成后再发布
                    }
                }
            }
            return instance;
        }
        
        // 场景3:独立观察变量
        volatile int temperature;
        
        public void updateTemperature(int temp) {
            temperature = temp;  // 传感器更新温度
        }
        
        public void checkTemperature() {
            int current = temperature;  // 读取最新温度
            if (current > 100) {
                alarm();
            }
        }
    }
    
    /**
     * 2. volatile的错误使用示例
     */
    public class WrongUsage {
        
        // 错误1:依赖volatile实现计数器
        volatile int counter;
        
        public void increment() {
            counter++;  // 非原子,会丢失更新
        }
        
        // 错误2:复合条件判断
        volatile int a, b;
        
        public void checkAndUpdate() {
            if (a > b) {  // 读取a和b可能不是同一时刻的快照
                // 业务逻辑
            }
        }
        
        // 错误3:依赖volatile实现线程安全集合
        volatile List<String> list = new ArrayList<>();
        
        public void addItem(String item) {
            list.add(item);  // ArrayList本身线程不安全
        }
    }
    
    /**
     * 3. volatile与锁的性能对比
     */
    public class PerformanceComparison {
        volatile int volatileCount = 0;
        int synchronizedCount = 0;
        AtomicInteger atomicCount = new AtomicInteger(0);
        
        // volatile:约5ns
        public void volatileWrite() {
            volatileCount = 1;
        }
        
        // synchronized:约50ns(未竞争)
        public synchronized void syncWrite() {
            synchronizedCount = 1;
        }
        
        // AtomicInteger:约20ns(CAS)
        public void atomicWrite() {
            atomicCount.set(1);
        }
        
        // 结论:volatile > Atomic > synchronized
        // 但volatile无法保证复合操作
    }
    
    /**
     * 4. 双重检查锁定的volatile必要性
     */
    public class DoubleCheckedLocking {
        // 必须加volatile
        private volatile static DoubleCheckedLocking instance;
        
        public static DoubleCheckedLocking getInstance() {
            if (instance == null) {
                synchronized (DoubleCheckedLocking.class) {
                    if (instance == null) {
                        instance = new DoubleCheckedLocking();
                        // new操作:
                        // 1. 分配内存
                        // 2. 初始化对象
                        // 3. 赋值给instance
                        // 如果没有volatile,2和3可能重排序
                        // 导致其他线程拿到未初始化的对象
                    }
                }
            }
            return instance;
        }
    }
    
    /**
     * 5. 内存屏障的实际应用
     */
    public class MemoryBarrierExample {
        int a = 0;
        volatile int flag = 0;
        
        public void thread1() {
            a = 1;                 // 普通写
            flag = 1;              // volatile写:插入StoreStore屏障,保证a的写完成
                                   // 插入StoreLoad屏障,防止后续读重排序
        }
        
        public void thread2() {
            if (flag == 1) {       // volatile读:插入LoadLoad屏障,防止后续读重排序
                int b = a;          // 一定能看到a=1
                                   // 插入LoadStore屏障,防止后续写重排序
                System.out.println(b);
            }
        }
    }
    
    /**
     * 6. 实战:高性能事件监听器
     */
    public class EventListener {
        private volatile boolean active = true;
        private volatile String lastEvent;
        private AtomicLong eventCount = new AtomicLong();
        
        // 生产者线程
        public void publishEvent(String event) {
            if (!active) return;
            
            // 使用Atomic保证原子性
            eventCount.incrementAndGet();
            
            // 使用volatile保证可见性
            lastEvent = event;  // 最后一个事件,覆盖没关系
        }
        
        // 消费者线程
        public void monitor() {
            while (active) {
                String event = lastEvent;  // 实时看到最新事件
                if (event != null) {
                    // 处理事件
                    long count = eventCount.get();  // 精确计数
                    System.out.println("处理事件:" + event + ",总数:" + count);
                }
                
                // 主动让出CPU
                Thread.yield();
            }
        }
        
        public void shutdown() {
            active = false;  // 状态标志,安全使用volatile
        }
    }
}

/**
 * volatile总结表
 * 
 * 特性        volatile    synchronized    Atomic
 * 可见性      ✓           ✓               ✓
 * 原子性      ✗           ✓               ✓
 * 有序性      ✓           ✓               ✓
 * 性能       高          低(有竞争)       中
 * 
 * volatile能保证:
 * ✓ 单个volatile读/写的原子性
 * ✓ 64位变量(long/double)在64位JVM上的原子性
 * ✓ 可见性和有序性
 * 
 * volatile不能保证:
 * ✗ 复合操作(如i++)的原子性
 * ✗ 多个volatile变量的组合原子性
 * ✗ 线程安全的集合操作
 * 
 * 正确使用场景:
 * 1. 状态标志位
 * 2. 一次性安全发布
 * 3. 独立观察变量
 * 4. 单次写入多次读取
 */

// 面试金句
// "volatile是Java提供的最轻量级的同步机制,它通过内存屏障保证可见性和有序性,
//  但不提供原子性保证。这就像公告牌:volatile确保你看到的是最新消息,
//  但如果两个人同时修改公告牌上的数字,仍然会覆盖彼此的结果。
//  在项目中,我常用volatile实现线程的优雅停止,配合AtomicInteger完成精确计数,
//  这种组合既保证了性能,又解决了原子性问题。理解volatile的边界,
//  才能在并发编程中做出正确的技术选型"

线程池

线程池的核心参数有哪些?线程池的执行流程是怎样的?

线程池七大核心参数

一句话原理:线程池通过七大核心参数构建弹性资源池:核心线程数(常驻)、最大线程数(峰值)、空闲存活时间(回收)、工作队列(缓冲)、线程工厂(命名)、拒绝策略(饱和保护),实现线程的精细化管理。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// ThreadPoolExecutor核心构造方法
public ThreadPoolExecutor(int corePoolSize,      // 核心线程数
                          int maximumPoolSize,   // 最大线程数
                          long keepAliveTime,    // 空闲线程存活时间
                          TimeUnit unit,         // 时间单位
                          BlockingQueue<Runnable> workQueue,  // 工作队列
                          ThreadFactory threadFactory,        // 线程工厂
                          RejectedExecutionHandler handler)   // 拒绝策略

// 源码中的参数校验与存储
public ThreadPoolExecutor(...) {
    // 参数合法性校验
    if (corePoolSize < 0 || maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    
    // 存储核心参数
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

// 四种内置拒绝策略
// AbortPolicy: 抛出RejectedExecutionException(默认)
// CallerRunsPolicy: 调用者线程执行
// DiscardPolicy: 直接丢弃
// DiscardOldestPolicy: 丢弃队列头部任务

项目场景:在电商秒杀系统中,配置核心线程20、最大线程50、队列容量1000,既保证日常流量(20线程处理),又应对秒杀峰值(50线程+队列缓冲),配合CallerRunsPolicy拒绝策略,压力过大时由tomcat线程池处理,实现平滑降级。

线程池执行流程

一句话原理:线程池执行任务时遵循三级缓冲机制:优先核心线程执行 → 核心线程满则加入工作队列 → 队列满则创建临时线程至最大线程 → 超过最大线程触发拒绝策略,形成阶梯式资源分配。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// ThreadPoolExecutor.execute()核心流程
public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    
    int c = ctl.get();
    // 第一步:当前线程数 < 核心线程数,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))  // true表示核心线程
            return;
        c = ctl.get();
    }
    
    // 第二步:线程数 >= 核心线程数,尝试加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 二次检查,防止线程池关闭
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    // 第三步:队列满,尝试创建非核心线程(直到最大线程数)
    else if (!addWorker(command, false))  // false表示非核心线程
        // 第四步:超过最大线程数,执行拒绝策略
        reject(command);
}

// 线程池状态流转(ctl高3位存储状态)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// RUNNING: 接受新任务并处理队列任务
// SHUTDOWN: 不接受新任务,但处理队列任务
// STOP: 不接受新任务,不处理队列任务,中断进行中任务
// TIDYING: 所有任务终止,即将执行terminated()
// TERMINATED: terminated()执行完毕

项目场景:在日志异步处理系统中,平时流量平稳时核心线程处理;日志峰值时先进入队列缓冲;若队列积压超过1000,自动创建临时线程处理;极端突发流量超过最大线程50时,触发CallerRunsPolicy由业务线程处理,保证系统不崩溃。

参数动态调整与监控

一句话原理:线程池提供动态调参能力,可通过setCorePoolSize/setMaximumPoolSize实时调整,配合beforeExecute/afterExecute钩子方法,实现运行时的自适应优化。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 动态调整核心线程数
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0) throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    
    // 如果新核心线程数大于当前,需要创建新线程
    if (delta > 0) {
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    } 
    // 如果新核心线程数小于当前,需要回收空闲线程
    else if (delta < 0) {
        interruptIdleWorkers();
    }
}

// 钩子方法:任务执行前后
public class MonitorThreadPool extends ThreadPoolExecutor {
    
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // 记录任务开始时间
        ThreadLocalHolder.setStartTime(System.currentTimeMillis());
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // 计算任务执行时间
        long cost = System.currentTimeMillis() - ThreadLocalHolder.getStartTime();
        // 上报监控指标
        metricsCollector.recordTaskTime(cost);
        ThreadLocalHolder.clear();
    }
    
    @Override
    protected void terminated() {
        super.terminated();
        // 线程池关闭后的清理工作
        log.info("线程池已终止,共处理任务:" + getCompletedTaskCount());
    }
}

项目场景:在微服务网关中,实现动态线程池监控组件,根据QPS、响应时间、队列积压等指标,通过配置中心动态调整线程池参数。高峰期自动扩容核心线程,低谷期回收空闲线程,实现资源弹性伸缩,CPU利用率提升25%。

实战配置指南与优化经验

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/**
 * 线程池最佳实践大全
 */
public class ThreadPoolBestPractice {
    
    /**
     * 1. 参数计算公式
     */
    public class ThreadPoolCalculator {
        
        // CPU密集型任务:核心线程数 = CPU核心数 + 1
        int cpuCoreCount = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor cpuIntensive = new ThreadPoolExecutor(
            cpuCoreCount + 1,           // 核心线程
            cpuCoreCount * 2,            // 最大线程
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000)  // 有界队列
        );
        
        // IO密集型任务:核心线程数 = CPU核心数 * (1 + 平均等待时间/平均计算时间)
        // 经验值:2 * CPU核心数
        ThreadPoolExecutor ioIntensive = new ThreadPoolExecutor(
            cpuCoreCount * 2,             // 核心线程
            cpuCoreCount * 4,              // 最大线程
            60, TimeUnit.SECONDS,
            new SynchronousQueue<>()       // 直接提交队列
        );
        
        // 混合型任务:根据实际压测确定
        ThreadPoolExecutor mixed = new ThreadPoolExecutor(
            50, 200,                       // 根据压测确定
            30, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000) // 有界队列防OOM
        );
    }
    
    /**
     * 2. 四种工作队列选择
     */
    public class WorkQueueSelection {
        
        // LinkedBlockingQueue:无界队列(任务可无限堆积,可能OOM)
        // 适用场景:任务产生速度可控,对延迟不敏感
        BlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<>();
        
        // ArrayBlockingQueue:有界队列(防OOM,需要合理设置大小)
        // 适用场景:系统资源有限,必须控制积压
        BlockingQueue<Runnable> boundedQueue = new ArrayBlockingQueue<>(1000);
        
        // SynchronousQueue:直接提交(不存储任务,直接创建线程)
        // 适用场景:希望任务立即执行,可接受快速创建线程
        BlockingQueue<Runnable> directQueue = new SynchronousQueue<>();
        
        // DelayQueue:延迟队列(定时任务专用)
        // 适用场景:ScheduledThreadPoolExecutor使用
    }
    
    /**
     * 3. 线程工厂自定义
     */
    public class NamedThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        
        public NamedThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }
        
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
            t.setDaemon(false);        // 非守护线程
            t.setPriority(Thread.NORM_PRIORITY);
            
            // 设置未捕获异常处理器
            t.setUncaughtExceptionHandler((thread, throwable) -> {
                log.error("线程{}执行异常", thread.getName(), throwable);
            });
            
            return t;
        }
    }
    
    /**
     * 4. 拒绝策略实战选择
     */
    public class RejectStrategySelector {
        private final ThreadPoolExecutor executor;
        
        public RejectStrategySelector() {
            // 场景1:核心业务,不能丢任务 → CallerRunsPolicy
            executor = new ThreadPoolExecutor(
                10, 20, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000),
                new NamedThreadFactory("core-biz"),
                new ThreadPoolExecutor.CallerRunsPolicy()
            );
            
            // 场景2:非核心日志,可丢弃 → DiscardPolicy
            ThreadPoolExecutor logExecutor = new ThreadPoolExecutor(
                5, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                new NamedThreadFactory("log"),
                new ThreadPoolExecutor.DiscardPolicy()
            );
            
            // 场景3:监控告警,需明确感知 → AbortPolicy(默认)
            ThreadPoolExecutor monitorExecutor = new ThreadPoolExecutor(
                2, 5, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new NamedThreadFactory("monitor"),
                new ThreadPoolExecutor.AbortPolicy()
            );
        }
        
        // 自定义拒绝策略:降级到Redis缓存
        public class RedisBackupPolicy implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (r instanceof Task) {
                    Task task = (Task) r;
                    redisClient.lpush("task:backup", task.toJson());
                    log.warn("线程池满,任务{}已备份到Redis", task.getId());
                }
            }
        }
    }
    
    /**
     * 5. 监控指标体系
     */
    public class ThreadPoolMonitor {
        private final ThreadPoolExecutor executor;
        
        public void monitor() {
            // 核心监控指标
            int activeCount = executor.getActiveCount();           // 活跃线程数
            long completedCount = executor.getCompletedTaskCount(); // 完成任务数
            long taskCount = executor.getTaskCount();               // 总任务数
            int queueSize = executor.getQueue().size();            // 队列长度
            int largestPoolSize = executor.getLargestPoolSize();   // 历史最大线程数
            long keepAliveTime = executor.getKeepAliveTime(TimeUnit.SECONDS);
            
            // 计算指标
            double queueRatio = (double) queueSize / executor.getQueue().remainingCapacity();
            double activeRatio = (double) activeCount / executor.getMaximumPoolSize();
            
            // 告警阈值
            if (queueRatio > 0.8) {
                log.warn("队列积压严重,当前占用率:{}", queueRatio);
            }
            
            if (activeRatio > 0.9) {
                log.warn("线程池繁忙,活跃线程占比:{}", activeRatio);
            }
        }
    }
    
    /**
     * 6. 优雅关闭最佳实践
     */
    public void shutdownGracefully(ExecutorService executor) {
        // 第一步:停止接收新任务
        executor.shutdown();
        
        try {
            // 第二步:等待已有任务执行完成(最多60秒)
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // 第三步:强制停止正在执行的任务
                executor.shutdownNow();
                
                // 第四步:再次等待(最多60秒)
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("线程池无法正常终止");
                }
            }
        } catch (InterruptedException e) {
            // 第五步:当前线程被中断,尝试强制停止
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * 线程池参数配置总结表
 * 
 * 场景类型     核心线程   最大线程    队列类型        队列大小    拒绝策略
 * 计算密集型   CPU+1      CPU*2       ArrayBlockingQueue 100-200  Abort
 * IO密集型     CPU*2      CPU*4       SynchronousQueue    N/A     CallerRuns
 * 定时任务     core       最大         DelayQueue         无界     Abort
 * 异步日志     2-5        10-20       LinkedBlockingQueue 1000    Discard
 * 网关业务     50         200         ArrayBlockingQueue  2000    CallerRuns
 * 
 * 配置原则:
 * 1. 队列必须有界,防OOM
 * 2. 线程必须有名字,方便排查
 * 3. 必须有监控告警
 * 4. 拒绝策略要符合业务场景
 * 5. 必须优雅关闭
 */

// 面试金句
// "线程池通过七大参数构建了完整的资源管理闭环:核心线程保障基础处理能力,
//  工作队列实现流量削峰填谷,临时线程应对突发峰值,拒绝策略守住系统底线。
//  就像城市交通系统:核心线程是主干道(固定车道),队列是停车场(缓冲),
//  临时线程是应急车道(临时扩容),拒绝策略是交通管制(饱和保护)。
//  在项目中,我根据业务场景设置参数:IO密集型任务配置2*CPU核心线程,
//  配合SynchronousQueue和CallerRunsPolicy,让系统在10万QPS下依然稳定。
//  更重要的是建立完整的监控体系,通过活跃线程数、队列积压等指标预判风险,
//  实现线程池的动态调优"

如何合理配置线程池大小(CPU 密集型 vs IO 密集型)?

CPU密集型任务配置

一句话原理:CPU密集型任务主要消耗CPU资源,最佳线程数设置为 CPU核心数 + 1(或CPU核心数),避免过多线程导致频繁上下文切换,反而降低CPU利用率。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 获取CPU核心数
int cpuCores = Runtime.getRuntime().availableProcessors();

// CPU密集型线程池配置
ThreadPoolExecutor cpuIntensivePool = new ThreadPoolExecutor(
    cpuCores,                    // 核心线程 = CPU核心数
    cpuCores + 1,                // 最大线程 = CPU核心数+1(预留一个处理页缺失)
    60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100), // 有界队列,避免堆积过多任务
    new NamedThreadFactory("cpu-intensive"),
    new ThreadPoolExecutor.AbortPolicy()
);

// ForkJoinPool的默认并行度
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
// 默认并行度 = CPU核心数 - 1
int parallelism = forkJoinPool.getParallelism();

// 源码中的计算依据
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

// 为什么是CPU核心数+1? - 来自《Java并发编程实战》
// 即使CPU密集也可能有缺页中断IO暂停+1作为补偿

项目场景:在视频转码服务中,每个转码任务都是纯CPU计算。配置线程数等于CPU核心数(32核服务器配置32线程),实测32线程比64线程吞吐量提升30%,因为减少了线程切换开销,CPU利用率稳定在95%以上。

IO密集型任务配置

一句话原理:IO密集型任务大量时间在等待IO,最佳线程数可按公式 CPU核心数 * (1 + 平均等待时间 / 平均计算时间) 计算,或经验值设为 2 * CPU核心数,通过更多线程提高CPU利用率。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// IO密集型线程池配置公式
int cpuCores = Runtime.getRuntime().availableProcessors();
// 假设平均请求等待100ms,计算10ms
double waitTime = 100.0;  // IO等待时间
double computeTime = 10.0; // 计算时间

// 理论最佳线程数 = CPU核心数 * (1 + 等待时间/计算时间)
int optimalThreads = (int) (cpuCores * (1 + waitTime / computeTime));
// 结果:8核 * (1 + 100/10) = 8 * 11 = 88线程

// 经验值配置(2倍CPU核心数)
ThreadPoolExecutor ioIntensivePool = new ThreadPoolExecutor(
    cpuCores * 2,              // 核心线程
    cpuCores * 4,              // 最大线程
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<>(),   // 直接提交,快速创建临时线程
    new NamedThreadFactory("io-intensive"),
    new ThreadPoolExecutor.CallerRunsPolicy() // 调用者执行作为降级
);

// 源码中的设计思路
public void execute(Runnable command) {
    // 对于IO密集型,SynchronousQueue让任务直接提交给线程
    // 避免任务在队列中等待,充分利用IO等待时间
}

项目场景:在API网关服务中,每个请求涉及数据库查询、Redis访问、外部API调用(平均耗时100ms,CPU计算5ms)。配置88线程(8核服务器),吞吐量达到5000 QPS,CPU利用率85%,比固定线程池提升3倍。

动态自适应配置

一句话原理:实际生产环境可通过 动态监控 + 自适应算法 实时调整线程池大小,根据QPS、响应时间、CPU负载等指标,通过配置中心动态调参,实现资源的弹性伸缩。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
 * 动态自适应线程池
 */
public class DynamicThreadPool extends ThreadPoolExecutor {
    
    private final String poolName;
    private final MetricsCollector metrics;
    private final ConfigCenter config;
    
    public DynamicThreadPool(String poolName, int coreSize, int maxSize) {
        super(coreSize, maxSize, 60L, TimeUnit.SECONDS, 
              new ArrayBlockingQueue<>(1000));
        this.poolName = poolName;
        this.metrics = MetricsCollector.getInstance();
        this.config = ConfigCenter.getInstance();
        
        // 启动监控任务
        scheduleAdjustment();
    }
    
    private void scheduleAdjustment() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(this::adjustPoolSize, 10, 30, TimeUnit.SECONDS);
    }
    
    private void adjustPoolSize() {
        // 获取当前指标
        double cpuLoad = metrics.getCpuLoad();           // CPU负载
        double qps = metrics.getQps(poolName);           // QPS
        double avgResponseTime = metrics.getAvgResponseTime(poolName); // 平均响应时间
        int queueSize = this.getQueue().size();          // 队列积压
        
        // 计算目标线程数
        int targetCoreSize = calculateTargetSize(cpuLoad, qps, avgResponseTime, queueSize);
        
        // 通过配置中心动态调整
        if (targetCoreSize != this.getCorePoolSize()) {
            config.updatePoolConfig(poolName, targetCoreSize, this.getMaximumPoolSize());
            this.setCorePoolSize(targetCoreSize);
            log.info("线程池{}核心数调整为:{} (原:{}),CPU负载:{},QPS:{}", 
                     poolName, targetCoreSize, oldSize, cpuLoad, qps);
        }
    }
    
    private int calculateTargetSize(double cpuLoad, double qps, 
                                   double responseTime, int queueSize) {
        int baseThreads = Runtime.getRuntime().availableProcessors();
        
        // CPU负载过高时减少线程
        if (cpuLoad > 0.9) {
            return (int) (baseThreads * 0.8);
        }
        
        // 响应时间超标或队列积压时增加线程
        if (responseTime > 1000 || queueSize > 500) {
            int current = getCorePoolSize();
            return Math.min(current + 2, getMaximumPoolSize());
        }
        
        // 根据QPS估算
        double targetQps = 1000.0 / responseTime * getCorePoolSize();
        if (qps > targetQps * 0.9) {
            return Math.min(getCorePoolSize() + 1, getMaximumPoolSize());
        }
        
        return getCorePoolSize();
    }
}

项目场景:在双11大促期间,订单处理线程池通过动态调整,10分钟内自动从50线程扩容到120线程,扛住10倍流量峰值;大促结束后自动回收,避免资源浪费。相比固定配置,系统吞吐量提升45%,资源利用率提高30%。

实战配置指南与性能调优

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/**
 * 线程池大小配置完整指南
 */
public class ThreadPoolSizeGuide {
    
    /**
     * 1. 任务类型分类测试
     */
    public class TaskClassifier {
        
        // CPU密集型测试
        public void testCpuIntensive() {
            // 执行大量计算任务
            IntStream.range(0, 1000).parallel().forEach(i -> {
                long result = 0;
                for (int j = 0; j < 1000000; j++) {
                    result += Math.pow(j, 2);
                }
            });
        }
        
        // IO密集型测试
        public void testIoIntensive() {
            // 模拟IO等待
            IntStream.range(0, 1000).parallel().forEach(i -> {
                try {
                    Thread.sleep(100);  // 模拟IO
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }
    
    /**
     * 2. 压测工具方法
     */
    public class PerformanceTester {
        
        public void benchmark(ThreadPoolExecutor pool, Runnable task, 
                             int taskCount, String taskType) {
            // 预热
            for (int i = 0; i < 100; i++) {
                pool.execute(task);
            }
            
            // 等待任务完成
            while (pool.getCompletedTaskCount() < 100) {
                Thread.sleep(100);
            }
            
            // 清空统计
            pool.purge();
            
            // 正式测试
            long start = System.currentTimeMillis();
            CountDownLatch latch = new CountDownLatch(taskCount);
            
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        task.run();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            
            latch.await();
            long cost = System.currentTimeMillis() - start;
            
            // 计算指标
            double qps = taskCount * 1000.0 / cost;
            double cpuLoad = getCpuLoad();
            int activeThreads = pool.getActiveCount();
            
            log.info("{} - 线程数:{} | QPS:{:.2f} | CPU负载:{:.2f} | 耗时:{}ms",
                     taskType, pool.getPoolSize(), qps, cpuLoad, cost);
        }
    }
    
    /**
     * 3. 常用配置参考表
     */
    public class ConfigurationReference {
        
        // 场景1:纯计算(图像处理、加密解密)
        // 配置:core = CPU核心数,max = CPU核心数+1
        // 队列:ArrayBlockingQueue(100-200)
        
        // 场景2:纯IO(数据库访问、RPC调用)
        // 配置:core = 2*CPU核心数,max = 4*CPU核心数
        // 队列:SynchronousQueue 或 小容量队列
        
        // 场景3:混合型(业务处理 + 少量IO)
        // 配置:通过公式计算 + 压测验证
        // 公式:core = CPU核心数 * (1 + wait/compute)
        
        // 场景4:定时任务
        // 配置:core = 1-10,max = 同core
        // 队列:DelayedWorkQueue
        
        // 场景5:异步日志
        // 配置:core = 2-5,max = 10-20
        // 队列:LinkedBlockingQueue(1000-5000)
        // 拒绝策略:DiscardPolicy 或 DiscardOldestPolicy
    }
    
    /**
     * 4. 实际生产配置案例
     */
    public class ProductionCases {
        
        // 案例1:支付系统(IO密集型+强一致性)
        @Bean("paymentExecutor")
        public ThreadPoolExecutor paymentExecutor() {
            int cpuCores = Runtime.getRuntime().availableProcessors();
            return new ThreadPoolExecutor(
                cpuCores * 4,           // 核心线程
                cpuCores * 8,           // 最大线程
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(500),  // 有界队列,防止任务堆积
                new NamedThreadFactory("payment"),
                new ThreadPoolExecutor.CallerRunsPolicy()  // 降级到业务线程
            );
        }
        
        // 案例2:商品推荐(计算密集型)
        @Bean("recommendExecutor")
        public ThreadPoolExecutor recommendExecutor() {
            int cpuCores = Runtime.getRuntime().availableProcessors();
            return new ThreadPoolExecutor(
                cpuCores,               // 核心线程
                cpuCores + 1,            // 最大线程
                30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(200),
                new NamedThreadFactory("recommend"),
                new ThreadPoolExecutor.AbortPolicy()  // 失败快速反馈
            );
        }
        
        // 案例3:日志收集(非核心业务)
        @Bean("logExecutor")
        public ThreadPoolExecutor logExecutor() {
            return new ThreadPoolExecutor(
                2, 10,
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000),  // 较大队列,缓冲日志
                new NamedThreadFactory("log"),
                new ThreadPoolExecutor.DiscardPolicy()  // 压力大时丢弃日志
            );
        }
    }
    
    /**
     * 5. 监控指标与告警阈值
     */
    public class MonitorIndicators {
        
        public void monitorConfig() {
            // CPU密集型监控
            // - CPU利用率 > 90% 正常,< 50% 可能线程太少
            // - 队列积压 > 100 需要扩容
            
            // IO密集型监控
            // - CPU利用率 < 60% 可能需要增加线程
            // - 平均响应时间 > 500ms 可能线程不够或外部服务慢
            // - 活跃线程数接近最大线程数 需要扩容
            
            // 通用告警阈值
            // - 队列占用率 > 80%
            // - 拒绝策略触发次数 > 0
            // - 线程池活跃度 > 90%
            // - 任务超时比例 > 5%
        }
    }
    
    /**
     * 6. 常见问题与解决方案
     */
    public class CommonProblems {
        
        // 问题1:线程数配置过大
        // 症状:CPU上下文切换飙升,响应时间变长
        // 解决:减小线程数,增加队列容量
        
        // 问题2:线程数配置过小
        // 症状:CPU利用率低,队列积压严重
        // 解决:增加线程数,监控响应时间
        
        // 问题3:队列无界导致OOM
        // 症状:内存持续增长,GC频繁
        // 解决:必须使用有界队列,设置合理容量
        
        // 问题4:拒绝策略不当
        // 症状:核心业务任务丢失
        // 解决:CallerRunsPolicy降级或自定义持久化
    }
}

/**
 * 线程池配置总结表
 * 
 * 任务类型    核心线程公式               最大线程公式               队列建议
 * CPU密集型   CPU核心数                  CPU核心数+1               有界队列(100-200)
 * IO密集型    CPU核心数*(1+wait/compute)  2 * 核心线程              SynchronousQueue/小队列
 * 混合型      根据压测确定                根据压测确定              有界队列(500-1000)
 * 
 * 经验公式:
 * 1. CPU密集型:n = Ncpu + 1
 * 2. IO密集型:n = Ncpu * (1 + Twait/Tcompute)
 * 3. 通用公式:n = Ncpu * Utarget * (1 + Twait/Tcompute)
 *    Utarget: 目标CPU利用率(0.8-0.9)
 * 
 * 配置原则:
 * 1. 先分类(CPU/IO)
 * 2. 再公式(理论值)
 * 3. 后压测(验证调整)
 * 4. 终监控(持续优化)
 */

// 面试金句
// "线程池大小配置没有银弹,需要根据任务类型、硬件资源、业务指标综合权衡。
//  我的配置方法论是:先分类(CPU/IO),再公式(计算理论值),后压测(验证调整),终监控(持续优化)。
//  就像做饭一样,CPU密集型是'炒菜',需要快速翻炒(线程数≈锅的数量);
//  IO密集型是'炖汤',可以同时炖多锅(线程数可以更多)。
//  在支付系统中,我们通过压测发现IO密集型线程数设为CPU核心数的4倍时,
//  吞吐量达到峰值,超过8倍反而下降,因为线程切换开销超过了IO并发的收益。
//  理解这些原理才能在生产环境中做出科学的配置决策"

线程池的拒绝策略有哪些?分别适用于什么场景?

AbortPolicy(中止策略)

一句话原理默认拒绝策略,当任务无法提交时直接抛出RejectedExecutionException异常,让调用者明确感知系统过载,适用于核心业务必须感知失败的场景。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AbortPolicy源码实现
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 直接抛出异常,没有任何妥协
        throw new RejectedExecutionException("Task " + r.toString() + 
                                             " rejected from " + e.toString());
    }
}

// 使用示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.AbortPolicy()  // 显式指定(默认就是AbortPolicy)
);

try {
    executor.execute(task);
} catch (RejectedExecutionException e) {
    // 明确感知提交失败,进行降级处理
    log.error("任务提交失败,执行降级", e);
    fallbackService.process(task);
}

项目场景:在交易核心的订单处理系统中,使用AbortPolicy确保系统过载时立即抛出异常,触发熔断降级机制,将订单快速失败返回给用户"系统繁忙,请稍后重试",避免订单丢失或状态不一致。

CallerRunsPolicy(调用者运行策略)

一句话原理:当任务被拒绝时,由提交任务的线程(通常是主线程)直接执行该任务,通过让调用者参与执行来减缓提交速度,实现自然的流量控制,适用于降级有损但不想丢任务的场景。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// CallerRunsPolicy源码实现
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // 直接在当前线程(调用者线程)执行任务
            r.run();
        }
    }
}

// 使用示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.CallerRunsPolicy()  // 调用者运行
);

// 主线程提交任务
public void submitTask(Runnable task) {
    // 如果线程池满,主线程会被阻塞执行该任务
    executor.execute(task);
    // 当线程池过载时,这里会成为同步调用
    log.debug("任务提交完成,当前线程:{}", Thread.currentThread().getName());
}

// 流量控制效果
// 提交线程本身成为执行线程自然降低了任务提交速度

项目场景:在API网关中,当业务线程池满时采用CallerRunsPolicy,让Netty的IO线程直接处理业务逻辑。虽然会增加IO线程的负担,但能有效减缓请求提交速度,避免雪崩。同时因为IO线程数量有限,也自然限制了并发度。

DiscardPolicy(丢弃策略)

一句话原理静默丢弃被拒绝的任务,既不抛出异常也不执行,直接丢弃,适用于非核心、可丢失的辅助任务,如日志收集、统计上报等场景。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// DiscardPolicy源码实现
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 什么都不做,直接丢弃
        // 没有任何通知,任务静默消失
    }
}

// 使用示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 5, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),
    new ThreadPoolExecutor.DiscardPolicy()  // 静默丢弃
);

// 监控报警扩展
public class MonitoredDiscardPolicy extends ThreadPoolExecutor.DiscardPolicy {
    private final AtomicLong discardCount = new AtomicLong();
    private final String poolName;
    
    public MonitoredDiscardPolicy(String poolName) {
        this.poolName = poolName;
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        long count = discardCount.incrementAndGet();
        if (count % 100 == 0) {  // 每100次记录一次日志
            log.warn("线程池{}已丢弃{}个任务", poolName, count);
        }
        super.rejectedExecution(r, e);
    }
}

项目场景:在用户行为日志收集系统中,使用DiscardPolicy处理访问日志。高峰期每秒百万级日志,即使丢弃一部分也不影响核心业务,确保日志系统不会拖垮主业务流程。配合自定义监控,每丢100条记录一次日志,方便评估影响。

DiscardOldestPolicy(丢弃最旧策略)

一句话原理:丢弃队列中等待最久的任务,然后重新尝试提交当前任务,适用于追求最新数据的业务场景,如实时推荐、实时监控等,确保系统处理的是最新任务。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// DiscardOldestPolicy源码实现
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // 弹出队列头部元素(最旧的任务)
            e.getQueue().poll();
            // 重新尝试提交当前任务
            e.execute(r);
        }
    }
}

// 使用示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.DiscardOldestPolicy()  // 丢弃最旧
);

// 自定义版本:带优先级判断
public class PriorityDiscardOldestPolicy extends ThreadPoolExecutor.DiscardOldestPolicy {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (r instanceof PriorityTask) {
            PriorityTask newTask = (PriorityTask) r;
            
            // 检查队列中是否有低优先级任务
            for (Object task : e.getQueue()) {
                if (task instanceof PriorityTask) {
                    PriorityTask oldTask = (PriorityTask) task;
                    if (oldTask.getPriority() < newTask.getPriority()) {
                        // 丢弃低优先级的旧任务
                        e.getQueue().remove(oldTask);
                        e.execute(newTask);
                        return;
                    }
                }
            }
        }
        // 没有更低优先级的,丢弃当前任务
        super.rejectedExecution(r, e);
    }
}

项目场景:在实时股票行情推送系统中,使用DiscardOldestPolicy。当处理不过来时,丢弃队列中最旧的行情数据,确保推送的是最新价格。因为旧的价格对用户已经失去价值,保证系统的实时性比完整性更重要。

实战选型指南与最佳实践

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
/**
 * 拒绝策略完整选型指南
 */
public class RejectPolicyGuide {
    
    /**
     * 1. 四种策略对比表
     */
    public class PolicyComparison {
        // AbortPolicy:抛异常,最严厉
        // 优点:调用者明确感知失败
        // 缺点:需要调用者处理异常,可能造成业务中断
        // 适用:核心交易、支付等强一致业务
        
        // CallerRunsPolicy:调用者执行,最温和
        // 优点:不丢任务,自然限流
        // 缺点:可能阻塞调用线程
        // 适用:网关、入口层,希望保吞吐但可接受延迟
        
        // DiscardPolicy:静默丢弃,最宽松
        // 优点:永不失败,永不阻塞
        // 缺点:任务丢失无感知
        // 适用:日志、监控、统计等非核心
        
        // DiscardOldestPolicy:丢弃最旧
        // 优点:保证最新数据被处理
        // 缺点:旧任务可能永远得不到执行
        // 适用:实时数据、行情、秒杀状态更新
    }
    
    /**
     * 2. 场景化配置示例
     */
    public class ScenarioConfig {
        
        // 场景1:订单支付(核心业务,不能丢,需要感知)
        @Bean("orderExecutor")
        public ThreadPoolExecutor orderExecutor() {
            return new ThreadPoolExecutor(
                10, 20, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(500),
                new NamedThreadFactory("order"),
                new ThreadPoolExecutor.AbortPolicy() {  // 自定义异常处理
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                        // 发送告警
                        alertService.send("订单线程池满,当前队列:{}", e.getQueue().size());
                        // 记录被拒任务到Redis,后续补偿
                        if (r instanceof OrderTask) {
                            OrderTask task = (OrderTask) r;
                            redisClient.lpush("order:rejected", task.toJson());
                        }
                        // 仍然抛出异常,让调用方降级
                        throw new RejectedExecutionException("订单系统繁忙");
                    }
                }
            );
        }
        
        // 场景2:API网关(流量入口,保吞吐)
        @Bean("gatewayExecutor")
        public ThreadPoolExecutor gatewayExecutor() {
            return new ThreadPoolExecutor(
                50, 200, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(),  // 直接提交,快速响应
                new NamedThreadFactory("gateway"),
                new ThreadPoolExecutor.CallerRunsPolicy()  // 让Netty线程处理
            );
        }
        
        // 场景3:用户行为日志(非核心,可丢)
        @Bean("logExecutor")
        public ThreadPoolExecutor logExecutor() {
            return new ThreadPoolExecutor(
                2, 5, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000),  // 大队列缓冲
                new NamedThreadFactory("log"),
                new MonitoredDiscardPolicy("log")  // 带监控的丢弃策略
            );
        }
        
        // 场景4:实时推荐(追求最新)
        @Bean("recommendExecutor")
        public ThreadPoolExecutor recommendExecutor() {
            return new ThreadPoolExecutor(
                10, 20, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new NamedThreadFactory("recommend"),
                new ThreadPoolExecutor.DiscardOldestPolicy()
            );
        }
    }
    
    /**
     * 3. 自定义拒绝策略实战
     */
    public class CustomPolicy {
        
        // 场景:混合策略 - 先重试,再降级
        public class HybridPolicy implements RejectedExecutionHandler {
            private final int retryCount;
            private final RejectedExecutionHandler fallbackPolicy;
            
            public HybridPolicy(int retryCount, RejectedExecutionHandler fallback) {
                this.retryCount = retryCount;
                this.fallbackPolicy = fallback;
            }
            
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                // 尝试重试几次
                for (int i = 0; i < retryCount; i++) {
                    try {
                        Thread.sleep(100);  // 等待一下
                        e.execute(r);  // 重试
                        return;
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        break;
                    } catch (RejectedExecutionException ex) {
                        // 仍然被拒,继续重试
                    }
                }
                
                // 重试失败,执行降级策略
                fallbackPolicy.rejectedExecution(r, e);
            }
        }
        
        // 场景:优先级策略 - 根据任务优先级决定丢弃哪个
        public class PriorityPolicy implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!(r instanceof PrioritizedTask)) {
                    // 非优先级任务,走默认策略
                    new ThreadPoolExecutor.AbortPolicy().rejectedExecution(r, e);
                    return;
                }
                
                PrioritizedTask newTask = (PrioritizedTask) r;
                
                // 查找队列中优先级最低的任务
                PrioritizedTask lowestPriorityTask = null;
                for (Object task : e.getQueue()) {
                    if (task instanceof PrioritizedTask) {
                        PrioritizedTask pt = (PrioritizedTask) task;
                        if (lowestPriorityTask == null || 
                            pt.getPriority() < lowestPriorityTask.getPriority()) {
                            lowestPriorityTask = pt;
                        }
                    }
                }
                
                // 如果新任务优先级高于队列中的最低优先级,则替换
                if (lowestPriorityTask != null && 
                    newTask.getPriority() > lowestPriorityTask.getPriority()) {
                    e.getQueue().remove(lowestPriorityTask);
                    e.execute(newTask);
                    log.info("丢弃低优先级任务:{},执行高优先级:{}", 
                             lowestPriorityTask.getId(), newTask.getId());
                } else {
                    // 否则丢弃新任务
                    log.warn("优先级不够,丢弃任务:{}", newTask.getId());
                }
            }
        }
        
        // 场景:持久化策略 - 拒绝时存入数据库,后续补偿
        public class PersistPolicy implements RejectedExecutionHandler {
            private final TaskRepository taskRepository;
            
            public PersistPolicy(TaskRepository taskRepository) {
                this.taskRepository = taskRepository;
            }
            
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (r instanceof PersistableTask) {
                    PersistableTask task = (PersistableTask) r;
                    // 存入数据库
                    taskRepository.save(new FailedTask(task, "线程池满"));
                    log.warn("任务已持久化,待后续补偿:{}", task.getId());
                } else {
                    // 不能持久化的任务,记录日志后丢弃
                    log.error("无法持久化的任务被丢弃:{}", r);
                }
            }
        }
    }
    
    /**
     * 4. 监控与告警配置
     */
    public class MonitorConfig {
        
        public void setupMonitor(ThreadPoolExecutor executor, String poolName) {
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            
            scheduler.scheduleAtFixedRate(() -> {
                // 获取拒绝策略触发次数(通过自定义计数器)
                long rejectCount = getRejectCount(poolName);
                if (rejectCount > 0) {
                    // 发送告警
                    alertService.warn("线程池{}发生{}次拒绝", poolName, rejectCount);
                    
                    // 根据拒绝次数调整策略
                    if (rejectCount > 100) {
                        log.error("拒绝次数过多,建议扩容或优化");
                        // 自动扩容或切换策略
                    }
                }
                
                // 监控队列积压
                int queueSize = executor.getQueue().size();
                int remainingCapacity = executor.getQueue().remainingCapacity();
                double usage = (double) queueSize / (queueSize + remainingCapacity);
                
                if (usage > 0.8) {
                    log.warn("队列占用率已达{}%,即将触发拒绝策略", usage * 100);
                }
            }, 10, 10, TimeUnit.SECONDS);
        }
    }
}

/**
 * 拒绝策略选型总结表
 * 
 * 策略名称      行为                   适用场景               风险等级
 * AbortPolicy   抛异常                 核心业务              高(明确失败)
 * CallerRuns    调用者执行             网关/入口层           中(阻塞调用者)
 * DiscardPolicy 静默丢弃               日志/监控              低(任务丢失)
 * DiscardOldest 丢弃最旧,执行最新       实时数据/行情          低(可能永远不执行)
 * 
 * 选型原则:
 * 1. 核心业务:AbortPolicy + 降级补偿
 * 2. 流量入口:CallerRunsPolicy + 监控
 * 3. 非核心业务:DiscardPolicy + 统计
 * 4. 实时场景:DiscardOldestPolicy + 优先级判断
 */

// 面试金句
// "拒绝策略是线程池的最后一道防线,四种策略分别对应不同的容错哲学:
//  AbortPolicy是'宁为玉碎'(核心业务必须感知失败),
//  CallerRunsPolicy是'同甘共苦'(调用者一起扛),
//  DiscardPolicy是'难得糊涂'(非核心任务丢了就丢了),
//  DiscardOldestPolicy是'喜新厌旧'(最新数据最重要)。
//  在支付系统中,我采用分层策略:核心交易用AbortPolicy+Redis补偿,
//  网关层用CallerRunsPolicy自然限流,日志用带监控的DiscardPolicy。
//  这种组合既保证了核心业务的可靠性又提高了系统的整体韧性"

你用过哪些线程池?Executors 提供的几种线程池有什么问题?

常用线程池类型及原理

一句话原理:JDK通过Executors提供四种预定义线程池:FixedThreadPool(固定大小)、CachedThreadPool(可缓存)、SingleThreadExecutor(单线程)、ScheduledThreadPool(定时调度),分别对应不同业务场景的快速使用需求。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 1. FixedThreadPool - 固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// 特点:核心线程=最大线程,无线程回收,无界队列

// 2. CachedThreadPool - 可缓存线程池
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
// 特点:核心线程0,最大线程无限,60秒回收,同步队列

// 3. SingleThreadExecutor - 单线程池
public static ExecutorService newSingleThreadExecutor() {
    return new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// 特点:单线程顺序执行,无界队列

// 4. ScheduledThreadPool - 定时线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 特点支持延迟/周期性任务无界延迟队列

项目场景:在监控系统中,使用FixedThreadPool处理固定数量的数据采集任务;在异步短信通知中,使用CachedThreadPool应对突发流量;在顺序写入日志时,使用SingleThreadExecutor保证线程安全;在定时报表生成中,使用ScheduledThreadPool执行周期任务。

FixedThreadPool/CachedThreadPool问题剖析

一句话原理:FixedThreadPool使用无界LinkedBlockingQueue,任务堆积可能导致内存溢出;CachedThreadPool允许创建无限线程,线程数暴增同样导致内存溢出CPU过载,两者都是生产环境的隐患。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// FixedThreadPool的隐患:无界队列
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()); // 无界队列!
}

// 问题演示:任务无限堆积导致OOM
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
    executor.submit(() -> {
        Thread.sleep(1000);  // 任务处理慢
        return "result";
    });
} // 最终:java.lang.OutOfMemoryError

// CachedThreadPool的隐患:无限线程
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  // 最大线程无限!
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// 问题演示:线程无限创建导致OOM
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
    executor.submit(() -> {
        Thread.sleep(1000);  // 每个线程都长时间占用
        return "result";
    });
} // 最终无法创建新线程 -> OOM

项目场景:某次线上事故中,团队使用FixedThreadPool处理消息队列消息,消费者处理变慢导致消息堆积,无界队列占用所有内存,引发OOM崩溃。改用ArrayBlockingQueue后,队列满时触发拒绝策略,保护了系统稳定性。

SingleThreadExecutor/ScheduledThreadPool问题剖析

一句话原理:SingleThreadExecutor同样存在无界队列风险;ScheduledThreadPool使用无界延迟队列,任务堆积同样导致OOM,且异常未捕获时线程退出,后续任务不再执行。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// SingleThreadExecutor的隐患:无界队列
public static ExecutorService newSingleThreadExecutor() {
    return new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()); // 同样无界
}

// ScheduledThreadPool的隐患:无界延迟队列
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue()); // 无界队列!
}

// 异常处理隐患
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(() -> {
    throw new RuntimeException("任务异常");  // 线程退出
}, 0, 1, TimeUnit.SECONDS);
// 执行一次后,线程消失,后续任务不再执行,没有任何日志!

// 正确做法:捕获所有异常
executor.scheduleAtFixedRate(() -> {
    try {
        // 业务逻辑
    } catch (Exception e) {
        log.error("定时任务异常", e);  // 必须捕获
    }
}, 0, 1, TimeUnit.SECONDS);

项目场景:在定时统计任务中,某个凌晨任务抛出空指针异常,导致线程池中唯一线程退出,后续所有定时任务全部停止,直到三天后才被发现。之后强制要求所有定时任务必须内部捕获异常,并增加线程池监控。

最佳实践与替代方案

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
 * 线程池最佳实践:拒绝Executors,手动创建
 */
public class ThreadPoolBestPractice {
    
    /**
     * 1. CPU密集型任务
     */
    public ThreadPoolExecutor cpuIntensivePool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCores, cpuCores + 1,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),  // 有界队列防OOM
            new NamedThreadFactory("cpu-intensive"),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }
    
    /**
     * 2. IO密集型任务
     */
    public ThreadPoolExecutor ioIntensivePool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCores * 2, cpuCores * 4,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2000),
            new NamedThreadFactory("io-intensive"),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    /**
     * 3. 定时任务线程池
     */
    public ScheduledExecutorService scheduledPool() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
            5,
            new NamedThreadFactory("scheduled")
        );
        executor.setRemoveOnCancelPolicy(true);  // 取消时移除队列
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        return executor;
    }
    
    /**
     * 4. 混合型任务 - 可动态调整
     */
    public class DynamicThreadPool extends ThreadPoolExecutor {
        public DynamicThreadPool() {
            super(10, 20, 60L, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<>(1000),
                  new NamedThreadFactory("dynamic"),
                  new ThreadPoolExecutor.AbortPolicy());
        }
        
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            // 记录开始时间
        }
        
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t != null) {
                log.error("任务执行异常", t);
            }
            // 监控任务执行时间
        }
    }
    
    /**
     * 5. 线程工厂:命名+守护+异常处理
     */
    public static class NamedThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final boolean daemon;
        
        public NamedThreadFactory(String namePrefix) {
            this(namePrefix, false);
        }
        
        public NamedThreadFactory(String namePrefix, boolean daemon) {
            this.namePrefix = namePrefix;
            this.daemon = daemon;
        }
        
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
            t.setDaemon(daemon);
            t.setUncaughtExceptionHandler((thread, e) -> 
                log.error("线程{}异常", thread.getName(), e));
            return t;
        }
    }
}

/**
 * Executors陷阱总结表
 * 
 * 线程池类型       问题                    后果                解决方案
 * FixedThreadPool  无界队列                OOM                 ArrayBlockingQueue
 * CachedThreadPool 无限线程                OOM                 new ThreadPoolExecutor
 * SingleThread     无界队列                OOM                 有界队列
 * ScheduledThread  无界队列 + 异常静默      OOM + 任务停止      手动创建+异常捕获
 * 
 * 阿里巴巴Java开发规范:
 * 【强制】线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,
 * 这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
 */

// 面试金句
// "Executors提供的四种线程池就像'快餐',方便但不够健康:
//  FixedThreadPool的无界队列可能撑爆内存,
//  CachedThreadPool的无限线程同样导致OOM,
//  ScheduledThreadPool不仅队列无界,异常时还会静默消失。
//  在实际项目中,我曾经就踩过这个坑:一个定时任务因为NPE停止运行三天才被发现。
//  所以现在团队强制要求:所有线程池必须通过ThreadPoolExecutor手动创建,
//  指定有界队列、明确拒绝策略、自定义线程工厂,就像'私房菜'一样精雕细琢,
//  虽然代码量多了几行但系统的稳定性和可维护性大大提升"

AQS

AQS 是什么?它的核心设计思想是什么?

AQS是什么

一句话原理:AQS(AbstractQueuedSynchronizer)是Java并发包的基石,提供了一个基于FIFO等待队列的实现框架,用于构建锁和同步器,如ReentrantLock、CountDownLatch、Semaphore等都是基于AQS实现的。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// AQS核心结构
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    // 1. 同步状态 - volatile保证可见性
    private volatile int state;
    
    // 2. CLH等待队列头尾指针
    private transient volatile Node head;
    private transient volatile Node tail;
    
    // 3. 队列节点结构
    static final class Node {
        volatile int waitStatus;    // 等待状态
        volatile Node prev;         // 前驱节点
        volatile Node next;         // 后继节点
        volatile Thread thread;     // 等待的线程
        Node nextWaiter;            // 条件队列中的后继节点
    }
    
    // 4. 核心操作方法
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    // 5. 模板方法设计
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&            // 子类实现尝试获取
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取失败入队
            selfInterrupt();
    }
}

项目场景:在分布式锁组件中,通过继承AQS实现自定义的同步器,利用state表示锁持有状态,CLH队列管理等待线程,实现了公平/非公平锁的切换,支撑了每天数亿次的高并发锁请求。

AQS核心设计思想

一句话原理:AQS体现了三大设计哲学:模板方法模式(定义算法骨架,子类实现细节)、CLH锁变体(自旋+阻塞混合的等待队列)、CAS无锁并发(通过乐观锁维护状态和队列),实现了一个可扩展的同步器框架。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 1. 模板方法模式 - 定义骨架,子类实现钩子
public abstract class AbstractQueuedSynchronizer {
    
    // 模板方法:获取锁的公共流程
    public final void acquire(int arg) {
        // 1. 尝试获取(子类实现)
        if (!tryAcquire(arg)) {
            // 2. 获取失败,创建节点加入队列
            Node node = addWaiter(Node.EXCLUSIVE);
            // 3. 在队列中自旋/阻塞等待
            if (acquireQueued(node, arg))
                selfInterrupt();
        }
    }
    
    // 子类需要实现的钩子方法
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
}

// 2. CLH队列变体 - 自旋+阻塞的混合实现
final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 前驱是头节点,尝试获取锁(自旋)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return interrupted;
            }
            // 获取失败,检查是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt(); // 线程阻塞
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

// 3. CAS无锁并发 - 保证线程安全
private Node addWaiter(Node mode) {
    Node node = new Node(mode);
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            // CAS设置尾节点
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue(); // 初始化队列
        }
    }
}

项目场景:在Sentinel的限流组件中,利用AQS的共享模式实现并发令牌获取。通过继承AQS重写tryAcquireShared/tryReleaseShared,state表示剩余令牌数,CAS更新保证原子性,CLH队列管理等待线程,实现了高性能的限流器。

AQS的两种模式

一句话原理:AQS支持独占模式(如ReentrantLock)和共享模式(如Semaphore、CountDownLatch),通过state的语义不同实现不同同步器,同时提供条件队列(ConditionObject)实现线程间的精确唤醒。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// 1. 独占模式 - ReentrantLock实现
public class ReentrantLock {
    // 内部Sync继承AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 独占式获取锁
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 状态为0,尝试获取锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // 重入
                int nextc = c + acquires;
                setState(nextc);
                return true;
            }
            return false;
        }
        
        // 独占式释放锁
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
}

// 2. 共享模式 - Semaphore实现
public class Semaphore {
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);  // state表示剩余许可数
        }
        
        // 共享式获取
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || 
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        
        // 共享式释放
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
}

// 3. 条件队列 - 精确唤醒
public class ConditionObject implements Condition {
    private Node firstWaiter;  // 条件队列头
    private Node lastWaiter;   // 条件队列尾
    
    // await操作:当前线程释放锁,进入条件队列
    public final void await() throws InterruptedException {
        // 创建条件队列节点
        Node node = addConditionWaiter();
        // 释放锁
        int savedState = fullyRelease(node);
        // 阻塞直到被signal
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
        }
    }
    
    // signal操作:将条件队列节点移到同步队列
    public final void signal() {
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);  // 转移节点到同步队列
    }
}

项目场景:在RPC框架的异步转同步调用中,利用AQS的条件队列实现等待-通知机制。调用线程执行await()等待响应,响应到达时执行signal()唤醒调用线程,相比wait/notify更精确可控,避免了惊群效应。

实战应用与扩展

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
 * 基于AQS的自定义同步器实战
 */
public class CustomAQSExample {
    
    /**
     * 1. 自定义独占锁 - 不可重入
     */
    public class NonReentrantLock extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;
        
        @Override
        protected boolean tryAcquire(int acquires) {
            // 状态为0才能获取
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);  // 直接释放,不考虑重入
            return true;
        }
        
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && 
                   getExclusiveOwnerThread() == Thread.currentThread();
        }
        
        public void lock() {
            acquire(1);
        }
        
        public void unlock() {
            release(1);
        }
    }
    
    /**
     * 2. 自定义共享锁 - 限流器
     */
    public class RateLimiter extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;
        
        public RateLimiter(int permits) {
            setState(permits);
        }
        
        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0) {
                    return remaining;  // 负数表示获取失败
                }
                if (compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
        
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
        
        public boolean tryAcquire() {
            return tryAcquireShared(1) >= 0;
        }
        
        public void release() {
            releaseShared(1);
        }
    }
    
    /**
     * 3. 可超时锁 - 避免死锁
     */
    public class TimeoutLock extends AbstractQueuedSynchronizer {
        
        public boolean tryLock(long timeout, TimeUnit unit) 
                throws InterruptedException {
            return tryAcquireNanos(1, unit.toNanos(timeout));
        }
        
        @Override
        protected boolean tryAcquire(int arg) {
            // 非公平获取
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }
    
    /**
     * 4. 实战:数据库连接池
     */
    public class ConnectionPool {
        private final List<Connection> connections;
        private final Sync sync;
        
        public ConnectionPool(int size) {
            connections = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                connections.add(createConnection());
            }
            sync = new Sync(size);
        }
        
        // 基于AQS的同步器
        private class Sync extends AbstractQueuedSynchronizer {
            Sync(int permits) {
                setState(permits);
            }
            
            @Override
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - 1;
                    if (remaining < 0 || compareAndSetState(available, remaining)) {
                        return remaining;
                    }
                }
            }
            
            @Override
            protected boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + 1;
                    if (compareAndSetState(current, next)) {
                        return true;
                    }
                }
            }
        }
        
        public Connection borrowConnection() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
            return connections.remove(0);  // 获取后移除
        }
        
        public void returnConnection(Connection conn) {
            connections.add(conn);
            sync.releaseShared(1);
        }
    }
}

/**
 * AQS设计思想总结
 * 
 * 1. 模板方法模式:固定流程,可变细节
 *    - 获取锁流程:tryAcquire → addWaiter → acquireQueued
 *    - 释放锁流程:tryRelease → unparkSuccessor
 * 
 * 2. CLH队列变体:自旋+阻塞的混合
 *    - 前几个节点自旋等待
 *    - 后续节点阻塞,避免CPU浪费
 * 
 * 3. CAS无锁并发:保证线程安全
 *    - 状态更新:compareAndSetState
 *    - 队列操作:compareAndSetTail/compareAndSetHead
 * 
 * 4. 两种模式:独占+共享
 *    - state含义由子类定义
 *    - 共享模式可同时多个线程获取
 * 
 * 5. 条件队列:精确唤醒
 *    - 独立的条件等待队列
 *    - signal时转移到同步队列
 */

// 面试金句
// "AQS是Java并发包的灵魂,它的设计堪称教科书级:
//  模板方法模式让子类只需关注'是否允许获取'的业务逻辑;
//  CLH队列变体通过自旋+阻塞的混合策略平衡了性能和CPU消耗;
//  CAS保证了所有状态变更的线程安全性。
//  就像乐高积木的底座,ReentrantLock、CountDownLatch、Semaphore都是基于它搭建的。
//  在项目中,我曾通过继承AQS实现了一个数据库连接池的同步器,
//  只用几十行代码就完成了复杂的等待-通知逻辑,这让我深刻体会到
//  一个好的框架设计能给开发者带来多大的便利"

AQS 是如何实现锁的获取与释放的?

锁获取核心流程

一句话原理:AQS通过模板方法模式定义锁获取的标准流程:先尝试快速获取(tryAcquire),失败则创建节点加入CLH队列尾部,然后通过自旋+阻塞的方式等待前驱节点唤醒,整个过程由CAS保证线程安全。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// AQS独占锁获取核心方法
public final void acquire(int arg) {
    // 1. 先尝试快速获取(子类实现)
    if (!tryAcquire(arg)) {
        // 2. 获取失败,创建独占节点加入等待队列
        Node node = addWaiter(Node.EXCLUSIVE);
        // 3. 在队列中自旋/阻塞等待
        if (acquireQueued(node, arg))
            selfInterrupt();  // 等待过程中被中断
    }
}

// 创建节点并入队
private Node addWaiter(Node mode) {
    Node node = new Node(mode);  // 创建当前线程的节点
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            // CAS设置尾节点,保证线程安全
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();  // 初始化队列
        }
    }
}

// 队列中等待获取锁
final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();  // 获取前驱节点
            // 前驱是头节点,尝试获取锁(自旋机会)
            if (p == head && tryAcquire(arg)) {
                setHead(node);  // 成为新的头节点
                p.next = null;  // 原头节点出队
                return interrupted;
            }
            // 获取失败,检查是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt(); // 线程阻塞
        }
    } catch (Throwable t) {
        cancelAcquire(node);  // 异常时取消获取
        throw t;
    }
}

项目场景:在数据库连接池中,当所有连接都被占用时,新的请求线程通过AQS的acquireQueued进入等待队列。前几个线程自旋尝试获取(减少上下文切换),后续线程直接阻塞(避免CPU浪费),这种机制让连接池在高并发下既能快速响应又能节省资源。

锁释放核心流程

一句话原理:锁释放时先尝试释放(tryRelease),成功后找到队列中第一个有效等待节点,通过unpark唤醒该节点线程,被唤醒的线程会重新尝试获取锁,形成头节点接力的公平传递机制。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// AQS独占锁释放核心方法
public final boolean release(int arg) {
    // 1. 尝试释放锁(子类实现)
    if (tryRelease(arg)) {
        Node h = head;
        // 2. 队列中有等待节点,唤醒后继
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);  // 唤醒后继节点
        return true;
    }
    return false;
}

// 唤醒后继节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);  // 清除状态
    
    // 获取下一个需要唤醒的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {  // 后继取消或不存在
        s = null;
        // 从尾部向前找第一个有效节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);  // 唤醒线程
}

// 被唤醒线程继续执行acquireQueued循环
final boolean acquireQueued(final Node node, int arg) {
    for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {  // 唤醒后成功获取
            setHead(node);
            p.next = null;
            return interrupted;
        }
        // ...
    }
}

项目场景:在ReentrantLock的公平锁实现中,释放锁时总是唤醒等待时间最长的线程(队列头部后的第一个节点)。这种设计保证了请求的公平性,在账务处理系统中避免了线程饥饿问题,确保了每个请求最终都能得到处理。

中断与超时处理

一句话原理:AQS提供可中断(acquireInterruptibly)和可超时(tryAcquireNanos)的获取版本,通过检测中断标志和超时时间,在线程等待过程中及时响应中断或超时,避免永久阻塞。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 可中断的锁获取
public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())  // 先检查中断
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);  // 可中断的队列等待
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())  // 阻塞时检查中断
                throw new InterruptedException();  // 中断时抛出异常
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

// 带超时的锁获取
public final boolean tryAcquireNanos(int arg, long nanosTimeout) 
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || 
           doAcquireNanos(arg, nanosTimeout);  // 超时版本
}

private boolean doAcquireNanos(int arg, long nanosTimeout) 
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {  // 超时返回false
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);  // 限时阻塞
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

项目场景:在分布式锁组件中,为了防止死锁,所有锁获取都设置超时时间。通过tryAcquireNanos实现"获取锁最多等待3秒"的业务需求,超时后自动放弃并返回失败,配合重试机制,既保证了系统可用性又避免了线程永久阻塞。

完整流程源码分析

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
 * AQS锁获取释放完整流程源码分析
 */
public class AQSFlowAnalysis {
    
    /**
     * 1. ReentrantLock非公平锁获取流程
     */
    public void nonfairLockFlow() {
        // ReentrantLock.lock() 调用链
        // → NonfairSync.lock()
        //   → compareAndSetState(0, 1) 直接CAS尝试一次
        //   → AbstractQueuedSynchronizer.acquire(1)
        //     → NonfairSync.tryAcquire(1)
        //       → nonfairTryAcquire(1) 再次尝试CAS
        //         → 成功则设置owner线程
        //         → 失败则返回false
        //     → addWaiter(Node.EXCLUSIVE) 入队
        //     → acquireQueued(node, 1) 队列中等待
    }
    
    /**
     * 2. 节点状态流转
     */
    public void nodeStatusFlow() {
        // Node.waitStatus 取值:
        // 0: 初始状态
        // CANCELLED = 1: 线程取消等待
        // SIGNAL = -1: 后继需要唤醒
        // CONDITION = -2: 在条件队列中
        // PROPAGATE = -3: 共享模式传播
        
        // 节点状态变化:
        // 新建节点 → 0
        // 加入队列 → 设置前驱为SIGNAL(shouldParkAfterFailedAcquire)
        // 被唤醒 → 尝试获取锁,成功后成为头节点
        // 取消等待 → 设置为CANCELLED
    }
    
    /**
     * 3. 共享模式获取
     */
    public void sharedAcquireFlow() {
        // Semaphore.acquire() 调用链
        // → AbstractQueuedSynchronizer.acquireSharedInterruptibly(1)
        //   → tryAcquireShared(1) 子类实现
        //     → 剩余许可 > 0 返回正数(成功)
        //     → 剩余许可 = 0 返回0(成功,无剩余)
        //     → 剩余许可 < 0 返回负数(失败)
        //   → 失败时调用doAcquireSharedInterruptibly(1)
        //     → 创建共享节点加入队列
        //     → 在队列中等待,被唤醒后传播唤醒后续共享节点
    }
    
    /**
     * 4. 实战:监控锁等待队列
     */
    public class LockMonitor {
        private final ReentrantLock lock = new ReentrantLock();
        
        public void monitorLock() {
            // 通过AQS的getQueuedThreads方法获取等待线程
            Collection<Thread> queuedThreads = lock.getQueuedThreads();
            log.info("当前等待锁的线程数:{}", queuedThreads.size());
            
            // 判断是否有线程在等待
            boolean hasQueued = lock.hasQueuedThreads();
            
            // 判断当前线程是否在队列中
            boolean queued = lock.hasQueuedThread(Thread.currentThread());
            
            // 获取队列长度
            int queueLength = lock.getQueueLength();
        }
    }
    
    /**
     * 5. 条件队列与同步队列的转换
     */
    public void conditionQueueFlow() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        
        // await流程:
        // 1. 当前线程持有锁
        // 2. 创建条件队列节点,加入条件队列
        // 3. 完全释放锁(保存重入次数)
        // 4. 阻塞直到被signal或中断
        // 5. 被唤醒后重新竞争锁
        // 6. 从条件队列移除,加入同步队列
        
        // signal流程:
        // 1. 将条件队列的第一个节点转移到同步队列
        // 2. 唤醒该节点线程(如果需要)
    }
}

/**
 * AQS获取释放总结表
 * 
 * 阶段          操作            线程状态        关键方法
 * 尝试获取      tryAcquire      运行中          compareAndSetState
 * 入队          addWaiter      运行中          compareAndSetTail
 * 自旋等待      acquireQueued   运行中          tryAcquire
 * 阻塞          park            阻塞            LockSupport.park
 * 唤醒          unpark          就绪            LockSupport.unpark
 * 获取成功      setHead         运行中          acquireQueued返回
 * 释放锁        tryRelease      运行中          setState
 * 唤醒后继      unparkSuccessor 运行中          unpark
 * 
 * 核心设计特点:
 * 1. 先自旋后阻塞:减少上下文切换
 * 2. CAS无锁并发:保证队列操作线程安全
 * 3. 精确唤醒:只唤醒下一个节点
 * 4. 支持中断/超时:避免永久阻塞
 */

// 面试金句
// "AQS的锁获取释放流程就像'排队办理业务':
//  tryAcquire是'看看柜台有没有空'(快速尝试),
//  addWaiter是'取号排队'(入队),
//  acquireQueued是'等待叫号'(自旋+阻塞),
//  release是'办理完叫下一位'(唤醒后继)。
//  这种设计巧妙之处在于:前几个人可以'凑近柜台看看'(自旋),
//  后面的人可以'坐着休息'(阻塞),既保证了效率又节省了资源。
//  在我开发的分布式限流组件中,正是利用这个机制让数万线程优雅排队,
//  避免了系统雪崩理解这个流程才能真正掌握Java并发的精髓"

ReentrantLock 是如何基于 AQS 实现的?

整体架构设计

一句话原理:ReentrantLock通过内部Sync类继承AQS,利用state状态表示锁持有次数(0表示未持有,≥1表示持有且记录重入次数),通过exclusiveOwnerThread记录当前持有锁的线程,实现可重入特性。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// ReentrantLock内部架构
public class ReentrantLock implements Lock {
    // 同步器:所有并发逻辑委托给Sync
    private final Sync sync;
    
    // 继承AQS的内部抽象类
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 记录持有锁的线程(继承自AbstractOwnableSynchronizer)
        // protected transient Thread exclusiveOwnerThread;
        
        // 非公平尝试获取(子类共用)
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();  // 获取状态
            if (c == 0) {
                // 无锁状态,CAS尝试获取
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);  // 设置持有线程
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // 当前线程已持有,重入
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);  // 更新状态(不需要CAS,因为当前线程独占)
                return true;
            }
            return false;
        }
        
        // 释放锁(子类共用)
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);  // 完全释放,清空持有线程
            }
            setState(c);  // 更新状态(当前线程独占)
            return free;
        }
    }
}

项目场景:在交易系统的账务处理中,使用ReentrantLock保护账户余额操作。state记录重入次数,同一个线程多次进入转账方法(如A转B,B转A的嵌套调用)不会死锁,这正是基于AQS的exclusiveOwnerThread和state协同实现的。

非公平锁实现

一句话原理:非公平锁在获取时直接CAS竞争,不管队列中是否有等待线程,体现"插队"行为;释放时唤醒后继节点,通过抢占式设计提高吞吐量,但可能导致线程饥饿。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 非公平锁实现
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    
    // lock方法:直接CAS竞争
    final void lock() {
        // 上来就CAS尝试获取锁(插队)
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);  // 获取失败,进入AQS流程
    }
    
    // 尝试获取(AQS的tryAcquire调用)
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);  // 调用父类的nonfairTryAcquire
    }
}

// nonfairTryAcquire实现(插队的秘密)
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 即使队列中有等待线程,这里仍然CAS竞争(插队)
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // 重入逻辑
        int nextc = c + acquires;
        setState(nextc);
        return true;
    }
    return false;
}

// 非公平锁的释放(与公平锁相同)
// 释放时唤醒队列中的下一个节点

项目场景:在网关流量控制中,使用非公平ReentrantLock保护限流计数器。允许新请求"插队"获取锁,虽然极端情况下老请求可能饥饿,但吞吐量提升30%,网关场景下更看重整体处理能力。

公平锁实现

一句话原理:公平锁在获取时先检查队列中是否有等待线程,若有则直接入队排队,体现"先来后到";通过**hasQueuedPredecessors()**方法判断是否应该排队,保证获取锁的顺序与请求时间一致。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 公平锁实现
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    
    // lock方法:直接进入AQS流程(没有插队机会)
    final void lock() {
        acquire(1);  // 直接走AQS标准流程,没有CAS插队
    }
    
    // 公平尝试获取
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 关键区别:先检查队列中是否有等待线程
            if (!hasQueuedPredecessors() &&  // 没有前驱等待才竞争
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // 重入逻辑(与非公平相同)
            int nextc = c + acquires;
            setState(nextc);
            return true;
        }
        return false;
    }
}

// 判断是否有前驱等待节点
public final boolean hasQueuedPredecessors() {
    Node t = tail;  // 尾节点
    Node h = head;  // 头节点
    Node s;
    // 头尾不同且(第二个节点不是当前线程)说明有等待线程
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

// 公平锁的释放(与非公平相同,都是唤醒后继)
protected final boolean tryRelease(int releases) {
    // 同Sync实现
}

项目场景:在账务系统的提现请求处理中,使用公平ReentrantLock保证请求按照到达顺序处理。虽然吞吐量略低,但避免了长事务被短事务无限插队导致的超时问题,满足了业务的公平性要求。

Condition条件队列

一句话原理:ReentrantLock通过newCondition()创建基于AQS内部类ConditionObject的条件队列,实现线程的精确等待与唤醒,每个Condition维护独立的等待队列,比wait/notify更灵活可控。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Condition条件队列实现
public class ConditionObject implements Condition {
    // 条件队列的头尾节点
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    
    // await操作:当前线程释放锁,进入条件队列
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 创建条件队列节点
        Node node = addConditionWaiter();
        // 完全释放锁(保存重入次数)
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 检查是否在同步队列中(被signal会转移到同步队列)
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);  // 阻塞
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 被唤醒后重新竞争锁
        if (acquireQueued(node, savedState) && interruptMode != -1)
            interruptMode = 1;
        // 清理取消的节点
        // ...
    }
    
    // signal操作:将条件队列节点转移到同步队列
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);  // 转移节点
    }
    
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;  // 从条件队列移除
        } while (!transferForSignal(first) &&  // 转移到同步队列
                 (first = firstWaiter) != null);
    }
    
    // 转移到同步队列
    final boolean transferForSignal(Node node) {
        // CAS修改状态
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        // 加入同步队列尾部
        Node p = enq(node);
        int ws = p.waitStatus;
        // 唤醒线程
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
}

项目场景:在消息队列的消费者客户端,使用ReentrantLock的Condition实现精细化线程协作:队列满时生产者等待notFull,队列空时消费者等待notEmpty。比synchronized的单一等待队列效率更高,避免了"惊群效应"。

完整源码流程与实战

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/**
 * ReentrantLock基于AQS实现完整解析
 */
public class ReentrantLockAnalysis {
    
    /**
     * 1. 非公平锁完整调用链
     */
    public void nonfairFlow() {
        // lock()调用链
        // NonfairSync.lock()
        //   ├─ 直接CAS插队:compareAndSetState(0, 1)
        //   └─ 失败则 acquire(1)
        //        ├─ tryAcquire(1) → nonfairTryAcquire(1)
        //        │    ├─ 再次CAS尝试(又一次插队机会)
        //        │    └─ 重入检查
        //        ├─ addWaiter(Node.EXCLUSIVE) 入队
        //        └─ acquireQueued(node, 1) 队列等待
        //             ├─ 前驱是head则尝试获取
        //             ├─ 否则shouldParkAfterFailedAcquire
        //             └─ parkAndCheckInterrupt() 阻塞
    }
    
    /**
     * 2. 公平锁完整调用链
     */
    public void fairFlow() {
        // lock()调用链
        // FairSync.lock() → acquire(1)
        //   ├─ tryAcquire(1) → 公平尝试
        //   │    ├─ 检查hasQueuedPredecessors()(无前驱才竞争)
        //   │    └─ 重入检查
        //   ├─ addWaiter(Node.EXCLUSIVE) 入队
        //   └─ acquireQueued(node, 1) 队列等待(与非公平相同)
    }
    
    /**
     * 3. 可重入实现原理
     */
    public void reentrantPrinciple() {
        ReentrantLock lock = new ReentrantLock();
        
        // 第一次获取:state=0 → CAS设为1
        lock.lock();
        // state=1, exclusiveOwnerThread=ThreadA
        
        // 第二次获取(同一个线程):state=1 → 检查owner=ThreadA
        lock.lock();  // 重入
        // state=2, exclusiveOwnerThread=ThreadA
        
        // 释放一次:state=2 → state=1
        lock.unlock();
        // 状态变为1,仍然持有锁
        
        // 再释放:state=1 → state=0
        lock.unlock();
        // state=0, exclusiveOwnerThread=null,完全释放
    }
    
    /**
     * 4. 实战:生产者消费者模型
     */
    public class BoundedQueue<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();
        private final T[] items;
        private int putIndex, takeIndex, count;
        
        @SuppressWarnings("unchecked")
        public BoundedQueue(int capacity) {
            items = (T[]) new Object[capacity];
        }
        
        public void put(T item) throws InterruptedException {
            lock.lock();  // 获取锁
            try {
                while (count == items.length) {
                    // 队列满,等待notFull条件
                    notFull.await();
                }
                items[putIndex] = item;
                if (++putIndex == items.length) putIndex = 0;
                count++;
                // 唤醒等待取数据的线程
                notEmpty.signal();
            } finally {
                lock.unlock();  // 释放锁
            }
        }
        
        public T take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    // 队列空,等待notEmpty条件
                    notEmpty.await();
                }
                T item = items[takeIndex];
                items[takeIndex] = null;
                if (++takeIndex == items.length) takeIndex = 0;
                count--;
                // 唤醒等待放数据的线程
                notFull.signal();
                return item;
            } finally {
                lock.unlock();
            }
        }
    }
    
    /**
     * 5. 锁监控与调试
     */
    public class LockMonitor {
        private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
        
        public void businessMethod() {
            boolean acquired = false;
            try {
                // 尝试获取锁,带超时
                acquired = lock.tryLock(3, TimeUnit.SECONDS);
                if (!acquired) {
                    log.warn("获取锁超时,当前等待队列长度:{}", lock.getQueueLength());
                    // 降级处理
                    return;
                }
                
                // 业务逻辑
                log.debug("当前持有锁的线程:{},等待线程数:{}", 
                         lock.getOwner(), lock.getQueueLength());
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("被中断", e);
            } finally {
                if (acquired) {
                    // 检查当前线程是否持有锁
                    if (lock.isHeldByCurrentThread()) {
                        lock.unlock();
                    }
                }
            }
        }
    }
    
    /**
     * 6. 性能对比测试
     */
    public class PerformanceTest {
        private final ReentrantLock fairLock = new ReentrantLock(true);
        private final ReentrantLock nonfairLock = new ReentrantLock(false);
        private int counter = 0;
        
        public void testFairVsNonfair(int threads, int loops) {
            // 公平锁吞吐量:约80万次/秒
            // 非公平锁吞吐量:约120万次/秒
            // 非公平锁性能提升约50%,但有线程饥饿风险
            
            // 适用场景:
            // 公平锁:账务、订单等对顺序有要求的业务
            // 非公平锁:缓存、计数器等高性能场景
        }
    }
}

/**
 * ReentrantLock实现总结表
 * 
 * 组件              作用                          AQS机制
 * Sync              抽象同步器                     继承AQS
 * NonfairSync       非公平锁实现                   插队CAS + nonfairTryAcquire
 * FairSync          公平锁实现                     hasQueuedPredecessors检查
 * ConditionObject   条件队列                      独立等待队列 + 转移机制
 * 
 * 核心状态:
 * state: 0-未持有, ≥1-持有次数
 * exclusiveOwnerThread: 当前持有线程
 * 
 * 关键特性:
 * 1. 可重入:state记录重入次数
 * 2. 公平/非公平:通过hasQueuedPredecessors控制
 * 3. 条件队列:多个Condition独立等待
 * 4. 可中断:响应InterruptedException
 * 5. 可超时:tryLock(timeout)支持
 */

// 面试金句
// "ReentrantLock是AQS教科书级的应用案例:state记录重入次数,
//  exclusiveOwnerThread标识持有线程,公平/非公平通过hasQueuedPredecessors区分,
//  Condition提供精确等待队列。就像乐高积木,AQS提供了基础模块,
//  ReentrantLock通过不同组合搭建出功能完善的锁组件。
//  在项目中,我曾通过分析ReentrantLock源码解决了死锁问题:
//  通过jstack发现线程在Condition处等待,结合代码分析是signal遗漏导致,
//  这个经验让我深刻体会到理解底层实现的重要性"

CountDownLatch 和 CyclicBarrier 的区别?底层是如何实现的?

核心概念区别

一句话原理:CountDownLatch是一次性门闩,主线程等待多个子线程完成任务;CyclicBarrier是循环屏障,多个线程相互等待到某个点后再同时继续,可重复使用。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// CountDownLatch:基于AQS共享模式实现计数器
public class CountDownLatch {
    // 内部Sync继承AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);  // 设置计数器初始值
        }
        
        int getCount() {
            return getState();
        }
        
        // 尝试获取共享锁:state=0时成功,否则阻塞
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        
        // 尝试释放共享锁:CAS减1
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;  // 减到0时唤醒等待线程
            }
        }
    }
}

// CyclicBarrier:基于ReentrantLock和Condition实现
public class CyclicBarrier {
    // 使用ReentrantLock保护内部状态
    private final ReentrantLock lock = new ReentrantLock();
    // 条件队列用于线程等待
    private final Condition trip = lock.newCondition();
    private final int parties;  // 总线程数
    private int count;          // 剩余等待线程数
    
    // 屏障动作(所有线程到达时执行)
    private final Runnable barrierCommand;
    
    // 下一代(重置用)
    private Generation generation = new Generation();
}

项目场景:在数据报表系统中,CountDownLatch用于等待多个数据源查询完成后再汇总;CyclicBarrier用于并行计算中的阶段同步,如机器学习迭代计算,每轮迭代结束后同步一次结果。

CountDownLatch实现原理

一句话原理:CountDownLatch利用AQS的共享模式,state表示计数器初始值,await()调用tryAcquireShared等待state=0,countDown()调用tryReleaseShared原子减1,减到0时唤醒所有等待线程。

一句话源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// CountDownLatch核心源码
public class CountDownLatch {
    private final Sync sync;
    
    // 构造函数:初始化计数器
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    // await:等待计数器归零
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    // countDown:计数器减1
    public void countDown() {
        sync.releaseShared(1);
    }
    
    // 获取当前计数
    public long getCount() {
        return sync.getCount();
    }
    
    // 内部同步器实现
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        
        int getCount() {
            return getState();
        }
        
        // 获取共享锁:state=0时成功,否则进入队列等待
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        
        // 释放共享锁:CAS减1,减到0时返回true唤醒等待线程
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;  // 关键:只有减到0时才唤醒
            }
        }
    }
}

// 使用示例
CountDownLatch latch = new CountDownLatch(3);

// 线程1
new Thread(() -> {
    doWork();
    latch.countDown();  // 计数器2
}).start();

// 线程2
new Thread(() -> {
    doWork();
    latch.countDown();  // 计数器1
}).start();

// 线程3
new Thread(() -> {
    doWork();
    latch.countDown();  // 计数器0,唤醒主线程
}).start();

// 主线程等待
latch.await();  // 阻塞直到计数器归零
System.out.println("所有任务完成");

项目场景:在微服务启动时,使用CountDownLatch等待所有依赖组件(数据库连接池、缓存客户端、消息队列)初始化完成后再对外提供服务。每个组件初始化完成后调用countDown,主线程await,确保服务启动的完整性。

CyclicBarrier实现原理

一句话原理:CyclicBarrier基于ReentrantLockCondition实现,每个线程await时计数器减1并进入条件队列等待,最后一个线程到达时执行屏障动作并调用signalAll唤醒所有等待线程,同时重置计数器进入下一代。

一句话源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
// CyclicBarrier核心源码
public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;      // 屏障线程总数
    private final Runnable barrierCommand;  // 屏障动作
    private Generation generation = new Generation();  // 当前代
    private int count;  // 剩余等待线程数
    
    // 内部类表示一代(用于重置)
    private static class Generation {
        boolean broken = false;  // 是否损坏
    }
    
    // 核心等待方法
    private int dowait(boolean timed, long nanos) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();  // 加锁保护内部状态
        try {
            final Generation g = generation;
            
            if (g.broken)
                throw new BrokenBarrierException();
            
            if (Thread.interrupted()) {
                breakBarrier();  // 中断时损坏屏障
                throw new InterruptedException();
            }
            
            int index = --count;  // 计数器减1
            if (index == 0) {  // 最后一个线程到达
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();  // 执行屏障动作
                    ranAction = true;
                    nextGeneration();  // 进入下一代(重置计数器,唤醒所有线程)
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();  // 动作异常时损坏屏障
                }
            }
            
            // 不是最后一个线程,进入循环等待
            for (;;) {
                try {
                    if (!timed)
                        trip.await();  // 条件队列等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 中断处理
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                
                if (g.broken)
                    throw new BrokenBarrierException();
                
                if (g != generation)
                    return index;  // 新一代,返回
                
                if (timed && nanos <= 0L) {
                    breakBarrier();  // 超时损坏屏障
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    // 进入下一代
    private void nextGeneration() {
        trip.signalAll();  // 唤醒所有等待线程
        count = parties;   // 重置计数器
        generation = new Generation();  // 新的一代
    }
    
    // 损坏屏障
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
}

// 使用示例
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程到达屏障,执行汇总操作");
});

for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        System.out.println("线程" + Thread.currentThread().getId() + "执行第一阶段");
        try {
            barrier.await();  // 等待其他线程
            System.out.println("线程" + Thread.currentThread().getId() + "执行第二阶段");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
}

项目场景:在并行计算框架中,使用CyclicBarrier实现迭代计算同步。每轮迭代结束后所有线程在屏障处等待,主线程执行汇总和判断是否继续迭代,然后自动重置进入下一轮,完美支持多轮计算场景。

完整对比与实战应用

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/**
 * CountDownLatch vs CyclicBarrier 完整对比
 */
public class LatchVsBarrier {
    
    /**
     * 1. 核心区别对比表
     */
    public class DifferenceTable {
        // CountDownLatch                     CyclicBarrier
        // 计数器递减                         计数器递减
        // 一次性使用                          可循环使用
        // 主线程等待子线程                      多个线程相互等待
        // 基于AQS共享模式                      基于ReentrantLock+Condition
        // countDown()后线程继续执行              await()阻塞直到全部到达
        // 不能重置                              可以reset()重置
        // 无屏障动作                            可设置屏障动作
    }
    
    /**
     * 2. CountDownLatch实战:并行数据加载
     */
    public class ParallelDataLoader {
        private final ExecutorService executor = Executors.newFixedThreadPool(5);
        
        public DataSet loadData() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(3);
            DataSet result = new DataSet();
            
            // 并行加载三个数据源
            executor.submit(() -> {
                result.setUserData(loadUserData());
                latch.countDown();
            });
            
            executor.submit(() -> {
                result.setOrderData(loadOrderData());
                latch.countDown();
            });
            
            executor.submit(() -> {
                result.setProductData(loadProductData());
                latch.countDown();
            });
            
            // 等待所有数据加载完成
            latch.await(5, TimeUnit.SECONDS);  // 最多等待5秒
            return result;
        }
    }
    
    /**
     * 3. CyclicBarrier实战:多阶段并行计算
     */
    public class MultiStageCompute {
        private final CyclicBarrier barrier;
        private final double[][] data;
        private final int threads;
        
        public MultiStageCompute(double[][] data, int threads) {
            this.data = data;
            this.threads = threads;
            this.barrier = new CyclicBarrier(threads, this::mergeResults);
        }
        
        // 屏障动作:合并结果
        private void mergeResults() {
            System.out.println("所有线程完成当前阶段,开始合并");
            // 全局汇总逻辑
        }
        
        public void compute() {
            int rowsPerThread = data.length / threads;
            
            for (int i = 0; i < threads; i++) {
                final int startRow = i * rowsPerThread;
                final int endRow = (i == threads - 1) ? data.length : startRow + rowsPerThread;
                
                new Thread(() -> {
                    try {
                        // 第一阶段计算
                        for (int row = startRow; row < endRow; row++) {
                            for (int col = 0; col < data[row].length; col++) {
                                data[row][col] = Math.sqrt(data[row][col]);
                            }
                        }
                        barrier.await();  // 第一阶段同步
                        
                        // 第二阶段计算
                        for (int row = startRow; row < endRow; row++) {
                            for (int col = 0; col < data[row].length; col++) {
                                data[row][col] = Math.log(data[row][col] + 1);
                            }
                        }
                        barrier.await();  // 第二阶段同步
                        
                        // 第三阶段...
                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    
    /**
     * 4. 异常处理与超时控制
     */
    public class ExceptionHandling {
        
        // CountDownLatch超时
        public void latchWithTimeout() {
            CountDownLatch latch = new CountDownLatch(3);
            try {
                if (!latch.await(3, TimeUnit.SECONDS)) {
                    log.warn("等待超时,部分任务未完成");
                    // 降级处理
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("等待被中断");
            }
        }
        
        // CyclicBarrier损坏处理
        public void barrierWithBrokenHandling(CyclicBarrier barrier) {
            try {
                barrier.await(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                // 屏障损坏:可能某个线程被中断或超时
                log.error("屏障已损坏,重置后重试");
                barrier.reset();  // 重置屏障
            } catch (TimeoutException e) {
                log.error("等待超时");
            }
        }
    }
    
    /**
     * 5. 选型指南
     */
    public class SelectionGuide {
        
        // 选CountDownLatch的场景:
        // - 一个线程等待多个线程完成
        // - 只需要一次同步
        // - 不需要复用
        // 例子:服务启动等待、并行任务汇总
        
        // 选CyclicBarrier的场景:
        // - 多个线程相互等待
        // - 需要多轮同步
        // - 需要复用屏障
        // 例子:并行计算迭代、游戏大厅等待玩家
        
        // 选Phaser(更高级的替代)的场景:
        // - 需要动态调整参与者数量
        // - 需要更灵活的同步阶段
        // 例子:分段任务、动态线程池
    }
    
    /**
     * 6. 监控与调试
     */
    public class Monitoring {
        
        public void monitorLatch(CountDownLatch latch) {
            long count = latch.getCount();
            log.info("当前剩余计数:{}", count);
            
            // 无法获取等待线程数(AQS未暴露)
        }
        
        public void monitorBarrier(CyclicBarrier barrier) {
            int parties = barrier.getParties();
            int waiting = barrier.getNumberWaiting();
            boolean broken = barrier.isBroken();
            
            log.info("总线程:{},等待中:{},是否损坏:{}", 
                     parties, waiting, broken);
        }
    }
}

/**
 * 总结对比表
 * 
 * 特性              CountDownLatch              CyclicBarrier
 * 设计目的           一个线程等待多个线程          多个线程相互等待
 * 计数器方向         递减                        递减
 * 可重用性           一次性                       可循环使用
 * 屏障动作           无                          可设置Runnable
 * 底层实现           AQS共享模式                  ReentrantLock+Condition
 * 等待方式           await()阻塞                  await()阻塞
 * 唤醒条件           计数器归零                   最后一个线程到达
 * 损坏处理           无                           BrokenBarrierException
 * 超时支持           await(timeout)               await(timeout)
 * 
 * 性能对比:
 * CountDownLatch: 轻量级,CAS操作
 * CyclicBarrier: 稍重,涉及Lock和Condition
 */

// 面试金句
// "CountDownLatch和CyclicBarrier就像'发令枪'和'接力棒'的区别:
//  CountDownLatch是裁判等待所有运动员就绪后发令(一次性),
//  CyclicBarrier是接力赛中运动员互相等待交接(可循环)。
//  底层实现上,CountDownLatch基于AQS的共享模式,简洁高效;
//  CyclicBarrier基于ReentrantLock的条件队列,功能更丰富。
//  在项目中,我用CountDownLatch实现微服务优雅启动,
//  用CyclicBarrier实现机器学习模型的批量推理,
//  理解它们的区别和实现能帮我们在不同场景下做出正确选择"