达美网站建设seo智能优化软件

温故而知新 --- Java多线程
- 1. 关键字
- 1.1 并发与并行
- 1.2 进程和线程
- 2. Java 线程
- 2.1 Java的主线程
- 2.2 线程生命周期
- 3. Java 线程三种实现
- 3.1 继承Thread类
- 3.2 实现Runnable接口
- 3.3 实现Callable接口
- 4. 线程池
- 4.1 七种线程池实现
- 4.2 线程池核心参数
- 4.3 Code
- 4.4 线程池的执行过程
- 4.5 Java并发工具包
- 5. Java线程间的通信
- 5.1 锁与同步
- 5.1.1 无锁状态
- 5.1.2 加锁,A执行完,再去执行B
- 5.2 等待/通知
- 5.2.1 加锁,等待/通知,A和B交替执行
- 5.3 信号量
- 5.3.1 线程A输出0,然后线程B输出1,再然后线程A输出2…以此类推
- 5.4 管道
- 5.4.1 Code
- 5.5 其它通信
- 5.5.1 Join方法
- 5.5.2 Sleep方法
- 5.5.3 ThreadLocal类
- 6. Volatile关键字
- 6.1 内存可见性
- 6.2 volatile的内存语义
- 6.2.1 内存可见性(code)
- 6.3 volatile用途
- 7. Synchronized关键字
- 7.1 三种上锁方式
- 7.2 乐观锁与悲观锁的概念
- 7.2.1 乐观锁
- 7.2.1 悲观锁
- 7.2.3 CAS的概念
- 7.3 CAS实现原⼦操作的三⼤问题
- 7.3.1 ABA问题
- 7.3.2 循环时间长开销大
- 7.3.1 只能保证一个共享变量的原子操作
- 8. Java8 Stream并行计算原理
- 8.1 单线程计算
- 8.2 多线程计算
- 8.3 Stream并⾏计算的性能提升
- 9. Awakening
1. 关键字
1.1 并发与并行
- 并发(concurrency)是在同一时间段,多个任务都在执行。宏观上是同时执行,微观上是顺序地交替执行
- 并行(parallellism)是一组程序按独立异步的速度执行,无论从微观还是宏观,多个任务都是同时执行的
简单举个例子:主要看是否是同时
- 你吃饭吃到一半,电话来了,你一直吃完以后才可以去接,这就说明你不支持并发也不支持并行。
- 你吃饭吃到一半,电话来了,你停下吃饭去接电话,接完后继续吃饭,这说明你支持并发。(可以不同时)
- 你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行(必须同时)
1.2 进程和线程
- 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础
- 线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
简单举个例子:
- 开机后,打开chrome浏览器是开启一个进程。
- 我同时浏览四个网页,这就是多线程。
2. Java 线程
2.1 Java的主线程
每个java程序都含有一个线程,那就是主线程(main线程)。Java应用程序都是从主类main方法执行的,当jvm加载代码,调动方法之后,就会启动一个线程,这个线程就是主线程,负责执行main方法。如果在主线程里面创建其他线程,就会在主线程和其他线程来回切换,直到其他所有线程结束才会结束主线程
2.2 线程生命周期
在线程的声明周期中,要经过新建(New),就绪(Runnable),运行(Running),阻塞(Blocked)和死亡(Dead)5种状态
新建状态:
线程对象声明和创建,未被执行之前。就绪状态:
处于新建状态的线程被启动后进入线程队列排队等待CPU时间片。运行状态:
就绪状态的线程被调度并获得CPU资源。阻塞状态:
在特殊情况下让出CPU资源暂时中止自己的执行。消亡状态:
线程执行完成或者程序停止运行。
3. Java 线程三种实现
3.1 继承Thread类
package com.xxxx;public class Hello {public static void main(String[] args) {Thread1 thread1 = new Thread1();Thread thread = new Thread() {@Overridepublic void run() {for (int i = 0; i < 100; i++) {if (i % 2 == 1) {//打印线程名,线程名是从0开始的System.out.println(Thread.currentThread().getName() + ":" + i);}}}};thread1.start();thread.start();}
}class Thread1 extends Thread{@Overridepublic void run() {super.run();for(int i=0;i<100;i++){if (i%2==0){System.out.println(Thread.currentThread().getName()+":"+i);}}}
}
方法说明:
start()
启动当前线程;调用当前线程的run()方法run():
需要重写Thread类中的此方法,将创建线程需要执行的操作声明在此方法中currentThread():
返回执行当前代码的线程getName():
获取当前线程的名字setName(String name):
设置当前线程的名字yield():
释放当前CPU的执行权join():
在线程a中调用线程b的join(),此时线程a就进入阻塞状态,直到线程b完全执行完之后,线程a在结束阻塞状态sleep(int millitime):
让当前线程“睡眠”指定的millitime毫秒。在指定的millitime毫秒时间内,当前进程是阻塞状态isAlive():
判断当前线程是否存活(线程执行完之前都是存活的)
3.2 实现Runnable接口
package com.xxxx;public class Hello {public static void main(String[] args) {Thread thread = new Thread(new MyThread1());Thread thread1 = new Thread(new MyThread2());thread.start();thread1.start();}
}class MyThread1 implements Runnable{@Overridepublic void run() {for(int i=0;i<100;i++) {if (i % 2 == 1) {System.out.println(Thread.currentThread().getName() + ":" + i);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}}
}class MyThread2 implements Runnable{@Overridepublic void run() {for(int i=0;i<100;i++) {if (i % 2 == 0) {System.out.println(Thread.currentThread().getName() + ":" + i);}}}
}
开发中,优先选择实现Runnable接口的方式创建线程
- 实现Runnable接口的方式没有类的单继承性的局限性(一个类只能继承一个父类,继承了Thread类就不能在继承其他类了)
- 实现Runnable接口的方式更适合来处理多个线程之间有共享数据的情况
3.3 实现Callable接口
package com.xxxx;import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;public class Hello {public static void main(String[] args) throws Exception {MyThread1 myThread1 = new MyThread1();FutureTask<Integer> futureTask = new FutureTask<>(new MyThread1());new Thread(futureTask).start();//开启线程System.out.println(futureTask.get());//获取返回值}
}class MyThread1 implements Callable<Integer> {@Overridepublic Integer call() throws Exception {int count = 0;for (int i = 1;i<100;i++){if (i%3==0){count++;}}return count;}
}
说明:
- Runnable接口是没有返回值的 Callable有返回值,支持泛型,可以抛出异常
- Thread类并不接受Callable对象。可以使用FutureTask类实现Runnable接口和Future接口
- Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
- FutureTask是实现的 RunnableFuture 接口的,而RunnableFuture 接口同时继承了 Runnable 接口和 Future 接口,它有两个构造函数,一个以Callable为参数,另外一个以Runnable为参数
4. 线程池
- 使⽤线程池主要有以下三个原因:
1.创建/销毁线程需要消耗系统资源,线程池可以复⽤已创建的线程。
2. 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从⽽造成服务器
崩溃。(主要原因)
3. 可以对线程做统⼀管理。
4.1 七种线程池实现
- 创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
Executors.newFixedThreadPool
- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
Executors.newCachedThreadPool
- 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
Executors.newSingleThreadExecutor
- 创建一个可以执行延迟任务的线程池。
Executors.newScheduledThreadPool
- 创建一个单线程的可以执行延迟任务的线程池。
Executors.newSingleThreadScheduledExecutor
- 创建一个抢占式执行的线程池(任务执行顺序不确定),这个是JDK 1.8 添加。
Executors.newWorkStealingPool
- 手动创建线程池的方式,它创建时最多可以设置 7 个参数。
ThreadPoolExecutor
4.2 线程池核心参数
- 核心参数介绍
corePoolSize:
队列没满时,线程最大并发数
maxiumumPoolSizes:
队列满后线程能够达到的最大并发数
keepAliveTime:
空闲线程过多久被回收的时间限制
unit:
keepAliveTime的时间单位
workQueue:
阻塞的队列类型
RejectedExecutionHandler:
超出maximumPoolSizes+workQueue时,任务交给RejectedExecutionHandler来处理
4.3 Code
package com.xxxx;
import java.util.concurrent.*;public class Hello {public static void main(String[] args) throws Exception {//创建线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 5, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(2),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("myThread");return thread;}},new ThreadPoolExecutor.AbortPolicy());//threadPoolExecutor.submit();threadPoolExecutor.execute(new MyThread());//提交任务threadPoolExecutor.shutdown();//关闭线程池}
}class MyThread implements Runnable{@Overridepublic void run() {for (int i=0;i<10;i++){System.out.println(i);}}
}
4.4 线程池的执行过程
执行过程:
1.判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新线程来执行任务,如果核心线程都在执行任务,则进入下一个流程
2.线程池判断工作队列是否已满,如果工作队列没满,则将新提交的任务存储到这个工作队列里,如果工作队列已满,则进入下一个流程
3.判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务,如果已经满了,则交给饱和策略来处理这个任务
4.5 Java并发工具包
- 并发工具类
提供了比synchronized更加高级的各种同步结构:包括CountDownLatch、CyclicBarrier、Semaphore等,可以实现更加丰富的多线程操作。- 并发容器
提供各种线程安全的容器:最常见的ConcurrentHashMap、有序的ConcurrentSkipListMap,实现线程安全的动态数组CopyOnWriteArrayList等。- 并发队列
各种BlockingQueue的实现:常用的ArrayBlockingQueue、SynchorousQueue或针对特定场景的PriorityBlockingQueue。- Executor框架
可以创建各种不同类型的线程池,调度任务运行等。
5. Java线程间的通信
5.1 锁与同步
- Java中,锁的概念是基于对象的,所以又称为对象锁
- 一个锁只能被一个线程持有,一个锁如果被一个线程持有,那其他线程如果需要这个锁,只能等到这个线程释放锁
5.1.1 无锁状态
public class NoneLock {static class ThreadA implements Runnable {@Overridepublic void run() {for (int i = 0; i < 100; i++) {System.out.println("Thread A " + i);}}}static class ThreadB implements Runnable {@Overridepublic void run() {for (int i = 0; i < 100; i++) {System.out.println("Thread B " + i);}}}public static void main(String[] args) {new Thread(new ThreadA()).start();new Thread(new ThreadB()).start();}
}
- 结果不可控,非常的混乱
5.1.2 加锁,A执行完,再去执行B
public class ObjectLock {private static Object lock = new Object();static class ThreadA implements Runnable {@Overridepublic void run() {synchronized (lock) {for (int i = 0; i < 100; i++) {System.out.println("Thread A " + i);}}}}static class ThreadB implements Runnable {@Overridepublic void run() {synchronized (lock) {for (int i = 0; i < 100; i++) {System.out.println("Thread B " + i);}}}}public static void main(String[] args) throws InterruptedException {new Thread(new ThreadA()).start();Thread.sleep(5);new Thread(new ThreadB()).start();}
}
- 因为有Thread.sleep(5),所以肯定是A先拿到锁,全部执行完后,释放锁给到B
5.2 等待/通知
- notify()⽅法会随机叫醒⼀个正在等待的线程,⽽notifyAll()会叫醒所有正在等
待的线程。
- 前面说一个锁只能被一个线程持有,⽽假如线程A现在持有了⼀个锁 lock 并开始执⾏,它可以使⽤ lock.wait() 让⾃⼰进⼊等待状态。这个时候, lock 这个锁是被释放了的。
- 线程B获得了 lock 这个锁并开始执⾏,它可以在某⼀时刻,使⽤ lock.notify() ,通知之前持有 lock 锁并进⼊等待状态的线程A,说“线程A你不⽤等了,可以往下执⾏了”。==>注意:这个时候线程B并没有释放锁 lock ,除⾮线程B这个时候使⽤ lock.wait() 释放锁,或者线程B执⾏结束⾃⾏释放锁,线程A才能得到 lock 锁。
5.2.1 加锁,等待/通知,A和B交替执行
public class WaitAndNotify {private static Object lock = new Object();static class ThreadA implements Runnable {@Overridepublic void run() {synchronized (lock) {for (int i = 0; i < 5; i++) {try {System.out.println("ThreadA: " + i);lock.notify();lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}lock.notify();}}}static class ThreadB implements Runnable {@Overridepublic void run() {synchronized (lock) {for (int i = 0; i < 5; i++) {try {System.out.println("ThreadB: " + i);lock.notify();lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}lock.notify();}}}public static void main(String[] args) throws InterruptedException {new Thread(new ThreadA()).start();Thread.sleep(100);new Thread(new ThreadB()).start();}
}
- 唤醒对方,然后自己进入等待状态
- 注意:
等待/通知机制使⽤的是同⼀个对象锁,如果你两个线程使⽤的是不同的对象锁,那它们之间是不能⽤等待/通知机制通信的
5.3 信号量
- volatile关键字能够保证内存的可⻅性,如果⽤volitile关键字声明了⼀个变量,在⼀个线程⾥⾯改变了这个变量的值,那其它线程是⽴⻢可⻅更改后的值的。
5.3.1 线程A输出0,然后线程B输出1,再然后线程A输出2…以此类推
public class Signal {private static volatile int signal = 0;static class ThreadA implements Runnable {@Overridepublic void run() {while (signal < 5) {if (signal % 2 == 0) {System.out.println("threadA: " + signal);synchronized (this) {signal++;}}}}}static class ThreadB implements Runnable {@Overridepublic void run() {while (signal < 5) {if (signal % 2 == 1) {System.out.println("threadB: " + signal);synchronized (this) {signal = signal + 1;}}}}}public static void main(String[] args) throws InterruptedException {new Thread(new ThreadA()).start();Thread.sleep(1000);new Thread(new ThreadB()).start();}
}
- 运行结果如下:
5.4 管道
- 管道是基于“管道流”的通信⽅式。JDK提供了 PipedWriter 、 PipedReader 、PipedOutputStream 、 PipedInputStream 。其中,前⾯两个是基于字符的,后⾯两个是基于字节流的。
5.4.1 Code
public class Pipe {static class ReaderThread implements Runnable {private PipedReader reader;public ReaderThread(PipedReader reader) {this.reader = reader;}@Overridepublic void run() {System.out.println("this is reader");int receive = 0;try {while ((receive = reader.read()) != -1) {System.out.print((char)receive);}} catch (IOException e) {e.printStackTrace();}}}static class WriterThread implements Runnable {private PipedWriter writer;public WriterThread(PipedWriter writer) {this.writer = writer;}@Overridepublic void run() {System.out.println("this is writer");int receive = 0;try {writer.write("test");} catch (IOException e) {e.printStackTrace();} finally {try {writer.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) throws IOException, InterruptedExcePipedWriter writer = new PipedWriter();PipedReader reader = new PipedReader();writer.connect(reader); // 这⾥注意⼀定要连接,才能通信new Thread(new ReaderThread(reader)).start();Thread.sleep(1000);new Thread(new WriterThread(writer)).start();}
}
- 输出结果:
- 执⾏流程:
1. 线程ReaderThread开始执⾏,
2. 线程ReaderThread使⽤管道reader.read()进⼊”阻塞“,
3. 线程WriterThread开始执⾏,
4. 线程WriterThread⽤writer.write(“test”)往管道写⼊字符串,
5. 线程WriterThread使⽤writer.close()结束管道写⼊,并执⾏完毕,
6. 线程ReaderThread接受到管道输出的字符串并打印,
7. 线程ReaderThread执⾏完毕。
5.5 其它通信
5.5.1 Join方法
- join()⽅法是Thread类的⼀个实例⽅法。它的作⽤是让当前线程陷⼊“等待”状态,等join的这个线程执⾏完成后,再继续执⾏当前线程。。
- 有时候,主线程创建并启动了⼦线程,如果⼦线程中需要进⾏⼤量的耗时运算,主线程往往将早于⼦线程结束之前结束。
- 如果主线程想等待⼦线程执⾏完毕后,获得⼦线程中的处理完的某个数据,就要⽤到join⽅法了。
public class Join {static class ThreadA implements Runnable {@Overridepublic void run() {try {System.out.println("我是⼦线程,我先睡⼀秒");Thread.sleep(1000);System.out.println("我是⼦线程,我睡完了⼀秒");} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(new ThreadA());thread.start();thread.join();System.out.println("如果不加join⽅法,我会先被打出来,加了就不⼀样了");}
}
- 注意:
1.join()⽅法有两个重载⽅法,⼀个是join(long), ⼀个是join(long, int)。
2.通过源码你会发现,join()⽅法及其重载⽅法底层都是利⽤了wait(long)这个⽅法。对于join(long, int),通过查看源码(JDK 1.8)发现,底层并没有精确到纳秒,⽽是对第⼆个参数做了简单的判断和处理。
5.5.2 Sleep方法
- Sleep的两个方法
Thread.sleep(long)
Thread.sleep(long, int)
同上,查看源码(JDK 1.8)发现,第⼆个⽅法貌似只对第⼆个参数做了简单的处理,没有精确到纳秒。实际上还是调⽤的第⼀个⽅法。- 注意:
sleep⽅法是不会释放当前的锁的,⽽wait⽅法会。- sleep与wait区别:
wait可以指定时间,也可以不指定;⽽sleep必须指定时间。
wait释放cpu资源,同时释放锁;sleep释放cpu资源,但是不释放锁,所以易死锁。
wait必须放在同步块或同步⽅法中,⽽sleep可以再任意位置
5.5.3 ThreadLocal类
- ThreadLocal是⼀个本地线程副本变量⼯具类。内部是⼀个弱引⽤的Map来维护。
- ThreadLocal类并不属于多线程间的通信,⽽是让每个线程有⾃⼰”独⽴“的变量,线程之间互不影响。它为每个线程都创建⼀个副本,每个线程可以访问⾃⼰内部的副本变量。
public class ThreadLocalDemo {static class ThreadA implements Runnable {private ThreadLocal<String> threadLocal;public ThreadA(ThreadLocal<String> threadLocal) {this.threadLocal = threadLocal;}@Overridepublic void run() {threadLocal.set("A");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("ThreadA输出:" + threadLocal.get());}static class ThreadB implements Runnable {private ThreadLocal<String> threadLocal;public ThreadB(ThreadLocal<String> threadLocal) {this.threadLocal = threadLocal;}@Overridepublic void run() {threadLocal.set("B");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("ThreadB输出:" + threadLocal.get());}}public static void main(String[] args) {ThreadLocal<String> threadLocal = new ThreadLocal<>();new Thread(new ThreadA(threadLocal)).start();new Thread(new ThreadB(threadLocal)).start();}}
}
6. Volatile关键字
6.1 内存可见性
- 内存可⻅性,指的是线程之间的可⻅性,当⼀个线程修改了共享变量时,另⼀个线
程可以读取到这个修改后的值。
6.2 volatile的内存语义
- volatile的两个功能:
保证变量的内存的可见性。
禁止volatile变量和普通变量重排序。
6.2.1 内存可见性(code)
public class VolatileExample {int a = 0;volatile boolean flag = false;public void writer() {a = 1; // step 1flag = true; // step 2}public void reader() {if (flag) { // step 3System.out.println(a); // step 4}}
}
- 所谓内存可⻅性
指的是当⼀个线程对volatile 修饰的变量进⾏写操作(⽐如step 2)时,JMM会⽴即把该线程对应的本地内存中的共享变量的值刷新到主内存;当⼀个线程对 volatile 修饰的变量进⾏读操作(⽐如step 3)时,JMM会把⽴即该线程对应的本地内存置为⽆效,从主内存中读取共享变量的值。- 线程A先⾃⾏⽅法 writer ⽅法,线程B后执⾏ reader ⽅法
- 如果 flag 变量没有⽤ volatile 修饰
在step 2,线程A的本地内存⾥⾯的变量就不会⽴即更新到主内存,那随后线程B也同样不会去主内存拿最新的值,仍然使⽤线程B本地内存缓存的变量的值 a = 0,flag = false
6.3 volatile用途
- volatile有着与锁相同的内存语义,所以可以作为⼀个“轻量级”的锁来使⽤。
- 在功能上,锁⽐volatile更强⼤;在性能上,volatile更有优势。
7. Synchronized关键字
7.1 三种上锁方式
// 关键字在实例⽅法上,锁为当前实例
public synchronized void instanceLock() {// code
}
// 关键字在静态⽅法上,锁为当前Class对象
public static synchronized void classLock() {// code
}
// 关键字在代码块上,锁为括号⾥⾯的对象
public void blockLock() {Object o = new Object();synchronized (o) {// code}
}
- 所谓“临界区”,指的是某⼀块代码区域,它同⼀时刻只能由⼀个线程执⾏。在上⾯的例⼦中,如果 synchronized 关键字在⽅法上,那临界区就是整个⽅法内部。⽽如果是使⽤synchronized代码块,那临界区就指的是代码块内部的区域。
- 所以下面代码里的方法是等价的作用。
// 关键字在实例⽅法上,锁为当前实例
public synchronized void instanceLock() {// code
}
// 关键字在代码块上,锁为括号⾥⾯的对象
public void blockLock() {synchronized (this) {// code}
}
// 关键字在静态⽅法上,锁为当前Class对象
public static synchronized void classLock() {// code
}
// 关键字在代码块上,锁为括号⾥⾯的对象
public void blockLock() {synchronized (this.getClass()) {// code}
}
7.2 乐观锁与悲观锁的概念
乐观锁多⽤于“读多写少“的环境,避免频繁加锁影响性能;⽽悲观锁多⽤于”写多读少“的环境,避免频繁失败和重试影响性能。
7.2.1 乐观锁
- 乐观锁⼜称为“⽆锁”,顾名思义,它是乐观派。乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执⾏,⽆需加锁也⽆需等待,效率高。⽽⼀旦多个线程发⽣冲突,乐观锁通常是使⽤⼀种称为CAS的技术来保证线程执⾏的安全性。
7.2.1 悲观锁
- 悲观锁就是我们常说的锁。对于悲观锁来说,它总是认为每次访问共享资源时会发⽣冲突,所以必须对每次数据操作加上锁,以保证临界区的程序同⼀时间只能有⼀个线程在执⾏,效率低。
7.2.3 CAS的概念
- CAS的全称是:⽐较并交换(Compare And Swap)。
V:
要更新的变量(var)
E:
预期值(expected)
N:
新值(new)- 比较并交换的过程:
1.判断V是否等于E,如果等于,将V的值设置为N;如果不等,说明已经有其它线程
更新了V,则当前线程放弃更新,什么都不做。
2.所以这⾥的预期值E本质上指的是“旧值”。
- 注意:
当多个线程同时使⽤CAS操作⼀个变量时,只有⼀个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
7.3 CAS实现原⼦操作的三⼤问题
7.3.1 ABA问题
- 所谓ABA问题,就是⼀个值原来是A,变成了B,⼜变回了A。这个时候使⽤CAS是检查不出变化的,但实际上却被更新了两次。
1.ABA问题的解决思路是在变量前⾯追加上版本号或者时间戳。
2.从JDK 1.5开始,JDK的atomic包⾥提供了⼀个类 AtomicStampedReference 类来解决ABA问题。
3.这个类的 compareAndSet ⽅法的作⽤是⾸先检查当前引⽤是否等于预期引⽤,并且检查当前标志是否等于预期标志,如果⼆者都相等,才使⽤CAS设置为新的值和标志。
7.3.2 循环时间长开销大
- CAS多与⾃旋结合。如果⾃旋CAS⻓时间不成功,会占⽤⼤量的CPU资源,解决办法:
=>
让JVM⽀持处理器提供的pause指令。
pause指令能让⾃旋失败时cpu睡眠⼀⼩段时间再继续⾃旋,从⽽使得读操作的频率低很多,为解决内存顺序冲突⽽导致的CPU流⽔线重排的代价也会⼩很多。
7.3.1 只能保证一个共享变量的原子操作
- 解决办法:
1.使用JDK 1.5开始就提供的 AtomicReference 类保证对象之间的原⼦性,把多个变量放到⼀个对象里面进行CAS操作;
2.使用锁。锁内的临界区代码可以保证只有当前线程能操作。
8. Java8 Stream并行计算原理
8.1 单线程计算
public class StreamDemo {public static void main(String[] args) {Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).reduce((a, b) -> {System.out.println(String.format("%s: %d + %d = %d",Thread.currentThread().getName(), a, b, a + b));return a + b;}).ifPresent(System.out::println);}
}
- 详解:
1.这⾥的Stream.of(T… values)⽅法是Stream接⼝的⼀个静态⽅法,其底层调⽤的是Arrays.stream(T[] array)⽅法
2.reduce ⽅法是从前两个元素开始,进⾏某种操作(我这⾥进⾏的是加法操作)后,返回⼀个结
果,然后再拿这个结果跟第三个元素执⾏同样的操作,以此类推,直到最后的⼀个元素。
8.2 多线程计算
public class StreamDemo {public static void main(String[] args) {Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).parallel().reduce((a, b) -> {System.out.println(String.format("%s: %d + %d = %d",Thread.currentThread().getName(), a, b, a + b));return a + b;}).ifPresent(System.out::println);}
}
- 详解:
reduce⽅法被调⽤之前,调⽤了parallel()⽅法。
8.3 Stream并⾏计算的性能提升
public class StreamParallelDemo {public static void main(String[] args) {System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime// 产⽣100w个随机数(1 ~ 100),组成列表Random random = new Random();List<Integer> list = new ArrayList<>(1000_0000);for (int i = 0; i < 1000_0000; i++) {list.add(random.nextInt(100));}long prevTime = getCurrentTime();list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() -prevTime = getCurrentTime();list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out:System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() -}private static long getCurrentTime() {return System.currentTimeMillis();}
}
- 运行结果:
9. Awakening
在一秒钟内看到本质的人和花半辈子也看不清一件事本质的人,自然是不一样的命运。