当前位置: 首页 > news >正文

【异步编程解析】

文章目录

  • FutureTask分析
    • FutureTask介绍
    • FutureTask应用
    • FutureTask源码分析
  • CompletableFuture
    • CompletableFuture介绍
    • 场景应用
    • 源码分析

FutureTask分析

FutureTask介绍

FutureTask 是 Java 并发包 (java.util.concurrent) 中的一个 可取消的异步计算任务,它实现了 Runnable 和 Future 接口,可以用于 异步任务执行 和 获取结果。

FutureTask应用

        Callable<Integer> callable = () -> {
            System.out.println("任务开始执行...");
            Thread.sleep(2000);
            return 10;
        };

        // 包装成 FutureTask
        FutureTask<Integer> futureTask = new FutureTask<>(callable);

        // 启动线程执行任务
        new Thread(futureTask).start();

        System.out.println("主线程可以做其他事情...");

        // 获取任务执行结果(如果任务未完成,会阻塞)
        Integer result = futureTask.get();
        System.out.println("任务执行结果: " + result);
   

结合线程池执行FutureTask

import java.util.concurrent.*;

public class FutureTaskWithThreadPool {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            Thread.sleep(2000);
            return 200;
        });

        // 提交 FutureTask 到线程池
        executor.submit(futureTask);

        System.out.println("主线程继续执行...");

        Integer result = futureTask.get(); // 阻塞等待结果
        System.out.println("异步任务结果: " + result);

        executor.shutdown();
    }
}

FutureTask源码分析

FutureTask的run流程和get流程图
在这里插入图片描述

了解FutureTask的枚举值

 /**

    * NEW -> COMPLETING -> NORMAL           任务正常执行,并且返回结果也正常返回
 	* NEW -> COMPLETING -> EXCEPTIONAL      任务正常执行,但是结果是异常
 	* NEW -> CANCELLED                      任务被取消   
	 * NEW -> INTERRUPTING -> INTERRUPTED    任务被中断
     */
     //状态标识
    private volatile int state;
    //初始状态
    private static final int NEW          = 0;
    //中间状态
    private static final int COMPLETING   = 1;
    //正常执行完成
    private static final int NORMAL       = 2;
    //任务报错
    private static final int EXCEPTIONAL  = 3;
    //任务取消
    private static final int CANCELLED    = 4;
    //任务中断执行中
    private static final int INTERRUPTING = 5;
    //中断完成
    private static final int INTERRUPTED  = 6;

    /** 需要执行的任务,会被赋值到这个全局变量 */
    private Callable<V> callable;
    /** 任务执行结果,也会被赋值到这个全局对象中 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** 执行任务的线程 */
    private volatile Thread runner;
    /** 等待返回结果的线程WaiteNode */
    private volatile WaitNode waiters;
    
    static final class WaitNode {
    	//线程
        volatile Thread thread;
        volatile WaitNode next;
        //有参构造
        WaitNode() { thread = Thread.currentThread(); }
    }

有参构造及run()方法的源码解析

  public FutureTask(Callable<V> callable) {
  	  //健壮性校验
      if (callable == null)
          throw new NullPointerException();
      //给callable赋值
      this.callable = callable;
      //将当前的状态置为NEW
      this.state = NEW;       // ensure visibility of callable
  }
  	//run方法的执行流程
    public void run() {
   		//判断state==new,并且cas将runnerOffset赋值给当前线程
       if (state != NEW ||
           !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread()))
           return;
       try {
       		//将全局变量赋值给局部变量
           Callable<V> c = callable;
           //健壮性校验,dcl
           if (c != null && state == NEW) {
               V result;
               //执行成功的标志,默认false
               boolean ran;
               try {
               		//执行callable中的call()方法
                   result = c.call();
                   //将标志位设置为true
                   ran = true;
               } catch (Throwable ex) {
               		//任务执行报错,将结果置为null
                   result = null;
                   //任务执行完成设置为false
                   ran = false;
                   //设置报错信息
                   setException(ex);
               }
               //任务成功执行完成
               if (ran)
               		//设置结果
                   set(result);
           }
       } finally {
           // runner must be non-null until state is settled to
           // prevent concurrent calls to run()
           runner = null;
           // state must be re-read after nulling runner to prevent
           // leaked interrupts
           int s = state;
           if (s >= INTERRUPTING)
               handlePossibleCancellationInterrupt(s);
       }
   }
   //任务报错,设置报错信息
   protected void setException(Throwable t) {
   		//CAS,先将当前状态从NEW设置为COMPLETING
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
		 //全局结果设置为t
         outcome = t;
         //将当前的state,设置为EXCEPTIONAL
         UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
         //唤醒下一个节点
         finishCompletion();
     }
 }
 //任务成功执行完成
  protected void set(V v) {
  		//cas 将stateOffset从new设置为completing
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        	//设置结果
            outcome = v;
            //设置为normal
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            //唤醒此节点的后续节点
            finishCompletion();
        }
    }
 //唤醒下一个节点的操作
  private void finishCompletion() {
        // 设置局部变量q,并且给局部变量赋值waiters
        for (WaitNode q; (q = waiters) != null;) {
        	//将waiterOffset从q设置为null
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            	//死循环唤醒后续所有的节点
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                    	//help GC
                        q.thread = null;
                        //唤醒此节点
                        LockSupport.unpark(t);
                    }
                    //获取到下一个节点
                    WaitNode next = q.next;
                    //如果后续节点为null
                    if (next == null)
                    	//跳出此节点,结束
                        break;
                    //去除上一个节点,help gc
                    q.next = null; // unlink to help gc
                   	//将下一个节点,赋值给q
                    q = next;
                }
                //结束
                break;
            }
        }
		//待实现
        done();
		//将callable置为null
        callable = null;        // to reduce footprint
    }

get()方法的源码分析

  public V get() throws InterruptedException, ExecutionException {
  		//将局部变量设置为全局变量
      int s = state;
      //判读那状态
      if (s <= COMPLETING)
      
          s = awaitDone(false, 0L);
      return report(s);
  }
  //awaitDone方法
 private int awaitDone(boolean timed, long nanos)
     throws InterruptedException {
     //如果有超时时间,设置超时时间
     final long deadline = timed ? System.nanoTime() + nanos : 0L;
     //声明局部变量
     WaitNode q = null;
     //声明标志位
     boolean queued = false;
     //死循环
     for (;;) {
     		//线程中断
         if (Thread.interrupted()) {
         	//移除当前节点
             removeWaiter(q);
             //抛出异常
             throw new InterruptedException();
         }
		//将全局变量赋值给局部变量
         int s = state;
         //判断当前任务是否执行
         if (s > COMPLETING) {
         	 //健壮性校验
             if (q != null)
             	//help gc
                 q.thread = null;
             //返回状态
             return s;
         }
         //状态为completing
         else if (s == COMPLETING) // cannot time out yet
             //当前线程让出cpu
             Thread.yield();
         //q为null,封装新的waitNode
         else if (q == null)
             q = new WaitNode();
         //第一次进来
         else if (!queued)
 			 // 没放队列的话,直接放到waiters的前面
             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
           //判断是否有超时限制                                      q.next = waiters, q);
         else if (timed) {
         	//判断剩余超时时间
             nanos = deadline - System.nanoTime();
             //已经超时
             if (nanos <= 0L) {
             	//移除节点
                 removeWaiter(q);
                 //返回state
                 return state;
             }
             //挂起当前线程,设置超时时间
             LockSupport.parkNanos(this, nanos);
         }
         //如果没有超时时间限制,直接将当前线程挂起
         else
             LockSupport.park(this);
     }
 }
 //封装当前的结果
  private V report(int s) throws ExecutionException {
  		//将全局的返回结果,赋值给局部变量
       Object x = outcome;
       //判断是否是正常执行完结束
       if (s == NORMAL)
       		//返回结果
           return (V)x;
        //非正常执行我那结束,手动取消
       if (s >= CANCELLED)
           throw new CancellationException();
       //抛出异常
       throw new ExecutionException((Throwable)x);
   }
 //移除waiterNode节点
 private void removeWaiter(WaitNode node) {
 		//健壮性校验
        if (node != null) {
        	//help GC
            node.thread = null;
            retry:
            //死循环
            for (;;) {          // restart on removeWaiter race
            	//pred 前置节点 q 当前节点 s: next的节点
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                	//赋值s=q.next
                    s = q.next;
                    if (q.thread != null)
                    	//前置节点赋值
                        pred = q;
                     //前置节点不为null
                    else if (pred != null) {
                    	//移除q节点
                        pred.next = s;
                        //判断pred.thread是否wieldnull
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    //q节点置为s
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                         //失败重试                            q, s))
                        continue retry;
                }
                break;
            }
        }
    }

FutureTask的cacel()方法源码分析和流程图
在这里插入图片描述
源码分析

  public boolean cancel(boolean mayInterruptIfRunning) {
  		   // 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning
    // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // 如果mayInterruptIfRunning为true,需要中断线程
            if (mayInterruptIfRunning) {
                try {
                	//将全局变量赋值给局部变量
                    Thread t = runner;
                    //健壮性校验
                    if (t != null)
                    	//中断线程
                        t.interrupt();
                } finally { // final state
                	//cas将状态改为中断
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
        	//唤醒后续的线程
            finishCompletion();
        }
        //返回结果
        return true;
    }

CompletableFuture

CompletableFuture介绍

CompletableFuture 是 Java 8 引入的 异步编程工具,提供了更强大的功能来处理异步任务、组合任务、并行计算,并支持非阻塞编程。
supplyAsync() 适用于有返回值的异步任务。
runAsync() 适用于没有返回值的异步任务。
thenApply() 接收前一个任务的返回值,然后转换返回新值。
thenAccept() 只消费结果,但不返回新值。
thenRun() 不接收前面任务的返回值,只是在任务完成后执行某些操作。
thenCombine() 合并两个 CompletableFuture 的结果。
allOf() 等待所有 CompletableFuture 任务完成,但不会收集返回值。
anyOf() 只要有一个 CompletableFuture 任务完成,就返回结果。
exceptionally() 捕获异常,并提供默认值。
handle() 可同时处理成功和失败情况,更灵活。

场景应用

有返回值的场景

  CompletableFuture completableFuture=CompletableFuture.supplyAsync(()->{
            System.out.println("task1开始执行");
            return "abc";
        }).thenApply(result->{
            System.out.println("task1的结果:"+result+",开始执行task2");
            return "任务完成";
        });
        System.out.println("获取task2的返回结果:"+completableFuture.get());

无返回值的场景:

 CompletableFuture completableFuture=CompletableFuture.runAsync(()->{
            System.out.println("task1开始执行");
            return ;
        }).thenRun(()->{
            System.out.println("开始执行task2");
            return ;
        });

源码分析

我们源码分析,主要分析runAsync()方法和thenRun()方法
流程图
在这里插入图片描述

分析runAsync()源码
流程图:
在这里插入图片描述
源码分析

//传入Runnable方法
 public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
 }
 //异步执行
 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
 	//健壮性校验
     if (f == null) throw new NullPointerException();
     //封装返回的结果
     CompletableFuture<Void> d = new CompletableFuture<Void>();
     //扔到线程池中执行
     e.execute(new AsyncRun(d, f));
     //返回结果
     return d;
 }
 //AsyncRun()方法
  static final class AsyncRun extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
         //声明全局变量
        CompletableFuture<Void> dep; Runnable fn;
        //有参构造
        AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
        	//给全局变量赋值
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }
		//在线程池中执行此方法
        public void run() {
        	//声明局部变量
            CompletableFuture<Void> d; Runnable f;
            //健壮性校验,并且给局部变量赋值
            if ((d = dep) != null && (f = fn) != null) {
            	//将之前的全部变量置为null,help GC
                dep = null; fn = null;
                //判断当前任务是否执行
                if (d.result == null) {
                    try {
                    	//任务未执行,此时执行run方法
                        f.run();
                        //封装返回结果
                        d.completeNull();
                    } catch (Throwable ex) {
                    	//封装报错信息
                        d.completeThrowable(ex);
                    }
                }
                //触发后续任务
                d.postComplete();
            }
        }
    }
 //后续任务源码分析
 final void postComplete() {	
 	//声明局部变量
  	CompletableFuture<?> f = this; Completion h;
  	//进入循环,并且给h赋值,最后进行参数校验
     while ((h = f.stack) != null ||
     		//任务栈被改变,需要重新办检查
            (f != this && (h = (f = this).stack) != null)) {
          //声明两个局部变量
         CompletableFuture<?> d; Completion t;
         //cas 将h节点换成h.next,给t赋值为h.next
         if (f.casStack(h, t = h.next)) {
         	//健壮性校验
             if (t != null) {
             	//如果栈发生了新的改变
                 if (f != this) {
                 	//将h节点重新压入栈中
                     pushStack(h);
                     //跳过
                     continue;
                 }
                 // help gc
                 h.next = null;    // detach
             }
             //执行 `Completion` 任务
             f = (d = h.tryFire(NESTED)) == null ? this : d;
         }
     }
 }
 //执行UniRun中的tryFire方法
 static final class UniRun<T> extends UniCompletion<T,Void> {
 	//声明变量
     Runnable fn;
     //有参构造src:前置任务,fn:Runnable方法
     UniRun(Executor executor, CompletableFuture<Void> dep,
            CompletableFuture<T> src, Runnable fn) {
         super(executor, dep, src); this.fn = fn;
     }
     final CompletableFuture<Void> tryFire(int mode) {
     	//声明两个局部变量
         CompletableFuture<Void> d; CompletableFuture<T> a;
         //给局部变量d赋值,并且进行健壮性校验
         if ((d = dep) == null ||
         	 //执行失败,直接返回null
             !d.uniRun(a = src, fn, mode > 0 ? null : this))
             return null;
          //将变量全部赋值为null help gc
         dep = null; src = null; fn = null;
         //清理任务栈 stack,并调用 postComplete() 处理后续任务。
         return d.postFire(a, mode);
     }
 }
 //执行UniRun方法
 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
 	 //声明局部变量
     Object r; Throwable x;
     //校验前置任务信息,和执行结果
     if (a == null || (r = a.result) == null || f == null)
         return false;
      //当前任务还没有执行
     if (result == null) {
     	//判断前置任务的执行是否报错
         if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
         	//封装错误信息
             completeThrowable(x, r);
         else
             try {
             	 //校验健壮性 && 异步执行任务
                 if (c != null && !c.claim())
                     return false;
                 //执行run()方法
                 f.run();
                 //封装执行结果
                 completeNull();
             } catch (Throwable ex) {
             	//报错的话,也需要封装报错信息
                 completeThrowable(ex);
             }
     }
     //返回结果
     return true;
 }
// 异步的线程池处理任务
final boolean claim() {
    Executor e = executor;
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
        // 只要有线程池对象,不为null
        if (e == null)
            return true;
        executor = null; // disable
        // 基于线程池的execute去执行任务
        e.execute(this);
    }
    return false;
}

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    // 如果 `a` 存在,并且 `a.stack` 还有未执行的回调任务
    if (a != null && a.stack != null) {
        if (mode < 0 || a.result == null) // mode < 0 表示异步执行
            a.cleanStack();  // 清理 `stack`
        else
            a.postComplete();  // 处理 `stack` 中的任务
    }

    // 如果当前 `CompletableFuture` 已经有结果,并且 `stack` 还有未执行任务
    if (result != null && stack != null) {
        if (mode < 0)  // 如果 mode < 0,返回当前 `CompletableFuture`
            return this;
        else
            postComplete();  // 触发 `postComplete()` 继续执行
    }

    return null;
}

分析threnRun()源码

	//调用thenRun()方法,传入Runnable
   public CompletableFuture<Void> thenRun(Runnable action) {
   		//调用此方法
        return uniRunStage(null, action);
    }
    //uniRunStage源码分析
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
    // 后继任务不能为null,健壮性判断
    if (f == null) throw new NullPointerException();
    // 创建CompletableFuture对象d,与后继任务f绑定
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // 如果线程池不为null,代表异步执行,将任务压栈
    // 如果线程池是null,先基于uniRun尝试下,看任务能否执行
    if (e != null || !d.uniRun(this, f, null)) {
        // 如果传了线程池,这边需要走一下具体逻辑
        // e:线程池
        // d:后继任务的CompletableFuture
        // this:前继任务的CompletableFuture
        // f:后继任务
        UniRun<T> c = new UniRun<T>(e, d, this, f);
        // 将封装好的任务,push到stack栈结构
        // 只要前继任务没结束,这边就可以正常的将任务推到栈结构中
        // 放入栈中可能会失败
        push(c);
        // 无论压栈成功与否,都要尝试执行以下。
        c.tryFire(SYNC);
    }
    // 无论任务执行完毕与否,都要返回后继任务的CompletableFuture
    return d;
}

相关文章:

  • spring学习(spring容器、加载配置文件方式、获取bean的方式)
  • mac开发环境配置笔记
  • GO大模型应用开发框架-
  • 网络安全防护
  • Unity学习part4
  • SQLMesh 系列教程7- 详解 seed 模型
  • 第29篇 基于ARM A9处理器用C语言实现中断<五>
  • LeetCode - 21 合并两个有序链表
  • 板块一 Servlet编程:第十节 监听器全解 来自【汤米尼克的JAVAEE全套教程专栏】
  • 【学习笔记】Cadence电子设计全流程(二)原理图库的创建与设计(8-15)
  • ac的dhcp池里option43配错导致ap无法上线问题排查过程
  • 修改Linux下kernel里ramdisk.img教程
  • 机器学习,我们主要学习什么?
  • MySQL 三层 B+ 树能存多少数据?
  • 抖音试水AI分身;腾讯 AI 战略调整架构;百度旗下小度官宣接入DeepSeek...|网易数智日报
  • Hopper架构 GEMM教程
  • RAG基于用户问题的内容,对其进行分类和路由,然后选择适当的处理方式(2)
  • 同步异步日志系统-设计模式
  • Zabbix 7.2实操指南:基于OpenEuler系统安装Zabbix 7.2
  • 《数组》学习——区间和
  • 临沂哪里有做网站/网络营销文案策划
  • 网站运营seo招聘/整合营销方案怎么写
  • 国外b2b网站是什么意思/高端网站定制
  • 南通网站制作外包/凤凰网台湾资讯
  • 重庆做网站建设公司排名/深圳刚刚突然宣布
  • 怎么 给自己的网站做优化呢/广州百度网站快速排名