Java并发阻塞队列(put和take、offer和poll、drainTo)的使用

阻塞队列,是一种常用的并发数据结构,常用于生产者-消费者模式。

阻塞队列基础概念

在Java中,有很多种阻塞队列:

1
2
3
4
5
6
ArrayBlockingQueue //最常用
LinkedBlockingQueue //不会满的
SynchronousQueue //size为0
PriorityBlockingQueue
CompletionService //BlockingQueue + Executor
TransferQueue //JDK 7中更快的SynchronousQueue

生产者消费者模型

        producer    ------------------------>    consumer
                    +----+----+----+----+----+
                    +    +      +    +    +    +
                    +----+----+----+----+----+
                  /                            \ for (;;) {
    // 如果队列满则阻塞                                blockingQ.take(); // 如果队列空则阻塞
    blockingQ.put(object);                        }

使用阻塞队列

        +--------------+-------------+
        |          Queue<T>          |
        +--------------+-------------+
        +--------------+-------------+
        | add(E) : boolean           |
        | remove() : E               |
        | offer() : boolean          |
        | poll() : E                 | 
        | element() : E              |
        | peek() : E                 |
        +--------------+-------------+
                       ^
                      /|\
                继承自  |     
                       |
+----------------------+---------------------+
|             BlockingQueue<T>               |
+----------------------+---------------------+
+----------------------+---------------------+
| put(E)                                     |
| take() : E                                 |
| offer(E, long, TimeUnit) : boolean         |
| poll(long, TimeUnit) : E                   |
| remainingCapacity()                        |
| drainTo(Collection<? super E>) : int       |
| drainTo(Collection<? super E>, int ) : int |
+----------------------+---------------------+

BlockingQueue使用注意:

  • 使用BlockingQueue的时候,尽量不要使用从Queue继承下来的方法,否则就失去了Blocking的特性了。
  • 在BlockingQueue中,要使用put和take,而非offer和poll。如果 要使用offer和poll,也是要使用带等待时间参数的offer和poll。
  • 使用drainTo批量获得其中的内容,能够减少锁的次数。

代码片段一(错误)

1
2
3
4
5
6
7
8
9
final BlockingQueue<Object> blockingQ = new ArrayBlockingQueue<Object>(10);
Thread thread = new Thread("consumer thread") {
public void run() {
for (;;) {
Object object = blockingQ.poll();//错误,不等待就会直接返回
handle(object);
}
}
};

代码片段二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final BlockingQueue<Object> blockingQ = new ArrayBlockingQueue<Object>(10);
Thread thread = new Thread("consumer thread") {
public void run() {
for (;;) {
try {
//注意此处
Object object = blockingQ.take(); // 等到有数据才继续
handle(object);
} catch (InterruptedException e) {
break;
} catch (Exception e) {
// handle exception
}
}
}
};

代码片段三

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final BlockingQueue<Object> blockingQ = new ArrayBlockingQueue<Object>(10);
Thread thread = new Thread("consumer thread") {
public void run() {
for (;;) {
try {
//注意此处
Object object = blockingQ.poll(1, TimeUnit.SECONDS); //防止死等
if (object == null) {
continue; //或者做其他处理
}
} catch (InterruptedException e) {
break;
} catch (Exception e) {
// handle exception
}
}
}
};

实现一个简单的阻塞队列

未取得锁就直接执行wait、notfiy、notifyAll会抛异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class BlockingQ {
private Object notEmpty = new Object();
private Queue<Object> linkedList = new LinkedList<Object>();
//
public Object take() throws InterruptedException {
synchronized (notEmpty) {
if (linkedList.size() == 0) {
//要执行wait操作,必须先取得该对象的锁
//执行wait操作之后,锁会释放
//被唤醒之前,需要先获得锁
notEmpty.wait();
}
return linkedList.poll();
}
}
//
public void offer(Object object) {
synchronized (notEmpty) {
if (linkedList.size() == 0) {
//要执行notify和notifyAll操作,都必须先取得该对象的锁
notEmpty.notifyAll();
}
linkedList.add(object);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class BlockingQ {
private Object notEmpty = new Object();
private Object notFull = new Object();
private Queue<Object> linkedList = new LinkedList<Object>();
private int maxLength = 10;
//
public Object take() throws InterruptedException {
//分别需要对notEmpty和notFull加锁
synchronized (notEmpty) {
if (linkedList.size() == 0) {
notEmpty.wait();
}
//分别需要对notEmpty和notFull加锁
synchronized (notFull) {
if (linkedList.size() == maxLength) {
notFull.notifyAll();
}
return linkedList.poll();
}
}
//
public void offer(Object object) throws InterruptedException {
//分别需要对notEmpty和notFull加锁
synchronized (notEmpty) {
if (linkedList.size() == 0) {
notEmpty.notifyAll();
}
//分别需要对notEmpty和notFull加锁
synchronized (notFull) {
if (linkedList.size() == maxLength) {
notFull.wait();
}
linkedList.add(object);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class BlockingQ {
private Lock lock = new ReentrantLock();
// 一个锁可以创建多个Condition
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
private Queue<Object> linkedList = new LinkedList<Object>();
private int maxLength = 10;
//
public Object take() throws InterruptedException {
lock.lock();
try {
if (linkedList.size() == 0) {
// 要执行await操作,必须先取得该Condition的锁。
// 执行await操作之后,锁会释放。
// 被唤醒之前,需要先获得锁。
notEmpty.await();
}
if (linkedList.size() == maxLength) {
notFull.signalAll();
}
return linkedList.poll();
} finally {
lock.unlock();
}
}
//
public void offer(Object object) throws InterruptedException {
lock.lock();
try {
if (linkedList.size() == 0) {
//要执行signal和signalAll操作,都必须先取 得该对象
notEmpty.signalAll();
}
if (linkedList.size() == maxLength) {
notFull.await();
}
linkedList.add(object);
} finally {
lock.unlock();
}
}
}

注意:未锁就直接执行await、signal、siganlAll会抛异常