当前位置: 首页 > wzjs >正文

wordpress站群seoc2c网站有哪些平台

wordpress站群seo,c2c网站有哪些平台,学校网站建设流程,创意营销点子目录 代码结构 代码解析 (1) 主程序入口 (2) 窗口联结(Window Join) (3) 间隔联结(Interval Join) (4) 窗口同组联结(CoGroup) (5) 执行任务 代码优化 (1) 时间戳分配 (2) 窗口大小 (3) 输出格式…

目录

代码结构

代码解析

(1) 主程序入口

(2) 窗口联结(Window Join)

(3) 间隔联结(Interval Join)

(4) 窗口同组联结(CoGroup)

(5) 执行任务

代码优化

(1) 时间戳分配

(2) 窗口大小

(3) 输出格式

(4) 并行度

优化后的代码


 

这段代码展示了 Apache Flink 中三种不同的流联结操作:窗口联结(Window Join)间隔联结(Interval Join) 和 窗口同组联结(CoGroup)。以下是对代码的详细解析和说明: 

代码结构

  • 包声明package transformplus
    定义了代码所在的包。

  • 导入依赖
    导入了 Flink 相关类库,包括流处理 API、窗口分配器、时间语义等。

  • WindowJoin 对象
    主程序入口,包含三种流联结操作的实现。

package transformplusimport java.langimport org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Event/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: transformplus* @author: 赵嘉盟-HONOR* @data: 2023-12-05 12:05* @DESCRIPTION**/
object WindowJoin {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//TODO 窗口联结(join)val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)).assignAscendingTimestamps(_._2)val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)).assignAscendingTimestamps(_._2)stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1,e2)=>e1+"->"+e2).print("Join")//TODO 间隔联结:用户行为事件联系(intervalJoin)// 订单事件流val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignAscendingTimestamps(_._3)// 点击事件流val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignAscendingTimestamps(_.timestamp)orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5),Time.seconds(10)).process(new ProcessJoinFunction[(String,String,Long),Event,String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(in1+"=>"+in2)}}).print("intervalJoin")//TODO 窗口同组联结: coGroup(iterable)stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String,Long),(String,Long),String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(iterable+"=>"+iterable1)}}).print("coGroup")env.execute("windowJoin")}
}

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)
  • 创建 Flink 流处理环境 StreamExecutionEnvironment,并设置并行度为 1。
(2) 窗口联结(Window Join)
val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)
).assignAscendingTimestamps(_._2)val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)
).assignAscendingTimestamps(_._2)stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1, e2) => e1 + "->" + e2).print("Join")
  • 数据流:定义了两个流 stream1 和 stream2,分别包含键值对 (String, Long)
  • 时间戳分配:使用 assignAscendingTimestamps 方法为事件分配时间戳。
  • 窗口联结
    • 使用 join 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件对拼接成字符串并输出。
(3) 间隔联结(Interval Join)
val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignAscendingTimestamps(_._3)val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignAscendingTimestamps(_.timestamp)orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction[(String, String, Long), Event, String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(in1 + "=>" + in2)}}).print("intervalJoin")
  • 数据流:定义了两个流 orderStream(订单事件)和 pvStream(点击事件)。
  • 时间戳分配:为事件分配时间戳。
  • 间隔联结
    • 使用 intervalJoin 方法将两个流按键(_._1 和 user)联结。
    • 使用 between 方法定义时间间隔(前 5 秒到后 10 秒)。
    • 使用 process 方法将匹配的事件对拼接成字符串并输出。
(4) 窗口同组联结(CoGroup)
stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String, Long), (String, Long), String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(iterable + "=>" + iterable1)}}).print("coGroup")
  • 窗口同组联结
    • 使用 coGroup 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件集合拼接成字符串并输出。
(5) 执行任务
env.execute("windowJoin")
  • 启动 Flink 流处理任务,任务名称为 windowJoin

代码优化

(1) 时间戳分配
  • assignAscendingTimestamps 方法假设事件时间戳是严格递增的。如果时间戳可能乱序,应使用 assignTimestampsAndWatermarks 方法:

    java

    stream1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2)
    )
(2) 窗口大小
  • 窗口大小(5 秒)可能不适合所有场景。应根据实际需求调整窗口大小。
(3) 输出格式
  • 输出格式较为简单,可以优化为更易读的形式:

    java

    collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")
(4) 并行度
  • 并行度设置为 1,可能影响性能。可以根据集群资源调整并行度:

    java

    env.setParallelism(4)

优化后的代码

以下是优化后的完整代码:

package transformplusimport java.lang
import java.time.Durationimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Eventobject WindowJoin {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)// 窗口联结val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2))val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2))stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1, e2) => s"${e1._1} (${e1._2}) -> ${e2._1} (${e2._2})").print("Join")// 间隔联结val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, String, Long), timestamp: Long) => event._3))val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: Event, timestamp: Long) => event.timestamp))orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction[(String, String, Long), Event, String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")}}).print("intervalJoin")// 窗口同组联结stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String, Long), (String, Long), String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(s"Stream1: ${iterable.toString}, Stream2: ${iterable1.toString}")}}).print("coGroup")env.execute("windowJoin")}
}
http://www.dtcms.com/wzjs/590454.html

相关文章:

  • 沈阳犀牛云做网站怎么样公司简介模板免费ppt下载
  • 星沙网站建设公司网页制作软件大概需要多少钱
  • 自学商城网站建设高端网站建设询问磐石网络
  • 清丰网站建设公司广告公司业务推广
  • 网站开发工程师项目经验网站建设 深度网
  • 网站建设it行业电子商务网站有哪些
  • 网站开发与制作工资电子商务网站建设总结
  • 平面设计有什么网站厦门小微企业网站建设补贴
  • 登陆注册是静态网站网页美工实训心得
  • 广州网站建设圣矢网店网站建设规划方案
  • wordpress grace7seo是什么岗位的缩写
  • 全网引擎搜索自助模板网站建设做seo
  • 建立网站项目企业信息查询单在哪打印
  • 四平网站建设服务网站算信息化建设
  • 网站关键词数量多少好做外贸网站效果图
  • 网站备案期间能使用吗地方网站用什么域名
  • 高端网站制作网址佛山新网站建设如何
  • 北京市保障性住建设投资中心网站注册公司流程和费用找哪家
  • 会展相关网站建设情况做印量调查的网站
  • 网站维护流程图顺义区做网站
  • 河南炒股配资网站开发辽宁人社app一直更新
  • 网站建设技术要求网站及备案
  • 怎么用flash做视频网站微网站开发第三方平台
  • 郑州高端网站建设淮安维度网站建设
  • wordpress资源站源码网站使用帮助
  • 个体户可以做网站么wordpress维护模式
  • 漂亮网站网页制作学情分析
  • 哈尔滨快速建站合作超级优化液
  • 洞口建设局网站平阳高端网站建设
  • 名城苏州网站朔州企业网站建设