当前位置: 首页 > news >正文

hadoop的api操作对象存储

一、获取文件或目录

1. 获取某个目录下的文件


// 必须的依赖
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}// 获取某个目录下的文件路径
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {// 获取文件系统val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意这里用 URI 让 Hadoop 根据 scheme 找对应 FS// 递归获取该目录下所有文件val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)// 获取文件路径val buffer = scala.collection.mutable.ArrayBuffer[String]()while (it.hasNext) {val fileStatus = it.next()buffer += fileStatus.getPath.toString}// 关闭文件系统fs.close()// 返回结果buffer.toArray
}// 设定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem") // 读取oss的路径// 需要指定的路径
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)

2.  获取某个目录下的子目录

import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}/**
* 获取某个目录下所有子目录的路径, 以字符串数组的形式返回
*/
def getOnlineFirstDir: Array[String] = {// 获取路径val path = s"s3://aa/bb/"val filePath = new org.apache.hadoop.fs.Path( path )// 获取文件系统val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 获取所有子目录的路径val allFiles = FileUtil.stat2Paths( fileSystem.listStatus( filePath ) )val res = allFiles.filter( fileSystem.getFileStatus( _ ).isDirectory() ).map( _.toString)// 返回结果res
}

二、删除文件或目录

/*** 删除目录*/
def deletePath(spark: SparkSession, path: String): Unit = {// 1 获取文件系统val file_path = new org.apache.hadoop.fs.Path( path )val file_system = file_path.getFileSystem( spark.sparkContext.hadoopConfiguration )// 2 判断路径存在时, 则删除if (file_system.exists( file_path )) {file_system.delete( file_path, true )}
}

三、获取文件或目录大小

/*** 获取某个目录的大小(单位b字节),注意:只能在driver端使用,可以多线程来提速。*/
def get_path_size(spark: SparkSession, path: String): Long = {//取文件系统val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 获取该目录的大小,单位是字节if (fileSystem.exists( filePath )) {fileSystem.getContentSummary( filePath ).getLength} else {0}
}

四、判读文件或目录是否存在

方式一
/*** 判断目录是否存在,注意:只能在driver端使用,可以多线程来提速。问题: 对删除过的目录可能会误判*/
def pathIsExist(spark: SparkSession, path: String): Boolean = {//取文件系统val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 判断路径是否存在fileSystem.exists( filePath )
}方式二
/*** 通过目录是否大于0来判断目录是否存在(消除对删除过的目录的误判),注意:只能在driver端使用,可以多线程来提速。*/
def def pathIsExist(spark: SparkSession, path: String): Boolean = //取文件系统val filePath = new org.apache.hadoop.fs.Path( path )val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )// 获取该目录的大小,单位是字节val size = if (fileSystem.exists( filePath )) {fileSystem.getContentSummary( filePath ).getLength} else {0}// 返回结果size > 0}

五、parquet文的行组信息


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.parquet.column.statistics.Statistics
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.api.Binaryimport java.{lang, util}// 获取某个目录下的文件路径
def list_file(conf: Configuration, dir_path: String, is_recursive: Boolean = false): Array[String] = {// 获取文件系统val fs = FileSystem.get(new java.net.URI(dir_path), conf) // 注意这里用 URI 让 Hadoop 根据 scheme 找对应 FS// 递归获取该目录下所有文件val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(dir_path), is_recursive)// 获取文件路径val buffer = scala.collection.mutable.ArrayBuffer[String]()while (it.hasNext) {val fileStatus = it.next()buffer += fileStatus.getPath.toString}// 关闭文件系统fs.close()// 返回结果buffer.toArray
}// 某个文件某列的行组信息
def print_row_groupp(conf: Configuration, file_name: String, col_name: String): Unit = {// 读取元数据val parquetFilePath = new Path(file_name)val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter// 遍历每个行组,并手动添加索引val blocks: util.List[BlockMetaData] = footer.getBlocksfor (i <- 0 until blocks.size()) {val block = blocks.get(i)println(s"Row Group #${i}:")println(s"  - Total Rows: ${block.getRowCount}")println(s"  - Total Size: ${block.getTotalByteSize} bytes")// 遍历每个列块block.getColumns.forEach { columnChunkMetaData =>val columnPath = columnChunkMetaData.getPath.toDotString// 过滤目标列if (columnPath == col_name) {val statistics: Statistics[_] = columnChunkMetaData.getStatisticsprintln(s"  Column: $columnPath")if (statistics != null) {// 获取最小值和最大值并解码val minValue = statistics.genericGetMin match {case b: Binary => b.toStringUsingUTF8case l: lang.Long => l.toStringcase i: Integer => i.toStringcase other => other.toString}val maxValue = statistics.genericGetMax match {case b: Binary => b.toStringUsingUTF8case l: lang.Long => l.toStringcase i: Integer => i.toStringcase other => other.toString}println(s"    - Min Value: $minValue")println(s"    - Max Value: $maxValue")println(s"    - Null Count: ${statistics.getNumNulls}")} else {println("    - No statistics available for this column.")}println("    ------")}}println("======================")}}// 某个文件的行组数
def get_row_group_size(conf: Configuration, file_name: String): Int = {// 读取元数据val parquetFilePath = new Path(file_name)val inputFile: HadoopInputFile = HadoopInputFile.fromPath(parquetFilePath, conf)val footer: ParquetMetadata = ParquetFileReader.open(inputFile).getFooter// 行组数footer.getBlocks.size()
}// 设定配置文件
val conf = new Configuration()
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")// 需要指定的路径
val path = "oss://aa/bb/"
val file_paths = list_file(conf, path).filter(x => x.contains("parquet"))
file_paths.foreach(println)// 获取第一个文件的行组信息
val first_file = file_paths(0)
print_row_groupp(conf, first_file, "odid")// 统计行组数
for (file_path <- file_paths) {val file_index = file_path.split("part-")(1).split("-")(0)println(file_index + " = " + get_row_group_size(conf, file_path))
}


文章转载自:

http://4lYznaEU.hsjfs.cn
http://BFk9bpUh.hsjfs.cn
http://tM4hvizQ.hsjfs.cn
http://d6Mtw6cP.hsjfs.cn
http://OlKblxmc.hsjfs.cn
http://ezpwpZ8x.hsjfs.cn
http://po5djCKq.hsjfs.cn
http://Z1LAQzjz.hsjfs.cn
http://3ydIP7hc.hsjfs.cn
http://NKds7aW0.hsjfs.cn
http://0UjpzU0c.hsjfs.cn
http://ERsVQKKd.hsjfs.cn
http://y2Em6ml2.hsjfs.cn
http://NtGDgzUq.hsjfs.cn
http://l6cdjQKc.hsjfs.cn
http://In5XPE29.hsjfs.cn
http://pbmHmB07.hsjfs.cn
http://q0Dnv7sg.hsjfs.cn
http://OXjTTRuH.hsjfs.cn
http://cOJiV72r.hsjfs.cn
http://lHcbfl4Z.hsjfs.cn
http://TbtjKe4Q.hsjfs.cn
http://xk9jus70.hsjfs.cn
http://PMzo4jDG.hsjfs.cn
http://5r9iIQHk.hsjfs.cn
http://SA8AvWON.hsjfs.cn
http://i8eZEAc9.hsjfs.cn
http://BfVAsLtj.hsjfs.cn
http://Dn1DBuVS.hsjfs.cn
http://yqhQ7z4X.hsjfs.cn
http://www.dtcms.com/a/378260.html

相关文章:

  • 硬件开发_基于物联网的沼气池环境监测系统
  • 水质在线监测系统御控物联网解决方案
  • A股大盘数据-20250911分析
  • 【星海出品】rabbitMQ - 叁 应用篇
  • 【npm】npm 包更新工具 npm-check-updates (ncu)
  • pnpm相对于npm,yarn的优势
  • vue3源码学习(四)watch 源码学习
  • 利用JSONCrack与cpolar提升数据可视化及跨团队协作效率
  • 短剧小程序系统开发:打造个性化娱乐新平台
  • 从MySQL到StarRocks:全量与增量同步的最佳实践
  • 第七篇:识破“共因失效”——如何阻止汽车系统的“团灭”危机
  • SSL部署完成,https显示连接不安全如何处理?
  • Java 与 AI 生态:深度学习框架的支持现状
  • Linux iptables 实战:配置 NAT 端口转发访问内网 MySQL
  • docker,自定义镜像dockerfile
  • 分布式专题——9 Redis7底层数据结构解析
  • WPF 数据绑定模式详解(TwoWay、OneWay、OneTime、OneWayToSource、Default)
  • 前端埋点系统架构设计与优化实践
  • SEO新手入门:什么是SEO及其作用
  • Nginx性能优化与防盗链实战指南
  • C++类(上)默认构造和运算符重载
  • 字符串大数相加:从初稿到优化的思路演进
  • 追根索源-神经网络的灾难性遗忘原因
  • 零碎的嵌入式笔记2
  • 室内配线工程量计算-批量测量更方便
  • 深入理解 Gateway 网关:原理、源码解析与最佳实践
  • 3.List,set 与 Zset(Redis数据类型)
  • 前沿探索:RISC-V 架构 MCU 在航天级辐射环境下的可靠性测试
  • 苹果上架App软件全流程指南:iOS 应用发布步骤、App Store 上架流程、uni-app 打包上传与审核技巧详解
  • NW622NW623美光固态闪存NW624NW635