当前位置: 首页 > news >正文

JUC入门(三)

7、Callable

1、可以有返回值

2、可以抛出异常

3、方法不同 run()/call()

代码测试

老版本的应用

package com.yw.callable;public class Old {public static void main(String[] args) {new Thread(new MyThread()).start();}
}class MyThread implements Runnable{@Overridepublic void run(){}
}

Callable

package com.yw.callable;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class NewThread {public static void main(String[] args) throws ExecutionException, InterruptedException {MyNewThread myNewThread = new MyNewThread();FutureTask futureTask = new FutureTask(myNewThread);//适配类new Thread(futureTask,"A").start();//结果会被缓存提高效率new Thread(futureTask,"B").start();String result = (String) futureTask.get();//获取Callable的返回结果,这个方法会产生阻塞//或者使用异步来处理System.out.println(result);}
}class MyNewThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("call");return "你好,Callable";}
}

细节:

  1. 结果会被缓存
  2. 结果可能需要等待,会产生阻塞

8、常用的辅助类

CountDownLatch

 可以简单将其理解为减法计数器

一般用于必须要执行某个任务过后才继续执行时使用

代码展示:

package com.yw.utlis;import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {//必须要执行的任务执行完成后才能继续执行CountDownLatch countDownLatch = new CountDownLatch(6);//总数是6for (int i = 1; i <=6; i++){final int tmp = i;new Thread(()->{System.out.println(Thread.currentThread().getName() + "出门咯");countDownLatch.countDown();//数量-1},String.valueOf(tmp)).start();}countDownLatch.await();//等待计数器归0才继续执行后面的操作System.out.println("关门");}
}

原理:

核心数据结构

CountDownLatch 的核心是一个计数器(count),它在创建时被初始化为一个正整数。这个计数器的值表示需要等待的线程数量。

  • 当一个线程调用 countDown() 方法时,计数器的值会减 1。
  • 当计数器的值减到 0 时,表示所有需要等待的线程都已经完成了任务,此时所有因等待计数器归零而被阻塞的线程会被唤醒并继续执行
主要方法
  • CountDownLatch(int count):构造方法,初始化计数器的值。
  • void countDown():将计数器的值减 1。如果计数器的值减到 0,所有等待的线程会被唤醒。
  • void await():当前线程等待,直到计数器的值减到 0。如果计数器的值已经是 0,则当前线程直接继续执行。
  • boolean await(long timeout, TimeUnit unit):当前线程等待,直到计数器的值减到 0 或者超时。如果超时,当前线程会返回,而不会被阻塞。
底层实现

CountDownLatch 的底层实现基于 AQS(AbstractQueuedSynchronizer,抽象队列同步器)。AQS 是 Java 并发包的核心框架,它提供了一种基于队列的锁机制。

  • countDown() 方法:调用 AQS 的 releaseShared() 方法,尝试将计数器的值减 1。如果计数器的值减到 0,会唤醒所有等待的线程。
  • await() 方法:调用 AQS 的 acquireSharedInterruptibly() 方法,当前线程会进入等待状态,直到计数器的值减到 0 或者被中断。

工作流程

  1. 初始化:创建 CountDownLatch 对象时,指定计数器的初始值。
  2. 线程等待:一个或多个线程调用 await() 方法,这些线程会被阻塞,等待计数器的值减到 0。
  3. 线程完成任务:其他线程完成任务后,调用 countDown() 方法,计数器的值减 1。
  4. 计数器归零:当计数器的值减到 0 时,所有等待的线程被唤醒,继续执行后续操作。

CyclicBarrier

  可以简单将其理解为加法计数器

代码演示:

package com.yw.utlis;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {public static void main(String[] args) throws BrokenBarrierException, InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(9,()->{System.out.println("上课");});//这个班级有6个学生,都到齐了才开始上课for (int i = 1;i <= 6;i++){final int tmp = i;new Thread(()->{try {cyclicBarrier.await();//表示当前线程达到屏障,触发+1} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + "到教室了");},String.valueOf(tmp)).start();}}
}

原理:

CyclicBarrier 是 Java 并发包(java.util.concurrent)中提供的一个同步工具类,用于协调多个线程之间的同步操作。它可以让多个线程在某个点上相互等待,直到所有线程都到达这个点后,再一起继续执行。与 CountDownLatch 不同,CyclicBarrier 是可重用的,即在一次同步操作完成后,它可以被重置并再次使用。

核心概念

CyclicBarrier 的核心是一个循环屏障,它通过一个计数器来跟踪到达屏障的线程数量。当所有线程都到达屏障时,屏障会被释放,所有线程可以继续执行。

  • 参与者数量(parties:在创建 CyclicBarrier 时,需要指定一个整数 parties,表示需要参与同步的线程数量。
  • 计数器:用于记录到达屏障的线程数量。每当一个线程到达屏障时,计数器加 1。
  • 重置机制:当所有线程都到达屏障后,计数器会被重置为 0,屏障可以再次被使用。
主要方法
  • CyclicBarrier(int parties):构造方法,指定需要参与同步的线程数量。
  • CyclicBarrier(int parties, Runnable barrierAction):构造方法,除了指定线程数量外,还可以指定一个在所有线程到达屏障后执行的回调任务(barrierAction)。
  • void await():当前线程到达屏障,等待其他线程也到达屏障。如果所有线程都到达屏障,则释放所有线程。
  • boolean await(long timeout, TimeUnit unit):当前线程到达屏障,等待其他线程到达屏障或超时。如果超时,则当前线程会抛出 TimeoutException
  • int getNumberWaiting():获取当前正在等待的线程数量。
  • int getParties():获取参与同步的线程数量。
 底层实现

CyclicBarrier 的底层实现也是基于 AQS(AbstractQueuedSynchronizer)。它通过共享锁的方式实现线程的同步。

  • await() 方法:调用 AQS 的 acquireSharedInterruptibly() 方法,当前线程进入等待状态。当所有线程都到达屏障时,AQS 会释放所有等待的线程。
  • barrierAction 回调任务:在所有线程到达屏障后,会由最后一个到达屏障的线程执行 barrierAction 任务。
工作流程
  1. 初始化:创建 CyclicBarrier 对象时,指定参与同步的线程数量(parties)。
  2. 线程到达屏障:每个线程在执行到某个点时调用 await() 方法,表示到达屏障。
  3. 等待同步:线程调用 await() 后会被阻塞,直到所有线程都到达屏障。
  4. 释放线程:当所有线程都到达屏障后,所有等待的线程被释放,继续执行后续操作。
  5. 重置屏障:释放线程后,屏障会被重置,可以再次使用。

Semaphore

 本质就是信号量操作

代码演示

package com.yw.utlis;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class SemaphoreDemo {public static void main(String[] args) {//线程数量:这里举例比喻为3个车位Semaphore semaphore = new Semaphore(3);for (int i = 1;i <= 6;i++){final int tmp = i;new Thread(()->{try {semaphore.acquire();//得到空车位System.out.println(Thread.currentThread().getName() + "抢到车位了");TimeUnit.SECONDS.sleep(3);//假设停车三秒钟System.out.println(Thread.currentThread().getName() + "离开车位了");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {semaphore.release();//释放空车位}},String.valueOf(tmp)).start();}}
}

原理

核心概念

Semaphore 的核心是一个许可计数器,它表示可用的许可数量。线程在访问资源之前需要获取许可,访问完成后释放许可。

  • 许可数量(permits:在创建 Semaphore 时,需要指定初始的许可数量。
  • 获取许可(acquire():线程调用 acquire() 方法时,会尝试获取一个许可。如果没有可用的许可,线程会被阻塞。
  • 释放许可(release():线程调用 release() 方法时,会释放一个许可,使其他线程可以获取许可并继续执行。
主要方法
  • Semaphore(int permits):构造方法,指定初始的许可数量。
  • void acquire():当前线程获取一个许可。如果没有可用的许可,线程会被阻塞。
  • boolean tryAcquire():当前线程尝试获取一个许可。如果当前有可用的许可,直接获取并返回 true;否则返回 false
  • boolean tryAcquire(long timeout, TimeUnit unit):当前线程尝试获取一个许可,直到超时。如果在超时时间内获取到许可,返回 true;否则返回 false
  • void release():当前线程释放一个许可。
  • int availablePermits():返回当前可用的许可数量。
底层实现

Semaphore 的底层实现也是基于 AQS(AbstractQueuedSynchronizer)。它通过共享锁的方式实现线程的同步。

  • acquire() 方法:调用 AQS 的 acquireSharedInterruptibly() 方法,尝试获取一个许可。如果没有可用的许可,线程会被阻塞。
  • release() 方法:调用 AQS 的 releaseShared() 方法,释放一个许可,使其他线程可以获取许可并继续执行。
工作流程
  1. 初始化:创建 Semaphore 对象时,指定初始的许可数量。
  2. 线程获取许可:线程在访问资源之前调用 acquire() 方法,尝试获取一个许可。
  3. 线程阻塞:如果没有可用的许可,线程会被阻塞,直到有许可被释放。
  4. 线程释放许可:线程访问资源完成后调用 release() 方法,释放一个许可。
  5. 重复使用Semaphore 是可重用的,许可数量会动态变化。

相关文章:

  • Unity 本土化插件 I2Localization
  • 需求频繁变更?AI 驱动的自动化解决方案实践
  • 【Fifty Project - D28】
  • chirpstack v4版本 全流程部署[ubuntu+docker]
  • Java Spring Boot 应用集成 Spring Security 使用 Redis 存储用户信息
  • 小白的进阶之路-人工智能从初步到精通pytorch的基本流程详解-1
  • 深入解析Spring Boot与Spring Cloud在微服务架构中的最佳实践
  • nginx日志
  • 人员管理2302版本
  • CVE-2022-22963源码分析与漏洞复现
  • 类autosar的os系统实现
  • 说一下响应状态码有哪些?
  • 语言幻觉测试用例及相关策略总结
  • SAP学习笔记 - 开发13 - CAP 之 添加数据库支持(Sqlite)
  • plc基础知识整理(三菱)
  • SVN与蓝盾流水线
  • 依赖错误终结者:AI 项目管理中的故障排查指南
  • python训练营day29
  • C++类与对象--5 运算符重载
  • 【Canvas与图标】圆角方块蓝星CSS图标
  • 中国预警机雷达有多强?可数百公里外看清足球轨迹
  • 消费维权周报丨上周涉汽车类投诉较多,涉加油“跳枪”等问题
  • 曾毓群说未来三年重卡新能源渗透率将突破50%,宁德时代如何打好换电这张牌
  • 家庭医生可提前5天预约三甲医院号源,上海常住人口签约率达45%,
  • 终于,俄罗斯和乌克兰谈上了
  • 李伟任山东省委常委、省纪委书记