java 并发编程-阻塞队列
阻塞队列
- 接口Queue
- 接口BlockingQueue
- ArrayBlockingQueue
- LinkedBlockingDeque
- SynchronousQueue
- PriorityBlockingQueue
- DelayQueue
接口Queue
接口BlockingQueue
阻塞队列(BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。
ArrayBlockingQueue
ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象。使用两个Condition-notFull和notEmpty来实现队列阻塞。
public static void main(String[] args) {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
arrayBlockingQueue.put(i);
System.out.println("往队列添加元素:" + i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Integer take = null;
try {
Thread.sleep(2000);
take = arrayBlockingQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("从队列拿出元素:" + take);
}
}
}).start();
}
LinkedBlockingDeque
LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE。为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。LinkedBlockingQueue写入取出使用两把锁,删除时两把锁同时加锁。也是是用两个Condition实现阻塞。
SynchronousQueue
SynchronousQueue是一个没有数据缓冲的BlockingQueue,它的容量为0,生产者线程对其的插入操作put必须等待消费者的移除操作take。
PriorityBlockingQueue
PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。
DelayQueue
DelayQueue是一个支持延时获取元素的阻塞队列,内部采用优先队列PriorityQueue存储元素,同时元素必须实现Delayed接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
public class Main {
public static void main(String[] args) {
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));
delayQueue.put(new Order("order2", System.currentTimeMillis(), 3000));
delayQueue.put(new Order("order3", System.currentTimeMillis(), 2000));
while (!delayQueue.isEmpty()) {
try {
Order take = delayQueue.take();
System.out.println("出队订单:" + take.orderId);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
static class Order implements Delayed {
private String orderId;
private long createTime;
private long delayTIme;
public Order(String orderId, long createTime, long delayTIme) {
this.orderId = orderId;
this.createTime = createTime;
this.delayTIme = delayTIme;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = createTime + delayTIme - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MICROSECONDS);
}
@Override
public int compareTo(Delayed o) {
long diff = this.getDelay(TimeUnit.MICROSECONDS) - o.getDelay(TimeUnit.MICROSECONDS);
return Long.compare(diff, 0);
}
}
}