当前位置: 首页 > news >正文

学习笔记12——并发编程之线程之间协作方式

线程之间协作有哪些方式

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其他部分之前完成,那么就需要对线程进行协调。

共享变量和轮询方式

实现:定义一个共享变量(如 volatile 修饰的布尔标志)。线程通过检查共享变量的状态来决定是否继续执行。

public class Test {
​
    private static volatile boolean flag = false;
​
    public static void main(String[] args) throws InterruptedException {
​
        new Thread(new Runnable() {
            public void run() {
                System.out.println("flag" + flag);
                while (!flag){
                    System.out.println( "waiting"); //轮询等待
                }
                System.out.println("Flag is now " + flag);
            }
        }).start();
​
        Thread.sleep(1000);
        flag = true; // 修改共享变量
    }
}

使用wait() 与 notify()/notifyAll()

通过object类中的wait()、notify()和notifyAll来实现。

实现:

  • wait():当前线程释放锁并进入等待状态(WAITING)。需在同步块中调用(持有锁)。

  • notify():随机唤醒一个等待线程(WAITINGBLOCKED)。

  • notifyAll():唤醒所有等待线程。

public class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    public void produce(int value) throws InterruptedException {
        synchronized (queue) {
            while (queue.size() == capacity) {
                queue.wait(); // 队列满,等待
            }
            queue.add(value);
            queue.notifyAll(); // 唤醒消费者
        }
    }
    public int consume() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // 队列空,等待
            }
            int value = queue.poll();
            queue.notifyAll(); // 唤醒生产者
            return value;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ProducerConsumer pc = new ProducerConsumer();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    pc.produce(i);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
​
        new Thread(() -> {
            while (true){
                try {
                    System.out.println(pc.consume());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}
​

 Condition 条件变量

通过 ReentrantLockCondition 实现更灵活的线程协作。

实现:Condition类似于 wait()/notify(),但支持多个条件队列,使用它需要和ReentrantLock配合使用。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
​
public class ProducerConsumerWithCondition {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
​
    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // 等待队列不满
            }
            queue.add(value);
            notEmpty.signalAll(); // 唤醒消费者
        } finally {
            lock.unlock();
        }
    }
​
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); //等待队列不空
            }
            int value = queue.poll();
            notFull.signalAll(); // 唤醒生产者
            return value;
        } finally {
            lock.unlock();
        }
    }
}

CountDownLatch

通过CountDownLatch实现线程的等待与唤醒。

实现:初始化时指定计数值,线程调用await()等待计数器归零;其他线程调用countDown()减少计数器。

import java.util.concurrent.CountDownLatch;
​
public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);
​
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println("Thread finished");
                latch.countDown(); // 计数器减1
            }).start();
        }
​
        latch.await(); // 等待所有线程完成
        System.out.println("All threads finished");
    }
}

CyclicBarrier

是一个同步辅助工具,允许一组线程相互等待,直到所有线程到达屏障点后才能继续执行。可以重复使用。

实现:初始化时指定参与线程数;线程调用await()等待其他线程到达屏障点;所有线程到达后,屏障重置,可重复使用。

import java.util.concurrent.CyclicBarrier;
​
public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("All threads reached the barrier");
        });
​
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println("Thread waiting at barrier");
                try {
                    barrier.await(); // 等待其他线程
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore信号量

通过Semaphore控制资源的并发访问。

实现:初始化时指定许可数,线程调用acquire()获取许可,调用release()释放许可。

import java.util.concurrent.Semaphore;
​
public class SemaphoreExample {
    public static void main(String[] args) {
        int permits = 2;
        Semaphore semaphore = new Semaphore(permits);
​
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println("Thread acquired permit");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // 释放许可
                    System.out.println("Thread released permit");
                }
            }).start();
        }
    }
}

阻塞队列

阻塞队列(Blocking Queue) 是 Java 多线程中实现线程协作的核心工具之一。它通过内置的阻塞机制,让生产者线程和消费者线程自动协调工作,无需开发者手动管理 wait()notify() 或锁的细节。

阻塞队列是一种特殊的队列,提供以下功能:

  1. 队列空时的阻塞:消费者线程尝试从空队列取数据时,会被阻塞直到队列非空。

  2. 队列满时的阻塞:生产者线程尝试向满队列放数据时,会被阻塞直到队列有空位。

  3. 支持超时机制:部分阻塞队列允许在指定时间内尝试操作,超时后返回失败

Java 并发包 java.util.concurrent 提供了多种阻塞队列实现:

实现类特点
ArrayBlockingQueue基于数组的有界队列,固定容量,公平性可选。
LinkedBlockingQueue基于链表的队列,默认无界(可指定容量),吞吐量高。
PriorityBlockingQueue支持优先级排序的无界队列。
SynchronousQueue不存储元素的队列,生产者插入操作必须等待消费者移除。
DelayQueue元素按延迟时间排序,只有到期后才能被取出。

示例:生产者——消费者模型

通过阻塞队列,生产者和消费者线程无需直接交互,只需要操作队列即可自动协调;

  • 生产者线程:调用 put() 方法放入数据,若队列满则阻塞。

  • 消费者线程:调用 take() 方法取出数据,若队列空则阻塞。

协作流程

  • 生产者放数据

    • 若队列未满,直接插入数据并唤醒消费者线程。

    • 若队列已满,生产者线程阻塞,等待消费者取走数据后唤醒。

  • 消费者取数据

    • 若队列非空,直接取出数据并唤醒生产者线程。

    • 若队列为空,消费者线程阻塞,等待生产者放入数据后唤醒。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
​
public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 容量为10的阻塞队列
​
        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i); // 队列满时自动阻塞
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
​
        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // 队列空时自动阻塞
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
​
        producer.start();
        consumer.start();
    }
}

总结

协作方式适用场景优点缺点
共享变量与轮询简单状态检查简单易用浪费 CPU,无法精确唤醒
wait/notify生产者-消费者模型精确控制线程唤醒需手动管理锁和条件
Condition复杂条件等待支持多条件队列,灵活性高代码复杂度高
CountDownLatch一次性等待场景(如任务完成)简单易用不可重置
CyclicBarrier分阶段任务同步可重复使用需预先知道线程数
Semaphore控制资源并发访问灵活控制并发数需手动管理许可
阻塞队列生产者-消费者模型线程池任务队列:如 ThreadPoolExecutor 使用阻塞队列管理待执行任务。 流量控制:通过有界队列限制系统资源使用,防止内存溢出。 延迟任务调度:如 DelayQueue 实现定时任务执行。简化代码避免竞态条件,自动阻塞与唤醒,支持多种策略无界队列可能导致内存溢出。

相关文章:

  • rust语言match模式匹配涉及转移所有权Error Case
  • Java本地方法根据线上地址下载图片到本地然后返回本地可以访问的地址
  • 【氮化镓】开态GaN HEMTs中氧诱导Vth漂移的缺陷演化
  • 力扣:找到一个数字的 K 美丽值(C++)
  • 面试题之强缓存协商缓存
  • javascript-es6 (六)
  • 从机器学习到生成式AI狂潮:AWS的AI征程从未停息
  • 【实战ES】实战 Elasticsearch:快速上手与深度实践-7.2.2自动扩缩容策略(基于HPA)
  • 2025年总结zabbix手动部署过程!
  • 【Python爬虫】使用python脚本拉取网页指定小说章节
  • [250310] Mistral 发布世界领先的文档理解 API:Mistral OCR | 谷歌利用 AI 保护自然的三种新方式
  • 可视化图解算法:反转链表
  • 《面向长尾分布的甲骨文识别算法设计与实现 》开题报告
  • 力扣hot100二刷——哈希、双指针、滑动窗口
  • C/C++中使用CopyFile、CopyFileEx原理、用法、区别及分别在哪些场景使用
  • 【gcc编译以及Makefile与GDB调试】
  • python LLM工具包
  • JavaScript数据类型和内存空间
  • 20-智慧社区物业管理平台
  • Java【多线程】(3)单例模式与线程安全
  • 做彩票网站犯法吗/磁力最好用的搜索引擎
  • 重庆沙坪坝学校/seo云优化是什么意思
  • 朝阳网站设计/手机清理优化软件排名
  • 做样子的网站/广告代理商
  • 中央广播电视总台在线观看/武安百度seo
  • 推动政府门户网站建设/企业站seo