Java并发工具类JUC

一.等待多线程的CountDownLatch
初始CountDownLatch
- CountDownLatch同步工具允许一条或多条线程等待其他线程中的一组操作后,再继续执行
- 比如我需要有七颗龙珠才能召唤神龙,这七颗龙珠互相没有关系,如果我一颗一颗收集太慢了,所以适合派七个人每个人帮我收集一颗,对比下来就是:
- 子线程:各自寻找相应的目标龙珠,
- 主线程:等子线程都执行完毕了,再召唤神龙。
CountDownLatch验证
public class Main {private final static Random random=new Random();static class SearchTask implements Runnable{private Integer id;private CountDownLatch latch;public SearchTask(Integer id, CountDownLatch latch) {this.id = id;this.latch = latch;}@Overridepublic void run() {System.out.println("开始寻找"+id+"号龙珠");int seconds=random.nextInt(10);try {Thread.sleep(seconds*1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("找到"+id+"号龙珠,耗时"+seconds+"秒");latch.countDown();//告知任务执行完毕}}public static void main(String[] args) {List<Integer> idList= Arrays.asList(1,2,3,4,5,6,7);CountDownLatch latch=new CountDownLatch(idList.size());for (Integer id:idList){new Thread(new SearchTask(id,latch)).start();}try {latch.await();//等待所有线程执行完毕} catch (InterruptedException e) {e.printStackTrace();}System.out.println("所有龙珠都找到啦");}
}其中涉及到的方法:
public void await() throws InterruptedException//阻塞当前线程,直到计数器值变为0。如果计数器已经为0,立即返回。如果在等待过程中被中断,抛出 InterruptedException 异常。public void countDown()//递减计数器的值,表示一个任务已经完成。执行结果:


二.同步屏障 CyclicBarrier
- CyclicBarrier的字面意思是可以循环使用(Cycle)的屏障(Barrier).
- 要做的事情是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
类比理解:坐过那种滚动发车的汽车吗,例如汽车作为10个,坐满就走,那前9个坐上车的人就只有等待,第10个人来了之后就发车,一起出发。
代码示例:11个空位,主线程和子线程看成一样的,不用区别理解。前10个线程到达之后只能等待,等到第11个线程到来之后,全部一起执行。
public class Main {public static void main(String[] args) throws InterruptedException, BrokenBarrierException {CyclicBarrier cb=new CyclicBarrier(11);for (int i = 1; i <=10 ; i++) {String name="线程"+i;new Thread(new Runnable() {@Overridepublic void run() {System.out.println("任务:"+Thread.currentThread().getName()+"正在等待....");try {cb.await();} catch (InterruptedException | BrokenBarrierException e) {throw new RuntimeException(e);}System.out.println("任务:"+Thread.currentThread().getName()+"完成执行");}}, name).start();}Thread.sleep(200);System.out.println("任务11还要5s才到,其它任务先等待...");Thread.sleep(5000);cb.await();System.out.println("任务11开始执行...");}
}- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量等。

三.控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
public class Main {public static void main(String[] args) throws InterruptedException {Semaphore semaphore=new Semaphore(5);for(int i=0;i<10;i++){new Thread(new Runnable() {@Overridepublic void run() {try {semaphore.acquire();System.out.println(Thread.currentThread().getName()+"正在使用");Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}finally {semaphore.release();}}}).start();}Thread.sleep(20000);}
}- 虽然会有20个线程尝试执行,但是同一时间只会有5个线程可以运行
- 构造方法 Semaphore(int permmits)接受一个整型的数字,表示可用许可证数量。
- Semaphore(5)表示允许5个线程获取许可证,也就是最大并发数是5。
- Semaphore的用法也很简单,首先线程使用 Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。
四.线程间交换数据的Exchanger
- Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。
- 这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
- exchange()方法:入参是本线程需要传递给对方的数据,返回值则是对方线程传过来的数据。
- 如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(Vx,longtimeout,TimeUnit unit)设置最大等待时长。
import java.util.concurrent.Exchanger;public class Main {public static void main(String[] args) {final Exchanger<String> exgr = new Exchanger<>();new Thread(new Runnable() {@Overridepublic void run() {try {String A = "你好,我是AAA";String B = exgr.exchange(A);System.out.println("我是线程" + Thread.currentThread().getName() + ",我交换到了:" + B);} catch (Exception e) {throw new RuntimeException(e);}}}, "A").start();new Thread(new Runnable() {@Overridepublic void run() {try {String A = "你好,我是BBB";String B = exgr.exchange(A);System.out.println("我是线程" + Thread.currentThread().getName() + ",我交换到了:" + B);} catch (Exception e) {throw new RuntimeException(e);}}}, "B").start();}} 
