Day227-JUC异步并发及池化技术

做电子商城项目时,接触到了大量异步并发请求,需要使用一些高级技术

JUC异步并发及池化技术

  1. 线程构建方法(3种构建方法)

  2. 线程池相关技术(JUC)

  3. 队列相关实现(BlockingQueue)

1、进程&线程

1.1、进程

进程本质上是一个运行的程序,是系统分配资源的基本单位,每一个进程都有一个独立的内存地址空间(包括代码空间、数据空间)。每一个进程内存空间、数据空间都是私有的,不能被其他的进程访问;

进程与进程之间相互隔离

1.2、线程

线程是进程中一个任务,一个进程至少应该有一个线程(任务)

为了让CPU执行任务更加高效,提出了线程的任务执行方式。

进程中所有的资源都是被线程所共享的,线程是CPU分配的基本单位,线程是CPU执行的基本单元,线程之间操作共享资源进行通信,线程共享进程的资源比进程通信更加简单、高效。

1.2.1、上下文切换

现在服务器资源都是多核心的CPU资源,操作系统调度时运行很多的进程,支持远大于CPU核心数的线程运行。

操作系统如何分配CPU资源给多线程?

CPU采用时间分片的模式将CPU的时间片段轮流分配给多个线程执行;分配每一个线程的时间大概就是几十ms,线程在CPU时间片执行,如果当前线程没有执行结束,CPU时间分片已经结束,此线程就会被挂起,下一个CPU时间分片将会分配给下一个线程,上一个线程等待被唤醒。

线程没有执行完毕,线程被挂起,等待下次被唤醒,继续完成任务。系统是怎么知道线程之间运行到哪里?从哪里开始执行?

CPU内部有寄存器(存储数据状态信息)

程序计数器:存储CPU正在执行的指令,执行下一条指令位置

CPU在执行任务时候,都必须依赖寄存器,程序计数器,这些东西就是CPU切换的开销,称之为上下文切换

1.2.2、线程的调度

问题:操作系统如何共享CPU时间分片,分配CPU执行权限?线程何时分片到时间分片?线程分配多少时间?重要的线程分配时间多一些?次要的线程分配时间少一些?

(1)抢占式调度

并发线程模式下,所有的线程都会抢占时间分片,获取执行权限。有些线程执行时间长,造成线程阻塞等待,等待CPU资源

JVM线程调度:抢占式调度模式

(2)协同式调度

一个线程执行完成后主动通知系统切换到另一个线程执行;(同步阻塞)

致命缺点:一个线程阻塞,导致整个进程阻塞,一个线程异常,导致整个程序崩溃

(3)并行、并发

并发:一段时间内,多个线程轮流分配时间分片,抢占式执行,这叫做并发

并行:同一时刻,多个线程同时执行,这就叫作并行执行

多核CPU并行,单核CPU内并发

2、多线程实践

2.1、多线程实现方式

  1. 继承Thread类,实现多线程

  2. 实现Runnable接口,实现多线程

  3. FutureTask实现多线程

示例代码

1.继承Thread类

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 /**
  * @Name ThreadDemo
  * @Description 多线程测试
  * @Author 88534
  * @Date 2021/9/3 14:04
  */
 @Slf4j
 public class ThreadDemo {
     public static void main(String[] args) {
         System.out.println("thread………………start………………");
         //创建多线程对象
         ThreadA threadA = new ThreadA();
         //开启线程
         threadA.start();
         System.out.println("thread………………end………………");
     }
 ​
     /**
      * 01 多线程第一种实现方式:extends Thread
      */
     public static class ThreadA extends  Thread{
         //run是线程执行主题,多线程业务在run方法中运行
         @Override
         public void run() {
             log.info("继承Thread实现方式…………");
             //业务代码执行
             int i = 100/3;
             log.info("业务代码执行结果:{},线程名称:{},线程ID:{}",i,this.getName(),this.getId());
         }
     }
 }

 

输出结果:

thread………………start……………… thread………………end………………

14:23:27.567 [Thread-0] INFO com.zhou.cubemail.juc.ThreadDemo – 继承Thread实现方式…………

14:23:27.572 [Thread-0] INFO com.zhou.cubemail.juc.ThreadDemo – 业务代码执行结果:33,线程名称:Thread-0,线程ID:12

在主线程执行后才执行ThreadA,体现了异步的效果

2.实现Runnable接口

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 /**
  * @Name ThreadDemo
  * @Description 多线程测试
  * @Author 88534
  * @Date 2021/9/3 14:04
  */
 @Slf4j
 public class ThreadDemo {
     public static void main(String[] args) {
         System.out.println("thread………………start………………");
         //方式一:创建多线程对象thread
         Thread2 thread2 = new Thread2();
         Thread thread = new Thread(thread2);
         //开启线程
         thread.start();
         //方式二:匿名内部类Runnable
         Runnable runnable = new Runnable() {
             @Override
             public void run() {
                 log.info("继承Thread实现方式…………");
                 //业务代码执行
                 int i = 100/3;
                 log.info("业务代码执行结果:{}",i);
             }
         };
         /**
          * 方式三:Lambda表达式
          * 特点:@FunctionalInterface,此直接表示可以使用Lambda表达式的编程方式,此注解相当于是一个标识
          * 此接口只有一个方法(必须满足),即使没有上述注解,也可以使用Lambda表达式,程序会在后台自动识别
          * 写作形式:方法括号(有参写,无参不写)->{业务执行方法体}
          */
         new Thread(()->{
             log.info("继承Thread实现方式…………");
             //业务代码执行
             int i = 100/3;
             log.info("业务代码执行结果:{}",i);
         }).start();
         System.out.println("thread………………end………………");
     }
 ​
     /**
      * 01 多线程第一种实现方式:extends Thread
      */
     public static class ThreadA extends Thread{
         //run是线程执行主题,多线程业务在run方法中运行
         @Override
         public void run() {
             log.info("继承Thread实现方式…………");
             //业务代码执行
             int i = 100/3;
             log.info("业务代码执行结果:{},线程名称:{},线程ID:{}",i,this.getName(),this.getId());
         }
     }
 ​
     /**
      * 02 实现Runnable接口,实现多线程
      */
     public static class Thread2 implements Runnable{
         @Override
         public void run() {
             log.info("继承Thread实现方式…………");
             //业务代码执行
             int i = 100/3;
             log.info("业务代码执行结果:{}",i);
         }
     }
 ​
     /**
      * 方式三:Callable + FutureTask实现多线程
      * jdk1.5后添加,相较上述方式而言,有返回值
      * @FunctionalInterface 此直接表示可以使用Lambda表达式的编程方式
      * public interface Callable<V>:具有泛型的接口,只有一个方法call,是多线程执行业务主题,方法执行完毕后有返回值,是指定的泛型类型
      * callable和thread的关系:
      */
     public static class Thread03 implements Callable<Integer>{
         //业务执行主体
         @Override
         public Integer call() throws Exception {
             log.info("继承Thread实现方式…………");
             //业务代码执行
             int i = 100/3;
             log.info("业务代码执行结果:{}",i);
             return i;
         }
     }
 }

 

2.2、线程池

项目开发中,不会使用上面3种线程的实现方式,因为上面3种线程实现方式无法控制线程,可能会造成系统资源耗尽。浪费系统资源,造成系统的性能下降。

在企业业务开发中,必须使用线程池的方式,构建多线程,让线程充分利用,降低系统的资源消耗。

使用池化技术

执行一个java任务,可以直接new Thread运行任务,线程从创建到消耗经历哪些过程?

  1. 创建一个Java线程实例,线程是一个对象实例,堆内存中分配内存(创建线程需要消耗时间和内存)

  2. 执行start方法启动线程,操作系统为Java线程创建对应的内核线程,线程处于就绪状态(内核线程是操作系统的资源,创建需要时间和内存)

  3. 线程被操作系统CPU调度器选中后,线程开始执行run方法

  4. JVM开始为线程创建线程私有资源,JVM虚拟机栈程序计数器(需要时间和内存)

  5. 线程运行过程中,CPU上下文切换(消耗时间,频繁切换,影响性能)

  6. 线程运行完毕,Java线程被垃圾回收器回收(销毁线程内存需要时间)

从线程执行的流程来看

  1. 线程不仅是Java对象,更像是操作系统的资源(创建线程、消耗线程都需要时间)

  2. Java线程的创建和运行都需要内存时间(线程数量太多,消耗很多内存)

  3. CPU上下文切换(线程数量一旦大,CPU频繁切换)

需要线程池

线程池优势:

  1. 降低系统的资源的消耗

  2. 提高系统的响应速度

  3. 方便管理(线程复用,控制最大并发数,管理线程)

事先准备好一些资源,业务系统要是有线程,就从线程池中获取,用完之后不能销毁,实现线程池线程复用

线程池构建多线程实现方式

创建单个线程池

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 ​
 /**
  * @Name ThreadPoolDemo
  * @Description 线程池的操作
  * @Author 88534
  * @Date 2021/9/5 14:17
  */
 @Slf4j
 public class ThreadPoolDemo {
     public static void main(String[] args) {
         //1、创建线程池对象,创建单个线程的线程池对象
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         try {
             executorService.execute(()->{
                 log.info("Executors创建线程池方式实现多线程…………");
                 //业务代码执行
                 int i = 100/3;
                 log.info("业务代码执行结果:{}",i);
             });
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             //线程池用完,关闭线程池
             executorService.shutdown();
         }
     }
 }

 

输出结果:

14:21:22.295 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:21:22.295 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

固定数量的线程池

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 ​
 /**
  * @Name ThreadPoolDemo
  * @Description 线程池的操作
  * @Author 88534
  * @Date 2021/9/5 14:17
  */
 @Slf4j
 public class ThreadPoolDemo {
     public static void main(String[] args) {
         //2、创建固定数量的线程池(指定核心线程的数量),核心线程数为2
         ExecutorService executorService1 = Executors.newFixedThreadPool(2);
         try {
             for (int j = 0; j < 10; j++) {
                 executorService1.execute(()->{
                     log.info("Executors创建线程池方式实现多线程…………");
                     //业务代码执行
                     int i = 100/3;
                     log.info("业务代码执行结果:{}",i);
                 });
             }
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             //线程池用完,关闭线程池
             executorService1.shutdown();
         }
     }
 }

 

输出结果:

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:33:22.589 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:33:22.589 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

有抢占执行现象

按计划进行的线程池

 //创建一个按照计划进行的线程池
 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

自动增长的线程池

 //创建一个自动增长的线程池
 ExecutorService executorService2 = Executors.newCachedThreadPool();

执行结果:

14:40:05.378 [pool-4-thread-3] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.379 [pool-4-thread-10] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.378 [pool-4-thread-9] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.378 [pool-4-thread-4] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.379 [pool-4-thread-5] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.379 [pool-4-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.378 [pool-4-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.379 [pool-4-thread-6] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.378 [pool-4-thread-7] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.378 [pool-4-thread-8] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

14:40:05.379 [pool-4-thread-7] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-6] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-4] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-9] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-8] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-5] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-10] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

14:40:05.379 [pool-4-thread-3] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

顺序打乱,但执行顺序有序

示例代码:

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 ​
 /**
  * @Name ThreadPoolDemo
  * @Description 线程池的操作
  * @Author 88534
  * @Date 2021/9/5 14:17
  */
 @Slf4j
 public class ThreadPoolDemo {
     public static void main(String[] args) {
         //1、创建线程池对象,创建单个线程的线程池对象
         ExecutorService executorService = Executors.newSingleThreadExecutor();
 ​
         //2、创建固定数量的线程池(指定核心线程的数量),核心线程数为2
         ExecutorService executorService1 = Executors.newFixedThreadPool(2);
 ​
         //3、创建 一个按照计划进行的线程池
         ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
 ​
         //4、创建一个自动增长的线程池
         ExecutorService executorService2 = Executors.newCachedThreadPool();
 ​
         try {
             for (int j = 0; j < 10; j++) {
                 executorService2.execute(()->{
                     log.info("Executors创建线程池方式实现多线程…………");
                     //业务代码执行
                     int i = 100/3;
                     log.info("业务代码执行结果:{}",i);
                 });
             }
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             //线程池用完,关闭线程池
             executorService2.shutdown();
         }
     }
 }

 

参数解析:

  1. corePoolSize:线程池核心线程数,初始化线程池时,会创建核心线程等待状态,核心线程不会被销毁,提供线程的复用

  2. maximumPoolSize:最大线程数:核心线程用完了,必须新建线程执行任务,但是新建的线程不能超过最大线程数

  3. keepAliveTime:线程的存活时间,除了核心线程以外的线程(maximumPoolSize-corePoolSize)存活时间;当线程处于空闲状态,可以活多久

  4. unit:存活时间单位

  5. workQueue:任务阻塞队列,任务可能会很多,线程较少,因此可以把多余的任务放入队列进行缓冲,队列采用先进先出(FIFO),等待线程空闲,再从队列中取出任务执行。


不建议使用以上的创建线程池的方式

原因是以上的线程池创建方式,当线程量大后,可能造成无限制创建线程,从而导致内存被占满,线程量大导致性能严重下降,甚至OOM(Out Of Memory,耗尽内存)

解决方案:使用ThreadPoolExecutor类创建线程池

 public ThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory,
                               RejectedExecutionHandler handler);

 

ThreadFactory:线程工厂,默认使用defaultThreadFactory,用来创建线程,一般使用默认即可

RejectedExecutionHandler:线程池拒绝策略

四种拒绝策略:

  1. new ThreadPoolExecutor.AbortPolicy():新任务直接被拒绝,抛出异常:RejectEexcutionException

  2. new ThreadPoolExecutor.DiscardPolicy:新任务忽略不执行,直接丢弃

  3. new ThreadPoolExecutor.DiscardOldestPolicy:队列满了,新任务尝试和等待最久的线程竞争,也不会抛出异常。抛弃任务队列中等待最久的业务,新任务直接添加到队列中。

  4. new ThreadPoolExecutor.CallerRunPolicy:新任务来临后,直接使用调用者所在线程执行任务

示例代码:

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @Name ThreadPoolDemo
  * @Description 线程池的操作
  * @Author 88534
  * @Date 2021/9/5 14:17
  */
 @Slf4j
 public class ThreadPoolDemo {
     public static void main(String[] args) {
 ​
         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                 10,     //maximumPoolSize
                 3,      //keepAliveTime
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(3), //可伸缩
                 Executors.defaultThreadFactory(),
                 new ThreadPoolExecutor.AbortPolicy());
 ​
         try {
             for (int j = 0; j < 10; j++) {
                 threadPool.execute(()->{
                     log.info("Executors创建线程池方式实现多线程…………");
                     //业务代码执行
                     int i = 100/3;
                     log.info("业务代码执行结果:{}",i);
                 });
             }
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             //线程池用完,关闭线程池
             threadPool.shutdown();
         }
     }
 }

 

输出结果:

11:15:56.240 [pool-1-thread-7] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.240 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.240 [pool-1-thread-5] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.240 [pool-1-thread-4] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.240 [pool-1-thread-3] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.240 [pool-1-thread-8] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.240 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.240 [pool-1-thread-6] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.243 [pool-1-thread-5] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.243 [pool-1-thread-8] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.245 [pool-1-thread-5] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.245 [pool-1-thread-5] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.243 [pool-1-thread-1] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.243 [pool-1-thread-7] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.243 [pool-1-thread-2] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.243 [pool-1-thread-4] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.243 [pool-1-thread-6] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.243 [pool-1-thread-3] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

11:15:56.245 [pool-1-thread-8] INFO com.zhou.cubemail.juc.ThreadPoolDemo – Executors创建线程池方式实现多线程…………

11:15:56.245 [pool-1-thread-8] INFO com.zhou.cubemail.juc.ThreadPoolDemo – 业务代码执行结果:33

合理配置线程的相关参数:核心线程数、最大线程数

设置线程池线程数的数量:根据业务类型进行设置(CPU密集型、IO密集型)

CPU密集型:所有的任务都在内存中执行,没有磁盘的读写。建议线程池最大数量设置为N(CPU核心数量)+1

IO密集型:大量磁盘读写任务,如果有IO操作,CPU此时处于空闲状态,最大线程数应该设置:2N+1

最大线程数设置公式:

最大线程数=(任务执行时间/任务CPU时间)*N

2.3、队列使用

在Java并发编程中,使用线程池,必须使用任务队列,让任务在队列中进行缓冲,可以线程执行防洪泄流的效果,提升线程池处理任务能力。

通常在线程池中,通常使用阻塞队列

当队列已经满时

阻塞等待:等待队列中元素被消费,有空闲位置后再放入队列。

队列空闲:阻塞等待直到队列中有数据,消费数据

如何为线程池选择一个合适的队列?

基于Java一些队列实现特性

1.ArrayLockingQueue:基于数组实现的有界的阻塞队列(有界的队列)

2.LinkedBlockingQueue:基于链表实现的有界阻塞队列

3.PriorityBlockingQueue:支持按照优先级排序的无界的阻塞队列

4.DelayQueue:优先级实现的无界阻塞队列

5.SynchronousQueue:只存储一个元素,如果这个元素没有被消费,不能再向里面存储元素

6.LinkedTransferQueue:基于链表实现的无界的阻塞队列

7.LinkedBlockingDeque:基于链表实现的双向的无界阻塞队列

如何选择一个合适的队列?

建议必须使用有界队列,有界队列能增加系统的稳定性,可以根据需求设置(可控设置),如果设置为无界队列,遇到不可控的因素,可能会导致队列中的任务越来越多,出现OOM,撑爆系统

3、异步计算

3.1、什么是异步

异步调用实现一个不需要被等待的方法的返回值,让调用者继续执行

在Java中,简单的讲就是开启另一个线程完成程序计算,使得调用者继续执行,不需要等待计算的结果,但是调用者仍然需要获取线程的计算结果(不需要同步阻塞等待)

主线程将任务交给子线程,子线程执行任务,返回(无需阻塞等待结果),继续执行其他任务(扔给子线程做任务,自己干别的事,等子线程做完了再接收返回的结果)

3.2、Future

Future也是一个异步计算结果返回接口,目的是获取返回值结果,但是Future在获取返回值结果的时候,方法必须同步阻塞等待返回值结果。

  • Get:获取结果(等待、阻塞)

  • Get(timeout):获取结果,执行等待时间

  • Cancel:取消当前任务

  • isDone:判断任务是否已经完成(轮询)

Future对于结果获取不是很方便,只能通过同步阻塞的方式获取结果,或者轮询的方式获取到结果,阻塞的方式获取返回值结果显然与异步的思想相违背,轮询方式很占用CPU资源,也不能及时得到结果

3.3、异步编排

CompletableFuture:可以帮助简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调函数的方式处理计算结果

public class CompletableFuture<T> implements Future<T>,CompletionStage<T>

CompletableFuture具有Future的特性,还实现了CompletionStage接口,具备CompletionStage接口的特性,串行执行、并行执行、聚合(AND聚合、OR聚合)

3.3.1、串行关系执行

then-然后,表示下一步,通常是一个串行关系的体现,then后面的单词(比如run/apply/accept)就是函数式接口中抽象方法名称

串行关系执行:利用上一步的执行结果,去进行下一步任务执行,任务执行具有先后顺序,因此把这种关系叫作串行关系

串行关系方法:

 public CompletionStage<Void> thenRun(Runnable action);
 public CompletionStage<Void> thenRunAsync(Runnable action);
 ​
 public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
 public <U> CompletionStage<U> thenApplyAsync
     (Function<? super T,? extends U> fn);
 public <U> CompletionStage<U> thenApplyAsync
     (Function<? super T,? extends U> fn,
      Executor executor);
 ​
 public CompletionStage<Void> thenAccept(Consumer<? super T> action);
 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                                                  Executor executor);
 ​
 public <U> CompletionStage<U> thenCompose
         (Function<? super T, ? extends CompletionStage<U>> fn);
 public <U> CompletionStage<U> thenComposeAsync
     (Function<? super T, ? extends CompletionStage<U>> fn);
 public <U> CompletionStage<U> thenComposeAsync
     (Function<? super T, ? extends CompletionStage<U>> fn,
      Executor executor);

 

3.3.2、聚合AND

Combine…with…和both…and…都是要求两者都必须满足,也就是and/且的关系

 public <U,V> CompletionStage<V> thenCombine
     (CompletionStage<? extends U> other,
      BiFunction<? super T,? super U,? extends V> fn);
 public <U,V> CompletionStage<V> thenCombineAsync
     (CompletionStage<? extends U> other,
      BiFunction<? super T,? super U,? extends V> fn);
 public <U,V> CompletionStage<V> thenCombineAsync
     (CompletionStage<? extends U> other,
      BiFunction<? super T,? super U,? extends V> fn,
      Executor executor);
 ​
 public <U> CompletionStage<Void> thenAcceptBoth
         (CompletionStage<? extends U> other,
          BiConsumer<? super T, ? super U> action);
 public <U> CompletionStage<Void> thenAcceptBothAsync
     (CompletionStage<? extends U> other,
      BiConsumer<? super T, ? super U> action);
 public <U> CompletionStage<Void> thenAcceptBothAsync
     (CompletionStage<? extends U> other,
      BiConsumer<? super T, ? super U> action,
      Executor executor);
 ​
 public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
                                               Runnable action);
 public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                                                Runnable action);
 public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                                                Runnable action,
                                                Executor executor);

 

3.3.3、OR聚合

 public <U> CompletionStage<U> applyToEither
     (CompletionStage<? extends T> other,
      Function<? super T, U> fn);
 public <U> CompletionStage<U> applyToEitherAsync
     (CompletionStage<? extends T> other,
      Function<? super T, U> fn);
 public <U> CompletionStage<U> applyToEitherAsync
     (CompletionStage<? extends T> other,
      Function<? super T, U> fn,
      Executor executor);
 ​
 public CompletionStage<Void> acceptEither
     (CompletionStage<? extends T> other,
      Consumer<? super T> action);
 public CompletionStage<Void> acceptEitherAsync
     (CompletionStage<? extends T> other,
      Consumer<? super T> action);
 public CompletionStage<Void> acceptEitherAsync
     (CompletionStage<? extends T> other,
      Consumer<? super T> action,
      Executor executor);
 ​
 public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
                                             Runnable action);
 public CompletionStage<Void> runAfterEitherAsync
     (CompletionStage<?> other,
      Runnable action);
 public CompletionStage<Void> runAfterEitherAsync
     (CompletionStage<?> other,
      Runnable action,
      Executor executor);

 

3.3.4、异常处理

 public CompletionStage<T> exceptionally
     (Function<Throwable, ? extends T> fn);
 ​
 public CompletionStage<T> whenComplete
         (BiConsumer<? super T, ? super Throwable> action);
 public CompletionStage<T> whenCompleteAsync
     (BiConsumer<? super T, ? super Throwable> action);
 public CompletionStage<T> whenCompleteAsync
     (BiConsumer<? super T, ? super Throwable> action,
      Executor executor);
 ​
 public <U> CompletionStage<U> handle
         (BiFunction<? super T, Throwable, ? extends U> fn);
 public <U> CompletionStage<U> handleAsync
     (BiFunction<? super T, Throwable, ? extends U> fn);
 public <U> CompletionStage<U> handleAsync
     (BiFunction<? super T, Throwable, ? extends U> fn,
      Executor executor);

 

3.4、异步开启

CompletableFuture提供了4个静态的方法,来创建一个异步操作(异步开启,从这4个静态的方法开发即可)

 //runAsync:没有返回值的方法,不关注返回值
 public static CompletableFuture<Void> runAsync(Runnable runnable);
 public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                Executor executor);
 ​
 //supplyAsync:有返回值,关注返回值
 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                    Executor executor);

 

示例代码:

1.runAsync:没有使用自定义线程池,默认使用的线程池

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         /**
          * 此方法不关注返回值,实现异步编排
          * public static CompletableFuture<Void> runAsync(Runnable runnable);
          */
 ​
         CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
 ​
             log.info("子线程future线程start………………");
             int i = 10/3;
             log.info("线程的名称:{},线程执行结果:{}",Thread.currentThread().getName(),i);
             log.info("子线程future线程end………………");
 ​
         });
 ​
         //调用异步任务
         future.get();
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

15:45:52.094 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

15:45:52.135 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

15:45:52.136 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

15:45:52.137 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程end………………

15:45:52.137 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

2.RunAsync使用自己的线程池

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
             10,
             3,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue<>(3), //可伸缩
             Executors.defaultThreadFactory(),
             new ThreadPoolExecutor.AbortPolicy());
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         /**
          * 此方法不关注返回值,实现异步编排
          * public static CompletableFuture<Void> runAsync(Runnable runnable);
          */
 ​
         CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
 ​
             log.info("子线程future线程start………………");
             int i = 10/3;
             log.info("线程的名称:{},线程执行结果:{}",Thread.currentThread().getName(),i);
             log.info("子线程future线程end………………");
 ​
         },threadPool);//自定义线程池
//调用异步任务
         future.get();
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

16:40:17.231 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

16:40:17.280 [pool-1-thread-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

16:40:17.280 [pool-1-thread-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:pool-1-thread-1,线程执行结果:3

16:40:17.280 [pool-1-thread-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程end………………

16:40:17.280 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.supplyAsync:有返回值,关注返回值

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         });
 ​
         //调用异步编排future
         Integer integer = integerCompletableFuture.get();
 ​
         log.info("supplyAsync异步编排的返回值结果:{}",integer);
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

16:38:50.160 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

16:38:50.205 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

16:38:50.205 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

16:38:50.207 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程end………………

16:38:50.207 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – supplyAsync异步编排的返回值结果:3

16:38:50.207 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.5、异步执行

3.5.1、thenRun

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

thenRun没有返回值,也不关心上一步执行结果,只和上一步执行具有顺序关系

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Void> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         }).thenRun(()->{
             log.info("thenRun运行………………");
         });
 ​
         //调用执行
         integerCompletableFuture.get();
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

17:15:21.299 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

17:15:21.340 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

17:15:21.340 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

17:15:21.340 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程end………………

17:15:21.340 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – thenRun运行………………

17:15:21.340 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

thenRunAsync执行类似

3.5.2、thenApply

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

thenApply具有返回值,上一步执行的结果当成传参结果给thenApply,T是参数类型,U是返回值类型

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Long> thenApply = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         }).thenApply((t)->{
             log.info("thenApply子线程开始执行,参数是:{}",t);
             long res = t*5;
             log.info("计算结果:{}",res);
             return res;
         });
 ​
         //调用执行
         thenApply.get();
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

18:06:31.604 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

18:06:31.644 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

18:06:31.644 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

18:06:31.644 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程end………………

18:06:31.644 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – thenApply子线程开始执行,参数是:3

18:06:31.644 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 计算结果:15

18:06:31.644 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.5.3、thenAccept

public CompletionStage<Void> thenAccept(Consumer<? super T> action);

thenAccept没有返回值,跟上一步执行结果有关系,上一步执行结果将会被下一步消费,参数类型T

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         }).thenAccept((t)->{
             log.info("thenAccept子线程开始执行,参数是:{}",t);
             long res = t*5;
             log.info("计算结果:{}",res);
         });
 ​
         //调用执行
         thenAccept.get();
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

18:12:06.099 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

18:12:06.141 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

18:12:06.141 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

18:12:06.141 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程end………………

18:12:06.141 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – thenAccept子线程开始执行,参数是:3

18:12:06.141 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 计算结果:15

18:12:06.141 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.5.4、thenCompose

public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn);

thenCompose有返回值,返回值类型U

依赖于上一步的返回值结果,上一步返回值结果将会作为参数被传递

允许对2个Completion流水线进行操作,每一个操作完成时,将第一个操作结果传递第二个CompletionStage

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.*;
 import java.util.function.Function;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Long> thenCompose = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         }).thenCompose((new Function<Integer, CompletionStage<Long>>() {
             @Override
             public CompletionStage<Long> apply(Integer t) {
                 //第二次执行
                 CompletableFuture<Long> longCompletableFuture = CompletableFuture.supplyAsync(() -> {
                     log.info("thenCompose子线程开始执行,参数是:{}", t);
                     long res = t * 5;
                     log.info("计算结果:{}", res);
                     return res;
                 });
                 return longCompletableFuture;
             }
         }));
 ​
         //调用执行
         thenCompose.get();
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

18:27:33.382 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

18:27:33.427 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

18:27:33.427 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

18:27:33.427 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程end………………

18:27:33.427 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – thenCompose子线程开始执行,参数是:3

18:27:33.427 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 计算结果:15

18:27:33.427 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………


3.5.6、thenCombine

public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);

thenCombine有返回值

会把两个 CompletionStage的任务都执行完毕后,把两个任务的结果一起交给thenCombine去处理

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> thenCombine1 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future1线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future1线程end………………");
             return i;
 ​
         });
 ​
         CompletableFuture<Integer> thenCombine2 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future2线程start………………");
             int i = 20 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future2线程end………………");
             return i;
 ​
         });
 ​
         //利用thenCombine对theCombine1和theCombine2进行合并操作
         CompletableFuture<Integer> thenCombine = thenCombine1.thenCombine(thenCombine2, (t, u) -> {
             log.info("第一个CompletableFuture执行结果:{}", t);
             log.info("第二个CompletableFuture执行结果:{}", u);
             return t+u;
         });
 ​
         //调用执行
         Integer integer = thenCombine.get();
         log.info("最终异步的调用结果:{}",integer);
 ​
         log.info("主线程end………………");
     }
 }

 

运行结果:

18:36:49.735 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

18:36:49.771 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程start………………

18:36:49.771 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程start………………

18:36:49.771 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

18:36:49.771 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程end………………

18:36:49.771 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-2,线程执行结果:6

18:36:49.771 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程end………………

18:36:49.771 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 第一个CompletableFuture执行结果:3

18:36:49.771 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 第二个CompletableFuture执行结果:6

18:36:49.771 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 最终异步的调用结果:9 18:36:49.771 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.5.7、thenAcceptBoth

public <U> CompletionStage<Void> thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);

没有返回值

当两个阶段的CompletionStage都执行完毕后,把结构一起交给thenAcceptBoth执行,没有返回值

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> thenCombine1 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future1线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future1线程end………………");
             return i;
 ​
         });
 ​
         CompletableFuture<Integer> thenCombine2 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future2线程start………………");
             int i = 20 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future2线程end………………");
             return i;
 ​
         });
 ​
         //利用thenCombine对theCombine1和theCombine2进行合并操作
         CompletableFuture<Integer> thenCombine = thenCombine1.thenCombine(thenCombine2, (t, u) -> {
             log.info("第一个CompletableFuture执行结果:{}", t);
             log.info("第二个CompletableFuture执行结果:{}", u);
             return t+u;
         });
 ​
         //调用执行
         Integer integer = thenCombine.get();
         log.info("最终异步的调用结果:{}",integer);
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

18:59:21.327 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

18:59:21.372 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程start………………

18:59:21.372 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程start………………

18:59:21.372 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-2,线程执行结果:6

18:59:21.372 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程end………………

18:59:21.372 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

18:59:21.372 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程end………………

18:59:21.372 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 第一个CompletableFuture执行结果:3

18:59:21.372 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 第二个CompletableFuture执行结果:6

18:59:21.372 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.5.8、runAfterBoth

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

无返回值

当两个阶段的CompletionStage都执行完毕后,才会执行下一步操作

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future1线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future1线程end………………");
             return i;
 ​
         });
 ​
         CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future2线程start………………");
             int i = 20 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future2线程end………………");
             return i;
 ​
         });
 ​
         //利用runAfterBoth对f1和f2进行合并操作
         CompletableFuture<Void> f = f1.runAfterBoth(f2, () -> {
             log.info("有一个任务在执行,runAfterBoth方法正在运行………………");
         });
 ​
         //调用执行
         f.get();
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

19:03:39.565 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

19:03:39.606 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程start………………

19:03:39.606 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程start………………

19:03:39.606 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-2,线程执行结果:6

19:03:39.606 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

19:03:39.606 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程end………………

19:03:39.606 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程end………………

19:03:39.606 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 有一个任务在执行,runAfterBoth方法正在运行……………… 19:03:39.606 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………


3.5.9、applyToEither

public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn);

applyToEither特点:针对于两个阶段CompletionStage,将计算最快的那个CompletionStage的结果作为下一步处理的消费

有返回值

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future1线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future1线程end………………");
             return i;
 ​
         });
 ​
         CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future2线程start………………");
             int i = 20 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future2线程end………………");
             return i;
 ​
         });
 ​
         //利用applyToEither方法对f1,f2进行合并操作,合并操作关系,OR
         CompletableFuture<Integer> f = f1.applyToEither(f2, result -> {
             log.info("applyToEither子线程开始执行,参数是:{}", result);
             return result;
         });
 ​
         //调用执行
         Integer integer = f.get();
         log.info("最终计算结果:{}",integer);
 ​
         log.info("主线程end………………");
     }
 }

 

输出结果:

19:29:21.959 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

19:29:22.012 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程start………………

19:29:22.012 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程start………………

19:29:22.012 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-1,线程执行结果:3

19:29:22.012 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 线程的名称:ForkJoinPool.commonPool-worker-2,线程执行结果:6

19:29:22.014 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future1线程end………………

19:29:22.014 [ForkJoinPool.commonPool-worker-2] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future2线程end………………

19:29:22.015 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – applyToEither子线程开始执行,参数是:3

19:29:22.015 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 最终计算结果:3

19:29:22.015 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.5.10、runAfterEither

public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);

任何一个完成都会执行下一步操作,没有返回值,更加简单,不再赘述


3.5.11、exceptionally

public CompletionStage<T> exceptionally (Function<Throwable, ? extends T> fn);

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 20 / 0;//除0异常
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         }).exceptionally((t)->{
             log.info("业务执行失败:{}",t.getMessage());
             return null;
         });
 ​
         Integer integer = f.get();
 ​
         log.info("主线程end………………");
     }
 }

 

返回结果:

19:35:21.768 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

19:35:21.805 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

19:35:21.805 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 业务执行失败:java.lang.ArithmeticException: / by zero

19:35:21.805 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

3.5.12、whenComplete

public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action);

T 上一步执行结果,U

异常

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 20 / 0;//除0异常
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         }).whenComplete((t,u)->{
             log.info("上一步执行结果:{}",t);
             if (u!=null){
                 log.info("执行错误,有异常:{}",u.getMessage());
             }
         });
 ​
         Integer integer = f.get();
         log.info("最终执行结果:{}",integer);
 ​
         log.info("主线程end………………");
     }
 }

 

增加一个除0异常

输出结果:

22:08:11.060 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

22:08:11.109 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

22:08:11.109 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 上一步执行结果:null

22:08:11.109 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 执行错误,有异常:java.lang.ArithmeticException: / by zero Exception in thread “main” java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at com.zhou.cubemail.juc.AsyncFutureDemo.main(AsyncFutureDemo.java:36) Caused by: java.lang.ArithmeticException: / by zero at com.zhou.cubemail.juc.AsyncFutureDemo.lambda$main$0(AsyncFutureDemo.java:24) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

3.5.13、handle

public <U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn);

类似于try{}finally{},对上一步执行结果进行处理(任务完成后处理,处理异常任务)

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future线程start………………");
             int i = 20 / 0;//除0异常
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future线程end………………");
             return i;
 ​
         }).handle((t,u)->{
             int res = -1;
             log.info("上一步执行结果:{}",t);
             if (u!=null){
                 log.info("执行错误,有异常:{}",u.getMessage());
             } else {
                 res = t * 5;
             }
             return res;
         });
 ​
         Integer integer = f.get();
         log.info("最终执行结果:{}",integer);
 ​
         log.info("主线程end………………");
     }
 }

 

增加一个除0异常,输出结果:

22:15:00.231 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程start………………

22:15:00.263 [ForkJoinPool.commonPool-worker-1] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 子线程future线程start………………

22:15:00.263 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 上一步执行结果:null

22:15:00.263 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 执行错误,有异常:java.lang.ArithmeticException: / by zero 22:15:00.263 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 最终执行结果:-1

22:15:00.263 [main] INFO com.zhou.cubemail.juc.AsyncFutureDemo – 主线程end………………

 package com.zhou.cubemail.juc;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Name AsyncFutureDemo
  * @Description 开发编程中的异步编排技术
  * @Author 88534
  * @Date 2021/9/7 15:40
  */
 @Slf4j
 public class AsyncFutureDemo {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
 ​
         log.info("主线程start………………");
 ​
         CompletableFuture<Integer> thenCombine1 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future1线程start………………");
             int i = 10 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future1线程end………………");
             return i;
 ​
        });
 ​
         CompletableFuture<Integer> thenCombine2 = CompletableFuture.supplyAsync(() -> {
 ​
             log.info("子线程future2线程start………………");
             int i = 20 / 3;
             log.info("线程的名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);
             log.info("子线程future2线程end………………");
             return i;
 ​
        });
 ​
         //利用thenCombine对theCombine1和theCombine2进行合并操作
         CompletableFuture<Integer> thenCombine = thenCombine1.thenCombine(thenCombine2, (t, u) -> {
             log.info("第一个CompletableFuture执行结果:{}", t);
             log.info("第二个CompletableFuture执行结果:{}", u);
             return t+u;
        });
 ​
         //调用执行
         Integer integer = thenCombine.get();
         log.info("最终异步的调用结果:{}",integer);
 ​
         log.info("主线程end………………");
    }
 }

————恢复内容结束————