ArrayBlockingQueue的使用
ArrayBlockingQueue
是一个由数组支持的有界阻塞队列,它实现了 Queue
接口,并且继承了 BlockingQueue
接口。它的特点是:
- 有界:容量固定。
- 阻塞:如果队列为空,消费者线程会被阻塞,直到队列有元素;如果队列满了,生产者线程会被阻塞,直到队列有空闲位置。
- 线程安全:多线程环境下可以安全使用,避免了同步问题。
使用场景
- 生产者-消费者模式:可以用来处理生产者-消费者模型中的数据传递问题,生产者将任务放入队列,消费者从队列中获取任务处理。
- 限制并发数:当你希望限制并发执行的线程数时,使用
ArrayBlockingQueue
作为缓冲区可以避免资源被过度占用。 - 任务调度:在任务调度系统中,可以使用它来限制并发执行的任务数量,并且确保任务能够按顺序处理。
主要方法
offer(E e)
:插入元素(如果队列没有满)。poll(long timeout, TimeUnit unit)
:获取元素,若队列为空则等待指定时间。put(E e)
:插入元素,如果队列满则阻塞等待。take()
:取出元素,如果队列为空则阻塞等待。
1、生产者消费者模型
假设我们有一个生产者消费者场景,生产者往队列中生产数据,消费者从队列中消费数据。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
int count = 1;
while (true) {
queue.put(count); // 阻塞方式,如果队列满了会等待
System.out.println("Produced: " + count);
count++;
Thread.sleep(500); // 模拟生产过程
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take(); // 阻塞方式,如果队列为空会等待
System.out.println("Consumed: " + item);
Thread.sleep(1000); // 模拟消费过程
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ArrayBlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 队列容量为 10
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
producerThread.start(); // 启动生产者线程
consumerThread.start(); // 启动消费者线程
}
}
代码解析:
- 生产者(Producer):每隔 500 毫秒生产一个整数并放入队列。如果队列已满,生产者线程将被阻塞,直到队列有空位。
- 消费者(Consumer):每隔 1 秒从队列中取出一个整数并处理。如果队列为空,消费者线程将被阻塞,直到队列中有元素。
ArrayBlockingQueue
的容量是 10,这意味着队列最多可以存储 10 个元素。当队列满时,生产者会被阻塞;当队列空时,消费者会被阻塞。
2、限制并发数
在多线程程序中,我们可能希望控制同时执行的线程数,比如限制最大并发线程数。可以通过 ArrayBlockingQueue
来实现这一点。
示例:限制最大并发线程数的场景
假设我们有 10 个任务,而我们希望最多同时只有 3 个线程在运行。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class Task implements Runnable {
private final int taskId;
public Task(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务处理
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class LimitConcurrencyExample {
public static void main(String[] args) {
// 创建一个有容量为 3 的队列,用来限制并发数为 3
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(3);
// 创建一个线程池,最多 3 个线程同时执行任务
ExecutorService executor = Executors.newFixedThreadPool(3);
// 将任务放入队列
for (int i = 1; i <= 10; i++) {
final int taskId = i;
try {
// 添加任务到队列
queue.put(() -> new Task(taskId).run());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 执行队列中的任务
for (int i = 0; i < 10; i++) {
try {
Runnable task = queue.take(); // 获取任务
executor.submit(task); // 提交任务到线程池
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
executor.shutdown(); // 关闭线程池
}
}
代码解析:
ArrayBlockingQueue
:这里我们用容量为 3 的队列来限制同时执行的任务数。队列满了时,生产者线程会阻塞,直到队列有空位。ExecutorService
:我们用一个固定大小为 3 的线程池来执行任务,确保最多只有 3 个线程在同时运行。- 任务调度:我们将任务放入队列中,使用
queue.take()
方法从队列中取任务,并提交到线程池执行。
在这个示例中,线程池最多同时运行 3 个任务。如果有超过 3 个任务,它们会被依次加入队列,等待有线程空闲后执行。
3、任务调度
在任务调度场景中,我们希望控制任务的执行顺序和延迟。通过 ArrayBlockingQueue
可以实现简单的任务调度。
示例:任务调度
假设我们有多个任务,并希望按顺序执行这些任务,或者希望控制每个任务的执行间隔。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
class ScheduledTask implements Runnable {
private final int taskId;
public ScheduledTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("Executing task " + taskId + " at " + System.currentTimeMillis());
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class TaskSchedulingExample {
public static void main(String[] args) {
// 创建一个有容量的队列,用来存放任务
BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(10);
// 模拟调度任务,每隔一定时间添加到队列中
Thread scheduler = new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
Runnable task = new ScheduledTask(i);
taskQueue.put(task); // 将任务加入队列
System.out.println("Scheduled task " + i);
TimeUnit.SECONDS.sleep(2); // 每隔 2 秒添加一个任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 执行任务
Thread executor = new Thread(() -> {
try {
while (true) {
Runnable task = taskQueue.take(); // 获取队列中的任务
task.run(); // 执行任务
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
scheduler.start(); // 启动调度线程
executor.start(); // 启动执行线程
}
}
代码解析:
- 调度任务:
scheduler
线程每隔 2 秒向队列中添加一个任务。我们模拟了任务调度的过程。 - 任务执行:
executor
线程从队列中取出任务并执行。由于ArrayBlockingQueue
是阻塞队列,因此executor
线程会在队列为空时阻塞,等待任务到达。 - 任务调度间隔:在调度线程中使用了
TimeUnit.SECONDS.sleep(2)
来控制任务的添加间隔。
在这个示例中,任务被按顺序调度,并且每隔 2 秒就有一个任务被放入队列,直到所有任务都被执行。
使用场景总结
- 生产者-消费者模式:可以用于限制生产和消费速率,避免资源过度消耗。
- 线程池管理:结合
ExecutorService
使用,作为任务队列来管理并发任务的执行。 - 流量控制:控制请求的流量或任务的处理速率,避免过载。
ArrayBlockingQueue
提供了一个简单、有效的方式来处理并发任务,尤其适用于生产者-消费者模型以及对资源进行有效管理的场景。