深入解析Java NIO:从BIO到Reactor模式的网络编程演进
Socket是什么?
Socket是应用层与TCP/IP协议族通信的中间软件抽象层——一组接口。
以设计模式的视角来看,其实就是一个门面模式,它将复杂的TCP/IP协议族隐藏在Socket接口里。
对于用户而言,一组简单的接口就是全部内容,让Socket组织接口以符合指定的协议。
主机 A 的应用程序要能和主机 B 的应用程序通信,必须通过 Socket 建立连接,而建立 Socket 连接必须需要底层TCP/IP 协议来建立 TCP 连接。建立 TCP 连接需要底层 IP 协议来寻址网络中的主机。网络层使用的 IP 协议可以帮助我们根据 IP 地址来找到目标主机,但是一台主机上可能运行着多个应用程序。那么怎么才能与指定的应用程序通信?这就需要通过TCP或者UDP的地址即端口号来决定。这样的话,通过一个Socket实例就唯一代表一个主机上某个应用程序的通信链路了。
长连接和短连接的区别?
短连接(可以说是SOCKET连接后发送后接收完数据后马上断开连接。):传统的HTTP是无状态的,浏览器与服务器进行一次HTTP操作,那就建立一次连接。但是任务结束完了就中断连接。
长连接:建立Socket连接后不管是否使用都保持连接。
长连接多用于操作频繁,点对点的通讯,而且连接数不能太多。例如:数据库的连接用长连接, 如果用短连接频繁的通信会造成socket错误,而且频繁的socket 创建也是对资源的浪费。像Web网站的http服务一般都用短链接,因为长连接对于服务端来说会耗费一定的资源,而像Web网站这么频繁的成千上万甚至上亿客户端的连接用短连接会更省一些资源。
网络通讯流程?
在通信编程中,提供服务的程序称为服务端,连接服务端并使用其服务的程序称为客户端。这是网络通信中最基础的架构模式。
在开发过程中,类名的设计直观地反映了其功能:
包含
Server
或ServerSocket
的类:专属于服务端,用于容纳和接纳网络连接。它的核心职责是监听特定端口,等待客户端连接,类似于一个提供服务的“场所”或“接待中心”。仅包含
Socket
的类:负责具体的网络数据传输(读写操作)。无论是客户端还是服务端,实际的数据通信都由Socket对象完成。对于服务端来说,每接受一个客户端连接,就会创建一个新的Socket实例来专门处理与该客户的通信,这相当于发生在服务场所内的一个个独立“服务事件”。
关键比喻:ServerSocket
如同一家酒店的前台,只负责接待和分配客人;而Socket
则是为每位客人提供服务的专属管家,负责所有的具体交流和服务。
所有通信编程模式,无论其底层协议或架构如何,都围绕着以下三个核心操作展开 或者说我们所关心的:
1、连接(客户端连接服务器,服务器等待和接收连接)
2、读网络数据
3、写网络数据
服务端启动后,首先在指定的IP地址和端口上开启监听。客户端获知该地址后,发起连接操作。一旦TCP三次握手成功,连接便正式建立。此后,双方便可通过各自持有的套接字(Socket) 进行双向的网络数据读写,完成所需的通信任务。
这种“连接-读写”的模式构成了所有网络应用(如HTTP、FTP、游戏服务器等)最基础的通信骨架。
在Java中的网络编程(BIO)
首先,BIO 传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。
public class Client {public static void main(String[] args) throws IOException {//客户端启动必备Socket socket = null;//实例化与服务端通信的输入输出流ObjectOutputStream output = null;ObjectInputStream input = null;//服务器的通信地址InetSocketAddress addr = new InetSocketAddress("127.0.0.1",10001);try{socket = new Socket();/*连接服务器*/socket.connect(addr);output = new ObjectOutputStream(socket.getOutputStream());input = new ObjectInputStream(socket.getInputStream());/*向服务器输出请求*/output.writeUTF("哈哈哈哈");output.flush();//接收服务器的输出System.out.println(input.readUTF());}finally{if (socket!=null) socket.close();if (output!=null) output.close();if (input!=null) input.close();}}
}public class Server {public static void main(String[] args) throws IOException {/*服务器必备*/ServerSocket serverSocket = new ServerSocket();/*绑定监听端口*/serverSocket.bind(new InetSocketAddress(10001));System.out.println("Server start.......");while(true){new Thread(new ServerTask(serverSocket.accept())).start();}}private static class ServerTask implements Runnable{private Socket socket = null;public ServerTask(Socket socket) {this.socket = socket;}@Overridepublic void run() {/*拿和客户端通讯的输入输出流*/try(ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){/*服务器的输入*/String userName = inputStream.readUTF();System.out.println("Accept clinet message:"+userName);outputStream.writeUTF("Hello,"+userName);outputStream.flush();}catch (Exception e){e.printStackTrace();}finally {try {socket.close();} catch (IOException e) {e.printStackTrace();}}}}
}
传统BIO通信模型采用典型的一请求一应答模式。服务端通过一个独立的Acceptor
线程监听客户端连接,每当接收到新的连接请求后,会为每个客户端创建一个新的专用线程进行链路处理。该线程负责完成数据读取、业务逻辑处理、数据写入返回应答等全套操作,处理完成后线程随之销毁。
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,Java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,最终系统崩溃。
为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程,实现1个或多个线程处理N个客户端的模型(底层还是使用的同步阻塞I/O),通常被称为伪异步I/O模型。
public class ServerPool {private static ExecutorService executorService= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());public static void main(String[] args) throws IOException {//服务端启动必备ServerSocket serverSocket = new ServerSocket();//表示服务端在哪个端口上监听serverSocket.bind(new InetSocketAddress(10001));System.out.println("Start Server ....");try{while(true){executorService.execute(new ServerTask(serverSocket.accept()));}}finally {serverSocket.close();}}//每个和客户端的通信都会打包成一个任务,交个一个线程来执行private static class ServerTask implements Runnable{private Socket socket = null;public ServerTask(Socket socket){this.socket = socket;}@Overridepublic void run() {//实例化与客户端通信的输入输出流try(ObjectInputStream inputStream =new ObjectInputStream(socket.getInputStream());ObjectOutputStream outputStream =new ObjectOutputStream(socket.getOutputStream())){//接收客户端的输出,也就是服务器的输入String userName = inputStream.readUTF();System.out.println("Accept client message:"+userName);//服务器的输出,也就是客户端的输入outputStream.writeUTF("Hello,"+userName);outputStream.flush();}catch(Exception e){e.printStackTrace();}finally {try {socket.close();} catch (IOException e) {e.printStackTrace();}}}}
}
RPC框架是什么?为什么?
在互利网刚发展的时候,一个应用一台机器,将所有功能耦合在一起,比如比较常见的电商场景。随着业务的发展,如何提升性能?将不同的业务功能放到线程里实现异步提升性能。但是业务复杂、业务量积累,单个应用或机器的资源无法承载了。此时又怎么做?将核心业务抽取,作为独立自治服务,放到服务器或者形成集群。那么,这个时候提出RPC,系统变为分布式的架构。
引入rpc框架对我们现有的代码影响最小,同时又可以帮我们实现架构上的扩展。当服务越来越多,各种rpc之间的调用会越来越复杂,这个时候我们会引入中间件,比如说消息队列MQ、缓存,同时架构上整体往微服务去迁移,引入了各种比如容器技术docker,DevOps等。
RPC(Remote Procedure Call ——远程过程调用),它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络的技术。
一次完整的RPC(Remote Procedure Call)同步调用包含以下8个关键步骤:
本地调用发起
服务消费方(Client)以本地调用方式调用客户端存根(Client Stub)
对调用者透明:调用者感觉就像调用本地方法一样
客户端存根处理
客户端存根(Client Stub)是远程方法在客户端的代理对象
存根接收到调用后:
将方法名、参数等调用信息序列化(编码)
将包装后的信息通过网络发送到服务端
服务端接收消息
服务端接收到网络消息
交给服务端存根(Server Stub)进行反序列化(解码)
还原出实际的方法名和参数
服务端本地调用
Server Stub根据解码结果调用服务端本地的实际服务实现
服务执行返回
本地服务执行业务逻辑
将执行结果返回给Server Stub
服务端结果打包
Server Stub将返回结果序列化(打包成消息)
通过网络发送回消费方
客户端接收结果
Client Stub接收到网络消息
对消息进行反序列化(解码)
返回最终结果
服务消费方得到解码后的最终结果
整个调用过程完成,对调用者而言就像执行了本地方法调用
RPC和HTTP
RPC字面意思就是远程过程调用,只是对不同应用间相互调用的一种描述,一种思想。
实现方式可以是最直接的TCP通信,也可以是HTTP方式,在很多的消息中间件的技术书籍里,甚至还有使用消息中间件来实现RPC调用的,我们知道的dubbo是基于tcp通信的,gRPC是Google公布的开源软件,基于最新的HTTP2.0协议,底层使用到了Netty框架的支持。所以总结来说,rpc和http是完全两个不同层级的东西,他们之间并没有什么可比性。
实现RPC框架需要考虑或解决哪些问题?
代理、序列化、服务实例化。
首先,代理本质上是为了解决被调用的服务本身是远程的服务,但是调用者并不知道也不关心,调用者只要结果,具体的事情由代理的对象负责。既然是远程代理,自然是用代理模式了。代理对象访问目标对象,可以在目标对象的实现基础上拓展目标对象的功能。
JDK 代理又分为 静态代理和动态代理两种实现。
序列化?序列化要解决的核心矛盾就是:如何将内存中复杂的、立体的对象状态,完整且准确地转换( flatten ,铺平)成一串线性的字节序列,并且还能从这串字节序列中恢复( reconstruct ,重建)出完全等价的对象。序列化就是将对象变为二进制字节流,反序列化就是再将字节流变回对象。 Serializable
接口是Java为我们提供的实现这一机制的标准方案。虽然在现代分布式系统中,更高效的跨语言序列化方案(如Protobuf、JSON)更为流行,但理解Java原生的序列化机制仍然是掌握Java核心基础的重要一环。
服务实例化?登记的服务有可能在我们的系统中就是一个名字,怎么变成实际执行的对象实例,当然是使用反射机制。反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法和属性;这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制。
NIO网络编程
BIO、NIO、AIO如何理解?
同步 vs 异步:关注的是消息通知机制。是应用程序主动等待结果(同步),还是系统主动通知应用程序结果已就绪(异步)。
阻塞 vs 非阻塞:关注的是等待数据时的状态。是应用程序在等待结果时什么都不干(阻塞),还是可以继续做别的事(非阻塞)。
您的比喻非常形象和准确!完全抓住了BIO、NIO、AIO三种I/O模型的核心区别。我们来详细拆解和深化一下这个优秀的类比。
I/O模型核心概念拆解
在深入您的比喻前,先明确两个技术关键词:
同步 vs 异步:关注的是消息通知机制。是应用程序主动等待结果(同步),还是系统主动通知应用程序结果已就绪(异步)。
阻塞 vs 非阻塞:关注的是等待数据时的状态。是应用程序在等待结果时什么都不干(阻塞),还是可以继续做别的事(非阻塞)。
BIO (同步阻塞) - 食堂排队打饭模式
场景:你在食堂窗口排队。轮到你时,你对厨师(内核)说:“一份红烧肉”。然后你就站在窗口前等着,什么也不干,直到厨师把红烧肉做好并递到你手上,你才能离开去吃饭。
对应编程模型:
阻塞:线程在调用
socket.read()
读取网络数据时,如果数据还没到达,线程就会被挂起,什么都不做,直到数据就绪。同步:数据就绪后,需要线程自己去执行将数据从内核空间拷贝到用户空间的操作。
特点:一个连接一个线程。线程大部分时间都在“傻等”,资源浪费严重。就像食堂开了很多个窗口(线程),每个窗口前都有人(连接)在排队且等待,厨师和顾客效率都很低。
NIO (同步非阻塞) - 点单、等待被叫模式
场景:你去餐厅坐下,服务员给你一个号码牌。你点完菜后,不用在窗口傻等,可以回座位玩手机、聊天(处理其他任务)。但你得时不时地抬头看屏幕(
Selector
轮询),看看你的号码(连接)有没有被叫到(数据是否就绪)。如果被叫到,你就需要自己起身去窗口端菜(用户态自己完成数据拷贝)。对应编程模型 (Selector模式):
非阻塞:线程可以发起一个读请求,然后立刻返回去做别的事情,而不是傻等。
同步:线程需要不断地轮询(或由Selector通知)哪些连接的数据就绪了。数据就绪后,依然需要线程自己去执行数据拷贝。
核心:
Selector
(叫号大屏幕)是关键。一个线程(一个服务员)可以管理多个连接(照顾多桌客人),通过轮询知道哪一桌的菜好了(哪个SocketChannel数据就绪了)。
特点:一个线程处理多个连接。大大减少了线程数量。但编程模型复杂,并且数据就绪后,拷贝数据的过程仍然是同步的(需要用户线程自己端菜)。
AIO (异步非阻塞) - 包厢模式
场景:你在包厢吃饭。点完菜后,你就可以完全不管了,尽情聊天吃饭(处理其他任务)。后厨(操作系统)会负责把菜做好,并且由服务员(系统内核)直接端到你的桌上。菜上来后,服务员可能会说一声“先生,您的菜齐了”(回调通知),然后你直接开吃即可。
对应编程模型:
异步:应用程序发起一个I/O操作(如
asyncChannel.read
)后,立刻返回,完全不会阻塞。非阻塞:应用程序在等待数据期间可以处理任何其他逻辑。
机制:应用程序向内核注册一个回调函数。当内核完成了所有操作(包括数据准备和从内核空间到用户空间的数据拷贝),会主动通知应用程序(执行回调函数)。
特点:真正的“饭来张口”。应用程序无需关心数据准备和拷贝的过程,只需要定义好数据处理逻辑。理想很美好,但在Linux上实现不成熟,应用不广泛。
Java中的NIO
NIO 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。NIO翻译成 no-blocking io 或者 new io都说得通。
NIO和BIO的主要区别
面向流与面向缓冲
Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
阻塞与非阻塞IO
Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
NIO三大核心组件
NIO有三大核心组件:Selector选择器、Channel管道、buffer缓冲区。
Selector
Selector的英文含义是“选择器”,也可以称为为“轮询代理器”、“事件订阅器”、“channel容器管理机”都行。
Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器(Selectors),然后使用一个单独的线程来操作这个选择器,进而“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。
应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。
Channel
通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据,而且可以同时进行读写。
所有被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。
ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。
ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP:端口到服务器IP:端口的通信连接。
通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
buffer缓冲区
网络通讯中负责数据读写的区域
JDK-实现NIO
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class NioServerHandle implements Runnable {private Selector selector;private ServerSocketChannel serverChannel;private volatile boolean started;private final int port;// 您图片中的构造函数public NioServerHandle(int port) {this.port = port;try {// 1. 创建选择器selector = Selector.open();// 2. 打开监听通道serverChannel = ServerSocketChannel.open();// 3. 配置为非阻塞模式serverChannel.configureBlocking(false);// 4. 绑定端口serverChannel.socket().bind(new InetSocketAddress(port));// 5. 注册到选择器,监听ACCEPT事件serverChannel.register(selector, SelectionKey.OP_ACCEPT);// 6. 标记服务器已开启started = true;System.out.println("服务器已启动,端口号:" + port);} catch (IOException e) {e.printStackTrace();System.exit(1);}}public void stop() {started = false;}@Overridepublic void run() {// 事件循环,持续处理就绪的Channelwhile (started) {try {// 阻塞,等待有就绪事件发生。超时时间1秒。selector.select(1000);// 获取所有就绪的事件的SelectionKey集合Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove(); // 处理完后,必须移除,防止重复处理try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null) {key.channel().close();}}}}} catch (IOException e) {e.printStackTrace();}}// 循环结束后,关闭选择器释放资源if (selector != null) {try {selector.close();} catch (IOException e) {e.printStackTrace();}}}// 处理就绪的事件private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {// 处理新接入的客户端连接请求if (key.isAcceptable()) {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();// 通过accept()方法接收客户端的连接,建立TCP物理链路SocketChannel sc = ssc.accept();// 设置为非阻塞模式sc.configureBlocking(false);// 注册到选择器,监听读操作sc.register(selector, SelectionKey.OP_READ);System.out.println("客户端连接成功: " + sc.getRemoteAddress());}// 处理读请求,读取客户端发送的数据if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();// 创建缓冲区,准备读取数据ByteBuffer buffer = ByteBuffer.allocate(1024);// 从Channel读取数据到缓冲区int readBytes = sc.read(buffer);if (readBytes > 0) {// 切换缓冲区为读模式buffer.flip();// 根据可读字节数创建字节数组byte[] bytes = new byte[buffer.remaining()];// 将缓冲区数据复制到字节数组buffer.get(bytes);String message = new String(bytes, "UTF-8");System.out.println("服务器收到消息: " + message);// 回复响应数据给客户端String response = "服务器响应: " + message;doWrite(sc, response);} else if (readBytes < 0) {// 对端链路关闭key.cancel();sc.close();System.out.println("客户端连接已关闭。");}// readBytes == 0 忽略,表示没有数据可读,是正常情况}}}// 异步发送响应消息private void doWrite(SocketChannel channel, String response) throws IOException {if (response != null && response.trim().length() > 0) {// 将消息编码为字节数组byte[] bytes = response.getBytes("UTF-8");// 根据数组容量创建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);// 将字节数组复制到缓冲区writeBuffer.put(bytes);// 切换缓冲区为读模式writeBuffer.flip();// 发送缓冲区的字节数组channel.write(writeBuffer);}}
}
Selector对象是通过调用静态工厂方法open()来实例化的。
要实现Selector管理Channel,需要将channel注册到相应的Selector上。
通过调用通道的register()方法会将它注册到一个选择器上。与Selector一起使用时,Channel必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式,而套接字通道都可以。另外通道一旦被注册,将不能再回到阻塞状态,此时若调用通道的configureBlocking(true)将抛出BlockingModeException异常。
在实际运行中,我们通过Selector的select()方法可以选择已经准备就绪的通道(这些通道包含感兴趣的的事件)。
select():阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
selectNow():非阻塞,立刻返回。
select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。
一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合。
Set selectedKeys=selector.selectedKeys();
这个时候,循环遍历selectedKeys集中的每个键,并检测各个键所对应的通道的就绪事件,再通过SelectionKey关联的Selector和Channel进行实际的业务处理。
注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除,否则的话,下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
SelectionKey?
SelectionKey是一个抽象类,表示selectableChannel在Selector中注册的标识.每个Channel向Selector注册时,都将会创建一个SelectionKey。SelectionKey将Channel与Selector建立了关系,并维护了channel事件。
可以通过cancel方法取消键,取消的键不会立即从selector中移除,而是添加到cancelledKeys中,在下一次select操作时移除它.所以在调用某个key时,需要使用isValid进行校验。
SelectionKey类型和就绪条件
在向Selector对象注册感兴趣的事件时,JAVA NIO共定义了四种:OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT(定义在SelectionKey中),分别对应读、写、请求连接、接受连接等网络Socket操作。
操作类型 | 就绪条件及说明 |
OP_READ | 当操作系统读缓冲区有数据可读时就绪。并非时刻都有数据可读,所以一般需要注册该操作,仅当有就绪时才发起读操作,有的放矢,避免浪费CPU。 |
OP_WRITE | 当操作系统写缓冲区有空闲空间时就绪。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。 |
OP_CONNECT | 当SocketChannel.connect()请求连接成功后就绪。该操作只给客户端使用。 |
OP_ACCEPT | 当接收到一个客户端连接请求时就绪。该操作只给服务器使用。 |
服务端和客户端分别感兴趣的类型
ServerSocketChannel和SocketChannel可以注册自己感兴趣的操作类型,当对应操作类型的就绪条件满足时OS会通知channel,下表描述各种Channel允许注册的操作类型,Y表示允许注册,N表示不允许注册,其中服务器SocketChannel指由服务器ServerSocketChannel.accept()返回的对象。
服务器启动ServerSocketChannel,关注OP_ACCEPT事件,
客户端启动SocketChannel,连接服务器,关注OP_CONNECT事件
服务器接受连接,启动一个服务器的SocketChannel,这个SocketChannel可以关注OP_READ、OP_WRITE事件,一般连接建立后会直接关注OP_READ事件
客户端这边的客户端SocketChannel发现连接建立后,可以关注OP_READ、OP_WRITE事件,一般是需要客户端需要发送数据了才关注OP_READ事件
连接建立后客户端与服务器端开始相互发送消息(读写),根据实际情况来关注OP_READ、OP_WRITE事件。
Buffer
Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中的。以写为例,应用程序都是将数据写入缓冲,再通过通道把缓冲的数据发送出去,读也是一样,数据总是先从通道读到缓冲,应用程序再读缓冲的数据。
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存(其实就是数组)。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。
capacity
作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。
position
当你写数据到Buffer中时,position表示当前能写的位置。初始的position值为0。当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1.
当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0. 当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
limit
在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。写模式下,limit等于Buffer的capacity。
当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)
Buffer的分配
要想获得一个Buffer对象首先要进行分配。每一个Buffer类都有allocate方法(可以在堆上分配,也可以在直接内存上分配)。
直接内存
HeapByteBuffer与DirectByteBuffer,在原理上,前者可以看出分配的buffer是在heap区域的,其实真正flush到远程的时候会先拷贝到直接内存,再做下一步操作;在NIO的框架下,很多框架会采用DirectByteBuffer来操作,这样分配的内存不再是在java heap上,经过性能测试,可以得到非常快速的网络交互,在大量的网络交互下,一般速度会比HeapByteBuffer要快速好几倍。
直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,但是这部分内存也被频繁地使用,而且也可能导致OutOfMemoryError 异常出现。
NIO可以使用Native 函数库直接分配堆外内存,然后通过一个存储在Java 堆里面的DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在Java 堆和Native 堆中来回复制数据。
直接内存(堆外内存)与堆内存比较
直接内存申请空间耗费更高的性能,当频繁申请到一定量时尤为明显
直接内存IO读写的性能要优于普通的堆内存,在多次读写操作的情况下差异明显
Buffer的读写
Buffer方法总结
limit(), limit(10)等 | 其中读取和设置这4个属性的方法的命名和jQuery中的val(),val(10)类似,一个负责get,一个负责set |
reset() | 把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方 |
clear() | position = 0;limit = capacity;mark = -1; 有点初始化的味道,但是并不影响底层byte数组的内容 |
flip() | limit = position;position = 0;mark = -1; 翻转,也就是让flip之后的position到limit这块区域变成之前的0到position这块,翻转就是将一个处于存数据状态的缓冲区变为一个处于准备取数据的状态 |
rewind() | 把position设为0,mark设为-1,不改变limit的值 |
remaining() | return limit - position;返回limit和position之间相对位置差 |
hasRemaining() | return position< limit返回是否还有未读内容 |
compact() | 把从position到limit中的内容移到0到limit-position的区域内,position和limit的取值也分别变成limit-position、capacity。如果先将positon设置到limit,再compact,那么相当于clear() |
get() | 相对读,从position位置读取一个byte,并将position+1,为下次读写作准备 |
get(int index) | 绝对读,读取byteBuffer底层的bytes中下标为index的byte,不改变position |
get(byte[] dst, int offset, int length) | 从position位置开始相对读,读length个byte,并写入dst下标从offset到offset+length的区域 |
put(byte b) | 相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备 |
put(int index, byte b) | 绝对写,向byteBuffer底层的bytes中下标为index的位置插入byte b,不改变position |
put(ByteBuffer src) | 用相对写,把src中可读的部分(也就是position到limit)写入此byteBuffer |
put(byte[] src, int offset, int length) | 从src数组中的offset到offset+length区域读取数据并使用相对写写入此byteBuffer |
NIO之Reactor模式
Reactor 是一种开发模式,模式的核心流程:
注册感兴趣的事件 -> 扫描是否有感兴趣的事件发生 -> 事件发生后做出相应的处理
单线程Reactor模式
①服务器端的Reactor是一个线程对象,该线程会启动事件循环,并使用Selector(选择器)来实现IO的多路复用。注册一个Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。
②客户端向服务器端发起一个连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ事件了。
③当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过SocketChannel的read()方法读取数据,此时read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。
④每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。
注意,Reactor的单线程模式的单线程主要是针对于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。
但在目前的单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。所以我们应该将非I/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应。
多线程Reactor模式
与单线程Reactor模式不同的是,添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理。
使用线程池的优势:
①通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程产生的巨大开销。
②另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
③通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态。同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
改进的版本中,所有的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作。
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:
①一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送;
②当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
主从多线程Reactor模式
Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的事件循环逻辑。
mainReactor可以只有一个,但subReactor一般会有多个。mainReactor线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信。
流程:
①注册一个Acceptor事件处理器到mainReactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样mainReactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。启动mainReactor的事件循环。
②客户端向服务器端发起一个连接请求,mainReactor监听到了该ACCEPT事件并将该ACCEPT事件派发给Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个SocketChannel传递给subReactor线程池。
③ subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事件以及对应的READ事件处理器注册到subReactor线程中。当然你也注册WRITE事件以及WRITE事件处理器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的循环逻辑。
④当有I/O事件就绪时,相关的subReactor就将事件派发给相应的处理器处理。注意,这里subReactor线程只负责完成I/O的read()操作,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后需要返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。
注意,所有的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)依旧还是在Reactor线程(mainReactor线程或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处理非I/O操作的逻辑。
多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。