Java进阶版线程池(超详细 )
线程池
线程池工具类 Executors
Executors
是 Java 提供的一个工具类,它包含了多个静态方法,能够方便地创建不同类型的线程池。
newFixedThreadPool
创建一个固定大小的线程池,线程池中的线程数量固定,当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则将任务放入工作队列等待。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小为 3 的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
});
}
executor.shutdown();
}
}
newSingleThreadExecutor
创建一个单线程的线程池,线程池只有一个线程,所有任务按顺序依次执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
// 创建一个单线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
});
}
executor.shutdown();
}
}
newCachedThreadPool
创建一个可缓存的线程池,线程池的线程数量不固定,当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则创建新线程来执行任务。当线程空闲时间超过 60 秒时,会被销毁。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
// 创建一个可缓存的线程池
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
});
}
executor.shutdown();
}
}
newScheduledThreadPool
创建一个支持定时和周期性任务执行的线程池。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
// 创建一个大小为 2 的定时线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 延迟 2 秒后执行任务
executor.schedule(() -> System.out.println("Scheduled task executed"), 2, TimeUnit.SECONDS);
// 延迟 1 秒后开始,每 3 秒执行一次任务
executor.scheduleAtFixedRate(() -> System.out.println("Periodic task executed"), 1, 3, TimeUnit.SECONDS);
executor.shutdown();
}
}
自定义线程池:ThreadPoolExecutor
常用构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数含义如下:
- corePoolSize:核心线程数。线程池会保持这些线程一直存活,即便它们处于空闲状态。当有新任务提交时,若线程池里的线程数量少于
corePoolSize
,就会创建新线程来处理任务。 - maximumPoolSize:线程池允许的最大线程数。当工作队列已满,并且线程池中的线程数量小于
maximumPoolSize
时,会创建新线程来处理任务。 - keepAliveTime:线程空闲时的存活时间。当线程池中的线程数量超过
corePoolSize
,且这些多余的线程空闲时间达到keepAliveTime
时,它们会被销毁。 - unit:
keepAliveTime
的时间单位,它是TimeUnit
枚举类型,例如TimeUnit.SECONDS
、TimeUnit.MILLISECONDS
等。 - workQueue:用于存储待执行任务的阻塞队列。常见的队列类型有
ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
等。 - threadFactory:线程工厂,用于创建线程。通过自定义线程工厂,可以为线程设置名称、优先级等属性。
- handler:拒绝策略,当工作队列已满且线程池中的线程数量达到
maximumPoolSize
时,新提交的任务会被拒绝,此时会调用该策略来处理被拒绝的任务。常见的拒绝策略有AbortPolicy
、CallerRunsPolicy
、DiscardPolicy
、DiscardOldestPolicy
等。
CompletableFuture
CompletableFuture
是 Java 8 引入的一个类,位于 java.util.concurrent
包中。它是用于异步编程的工具类,表示一个异步任务的未来结果。CompletableFuture
提供了丰富的 API,用于处理异步任务的完成、组合和异常处理。
CompletableFuture
与 Thread
和 Runnable
的区别
Thread
和 Runnable
Thread
:是 Java 中最基本的线程类,用于创建和管理线程。它提供了线程的基本功能,但不支持异步编程和结果处理。
Runnable
:是一个接口,表示一个可以被线程执行的任务。它通常与 Thread
一起使用,但同样不支持异步编程和结果处理。
缺点:Thread和Runnable都是在run()中写多线程代码,二者都没有返回值(可以使用轮询和回调)。
CompletableFuture的出现解决了这个问题,它支持下面功能:
-
支持异步编程,可以创建异步任务并处理其结果。
-
链式调用:支持链式调用,可以将多个异步任务组合在一起,形成一个完整的流程。
-
异常处理:提供了丰富的异常处理机制,可以捕获和处理异步任务中的异常。
-
组合操作:可以组合多个异步任务,例如
allOf()
和anyOf()
,并等待它们完成。
实例:
Supplier<String> mm1 = new Supplier<String>() {
@Override
public String get() {
for(int i=0;i<10;i++){
System.out.println("1111111111");
}
return "第一个";
}
};
CompletableFuture<String> dd1 = CompletableFuture.supplyAsync(mm1);
Supplier<String> mm2 = new Supplier<String>() {
@Override
public String get() {
for(int i=0;i<10;i++){
System.out.println("2222222222222");
}
return "第二个";
}
};
CompletableFuture<String> dd2 = CompletableFuture.supplyAsync(mm2);
Supplier<String> mm3 =()->{ //Lambda表达式(匿名函数)
for(int i=0;i<100;i++){
System.out.println("3333333333");
}
return "第三个";
};
// Supplier<String> mm3 = new Supplier<String>() {
// @Override
// public String get() {
// for(int i=0;i<100;i++){
// System.out.println("3333333333");
// }
// return "第三个";
// }
// };
CompletableFuture<String> dd3 = CompletableFuture.supplyAsync(mm3);
CompletableFuture<Void> vo = CompletableFuture.allOf(dd1, dd2, dd3);
Runnable r = new Runnable() {
@Override
public void run() {
System.out.println(dd1.join()+"--------"+dd2.join()+"--------"+dd3.join());
}
};
vo.thenRun(r);
vo.join();
CompletableFuture的重要API:
CompletableFuture.runAsync(Runnable runnable)
- 此方法用于异步执行一个
Runnable
任务,没有返回值。它会使用ForkJoinPool.commonPool()
作为线程池来执行任务。 - 示例代码:
- 此方法用于异步执行一个
import java.util.concurrent.CompletableFuture;
public class RunAsyncExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running task asynchronously");
});
future.join();
}
}
CompletableFuture.runAsync(Runnable runnable, Executor executor)
- 与上面的方法类似,但可以指定一个自定义的
Executor
来执行任务(没有返回值)。
- 与上面的方法类似,但可以指定一个自定义的
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RunAsyncWithExecutorExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running task asynchronously with custom executor");
}, executor);
future.join();
executor.shutdown();
}
}
CompletableFuture.supplyAsync(Supplier<U> supplier)
- 异步执行一个
Supplier
任务,有返回值。同样使用ForkJoinPool.commonPool()
作为线程池。 - 示例代码:
- 异步执行一个
import java.util.concurrent.CompletableFuture;
public class SupplyAsyncExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Result from asynchronous task";
});
String result = future.join();
System.out.println(result);
}
}
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
- 与
supplyAsync(Supplier<U> supplier)
类似,可指定自定义的Executor
来执行任务。
- 与
2. 处理任务结果
thenApply(Function<? super T,? extends U> fn)
- 当
CompletableFuture
完成后,对结果进行转换。返回一个新的CompletableFuture
,其结果是原CompletableFuture
结果经过Function
处理后的结果。 - 示例代码:
- 当
import java.util.concurrent.CompletableFuture;
public class ThenApplyExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> newFuture = future.thenApply(num -> num * 2);
Integer result = newFuture.join();
System.out.println(result);
}
}
thenAccept(Consumer<? super T> action)
- 当
CompletableFuture
完成后,对结果进行消费,没有返回值。 - 示例代码:
- 当
import java.util.concurrent.CompletableFuture;
public class ThenAcceptExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10);
future.thenAccept(num -> System.out.println("Received result: " + num));
}
}
thenRun(Runnable action)
- 当
CompletableFuture
完成后,执行一个Runnable
任务,不关心原CompletableFuture
的结果。 - 示例代码:
- 当
import java.util.concurrent.CompletableFuture;
public class ThenRunExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
future.thenRun(() -> System.out.println("Task completed"));
}
}
3. 组合多个 CompletableFuture
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
- 用于组合两个
CompletableFuture
,前一个CompletableFuture
的结果作为后一个CompletableFuture
的输入。 - 示例代码:
- 用于组合两个
import java.util.concurrent.CompletableFuture;
public class ThenComposeExample {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> future2 = future1.thenCompose(num -> CompletableFuture.supplyAsync(() -> num * 4));
Integer result = future2.join();
System.out.println(result);
}
}
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
- 当两个
CompletableFuture
都完成后,将它们的结果组合起来。 - 示例代码:
- 当两个
import java.util.concurrent.CompletableFuture;
public class ThenCombineExample {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (num1, num2) -> num1 + num2);
Integer result = combinedFuture.join();
System.out.println(result);
}
}
4. 异常处理
exceptionally(Function<Throwable, ? extends T> fn)
- 当
CompletableFuture
出现异常时,使用Function
处理异常并返回一个默认值。 - 示例代码:
- 当
import java.util.concurrent.CompletableFuture;
public class ExceptionallyExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Something went wrong");
}
return 10;
});
CompletableFuture<Integer> resultFuture = future.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return 0;
});
Integer result = resultFuture.join();
System.out.println("Final result: " + result);
}
}
handle(BiFunction<? super T, Throwable, ? extends U> fn)
- 无论
CompletableFuture
是否完成或出现异常,都会执行BiFunction
,可以根据是否有异常来处理结果。 - 示例代码:
- 无论
import java.util.concurrent.CompletableFuture;
public class HandleExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Something went wrong");
}
return 20;
});
CompletableFuture<Integer> resultFuture = future.handle((result, ex) -> {
if (ex != null) {
System.out.println("Caught exception: " + ex.getMessage());
return 0;
}
return result;
});
Integer result = resultFuture.join();
System.out.println("Final result: " + result);
}
}
5. 等待多个 CompletableFuture
完成
CompletableFuture.allOf(CompletableFuture<?>... cfs)
- 等待所有给定的
CompletableFuture
都完成。返回一个新的CompletableFuture
,当所有输入的CompletableFuture
都完成时,这个新的CompletableFuture
也完成。 - 示例代码:
- 等待所有给定的
import java.util.concurrent.CompletableFuture;
public class AllOfExample {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.join();
Integer result1 = future1.join();
Integer result2 = future2.join();
Integer result3 = future3.join();
System.out.println("Results: " + result1 + ", " + result2 + ", " + result3);
}
}
CompletableFuture.anyOf(CompletableFuture<?>... cfs)
- 只要有一个给定的
CompletableFuture
完成,就返回一个新的CompletableFuture
,其结果是第一个完成的CompletableFuture
的结果。 - 示例代码:
- 只要有一个给定的
import java.util.concurrent.CompletableFuture;
public class AnyOfExample {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
Object result = anyFuture.join();
System.out.println("First completed result: " + result);
}
}