安卓 Java 中比 RxJava 更好用的多线程异步框架 MultithreadingExecutor
文章目录
- MultithreadingExecutor 系列框架支持的功能
- 实战
- 单线程执行器 SerialExecutor
- 单线程异步执行器 AsyncSerialExecutor
- 多线程执行器 MultithreadingExecutor
- 创建 MultithreadingExecutor 实例
- 提交任务
- 内部处理中断请求
- 外部中断任务
- 设置中断回调
- 开启逻辑任务
- 组队任务多线程执行器 GroupMultiExecutor
- 创建 GroupMultiExecutor 实例
- 提交任务
- 执行任务
- 安卓适配多线程执行器 ActivityMultithreading
- 继承 Activity
- 使用 ActivityMultithreading
- 源代码
众所周知,RxJava 是一个基于响应式编程思想和观察者模式的异步事件处理框架。它提供了优雅的流式编程和声明式编程,大大增强了代码的可读性和编程的便捷性。不过,在笔者的使用过程,也遇到了一些最终难以忍受的问题。最终,笔者决定开发一个新的多线程异步框架来弥补 RxJava 的致命性缺陷。
RxJava 中存在的问题:
-
设计机械,逻辑臃肿。RxJava 对所有的场景都采用观察者模式,很多时候这是不必要的。因为 Java 支持 lambda 表达式,可以直接在 lambda 表达式内完成数据回写,所以不一定非要经过将结果层层返回等等一系列繁琐的固定编程流程。另外,RxJava 提供了方便的线程创建以及线程切换,但实际上线程创建和线程切换本来就很简单,此为不必要的功能。
-
过度封装,调试困难。RxJava 过度封装了太多本来应该暴露给开发者的底层细节,如线程池的创建等等。这些细节应该留给开发者根据实际的场景来调优,而不应该事先决定且一概而论。另外,优雅的流式编程的美丽外表隐藏了难以追踪程序内部走向的丑陋。
-
功能不全,支持有限。RxJava 虽然有众多的方法库,但却连一个程序的生命周期方法都没有。如果可以提供异步程序的启动之前、结束之后回调,以及排队任务数量、运行线程数量、中断任务、组队任务等等,这将为应用的性能调优带来极大的方便。此外,流式的级联回调导致 lambda 表达式之间不能便捷地共享局部变量。流式看似优雅,但增加了不便。
基于此,笔者决定开发一个新的 Java 多线程框架,希望在减少复杂步骤的同时,保留给开发者一定的自由度。当然,这并不是说 RxJava 不行,而是说要不同的场景选择技术,没有一种技术可以满足任何需求。经过四年的打磨,笔者最终开发出了一套比 RxJava 更好用的多线程异步框架 MultithreadingExecutor 系列框架。
MultithreadingExecutor 系列框架的优点:(下面将简称为 ME)
-
操作极其简单,学习成本几乎为 0。ME 不要求开发者像 RxJava 一样需要花比较大的时间成本去学习它的设计思想与使用指南。ME 的方法基本是自注释的,通俗易懂。
-
充分的设计自由度,拓展性强。ME 并不负责创建线程池的细节,而要求开发者自行提供,这避免掩盖了开发者必须掌握和自行决定的技术细节。
-
流式编程,异常回调。ME 与 RxJava 一样支持流式编程和异常回调。不仅如此,ME 还支持嵌套任务(嵌套 lamdba 表达式)。开发者可以在一个任务中异步提交一个子任务,且提交的子任务仍然受主任务控制。
-
生命周期,任务监控。ME 支持为每个任务提供其各个生命周期节点的回调,如任务开始前、任务结束后等等。这非常方便开发者统计任务运行时间,监控任务运行状态,优化程序设计。另外,每个提交的任务都存在一个控制台 Console,开发者可以在外部任何线程中请求中断正在运行的任务。
-
多样化的线程模型。ME 系列框架支持单线程、异步单线程,线程池分组,组队任务等等。开发者可以根据实际场景选择最适合自己的线程模型。其中,组队任务支持并发计算,等所有任务都结束时,可以触发异步总任务结束回调。
-
安卓适配。ME 对安卓进行了适配。如果在安卓环境下,可以自由地在安卓 UI 线程和非 UI 线程中同步切换。此外,如果和安卓 Activity 的生命周期相绑定,还可以在 Activity 创建时惰性创建线程池,在 Activity 销毁时自动销毁线程池。
MultithreadingExecutor 系列框架支持的功能
MultithreadingExecutor 系列框架支持如下功能:
-
单线程执行器 SerialExecutor:用于支持线程任务串行执行。用单线程可以保证从不同来源提交的异步任务的线程安全。
-
单线程异步执行器 AsyncSerialExecutor:一种异步的 SerialExecutor,将任务的提交与运行分离,可以自由决定提交的任务执行的时机。
-
多线程执行器 MultithreadingExecutor:分组多线程执行器。支持如下功能:
-
为不同的线程池设置分组。关闭其中一种线程池时,可以保证其它的不受影响。同时支持单例模式、惰性模式等。
-
支持任务的生命周期。提供了相应的回调,可以监控每个任务开始执行、执行结束,任务执行用时,线程池终止等生命周期。
-
支持异常回调。提供了相应的回调,可以在每个任务抛出异常时,接收该异常并决定程序之后的行为。
-
支持逻辑任务的逻辑任务级中断。如果在一个逻辑任务中,在一个线程中嵌套开启了另一个线程,可以在任意线程中通过一种 Command 对象来中止整个任务中所有嵌套产生的线程。
-
-
组队任务多线程执行器 GroupMultiExecutor:一种提交组队任务的多线程执行器。用户可以将一个任务分解成多个独立的任务,分别用不同的线程执行以提高效率。最后在所有的任务都完成时,会收到整个组队任务结束的回调通知。
-
安卓适配多线程执行器 ActivityMultithreading:在安卓环境下,可以自由地在安卓 UI 线程和非 UI 线程中同步切换。此外,如果和安卓 Activity 的生命周期相绑定,还可以在 Activity 创建时惰性创建线程池,在 Activity 销毁时自动销毁线程池。
实战
纸上得来终觉浅,没有实战的讲解没有任何意义。这里结合具体代码来介绍 MultithreadingExecutor 系列框架的使用。
单线程执行器 SerialExecutor
单线程执行器 SerialExecutor:用于支持线程任务串行执行。用单线程可以保证从不同来源提交的异步任务的线程安全。下面定义了一个安卓的全局单线程执行器。
public final static SerialExecutor EXECUTOR = SerialExecutor.builder()
.beforeEveryTaskStarting(taskInfo -> Log.d("EXECUTOR", String.format(
"任务开始。taskId=%s,taskName=%s", taskInfo.getTaskId(), taskInfo.getTaskName())))
.afterEveryTaskFinishing(taskInfo ->
Log.d("EXECUTOR", String.format("任务结束。taskId=%s,taskName=%s,duration=%ss",
taskInfo.getTaskId(), taskInfo.getTaskName(),
taskInfo.getDuration().toFormatSecond())))
.setOnTaskException((taskInfo, throwable) -> {
Log.e("SerialExecutor", taskInfo.toString(), throwable);
return false; // 遇到异常需立刻终止
})
.setOnForcedExitException((taskInfo, throwable) -> {
Log.e("SerialExecutor", taskInfo.toString(), throwable);
return false; // 遇到异常需立刻终止
})
.setOnExecutorException(throwable -> {
Log.e("SerialExecutor", "", throwable);
return false; // 遇到异常需立刻终止
})
.setMaxQueuedTask(100)
.build();
然后就可以向其提交处理非 UI 数据的任务了。
注意:如果不想上锁,又要保证线程安全,应进行数据隔离,禁止 UI 线程操作非 UI 数据。光在非 UI 线程中使用单线程不能彻底保证线程安全。
EXECUTOR.submit(() -> {
// 处理非 UI 数据...
});
单线程异步执行器 AsyncSerialExecutor
单线程执行器 AsyncSerialExecutor:一种异步的 SerialExecutor。将任务的提交与运行分离,可以自由决定提交的任务执行的时机。下面定义了一个安卓的全局单线程异步执行器。
public final static AsyncSerialExecutor EXECUTOR = AsyncSerialExecutor.builder()
.setOnTaskException((taskInfo, throwable) -> {
Log.e("SerialExecutor", taskInfo.toString(), throwable);
return false; // 遇到异常需立刻终止
})
.setOnForcedExitException((taskInfo, throwable) -> {
Log.e("SerialExecutor", taskInfo.toString(), throwable);
return false; // 遇到异常需立刻终止
})
.setOnExecutorException(throwable -> {
Log.e("SerialExecutor", "", throwable);
return false; // 遇到异常需立刻终止
})
.setMaxQueuedTask(100)
.build();
现在,可以在一个线程中提交任务。
EXECUTOR.preSubmit()
.submit(() -> {
// 非 UI 任务
});
// 其它代码...
EXECUTOR.preSubmit()
.submit(() -> {
// 非 UI 任务
});
当不需要再提交任务,且需要执行任务时,在该线程的某处结束任务的提交。这样,前面提交的任务会在此处立刻执行。
注意:本类是线程级的。本类会不同线程提交的任务互相隔离,不会发生干扰。但本类只有一个运行线程 SerialExecutor,也就是说,不同线程提交的任务最终都会在同一个线程中执行。如果不希望这样,请创建本类的多个实例。
EXECUTOR.finishSubmit();
多线程执行器 MultithreadingExecutor
这是本框架的核心组件,也是笔者重点开发的部分。核心功能如下:
-
它底层依赖线程池,但却并不负责提供创建它的细节,而留给开发者在使用它时自行提供。
-
它对不同的线程池进行了分组,销毁本对象时,不会影响其它线程池。
-
支持单例模式。如果开启了单例模式,且因为意外而没有销毁本对象,则下次创建同一分组的本对象时,则自动销毁上一次的对象,从而避免内存泄露,线程池资源浪费。
-
支持惰性创建模式。如果开启了惰性创建模式,则在创建本对象,但还没有提交任务时,并不会创建线程池资源。
-
支持异常回调。可以向其提供异常回调,用于异步捕获异常。异常回调的返回值还可以用于程序是否继续运行,线程池是否需要销毁。
-
支持生命周期回调。可以向其提供生命周期回调,用于分析任务执行状态。
-
支持任务状态数据收集。任务在运行过程中,会自动收集任务的一些必要数据,如任务名、执行时间,运行状态等等。
-
支持外部中断。任务在运行过程中,可以在任意线程中尝试中断此任务。
-
支持外部中断回调。在任务被中断之后,可以触发外部中断回调。
-
支持逻辑任务控制。逻辑任务指的是一次任务提交时,以闭包的形式嵌套提交的新的异步任务。本类支持在最初的任务被中断时,同时中断提交任意层的嵌套任务。
创建 MultithreadingExecutor 实例
MultithreadingExecutor executor = MultithreadingExecutor.builder().withName("xxxTag")
.withExecutorGenerator(() -> Executors.newFixedThreadPool(4))
.beforeEveryTaskStarting(taskInfo -> Log.d(tag, String.format(
"任务开始。taskId=%s,taskName=%s", taskInfo.getTaskId(), taskInfo.getTaskName())))
.afterEveryThreadTaskFinishing(taskInfo ->
Log.d(tag, String.format("本线程任务结束。taskId=%s,taskName=%s,duration=%ss",
taskInfo.getTaskId(), taskInfo.getTaskName(),
taskInfo.getThreadTaskDuration().toFormatSecond())))
.afterEveryLogicalTaskFinishing(taskInfo ->
Log.d(tag, String.format("本逻辑任务结束。taskId=%s,taskName=%s,duration=%ss",
taskInfo.getTaskId(), taskInfo.getTaskName(),
taskInfo.getLogicalTaskDuration().toFormatSecond())))
.setOnTaskException((taskInfo, throwable) -> {
Log.e(tag, taskInfo.toString(), throwable);
return true; // 遇到异常不需要立刻终止
})
.setOnExecutorException(throwable -> {
Log.e(tag, "", throwable);
return true; // 遇到异常不需要立刻终止
})
.lazy()
.build();
提交任务
InterruptibleTask interruptibleTask = executor.submit("taskNameXXX", console -> {
// 非 UI 任务
});
内部处理中断请求
InterruptibleTask interruptibleTask = executor.submit("taskNameXXX", console -> {
if (console.needInterruption() || Thread.currentThread().isInterrupted()) {
Log.i(TAG, "准备xxx时检测到中断标志,操作中止");
return;
}
// 其它代码...
});
外部中断任务
interruptibleTask.getCommand().interrupt();
设置中断回调
InterruptibleTask interruptibleTask = executor.submit(console -> {
console.setOnInterruptedListener(() -> {
// 中断回调...
});
});
开启逻辑任务
下面演示了从最初的任务中切换到 UI 线程时,如何响应源自最初任务的外部中断。
InterruptibleTask interruptibleTask = executor.submit("taskNameXXX", console -> {
if (console.needInterruption() || Thread.currentThread().isInterrupted()) {
Log.i(TAG, "准备xxx时检测到中断标志,操作中止");
return;
}
console.newTask(); // 在新线程任务中调用 console.needInterruption() 之前,必须调用本方法
executor.runOnUiThread(() -> {
if (console.needInterruption()) {
Log.i(TAG, "准备xxx时检测到中断标志,操作中止");
console.endTask();
return;
}
// 更新 UI...
console.endTask();
});
});
组队任务多线程执行器 GroupMultiExecutor
一种提交组队任务的多线程执行器。用户可以将一个任务分解成多个独立的任务,分别用不同的线程执行以提高效率。最后在所有的任务都完成时,会收到整个组队任务结束的回调通知。
创建 GroupMultiExecutor 实例
InterruptibleTaskLoader taskLoader = GroupMultiExecutor.builder()
.withExecutorGenerator(() -> Executors.newFixedThreadPool(4))
.withAtomicTask(true)
.beforeEveryTaskStarting(taskInfo -> Log.d(tag, String.format(
"任务开始。taskId=%s,taskName=%s", taskInfo.getTaskId(), taskInfo.getTaskName())))
.afterEveryTaskFinishing(taskInfo ->
Log.d(tag, String.format("任务结束。taskId=%s,taskName=%s,duration=%ss",
taskInfo.getTaskId(), taskInfo.getTaskName(),
taskInfo.getThreadTaskDuration().toFormatSecond())))
.afterAllTasksFinishing(taskList -> {
// 当所有任务完成时的回调...
})
.setOnGroupTaskFailed((taskList, failedTask) -> {
// 当组队任务失败时的回调...
})
.setOnTaskException((taskInfo, throwable) -> {
Log.e(tag, taskInfo.toString(), throwable);
return true; // 遇到异常不需要立刻终止
})
.setOnExecutorException(throwable -> {
Log.e(tag, "", throwable);
return true; // 遇到异常不需要立刻终止
})
.taskLoader();
提交任务
taskLoader.submit(console -> {
// 提交任务 1
});
taskLoader.submit(console -> {
// 提交任务 2
});
taskLoader.submit(console -> {
// 提交任务 3
});
执行任务
taskLoader.execute();
安卓适配多线程执行器 ActivityMultithreading
一种适配安卓的 MultithreadingExecutor。其中嵌入了 Activity,因此可以很方便的在安卓 UI 线程和非 UI 线程中同步切换。此外,如果和安卓 Activity 的生命周期相绑定,还可以在 Activity 创建时惰性创建线程池,在 Activity 销毁时自动销毁线程池。下面将演示 ActivityMultithreading 如何嵌入 Activity 中使用。
继承 Activity
package org.wangpai.android.demo;
import android.os.Bundle;
import android.util.Log;
import androidx.annotation.CallSuper;
import androidx.annotation.UiThread;
import androidx.appcompat.app.AppCompatActivity;
import java.util.concurrent.Executors;
import lombok.Getter;
import org.wangpai.commonutil.simultaneous.android.multithreading.ActivityMultithreading;
/**
* @since 2025-3-24
*/
public abstract class AMActivity extends AppCompatActivity {
private static final String TAG = "AMActivity";
/**
* @since 2025-3-24
*/
@Getter
protected ActivityMultithreading multiThread = null;
/**
* @since 2025-3-24
*/
@CallSuper
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
var builder = (ActivityMultithreading.ActivityMultithreadingBuilder)
ActivityMultithreading.builder().withName(TAG)
.withExecutorGenerator(() -> Executors.newFixedThreadPool(4))
.beforeEveryTaskStarting(taskInfo -> Log.d(TAG, String.format(
"任务开始。taskId=%s,taskName=%s", taskInfo.getTaskId(), taskInfo.getTaskName())))
.afterEveryThreadTaskFinishing(taskInfo ->
Log.d(TAG, String.format("本线程任务结束。taskId=%s,taskName=%s,duration=%ss",
taskInfo.getTaskId(), taskInfo.getTaskName(),
taskInfo.getThreadTaskDuration().toFormatSecond())))
.afterEveryLogicalTaskFinishing(taskInfo ->
Log.d(TAG, String.format("本逻辑任务结束。taskId=%s,taskName=%s,duration=%ss",
taskInfo.getTaskId(), taskInfo.getTaskName(),
taskInfo.getLogicalTaskDuration().toFormatSecond())))
.setOnTaskException((taskInfo, throwable) -> {
Log.e(TAG, taskInfo.toString(), throwable);
return true; // 遇到异常不需要立刻终止
})
.setOnExecutorException(throwable -> {
Log.e(TAG, "", throwable);
return true; // 遇到异常不需要立刻终止
})
.lazy();
this.multiThread = builder.withActivity(this).build();
}
/**
* 此方法将在 Activity 被销毁时被调用
*
* @since 2024-8-20
*/
@CallSuper
@Override
protected void onDestroy() {
super.onDestroy();
this.recycle();
}
/**
* 本方法只能在调用方法 recycle 之后,想重新使用本对象才可调用。
* 本方法只能在 UI 线程中执行
*
* @since 2023-9-19
*/
@CallSuper
@UiThread
protected void reuse() {
Log.d(TAG, "reuse called.");
this.multiThread.reuse();
}
/**
* 本方法是回收本类的多余资源。调用本方法之后,本对象依然可以通过调用方法 reuse 之后,继续使用本对象。
* 本方法只能在 UI 线程中执行
*
* @since 2023-9-19
*/
@CallSuper
@UiThread
protected void recycle() {
Log.d(TAG, "recycle called.");
this.multiThread.recycle();
}
}
使用 ActivityMultithreading
package org.wangpai.android.demo;
import android.os.Bundle;
/**
* @since 2025-3-24
*/
public class MyActivity extends AMActivity {
/**
* @since 2025-3-24
*/
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
this.multiThread.submit(console -> {
// 处理非 UI 数据...
});
}
}
源代码
已上传至 GitHub:
-
MultithreadingExecutor 系列框架 Java 版:https://github.com/wangpai-common-util-Java/multithreading
-
MultithreadingExecutor 系列框架安卓适配版:https://github.com/wangpai-common-util-android-java/multithreading