Java中的管程

Java是利用管程解决并发编程问题的,那么究竟什么是管程?而它又是如何解决并发问题的呢?

什么是管程

管程,英文名是 Monitor ,因此有的时候会被翻译为监视器。其实你也许很早就接触到这个概念了,比如 synchronized
关键字,很多文章就介绍过其原理是使用了监视器,只是你那个时候还并不知道监视器管程,其实是一回事。

我们来看看维基百科上的概念:

管程 (英语:Monitors,也称为监视器) 是一种程序结构,结构内的多个子程序(对象或模块)形成的多个工作线程互斥访问共享资源。

感觉这句话听得有点迷糊,但下面这句话应该就很好理解了:

管程提供了一种机制,线程可以临时放弃互斥访问,等待某些条件得到满足后,重新获得执行权恢复它的互斥访问。

我的理解是:我们通过管程管理 Java 中的类,使得类是线程安全的。

这应该是管程最终要达到的效果,那么,它是怎么做到的呢?

管程模型

管程这个概念最早来源于操作系统,操作系统发展了那么多年,管程的实现也有多种方式,主流的有三种:Hasen模型Hoare模型MESA模型, Java 中借鉴的是MESA模型,让我们来重点看一下。

谈到MESA模型,就不得不提到并发主要解决2个核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即多个线程之间如何通信、协作。

如何解决互斥呢?我们可以在操作共享变量之前,增加一个等待队列,每一个线程想要操作共享变量的话,都需要在等待队列中等待,直到管程选出一个线程操作共享变量。

那又是如何解决同步的呢?线程在操作共享变量时候,它不一定是直接执行,可能有一些自己的执行条件限制(比如取钱操作要求账户里一定要有钱,出队操作要求队列一定不能是空的),我们将这些限制称之为条件变量,每一个条件变量也有自己对应的等待队列,当线程发现自己的条件变量不满足时,就进入相应的等待队列中排队,直至条件变量满足,那么其等待队列中的线程也不会是立马执行,而是到最开始共享变量对应的等待队列中再次排队,重复之前的过程。

可以参考下面这幅图:

理论说了那么多,还是来看看用代码是如何实现的吧

实现

首先可以自定一个支持并发的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class MyQueen {

// 共享变量(任何操作之前,都需要获得该锁才可以执行)
private final Lock lock = new ReentrantLock();

// 条件变量:队列不满
private final Condition notFull = lock.newCondition();

// 条件变量:队列不空
private final Condition notEmpty = lock.newCondition();

/**
* 存储队列的容器
*/
private final LinkedList<Integer> list = new LinkedList<>();

/**
* 最大容量
*/
private int capacity;

/**
* 当前容器中存储的数量
*/
private int size;

public MyQueen(int capacity) {
this.capacity = capacity;
this.size = 0;
}

/**
* 入队
*/
public void enter(int value) {
lock.lock();
try {
// 如果队列已满,则需要等到队列不满
while (size >= capacity) {
notFull.await(1, TimeUnit.MILLISECONDS);
}

// 入队
list.add(value);
size++;
System.out.println(value + " has bean entered");
// 通知可以出队
notEmpty.signal();
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}

/**
* 出队
*/
public int dequeue() {
Integer result = null;
lock.lock();
try {
// 如果队列已空,则需要等到队列不空
while (size <= 0) {
notEmpty.await(1, TimeUnit.MILLISECONDS);
}

// 出队
result = list.removeFirst();
size--;
System.out.println(result + " has bean dequeued");
// 通知可以入队
notFull.signal();
return result;
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
return result;
}

public static void main(String[] args) {
MyQueen myQueen = new MyQueen(3);
new Thread(new Pruducer("producer1", myQueen, 0, 2)).start();
new Thread(new Pruducer("producer2", myQueen, 2, 5)).start();
new Thread(new Consumer("consumer2", myQueen, 5)).start();
new Thread(new Consumer("consumer1", myQueen, 3)).start();
}
}

定义生产者和消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Pruducer implements Runnable {

private final MyQueen queen;

/**
* 该线程的名字
*/
private final String name;

/**
* 开始的大小
*/
private final int start;

/**
* 需要生产的资料个数
*/
private final int size;

public Pruducer(String name, MyQueen queen, int start, int size) {
this.name = name;
this.queen = queen;
this.start = start;
this.size = size;
}


@Override
public void run() {
for (int i = 1; i <= size; i++) {
int now = start + i;
// System.out.println(name + " produce : " + now + " start");
queen.enter(now);
// System.out.println(name + " produce : " + now + " end");
}
}
}

class Consumer implements Runnable {

private final MyQueen queen;

/**
* 该线程的名字
*/
private final String name;

/**
* 需要消费的资料个数
*/
private final int size;

public Consumer(String name, MyQueen queen, int size) {
this.name = name;
this.queen = queen;
this.size = size;
}


@Override
public void run() {
for (int i = 1; i <= size; i++) {
// System.out.println(name + " consume start");
int result = queen.dequeue();
// System.out.println(name + " consume : " + result + " end");
}
}
}

做一个测试的main方法:

1
2
3
4
5
6
7
public static void main(String[] args) {
MyQueen myQueen = new MyQueen(3);
new Thread(new Pruducer("producer1", myQueen, 0, 2)).start();
new Thread(new Pruducer("producer2", myQueen, 2, 5)).start();
new Thread(new Consumer("consumer1", myQueen, 3)).start();
new Thread(new Consumer("consumer2", myQueen, 5)).start();
}

结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1 has bean entered
2 has bean entered
3 has bean entered
1 has bean dequeued
2 has bean dequeued
3 has bean dequeued
4 has bean entered
5 has bean entered
6 has bean entered
4 has bean dequeued
5 has bean dequeued
6 has bean dequeued
7 has bean entered
8 has bean entered
9 has bean entered
7 has bean dequeued
8 has bean dequeued
9 has bean dequeued

虽然满足我想要的结果,但显示的内容有些奇怪,总是容器先被填满之后,然后容器被清空,感觉原因应该和可重入锁有关。

总结

以上就是我对于管程的一些理解,如果大家有什么疑问,欢迎在下方留言。

健健 wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
如果您感觉文章不错,也愿意支持一下作者的话