阿里二面:如何定位&避免死锁?连着两个面试问到了!
在面试过程中,死锁是必问的知识点,当然死锁也是我们日常开发中也会遇到的一个问题,同时一些业务场景例如库存扣减,银行转账等都需要去考虑如何避免死锁,一旦线上发生了死锁,那可能年终不保。。。。。下面我们就来聊一聊死锁如何定位,以及如何避免。
什么是死锁
死锁(Deadlock)是指在操作系统里,两个或多个并发线程在执行过程中,因争夺资源而造成的一种互相等待的现象,且无外力干预的情况下,这些线程都无法进一步执行下去。每个线程至少持有一个资源并等待其他线程所持有的资源才能继续执行,从而形成了一个循环等待链,导致所有线程都被阻塞,无法顺利完成。
假设有两个仓库A和B,它们之间在进行商品调拨。线程T1负责将商品从仓库A调拨到仓库B,而线程T2负责将商品从仓库B调拨到仓库A。每个线程在执行调拨操作时,需要先获取调出仓库和调入仓库的锁,以保证调拨操作的原子性。现在,假设线程T1已经获取了仓库A的锁并且正在等待获取仓库B的锁,而线程T2已经获取了仓库B的锁并且正在等待获取仓库A的锁。这时,线程T1持有仓库A的锁并且等待仓库B的锁,线程T2持有仓库B的锁并且等待仓库A的锁。由于彼此都在等待对方持有的锁,因此两个线程都无法继续执行,导致了死锁的发生。
死锁产生的条件
死锁的产生必须满足以下四个条件。当这四个条件同时满足时,就可能发生死锁。
互斥条件
资源不能同时被多个线程占用。如果一个资源被一个线程占用,其他线程必须等待释放。也就是所谓的互斥锁。
如上图线程T1已经持有了资源,那么该资源就不能再同时被线程T2持有,如果线程T2想要获取资源,就要一直等待(即线程T2阻塞),一直到线程T1释放资源。
占有并且等待条件
当前线程已经占有至少一个资源,此时还想请求其他线程占有的其他资源时就会造成等待,在这个等待过程中对已获得的资源也不会释放。
如上图当线程T1已经持有了资源1,又想申请获取资源2,而资源2已经被线程T3持有了,所以线程T1就会处于等待状态,但是线程T1在等待资源2的同时并不会释放自己已经持有的资源1。
不可抢占条件
当前已经被持有的资源只能由持有它的线程释放,其他线程不可以强行占有该资源。
如上图线程T1已经持有了资源 ,在自己使用完之前不能被其他线程获取,线程T2如果也想使用此资源,则只能在线程T1使用完并释放后才能获取。
循环等待条件
在发生死锁时,必然存在一个线程-资源的环形链,链中的每个线程正等待下一个线程所占用资源的释放。
如上图线程T1等待线程T2占有的资源,而线程T2等待线程T1占有的资源,两个线程互相等待,这样就形成了循环等待。
模拟死锁
以文章解释死锁概念的例子为例,我们使用代码模拟死锁。
我们先模拟调拨商品操作库存的代码:
public class SkuStock {
private String sku;
private String warehouse;
private Integer qty;
public SkuStock(String sku, String warehouse, Integer qty) {
this.sku = sku;
this.warehouse = warehouse;
this.qty = qty;
}
/**
* 调拨库存,操作库存
*/
public void transferTo(SkuStock targetSku, int quantity) {
synchronized (this){
System.out.println(Thread.currentThread().getName() + "开始操作库存");
try {
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
synchronized (targetSku){
// 扣减调出仓库的库存
this.qty -= quantity;
// 增加目标仓库的库存
targetSku.qty += quantity;
System.out.println(Thread.currentThread().getName() + "操作库存结束");
}
}
}
}
然后我们在模拟线程T1进行仓库A向仓库B调拨商品,线程t2进行仓库B向仓库A调拨商品。
public static void main(String[] args) {
SkuStock skuStockA = new SkuStock("SKU", "WA", 100);
SkuStock skuStockB = new SkuStock("SKU", "WB", 100);
Thread thread1 = new Thread(() -> {
skuStockA.transferTo(skuStockB, 50);
}, "T1");
Thread thread2 = new Thread(() -> {
skuStockB.transferTo(skuStockA, 60);
}, "T2");
thread1.start();
thread2.start();
}
此时我们运行代码,就会发现代码只打印了开始操作库存,没有结束操作的日志,此时就会发生了死锁。
死锁排查
当我们的程序发生死锁时,我们需要排查,找出问题所在,关于死锁的排查工具,我们可以使用JDK自带的jstack
工具,也可以使用一些可视化工具例如:VisualVM
,JConsole
等。
jstack工具
jstack
是JDK自带的一款强大的故障诊断工具,主要用于获取Java应用程序的线程堆栈信息,这对于分析Java程序的运行状态、排查性能瓶颈、定位死锁、冻结线程以及其他多线程相关的问题具有非常重要的作用。
对于以上死锁程序,我们先使用jps
工具列出当前系统中所有的Java进程的进程ID(PID)。
然后针对目标Java进程,使用jstack
命令生成线程堆栈快照,它将输出Java进程中所有线程的详细堆栈信息。
jstack 24749
然后我们可以看到输出的日志中,指明了应用程序发生死锁的原因。
可以看到对于线程T1等待着线程T2锁住的0x000000070fd53c38
这个资源,同时锁住了0x000000070fd53bc0
这个资源,而对于线程T2,它等待着线程T1锁住的0x000000070fd53bc0
这个资源,同时锁住了0x000000070fd53c38
这个资源,这样就发生了死锁。
jstack
输出中会包含有关线程等待锁的信息。如果存在死锁,你会看到线程在等待一个它自己或其他线程已经持有的锁,形成一个等待链条。死锁信息通常会明确指出哪些线程参与了死锁。
VisualVM
VisualVM
是一款强大的Java性能分析和故障排除工具,它是Oracle开发并随JDK一起提供的一个综合性桌面应用程序。VisualVM
整合了多个独立的JDK命令行工具的功能,如jstat
、jmap
、jstack
、jinfo
等,并且提供了丰富的图形用户界面,使开发者能够更容易地监控和分析Java应用程序的性能、内存消耗、线程行为、垃圾收集等各方面信息。
他会提示你发生了死锁了,进入Thread Dump
中查看具体的信息。
效果等同于使用jstack
命令输出的日志信息。
如何避免死锁问题的发生
前面我们提到,产生死锁的四个必要条件是:互斥条件、占有并等待条件、不可抢占条件、循环等待条件。那么避免死锁问题就只需要破环其中一个条件就可以。
破坏互斥条件
为避免死锁的发生,我们应该避免使用互斥锁,我们可以将其中的操作改为原子操作。
比如上述例子中,我们将发生死锁的库存操作的代码:
synchronized (targetSku){
// 扣减调出仓库的库存
this.qty -= quantity;
// 增加目标仓库的库存
targetSku.qty += quantity;
System.out.println(Thread.currentThread().getName() + "操作库存结束");
}
这里我们不再使用synchronized
关键字,而是通过AtomicInteger
的compareAndSet
方法(CAS操作)来实现并发下的库存扣减操作。这样做的好处是可以避免死锁,每次操作都是原子性的,不会出现持有锁的线程等待另一个线程释放锁的情况。
private AtomicInteger qtyAtomic = new AtomicInteger();
public void transferTo1(SkuStock targetSku, int quantity) {
synchronized (this){
System.out.println(Thread.currentThread().getName() + "开始操作库存");
try {
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
// 扣减调出仓库的库存
this.qtyAtomic.addAndGet(-quantity);
// 增加目标仓库的库存
targetSku.qtyAtomic.addAndGet(quantity);
System.out.println(Thread.currentThread().getName() + "操作库存结束");
}
}
使用transferTo1
方法重新执行程序,正常实现库存操作。
破坏占有且等待条件
对于占有且等待条件,线程持有资源我们是无法破坏的,既然无法破坏占有,那我们就破坏等待,我们不等待资源了。破坏占有且等待条件,可以采取的方法之一就是一次性获取所有需要的资源,而不是持有部分资源后再等待其他资源。在Java中,确实没有一种直接的方式允许一个线程一次性获取多个资源。但是,你可以使用一种类似资源管理器的方式来模拟一次性获取多个资源的情况。例如,你可以创建一个资源管理器对象,该对象负责管理所有需要的资源,并在需要时为线程提供这些资源。其他线程可以向资源管理器请求资源,如果资源可用,则立即返回,如果资源不可用,则进入等待状态。
针对上述示例,我们定义一个库存资源管理器:
public class SkuAllocator{
private static SkuAllocator skuAllocator = new SkuAllocator();
private SkuAllocator(){}
public static SkuAllocator getSkuAllocator(){
return skuAllocator;
}
private List<Object> list = Lists.newArrayList();
/**
*、一次性获取多个资源
* @param objs 资源
* @return 是否申请资源成功
*/
synchronized boolean apply(Object...objs){
List<Object> containsList = Stream.of(objs)
.filter(e -> list.contains(e)).collect(Collectors.toList());
if (!containsList.isEmpty()){
return false;
}
list.addAll(Lists.newArrayList(objs));
return true;
}
/**
* 释放资源
* @param objs 资源
*/
synchronized void free(Object...objs){
Stream.of(objs).forEach(e -> list.remove(e));
}
}
在这个资源管理器中,我们提供了两个方法apply
以及free
,其中apply
用于将所有的资源放获取到,而free
用于释放所有的资源。
然后我们改造操作库存时,线程执行操作库存,需要调用apply
将所有的资源都拿到,然后执行后面的库存扣减,而其他线程在执行apply
时,因为已经有现成获取到了资源,即资源管理器中list
已存在资源,所以会返回false
,这样其他的线程会一直等待下去,知道当前线程释放资源。
private SkuAllocator skuAllocator = SkuAllocator.getSkuAllocator();
public void transferTo2(SkuStock targetSku, int quantity) {
// 一次性申请库存增加以及扣减资源,如果线程可以拿到资源,即管理器中存在资源,
// while条件不成立就往下继续执行扣减库存,如果没有拿到资源,则while中是true,则while就一直自循环
while (!skuAllocator.apply(this, targetSku)){;}
try {
synchronized (this){
System.out.println(Thread.currentThread().getName() + "开始操作库存");
try {
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
synchronized (targetSku){
// 扣减调出仓库的库存
this.qty -= quantity;
// 增加目标仓库的库存
targetSku.qty += quantity;
System.out.println(Thread.currentThread().getName() + "操作库存结束");
}
}
}finally {
// 用完,则释放资源,让其他线程使用
skuAllocator.free(this, targetSku);
System.out.println(Thread.currentThread().getName() + "释放资源...");
}
}
调用该方法,也会让库存扣减成功。
破坏不可抢占条件
对于不可抢占条件,我们无法抢占或者释放其他线程持有的资源,但是我们可以给线程设置资源持有的超时时间,如果超过这个时间还没有释放资源,则自动释放资源。这样其他的线程就有就会获取资源了。
private final Lock lock = new ReentrantLock();
public void transferTo3(SkuStock targetSku, int quantity) throws InterruptedException {
while (true){
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
System.out.println(String.format("当前线程 %s 获得对象锁 %s", Thread.currentThread().getName(), lock));
if (targetSku.lock.tryLock()) {
try {
System.out.println(String.format("当前线程 %s 获得对象锁 %s", Thread.currentThread().getName(), targetSku.lock));
// 扣减调出仓库的库存
this.qty -= quantity;
// 增加目标仓库的库存
targetSku.qty += quantity;
System.out.println(Thread.currentThread().getName() + " 操作库存结束");
break;
} finally {
targetSku.lock.unlock();
}
}
} finally {
lock.unlock();
}
}
}
}
执行结果如下:
破坏循环等待条件
对于循环等待条件,他因为交叉获取资源,导致形成了一个环形等待。破坏这个条件,我们可以采取顺序获取资源。确保所有的线程都按照相同的顺序获取资源。这样如果线程T1获取资源1,同时线程T2也来获取资源1时,会等待,知道线程T1释放之后再去获取资源1,同样然后获取资源2。
针对上述示例,我们对库存增加id或者库存操作创建时间,这样我们使用这个ID,对库存资源进行排序,然后按照这个顺序去占用资源。
public void transferTo4(SkuStock targetSku, int quantity) throws InterruptedException {
SkuStock firstSku = this.id < targetSku.id ? this : targetSku;
SkuStock secondSku = this != firstSku ? this : targetSku;
synchronized (firstSku){
System.out.println(Thread.currentThread().getName() + "开始操作库存");
try {
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
synchronized (secondSku){
// 扣减调出仓库的库存
this.qty -= quantity;
// 增加目标仓库的库存
targetSku.qty += quantity;
System.out.println(Thread.currentThread().getName() + " 操作库存结束");
}
}
}
执行结果如下:
在上述4种破坏死锁条件中,我们可以观察到,在为避免死锁时,除了第一种方案——使用原子操作代替互斥锁外,其余三种方案都会导致并发操作变为串行执行,在一定程度上会牺牲性能。因此,在某些情况下,我们不应过分追求破坏死锁的四个必要条件,因为即使这些条件被满足,死锁仍然有一定的几率发生。我们应该关注的是如何有效地避免死锁的发生,而不是完全消除死锁的可能性。因此,设计时应该考虑采取合适的措施来降低死锁的概率,并在发生死锁时能够及时恢复系统的正常运行状态。
结论
死锁问题的产生是由两个或者以上线程并行执行的时候,争夺资源而互相等待造成的。他必须同时满足互斥条件,占用且等待条件,不可抢占条件,循环等待条件这四个条件,才可能发生。在日常系统开发中,我们要避免死锁。避免死锁的方式通常有:
-
按顺序获取资源: 给资源编号,所有线程按照编号递增的顺序请求资源,释放资源时按照相反的顺序释放。这样可以避免循环等待条件的发生。
-
加锁顺序统一: 确定所有线程加锁的顺序,要求所有线程都按照相同的顺序获取锁,这样可以避免占有且等待条件的发生。
-
超时放弃: 当尝试获取资源失败时,设置超时时间,超过一定时间后放弃获取资源,并释放已占有的资源,以避免持续等待而导致的死锁。
-
死锁检测和恢复: 定期检测系统中的死锁情况,一旦检测到死锁,采取相应的措施进行恢复,例如中断某些线程、回滚事务等。
-
资源分配策略: 使用资源分配策略,确保资源的合理分配和使用,避免资源过度竞争和浪费,从而降低死锁的发生概率。
-
避免嵌套锁: 尽量避免在持有一个锁的情况下去请求另一个锁,以减少死锁的可能性。
-
使用并发库和工具: Java中可以使用
java.util.concurrent
包中的高级同步工具,如Semaphore
、ReentrantLock
(支持尝试获取锁及超时机制)、StampedLock
(支持乐观读写)等,它们提供了比synchronized
关键字更灵活的控制方式,有助于预防死锁。
本文已收录于我的个人博客:码农Academy的博客,专注分享Java技术干货,包括Java基础、Spring Boot、Spring Cloud、Mysql、Redis、Elasticsearch、中间件、架构设计、面试题、程序员攻略等