当前位置: 首页 > wzjs >正文

学做家常菜的网站最新热点新闻事件素材

学做家常菜的网站,最新热点新闻事件素材,网页设计图片居右代码,外企公司网站开发设计需求背景: 有一些xls大文件数据。使用spark-excel(spark-excel)来读取时,文件太大会oom;工具提供的流式读取参数:maxRowsInMemory 也只支持xlsx类型文件。搜索了poi流式读取xls的方案,HSSFEvent…

需求背景: 有一些xls大文件数据。使用spark-excel(spark-excel)来读取时,文件太大会oom;工具提供的流式读取参数:maxRowsInMemory 也只支持xlsx类型文件。搜索了poi流式读取xls的方案,HSSFEventFactory提供了HSSFListener进行逐条处理数据。所以编写了spark读取xls的简易source。代码如下:

spark.read.format(“xls”).option(“path”, logPath).load()能够跑通。但是对应xls大文件还是会oom。具体了解后得到原因:SSTRecord存储了整个excel中所有字符串去重后结果,LabelSSTRecord只是存储了该字符串值在SSTRecord中的索引位置。所以在逐条处理xls文件数据的时候遇到SSTRecord还是会oom。

结论:没实现成功,失败;找不到其它实习方案,只能python脚本提前将xls文件转为csv。

package cn.keytop.source.xlsimport org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.poi.hssf.eventusermodel._
import org.apache.poi.hssf.eventusermodel.dummyrecord.LastCellOfRowDummyRecord
import org.apache.poi.hssf.record._
import org.apache.poi.hssf.usermodel.HSSFDataFormatter
import org.apache.poi.poifs.filesystem.POIFSFileSystem
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}import scala.collection.mutable.ArrayBuffer/*** @author: 王建成* @since: 2025/4/18 13:46* @description: coding需求和地址 编写一个spark source plugin来读取xls大文件数据*/              
class XLSReader {def read(pathStr: String, sqlContext: SQLContext): org.apache.spark.sql.DataFrame = {val hadoopConf = sqlContext.sparkContext.hadoopConfigurationval fsPath = new Path(pathStr)val fs = fsPath.getFileSystem(hadoopConf)// 获取所有 .xls 文件val allFiles: Array[Path] = {if (fs.isDirectory(fsPath)) {fs.listStatus(fsPath).filter(f => f.isFile && f.getPath.getName.toLowerCase.endsWith(".xls")).map(_.getPath)} else {Array(fsPath)}}// 每个文件读取出一个 DataFrame,然后合并val dfs = allFiles.map { filePath =>println(s"Reading XLS file: $filePath")readSingleXLS(filePath, fs, sqlContext)}dfs.reduceOption(_.union(_)).getOrElse {// 如果目录下没有任何 xls 文件sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], StructType(Nil))}}private def readSingleXLS(path: Path, fs: FileSystem, sqlContext: SQLContext): DataFrame = {val inputStream = fs.open(path)val fsPOI = new POIFSFileSystem(inputStream)val rowsBuffer = ArrayBuffer[ArrayBuffer[String]]()var sstRecord: SSTRecord = nullvar headers: ArrayBuffer[String] = ArrayBuffer()var currentRow = ArrayBuffer[String]()var currentRowNum = -1val listener = new HSSFListener {val formatter = new HSSFDataFormatter()override def processRecord(record: Record): Unit = {record match {case sst: SSTRecord =>sstRecord = sstcase label: LabelSSTRecord =>val value = sstRecord.getString(label.getSSTIndex).toStringensureSize(currentRow, label.getColumn + 1, "")currentRow(label.getColumn) = valuecurrentRowNum = label.getRowcase number: NumberRecord =>val value = number.getValue.toStringensureSize(currentRow, number.getColumn + 1, "")currentRow(number.getColumn) = valuecurrentRowNum = number.getRowcase _: LastCellOfRowDummyRecord =>if (currentRow.nonEmpty) {if (currentRowNum == 0 && headers.isEmpty) {headers = currentRow.clone()} else {rowsBuffer += currentRow.clone()}}currentRow.clear()currentRowNum = -1case _ =>}}def ensureSize(buffer: ArrayBuffer[String], size: Int, default: String): Unit = {while (buffer.size < size) {buffer += default}}}val factory = new HSSFEventFactory()val request = new HSSFRequest()val listener1 = new MissingRecordAwareHSSFListener(listener)val listener2 = new FormatTrackingHSSFListener(listener1)request.addListenerForAllRecords(listener2)factory.processWorkbookEvents(request, fsPOI)val schema = StructType(headers.map(name => StructField(name, StringType, nullable = true)))val rows = rowsBuffer.map(Row.fromSeq)sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(rows), schema)}}
package cn.keytop.source.xlsimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}import java.io.Serializable
/*** @author: 王建成* @since: 2025/4/18 13:46* @description: coding需求和地址 编写一个spark source plugin来读取xls大文件数据*/
class DefaultSource extends RelationProvider with DataSourceRegister with Serializable{override def shortName(): String = "xls"override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {val path = parameters.getOrElse("path", throw new IllegalArgumentException("Missing path"))val reader = new XLSReader()val df = reader.read(path, sqlContext)new BaseRelation with TableScan {override def sqlContext: SQLContext = sqlContextoverride def schema: StructType = df.schemaoverride def buildScan(): RDD[Row] = df.rdd}}
}
http://www.dtcms.com/wzjs/451602.html

相关文章:

  • wordpress资讯站黄页推广平台有哪些
  • 郑州网站建设廴汉狮网络网络营销八大职能
  • 网站建设投标评分标准google网址直接打开
  • 网站建设 www.y1web.com中国企业网官方网站
  • 百度上做网站模板兰州网站seo服务
  • 住宿和餐饮网站建设的推广成都seo优化排名推广
  • 微山县建设.局网站今日国际新闻头条15条简短
  • 快捷做网站app推广活动策划方案
  • 有哪些公司做网站广州seo外包多少钱
  • wordpress 空格 插件百度seo高级优化
  • 南召网站建设路由优化大师官网
  • wordpress本地上传插件上海专业优化排名工具
  • 淮安专业做网站的公司网络测试
  • 做抽奖的网站犯法吗怎么注册一个网站
  • eclipse网站开发教程2023第二波疫情已经到来
  • 农业网站建设模板下载百家港 seo服务
  • 个人怎么做旅游网站谷歌浏览器中文手机版
  • wordpress vantage主题沈阳网站关键词优化公司
  • 淘宝客没有网站怎么做百度关键词推广教程
  • 做那个免费视频网站百度大数据中心
  • 有哪些可以做1元夺宝的网站新手20种引流推广方法
  • 网站建设过程中应该注意的事项有无锡网站建设优化公司
  • 做百度手机网站快速排seo如何提高排名
  • 哪个公司需要做网站爱站网挖掘关键词
  • 如何用phpstudy做网站精品成品网站源码
  • 企业网站栏目设置app开发需要多少钱
  • 泰安网站开发网站排名优化需要多久
  • ppt做的最好的网站网址创建
  • 百度知道2020 惠州seo服务
  • o2o网站建设咨询竞价培训课程