使用京东AsyncTool实现异步编排
asyncTool: 解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架,可以任意组合各线程的执行顺序,带全链路执行结果回调。多线程编排一站式解决方案。来自于京东主App后台。
/**
 * Batch-updates users through the JD AsyncTool based {@link #processOrderAsyncTool(List)}.
 *
 * <p>When the delegate reports success, a completion message is printed to standard output.
 * Any exception thrown by the delegate propagates to the caller unchanged.
 *
 * <p>NOTE(review): the delegate is called directly on {@code this}. If the enclosing class is a
 * Spring-proxied bean, the {@code @Transactional} declared on the delegate is presumably not
 * applied for this self-invocation - confirm.
 *
 * @param idList ids of the users to update
 */
public void batchUpdateAsyncTool(List<Long> idList) {
    // Guard clause: nothing to announce unless the delegate reports success.
    if (!processOrderAsyncTool(idList)) {
        return;
    }
    System.out.println("所有都执行完毕");
}
/**
 * Runs one AsyncTool worker per id on {@code asyncExecutor} and reports the combined outcome.
 *
 * <p>For every id a fresh {@link UserServiceUpdateAsyncTool} is registered as both worker and
 * callback. After the workers have been started, each wrapper's result is inspected and the
 * first one found in the {@code EXCEPTION} state is rethrown to the caller.
 *
 * <p>NOTE(review): the workers run on {@code asyncExecutor}, not on the calling thread, so the
 * {@code @Transactional} on this method presumably does not cover the updates they perform -
 * confirm. The workers are also created with {@code new}, so any Spring proxying of
 * {@code UserServiceUpdateAsyncTool} is presumably not in effect for them.
 *
 * @param idList ids to process; {@code null} or empty means there is nothing to do
 * @return {@code Boolean.TRUE} when no worker ended in the {@code EXCEPTION} state
 * @throws BusinessException if a worker failed with a {@code BusinessException} (same message)
 * @throws RuntimeException if a worker failed with any other exception (kept as the cause), or
 *     if waiting for the workers failed or was interrupted
 */
@Transactional(rollbackFor = Exception.class)
public Boolean processOrderAsyncTool(List<Long> idList) {
    if (idList == null || idList.isEmpty()) {
        return Boolean.TRUE;
    }

    // Raw element type kept on purpose: the list is handed to Async.beginWork as-is.
    List<WorkerWrapper> workerWrapperList = new ArrayList<>(idList.size());
    for (Long id : idList) {
        UserServiceUpdateAsyncTool workerAndCallback = new UserServiceUpdateAsyncTool();
        WorkerWrapper<Long, Boolean> workerWrapper = new WorkerWrapper.Builder<Long, Boolean>()
                .id("wrapper" + id)
                .worker(workerAndCallback)
                .callback(workerAndCallback)
                .param(id)
                .build();
        workerWrapperList.add(workerWrapper);
    }

    // A failure of the wait itself is remembered instead of being swallowed. It is raised only
    // after the per-worker results have been checked, so worker failures keep precedence.
    RuntimeException waitFailure = null;
    try {
        // Start all wrappers together; 1000 is the overall timeout passed to AsyncTool.
        com.jd.platform.async.executor.Async.beginWork(1000, asyncExecutor, workerWrapperList);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can observe it.
        Thread.currentThread().interrupt();
        waitFailure = new RuntimeException("Interrupted while waiting for async workers", e);
    } catch (ExecutionException e) {
        waitFailure = new RuntimeException("Async worker execution failed", e);
    }

    // NOTE(review): only the EXCEPTION state is treated as a failure; any other state
    // (for example a timed-out worker, if AsyncTool reports one) is accepted - confirm.
    for (WorkerWrapper workerWrapper : workerWrapperList) {
        System.out.println("workResult:" + workerWrapper.getWorkResult());
        if (!"EXCEPTION".equals(workerWrapper.getWorkResult().getResultState().name())) {
            continue;
        }
        Throwable workerFailure = workerWrapper.getWorkResult().getEx();
        if (workerFailure instanceof BusinessException) {
            throw new BusinessException(workerFailure.getMessage());
        }
        // Keep the original exception as the cause so its stack trace is not lost.
        throw new RuntimeException(workerFailure.getMessage(), workerFailure);
    }

    if (waitFailure != null) {
        throw waitFailure;
    }
    return Boolean.TRUE;
}
@Slf4j
@Service
public class UserServiceUpdateAsyncTool implements IWorker<Long, Boolean>, ICallback<Long, Boolean> {
private UserService userService;
@Transactional(rollbackFor = Exception.class)
public Boolean funcAsyncTool(Long id) {
System.out.println("lo开始=" + id);
userService = ApplicationContextHolder.context.getBean(UserService.class);
// try {
// Thread.sleep(5000);
User user = new User();
user.setAge(Integer.valueOf(String.valueOf(Long.valueOf("30") + id)));
if (id.equals(2L)) {
throw new BusinessException("出现了2异常");
}
else if (id.equals(3L)) {
throw new RuntimeException("出现了3异常");
}
}
this.userService.update(user, Wrappers.lambdaUpdate(User.class).eq(User::getId, id));
// } catch (InterruptedException e) {
// } catch (BusinessException e) {
// } catch (RuntimeException e) {
// }
System.out.println("lo结束=" + id);
return Boolean.TRUE;
}
@Override
public void begin() {
}
@Override
public void result(boolean b, Long s, WorkResult<Boolean> workResult) {
// if ("EXCEPTION".equals(workResult.getResultState().name())) {
// throw new BusinessException(workResult.getEx().getMessage());
// }
}
@Override
public Boolean action(Long s, Map<String, WorkerWrapper> map) {
return this.funcAsyncTool(s);
}
@Override
public Boolean defaultValue() {
return null;
}
}
运行结果:
{
"success": false,
"code": "505",
"message": "出现了2异常",
"data": null
}
入参:[1,2,3,4,5]
运行结果:id 为 2、3 的任务抛出了异常,对应记录没有更新;id 为 1、4、5 的记录已更新,并且没有因为 2、3 的异常而回滚。