当前位置: 首页 > wzjs >正文

邯郸哪儿做网站好酒店推广渠道有哪些

邯郸哪儿做网站好,酒店推广渠道有哪些,云南网站建设首选才力,中国十大网站建设企业目录 代码分析 背景知识拓展 代码调优 1. 性能优化 1.1 使用 KeyedStream 和 ProcessWindowFunction 替代 windowAll 1.2 使用 ReduceFunction 优化聚合 2. 功能扩展 2.1 支持动态窗口大小 2.2 支持多维度统计 2.3 支持持久化存储 3. 代码可读性 3.1 提取公共逻辑 …

目录

代码分析

背景知识拓展

代码调优

1. 性能优化

1.1 使用 KeyedStream 和 ProcessWindowFunction 替代 windowAll

1.2 使用 ReduceFunction 优化聚合

2. 功能扩展

2.1 支持动态窗口大小

2.2 支持多维度统计

2.3 支持持久化存储

3. 代码可读性

3.1 提取公共逻辑

3.2 使用 Scala 的高级特性

4. 异常处理

4.1 处理数据异常

4.2 处理作业异常

5. 完整改进代码

总结


 

package processfunctionimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import source.ClickSourceimport scala.collection.mutable/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: processfunction* @author: 赵嘉盟-HONOR* @data: 2023-11-24 21:32* @DESCRIPTION**/
object TopNProcessAllWindowExample {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val data = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)data.map(_.url).windowAll(TumblingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).process(new ProcessAllWindowFunction[String,String,TimeWindow] {override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {val urlCountMap=mutable.Map[String,Int]()elements.foreach(data=>{urlCountMap.get(data) match {case Some(value) => urlCountMap.put(data,value+1)case None => urlCountMap.put(data,1)}})val tuples = urlCountMap.toList.sortBy(-_._2).take(10)val builder = new StringBuilder()builder.append(s"=========窗口:${context.window.getStart} ~ ${context.window.getEnd}========\n")for (i <- tuples.indices){val tuple = tuples(i)builder.append(s"浏览量Top ${i+1} url:${tuple._1} 浏览量是: ${tuple._2} \n")}out.collect(builder.toString())}}).print()env.execute("TopNDemo1")}
}

代码分析

 这段代码使用 Apache Flink 处理流数据,并计算每个时间窗口内 URL 的访问量 TopN。以下是代码的详细解释:

  1. 环境设置

    • val env = StreamExecutionEnvironment.getExecutionEnvironment:获取 Flink 的执行环境。
    • env.setParallelism(1):设置并行度为 1,即单线程执行。
  2. 数据源

    • val data = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp):从 ClickSource 获取数据,并分配时间戳。ClickSource 是一个自定义的数据源,生成模拟的点击事件。
  3. 窗口操作

    • data.map(_.url):将数据流中的每个事件映射为 URL。
    • .windowAll(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))):定义一个滚动窗口,窗口大小为 10 秒,滑动步长为 5 秒。windowAll 表示对所有数据进行窗口操作。
  4. 处理函数

    • .process(new ProcessAllWindowFunction[String, String, TimeWindow] { ... }):使用 ProcessAllWindowFunction 处理窗口内的所有数据。
    • urlCountMap:用于存储每个 URL 的访问次数。
    • elements.foreach(data => { ... }):遍历窗口内的所有 URL,更新 urlCountMap
    • val tuples = urlCountMap.toList.sortBy(-_._2).take(10):将 urlCountMap 转换为列表,按访问量降序排序,并取前 10 个。
    • builder.append(...):构建输出字符串,包含窗口时间和 TopN URL 的访问量。
    • out.collect(builder.toString()):将结果输出。
  5. 执行

    • env.execute("TopNDemo1"):启动 Flink 作业。

背景知识拓展

  1. Apache Flink

    • Apache Flink 是一个分布式流处理框架,支持高吞吐、低延迟的实时数据处理。
    • Flink 提供了丰富的 API,包括 DataStream API(用于流处理)和 DataSet API(用于批处理)。
  2. 流处理

    • 流处理是一种处理无界数据流的技术,适用于实时数据分析、监控等场景。
    • 与批处理不同,流处理是连续进行的,数据到达即处理。
  3. 窗口操作

    • 窗口操作是流处理中的核心概念,用于将无界数据流划分为有限的数据集进行处理。
    • 常见的窗口类型包括滚动窗口、滑动窗口和会话窗口。
  4. 时间语义

    • Flink 支持三种时间语义:事件时间(Event Time)、处理时间(Processing Time)和摄入时间(Ingestion Time)。
    • 事件时间是指事件实际发生的时间,通常用于处理乱序事件。
  5. ProcessFunction

    • ProcessFunction 是 Flink 提供的一种底层 API,允许用户自定义处理逻辑,包括访问时间戳、状态管理等。
    • ProcessAllWindowFunction 是 ProcessFunction 的一种,用于处理窗口内的所有数据。
  6. TopN 计算

    • TopN 计算是一种常见的分析任务,用于找出数据集中最频繁或最重要的元素。
    • 在流处理中,TopN 计算通常结合窗口操作进行,以实时更新结果。

代码调优

1. 性能优化

1.1 使用 KeyedStream 和 ProcessWindowFunction 替代 windowAll
  • 问题windowAll 会将所有数据发送到单个任务中,无法并行处理,性能较差。
  • 改进:使用 KeyedStream 对数据进行分组(例如按 URL 分组),然后使用 ProcessWindowFunction 处理每个窗口内的数据。
  • 代码示例
val keyedData = data.keyBy(_.url) // 按 URL 分组
keyedData.window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessWindowFunction[ClickEvent, (String, Int), String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[ClickEvent], out: Collector[(String, Int)]): Unit = {out.collect((key, elements.size)) // 输出 URL 及其访问次数}})
1.2 使用 ReduceFunction 优化聚合
  • 问题:在 ProcessAllWindowFunction 中,每次都需要遍历所有数据,效率较低。
  • 改进:使用 ReduceFunction 或 AggregateFunction 进行增量聚合,减少数据遍历次数。
  • 代码示例
val reducedData = keyedData.window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).reduce((event1, event2) => event1) // 仅保留一个事件,统计访问次数

2. 功能扩展

2.1 支持动态窗口大小
  • 问题:窗口大小和滑动步长是固定的,无法动态调整。
  • 改进:通过配置文件或外部参数动态设置窗口大小和滑动步长。
  • 代码示例
val windowSize = Time.seconds(10) // 从配置中读取
val slideSize = Time.seconds(5)   // 从配置中读取
data.windowAll(TumblingEventTimeWindows.of(windowSize, slideSize))
2.2 支持多维度统计
  • 问题:当前仅统计 URL 的访问量,无法支持其他维度的统计(如用户、设备等)。
  • 改进:扩展统计维度,支持按用户、设备等多维度统计。
  • 代码示例
val userKeyedData = data.keyBy(_.user) // 按用户分组
userKeyedData.window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessWindowFunction[ClickEvent, (String, Int), String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[ClickEvent], out: Collector[(String, Int)]): Unit = {out.collect((key, elements.size)) // 输出用户及其访问次数}})
2.3 支持持久化存储
  • 问题:结果仅打印到控制台,无法持久化存储。
  • 改进:将结果写入外部存储系统(如 Kafka、HDFS、数据库等)。
  • 代码示例
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
val kafkaProducer = new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema(), properties)
resultStream.addSink(kafkaProducer)

3. 代码可读性

3.1 提取公共逻辑
  • 问题:统计逻辑和输出逻辑耦合在一起,代码可读性较差。
  • 改进:将统计逻辑和输出逻辑分离,提取为独立的方法或类。
  • 代码示例
def countUrlAccess(elements: Iterable[String]): List[(String, Int)] = {val urlCountMap = mutable.Map[String, Int]()elements.foreach(url => urlCountMap.update(url, urlCountMap.getOrElse(url, 0) + 1))urlCountMap.toList.sortBy(-_._2).take(10)
}def formatResult(window: TimeWindow, tuples: List[(String, Int)]): String = {val builder = new StringBuilder()builder.append(s"=========窗口:${window.getStart} ~ ${window.getEnd}========\n")tuples.zipWithIndex.foreach { case ((url, count), index) =>builder.append(s"浏览量Top ${index + 1} url: $url 浏览量是:$count \n")}builder.toString()
}
3.2 使用 Scala 的高级特性
  • 问题:代码风格较为传统,未充分利用 Scala 的高级特性。
  • 改进:使用 Scala 的 case classOption 和函数式编程特性。
  • 代码示例
case class UrlAccess(url: String, count: Int)val result = elements.groupBy(identity).map { case (url, urls) => UrlAccess(url, urls.size) }.toList.sortBy(-_.count).take(10)

4. 异常处理

4.1 处理数据异常
  • 问题:未处理数据异常(如空数据、非法数据等)。
  • 改进:添加异常处理逻辑,确保作业的健壮性。
  • 代码示例
try {val tuples = countUrlAccess(elements)out.collect(formatResult(context.window, tuples))
} catch {case e: Exception => println(s"处理窗口数据时发生异常:${e.getMessage}")
}
4.2 处理作业异常
  • 问题:未处理作业级别的异常(如数据源故障、网络中断等)。
  • 改进:使用 Flink 的 RestartStrategy 和 Checkpointing 机制。
  • 代码示例
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
env.enableCheckpointing(5000) // 每 5 秒进行一次 checkpoint

5. 完整改进代码

以下是结合上述改进的完整代码:

package processfunctionimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import source.ClickSourceimport scala.collection.mutablecase class UrlAccess(url: String, count: Int)object TopNProcessAllWindowExample {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val data = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)data.map(_.url).windowAll(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction[String, String, TimeWindow] {override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {try {val tuples = countUrlAccess(elements)out.collect(formatResult(context.window, tuples))} catch {case e: Exception => println(s"处理窗口数据时发生异常:${e.getMessage}")}}}).print()env.execute("TopNDemo1")}def countUrlAccess(elements: Iterable[String]): List[UrlAccess] = {elements.groupBy(identity).map { case (url, urls) => UrlAccess(url, urls.size) }.toList.sortBy(-_.count).take(10)}def formatResult(window: TimeWindow, tuples: List[UrlAccess]): String = {val builder = new StringBuilder()builder.append(s"=========窗口:${window.getStart} ~ ${window.getEnd}========\n")tuples.zipWithIndex.foreach { case (UrlAccess(url, count), index) =>builder.append(s"浏览量Top ${index + 1} url: $url 浏览量是:$count \n")}builder.toString()}
}

总结

通过上述改进,代码在 性能功能可读性 和 健壮性 方面都得到了显著提升。你可以根据实际需求进一步调整和扩展代码,例如支持更多维度统计、集成外部存储系统等。

http://www.dtcms.com/wzjs/187589.html

相关文章:

  • 专业网站建设最便宜石家庄百度推广排名优化
  • 做企业网站服务器在国外文案代写平台
  • 广州站电话自媒体代运营
  • 保定网站制作价格淘宝店铺怎么推广
  • 柳州做网站去哪家公司好seo排名系统
  • 网站仿做软件网站域名查询系统
  • 塘沽做网站的公司网站seo策划方案案例分析
  • 温州纪委作风建设网站外贸推广具体是做什么
  • 静态网站设计南京疫情最新消息
  • 加强住房公积金网站建设四川seo技术培训
  • 迅速建设企业网站关键词优化seo外包
  • 黄骅做网站_黄骅昊信科技|黄骅网站|黄骅网站开发|黄骅微信|黄骅seo外链工具源码
  • 微信网站的链接标志图片如何做促销活动推广方法有哪些
  • 网站倒计时代码优化近义词
  • 专业做传奇网站解析南宁seo服务公司
  • 高端网站制作平台百度搜索引擎推广
  • 如何选择宜昌网站建设沧州seo包年优化软件排名
  • 网上购物网站大全收录查询 站长工具
  • 一个域名做两个网站可以么深圳网站建设的公司
  • 免费炫酷网站模板seo超级外链工具
  • 云南网站做的好的公司哪家好新闻类软文营销案例
  • 专门做会议的网站手机百度浏览器
  • 青岛做物流网站淘宝网店代运营正规公司
  • 公安备案网站首页企业宣传文案
  • 公司网站百度小程序开发重庆seo整站优化设置
  • dedecms手机网站制作如何做友情链接
  • 安装网站到服务器怎样申请网站
  • 新乡网站制作接广告的平台
  • 网站功能优化的方法网络营销的流程和方法
  • 湛江做网站的网站京津冀协同发展