Java--多线程知识(四)
一.生产者消费者模型
创建两个线程 t1,t2。 再创建一个阻塞队列,通过阻塞队列连接t1,t2。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Demo1 {public static void main(String[] args) {//阻塞队列BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);//生产者Thread t1 = new Thread(()->{int n = 0;while (true){System.out.println("生产了:" + n);try {queue.put(n);} catch (InterruptedException e) {throw new RuntimeException(e);}n++;}});//消费者Thread t2 = new Thread(()->{while ( true){try {int n = queue.take();System.out.println("消费了:" + n);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}
以上代码是运行很快的,我们阻塞队列的容量设置为了100,如果想更清晰的看到阻塞队列的效果,我们可以对消费者进行一定的限制:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Demo1 {public static void main(String[] args) {//阻塞队列BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);//生产者Thread t1 = new Thread(()->{int n = 0;while (true){System.out.println("生产了:" + n);try {queue.put(n);} catch (InterruptedException e) {throw new RuntimeException(e);}n++;}});//消费者Thread t2 = new Thread(()->{while ( true){try {int n = queue.take();System.out.println("消费了:" + n);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}
由于消费者被限制了消费速度,所以当阻塞队列满了之后,生产者的生产速度也受到了影响,会形成当消费者消费之后,阻塞队列有了空间,生产者才可以继续生产。
二.阻塞队列
我们自己创建一个阻塞队列,其中阻塞队列中实现了 put() 和 take() 方法,阻塞队列的原理就像是一个环形数组,注意 head 和 tail >= 数组的长度的时候,进行一下处理就可以。
class MyBlonkingQueue{private int[] data;private int head = 0;private int tail = 0;private int size = 0;private Object locker = new Object();public MyBlonkingQueue(int capacity){data = new int[capacity];}//放入元素public void put(int elem) throws InterruptedException {synchronized(locker){if (size == data.length){locker.wait();}data[tail] = elem;locker.notify();size++;tail++;if (tail >= data.length){tail = 0;}}}//取出元素public int take() throws InterruptedException {synchronized(locker){if (size == 0){locker.wait();}int ret = data[head];size--;head++;locker.notify();if (head >= data.length){head = 0;}return ret;}}
}
运行之后发现写的阻塞数组没有问题:
public class Demo2 {public static void main(String[] args) {MyBlonkingQueue queue = new MyBlonkingQueue(10);Thread t1 = new Thread(()->{while (true){for (int i = 0; i < 10000; i++) {try {queue.put(i);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("生产了:"+i);}}});Thread t2 = new Thread(()->{while (true){int ret = 0;try {ret = queue.take();Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费了:"+ret);}});t1.start();t2.start();}
}
作用:
阻塞队列可以解耦合,可以使流量较为稳定,不至于因为突然的高峰期而导致服务器崩溃。削峰补谷。
三.ThreadPoolExecutor
ThreadPoolExecutor 是Java中的线程池实现类,共七个参数:
其中 corePoolSize 指的是核心线程数的数量。
maximumPoolSize 指的是最大线程数,可以根据需要在核心线程数的基础上自动扩容和缩容。
keepAliveTime 指的是非核心线程的最大存活时间。
TimeUnit unit 指的是时间单位。
workQueue 指的是要用的队列。
threadFactory 和 handler 分别是线程工厂和拒绝策略。
工厂模式:
我们在笛卡尔坐标系想要表示一个点的坐标有很多种方法:首先是 x y的表示。其次是角度加线长的办法,通过sin 和cos 来求,但是在代码中:
由于重载的要求参数要有所不同,所以上述代码是无法同时存在的,这时候就可以用工厂模式解决上述问题。如果构造对象不依赖于构造方法,那么上述问题自然就不存在了。
public class Demo3 {}
class point {public point(){}void set(){}
}
class PointFactory{public static point PointFactoryXY(int x,int y){point p = new point();p.set();return p;}public static point PointFactory(int cos,int length){point p = new point();p.set();return p;}
}
以上就是工厂模式的大致框架,通过 set 方法来把我们需要的数据设置进去,以此来达到不同房的目的。
Java 标准库中提供了ThreadFactory来于创建新线程的工厂接口。
拒绝策略:
常见的拒绝策略类型有四种:
第一种:当阻塞队列满了之后再塞入任务,会抛异常
第二种:当阻塞队列满了之后,再提交任务,会让提交当前任务的线程去执行当前任务
第三种:当阻塞队列满了之后,会移除最老的任务,让新任务加入队列。
第四种:当阻塞队列满了之后,会移除最新的任务,让新任务加入队列。
线程池的简单创建:
通过创建简单的线程池来实现生产者消费者模型:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo5 {//创建简易的线程池static class MyThreadPool {//创建一个阻塞队列private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();public MyThreadPool(int n) {for (int i = 0; i < 5; i++) {Thread t1 = new Thread(() -> {while (true) {try {Runnable task = queue.take();//在当前线程中执行该任务task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}},"Thread" + i);t1.start();}}public void submit(Runnable task) throws InterruptedException {queue.put(task);}}//往线程池中添加任务public static void main(String[] args) throws InterruptedException {MyThreadPool pool = new MyThreadPool(5);for (int i = 0; i < 5000; i++) {int id = i;pool.submit(()->{Thread cur = Thread.currentThread();System.out.println(cur.getName() +" " + id);});}Thread.sleep(1000);}
}