java异步编程接口简介
java异步编程接口简介
java中使用线程的4种方式:
- 继承thread类
- 实现Runnable接口
- 实现Callable接口结合FutureTasl使用
- 使用线程池
1.继承Thread类
继承Thread类并重写run方法
来定义线程执行的任务。这种方式简单直观,但存在一些限制。
代码示例:
public class ThreadExample1 extends Thread {
@Override
public void run() {
System.out.println("ThreadExample1 is running.");
}
}
// 使用
ThreadExample1 thread = new ThreadExample1();
thread.start();
线程实现方式:将线程需要执行的操作放在一个继承了Thread的类里边,当需要执行这个线程的时候,就新建类对象,调用类对象的start()方法。
问题
:继承Thread类会强制要求类继承自Thread,这限制了类的继承结构,因为Java不支持多重继承。
2.实现Runnable接口
实现Runnable接口并重写run方法
,可以避免继承Thread类的限制,因为Java允许一个类实现多个接口。
代码示例:
public class RunnableExample1 implements Runnable {
@Override
public void run() {
System.out.println("RunnableExample1 is running.");
}
}
// 使用
RunnableExample1 runnable = new RunnableExample1();
new Thread(runnable).start();
线程实现方式:将runnable
的实例对象传递给Thread
的构造方法,然后调用Thread对象的start()方法。
3.实现Callable接口结合FutureTask
实现Callable接口
并重写run方法
,用FutureTask
封装Callable接口实例
,将新创建的FutureTask实例对象传递给Thread
的构造方法,然后调用Thread对象的start()方法。
Callable接口
允许任务有返回值,并且可以通过FutureTask
来管理线程的生命周期和结果。
代码示例:
public class CallableExample1 implements Callable<Integer> {
@Override
public Integer call() {
return 100;
}
}
// 使用
FutureTask<Integer> futureTask = new FutureTask<>(new CallableExample1());
new Thread(futureTask).start();
try {
Integer result = futureTask.get(); // 阻塞等待结果
System.out.println("CallableExample1 returned: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
线程实现方式:将 Callable接口
的实例对象包装成FutureTask
类型的实例,然后将FutureTask对象实例放入到Thread中执行。
问题
:尽管这种方式可以获取返回值,但创建和管理线程的开销仍然存在。
4.使用线程池
线程池提供了一种高效的方式来管理线程
,它可以复用线程
,减少创建和销毁线程的开销,同时可以控制并发级别。
代码示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10); // 创建线程池
Runnable task = () -> System.out.println("Task is running in thread pool.");
executorService.submit(task); // 提交任务到线程池
// 执行完毕后关闭线程池
executorService.shutdown();
}
}
其中,Runnable task = () -> System.out.println(“Task is running in thread pool.”);是一种使用lambda表达式
实现函数式接口
的方式。函数式接口简介参考往往期的博客:https://blog.csdn.net/m0_70484213/article/details/145859478
和通过匿名内部类的写法等价:
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Task is running in the thread pool.");
}
};
优势:
-
资源控制:线程池可以控制并发线程的数量,有效管理资源。
-
性能稳定:通过复用线程,减少了线程创建和销毁的开销,提高了性能。
-
易于管理:线程池提供了统一的接口来提交和管理任务,简化了代码。
在实际开发中,推荐使用线程池来执行多线程任务
,因为它提供了更好的资源管理和性能优势。
前三种方式虽然在某些简单场景下可以使用,但在需要高效资源管理和复杂任务调度的现代应用程序中,它们往往不够灵活和高效。
线程池详解
写在最前面,创建线程池的代码:只有一行
public static ExecutorService executor= Executors.newFixedThreadPool(10);
1.线程池的七大参数
线程池(ThreadPoolExecutor
)是Java并发编程中用于管理线程的一个核心组件。它允许你复用线程,从而减少因频繁创建和销毁线程而带来的开销。以下是线程池的七个主要参数,这些参数在创建线程池时可以进行配置:
-
corePoolSize:线程池的基本大小,即在没有任务执行时,线程池中保留的线程数量。如果允许核心线程超时(通过设置
allowCoreThreadTimeOut
为true
),则即使线程池中的线程是空闲的,它们也会在一定时间后终止。 -
maximumPoolSize:线程池中允许的最大线程数。如果任务队列满了,线程池会尝试创建新的线程,直到达到这个数目。
-
keepAliveTime:当线程池中的线程数量超过
corePoolSize
时,多余的空闲线程能等待新任务的最长时间。如果在这个时间内没有新任务到达,这些线程将被终止。 -
unit:
keepAliveTime
参数的时间单位,常见的时间单位有TimeUnit.SECONDS
、TimeUnit.MILLISECONDS
等 -
workQueue:一个阻塞队列,用于存储等待执行的任务。这个队列只保存通过
execute(Runnable)
方法提交的Runnable
任务。 -
threadFactory:一个线程工厂,用于创建新线程。通过自定义线程工厂,你可以对线程的创建过程进行控制,例如设置线程的名称、优先级等。
-
handler:当线程池饱和(即线程数量达到maximumPoolSize且任务队列已满)时,使用的饱和策略。饱和策略定义了如何处理无法立即执行的任务,常见的饱和策略有:
ThreadPoolExecutor./AbortPolicy/DiscardPolicy/CallerRunsPolicy/DiscardOldestPolicy
AbortPolicy
:抛出RejectedExecutionException来拒绝新任务的处理(Executors.newFixedThreadPoold的默认策略
)。DiscardPolicy
:同AbortPolicy,但不会抛出异常,直接丢弃无法处理的任务。CallerRunsPolicy
:调用执行任务的线程(提交任务的线程)来运行当前任务。DiscardOldestPolicy
:丢弃任务队列中最老的任务,然后尝试再次提交当前任务。
2.Executors能创建的4中线程池
在实际开发过程中,一般用Executors
创建线程池,Executors
相当于一个工厂类,创建各种各样的线程池对象。
FixedThreadPool
:核心线程=最大线程
类型:固定大小的线程池。
特点:线程池中的线程数量是固定的。当一个线程完成任务后,它会从队列中获取新的任务来执行。这种线程池能够保证程序的资源占用相对稳定。
用途:适用于需要控制线程数量的场景,如后台任务处理、定时任务等。
CachedThreadPool
:核心线程=0,最大线程=Integer.max_value
类型:可缓存的线程池。
特点:线程池的大小没有限制,可以根据需要动态地调整线程的数量。空闲的线程会在一段时间后自动终止。
用途:适用于执行大量短小的任务,如网络I/O操作、计算密集型任务等。
SingleThreadExecutor
:核心线程=最大线程=1
类型:单线程的线程池。
特点:线程池只有一个线程,确保所有任务按照指定顺序执行。
用途:适用于需要保证任务顺序执行的场景,例如数据更新、文件写入等。
ScheduledThreadPoolExecutor
:
类型:支持定时任务的线程池。
特点:线程池可以调度任务以定期或延迟的方式执行。
用途:适用于需要定期执行任务或延时执行任务的场景,如计划任务、周期性检查等。
下面是这四种线程池的具体实现方式:
FixedThreadPool
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
CachedThreadPool
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
SingleThreadExecutor
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledThreadPoolExecutor
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(nThreads);
其中,nThreads
表示线程池中的线程数量。对于 newFixedThreadPool 和 newScheduledThreadPool,这个值是必需的,而对于 newSingleThreadExecutor 和 newCachedThreadPool,则不需要指定线程数量。
CompletableFuture 简介
CompletableFuture
是 Java 8 引入的一个非常重要的类,它提供了一种异步编程的解决方案,用于编写非阻塞的
、基于回调
的代码。
以下是 CompletableFuture
的一些关键特性:
异步执行
:CompletableFuture 可以异步执行
任务,而不会阻塞
当前线程。
链式调用
:通过方法引用
和 lambda 表达式
,CompletableFuture 支持链式调用
,允许你将多个异步操作链接在一起。
回调
:你可以为 CompletableFuture 添加回调方法
,当操作完成或发生异常时,这些回调将被执行。
组合操作
:CompletableFuture 允许你组合
多个异步操作,例如 thenCombine、thenAcceptBoth 等。
异常处理
:CompletableFuture 提供了 exceptionally 方法,用于处理异步操作中发生的异常。
转换结果
:CompletableFuture 的 thenApply 方法可以用来转换
异步操作的结果。
等待完成
:CompletableFuture 的 get 方法可以等待异步操作完成
,并返回结果。
取消操作
:CompletableFuture 支持取消尚未完成的异步操作。
CompletableFuture执行任务的四个静态方法
runAsync
和supplyAsync
系列
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
回调方法
whenComplete
系列
whenComplete
在completableFuture
完成后执行,能感知结果
和异常
,但没法修改返回的结果(无法使用return)。
//TODO 会在当前的主线程中执行。
public CompletableFuture<T> whenComplete(BiComsumer<? super T, ? super Throwable> action );
public CompletableFuture<T> whenCompleteAsync(BiComsumer<? super T, ? super Throwable> action );
public CompletableFuture<T> whenCompleteAsync(BiComsumer<? super T, ? super Throwable> action, Executor executor);
exceptionally
系列
exception
在completableFuture
抛出异常后执行,能感知异常
,同时用return修改返回的结果。
public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn);
handle
系列
handle
在completableFuture
完成后执行,可以使用修改返回的结果。
public <U> CompletionStage<U> handle(BiFuction<? super T.Throwable,? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFuction<? super T.Throwable,? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFuction<? super T.Throwable,? extends U> fn,Executor executor);
编排1:多线程串行
1.thenApply
系列方法:
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) #跟当前线程共用一个线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) #默认线程池异步执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) #指定线程池异步执行
thenApply
(Function<? super T, ? extends U> fn):这个方法获取上一步的执行结果,同时返回值。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = future1.thenApply(String::length);
在上面的例子中,future1是一个已完成的CompletableFuture,它的结果是字符串"Hello"。future2是future1的一个后续操作,它会计算字符串的长度,并将长度返回return。
2.thenAccept
系列方法:
public CompletionStage<void> thenAccept(Consumer<? super T> action) #跟当前线程共用一个线程
public CompletionStage<void> thenAcceptAsync(Consumer<? super T> action) #默认线程池异步执行
public CompletionStage<void> thenAcceptAsync(Consumer<? super T> action, Executor executor) #指定线程池异步执行
thenAccept
(Consumer<? super T> action):这个方法获取上一步的执行结果,但不返回任何值。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
future1.thenAccept(System.out::println);
在上面的例子中,future1的结果会被打印出来,注意thenAccept不会返回任何值。
3.thenRun
系列方法:
public CompletionStage<void> thenRun(Runnable action) #跟当前线程公共用一个线程
public CompletionStage<void> thenRunAsync(Runnable action) #默认线程池异步执行
public CompletionStage<void> thenRunAsync(Runnable action, Executor executor) #指定线程池异步执行
thenRun(Runnable action):这个方法不获取到上一步的执行结果,同时不返回任何值。
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
future1.thenRun(() -> System.out.println("Task completed"));
thenApply、thenAccept和thenRun三者比较
方法 | 能否获取上一步返回结果 | 能否返回值 |
---|---|---|
thenApply | 能 | 能 |
thenAccept | 能 | 不能 |
thenRun | 不能 | 不能 |
编排2:两个异步任务组合为一个任务
2.1 两个任务均需要完成
1.thenCombine
系列方法
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
thenCombine
(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):这个方法组合两个future,获取两个future的返回结果,并返回当前任务的返回值。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5);
CompletableFuture<String> combined = future1.thenCombine(future2, (s, i) -> s + " " + i);
在上面的例子中,combined是一个新的CompletableFuture,它会等待future1和future2都完成后,获取它们的执行结果
,将它们的结果合并成一个字符串并返回
。
2.thenAcceptBoth
系列方法
public <U> CompletableFuture<void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
thenAcceptBoth
(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action):这个方法组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5);
future1.thenAcceptBoth(future2, (s, i) -> System.out.println(s + " " + i));
在上面的例子中,当future1和future2都完成后,获取它们的结果,并打印出它们的结果,但不返回任何值。
3.runAfterBoth
系列方法
public CompletableFuture<void> runAfterBoth(CompletionStage<?> other, Runnable action)
public CompletableFuture<void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
public CompletableFuture<void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
runAfterBoth
(CompletionStage<?> other, Runnable action):这个方法组合两个future,不会要获取future的结果,当两个future处理完任务后,执行该任务,不返回值。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5);
future1.runAfterBoth(future2, () -> System.out.println("Both tasks completed"));
在上面的例子中,当future1和future2都完成后,不会获取它们的结果,然后输出"Both tasks completed",但不返回任何值。
thenCombine、thenAcceptBoth和runAfterBoth三者比较
thenCombine
:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth
:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth
:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
方法 | 能否获取上一步返回结果 | 能否返回值 |
---|---|---|
thenCombine | 能 | 能 |
thenAcceptBoth | 能 | 不能 |
runAfterBoth | 不能 | 不能 |
2.2 两个任务完成一个即可
1.applyToEither
系列方法
public <U> ComletableFuture<U> applyToEither(CompletionStage<? extends U> other, Function<? super U, V> fn)
public <U> ComletableFuture<U> applyToEitherAsync(CompletionStage<? extends U> other, Function<? super U, V> fn)
public <U> ComletableFuture<U> applyToEitherAsync(CompletionStage<? extends U> other, Function<? super U, V> fn,Executor executor)
applyToEither
(CompletionStage<? extends U> other, Function<? super U, V> fn):两个任务有一个执行完成,获取它的返回值,处理任务并由新的返回值。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5).thenDelay(1000, TimeUnit.MILLISECONDS);
CompletableFuture<String> either = future1.applyToEither(future2, String::valueOf);
在上面的例子中,either是一个新的CompletableFuture,它会选择首先完成的任务(在这个例子中是future1),获取其结果,将其结果转换为字符串后返回。
2.acceptEither
系列方法
public CompletableFuture<void> acceptEither(CompletionStage<? extends U> other, Consumer<? super U> action)
public CompletableFuture<void> acceptEitherAsync(CompletionStage<? extends U> other, Consumer<? super U> action)
public CompletableFuture<void> acceptEitherAsync(CompletionStage<? extends U> other, Consumer<? super U> action,Executor executor)
acceptEither
(CompletionStage<? extends U> other, Consumer<? super U> action):这个方法用于在两个CompletableFuture中选择第一个完成的任务,然后应用传入的Consumer对象来消费其结果。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5).thenDelay(1000, TimeUnit.MILLISECONDS);
future1.acceptEither(future2, System.out::println);
在上面的例子中,当其中一个任务完成时,会获取到该该任务的结果,并打印出来,无返回值。
3.runAfterEither
系列方法
public ComletableFuture<void> runAfterEither(CompletionStage<?> other,Runnable action)
public ComletableFuture<void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public ComletableFuture<void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
runAfterEither
(CompletionStage<?> other, Runnable action):这个方法两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5).thenDelay(1000, TimeUnit.MILLISECONDS);
future1.runAfterEither(future2, () -> System.out.println("One of the tasks completed"));
在上面的例子中,当其中一个任务完成时,会输出"One of the tasks completed"。
applyToEither、acceptEither和runAfterEither的区别
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并由新的返回值。
acceptEither:两个任务由一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
总结1,apply
既获取上一步的结果又要返回值,accept
获取上一步的结果但不返回值,run
既不获取上一步的结果又不返回值。
总结2,所有的异步方法,如果不指定自己建立的线程池,默认使用ForkJoinPool,ForkJoinPool是守护线程,主线程结束,守护线程也结束。
编排3:多任务编排
anyOf()、anyOf()
1.allOf()
方法:多个任务全部完成才行,没有返回值
public static CompletableFuture<void> allOf(CompletableFuture<?>... cfs)
allOf
(CompletableFuture<?>… cfs):这个方法用于将多个 CompletableFuture
对象组合成一个新的 CompletableFuture
对象。这个新的 CompletableFuture
对象会在所有传入的 CompletableFuture
对象都完成时完成。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5);
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2);
all.get(); // 返回 null
在上面的例子中,all是一个新的CompletableFuture,它会在future1
和future2
都完成后才完成。
当所有的CompletableFuture都完成后,可以通过all.join()
或者all.get()
获取结果。由于all本身没有结果
,所以它的类型是Void。如果任何一个CompletableFuture失败了,那么all也会立即失败
,并抛出相应的异常
。
2.anyOf()
方法:一个任务完成即可,anyof()返回的结果是第一个完成的CompletableFuture返回的结果。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
anyOf
(CompletableFuture<?>… cfs):这个方法用于创建一个新的CompletableFuture,表示至少有一个输入的CompletableFuture完成。例如:
CompletableFuture<String> future1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<Integer> future2 = CompletableFuture.completedFuture(5);
CompletableFuture<Object> any = CompletableFuture.anyOf(future1, future2);
在上面的例子中,any
是一个新的CompletableFuture,它会在future1
或future2
中任一完成时就完成。
当至少有一个CompletableFuture完成时,可以通过any.join()
或者any.get()
获取结果。由于any可能由任何CompletableFuture的结果决定
,所以它的类型是Object
。如果所有的CompletableFuture都失败了,那么any也会失败,并抛出最后一个发生的异常。