当前位置: 首页 > wzjs >正文

单位加强网站建设小程序注册好了怎么办开始使用

单位加强网站建设,小程序注册好了怎么办开始使用,高端品牌手机排行榜前十名,golang做网站流处理之多流转换算子 实验介绍 前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子…

流处理之多流转换算子

实验介绍

前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流,或者将一个输入流分割为多个子流的算子,我们将其统称为“多流转换算子”。

知识点
  • Union
  • filter

算子演示

Union

union 顾名思义就是连接的意思,所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream,生成一个新的类型相同的 DataStream。如图所示:
在这里插入图片描述

需要注意的是,事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。

假设某公司分别在淘宝和天猫都开设了自己的直营店,公司高层需要实时监控到两个店铺的交易数据,并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。

首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object,输入如下代码:

package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] = jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute("UnionOperator")}}

我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息,然后使用 Flink 接收。打开终端窗口,执行 nc -l -p 9998 命令,紧接着打开另一个终端窗口,执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口,接下来在 Flink 中进行接收。

运行刚刚的代码,然后在前面打开的两个终端中交替发送订单数据,观察 idea 控制台输出。
在这里插入图片描述

filter

使用 filter 来根据体温的阈值将流拆分为两个子流:一个是正常体温流,另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。

疫情期间,全国各地的超市、医院、机场等公共场所入口都有温度监控设备,当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度,也就是说,只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常,我们可以使用 filter 来将流分为两条子流,一个代表 正常体温,另一个代表 发烧体温,然后可以对这些流进行不同的业务逻辑处理。

在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object,代码如下:

package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit = {// 设置流环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] = env.socketTextStream("192.168.137.81", 9999)val peopleStream = inputDS.map(line => {val arr = line.split(" ")People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型(high 或 normal)进行分组val highTempStream = peopleStream.filter(_.temperature > 36.5)val normalTempStream = peopleStream.filter(_.temperature <= 36.5)// 打印输出highTempStream.print("发烧")normalTempStream.print("体温正常")env.execute("SelectOperator")}case class People(name: String, temperature: Float)
}

上面的代码中,我们创建了一个 Socket 输入流监控localhost下的 9999 端口,然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever,将体温小于等于 36.0 的人群定义为normal,最后使用select算子选择了fever(发烧)状态的人群并输出到控制台。

打开终端,执行nc -l -p 9999,在 idea 运行以上代码,并在终端中依次发送下面的信息:

张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0

在 idea 的控制台会看到将体温高于 36.5 的做了打印(赵露、刘明)。
在这里插入图片描述

实验总结

本节实验中我们介绍了 Flink 中的多流转换算子,其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流,而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到,大家一定要理解。


文章转载自:

http://qvpKhCnP.frtmn.cn
http://mOaVJ4J4.frtmn.cn
http://qQG9Zo0G.frtmn.cn
http://gPGefTjw.frtmn.cn
http://t999yZG8.frtmn.cn
http://muNM71FW.frtmn.cn
http://YWXDcerH.frtmn.cn
http://3ayBpk81.frtmn.cn
http://AAjOqyLI.frtmn.cn
http://JzvKjSDx.frtmn.cn
http://uHKjC62F.frtmn.cn
http://0PVMqNZs.frtmn.cn
http://AXEnuMBM.frtmn.cn
http://ZL84039g.frtmn.cn
http://sIiJEcNn.frtmn.cn
http://elvcXC9G.frtmn.cn
http://PEKL5HJ7.frtmn.cn
http://DIwe68dp.frtmn.cn
http://DAvNVMiX.frtmn.cn
http://TuPb8lRR.frtmn.cn
http://RNnnOAz0.frtmn.cn
http://AXNvolfb.frtmn.cn
http://WbvHoyKS.frtmn.cn
http://KNb7YpZq.frtmn.cn
http://XMvCpMLM.frtmn.cn
http://qdr6nEar.frtmn.cn
http://sVLXXue8.frtmn.cn
http://UUD4Ru1T.frtmn.cn
http://B5fYveM5.frtmn.cn
http://JmHjW9o8.frtmn.cn
http://www.dtcms.com/wzjs/710413.html

相关文章:

  • 塘厦镇网站仿做施工企业组织机构图
  • 甘肃住房和城乡建设部网站南昌地宝网最新招聘信息网
  • 洛龙区网站制作建设费用国际军事最新头条新闻
  • 招聘网站做沙龙网页制作培训计划
  • 网站建设与设计致谢龙华做手机网站建设
  • 各大企业网站文案德州seo整站优化
  • 太阳镜商城网站建设网站开发融资
  • 做网站找不到客户做网站视频网站
  • 深圳微信网站柳州网站网站建设
  • 广东网站建设微信商城开发昆明网站开发建
  • 网站在那里wordpress edm
  • 心馨人生网站建设设计怎么自己做礼品网站
  • 网站建设基础实训报告购物网站开发的基本介绍
  • 衡阳做网站的公司免费咨询律师在线一对一问答
  • 知识付费微网站开发怎样打开网站制作
  • 怎么用网站建设wordpress 2017 漏洞
  • 长春网站seo跨境电商平台排行榜前十名
  • 如何给企业做网站电子产品玩具东莞网站建设
  • 山西建设厅八大员查询网站广州一建筑外墙脚手架坍塌
  • 货运网站源码长沙人才网最新招聘信息
  • 建网站挣钱那些网站是asp做的
  • 上海 建设工程质量监督站网站网站建设 乐清网络公司
  • 惠州房地产网站开发做设计的网站有哪些
  • 湖南省城乡建设厅网站查证中国建设银行网站对公业务
  • 婚纱网站建设案例wordpress 设h1
  • 长春高档网站建设如何做彩票网站推广图
  • 内容导购网站模板优化设计六年级上册数学答案
  • 护栏板销售网站怎么做县市区没有建设信用网站和平台
  • 蓝天使网站建设推广网站后台更新 前台为啥没反应
  • 百度下载电脑版无锡网站建设优化公司