JUC入门(三)
7、Callable
1、可以有返回值
2、可以抛出异常
3、方法不同 run()/call()
代码测试
老版本的应用
package com.yw.callable;public class Old {public static void main(String[] args) {new Thread(new MyThread()).start();}
}class MyThread implements Runnable{@Overridepublic void run(){}
}
Callable
package com.yw.callable;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class NewThread {public static void main(String[] args) throws ExecutionException, InterruptedException {MyNewThread myNewThread = new MyNewThread();FutureTask futureTask = new FutureTask(myNewThread);//适配类new Thread(futureTask,"A").start();//结果会被缓存提高效率new Thread(futureTask,"B").start();String result = (String) futureTask.get();//获取Callable的返回结果,这个方法会产生阻塞//或者使用异步来处理System.out.println(result);}
}class MyNewThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("call");return "你好,Callable";}
}
细节:
- 结果会被缓存
- 结果可能需要等待,会产生阻塞
8、常用的辅助类
CountDownLatch
可以简单将其理解为减法计数器
一般用于必须要执行某个任务过后才继续执行时使用
代码展示:
package com.yw.utlis;import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {//必须要执行的任务执行完成后才能继续执行CountDownLatch countDownLatch = new CountDownLatch(6);//总数是6for (int i = 1; i <=6; i++){final int tmp = i;new Thread(()->{System.out.println(Thread.currentThread().getName() + "出门咯");countDownLatch.countDown();//数量-1},String.valueOf(tmp)).start();}countDownLatch.await();//等待计数器归0才继续执行后面的操作System.out.println("关门");}
}
原理:
核心数据结构
CountDownLatch
的核心是一个计数器(count
),它在创建时被初始化为一个正整数。这个计数器的值表示需要等待的线程数量。
- 当一个线程调用
countDown()
方法时,计数器的值会减 1。 - 当计数器的值减到 0 时,表示所有需要等待的线程都已经完成了任务,此时所有因等待计数器归零而被阻塞的线程会被唤醒并继续执行
主要方法
CountDownLatch(int count)
:构造方法,初始化计数器的值。void countDown()
:将计数器的值减 1。如果计数器的值减到 0,所有等待的线程会被唤醒。void await()
:当前线程等待,直到计数器的值减到 0。如果计数器的值已经是 0,则当前线程直接继续执行。boolean await(long timeout, TimeUnit unit)
:当前线程等待,直到计数器的值减到 0 或者超时。如果超时,当前线程会返回,而不会被阻塞。
底层实现
CountDownLatch
的底层实现基于 AQS(AbstractQueuedSynchronizer,抽象队列同步器)。AQS 是 Java 并发包的核心框架,它提供了一种基于队列的锁机制。
countDown()
方法:调用 AQS 的releaseShared()
方法,尝试将计数器的值减 1。如果计数器的值减到 0,会唤醒所有等待的线程。await()
方法:调用 AQS 的acquireSharedInterruptibly()
方法,当前线程会进入等待状态,直到计数器的值减到 0 或者被中断。
工作流程
- 初始化:创建
CountDownLatch
对象时,指定计数器的初始值。 - 线程等待:一个或多个线程调用
await()
方法,这些线程会被阻塞,等待计数器的值减到 0。 - 线程完成任务:其他线程完成任务后,调用
countDown()
方法,计数器的值减 1。 - 计数器归零:当计数器的值减到 0 时,所有等待的线程被唤醒,继续执行后续操作。
CyclicBarrier
可以简单将其理解为加法计数器
代码演示:
package com.yw.utlis;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {public static void main(String[] args) throws BrokenBarrierException, InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(9,()->{System.out.println("上课");});//这个班级有6个学生,都到齐了才开始上课for (int i = 1;i <= 6;i++){final int tmp = i;new Thread(()->{try {cyclicBarrier.await();//表示当前线程达到屏障,触发+1} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + "到教室了");},String.valueOf(tmp)).start();}}
}
原理:
CyclicBarrier
是 Java 并发包(java.util.concurrent
)中提供的一个同步工具类,用于协调多个线程之间的同步操作。它可以让多个线程在某个点上相互等待,直到所有线程都到达这个点后,再一起继续执行。与 CountDownLatch
不同,CyclicBarrier
是可重用的,即在一次同步操作完成后,它可以被重置并再次使用。
核心概念
CyclicBarrier
的核心是一个循环屏障,它通过一个计数器来跟踪到达屏障的线程数量。当所有线程都到达屏障时,屏障会被释放,所有线程可以继续执行。
- 参与者数量(
parties
):在创建CyclicBarrier
时,需要指定一个整数parties
,表示需要参与同步的线程数量。 - 计数器:用于记录到达屏障的线程数量。每当一个线程到达屏障时,计数器加 1。
- 重置机制:当所有线程都到达屏障后,计数器会被重置为 0,屏障可以再次被使用。
主要方法
CyclicBarrier(int parties)
:构造方法,指定需要参与同步的线程数量。CyclicBarrier(int parties, Runnable barrierAction)
:构造方法,除了指定线程数量外,还可以指定一个在所有线程到达屏障后执行的回调任务(barrierAction
)。void await()
:当前线程到达屏障,等待其他线程也到达屏障。如果所有线程都到达屏障,则释放所有线程。boolean await(long timeout, TimeUnit unit)
:当前线程到达屏障,等待其他线程到达屏障或超时。如果超时,则当前线程会抛出TimeoutException
。int getNumberWaiting()
:获取当前正在等待的线程数量。int getParties()
:获取参与同步的线程数量。
底层实现
CyclicBarrier
的底层实现也是基于 AQS(AbstractQueuedSynchronizer)。它通过共享锁的方式实现线程的同步。
await()
方法:调用 AQS 的acquireSharedInterruptibly()
方法,当前线程进入等待状态。当所有线程都到达屏障时,AQS 会释放所有等待的线程。barrierAction
回调任务:在所有线程到达屏障后,会由最后一个到达屏障的线程执行barrierAction
任务。
工作流程
- 初始化:创建
CyclicBarrier
对象时,指定参与同步的线程数量(parties
)。 - 线程到达屏障:每个线程在执行到某个点时调用
await()
方法,表示到达屏障。 - 等待同步:线程调用
await()
后会被阻塞,直到所有线程都到达屏障。 - 释放线程:当所有线程都到达屏障后,所有等待的线程被释放,继续执行后续操作。
- 重置屏障:释放线程后,屏障会被重置,可以再次使用。
Semaphore
本质就是信号量操作
代码演示
package com.yw.utlis;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class SemaphoreDemo {public static void main(String[] args) {//线程数量:这里举例比喻为3个车位Semaphore semaphore = new Semaphore(3);for (int i = 1;i <= 6;i++){final int tmp = i;new Thread(()->{try {semaphore.acquire();//得到空车位System.out.println(Thread.currentThread().getName() + "抢到车位了");TimeUnit.SECONDS.sleep(3);//假设停车三秒钟System.out.println(Thread.currentThread().getName() + "离开车位了");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {semaphore.release();//释放空车位}},String.valueOf(tmp)).start();}}
}
原理
核心概念
Semaphore
的核心是一个许可计数器,它表示可用的许可数量。线程在访问资源之前需要获取许可,访问完成后释放许可。
- 许可数量(
permits
):在创建Semaphore
时,需要指定初始的许可数量。 - 获取许可(
acquire()
):线程调用acquire()
方法时,会尝试获取一个许可。如果没有可用的许可,线程会被阻塞。 - 释放许可(
release()
):线程调用release()
方法时,会释放一个许可,使其他线程可以获取许可并继续执行。
主要方法
Semaphore(int permits)
:构造方法,指定初始的许可数量。void acquire()
:当前线程获取一个许可。如果没有可用的许可,线程会被阻塞。boolean tryAcquire()
:当前线程尝试获取一个许可。如果当前有可用的许可,直接获取并返回true
;否则返回false
。boolean tryAcquire(long timeout, TimeUnit unit)
:当前线程尝试获取一个许可,直到超时。如果在超时时间内获取到许可,返回true
;否则返回false
。void release()
:当前线程释放一个许可。int availablePermits()
:返回当前可用的许可数量。
底层实现
Semaphore
的底层实现也是基于 AQS(AbstractQueuedSynchronizer)。它通过共享锁的方式实现线程的同步。
acquire()
方法:调用 AQS 的acquireSharedInterruptibly()
方法,尝试获取一个许可。如果没有可用的许可,线程会被阻塞。release()
方法:调用 AQS 的releaseShared()
方法,释放一个许可,使其他线程可以获取许可并继续执行。
工作流程
- 初始化:创建
Semaphore
对象时,指定初始的许可数量。 - 线程获取许可:线程在访问资源之前调用
acquire()
方法,尝试获取一个许可。 - 线程阻塞:如果没有可用的许可,线程会被阻塞,直到有许可被释放。
- 线程释放许可:线程访问资源完成后调用
release()
方法,释放一个许可。 - 重复使用:
Semaphore
是可重用的,许可数量会动态变化。