当前位置: 首页 > news >正文

分布式文件系统05-生产级中间件的Java网络通信技术深度优化

生产级中间件的Java网络通信技术深度优化

134_完成读取本地磁盘文件以及发送给客户端的代码逻辑

public class DataNodeNIOServer extends Thread {private Map<String, String> waitReadingFiles = new ConcurrentHashMap<String, String>();/*** 将文件发送到客户端去* @param channel* @param key* @throws Exception*/private void sendFileToClient(SocketChannel channel, SelectionKey key) throws Exception {// 构建针对本地文件的输入流if(!channel.isOpen()) {channel.close();return;}String remoteAddr = channel.getRemoteAddress().toString(); String filename = waitReadingFiles.get(remoteAddr);File file = new File(filename);Long fileLength = file.length();FileInputStream imageIn = new FileInputStream(filename);    FileChannel imageChannel = imageIn.getChannel();// 循环不断的从channel里读取数据,并写入磁盘文件ByteBuffer buffer = ByteBuffer.allocate(Integer.parseInt(String.valueOf(fileLength)) * 2);long hasReadImageLength = 0L;int len = -1;while((len = imageChannel.read(buffer)) > 0) {hasReadImageLength += len;System.out.println("已经从本地磁盘文件读取了" + hasReadImageLength + "字节的数据");  buffer.flip();channel.write(buffer);buffer.clear();}imageChannel.close();imageIn.close();// 判断一下,如果已经读取完毕,就返回一个成功给客户端if(hasReadImageLength == fileLength) {System.out.println("文件发送完毕,给客户端: " + remoteAddr);}}}

按理说,上面数据写完以后,需要把waitReadingFiles中的数据删除

135_在NIO中处理完一次读写请求之后应该如何处理事件的监听?

上面的实现方式,是把Read事件和Write事件,分开作两个流程进行处理,实际上,也不用那么麻烦,直接在处理Read事件时,就从磁盘中读出文件并通过channel写回到客户端即可。

waitReadingFiles的逻辑,也都就不需要了

/*** 数据节点的NIOServer*/
public class DataNodeNIOServer extends Thread {public static final Integer SEND_FILE = 1;public static final Integer READ_FILE = 2;// NIO的selector,负责多路复用监听多个连接的请求private Selector selector;// 内存队列,无界队列private List<LinkedBlockingQueue<SelectionKey>> queues =new ArrayList<LinkedBlockingQueue<SelectionKey>>();// 缓存的没读取完的文件数据private Map<String, CachedImage> cachedImages = new ConcurrentHashMap<String, CachedImage>();// 与NameNode进行通信的客户端private NameNodeRpcClient namenodeRpcClient;/*** NIOServer的初始化,监听端口、队列初始化、线程初始化*/public DataNodeNIOServer(NameNodeRpcClient namenodeRpcClient) {ServerSocketChannel serverSocketChannel = null;try {this.namenodeRpcClient = namenodeRpcClient;selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);for (int i = 0; i < 3; i++) {queues.add(new LinkedBlockingQueue<SelectionKey>());}for (int i = 0; i < 3; i++) {new Worker(queues.get(i)).start();}System.out.println("NIOServer已经启动,开始监听端口:" + NIO_PORT);} catch (IOException e) {e.printStackTrace();}}public void run() {/*** 无限循环,等待IO多路复用方式监听请求*/while (true) {try {selector.select();Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();while (keysIterator.hasNext()) {SelectionKey key = keysIterator.next();keysIterator.remove();handleEvents(key);}} catch (Throwable t) {t.printStackTrace();}}}/*** 处理请求分发** @param key* @throws IOException* @throws ClosedChannelException*/private void handleEvents(SelectionKey key) throws IOException {SocketChannel channel = null;try {if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();channel = serverSocketChannel.accept();if (channel != null) {channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_READ);}} else if (key.isReadable()) {channel = (SocketChannel) key.channel();String remoteAddr = channel.getRemoteAddress().toString();int queueIndex = remoteAddr.hashCode() % queues.size();queues.get(queueIndex).put(key);}} catch (Throwable t) {t.printStackTrace();if (channel != null) {channel.close();}}}/*** 处理请求的工作线程*/class Worker extends Thread {private LinkedBlockingQueue<SelectionKey> queue;public Worker(LinkedBlockingQueue<SelectionKey> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {SocketChannel channel = null;try {SelectionKey key = queue.take();channel = (SocketChannel) key.channel();handleRequest(channel, key);} catch (Exception e) {e.printStackTrace();if (channel != null) {try {channel.close();} catch (IOException e1) {e1.printStackTrace();}}}}}}/*** 处理客户端发送过来的请求** @param channel* @param key* @throws Exception*/private void handleRequest(SocketChannel channel, SelectionKey key) throws Exception {// 假如说你这个一次读取的数据里包含了多个文件的话// 这个时候我们会先读取文件名,然后根据文件的大小去读取这么多的数据String remoteAddr = channel.getRemoteAddress().toString();System.out.println("接收到客户端的请求:" + remoteAddr);// 需要先提取出来这次请求是什么类型:1 发送文件;2 读取文件if (cachedImages.containsKey(remoteAddr)) {handleSendFileRequest(channel, key);} else {// 但是此时channel的position肯定也变为了4Integer requestType = getRequestType(channel); if (SEND_FILE.equals(requestType)) {handleSendFileRequest(channel, key);} else if (READ_FILE.equals(requestType)) {handleReadFileRequest(channel, key);}}}/*** 发送文件*/private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {String remoteAddr = channel.getRemoteAddress().toString();Filename filename = getFilename(channel);System.out.println("从网络请求中解析出来文件名:" + filename);if (filename == null) {channel.close();return;}// 从请求中解析文件大小long imageLength = getImageLength(channel);System.out.println("从网络请求中解析出来文件大小:" + imageLength);// 定义已经读取的文件大小long hasReadImageLength = getHasReadImageLength(channel);System.out.println("初始化已经读取的文件大小:" + hasReadImageLength);// 构建针对本地文件的输出流FileOutputStream imageOut = new FileOutputStream(filename.absoluteFilename);FileChannel imageChannel = imageOut.getChannel();imageChannel.position(imageChannel.size());// 循环不断的从channel里读取数据,并写入磁盘文件ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);int len = -1;while ((len = channel.read(buffer)) > 0) {hasReadImageLength += len;System.out.println("已经向本地磁盘文件写入了" + hasReadImageLength + "字节的数据");buffer.flip();imageChannel.write(buffer);buffer.clear();}imageChannel.close();imageOut.close();// 判断一下,如果已经读取完毕,就返回一个成功给客户端if (hasReadImageLength == imageLength) {ByteBuffer outBuffer = ByteBuffer.wrap("SUCCESS".getBytes());channel.write(outBuffer);cachedImages.remove(remoteAddr);System.out.println("文件读取完毕,返回响应给客户端: " + remoteAddr);// 增量上报Master节点自己接收到了一个文件的副本// /image/product/iphone.jpgnamenodeRpcClient.informReplicaReceived(filename.relativeFilename);System.out.println("增量上报收到的文件副本给NameNode节点......");key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);}// 如果一个文件没有读完,缓存起来,等待下一次读取else {CachedImage cachedImage = new CachedImage(filename, imageLength, hasReadImageLength);cachedImages.put(remoteAddr, cachedImage);System.out.println("文件没有读取完毕,等待下一次OP_READ请求,缓存文件:" + cachedImage);}}/*** 读取文件*/private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws Exception {String remoteAddr = channel.getRemoteAddress().toString();// 从请求中解析文件名// 已经是:F:\\development\\tmp1\\image\\product\\iphone.jpgFilename filename = getFilename(channel);System.out.println("从网络请求中解析出来文件名:" + filename);if (filename == null) {channel.close();return;}File file = new File(filename.absoluteFilename);Long fileLength = file.length();FileInputStream imageIn = new FileInputStream(filename.absoluteFilename);FileChannel imageChannel = imageIn.getChannel();// 循环不断的从channel里读取数据,并写入磁盘文件ByteBuffer buffer = ByteBuffer.allocate(Integer.parseInt(String.valueOf(fileLength)) * 2);long hasReadImageLength = 0L;int len = -1;while ((len = imageChannel.read(buffer)) > 0) {hasReadImageLength += len;System.out.println("已经从本地磁盘文件读取了" + hasReadImageLength + "字节的数据");buffer.flip();channel.write(buffer);buffer.clear();}imageChannel.close();imageIn.close();// 判断一下,如果已经读取完毕,就返回一个成功给客户端if (hasReadImageLength == fileLength) {System.out.println("文件发送完毕,给客户端: " + remoteAddr);// 把文件给客户端发送回去后,就删除要关注的Read事件// 表示这个连接的事情就处理完了,等待客户端主动关闭这个连接了,因为客户端用的是短连接key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);}}/*** 获取本次请求的类型*/public Integer getRequestType(SocketChannel channel) throws Exception {ByteBuffer requestType = ByteBuffer.allocate(4);channel.read(requestType);  // 此时requestType ByteBuffer,position跟limit都是4,remaining是0if (!requestType.hasRemaining()) {// 已经读取出来了4个字节,可以提取出来requestType了requestType.rewind(); // 将position变为0,limit还是维持着4return requestType.getInt();}return -1;}}

主要就是加了,取消针对当前通道的Read事件的监听

136_在客户端实现从数据节点接收发送过来的图片数据

137_工业级NIO通信组件:请求头的拆包问题应该如何解决?

读取请求类型

第237行的remove动作不可少

读取文件名

	/*** 获取相对路径的文件名* @param channel* @return*/private String getRelativeFilename(SocketChannel channel) throws Exception {String client = channel.getRemoteAddress().toString();Integer filenameLength = null;String filename = null;// 读取文件名的大小if(!filenameByClient.containsKey(client)) {ByteBuffer filenameLengthBuffer = null;if(filenameLengthByClient.containsKey(client)) {filenameLengthBuffer = filenameLengthByClient.get(client);} else {filenameLengthBuffer = ByteBuffer.allocate(4);}channel.read(filenameLengthBuffer); if(!filenameLengthBuffer.hasRemaining()) { filenameLengthBuffer.rewind();filenameLength = filenameLengthBuffer.getInt();filenameLengthByClient.remove(client);} else {filenameLengthByClient.put(client, filenameLengthBuffer);}}// 读取文件名ByteBuffer filenameBuffer = null;if(filenameByClient.containsKey(client)) {filenameBuffer = filenameByClient.get(client);} else {filenameBuffer = ByteBuffer.allocate(filenameLength);}channel.read(filenameBuffer);if(!filenameBuffer.hasRemaining()) {filenameBuffer.rewind();filename = new String(filenameBuffer.array());  filenameByClient.remove(client);} else {filenameByClient.put(client, filenameBuffer);}return filename;}

读取文件长度

获取已经读取的文件大小

当客户端发送文件过来时,可能一个文件过大,不可避免的整个文件体就会被拆成多个包,表现在代码层面,就是会有一轮又一轮的Read事件达到,在最后一轮Read事件达到之前的每一轮,我们都需要记录已经读取了文件大小是多少。只有当已经读取的文件大小,等于请求头中的文件完整长度时,才表示这个文件全部接收完毕了

138_工业级NIO通信组件:多个数据文件的粘包问题应该如何解决?

某个客户端在一个连接中,连续上传了多个文件,才可能会出现粘包问题。虽然,我们这里使用的是短连接,一个请求一个连接,所以是不会出现粘包问题的

针对拆包问题,我们只要把已经读到的一部分包缓存起来,并保持对OP_READ事件的继续关注即可

针对粘包问题,我们就是一次只能读取一个文件的内容,如果一个文件读完了,channel中还有数据,那么就只能再走上面的拆包并缓存数据的逻辑,来重复处理第二个文件的数据包

这里是一个文件内容,发生拆包时的处理逻辑。如果要处理粘包问题,那么也还是在这段代码中进行修改,但是定义的fileBuffer也还是fileLength的大小

当前一个文件数据包处理完之后,继续while循环,从channel中读取数据,如果没有读取到数据就结束本次OP_READ事件处理,如果从channel中读取到了数据,则缓存起来,进行下一个包的的处理,依然是先获取4个字节的文件名长度的请求头或者8个字节文件长度的请求头,后续的处理逻辑又是拆包的处理逻辑

139_工业级NIO通信组件:数据文件的拆包问题应该如何解决?

上传文件

下载文件

140_工业级NIO通信组件:客户端读取文件的拆包问题如何解决?

141_工业级NIO通信组件:客户端读取多文件的粘包问题如何解决?

如果客户端通过一个连接,发送了多个文件的下载请求,服务端可能同时把多个文件包发过来,从而产生了粘包问题。我们这里使用的是短连接,当然对应客户端下载文件来说,也不存在粘包问题

通过定义的这个buffer,保证了每次最多只读完一个文件,不会读到下一个文件去。所以,如果发生粘包问题,我们依然可以通过给增加一个while循环,继续从channel中读取下一个文件包的内容即可。当然了,读取下一个文件包时,又有可能发生拆包问题

http://www.dtcms.com/a/312559.html

相关文章:

  • ClickHouse Windows迁移方案与测试
  • HiveMQ 2024.9 设计与开发文档
  • 知识随记-----MySQL 连接池健康检测与 RAII 资源管理技术
  • Timer串口常用库函数(STC8系列)
  • Docker--解决x509: certificate signed by unknown authority
  • 系统学习算法:专题十六 字符串
  • 基于SpringBoot+MyBatis+MySQL+VUE实现的电商平台管理系统(附源码+数据库+毕业论文+部署教程+配套软件)
  • WSUS服务器数据库维护与性能优化技术白皮书
  • Leetcode 12 java
  • CSS 预处理器(Preprocessor)和后处理器(Postprocessor)
  • python工具方法51 视频数据的扩充(翻转、resize、crop、re_fps)
  • 01.MySQL 安装
  • 仓库管理系统-15-前端之管理员管理和用户管理
  • 01数据结构-时间复杂度和空间复杂度
  • 每日五个pyecharts可视化图表-bars(2)
  • HCIP笔记(第四章)
  • Flutter各大主流状态管理框架技术选型分析及具体使用步骤
  • 网络原理 - TCP/IP
  • 计算机网络(TCP篇)
  • PPT自动化 python-pptx - 10 : 表格(tables)
  • 力扣经典算法篇-42-矩阵置零(辅助数组标记法,使用两个标记变量)
  • 使命召唤21:黑色行动6 免安 离线 中文版
  • 1.8 axios详解
  • Axios介绍
  • 一键安装RabbitMQ脚本
  • ESP32学习-I2C(IIC)通信详解与实践
  • 线程锁-互斥、自旋、读写、原子操作、线程池
  • [硬件电路-147]:模拟电路 - DC/DC电压的三种架构:升压(Boost)、降压(Buck)或升降压(Buck-Boost)
  • GLM-4.5 解读:统一推理、编码与智能体的全能王
  • 利用AI渲染技术提升元宇宙用户体验的技术难点有哪些?