java 锁顺序死锁
最近在研究“锁顺序死锁”这个经典问题,虽然知道是怎么回事,但是不太清楚要怎么解决/避免。
于是我认真地看了《Java并发编程实战》第⑩章,把代码敲了一遍,加深印象。
一、自己制造一个死锁(什么是锁顺序死锁?)
“死锁”、“抱死”这些都是经典的概念了:
在线程A持有锁L并且想获得锁M的同时,线程B持有锁M并且尝试获得锁L,那么这两个线程将永远地等待下去。
既然对概念了解得很清楚,那么能不能用代码重现一遍呢?
(1)简单例子
先写一个可能诱发死锁的类:
LeftRightDeadLock.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
package com.test.testBank; public class LeftRightDeadLock { private final Object left = new Object(); private final Object right = new Object(); public void leftRight() { synchronized (left) { synchronized (right) { System.out.println(Thread.currentThread() + ":leftRight"); } } } public void rightLeft() { synchronized (right) { synchronized (left) { System.out.println(Thread.currentThread() + ":rightLeft"); } } } } |
一看就知道,这段代码的目的就是诱发死锁。写两个线程调用之:
TestLeftRightDeadLock.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.test.testBank; public class TestLeftRightDeadLock { public static void main(String[] args) { LeftRightDeadLock leftRightDeadLock = new LeftRightDeadLock(); LeftThread leftThread = new LeftThread(leftRightDeadLock); RightThread rightThread = new RightThread(leftRightDeadLock); Thread thread1 = new Thread(leftThread); Thread thread2 = new Thread(rightThread); thread1.start(); thread2.start(); } } class LeftThread implements Runnable { private LeftRightDeadLock leftRightDeadLock; public LeftThread(LeftRightDeadLock leftRightDeadLock) { // TODO Auto-generated constructor stub this.leftRightDeadLock = leftRightDeadLock; } @Override public void run() { // TODO Auto-generated method stub leftRightDeadLock.leftRight(); } } class RightThread implements Runnable { private LeftRightDeadLock leftRightDeadLock; public RightThread(LeftRightDeadLock leftRightDeadLock) { // TODO Auto-generated constructor stub this.leftRightDeadLock = leftRightDeadLock; } @Override public void run() { // TODO Auto-generated method stub leftRightDeadLock.rightLeft(); } } |
运行之,结果发现,无论运行几次,运行结果均为:
1 2 |
Thread[Thread-0,5,main]:leftRight Thread[Thread-1,5,main]:rightLeft |
不是说好了这段代码会诱发死锁吗?为什么没有出现?
这是因为:main方法是顺序调用的,一定是thread1.start()再thread2.start(),在锁的释放和获取上没有出现冲突,所以没有死锁。
(2)使用sleep制造死锁场景
改写一下LeftRightDeadLock.java中的leftRight方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public void leftRight() { synchronized (left) { try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } synchronized (right) { System.out.println(Thread.currentThread() + ":leftRight"); } } } |
这就百分之百会造成死锁了:
- Thread1执行时,先获取left锁,然后Thread.sleep(3000)让出cpu时间片,同时保留left锁(sleep会保留,wait会释放)。Thread2得到时间片,开始执行。
- Thread2执行时,先获取Right锁,然后等待Thread1放出Left锁,进入阻塞状态。
- Thread1的sleep(3000)执行完毕,等待Thread2放出Right锁。
- Thread1、Thread2进入环状等待。
(3)多条线程同时执行
还原一下leftRight方法:
1 2 3 4 5 6 7 |
public void leftRight() { synchronized (left) { synchronized (right) { System.out.println(Thread.currentThread() + ":leftRight"); } } } |
在简单例子(1)中,线程数量太少了(并且main方法顺序执行),所以死锁没有出现。现在尝试改写TestLeftRightDeadLock.java,使用线程池让多条线程“同时”执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
package com.test.testBank; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class TestLeftRightDeadLock { public static void main(String[] args) { LeftRightDeadLock leftRightDeadLock = new LeftRightDeadLock(); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); LeftThread leftThread = new LeftThread(leftRightDeadLock); RightThread rightThread = new RightThread(leftRightDeadLock); for (int i = 0; i < 1000; i++) { threadPoolExecutor.execute(leftThread); threadPoolExecutor.execute(rightThread); } threadPoolExecutor.shutdown(); } } |
记得使用shutdown方法让threadPoolExecutor关闭。
在这里我们使用了CachedThreadPool,这是四种线程池中的一种,具有以下特点:
- 它是一个可以无限扩大的线程池。
- 比较适合处理执行时间比较小的任务。
- corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大。
- keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死。
- 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
现在使用for循环执行1000次,就能模拟多条线程“同时”执行的效果,看看能不能触发死锁。
运行结果为:
1 2 3 4 |
..... Thread[pool-1-thread-1,5,main]:leftRight Thread[pool-1-thread-7,5,main]:leftRight // 卡住不动了 |
实验证明,只要线程数量够多,LeftRightDeadLock就会触发死锁。
(4)如何解决这个例子中的问题?
现在我们需要明白一个概念:
如果所有线程都以固定的顺序来获取锁,那么在程序中就不会出现锁顺序死锁问题了。
很明显,leftRight()和rightLeft()在嵌套获取锁时,锁的顺序是不一致的。既然我们发现了这个问题,在业务允许的范围内,可以手动调整取锁顺序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public void leftRight() { synchronized (left) { synchronized (right) { System.out.println(Thread.currentThread() + ":leftRight"); } } } public void rightLeft() { synchronized (left) { synchronized (right) { System.out.println(Thread.currentThread() + ":rightLeft"); } } } |
这就不存在死锁问题了!是不是意味着一切都万事大吉了呢?
当然不是。要清楚,因为leftRight()和rightLeft()中存在嵌套获取锁的行为,所以我们才能提前预知,并且控制取锁顺序。
那么,假如存在一段代码,锁顺序不是固定的(动态的锁顺序),我们又该怎么办呢?
二、解决锁顺序死锁
(1)动态的锁顺序死锁
现在有这样一段代码:
BadTranferTest.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
package com.test.testBank; public class BadTranferTest { public void transferMoney(Account fromAccount, Account toAccount, long amount) { synchronized (fromAccount) { synchronized (toAccount) { if (fromAccount.getMoney() - amount < 0) { throw new RuntimeException("amount error!"); } else { fromAccount.debit(amount); toAccount.credit(amount); } } } } } class Account { private long money; public void debit(long amount) { money = money - amount; } public void credit(long amount) { money = money + amount; } public long getMoney() { return money; } } |
乍一看,所有的线程似乎都是按照同样的顺序来获得锁,似乎是正常的?
其实问题出在传参中,可能出现以下情况:
1 2 |
A:transferMoney(myAccount, yourAccount, 10); B:transferMoney(yourAccount, myAccount, 10); |
如果执行时机不当,那么A可能获得myAccount的锁并且等待yourAccount的锁,B获得yourAccount的锁并且等待myAccount的锁,死锁问题又出现了。
动态的锁顺序死锁似乎要比第一个例子隐蔽一些,我们要怎么检查这种错误呢?
很简单,看是否存在嵌套取锁操作。如果没有对嵌套取锁做任何处理,那百分之百会出现死锁问题。
(2)通过锁顺序来避免死锁
既然问题出现在这种情况:
1 2 |
A:transferMoney(myAccount, yourAccount, 10); B:transferMoney(yourAccount, myAccount, 10); |
我们能不能想个办法,无论fromAccount、toAccount这两个参数怎么传,让取锁顺序完全相同?那不就像第一个例子一样搞定了嘛。
解决方案如下:
GoodTransfer.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
package com.test.testBank; public class GoodTranferTest { public void transferMoney(Account fromAccount, Account toAccount, long amount) { // 自定义一个内部类 class TransferHelper { public void tranfer() { if (fromAccount.getMoney() - amount < 0) { throw new RuntimeException("amount error!"); } else { fromAccount.debit(amount); toAccount.credit(amount); } } } int fromHash = System.identityHashCode(fromAccount); int toHash = System.identityHashCode(toAccount); if (fromHash < toHash) { synchronized (fromAccount) { synchronized (toAccount) { new TransferHelper().tranfer(); } } } else { synchronized (toAccount) { synchronized (fromAccount) { new TransferHelper().tranfer(); } } } } } class Account { private long money; public void debit(long amount) { money = money - amount; } public void credit(long amount) { money = money + amount; } public long getMoney() { return money; } } |
两个Account对象都老老实实做一次hash,这就得到了唯一的取锁顺序(每对fromAccount和toAccount的取锁顺序都是固定的),回避死锁。
(3)万一hash值相同怎么办?
遗憾的是,hash算法不能保证完全离散,万一两个对象算出来的hash值相同,就会统一走这段逻辑:
1 2 3 4 5 |
synchronized (toAccount) { synchronized (fromAccount) { new TransferHelper().tranfer(); } } |
上面的代码就又出错了。
对此,可以加上一个“加时赛锁”进行处理:
GoodTranferTest.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package com.test.testBank; public class GoodTranferTest { private static final Object tieLock = new Object(); public void transferMoney(Account fromAccount, Account toAccount, long amount) { // 自定义一个内部类 class TransferHelper { public void tranfer() { if (fromAccount.getMoney() - amount < 0) { throw new RuntimeException("amount error!"); } else { fromAccount.debit(amount); toAccount.credit(amount); } } } int fromHash = System.identityHashCode(fromAccount); int toHash = System.identityHashCode(toAccount); if (fromHash < toHash) { synchronized (fromAccount) { synchronized (toAccount) { new TransferHelper().tranfer(); } } } else if (fromHash > toHash) { synchronized (toAccount) { synchronized (fromAccount) { new TransferHelper().tranfer(); } } } else { synchronized (tieLock) { synchronized (fromAccount) { synchronized (toAccount) { new TransferHelper().tranfer(); } } } } } } |
tieLock就是“加时赛锁”。
但是,这种做法有一个问题,就是整个程序只有这一个全局锁。如果算法恶劣,散列冲突经常出现,就会有一堆“产生冲突”的线程去争抢这个锁,可能影响并发效率
当然,世界上不存在完美的解决方案。因为identityHashCode()出现散列冲突的频率非常低,这项技术还是以最小的代价换来了最大的安全性。
(4)存在“绝对不同”的情况
上面的identityHashCode()只是一种解决方案,切不可生搬硬套。
举个例子,如果Account中存在一个唯一的、不可变的、可以比较的键值,比如long类型的id,那就更容易确定锁的顺序了,不需要使用“加时赛锁”。
GoodTranferTest.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
package com.test.testBank; public class GoodTranferTest { public void transferMoney(Account fromAccount, Account toAccount, long amount) { // 自定义一个内部类 class TransferHelper { public void tranfer() { if (fromAccount.getMoney() - amount < 0) { throw new RuntimeException("amount error!"); } else { fromAccount.debit(amount); toAccount.credit(amount); } } } long fromId = fromAccount.getId(); long toId = toAccount.getId(); if (fromId < toId) { synchronized (fromAccount) { synchronized (toAccount) { new TransferHelper().tranfer(); } } } else { synchronized (toAccount) { synchronized (fromAccount) { new TransferHelper().tranfer(); } } } } } |
只要id能搞定,就不需要用到identityHashCode()和“加时赛锁”。
三、总结
这一节我们研究的例子全都是锁顺序死锁。锁顺序死锁有个很明显的特征,就是锁的双重嵌套,我们能够比较容易地发现问题,提早预防。
但是在日常编程中,锁的调用可能会比较隐蔽(不使用显示的对象锁,而是用synchronized修饰方法),一眼看上去往往没问题,最终却莫名其妙地产生死锁。
这种状况往往发生在不同对象的同步方法相互调用之时,往往被称为“在协作对象之间发生的死锁”。下一节我将继续写写代码,研究研究。