建设网站去工信部备案需要什么资料关键字搜索
关于NIO,我们在上一篇 linux下网络编程socket&select&epoll的底层实现原理 就介绍了网络阻塞IO、以及基于事件驱动的非阻塞IO。对于NIO的API基本使用是java提供的接口,然后我们在业务上对NIO的使用,也是有不同的使用方法的。然后在我们的网络应用服务器的开发对NIO的使用,一般是基于Reactor模型,Reactor模型主要有3种:单Reactor单线程模型、单Reactor多线程模型、多Reactor多线程模型。下面我们就具体梳理下这3种模型,本篇主要是基于<>进行对应整理。
一、网络服务
对于一般的网络服务、分布式应用,都有一些基本的结构流程。也就是说,我们可以将网络服务分为这5种具体的处理步骤。Reactor模型主要就是看怎么基于NIO的API,充分利用多CPU多线程来合理加快各个环节的处理效率。
1、读取请求数据
2、对请求数据进行解码
3、对数据进行处理
4、对回复数据进行编码
5、发送回复
二、阻塞IO经典的网络服务器设计
然后在网络服务的处理中,最基本的类型就是这种,对于每个请求,新加一个线程进行处理。
在这种模型中,主要是阻塞的,当一个连接没有断,就会一直占用到线程。这个我们上一篇也有提到这个
在这种模型种,主要就是新接受到一个Socket就要新加一个线程进行处理,一直到这个socket释放,就即使这个socket没有进行操作也要一直阻塞占用这个线程
package org.example.reactor;import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;class ClassicServer{public static void main(String[] args) {ClassicServer classicServer = new ClassicServer();classicServer.start();}public static final int PORT = 9999;public void start() {try {ServerSocket ss = new ServerSocket(PORT);while (!Thread.interrupted()) {new Thread(new Handler(ss.accept())).start();}} catch (IOException e) {throw new RuntimeException(e);}}static class Handler implements Runnable {final Socket socket;Handler(Socket s) {socket = s;}public void run() {while (true) {try {InputStream inputStream = socket.getInputStream();byte[] read = read(inputStream);String decode = decode(read);String compute = compute(decode);byte[] encode = encode(compute);send(socket, encode);} catch (IOException ex) {ex.printStackTrace();}}}private byte[] read(InputStream inputStream) throws IOException {byte[] input = new byte[1024];int read = inputStream.read(input);if (read > 0){byte[] returnBytes = new byte[read];System.arraycopy(input,0,returnBytes,0,read);return returnBytes;}return new byte[0];}private String decode(byte[] bytes){return new String(bytes);}private String compute(String msg){System.out.println("print server msg ---- " + msg);return "Hello Client, I am Server";}private byte[] encode(String returnMsg){return returnMsg.getBytes(StandardCharsets.UTF_8);}private void send(Socket socket, byte[] returnBytes){try {socket.getOutputStream().write(returnBytes);socket.getOutputStream().flush();} catch (IOException e) {throw new RuntimeException(e);}}}
}
三、Reactor模型
然后对应Reactor模型,其使用NIO,然后进行拆分的概念。例如可以将accept提出来,然后对于应用来说,主要耗时的是在与业务相关,也可以将compute拿出来放到线程池来处理。
基于NIO,其是基于事件驱动,IO多路复用。这个我们上一篇也有提到这个:
这种的话,因为是select轮询,不用一直accept、read阻塞,一个线程就能管理多个SocketChannel。
1、单线程单Reactor模型
1)、基本介绍
可以看到,在单线程单Reactor模型中,将accept交给了Acceptor来处理,同时对于事件进行dispatch分发。
下面我们看下这种模型的具体实现:
2)、demo实现案例
对于Server端:
package org.example.reactor.single;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;class SingleThreadSingleReactor{private final Selector selector;private final ServerSocketChannel serverSocket;SingleThreadSingleReactor(int port) throws IOException {selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.configureBlocking(false);serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.register(selector, SelectionKey.OP_ACCEPT);}public static void main(String[] args) throws IOException {SingleThreadSingleReactor singleThreadSingleReactor = new SingleThreadSingleReactor(6666);singleThreadSingleReactor.dispatcher();}public void dispatcher() throws IOException {System.out.println("server start dispatcher-----------");Acceptor acceptor = new Acceptor();Handler handler = new Handler();while (true){int select = selector.select(200);if (select > 0){Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()){SelectionKey selectionKey = keyIterator.next();if (selectionKey.isAcceptable()){System.out.println("client acceptor-----------");acceptor.acceptor();}else if (selectionKey.isReadable()){System.out.println("server read msg ---------");SocketChannel socketChannel = (SocketChannel) selectionKey.channel();byte[] read = read(socketChannel);if (read.length > 0){byte[] handlerMsg = handler.handler(read);if (handlerMsg != null){send(socketChannel,handlerMsg);}}}keyIterator.remove();}}}}private byte[] read(SocketChannel socket) throws IOException {ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int read = socket.read(byteBuffer);if (read > 0){byte[] returnBytes = new byte[read];System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);return returnBytes;}return new byte[0];}private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {socketChannel.write(ByteBuffer.wrap(bytes));}// Acceptor 连接处理类class Acceptor {public void acceptor() {try {SocketChannel c = serverSocket.accept();c.configureBlocking(false);c.register(selector,SelectionKey.OP_READ);} catch (IOException e) {throw new RuntimeException(e);}}}public class Handler {public byte[] handler(byte[] bytes) {//按业务含义去解析获取数据String decode = decode(bytes);//业务处理String compute = compute(decode);//业务处理后去进行返回编码byte[] encode = encode(compute);return encode;}private String decode(byte[] bytes){return new String(bytes);}private String compute(String msg){System.out.println("msg compute handler start------------");System.out.println("print client msg ---- " + msg);System.out.println("msg compute handler end------------");return "server compute msg return";}private byte[] encode(String returnMsg){return returnMsg.getBytes(StandardCharsets.UTF_8);}}
}
对于Client端:
package org.example.reactor;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;public class NioClientMain {public static void main(String[] args) {for (int i = 0; i < 4; i++) {Thread.sleep(1000);new Thread(() -> {try {multiThread();} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}}private static void multiThread() throws IOException, InterruptedException {Selector selector = Selector.open();SocketChannel clientSocketChannel = SocketChannel.open();clientSocketChannel.configureBlocking(false);clientSocketChannel.connect(new InetSocketAddress("127.0.0.1",6666));clientSocketChannel.register(selector, SelectionKey.OP_CONNECT);while (true){int select = selector.select(200);if (select > 0){Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()){SelectionKey selectionKey = keyIterator.next();SocketChannel socketChannel = (SocketChannel) selectionKey.channel();if (selectionKey.isConnectable()){socketChannel.finishConnect();System.out.println("server is connect-------");socketChannel.register(selector,SelectionKey.OP_WRITE);}else if (selectionKey.isWritable()){Thread.sleep(3000);System.out.println("client send msg-------" + Thread.currentThread().getName() );String sendMsg = Thread.currentThread().getName() + ": Hello Server";ByteBuffer byteBuffer = ByteBuffer.wrap(sendMsg.getBytes(StandardCharsets.UTF_8));socketChannel.write(byteBuffer);socketChannel.register(selector,SelectionKey.OP_READ);}else if (selectionKey.isReadable()){ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int read = socketChannel.read(byteBuffer);if (read > 0){byte[] bytes = new byte[read];byteBuffer.flip();byteBuffer.get(bytes);System.out.println(Thread.currentThread().getName() + " current client ip " + socketChannel.getLocalAddress());System.out.println(Thread.currentThread().getName() + " server msg: "+ new String(bytes));socketChannel.register(selector,SelectionKey.OP_WRITE);}}keyIterator.remove();}}}}}
运行结果
server start dispatcher-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-3
client send msg-------Thread-2
client send msg-------Thread-1
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:7623
Thread-0 server msg: server compute msg return
Thread-1 current client ip /127.0.0.1:7621
Thread-1 server msg: server compute msg return
Thread-3 current client ip /127.0.0.1:7622
Thread-3 server msg: server compute msg return
Thread-2 current client ip /127.0.0.1:7620
Thread-2 server msg: server compute msg return
2、多线程单Reactor模型
1)、基本介绍
在上面的单线程单Reactor模型中,我们可以看到,其的业务是单线程的,前一个的处理会阻塞后面的处理。下面我们来看下多线程的版本。
在这种模型中,其将业务处理(decode、compute、encode)拿出来交给workerThreads线程池来处理了。这样的话,前一个业务处理就不会影响到后一个的逻辑处理了。
2)、demo实现案例
server端:
package org.example.reactor;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;class MultiThreadSingleReactor {private final Selector selector;private final ServerSocketChannel serverSocket;private ExecutorService executorService;MultiThreadSingleReactor(int port) throws IOException {selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.configureBlocking(false);serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.register(selector, SelectionKey.OP_ACCEPT);executorService = Executors.newFixedThreadPool(4);}public static void main(String[] args) throws IOException {MultiThreadSingleReactor singleThreadSingleReactor = new MultiThreadSingleReactor(6666);singleThreadSingleReactor.dispatcher();}public void dispatcher() throws IOException {System.out.println("server start dispatcher-----------");Acceptor acceptor = new Acceptor();Handler handler = new Handler();while (true){int select = selector.select(200);if (select > 0){Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()){SelectionKey selectionKey = keyIterator.next();if (selectionKey.isAcceptable()){System.out.println("client acceptor-----------");acceptor.acceptor();}else if (selectionKey.isReadable()){System.out.println("server read msg ---------");SocketChannel socketChannel = (SocketChannel) selectionKey.channel();//dispatcher 读数据byte[] read = read(socketChannel);if (read.length > 0){//线程池进行 decode、computer、encode处理executorService.submit(() -> {try {handler.handler(socketChannel,read);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});}}else if (selectionKey.isWritable()){byte[] bytes = (byte[]) selectionKey.attachment();SocketChannel socketChannel = (SocketChannel) selectionKey.channel();if (Objects.nonNull(bytes)){//dispatcher 写数据send(socketChannel,bytes);}}keyIterator.remove();}}}}private byte[] read(SocketChannel socket) throws IOException {ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int read = socket.read(byteBuffer);if (read > 0){byte[] returnBytes = new byte[read];System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);return returnBytes;}return new byte[0];}private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {socketChannel.write(ByteBuffer.wrap(bytes));socketChannel.register(selector,SelectionKey.OP_READ);}// Acceptor 连接处理类class Acceptor {public void acceptor() {try {SocketChannel c = serverSocket.accept();c.configureBlocking(false);c.register(selector, SelectionKey.OP_READ);} catch (IOException e) {throw new RuntimeException(e);}}}public class Handler {public void handler(SocketChannel socketChannel, byte[] bytes) throws ClosedChannelException {//按业务含义去解析获取数据String decode = decode(bytes);//业务处理String compute = compute(decode);//业务处理后去进行返回编码byte[] encode = encode(compute);SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE);//需要发送的数据selectionKey.attach(encode);}private String decode(byte[] bytes){return new String(bytes);}private String compute(String msg){System.out.println("msg compute handler start------------");System.out.println("print client msg ---- " + msg);System.out.println("msg compute handler end------------");return "server compute msg return";}private byte[] encode(String returnMsg){return returnMsg.getBytes(StandardCharsets.UTF_8);}}
}
Client的话,用上一个Client就可以了。
运行结果
server start dispatcher-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------
server read msg ---------server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:9953
Thread-0 server msg: server compute msg return
client send msg-------Thread-1
Thread-1 current client ip /127.0.0.1:9956
Thread-1 server msg: server compute msg return
client send msg-------Thread-2
Thread-2 current client ip /127.0.0.1:9959
Thread-2 server msg: server compute msg return
client send msg-------Thread-3
Thread-3 current client ip /127.0.0.1:9962
Thread-3 server msg: server compute msg return
3、多线程多Reactor模型
1)、基本介绍
在上面的多线程单Reactor模型中虽然进行业务处理部分用多线程解决了阻塞问题。但在通过Selector类型事件的时候,还是用的一个Selector,就一个Selector管理全部SocketChannel这个在连接多的时候,这部分就会有瓶颈,同时的话,读、写也是在一个Reactor中。
所以我们可以再优化,就进行多Reactor,一个主Reactor只进行accept,accept后,就再将这个SocketChannel交给子Reactor,同时子Reactor可以设置多个,这就将事件轮询、数据读取、业务处理、数据发送分散到多个Reactor了,更好的应用多线程充分利用CPU了。其具体的模型就是这样了:
2)、demo实现案例
服务端:
package org.example.reactor;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;class SingleThreadMainReactor {private final Selector selector;private final ServerSocketChannel serverSocket;private final MultiThreadSubReactor[] multiThreadSubReactors;public static final int mulSubNum = 4;private int nextSubIndex = 0;SingleThreadMainReactor(int port) throws IOException {selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.configureBlocking(false);serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.register(selector, SelectionKey.OP_ACCEPT);multiThreadSubReactors = new MultiThreadSubReactor[mulSubNum];for (int i = 0; i < mulSubNum; i++) {multiThreadSubReactors[i] = new MultiThreadSubReactor();new Thread(multiThreadSubReactors[i]).start();}}public static void main(String[] args) throws IOException {SingleThreadMainReactor singleThreadSingleReactor = new SingleThreadMainReactor(6666);singleThreadSingleReactor.accept();}public void accept() throws IOException {System.out.println("server start accept-----------");Acceptor acceptor = new Acceptor();while (true){int select = selector.select(200);if (select > 0){Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()){SelectionKey selectionKey = keyIterator.next();if (selectionKey.isAcceptable()){System.out.println("client acceptor-----------");acceptor.acceptor();}keyIterator.remove();}}}}// Acceptor 连接处理类class Acceptor {public void acceptor() {try {SocketChannel c = serverSocket.accept();c.configureBlocking(false);nextSubIndex = nextSubIndex%mulSubNum;System.out.println("server subReactor register--" + nextSubIndex);multiThreadSubReactors[nextSubIndex].register(c);} catch (IOException e) {throw new RuntimeException(e);}}}}
package org.example.reactor;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;class MultiThreadSubReactor implements Runnable{private final Selector selector;private ExecutorService executorService;MultiThreadSubReactor() throws IOException {selector = Selector.open();executorService = Executors.newFixedThreadPool(4);}@Overridepublic void run() {System.out.println("MultiThreadSubReactor start-----------");Handler handler = new Handler();while (true){try {int select = selector.select(200);if (select > 0){Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()){SelectionKey selectionKey = keyIterator.next();if (selectionKey.isReadable()){System.out.println("subReactor read msg ---------");SocketChannel socketChannel = (SocketChannel) selectionKey.channel();//dispatcher 读数据byte[] read = read(socketChannel);if (read.length > 0){//线程池进行 decode、computer、encode处理executorService.submit(() -> {try {handler.handler(socketChannel,read);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});}}else if (selectionKey.isWritable()){byte[] bytes = (byte[]) selectionKey.attachment();SocketChannel socketChannel = (SocketChannel) selectionKey.channel();if (Objects.nonNull(bytes)){//dispatcher 写数据send(socketChannel,bytes);}}keyIterator.remove();}}} catch (IOException e) {throw new RuntimeException(e);}}}private byte[] read(SocketChannel socket) throws IOException {ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int read = socket.read(byteBuffer);if (read > 0){byte[] returnBytes = new byte[read];System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);return returnBytes;}return new byte[0];}private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {socketChannel.write(ByteBuffer.wrap(bytes));socketChannel.register(selector,SelectionKey.OP_READ);}public void register(SocketChannel socketChannel) throws ClosedChannelException {socketChannel.register(selector,SelectionKey.OP_READ);}public class Handler {public void handler(SocketChannel socketChannel, byte[] bytes) throws ClosedChannelException {//按业务含义去解析获取数据String decode = decode(bytes);//业务处理String compute = compute(decode);//业务处理后去进行返回编码byte[] encode = encode(compute);SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE);//需要发送的数据selectionKey.attach(encode);}private String decode(byte[] bytes){return new String(bytes);}private String compute(String msg){System.out.println("msg compute handler start------------");System.out.println("print client msg ---- " + msg);System.out.println("msg compute handler end------------");return "server compute msg return";}private byte[] encode(String returnMsg){return returnMsg.getBytes(StandardCharsets.UTF_8);}}
}
Client同样使用原来的
运行结果:
MultiThreadSubReactor start-----------
MultiThreadSubReactor start-----------
MultiThreadSubReactor start-----------
server start accept-----------
MultiThreadSubReactor start-----------
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:1989
Thread-0 server msg: server compute msg return
client send msg-------Thread-1
Thread-1 current client ip /127.0.0.1:1992
Thread-1 server msg: server compute msg return
client send msg-------Thread-2
Thread-2 current client ip /127.0.0.1:1995
Thread-2 server msg: server compute msg return
client send msg-------Thread-3
Thread-3 current client ip /127.0.0.1:1998
Thread-3 server msg: server compute msg return