【并发编程】JUC常用类以及线程池
目录
✅JUC常用类
ConcurrentHashMap
CopyOnWriteArrayList
CountDownLatch
✅线程池
ThreadPoolExecutor
线程池的工作流程
线程池的拒绝策略
线程池提交execute和submit的区别?
怎么关闭线程池?
✅JUC常用类
在集合类中,例如Vector,Hashtable这些类加锁时都是直接把锁加在方法上,性能较低。在并发访问量较小是还可以使用,但是在高并发访问下性能就显得很低。
▪ HashMap适合单线程场景下,不允许多个线程同时访问操作,如果有多线程访问会报异常
▪ Hashtable是线程安全的,直接给方法加锁,效率低
基于Hashtable效率低的情况,ConcurrentHashMap弥补了其缺点。
ConcurrentHashMap
▪ ConcurrentHashMap是线程安全的,且没有直接给方法加锁,用哈希表中每一个位置上的第一个元素作为锁对象。若哈希表长度是16,那么就有16把锁对象,只管锁住自己的位置即可。
▪ 这样如果多个线程若操作不同的位置,那么相互不影响,只有多个线程操作同一个位置时才会等待。
▪ 如果位置上没有任何元素,那么采用CAS机制插入数据到指定为止。
Hashtable,ConcurrentHashMap 键和值都不能为null
public static void main(String[] args) {
/*
hashmap适合单线程场景下的,不允许多线程同时访问操作
*/
HashMap<String,String> map = new HashMap<>();
for (int i = 0; i < 2000; i++) {
int finalI =i;
new Thread(()->{
map.put("key:"+finalI, "value:"+finalI);
}).start();
System.out.println(map);
}
}
CopyOnWriteArrayList
▪ CopyOnWriteArrayList中,为写方法操作加了锁(ReentrantLock实现的)
▪ 在写入数据时,先把原数组做了备份,把要添加的数据写入到备份数组中,当写入完成后,再把修改的数组赋值到原数组中
▪ 给写加了锁,读没有加锁,读的效率提高了,适合写操作少,读操作多的场景
▼ 测试代码
public static void main(String[] args) {
CopyOnWriteArrayList<String> arrayList =new CopyOnWriteArrayList();
for (int i = 0; i < 1000; i++) {
int finalI = i;
new Thread(()->{
arrayList.add("a"+ finalI);
}).start();
System.out.println(arrayList);
}
}
CountDownLatch
CountDownLatch是java.util.concurrent
包中的一个同步工具类,允使一个线程或多个线程等待其他线程执行结束后再执行。
它通过一个计数器(count
)实现,是一个递减的计数器,初始化时设定计数值,当有一个线程执行结束后就减一,直到为 0 关闭计数器,这样线程就可以执行了。
▼ 源代码
public static void main(String[] args) throws InterruptedException {
CountDownLatch downLatch = new CountDownLatch(6);//计数
for (int i = 0; i <6 ; i++) {
new Thread(
()->{
System.out.println(Thread.currentThread().getName());
downLatch.countDown();//计数器减一操作
}
).start();
}
downLatch.await();//关闭计数
System.out.println("main线程执行");
}
▼ 测试结果
✅线程池
池的概念:
▪ 我们了解的池,像字符串常量池,若String s1 = "abc" ;String s2 = "abd" ;s1 == s2 // true
▪ Integer自动装箱,缓存了-128到127之间的对象
▪ 数据库连接池,例如阿里巴巴Druid,它就是事先帮助我们缓存了一定数量的链接对象,放在池子里,用完还回到池子中,从而减少了对象的频繁创建和销毁的时间开销。
▪ 那么线程池其实就是为了减少频繁创建和销毁线程
ThreadPoolExecutor
阿里规范建议使用ThreadPoolExecutor类来创建线程池,提高效率
✅关于在ThreadPoolExecutor的构造方法中七个参数的含义:
▪ corePoolSize:核心线程池中的数量(初始化数量)
▪ maximumPoolSize:线程池中最大的数量
▪ keepAliveTime:空闲线程存活时间
▪ unit:时间单位
▪ workQueue:等待队列,当核心线程池中的线程都在使用时,会先将等待的线程放到队列中,当队列满时才会创建新的线程
▪ threadFactory:线程工程,用来创建线程池中的线程
▪ handler:拒绝处理任务时的策略
线程池的工作流程
▪ 当有大量的任务到来时,先判断核心线程池中的线程是否都忙着(是否已满)
▪ 有空闲的直接让核心线程中的线程执行任务
▪ 没有空闲的则判断等待队列是否已满
▪ 如果没满把任务添加到队列等待
▪ 如果已满,判断非核心线程池中的线程是否都忙着
▪ 如果有空闲的,没满,交由线程核心线程池中的线程执行
▪ 如果非核心线程池也已经满了,那么就使用对应的拒绝策略处理
线程池的拒绝策略
在线程池中,当提交的任务数量超过了线程池的最大容量,线程池就需要使用拒绝策略来处理无法处理的新任务。Java 中提供了 4 种默认的拒绝策略:
▪ AbortPolicy(默认策略):直接抛出 runtime 异常,阻止系统正常运行。
▪ CallerRunsPolicy:由提交该任务的线程来执行这个任务。
▪ DiscardPolicy:直接丢弃任务,不给予任何处理。
▪ DiscardOldestPolicy:丢弃队列中最老的一个请求,尝试再次提交当前任务。
除了这些默认的策略之外,我们也可以自定义自己的拒绝策略,实现RejectedExecutionHandler接口即可。
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义的拒绝策略处理逻辑
}
}
线程池提交execute和submit的区别?
在Java中,线程池中一般有两种方法来提交任务:execute() 和 submit()
▪ execute() 用于提交不需要返回值的任务
▪ submit() 用于提交需要返回值的任务。
线程池会返回一个future类型的对象,通过这个 future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值
怎么关闭线程池?
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
shutdown:将线程池状态置为shutdown,并不会立即停止:
▪ 停止接收外部submit的任务
▪ 内部正在跑的任务和队列里等待的任务,会执行完
▪ 等到第二步完成后,才真正停止
shutdownNow:将线程池状态置为stop。一般会立即停止,事实上不一定:
▪ 和shutdown()一样,先停止接收外部提交的任务
▪ 忽略队列里等待的任务
▪ 尝试将正在跑的任务interrupt中断
▪ 返回未执行的任务列表
shutdown 和shutdownnow区别如下:
▪ shutdownNow:能立即停止线程池,正在跑的和正在等待的任务都停下了。这样做立即生效,但是风险也比较大。
▪ shutdown:只是关闭了提交通道,用submit()是无效的;而内部的任务该怎么跑还是怎么跑,跑完再彻底停止线程池。
▼ Shutdown样例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 一颗小谷粒
*/
public class BaiLiShutdownDemo {
public static void main(String[] args) {
// 创建一个线程池,包含两个线程
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任务到线程池
executor.submit(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task 1 finished");
});
executor.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task 2 finished");
});
// 关闭线程池
executor.shutdown();
while (!executor.isTerminated()) {
System.out.println("Waiting for all tasks to finish...");
try {
// 每500毫秒检查一次
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("All tasks finished");
}
}
▼ ShutdownNow样例
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author 一颗小谷粒
*/
public class BaiLiShutdownNowDemo {
public static void main(String[] args) {
// 创建一个线程池,包含两个线程
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任务到线程池
executor.submit(() -> {
while (!Thread.interrupted()) {
System.out.println("Task 1 running...");
}
System.out.println("Task 1 interrupted");
});
executor.submit(() -> {
while (!Thread.interrupted()) {
System.out.println("Task 2 running...");
}
System.out.println("Task 2 interrupted");
});
// 关闭线程池
List<Runnable> unfinishedTasks = null;
executor.shutdownNow();
try {
// 等待直到所有任务完成或超时60秒
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 如果等待超时,则记录未完成的任务列表
unfinishedTasks = executor.shutdownNow();
System.out.println("Some tasks were not finished");
}
} catch (InterruptedException e) {
// 如果等待过程中发生异常,则记录未完成的任务列表
unfinishedTasks = executor.shutdownNow();
Thread.currentThread().interrupt();
}
if (unfinishedTasks != null && !unfinishedTasks.isEmpty()) {
System.out.println("Unfinished tasks: " + unfinishedTasks);
} else {
System.out.println("All tasks finished");
}
}
}
🌟感谢大家喜欢我的文章哦🌟