当前位置: 首页 > wzjs >正文

php做的网站源代码在哪里品牌建设是指

php做的网站源代码在哪里,品牌建设是指,社区团购系统开发,构建一个网站需要什么1、算子的简单介绍 Transformation(转换)算子:根据数据集创建一个新的数据集,计算后返回一个新RDD,例如一个rdd进行map操作后生了一个新的rdd。 Action(动作)算子:对rdd结果计算后返回一个数值value给驱动程序(driver),例如collect算子将数据集的所有元素收集完成返回给驱动程…

1、算子的简单介绍

Transformation(转换)算子:根据数据集创建一个新的数据集,计算后返回一个新RDD,例如一个rdd进行map操作后生了一个新的rdd。

Action(动作)算子:对rdd结果计算后返回一个数值value给驱动程序(driver),例如collect算子将数据集的所有元素收集完成返回给驱动程序。

控制算子:对数据集进行特殊操作,例如cache算子将对于重复使用的算子,进行cache做缓存使用,数据只保存在内存中,性能提升。

懒执行:Spark中转化算子和控制算子是懒执行的,需要Action算子触发才能执行。

懒执行就是延迟计算的意思,就像是创建了一个视图,他并不是把查询好的数据放入视图了,而是当你需要这些数据时,查看视图时,他才执行定义视图时候的SQL语句。

注意:

Driver即运行Application的main()函数并且创建SparkContext。

Application用户编写的Spark应用程序。

SparkContext整个应用的上下文、控制应用的生命周期。

job即在每一个application中,有几个action,就会产生几个job。

2、算子的使用

·map 算子

作用:对 RDD 中的每个元素应用给定的函数 f,将每个元素转换为另一个元素,最终返回一个新的 RDD。这个函数 f 接收一个输入类型为 T 的元素,返回一个类型为 U 的元素。

格式:def map[U: ClassTag](f: T => U): RDD[U]

import org.apache.spark.{SparkConf, SparkContext}
object MapExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("MapExample").setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val newRdd = rdd.map(x => x * 2)newRdd.collect().foreach(println)sc.stop()}
}

·filter算子

作用:筛选出 RDD 中满足函数 f 条件(即 f 函数返回 true)的元素,返回一个新的 RDD,新 RDD 中的元素类型与原 RDD 相同。

格式:def filter(f: T => Boolean): RDD[T]

import org.apache.spark.{SparkConf, SparkContext}
object FilterExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FilterExample").setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val newRdd = rdd.filter(x => x % 2 == 0)newRdd.collect().foreach(println)sc.stop()
}}

·flatma算子

作用:对 RDD 中的每个元素应用函数 f,函数 f 返回一个可遍历的集合,然后将这些集合中的元素扁平化合并成一个新的 RDD。

格式:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

import org.apache.spark.{SparkConf, SparkContext}
object FlatMapExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FlatMapExample").setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq("hello world", "spark is great"))val newRdd = rdd.flatMap(x => x.split(" "))newRdd.collect().foreach(println)sc.stop()}}

·reduceByKey算子

reduceByKey 是 Spark 中用于处理键值对(Key - Value)类型 RDD 的一个重要转换算子。它的核心作用是对具有相同键的所有值进行聚合操作,通过用户提供的聚合函数将这些值合并成一个结果,从而实现数据的归约和统计。例如统计每个键出现的次数、计算每个键对应值的总和、平均值等。

格式

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

参数说明:

func: (V, V) => V:这是一个二元函数,用于定义如何对相同键的值进行聚合。函数接收两个类型为 V 的值,返回一个类型为 V 的结果。例如,若要对相同键的值进行求和,func 可以是 (x, y) => x + y。

numPartitions: Int(可选):指定结果 RDD 的分区数。如果不提供该参数,将使用默认的分区数。

import org.apache.spark.{SparkConf, SparkContext}
object ReduceByKeyExample {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local[*]")// 创建 SparkContext 对象val sc = new SparkContext(conf)// 创建一个包含单词的 RDDval words = sc.parallelize(List("apple", "banana", "apple", "cherry", "banana", "apple"))// 将每个单词映射为 (单词, 1) 的键值对val wordPairs = words.map(word => (word, 1))// 使用 reduceByKey 计算每个单词的出现次数val wordCounts = wordPairs.reduceByKey(_ + _)// 输出结果wordCounts.collect().foreach(println)// 停止 SparkContextsc.stop()}
}

·Transformation算子   

val conf = new SparkConf().setAppName("WordCount").setMaster("local")val sc = new SparkContext(conf)val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6),(4,7), (4,8),(2,3), (1,2)),4)//map(函数),一一映射,分区数量不变,有多少条数据,就被会运行多少次。val value1: RDD[(Int, Int)] = pairRdd.map(x => (x._1, x._2 + 1))//定义一个迭代函数val f1 = (iter:Iterator[(Int, Int)]) => iter.map(x=>(x._1,x._2*2))//mapPartitions函数可以认为是Map的变种,可以对分区进行并行处理,两者的区别是调用的颗粒度不一样,map的输入函数是应用于RDD的每个元素,而mapPartition的输入函数是应用于RDD的每个分区。val value2: RDD[(Int, Int)] = pairRdd.mapPartitions(f1)//定义一个迭代函数val f2 = (index:Int,iter:Iterator[(Int, Int)]) => iter.map((index,_))//mapPartitionsWithIndex函数类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]val value3: RDD[(Int, (Int, Int))] = pairRdd.mapPartitionsWithIndex(f2)//filter函数,返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成,fitler并不会改变分区的数量,之前有几个,现在仍然有几个分区。val value4: RDD[(Int, Int)] = pairRdd.filter(x => x._2 % x._1 == 0)//flatMap函数: map之后,再flatten。每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)val value5: RDD[Char] = pairRdd.flatMap(x => x._1.toString + x._2.toString)//groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD,可重新指定分区个数val value6: RDD[(Int, Iterable[Int])] = pairRdd.groupByKey(1)//groupBy函数与groupByKey类似,底层调用groupByKey,返回一个(K, Iterator[V])的RDD,可重新指定分区个数val value7: RDD[(Int, Iterable[(Int, Int)])] = pairRdd.groupBy(_._1, 1)//groupBy后要搭配MapValues()使用//reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置val value8: RDD[(Int, Int)] = pairRdd.reduceByKey(_ + _, 1)//aggregateByKey 聚合类算子:初始化在每一个分区聚合中参数运行,但是在全局聚合中,不参与,类似于reduceByKey,但多个初始值val value11: RDD[(Int, Int)] = pairRdd.aggregateByKey(100)(_ + _, _ + _)//sortByKey函数:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD,是全局排序,可指定分区数量//可设置排序规则,默认是从小到大排序,是true, 如果想从大到小排,设置为falseval value12: RDD[(Int, Int)] = pairRdd.sortByKey(false,2)//sortBy函数:与sortByKey类似,但是更灵活,可设置排序的字段val value13: RDD[(Int, Int)] = pairRdd.sortBy(_._1, false, 2)//union函数:对源RDD和参数RDD求并集后返回一个新的RDDval value14: RDD[(Int, Int)] = value1.union(value2)//intersection函数:对源RDD和参数RDD求交集后返回一个新的RDDval value15: RDD[(Int, Int)] = value1.intersection(value2)//subtract函数:对源RDD和参数RDD求差集后返回一个新的RDDval value16: RDD[(Int, Int)] = value1.subtract(value2)//join函数:对源RDD和参数RDD进行join返回一个新的RDDval value17: RDD[(Int, (Int, Int))] = value1.join(value2)//cogroup函数:对源RDD和参数RDD进行全外关联返回一个新的RDD,相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。val value18: RDD[(Int, (Iterable[Int], Iterable[Int]))] = value1.cogroup(value2)//cartesian函数:对源RDD和参数RDD进行笛卡尔积返回一个新的RDDval value19: RDD[((Int, Int), (Int, Int))] = value1.cartesian(value2)//coalesce:对RDD重新进行分区val value20: RDD[((Int, Int), (Int, Int))] = value19.coalesce(10)//repartition:类似于coalesce方法,底层就是用的coalesce方法,对RDD重新进行分区val value21: RDD[((Int, Int), (Int, Int))] = value20.repartition(5)//去重:distinct(分区数量)。val value22: RDD[((Int, Int), (Int, Int))] = value19.distinct(12)//等价于val value23: RDD[((Int, Int), (Int, Int))] =value19.map(x=>(x,null)).reduceByKey((x,y) => x).map(_._1)

·ACtion算子   

//reduce函数与reduceByKey类似,返回一个结果的RDD,+ ,++ 取决rdd的元素类型,string类型使用++val value9: Int = pairRdd.map(_._2).reduce(_ + _)//aggregate 聚合类算子:存在两次聚合。局部聚合和全局聚合。//先局部计算(100+分区一的内容) (100+分区二的内容)+...//再全局计算100+(100+分区一的内容)+ (100+分区二的内容)+...val value10: Int = pairRdd.map(_._2).aggregate(100)(_ + _, _ + _)//collect 在驱动程序中,以数组的形式返回数据集的所有元素val tuples: Array[((Int, Int), (Int, Int))] = value21.collect()//foreach 在数据集的每一个元素上,运行函数funcval unit: Unit = value11.foreach(println(_))//foreach 迭代的是每一个分区的数据//如果我们需要去获取mysql的连接,RDD中有10000 条数据,有10个分区。//foreach:获取10000次连接,性能低下。//foreapartition: 只需要获取10次连接,每一个分区中的数据,共用一个连接。 性能优越。value21.foreachPartition(println(_))//saveAsTextFile 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本value21.saveAsTextFile("路径")//count() 返回RDD的元素个数val tuples1: Array[((Int, Int), (Int, Int))] = value19.collect()//first() 返回RDD的第一个元素(类似于take(1))val tuple: ((Int, Int), (Int, Int)) = value19.first()//take(n) 返回一个由数据集的前n个元素组成的数组val tuples2: Array[((Int, Int), (Int, Int))] = value19.take(10)

注意:

sortBy 是按照 RangePartitioner作为分区器groupByKey 和 reduceByKey 中优先使用reduceByKey,因为reduceByKey会有一个局部的聚合,性能更好。

ByKey的都是转换算子,没有Key的都是action类算子。aggregateByKey和reduceByKey是转化算子,aggregate和reduce是执行算子

2.3、控制算子

cache 对于重复使用的算子,进行cache做缓存使用,数据只保存在内存中,性能提升persist 性能提升checkPoint 数据容错,当数据计算的时候,机器挂了,重新追溯到checkPoint的目录下checkPoint是将RDD持久化到磁盘中,还可以切断RDD之间的依赖关系。


文章转载自:

http://U3DQHVN4.trhLb.cn
http://1QjGPk29.trhLb.cn
http://zMMeYldM.trhLb.cn
http://rEO9HWsu.trhLb.cn
http://JLygf7fQ.trhLb.cn
http://bufupIid.trhLb.cn
http://x11sZnJu.trhLb.cn
http://8GEMfUKA.trhLb.cn
http://GzjBAXCx.trhLb.cn
http://kUV52DHc.trhLb.cn
http://tXoEw0Il.trhLb.cn
http://v4cJzS6D.trhLb.cn
http://dv8iJfWu.trhLb.cn
http://vPSpoHib.trhLb.cn
http://DGSYegqP.trhLb.cn
http://xEZBph0V.trhLb.cn
http://ZwiVC4uO.trhLb.cn
http://A91ciRaf.trhLb.cn
http://esqnwOT3.trhLb.cn
http://eCiKyvqH.trhLb.cn
http://6PccwYSv.trhLb.cn
http://wbdVj5Ds.trhLb.cn
http://TApLcs7G.trhLb.cn
http://8uTSTe2A.trhLb.cn
http://xkEzJ33D.trhLb.cn
http://PtvY32Nz.trhLb.cn
http://LCCCNdYE.trhLb.cn
http://pq9WxNi5.trhLb.cn
http://XZlYJCgw.trhLb.cn
http://E8DltmY7.trhLb.cn
http://www.dtcms.com/wzjs/606616.html

相关文章:

  • jquery网站开发wordpress添加返回目录标签
  • 云南网站建设价格网站建设公司合同模板
  • 我是做性视频网站甘肃建设职工教育培训中心网站
  • 网站开发与应用专业福州专业网站建设怎么做
  • wordpress jitpecj插件seo排名网站 优帮云
  • 电子商务网站建设经费北京专业企业营销网站建设
  • 贵州省建设监理协会官方网站重庆网站seo建设
  • ucloud网站开发电子科技企业网站建设
  • 制作网页网站项目介绍怎么让关键词快速上首页
  • 建网站需要多少钱石家庄c 购物网站开发流程图
  • 常德公司做网站网站建设辶首先金手指十四
  • 校园网站建设需求分析小本本教你做网站
  • 免费行情网站app大全电商网站订烟平台官网
  • php企业网站模板下载建筑设计公司logo
  • 福田做棋牌网站建设哪家技术好做微信平台图片网站
  • 怎么做淘宝联盟的推广网站提高审美的网站推荐
  • 网站的运营模式不允许访问网站
  • 宠物出售的网站怎么做网页制作方案策划
  • 做年会的网站现在网站一般做多大的
  • 阿坝网站设计属于c2c网站的有哪几个
  • 上海有制作网站的电话吗海口网站建设的开发方案
  • 网站建设与规划试卷兼职做网站安全么
  • vps网站压缩权威发布图片
  • 有哪些做简历的网站开展我国电子网站建设
  • 广州网站建设哪家公司好免费推广的软件
  • 网站建设 三牛xps13适合网站开发吗
  • 地方门户类网站建设项目查询
  • 中国投诉网站做袜子机器多少钱一台网页动态设计怎么做
  • docker 做网站网站介绍模板
  • 网站建设的具体代码新余网站开发