Queue在线程池中的应用
BlockingQueue是一个接口,jdk中实现了该接口的class包括
ArrayBlockingQueue
LinkedBlockingQueue
LinkedBlockingDeque
LinkedTransferQueue
SynchronousQueue
PriorityBlockingQueue
DelayQueue
BlockingQueue提供了四种应对策略来处理这种资源不能被立即满足的场景
空值 | 抛出异常 | 返回一个特殊值 | 阻塞 | 调用者提供一个超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | put(e, time ,timeUnit) |
移除 | remove() | poll() | take() | poll(time,timeUnit) |
检查 | element() | peek() | 不可用 | 不可用 |
jdk的ThreadPoolExecutor的构造函数中需要传入一个BlockingQueue
jdk中提供的线程池选择的队列:
newCachedThreadPool使用了SynchronousQueue(每一个插入操作必须等待另一个线程的对应移除操作)
newFixedThreadPool使用了LinkedBlockingQueue(这个队列是无界的),干活的线程就那么多,任务多了就加入队列好了
queue的重要性在于,在线程池(ThreadPoolExecutor)中,获取任务使用的是queue的poll方法,添加任务使用的是queue的offer方法。
ArrayBlockingQueue的实现:
ArrayBlockingQueue是一个由数组实现的 有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加。
主要的成员变量就这么几个
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex; //下一次从队列中取的index
/** items index for next put, offer, or add */
int putIndex; // 下一次往队列中添加的index
/** Number of elements in the queue */
int count;
final ReentrantLock lock; //读写操作都要拿到这个锁
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
- 入列核心方法是一个private方法enqueue(put ,offer都代理给了这个方法)
出列的核心方法是dequeue(take, poll,peek和remove方法都代理给了这个方法) - 这两个方法的调用是被包在一个lock.lock和lock.unlock中的,所以是线程安全的的。
- 构造函数可以传一个fair进来。enqueue方法里面还有一个notEmpty.signal(), 其实就是典型的通知消费者。同理,dequeue里面有个notFull.signal(),就是通知生产者
- 底层的数组是不会自动扩容的,但是如果一直添加元素,超出了底层数组的长度的话。offer会return false, put会block当前线程,add会throw new IllegalStateException(“Queue full”);
- takeIndex可以看做是fifo队列的head, putIndex可以看做是fifo队列的tail,因为数组本身没有队列的概念,所以需要人为去维护两根指针。可以认为任何时候,底层的数组中是有一个区间是存放元素的。其余位置都是空的。比如遍历所有元素的方式是从取一个int i, 从takeIndex开始一直到putIndex(中途假如碰到了i= items.length,i变为0)。takeIndex和putIndex谁大谁小不一定,都是从0开始的,并且都会往后自增,一旦触碰到items.length,从0再来。所以遍历所有元素的过程就像是takeIndex去追赶putIndex。
这种做法应该叫做两根指针循环从数组中取元素。
LinkedBlockingQueue的实现
LinkedBlockingQueue是Executors中使用的创建线程池的静态方法中使用的参数,显然更推荐使用。主要用的是两个方法,
put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。官方文档提到了, LinkedBlockingQueue的吞吐量通常要高于基于数组的队列,但在大多数并发应用程序中,其可预知的性能要低一些 , 内部的lock只能是unfair的。
LinkedBlockingQueue是用单向链表实现的,主要的成员变量包括:
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
主要就是head和last两根指针,外加一个AtomicInteger记录size,take锁和put锁以及对应的用于唤醒等待的线程们的condition。head和last的特征是其item = null
核心方法是enqueue和dequeue
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//这里应该是先执行右边那个=,再执行左边那个= , 也就是所有的入列元素都是以next的方式被添加到当前的last的next位置上
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC //head自己指向自己?
head = first;
E x = first.item;
first.item = null; //head后面的元素就是要被移除的元素
return x;
}
- 入列的前提是putLock.isHeldByCurrentThread(),出列的前提是takeLock.isHeldByCurrentThread()
- 如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
- 如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。
以上的ArrayBlockingQueue和LinkedBlockingQueue都还算简单,SynchronousQueue的实现则较为复杂
PriorityBlockingQueue
Priority queue represented as a balanced binary heap,The element with the lowest value is in queue[0](所以是一个平衡小顶堆)
DelayQueue
并发包下面的延时阻塞队列,附带一个Delayed接口用于用于实现定时任务
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
}
队列中的元素都实现了Delayed的接口,通过getDelay方法实现延迟调度。在queue.take()的时候被add进queue的元素按照getDelay的返回值排序,越早到期的元素越先出队。
ScheduledThreadPoolExecutor中使用了内部实现类DelayedWorkQueue,而是使用数组又实现了一遍优先级队列,本质上没有什么区别。详细介绍
ArrayDeque
双端队列是一种特殊的队列,它的两端都可以进出元素,故而得名双端队列。ArrayDeque是一种以数组方式实现的双端队列,它是非线程安全的。
Deque是一个接口,定义了addFirst,addLast, removeFirst, removeLast等操作,因此可以从两端进行操作。
- ArrarDeque的构造函数也可以传入size,默认初始容量是16,最小容量是8。必须是2的幂
- 内部维护了一个elements(Object[]) ,同时还有两根指针head(下一次remove和pop的位置)和tail(下一次add的位置)
- add操作等同于addLast。
- head和tail都是从0开始的。addLast操作使得tail+1,head不变。第一次addFirst操作使得head从0变为length-1(比如说7),随后的addFirst操作使得head递减。当head==tail的时候,doubleCapacity。
- getFirst的做法
(E) elements[head];
getLast用的是这样的
(E) elements[(tail - 1) & (elements.length - 1)];
通过取模的方式让头尾指针在数组范围内循环,x & (len – 1) = x % len,使用&的方式更快;这也是数组长度必须为2的指数幂的原因。
6.doubleCapacity(扩容的方式有点绕),扩容时,head==tail。以head为边界,右边的挪到x2之后数组的最开头,左边的跟着挪到上述数据的后面,这样填满x2数组的左半部分,同时保证了head=0,tail在最尾部。
- 通过取模的方式让头尾指针在数组范围内循环(head往左走,tail往右走,两者相遇后扩容)