当前位置: 首页 > news >正文

ArrayBlockingQueue的使用

ArrayBlockingQueue 是一个由数组支持的有界阻塞队列,它实现了 Queue 接口,并且继承了 BlockingQueue 接口。它的特点是:

  • 有界:容量固定。
  • 阻塞:如果队列为空,消费者线程会被阻塞,直到队列有元素;如果队列满了,生产者线程会被阻塞,直到队列有空闲位置。
  • 线程安全:多线程环境下可以安全使用,避免了同步问题。

使用场景

  1. 生产者-消费者模式:可以用来处理生产者-消费者模型中的数据传递问题,生产者将任务放入队列,消费者从队列中获取任务处理。
  2. 限制并发数:当你希望限制并发执行的线程数时,使用 ArrayBlockingQueue 作为缓冲区可以避免资源被过度占用。
  3. 任务调度:在任务调度系统中,可以使用它来限制并发执行的任务数量,并且确保任务能够按顺序处理。

主要方法

  • offer(E e):插入元素(如果队列没有满)。
  • poll(long timeout, TimeUnit unit):获取元素,若队列为空则等待指定时间。
  • put(E e):插入元素,如果队列满则阻塞等待。
  • take():取出元素,如果队列为空则阻塞等待。

1、生产者消费者模型

假设我们有一个生产者消费者场景,生产者往队列中生产数据,消费者从队列中消费数据。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            int count = 1;
            while (true) {
                queue.put(count);  // 阻塞方式,如果队列满了会等待
                System.out.println("Produced: " + count);
                count++;
                Thread.sleep(500);  // 模拟生产过程
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer item = queue.take();  // 阻塞方式,如果队列为空会等待
                System.out.println("Consumed: " + item);
                Thread.sleep(1000);  // 模拟消费过程
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ArrayBlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);  // 队列容量为 10

        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));

        producerThread.start();  // 启动生产者线程
        consumerThread.start();  // 启动消费者线程
    }
}

代码解析:

  • 生产者(Producer):每隔 500 毫秒生产一个整数并放入队列。如果队列已满,生产者线程将被阻塞,直到队列有空位。
  • 消费者(Consumer):每隔 1 秒从队列中取出一个整数并处理。如果队列为空,消费者线程将被阻塞,直到队列中有元素。
  • ArrayBlockingQueue 的容量是 10,这意味着队列最多可以存储 10 个元素。当队列满时,生产者会被阻塞;当队列空时,消费者会被阻塞。

2、限制并发数

在多线程程序中,我们可能希望控制同时执行的线程数,比如限制最大并发线程数。可以通过 ArrayBlockingQueue 来实现这一点。

示例:限制最大并发线程数的场景

假设我们有 10 个任务,而我们希望最多同时只有 3 个线程在运行。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Task implements Runnable {
    private final int taskId;

    public Task(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);  // 模拟任务处理
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class LimitConcurrencyExample {
    public static void main(String[] args) {
        // 创建一个有容量为 3 的队列,用来限制并发数为 3
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(3);

        // 创建一个线程池,最多 3 个线程同时执行任务
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 将任务放入队列
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            try {
                // 添加任务到队列
                queue.put(() -> new Task(taskId).run());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // 执行队列中的任务
        for (int i = 0; i < 10; i++) {
            try {
                Runnable task = queue.take();  // 获取任务
                executor.submit(task);  // 提交任务到线程池
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        executor.shutdown();  // 关闭线程池
    }
}

代码解析:

  • ArrayBlockingQueue:这里我们用容量为 3 的队列来限制同时执行的任务数。队列满了时,生产者线程会阻塞,直到队列有空位。
  • ExecutorService:我们用一个固定大小为 3 的线程池来执行任务,确保最多只有 3 个线程在同时运行。
  • 任务调度:我们将任务放入队列中,使用 queue.take() 方法从队列中取任务,并提交到线程池执行。

在这个示例中,线程池最多同时运行 3 个任务。如果有超过 3 个任务,它们会被依次加入队列,等待有线程空闲后执行。


3、任务调度

在任务调度场景中,我们希望控制任务的执行顺序和延迟。通过 ArrayBlockingQueue 可以实现简单的任务调度。

示例:任务调度

假设我们有多个任务,并希望按顺序执行这些任务,或者希望控制每个任务的执行间隔。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class ScheduledTask implements Runnable {
    private final int taskId;

    public ScheduledTask(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        System.out.println("Executing task " + taskId + " at " + System.currentTimeMillis());
        try {
            Thread.sleep(1000);  // 模拟任务执行
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class TaskSchedulingExample {
    public static void main(String[] args) {
        // 创建一个有容量的队列,用来存放任务
        BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(10);

        // 模拟调度任务,每隔一定时间添加到队列中
        Thread scheduler = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    Runnable task = new ScheduledTask(i);
                    taskQueue.put(task);  // 将任务加入队列
                    System.out.println("Scheduled task " + i);
                    TimeUnit.SECONDS.sleep(2);  // 每隔 2 秒添加一个任务
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 执行任务
        Thread executor = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = taskQueue.take();  // 获取队列中的任务
                    task.run();  // 执行任务
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        scheduler.start();  // 启动调度线程
        executor.start();  // 启动执行线程
    }
}

代码解析:

  • 调度任务scheduler 线程每隔 2 秒向队列中添加一个任务。我们模拟了任务调度的过程。
  • 任务执行executor 线程从队列中取出任务并执行。由于 ArrayBlockingQueue 是阻塞队列,因此 executor 线程会在队列为空时阻塞,等待任务到达。
  • 任务调度间隔:在调度线程中使用了 TimeUnit.SECONDS.sleep(2) 来控制任务的添加间隔。

在这个示例中,任务被按顺序调度,并且每隔 2 秒就有一个任务被放入队列,直到所有任务都被执行。

使用场景总结

  • 生产者-消费者模式:可以用于限制生产和消费速率,避免资源过度消耗。
  • 线程池管理:结合 ExecutorService 使用,作为任务队列来管理并发任务的执行。
  • 流量控制:控制请求的流量或任务的处理速率,避免过载。

ArrayBlockingQueue 提供了一个简单、有效的方式来处理并发任务,尤其适用于生产者-消费者模型以及对资源进行有效管理的场景。

相关文章:

  • 英语学习4.9
  • 基于php的成绩分析和预警与预测网站(源码+lw+部署文档+讲解),源码可白嫖!
  • 十四种逻辑器件综合对比——《器件手册--逻辑器件》
  • 记录centos8安装宝塔过程(两个脚本)
  • 【微知】Mellanox网卡网线插入后驱动的几个日志?(Cable plugged;IPv6 ... link becomes ready)
  • Oracle 23ai Vector Search 系列之5 向量索引(Vector Indexes)
  • 【VitePress】新增md文件后自动更新侧边栏导航
  • LeetCode 1223 投骰子模拟
  • 安卓AssetManager【一】-资源的查找过程
  • 从MySQL快速上手数据仓库Hive
  • 论文阅读笔记——Multi-Token Attention
  • 华为机试—最大最小路
  • 为什么在删除数据库存在‘if exists‘语句
  • 判断两个 IP 地址是否在同一子网 C
  • Redis实现分布式定时任务
  • 畅游Diffusion数字人(23):字节最新表情+动作模仿视频生成DreamActor-M1
  • dfs和bfs算法
  • PyTorch DataLoader 参数详解
  • Autoware源码总结
  • 路由策略/策略路由之Filter-Policy
  • wordpress for bae/西安seo排名外包
  • 自建网站免费/怎么自己做一个网址
  • 杭州知名设计公司/seo关键词优化经验技巧
  • 营销型和展示型网站/百度站长管理平台
  • 怀化找什么人做网站/百度提交入口网址
  • 南海网站推广/关键词智能优化排名