当前位置: 首页 > news >正文

分布式文件系统04-DataNode海量数据分布式高可靠存储

DataNode海量数据分布式高可靠存储

085_思考一下:为了高可用而冗余存储到底需要几个副本呢?

到底需要几个副本来冗余存储呢?比如说hadoop hdfs,默认就是3个副本,是有一个规律的,假设你的集群的所有的机器是假设到多个机架上的。机房和机架的关系,其中2个副本是在一个机架上,另外一个副本是在别的机架上。hadoop hdfs会实现机架感知的功能

默认情况下,如果是一个机架里的某台机器宕机了,此时使用这个机架上的另外一台服务器上的副本就可以了;万一说这个机架都故障了,此时别的机架上还有一个副本冗余是可以用的,也就是说,提高可用性的级别,万一某个机架都故障了,其他机架上有副本可以用

其实以我们通常的一些运维的经验来看的话,单台机器的故障概率比较高,物理机的网络故障、磁盘坏了。但是你说机架都故障了,机架上所有的机器都故障了,这个事情就比较尴尬了,是比较少见的。虽然说实现3个副本的优点,就是可以实现更高的可用性,但是缺点,就是对存储空间的占用太多了

比如,数据总共是100G,如果是使用3副本机制,就需要300G空间,其实就要耗费掉3倍的磁盘空间来存储你所有的数据;又比如总共10TB,3副本,就需要30TB来存储;但是如果你是2副本,就只要20TB来存储即可

所以,有没有必要用3副本呢?中等规模的公司而言,暂时还没有必要去考虑整个机架都出现故障的问题。

那么我们就采用双副本即可,一个图片就存储两份,在两台机器上就可以了,对于可用性而言也可以保证,一般来说比较多的故障就是某台机器故障,但是其他机器上还有图片的副本可以使用,也达到了保证系统的可用性的目的

另外,对存储空间的占用相对于3副本机制,可以减少1/3,特别是对于中等规模的公司而言,最大的好处就是可以节约很多的钱,1/3的存储,也是要钱买的

086_选择数据节点时如何实现集群资源均匀负载的效果?

尽可能得让集群里的各个机器上存放的数据是比较均匀的负载的

比如,当前一共有30TB的数据,4台机器。一台机器上放了20TB的数据,另外三台机器,每台机器放3TB多一点的数据,这就数据倾斜了。要实现集群里的存储资源的均匀使用,每台机器的存储的数据量大概在7TB~8TB之间

构思一下这个里面的思路,首先呢,必须是要知道,每台机器上放了多少数据量的文件,我现在需要知道的是机器01上存放了5TB的数据,机器02上存放了5.3TB的数据,机器03上存放了6TB的数据,机器04上存放了6.1TB的数据

这个时候来了一个新图片上传的请求,我应该把这个图片的2个副本上传到哪两台机器上去呢?把全量机器上面的数据量的大小正序来排序,选择数据量最小的两台机器就可以了,从而保证各个机器的存储的数据量基本是均匀

087_设计一个为文件的多个副本选择合适数据节点的RPC接口

设计一个master节点的接口,可以为图片上传的请求选择当前存储资源占用量,最小的两台机器分配过去作为双副本存储的目标数据节点

088_多副本机器选择算法底层依赖的数据结构如何设计

NameNode管理着所有注册过来的DataNode节点的元数据信息

089_基于存储数据大小排序机制实现多副本数据节点的选择

090_测试双副本对应的数据节点选择机制是否正常运行

091_是否有必要使用管道数据流传输方式来上传文件的多副本?

现在我们手上已经有多个数据节点了,此时就可以尝试把这个图片的副本,传输到各个数据节点上去,但是此时传输采用什么样的方式呢?有两种方式可以来做

第一种方式:管道数据流的方式

第二种方式:客户端多副本依次上传

hadoop hdfs采用的管道数据流的方式

  1. 避免客户端的网络连接资源过多,负载过重
  2. 机器之间传输数据的性能更高,比客户端到服务器之间的性能要高

如果采用第二种方式,当hadoop hdfs的三副本机制时,每个客户端就都分别需要与3个DataNode建立连接,这对于DataNode的连接资源消耗过重

咱们就是采用第二种方式即可,因为我们的定位和应对的场景,上传图片的次数并不是特别多的,一天可能才几万张图片上传,网络连接的资源不会特别的频繁和负载过重,我们也就是2个副本而已,所以说你如果为了把一个副本的传递放在两台机器之间自己去做,会导致编程特别的麻烦和复杂(如果有10个副本,那么服务端机器之间通过管道自己传递数据,那么收益才会比较大)

哪怕我们就是客户端的机器上传图片到两个数据节点上也可以的,假定中型公司没有使用那么多的机房,比如说总共就是1个机房,大家都在一起,所以说性能的损耗并不是特别的大,所以就是还好

092_通过画图的方式阐述传统的BIO为什么是同步阻塞的?

BIO 同步阻塞

文件读写,InputStream;网络通信,Socket

阻塞,针对的是IO流来说的,系统内核在IO完成之前,本应用线程都会卡住,本线程不能抽身去干别的事情;一直卡到内核recv_queue中有数据达到后才能返回,应用线程才能唤醒开始解析达到并返回的数据

同步,针对的是我们的程序和JDK API之间的关系,调用了IO API之后,就必须同步等待人家完成底层的IO操作,才可以让方法返回

093_针对文件和网络两种场景分析NIO的同步非阻塞

NIO 同步非阻塞

BIO,同步阻塞

NIO,同步非阻塞

AIO,异步非阻塞:NIO2,AIO,每次发起一个IO请求之前,都必须提供一个回调函数,发起以后就立马直接返回了,后续操作系统执行完IO操作拿到数据后,就会主动调起此回调函数开始处理数据

094_基于NIO设计数据节点应对多个客户端上传文件的架构

BIO,因为是阻塞的,导致一个客户端的连接,只能是一个线程来应对

NIO,非阻塞,N个客户端的连接,可以是用一个线程来应对,大大提升了单机应对高并发连接和请求的能力

每个DataNode数据节点,实际上都有一个NIO server,基于非阻塞的方式,来监视多个已经连接到DataNode身上的客户端的连接上,是否有请求过来,有请求就会交给后端的架构来进行处理,所以说,我们在核心的文件上传这块,就可以用NIO来进行实现

095_为数据节点引入数据流传输端口供NIO上传机制使用

每个数据节点要基于哪个端口去监听开放nio server呢?这个你一般可以做在配置文件里,但是对于我们来说,稍微简化一下,直接这个端口是写死的就可以了,就认为这个端口默认就是有一个端口就可以了

实际中,可以让DataNode数据节点上传心跳时,除了上传ip、hostname以外,还可以把nio server-port一起注册给NameNode,后续dfs-client上传文件之前,准备调用NameNode的分配两台机器的接口时,NameNode就可以通过这个接口,把两台机器的nio server-port信息一并返回给dfs-client端

096_完成客户端到数据节点基于NIO实现的小文件传输机制

097_NIO Server如何基于多线程来处理多个客户端的请求

中间的一个线程,其实就是一个selector

098_NIO文件传输的粘包拆包问题以及解决这些问题

粘包和拆包的问题

底层的TCP这块,如果看过之前的一些网络课程的话,网络对应的是很多的数据包、TCP包,客户端发送请求到服务端,其实本质就是发送很多个TCP包过去

粘包,意思就是说本来应该是两个包,但是到了服务器那块接收到了以后,把两个不同的包粘在一起交给了你的应用程序来处理,此时你会发现获取到的数据粘到一起去了,尴尬了,就不知道这次要处理的是哪部分的数据

拆包,本来是应用程序层面的一个包,结果被操作系统内核给拆分了2个包发出,到服务器的操作系统内核channel的recv_queu那块的时候,看到的就是两个包,服务器应用程序先接受一个包,再接受一个包,需要把两个包给合并起来进行处理

后续讲Netty的时候,可以深入的去看看应该怎么来处理,但是,在原生的NIO层面,现在没有做完美的解决方案,我的代码里仅仅是针对的是拆包问题,进行了一定的处理。

当前的代码,走的是短连接的方式,每次发送一次图片前都新建立一次连接,一个连接就传输 一个图片,所以一般不会出现粘包的问题。但是有可能会有拆包的问题,你的一个图片,100kb,给拆分成了两次来调用你的handleRequest(),第一次是过来了80kb,第二次是过来了20kb,此时,就需要把第一次过来的80kb,和第二次是过来了20kb拼接起来

NIOClient

package com.zhss.dfs.client;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class NIOClient {public static void sendFile(byte[] file, long fileSize) {  SocketChannel channel = null;  Selector selector = null;  try {  channel = SocketChannel.open();  channel.configureBlocking(false);  channel.connect(new InetSocketAddress("localhost", 9000)); selector = Selector.open();  channel.register(selector, SelectionKey.OP_CONNECT);  boolean sending = true;while(sending){    selector.select();   Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();  while(keysIterator.hasNext()){  SelectionKey key = (SelectionKey) keysIterator.next();  keysIterator.remove();  if(key.isConnectable()){  channel = (SocketChannel) key.channel(); if(channel.isConnectionPending()){  channel.finishConnect();ByteBuffer buffer = ByteBuffer.allocate((int) fileSize * 2);// long对应了8个字节,放到buffer里去buffer.putLong(fileSize);buffer.put(file);channel.register(selector, SelectionKey.OP_READ);}   }  else if(key.isReadable()){  channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);                         int len = channel.read(buffer); if(len > 0) {System.out.println("[" + Thread.currentThread().getName() + "]收到响应:" + new String(buffer.array(), 0, len)); sending = false;}}} }                            } catch (Exception e) {  e.printStackTrace();  } finally{  if(channel != null){  try {  channel.close();  } catch (IOException e) {                        e.printStackTrace();  }                    }  if(selector != null){  try {  selector.close();  } catch (IOException e) {  e.printStackTrace();  }  }  } }}

DataNodeNIOServer

package com.zhss.dfs.datanode.server;public class DataNodeNIOServer extends Thread {private Selector selector;private final List<LinkedBlockingQueue<SelectionKey>> queues = new ArrayList<LinkedBlockingQueue<SelectionKey>>();// 用于发生拆包时,在内存中缓存上一轮读到的buffer数据,方便后续进行拼接private final Map<String, CachedImage> cachedImages = new HashMap<String, CachedImage>();class CachedImage {String filename;long imageLength;long hasReadImageLength;public CachedImage(String filename, long imageLength, long hasReadImageLength) {this.filename = filename;this.imageLength = imageLength;this.hasReadImageLength = hasReadImageLength;}@Overridepublic String toString() {return "CachedImage [filename=" + filename + ", imageLength=" + imageLength + ", hasReadImageLength="+ hasReadImageLength + "]";}}public DataNodeNIOServer() {ServerSocketChannel serverSocketChannel = null;try {selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.socket().bind(new InetSocketAddress(9000), 100);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);for (int i = 0; i < 3; i++) {queues.add(new LinkedBlockingQueue<SelectionKey>());}for (int i = 0; i < 3; i++) {// 每个Worker线程,只独立负责一个唯一的queue中存储的事件new Worker(queues.get(i)).start();}System.out.println("NIOServer已经启动,开始监听端口:" + 9000);} catch (IOException e) {e.printStackTrace();}}public void run() {while (true) {try {selector.select();Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();while (keysIterator.hasNext()) {SelectionKey key = keysIterator.next();keysIterator.remove();handleRequest(key);}} catch (Throwable t) {t.printStackTrace();}}}private void handleRequest(SelectionKey key)throws IOException, ClosedChannelException {SocketChannel channel = null;try {if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();channel = serverSocketChannel.accept();if (channel != null) {channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_READ);}} else if (key.isReadable()) {channel = (SocketChannel) key.channel();// 同一个remoteAddr客户端过来的事件,都hash到同一个worker线程进行处理String remoteAddr = channel.getRemoteAddress().toString();int queueIndex = remoteAddr.hashCode() % queues.size();queues.get(queueIndex).put(key);}} catch (Throwable t) {t.printStackTrace();if (channel != null) {channel.close();}}}class Worker extends Thread {// selector线程,把每次select到的事件,丢入这些阻塞队列private final LinkedBlockingQueue<SelectionKey> queue;public Worker(LinkedBlockingQueue<SelectionKey> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {SocketChannel channel = null;try {// 从自身附带的阻塞队列中取事件,从而方面下面开始处理SelectionKey key = queue.take();channel = (SocketChannel) key.channel();if (!channel.isOpen()) {channel.close();continue;}String remoteAddr = channel.getRemoteAddress().toString();ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);int len = -1;String filename = null;if (cachedImages.containsKey(remoteAddr)) {filename = cachedImages.get(remoteAddr).filename;} else {// 生成一个随机的文件名filename = "F:\\development\\tmp\\" + UUID.randomUUID().toString() + ".jpg";}long imageLength = 0;if (cachedImages.containsKey(remoteAddr)) {imageLength = cachedImages.get(remoteAddr).imageLength;} else {len = channel.read(buffer);buffer.flip();// 8,是客户端先写入了8个字节,存储的是imageLength// 8个字节相当于协议头,头8个字节以后的字节数据,就是协议体bodyif (len > 8) {byte[] imageLengthBytes = new byte[8];buffer.get(imageLengthBytes, 0, 8);ByteBuffer imageLengthBuffer = ByteBuffer.allocate(8);imageLengthBuffer.put(imageLengthBytes);imageLengthBuffer.flip();// 获取文件实际长度imageLength = imageLengthBuffer.getLong();} else if (len <= 0) {channel.close();continue;}}long hasReadImageLength = 0;if (cachedImages.containsKey(remoteAddr)) {hasReadImageLength = cachedImages.get(remoteAddr).hasReadImageLength;}FileOutputStream imageOut = new FileOutputStream(filename);FileChannel imageChannel = imageOut.getChannel();imageChannel.position(imageChannel.size());if (!cachedImages.containsKey(remoteAddr)) {hasReadImageLength += imageChannel.write(buffer);buffer.clear();}// 核心就是这里,从channel读取数据写入本地磁盘while ((len = channel.read(buffer)) > 0) {hasReadImageLength += len;buffer.flip();imageChannel.write(buffer);buffer.clear();}if (cachedImages.get(remoteAddr) != null) {if (hasReadImageLength == cachedImages.get(remoteAddr).hasReadImageLength) {channel.close();continue;}}imageChannel.close();imageOut.close();if (hasReadImageLength == imageLength) {// 说明读完了客户端发过来的完整文件数据包ByteBuffer outBuffer = ByteBuffer.wrap("SUCCESS".getBytes());channel.write(outBuffer);// 防止内存泄漏cachedImages.remove(remoteAddr);} else {// 走到这里,说明本地达到的OP_READ事件,并没有把client端发过来的完整数据包读完// 也就是说,发生了拆包问题:一个文件一共580k// 这里第一次发送过来500k,后续第二次才发过来后续的80k// 这里就记录下,已经读了500kCachedImage cachedImage = new CachedImage(filename, imageLength, hasReadImageLength);cachedImages.put(remoteAddr, cachedImage);key.interestOps(SelectionKey.OP_READ);}} catch (Exception e) {e.printStackTrace();if (channel != null) {try {channel.close();} catch (IOException e1) {e1.printStackTrace();}}}}}}}

099_完善数据节点的核心配置信息以及上报拉取的过程

搭建好了客户端和数据节点传输一个图片文件的基础的代码,还没测试,主要是因为很多的代码还是没有完善好呢,所以先不要测试,等我们全部都搞定了以后我们再统一的测试一下整个完整的流程

写中间件的代码一定是不能着急的,要一点一点的写,考虑清楚很多里面的细节,就是一个是主体的架构设计核心的运行流程(主要靠你自己把各种细节考虑清楚)、核心之外其他的一些注意的点(主要是靠想),比如一些健壮性场景

很多人都说,我这个去年写的代码自己现在去看都觉得很陌生,只要记一些核心的点架构设计、核心运行流程就好了

100_完善客户端基于NIO依次对两个数据节点上传图片的代码

NioClient

package com.zhss.dfs.client;/*** 客户端的一个NIOClient,负责跟数据节点进行网络通信**/
public class NIOClient {/*** 发送一个文件过去*/public static void sendFile(String hostname, int nioPort, byte[] file, long fileSize) {// 建立一个短连接,发送完一个文件就释放网络连接SocketChannel channel = null;  Selector selector = null;  try {  channel = SocketChannel.open();  channel.configureBlocking(false);  channel.connect(new InetSocketAddress(hostname, nioPort)); selector = Selector.open();  channel.register(selector, SelectionKey.OP_CONNECT);  boolean sending = true;while(sending){selector.select();   Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();  while(keysIterator.hasNext()){  SelectionKey key = (SelectionKey) keysIterator.next();  keysIterator.remove();  // NIOServer允许进行连接的话if(key.isConnectable()){  channel = (SocketChannel) key.channel(); if(channel.isConnectionPending()){// 把三次握手做完,TCP连接建立好了channel.finishConnect();// 封装文件的请求数据ByteBuffer buffer = ByteBuffer.allocate((int)fileSize * 2); buffer.putLong(fileSize); // long对应了8个字节,放到buffer里去buffer.put(file);int sentData = channel.write(buffer);  System.out.println("已经发送了" + sentData + "字节的数据");  // 用于接收上传完文件,NameNode给DataNode的响应事件channel.register(selector, SelectionKey.OP_READ);}   }  // 接收到NIOServer的响应else if(key.isReadable()){  channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);                         int len = channel.read(buffer); if(len > 0) {System.out.println("[" + Thread.currentThread().getName() + "]收到响应:" + new String(buffer.array(), 0, len));// 接收到响应后,结束整个发送过程sending = false;}}} }                            } catch (Exception e) {  e.printStackTrace();  } finally{  if(channel != null){  try {  channel.close();  } catch (IOException e) {                        e.printStackTrace();  }                    }  if(selector != null){  try {  selector.close();  } catch (IOException e) {  e.printStackTrace();  }  }  } }}

101_重新审视一下数据节点的NIOServer接收图片的代码

102_通过文件名的同步在每个数据节点上保证图片文件名相同

目前的文件名,使用的是随机字符串

第一个办法,就是在上传文件的时候,就把文件名给上传过去

第二个办法,就是在上传文件的时候,先用一个随机的文件名,接着上传成功之后,客户端再发送一次RPC的请求把文件名给传输过去,让他进行目录的构建以及改名字

可以考虑使用第一个办法,在传输的数据里,必须先有4个字节是int类型的,他是代表了文件名的长度,接着就是实际的多个字节的文件名,接着是文件的数据,你在解析的时候就必须要按照这个思路来进行解析

103_在二进制格式的请求中如何放入文件名、文件大小以及文件数据?

我们需要在请求中放入文件名、文件大小、文件数据

http://www.dtcms.com/a/288604.html

相关文章:

  • 【LeetCode数据结构】单链表的应用——环形链表问题详解
  • 【PTA数据结构 | C语言版】哈夫曼树的实现
  • UDP中的单播,多播,广播
  • 【RAG Agent】Deep Searcher实现逻辑解析
  • 【Unity3D实例-功能-移动】角色移动-通过WSAD(CharacterController方式)
  • 【STM32实践篇】:串口通信
  • Qwen3-8B 的 TTFT 性能分析:16K 与 32K 输入 Prompt 的推算公式与底层原理详解
  • 吴恩达机器学习笔记(3)—线性代数回顾(可选)
  • 【Django】DRF API版本和解析器
  • HTML Style 对象深度解析:从基础到高级应用
  • 上电复位断言的自动化
  • 【数据结构】双向循环链表的实现
  • 18.TaskExecutor获取ResourceManagerGateway
  • 【MySQL】索引中的页以及索引的分类
  • 【Nature Communications】GaN外延层中位错辅助的电子和空穴输运
  • PHPStorm携手ThinkPHP8:开启高效开发之旅
  • selenium4 web自动化测试
  • pip关于缓存的用法
  • minizinc学习记录
  • 如何优雅解决缓存与数据库的数据一致性问题?
  • Docker实践:使用Docker部署WhoDB开源轻量级数据库管理工具
  • 飞船躲避陨石小游戏流量主微信抖音小程序开源
  • 【ESP32设备通信】-使用Modbus RTU读取传感器数据
  • 嵌入式硬件篇---按键
  • 嵌入式硬件篇---机械臂运动学解算(3自由度)
  • CentOS 服务器docker pull 拉取失败
  • 在vue中遇到Uncaught TypeError: Assignment to constant variable(常亮无法修改)
  • 后台管理系统登录模块(双token的实现思路)
  • 音视频学习(四十一):H264帧内压缩技术
  • 通俗易懂神经网络:从基础到实现