当前位置: 首页 > news >正文

线程通信---java

线程

我们知道,线程是进程的最小执行单位,一个进程可以拥有多个线程,那么就会引入两个问题:

  • 多个线程之间如何进行通信
  • 多个线程对同一个数据进行操作,如何保证程序正确执行,也就是线程安全问题

线程的常用调度方法有:
在这里插入图片描述

线程通信和线程安全

线程通信和线程安全并不是完全独立的

  1. 线程通信依赖线程安全:线程通信通常涉及共享变量或资源的状态变化,而这些共享资源必须是线程安全的,否则通信本身可能失败
    • 例子:
      使用 wait()/notify() 时,必须在 synchronized 块内操作共享对象的监视器。如果没有同步,线程可能在未持有锁的情况下调用 wait(),导致 IllegalMonitorStateException,或者多个线程同时修改状态,破坏通信逻辑。
  2. 线程安全需要线程通信:在某些场景下,线程安全不仅需要互斥访问,还需要线程间的协作。例如,一个线程等待条件满足后才能安全访问资源
    • 例子:
      生产者-消费者模型中,消费者线程需要等待生产者生产数据。如果没有通信机制(比如队列为空时直接访问),会导致数据不一致或异常。
  3. Java 中许多线程通信和线程安全的实现机制是重叠或互补的

1.共享变量(volatile)

  • volatile关键字用于修饰变量,当一个线程修改了volatile修饰的变量的值,其他线程能够立即看到最新的值,从而避免了线程之间的数据不一致(可见性)
  • 防止指令重排,volatile 变量的写入不会被重排序到它之前的代码。
  • 缺点:volatile并不能解决多线程并发下的复合操作问题,比如i++这种操作不是原子操作,如果多个线程同时对i进行自增操作,volatile不能保证线程安全。
public class VolatileExample {
    private volatile boolean running = true;

    public void start() {
        new Thread(() -> {
            while (running) {
                System.out.println("线程运行中...");
            }
        }).start();
    }

    public void stop() {
        running = false; // 修改立即可见
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileExample example = new VolatileExample();
        example.start();
        Thread.sleep(1000);
        example.stop();
    }
}

2.wait() / notify() / notifyAll()

  • wait() 和 notify() 是定义在 java.lang.Object 类中的方法,用于让线程等待和唤醒线程
  • 基于对象监视器(monitor),确保在调用这些方法时,线程已经获得了对象的监视器锁,即**必须在 synchronized 块内使用**,否则会抛出 IllegalMonitorStateException。
  • 缺点:只能唤醒一个(notify())或全部(notifyAll()),不够灵活
  • 内置的 synchronized 关键字 不支持中断等待 ,线程如果在进入 synchronized 块前等待锁,就算被中断,也不会停止等待。
public class WaitNotifyExample {
    private static final Object lock = new Object();
    private static boolean condition = false;

    public static void main(String[] args) {
        // 消费者线程
        Thread consumer = new Thread(() -> {
            synchronized (lock) {
                while (!condition) { // 使用 while 循环等待条件满足
                    try {
                        System.out.println("消费者等待数据...");
                        lock.wait(); // 释放锁并等待
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                }
                System.out.println("消费者处理数据...");
            }
        });

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(1000); // 模拟数据准备时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            synchronized (lock) {
                condition = true; // 改变条件
                System.out.println("生产者生产了数据,通知消费者...");
                lock.notify(); // 唤醒等待中的消费者线程
            }
        });

        consumer.start();
        producer.start();
    }
}

3.Lock 和 Condition

  • ReentrantLock 提供锁机制,Condition 通过 await() 和 signal() 实现等待和唤醒,是 wait()/notify() 的增强替代品。
  • 可中断性: ReentrantLock 实现了可中断性,这意味着线程在等待锁的过程中,可以被其他线程中断而提前结束等待。在底层,ReentrantLock 使用了与 LockSupport.park() 和 LockSupport.unpark() 相关的机制来实现可中断性。
  • 设置超时时间: ReentrantLock 支持在尝试获取锁时设置超时时间,即等待一定时间后如果还未获得锁,则放弃锁的获取。这是通过内部的 tryAcquireNanos 方法来实现的。
  • 公平锁和非公平锁: 在直接创建 ReentrantLock 对象时,默认情况下是非公平锁。公平锁是按照线程等待的顺序来获取锁,而非公平锁则允许多个线程在同一时刻竞争锁,不考虑它们申请锁的顺序。公平锁可以通过在创建 ReentrantLock 时传入 true 来设置
ReentrantLock fairLock = new ReentrantLock(true);
  • 可重入性: ReentrantLock 支持可重入性,即同一个线程可以多次获得同一把锁,而不会造成死锁。这是通过内部的 holdCount 计数来实现的。
  • 多个条件变量: ReentrantLock 支持多个条件变量,每个条件变量可以与一个 ReentrantLock 关联。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class ProducerConsumerExample {
    // 定义一个可重入锁
    private final Lock lock = new ReentrantLock();
    // 定义两个条件:notFull 用于生产者等待队列有空位,notEmpty 用于消费者等待队列中有数据
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    
    // 定义一个缓冲区及相关指针和计数器
    private final int[] buffer = new int[10];
    private int count = 0;   // 当前存储的数据个数
    private int putPtr = 0;  // 生产者写入数据的位置
    private int takePtr = 0; // 消费者取数据的位置

    // 生产者方法:向缓冲区添加数据
    public void put(int data) throws InterruptedException {
        lock.lock();
        try {
            // 如果缓冲区满了,则等待 notFull 条件满足
            while (count == buffer.length) {
                notFull.await();
            }
            buffer[putPtr] = data;
            putPtr = (putPtr + 1) % buffer.length;
            count++;
            // 添加数据后,通知消费者:缓冲区不为空了
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 消费者方法:从缓冲区取数据
    public int take() throws InterruptedException {
        lock.lock();
        try {
            // 如果缓冲区为空,则等待 notEmpty 条件满足
            while (count == 0) {
                notEmpty.await();
            }
            int data = buffer[takePtr];
            takePtr = (takePtr + 1) % buffer.length;
            count--;
            // 取走数据后,通知生产者:缓冲区有空位了
            notFull.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProducerConsumerExample example = new ProducerConsumerExample();
        
        // 启动生产者线程
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    example.put(i);
                    System.out.println("Produced: " + i);
                    Thread.sleep(100); // 模拟生产耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 启动消费者线程
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    int data = example.take();
                    System.out.println("Consumed: " + data);
                    Thread.sleep(150); // 模拟消费耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        
        producer.start();
        consumer.start();
    }
}

4.BlockingQueue(阻塞队列)

  • 基于队列(如 ArrayBlockingQueue、LinkedBlockingQueue),通过 put() 和 take() 实现阻塞式数据传递。
  • 内BlockingQueue的实现都是线程安全的,所有队列的操作或使用内置锁或是其他形式的并发控制来保证原子。
  • 缺点:功能单一,仅限数据传递。容量有限可能导致阻塞。
import java.util.concurrent.*;

public class BlockingQueueExample {
    private BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    public void produce() throws InterruptedException {
        queue.put("数据"); // 放入数据
    }

    public void consume() throws InterruptedException {
        String data = queue.take(); // 取出数据
        System.out.println("消费: " + data);
    }

    public static void main(String[] args) {
        BlockingQueueExample example = new BlockingQueueExample();
        new Thread(() -> {
            try {
                example.produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                example.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

5.Thread.interrupt()

  • 通过调用线程的 interrupt() 方法 设置中断标志,等线程自行中断
public class InterruptExample {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("运行中...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("被中断");
                    return;
                }
            }
        });
        thread.start();
        Thread.sleep(3000);
        thread.interrupt(); // 中断线程
    }
}

6.CountDownLatch / CyclicBarrier / Semaphore

  • CountDownLatch:允许一个或多个线程等待其他一组线程完成操作后再继续执行。它通过一个计数器来实现,计数器初始化为线程的数量,每个线程完成任务后调用countDown方法将计数器减一,当计数器为零时,等待的线程可以继续执行。常用于多个线程完成各自任务后,再进行汇总或下一步操作的场景。
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    // 模拟任务执行
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println(Thread.currentThread().getName() + " 完成任务");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 任务完成,计数器减一
                }
            }).start();
        }

        latch.await(); // 等待所有子线程完成
        System.out.println("所有子线程已完成,主线程继续执行");
    }
}

  • CyclicBarrier:让一组线程互相等待,直到所有线程都到达某个屏障点后,再一起继续执行。与CountDownLatch不同的是,CyclicBarrier可以重复使用,当所有线程都通过屏障后,计数器会重置,可以再次用于下一轮的等待。适用于多个线程需要协同工作,在某个阶段完成后再一起进入下一个阶段的场景。
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有任务已到达屏障点,继续执行...");
        });

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    // 模拟任务执行
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println(Thread.currentThread().getName() + " 到达屏障点");
                    barrier.await(); // 等待其他线程到达
                    System.out.println(Thread.currentThread().getName() + " 继续执行");
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

  • Semaphore:信号量,用于控制同时访问某个资源的线程数量。它维护了一个许可计数器,线程在访问资源前需要获取许可,如果有可用许可,则获取成功并将许可计数器减一,否则线程需要等待,直到有其他线程释放许可。常用于控制对有限资源的访问,如数据库连接池、线程池中的线程数量等。
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        int threadCount = 5;
        Semaphore semaphore = new Semaphore(2); // 允许两个线程同时访问

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println(Thread.currentThread().getName() + " 获取许可,开始执行");
                    // 模拟任务执行
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println(Thread.currentThread().getName() + " 释放许可");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
}

7. Future 和 Callable

通过 Future 获取线程执行结果,结合 Callable 实现异步通信。

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(1000);
            return 42;
        });
        System.out.println("结果: " + future.get()); // 阻塞等待
        executor.shutdown();
    }
}

相关文章:

  • 【Python字符串】\n是什么?它与raw字符串、多行字符串的运用有什么关系?
  • FPGA|Verilog-SPI驱动
  • C++中的构造函数
  • 在 Ubuntu 下通过 Docker 部署 Mastodon 服务器
  • click house扩容方案
  • (四)Linux Miniconda 安装+换源+新建环境+迁移环境
  • C++指针的基本认识
  • Upload-Labs-Linux 1-20
  • Python脚本,音频格式转换 和 视频格式转换
  • 达梦数据库导入数据,通过命令的方式
  • 【Godot4.4】浅尝Godot中的MVC
  • 蓝桥杯—走迷宫(BFS算法)
  • QT | 信号与槽(超详解)
  • 控制系统分类
  • AI赋能Python零代码编程知识技能体系构架
  • 【从零开始学习计算机科学】计算机组成原理(八)输入输出设备 与 输入输出系统
  • 初学者快速入门Python爬虫 (无废话版)
  • springboot的实体类字段校验的分组校验
  • 知识图谱相关的FOAF是个啥?
  • ②Modbus TCP转Modbus RTU/ASCII网关同步采集无需编程高速轻松组网
  • 广州开发网站技术/如何做网址
  • 如何给wordpress文章部分内容加密/seo免费优化工具
  • 网站收录需要多久/网络推广产品公司
  • 网站备案流程/百度竞价推广联系方式
  • 网站开发分类/seo外包公司报价
  • 摄影网站的意义/百度搜索广告