java并发

从并发编程bug聊起

近几年,我们电脑的 CPU,内存和 I/O 设备都在不段的更迭。但是始终这三者之间存在着一个核心的矛盾,这三者之间的速度差异。为了合理的利用 CPU 的高性能,平衡这三者之间的差距,有了一下的解决方法:

  • CPU 增加缓存,以平衡与内存之间的速度差异。
  • 操作系统增加了进程,线程,以分时复用 CPU,进而均衡 CPU 和 I/O 设备之间的速度差异。
  • 编译层程序优化指令执行的次序,使得缓存能够得到更加合理的利用。

而很多并发程序的 Bug 的根源也在这里。

源头之一:缓存导致的可见性问题

在单核时代。所有的线程都在一个 CPU 上执行,CPU 缓存和内存的数据一致性很容易解决。所有的线程都操作的是同一个 CPU 的缓存,这对于另一个线程来说是一定可见的。如下图,线程 A 操作了 变量 V 这一操作,线程 B 是可见的。

CPU和内存的关系

一个线程对共享变量的修改,其他线程能立刻看见,这个称为可见性。

但是现在,CPU 都是多核,每颗 CPU 都有着自己的缓存,这个时候 CPU 缓存和内存之间的一致性就不是那么好解决的了。每个线程操作在不同的 CPU 缓存上。如下图,线程 A 在自己的 CPU 缓存中修改了共享变量 V。这一操作对于线程 B 而言是不可见的。

多核CPU和内存关系

下面我们通过一段代码来验证多核场景下可见性的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Visible01 {
private long count;
private void add10k(){
int idx = 10000;
while (idx > 0){
count++;
idx--;
}
}

public static void main(String[] args) throws InterruptedException {
Visible01 visible = new Visible01();
Thread thr1 = new Thread(() -> visible.add10k());
Thread thr2 = new Thread(() -> visible.add10k());
//启动两个线程
thr1.start();
thr2.start();
//等待两个线程结束
thr1.join();
thr2.join();
System.out.println(visible.count);
}
}

咋眼看,代码的输出结果应该是 20000 。但是实际上的结果是在 10000 - 20000 之间的随机数。而导致这样的原因就是,假设两个线程时同时启动的,都会将 count = 0 读入到各自的 CPU 缓存中。执行完 count++ 以后,将结果 count = 1 写入到内存中。这样内存中的值就是 1 而不是我们所期待的 2。而之后两个线程都是基于 CPU 缓存中的 count 值来计算,所以导致最后结果小于 20000。

如果循环 10000 次的 count ++ 改成循环 1 亿次,就会发现就结果时更接近 1 亿而不是 2 亿。导致这样的原因就是两个线程不是同时启动的,有一个时间差。

源头之二:进程切换带来的原子性问题

由于 I/O 太慢,操作系统提出的进程线程的概念。操作系统允许某个进程执行一小段时间,例如 50 毫秒,每过 50 毫秒操作系统就会进行一次进程切换,这 50 毫秒的时间我们称为 时间片。

线程切换

Java 中并发程序多时基于多线程的,也就会涉及到任务切换。任务切换的时机大多是在时间片结束的时候,而现在的高级语言中的一条语句往往需要多条 CPU 指令来完成。例如 count += 1,至少需要 3 条 CPU 指令。

  1. 首先将 count 从内存加载到 CPU 寄存器
  2. 在寄存器中执行 +1 操作
  3. 将结果写入内存(缓存机制导可能写入的是 CPU 缓存而不是内存)

任务切换可以发生在任何一条 CPU 指令执行完, 而不是高级语言的一条语句。对于上面 3 条 CPU 指令而言,如果线程 A 在指令 1 执行完以后任务切换,线程 A 和 线程 B 按照下图的序列执行,那么我们就会发现两个线程都执行了 Count += 1 的操作,但是得到的结果不是我们期望的 2 ,而是1。

非原子操作

在我们的潜意识里,Count += 1 是一个不可分割的操作,是一个原子。

我们把一个或多个操作在 CPU 执行的过程中不被中断的特征成为原子性。CPU 能保证的原子操作是 CPU 指令级别的,而不是高级语言中的一条语句。

源头之三:编译优化以后带来的有序性问题

有序性指的是程序按照先后顺序执行。但是编译器为了优化性能,有时候会改变程序中语句的先后顺序。

在 Java 一个经典的列子就是使用双重锁检查创建单例对象,例如下面的代码,在通过 getInstance() 方法获得单例对象的时候,首先判断对象是否为空,如果为空就锁定对象,再判断一次对象是否为空,如果还是为空,就创建一个对象的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Singleton{
static Singleton instance;
static Singleton getInstance(){
if (instance == null){
synchronized (Singleton.class){
if (instacne == null){
instance == new Singleton;
}
}
}
return instance;
}
}

但是这样创建对象还存在一个问题。对于 new 操,我们认为的应该是:

  1. 分配一块内存 M
  2. 再内存 M 上初始化 Singleton 对象
  3. 将 M 的地址赋值给 instance 变量

但是实际上可能编译器优化过后的执行顺序就成了 1-3-2。这样导致的问题就是,假设线程 A 执行了 getInstance() 方法,按照 1-3-2 的顺序 new 一个对象。但是在执行完 3 的时候发生了线程切换。这个时候线程 B 也执行了 getInstance() 方法,这样在第一个判断的时候,线程 B 就判断 instance 是部位空的,直接返回了 instance。但是这个时候的 instance 是还有没初始化的,就可能会出现空指针异常。

Java 内存模型:解决可见性和有序性问题

上面我们讲到由于 CPU 和内存的速度差异,会导致 可见性,原子性,有序性 这三个问题,为了解决这些问题,就引出了 Java内存模型(JMM:Java memory Model)

解决可见性,有序性的问题最直接的方法就是 禁用缓存和编译优化。但是这样就因噎废食了。合理的方案应该是 按需禁用缓存和编译优化。可以认为 Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法。具体来说这些方法包括 volatile,synchronize,final 这三个关键字和 Happens-Before 原则。

其中:第一是锁 ,锁操作是具备happens-before关系的,解锁操作happens-before之后对同一把锁的加锁操作。实际上 ,在解锁的时候,JVM需要强制刷新缓存,使得当前线程所修改的内存对其他线程可见。第二是volatile字 段,volatile字段可以看成是一种不保证原子性的同步但保证可见性的特性,其性能往往是优于锁操作的 。但是,频繁地访问 volatile 字段也会出现因为不断地强制刷新缓存而影响程序的性能的问题。第三是 final 修饰符,final修饰的实例字段则是涉及到新建对象的发布问题。当一个对象包含final修饰的实例字段时 ,其他线程能够看到已经初始化的final实例字段,这是安全的。

Happens-Before 的意思 前面一个操作的结果对后续操作时可见的。Happens -Before 是具有传递性的。其中的七项原则:

  1. 程序次序规则:在一个线程内,按照程序代码顺序,书写在前面的操作先行发生于书写在后面的操作 。准确地说,应该是控制流顺序而不是程序代码顺序,因为要考虑分支、循环等结构。
  2. 管程锁定规则:一个unlock操作先行发生于后面对同一个锁的lock操作。这里必须强调的是同一个锁 ,而”后面”是指时间上的先后顺序。
  3. volatile变量规则:对一个volatile变量的写操作先行发生于后面对这个变量的读操作,这里的”后面”同 样是指时间上的先后顺序。
  4. 线程启动规则:Thread对象的start()方法先行发生于此线程的每一个动作
  5. .线程终止规则:线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过 Thread.join() 方法结束、Thread.isAlive()的返回值等手段检测到线程已经终止执行。
  6. 线程中断规则:对线程 interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可 以通过 Thread.interrupted()方法检测到是否有中断发生。
  7. 对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

互斥锁:解决原子性问题

锁是一种通用的技术方案,Java 语言中提供的 synchronize 关键字,就是一种锁的实现。synchronize 关键字可以修饰方法,也可以修饰代码块,基本上的使用都是这个样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Test{
// 修饰非静态方法
synchronize void foo(){
// 临界区
}

// 修饰静态方法
synchronize static void bar(){
// 临界区
}

// 修饰代码块
Object lock = new Object();
void baz(){
synchronize (lock) {
// 临界区
}
}
}

在 Java 中只有修饰代码块的时候,才会显式的锁定一个对象。在修饰静态方法,非静态方法的时候都是隐式的锁定。

  • 当修饰静态方法的时候,锁定的是当前类的 Class 对象,就相当于 synchronize (Tets.class) static void bar()
  • 当修饰非静态方法的是,锁定的当前的实例对象 this,就相当于 synchornize (this) void foo()

对于上面的 count++ 的问题,就可以使用互斥锁来解决问题。这样在一个线程操作 count 的时候加锁了,所以另一个线程是不能访问 count 变量的,再结合 Happens-Before 原则的管程锁定后原则,这样就不会出现并发的问题了。

1
2
3
4
5
6
7
8
9
10
public class Visible01 {
private long count;
private synchronize void add10k(){
int idx = 10000;
while (idx > 0){
count++;
idx--;
}
}
}

但是上面的例子是一个被保护资源对应一个锁的情况,但是实际上一个锁是可以对应多个保护资源的。但是不能多个锁来保护一个资源。

当保护多个资源的时候,搜先需要区分这些资源之间是否存在联系。

保护多个没有关联的资源

假设一个 Account 类中有两个成员变量,balance 和 password。取款 withdrew 和查看余额 getBalance 会访问balance,修改密码 updatePassword 和 查看密码 getPassword 会访问 password。而 balance 和 password 之家你是没有关系的。我们可以创建一个 blaLock 对象来保护 balance,创建一个 pwdLock 对象来保护 password。不同的资源使用不同的锁保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Account {
private final Object balLock = new Object(); // 和余额相关的锁
private final Object pwdLock = new Object(); // 和密码相关的锁

private Integer balance;
private Integer password;

//取钱操作
public Integer withdrew(int num){
synchronized (balLock){
balance -= num;
}
return balance;
}
// 查看余额操作
public int getBalance(){
synchronized (balLock){
return balance;
}
}
}

当然我们也可以使用一个锁来保护两个对象,例如我们可以通过对 this 加锁来保证这个实例中的所有资源。但是这样性能会很差,在 balance 被操作的时候,password 也是不可访问的。用不同的锁对受保护资源进行精细化管理,能够提升性能。这样的锁叫做细粒度锁。

保护有关联的多个资源

在对多个资源的保护上这个问题就有点复杂了。例如银行转账业务,账户 A 减少 100 ,账户 B 增加 100 元。这两个账户之间就是有关联的。应该怎么实现这个问题呢

1
2
3
4
5
6
7
8
9
10
class Account{
private int balance;

synchronize void transfer(Account target,int amt){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}

上面的代码似乎是可以解决问题的。但是这样确实错误的。原因在于,账户 A 调用方法向账户 B 转账。这个时候获得 A 这个锁,但是这个锁却不能保护 B 的余额。

下面我们具体分析一下,假设有A、B、C三个账户,余额都是200元,我们用两个线程分别执行两个转账操 作:账户A转给账户 B 100 元,账户B转给账户 C 100 元,最后我们期望的结果应该是账户A的余额是100元,账户 B 的余额是200元,账户C的余额是300元。

我们假设线程 1 执行账户 A 转账户 B 的操作,线程 2 执行账户 B 转账户 C 的操作。这两个线程分别在两颗 CPU 上 同时执行,那它们是互斥的吗?我们期望是,但实际上并不是。因为线程 1 锁定的是账户 A 的实例 (A.this),而线程 2 锁定的是账户B的实例(B.this),所以这两个线程可以同时进入临界区 transfer()。同 时进入临界区的结果是什么呢?线程 1 和线程 2 都会读到账户B的余额为 200,导致最终账户B的余额可能是 300(线程1后于线程2写B.balance,线程2写的B.balance值被线程1覆盖),可能是100(线程1先于线程2 写B.balance,线程1写的B.balance值被线程2覆盖),就是不可能是200。

那么怎么解决上面问题呢,只需要一个能覆盖所有受保护资源的锁就行了。稍微想一下,似乎 Account.class 类是符合我们的要求的,Account.class 是所有 Account 对象共享的,并且也是唯一的。

1
2
3
4
5
6
7
8
9
10
11
12
class Account{
private int balance;

void transfer(Account target,int amt){
synchronize (Account.class) {
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}

但是这样做似乎有回到了上面的问题:这样的做法效率太低。两个账户之间的转账,只需锁住这两个账户,但是却锁住了 class ,导致同一时间只有一笔转账操作可以进行。

再想想似乎是有更好的解决方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Account{
private int balance;

void transfer(Account target,int amt){
//锁定转出账户
synchronize (this) {
//锁定转入账户
synchronize (target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}
}

这看上去似乎很完美,提升了性能。但是却有可能会导致死锁。

死锁

上面转账的代码是怎么发生死锁的呢?我们假设线程 T1 执行账户 A 转账户 B 的操作,账户 A.transfer(账户B); 同时线程 T2 执行账户 B 转账户A的操作,账户 B.transfer(账户A)。当 T1 和 T2 同时执行完锁定转出账户的代码时,T1获 得了账户 A 的锁(对于 T1,this是账户 A),而 T2 获得了账户B的锁(对于 T2,this是账户 B)。之后 T1 和T2 在执行 锁定转入账户 的代码时,T1 试图获取账户B的锁时,发现账户B已经被锁定(被T2锁定),所以T1 开始等待; T2 则试图获取账户A的锁时,发现账户 A 已经被锁定(被 T1 锁定),所以 T2 也开始等待。于是 T1和 T2 会无期 限地等待下去,也就是我们所说的死锁了。

当一下四个条件都满足的时候就会发生死锁:

  • 互斥 :共享资源 X 和 Y 同一时间只能被一个线程占有
  • 占有且等待:当 T1 占有 X 时,等待 Y 的时候,不放弃 X
  • 持续等待:T1 持续等待 T2 释放 Y,T2 持续等待 T1 释放 X。
  • 不可抢占:其他线程不能抢占获得 T1 的资源 X

只有上面的四个条件都成立的时候才会发生死锁,也就是只要破坏上面任意一个条件就可以解决死锁。其中互斥这个条件不能被破坏的。而其余三个都是可以破坏的条件。

  1. 占有且等待:在申请资源的时候,一次性申请说有的资源,这样就不存在等待的条件了。
  2. 不可抢占:在占有部分线程进一步申请其他资源的时候,如果申请不成功就放弃已经申请成功的资源。这样就不存在不可抢占。
  3. 持续等待:这个条件可以通过按序申请来解决。假定每个资源是线性有序的,每次申请资源的时候,先锁定小号的资源,再锁定大号的资源。这样就不存在持续等待的问题了。

解决占有且等待

在转账业务中只要每次申请时同时申请两个账户,就可以解决这个问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Account{
int balance;
Allocator allocator; // 应该为单例对象
public void transfer(int awt, Account target){
while(!allocator.apply(this,target));
try{
synchronize (this){
sychronize(target){
// 执行转账操作
}
}
}finally{
allocator.release(this,target);
}
}
}
class Allocator{
ArrayList<Account> lock = new ArrayList<Account>;
//一次申请所有的资源
public synchronize boolean apply(Account src, Account target){
if (lock.contains(src) || lock.contains(target)){
return false;
}else{
lock.add(src);
lock.add(target);
return true;
}
}

public synchronize void release(Account src,Account target){
lock.remove(src);
lock.remove(target);
}
}

但是这样的做法中在申请时使用了 while(!allocator.apply(this,target)); 。这样无疑增加了系统的开销。

解决不可抢占

解决持续等待

解决持续等待的方法就是,将共享资源按一定的规则,线性排序,每次从小的资源开始申请。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Account {
int balance;
int id;

public void transfer(int awt, Account target){
Account left = this;
Account right = target;
if (this.id > target.id){
left = target;
right = this;
}
// 先锁定小的
synchronize (left){
// 再锁定大的
sychronize(right){
// 转账操作
}
}
}
}

通知等待优化占用等待

但是上面解决占用且等待的的方法并不是最优的方法。更为好的方式是使用 通知等待

Java 中实现通知等待的方式一般使用 wait() notify() notifyAll() 这三个方法。其中 notify() 方法会随机的通知等待队列中的一个线程,而 notifyAll() 方法会通知所有的等待队列中的线程。由于使用 notify() 方法可以会出现永远不能通知到某个线程的情况,所以一般使用 notifyAll() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Account {

private Integer balance;

public Account (int balance){
this.balance = balance;
}
// 转账操作
public void transfer(int awt, Account target){
Allocator.getInstance().lock(this,target);
this.balance -= awt;
target.balance += awt;
Allocator.getInstance().release(this,target);
}
// 测试方法
public static void main(String[] args) throws InterruptedException {
Account src = new Account(10000);
Account target = new Account(0);
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
src.transfer(1,target);
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("src="+src.balance);
System.out.println("target="+target.balance);
}
}

// 单例的锁类
class Allocator{

private Allocator(){}

private static class Single{
private static final Allocator INSTANCE = new Allocator();
}

public static Allocator getInstance(){return Single.INSTANCE;}

List list = new ArrayList();

public synchronized void lock(Account src, Account target){
if (list.contains(src) || list.contains(target)){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(src);
list.add(target);
}

public synchronized void release(Account src, Account target){
list.remove(src);
list.remove(target);
this.notifyAll();
}
}

其中使用了 CountDownLatch 这个类,这个是用于多个线程同步的。构造函数需要传入一个 int 类型的参数初始化。每次调用 countDown() 方法将其中的计数器减一。调用 await() 方法会一直阻塞当前线程,直到计时器的值为0。

sleep() 和 wait() 方法都可以让当前线程挂起一段时间,之间有以下差异:

  • sleep 方法是 Thread 类中的方法,wait 方法是 Object 类的方法。
  • sleep 方法会继续持有当前的锁,wait 方法会放弃当前的锁。
  • sleep 必须传入一个参数,指定醒来的时间。wait 方法不传递参数,需要 notify 方法来通知它醒来。
  • sleep 方法可以在任何地方使用。wait 方法只能在同步块或者同步方法中使用。