让线程按指定顺序运行
1 概述
郝林老师在《Go 语言核心》课程里提了个有意思的问题:当多个线程运行之后,如何能够让线程按指定的顺序执行?多个线程按顺序执行的效果和串行是一样的,根本没有必要用多线程。但细想一下,其实这里面应该包含着线程调度的知识,比如当一件比较耗时的事情可以分成几个子任务,有些任务可以并行执行,而整体又有一定的串行执行要求,此时就可以用上按一定顺序调度某些线程的方法。
这里有两个要求:一是线程是运行起来了的,不是一个执行完之后再启动另外一个;二是调度的代码尽可能要剥离到业务逻辑代码之外。
2 实现方法
2.1 Go版本
郝林老师给了个Go实现的版本,Go里面不叫线程而是Go程,先给没有顺序要求的版本,满足要求一:
for i := 0; i < 10; i++ {go func(i int) {fmt.Println(i)}(i)
}在此基础上封装一下,封装的代码不影响原来主体业务逻辑代码,满足要求二:
var count uint32 = 0 trigger := func(i uint32, fn func()) {for {if n := atomic.LoadUint32(&count); n == i {fn()atomic.AddUint32(&count, 1)break}time.Sleep(time.Nanosecond)}
}for i := uint32(0); i < 10; i++ {go func(i uint32) {fn := func() {fmt.Println(i)}trigger(i, fn)}(i)
}2.2 Java版
不协调线程顺序:
public class ThreadSeqOrderDemo {public static void main(String[] args) {ThreadSeqOrderDemo demo = new ThreadSeqOrderDemo();demo.test();}private void test() {int count = 10;Thread[] threads = new Thread[count];for (int i = 0; i < count; i++) {threads[i] = new Thread(new TestTask(i + 1));threads[i].start();}for (int i = 0; i < count; i++) {try {threads[i].join();} catch (InterruptedException e) {e.printStackTrace();}}}
}
public class TestTask implements Runnable {private int i;public TestTask(int i) {this.i = i;}@Overridepublic void run() {System.out.println(i);}public int getIndex() {return i;}
}增加一个Wrapper,相当于增加一个切面,把调度封装起来:
public class ThreadSeqOrderDemo {public static void main(String[] args) {ThreadSeqOrderDemo demo = new ThreadSeqOrderDemo();demo.test();}private void test() {int count = 10;AtomicInteger counter = new AtomicInteger(1); // 增加协调用的countThread[] threads = new Thread[count];for (int i = 0; i < count; i++) {// 用Wrapper把Task包装起来threads[i] = new Thread(new TestTaskWrapper(new TestTask(i + 1), counter));threads[i].start();}for (int i = 0; i < count; i++) {try {threads[i].join();} catch (InterruptedException e) {e.printStackTrace();}}}
}
public class TestTask implements Runnable {private int i;public TestTask(int i) {this.i = i;}@Overridepublic void run() {System.out.println(i);}public int getIndex() {return i;}
}
public class TestTaskWrapper implements Runnable {private TestTask task;private AtomicInteger counter;public TestTaskWrapper(TestTask task, AtomicInteger counter) {this.task = task;this.counter = counter;}@Overridepublic void run() {while (true) {// counter值和任务编号一样时才执行if(counter.get() == task.getIndex()) {task.run();// 触发下一个要执行的编号counter.incrementAndGet();break;}try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}
}2.3 CountDownLatch版本
上面版本用一个while(true)的方式,不够优雅,可以改用CountDownLatch改造一下:
public class ThreadSeqOrderDemo {public static void main(String[] args) {ThreadSeqOrderDemo demo = new ThreadSeqOrderDemo();demo.test();}private void test() {int count = 10;Thread[] threads = new Thread[count];CountDownLatch[] latches = new CountDownLatch[count]; // 数字换成CountDownLatchCountDownLatch preLatch = null;for (int i = 0; i < count; i++) {// 按要求把协调用的CountDownLatch和Task组装到Wrapper里threads[i] = new Thread(new TestTaskWrapper2(new TestTask(i + 1), preLatch, latches[i]));threads[i].start();preLatch = latches[i];}for (int i = 0; i < count; i++) {try {threads[i].join();} catch (InterruptedException e) {e.printStackTrace();}}}
}
public class TestTask implements Runnable {private int i;public TestTask(int i) {this.i = i;}@Overridepublic void run() {System.out.println(i);}public int getIndex() {return i;}
}
public class TestTaskWrapper2 implements Runnable {private TestTask task;/** 上一个Latch */private CountDownLatch preLatch;/** 当前Latch */private CountDownLatch curLatch;public TestTaskWrapper2(TestTask task, CountDownLatch preLatch, CountDownLatch curLatch) {this.task = task;this.preLatch = preLatch;this.curLatch = curLatch;}@Overridepublic void run() {// 第一个任务没有preLatch,可以直接执行,有preLatch则等待其执行完的通知if(preLatch != null) {try {preLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}task.run();// 触发当前Latch完成,通知下一个任务执行curLatch.countDown();}
}换成CountDownLatch就有点协调的感觉了,不再依赖固定的数字,而是抽象成一系列协调对象,当希望线程按指定顺序执行时,调整这些协调对象并与对应的线程和任务绑定即可,其核心原理就是执行完一部分再通知下一部分执行,在没有上一部分完成的通知之前,线程处于等待状态。
