当前位置: 首页 > news >正文

Java进阶版线程池(超详细 )

线程池 

线程池工具类  Executors 

Executors 是 Java 提供的一个工具类,它包含了多个静态方法,能够方便地创建不同类型的线程池。        

newFixedThreadPool

创建一个固定大小的线程池,线程池中的线程数量固定,当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则将任务放入工作队列等待。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小为 3 的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}
newSingleThreadExecutor

创建一个单线程的线程池,线程池只有一个线程,所有任务按顺序依次执行。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        // 创建一个单线程的线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}
 newCachedThreadPool

创建一个可缓存的线程池,线程池的线程数量不固定,当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则创建新线程来执行任务。当线程空闲时间超过 60 秒时,会被销毁。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个可缓存的线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}
newScheduledThreadPool

创建一个支持定时和周期性任务执行的线程池。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个大小为 2 的定时线程池
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        // 延迟 2 秒后执行任务
        executor.schedule(() -> System.out.println("Scheduled task executed"), 2, TimeUnit.SECONDS);
        // 延迟 1 秒后开始,每 3 秒执行一次任务
        executor.scheduleAtFixedRate(() -> System.out.println("Periodic task executed"), 1, 3, TimeUnit.SECONDS);
        executor.shutdown();
    }
}

自定义线程池:ThreadPoolExecutor 

常用构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

参数含义如下:

  1. corePoolSize:核心线程数。线程池会保持这些线程一直存活,即便它们处于空闲状态。当有新任务提交时,若线程池里的线程数量少于 corePoolSize,就会创建新线程来处理任务。
  2. maximumPoolSize:线程池允许的最大线程数。当工作队列已满,并且线程池中的线程数量小于 maximumPoolSize 时,会创建新线程来处理任务。
  3. keepAliveTime:线程空闲时的存活时间。当线程池中的线程数量超过 corePoolSize,且这些多余的线程空闲时间达到 keepAliveTime 时,它们会被销毁。
  4. unitkeepAliveTime 的时间单位,它是 TimeUnit 枚举类型,例如 TimeUnit.SECONDSTimeUnit.MILLISECONDS 等。
  5. workQueue:用于存储待执行任务的阻塞队列。常见的队列类型有 ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue 等。
  6. threadFactory:线程工厂,用于创建线程。通过自定义线程工厂,可以为线程设置名称、优先级等属性。
  7. handler:拒绝策略,当工作队列已满且线程池中的线程数量达到 maximumPoolSize 时,新提交的任务会被拒绝,此时会调用该策略来处理被拒绝的任务。常见的拒绝策略有 AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy 等。

CompletableFuture

CompletableFuture 是 Java 8 引入的一个类,位于 java.util.concurrent 包中。它是用于异步编程的工具类,表示一个异步任务的未来结果。CompletableFuture 提供了丰富的 API,用于处理异步任务的完成、组合和异常处理。

CompletableFuture 与 Thread 和 Runnable 的区别

Thread 和 Runnable

   Thread:是 Java 中最基本的线程类,用于创建和管理线程。它提供了线程的基本功能,但不支持异步编程和结果处理。

   Runnable:是一个接口,表示一个可以被线程执行的任务。它通常与 Thread 一起使用,但同样不支持异步编程和结果处理。

        缺点:Thread和Runnable都是在run()中写多线程代码,二者都没有返回值(可以使用轮询和回调)。

        CompletableFuture的出现解决了这个问题,它支持下面功能:
  • 支持异步编程,可以创建异步任务并处理其结果。

  • 链式调用:支持链式调用,可以将多个异步任务组合在一起,形成一个完整的流程。

  • 异常处理:提供了丰富的异常处理机制,可以捕获和处理异步任务中的异常。

  • 组合操作:可以组合多个异步任务,例如 allOf()anyOf(),并等待它们完成。

实例:

 Supplier<String> mm1 = new Supplier<String>() {
            @Override
            public String get() {
                for(int i=0;i<10;i++){
                    System.out.println("1111111111");
                }
                return "第一个";
            }
        };
        CompletableFuture<String> dd1 = CompletableFuture.supplyAsync(mm1);
        Supplier<String> mm2 = new Supplier<String>() {
            @Override
            public String get() {
                for(int i=0;i<10;i++){
                    System.out.println("2222222222222");
                }
                return "第二个";
            }
        };
        CompletableFuture<String> dd2 = CompletableFuture.supplyAsync(mm2);
        Supplier<String> mm3 =()->{         //Lambda表达式(匿名函数)

              for(int i=0;i<100;i++){
                    System.out.println("3333333333");
                }
                return "第三个";
        };
//        Supplier<String> mm3 = new Supplier<String>() {
//            @Override
//            public String get() {
//                for(int i=0;i<100;i++){
//                    System.out.println("3333333333");
//                }
//                return "第三个";
//            }
//        };
        CompletableFuture<String> dd3 = CompletableFuture.supplyAsync(mm3);
        CompletableFuture<Void> vo = CompletableFuture.allOf(dd1, dd2, dd3);
        Runnable r = new Runnable() {
            @Override
            public void run() {
                System.out.println(dd1.join()+"--------"+dd2.join()+"--------"+dd3.join());
            }
        };
        vo.thenRun(r);
        vo.join();

  CompletableFuture的重要API:

  • CompletableFuture.runAsync(Runnable runnable)
    • 此方法用于异步执行一个 Runnable 任务,没有返回值。它会使用 ForkJoinPool.commonPool() 作为线程池来执行任务。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class RunAsyncExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Running task asynchronously");
        });
        future.join();
    }
}

  • CompletableFuture.runAsync(Runnable runnable, Executor executor)
    • 与上面的方法类似,但可以指定一个自定义的 Executor 来执行任务(没有返回值)。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunAsyncWithExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Running task asynchronously with custom executor");
        }, executor);
        future.join();
        executor.shutdown();
    }
}

  • CompletableFuture.supplyAsync(Supplier<U> supplier)
    • 异步执行一个 Supplier 任务,有返回值。同样使用 ForkJoinPool.commonPool() 作为线程池。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class SupplyAsyncExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Result from asynchronous task";
        });
        String result = future.join();
        System.out.println(result);
    }
}

  • CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
    • 与 supplyAsync(Supplier<U> supplier) 类似,可指定自定义的 Executor 来执行任务。

2. 处理任务结果

  • thenApply(Function<? super T,? extends U> fn)
    • 当 CompletableFuture 完成后,对结果进行转换。返回一个新的 CompletableFuture,其结果是原 CompletableFuture 结果经过 Function 处理后的结果。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class ThenApplyExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5);
        CompletableFuture<Integer> newFuture = future.thenApply(num -> num * 2);
        Integer result = newFuture.join();
        System.out.println(result);
    }
}

  • thenAccept(Consumer<? super T> action)
    • 当 CompletableFuture 完成后,对结果进行消费,没有返回值。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class ThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10);
        future.thenAccept(num -> System.out.println("Received result: " + num));
    }
}

  • thenRun(Runnable action)
    • 当 CompletableFuture 完成后,执行一个 Runnable 任务,不关心原 CompletableFuture 的结果。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class ThenRunExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
        future.thenRun(() -> System.out.println("Task completed"));
    }
}

3. 组合多个 CompletableFuture

  • thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
    • 用于组合两个 CompletableFuture,前一个 CompletableFuture 的结果作为后一个 CompletableFuture 的输入。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class ThenComposeExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Integer> future2 = future1.thenCompose(num -> CompletableFuture.supplyAsync(() -> num * 4));
        Integer result = future2.join();
        System.out.println(result);
    }
}

  • thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    • 当两个 CompletableFuture 都完成后,将它们的结果组合起来。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class ThenCombineExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (num1, num2) -> num1 + num2);
        Integer result = combinedFuture.join();
        System.out.println(result);
    }
}

4. 异常处理

  • exceptionally(Function<Throwable, ? extends T> fn)
    • 当 CompletableFuture 出现异常时,使用 Function 处理异常并返回一个默认值。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class ExceptionallyExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Something went wrong");
            }
            return 10;
        });
        CompletableFuture<Integer> resultFuture = future.exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return 0;
        });
        Integer result = resultFuture.join();
        System.out.println("Final result: " + result);
    }
}

  • handle(BiFunction<? super T, Throwable, ? extends U> fn)
    • 无论 CompletableFuture 是否完成或出现异常,都会执行 BiFunction,可以根据是否有异常来处理结果。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class HandleExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Something went wrong");
            }
            return 20;
        });
        CompletableFuture<Integer> resultFuture = future.handle((result, ex) -> {
            if (ex != null) {
                System.out.println("Caught exception: " + ex.getMessage());
                return 0;
            }
            return result;
        });
        Integer result = resultFuture.join();
        System.out.println("Final result: " + result);
    }
}

5. 等待多个 CompletableFuture 完成

  • CompletableFuture.allOf(CompletableFuture<?>... cfs)
    • 等待所有给定的 CompletableFuture 都完成。返回一个新的 CompletableFuture,当所有输入的 CompletableFuture 都完成时,这个新的 CompletableFuture 也完成。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class AllOfExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> 3);

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
        allFutures.join();

        Integer result1 = future1.join();
        Integer result2 = future2.join();
        Integer result3 = future3.join();
        System.out.println("Results: " + result1 + ", " + result2 + ", " + result3);
    }
}

  • CompletableFuture.anyOf(CompletableFuture<?>... cfs)
    • 只要有一个给定的 CompletableFuture 完成,就返回一个新的 CompletableFuture,其结果是第一个完成的 CompletableFuture 的结果。
    • 示例代码:
import java.util.concurrent.CompletableFuture;

public class AnyOfExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
        Object result = anyFuture.join();
        System.out.println("First completed result: " + result);
    }
}

相关文章:

  • TorchServe部署模型-index_to_name.json
  • 硕日新能SRNE Solar 荣获 TÜV NORD 目击实验室认可资质!
  • FRP驱动本地摄像头实现远程图传
  • js异步机制
  • Unity3D多线程与协程优化分析
  • 【资料分享】瑞芯微RK3506(3核ARM+Cortex-A7 + ARM Cortex-M0)工业核心板选型资料
  • 【KWDB 创作者计划】_ruby基础语法
  • JVM虚拟机篇(七):JVM垃圾回收器全面解析与G1深度探秘及四种引用详解
  • C语言【输出字符串中的大写字母】
  • Codeforces Round 1016 (Div. 3)
  • vue3前一月/年+后一月/年
  • 基于springcloud的“微服务架构的巡游出租管理平台”的设计与实现(源码+数据库+文档+PPT)
  • 【Linux】jumpserver开源堡垒机部署
  • 【时时三省】(C语言基础)选择结构的嵌套
  • Linux 时间同步工具 Chrony 简介与使用
  • Java学习——day25(多线程基础与线程创建方式)
  • idea 安装 proxyai 后的使用方法
  • DAPP实战篇:使用web3.js连接合约
  • java设计模式-模板方法模式
  • 【JavaSE】异常
  • 房地产交易网站模版/上海推广网站
  • 怎么什么软件可以吧做网站/天津网站策划
  • 小型网站建设/什么软件可以找客户资源
  • 做一个免费网站的流程/搜索引擎优化介绍
  • 雄安免费网站建设方案/北京网站推广公司
  • 长春模板建站公司/成都网络营销公司