当前位置: 首页 > wzjs >正文

网站开发的程序平台中文域名最新资讯

网站开发的程序平台,中文域名最新资讯,手机网站建设必要性,企业建站用什么好flatmap() AMapFunction仅适用于执行一对一转换的情况&#xff1a;对于每个进入的流元素&#xff0c;map()都会发出一个转换后的元素。否则&#xff0c;您需要使用 flatmap() DataStream<TaxiRide> rides env.addSource(new TaxiRideSource(...));DataStream<Enric…

flatmap()

AMapFunction仅适用于执行一对一转换的情况:对于每个进入的流元素,map()都会发出一个转换后的元素。否则,您需要使用 flatmap()

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));DataStream<EnrichedRide> enrichedNYCRides = rides.flatMap(new NYCEnrichment());enrichedNYCRides.print();

连同FlatMapFunction:

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));DataStream<EnrichedRide> enrichedNYCRides = rides.flatMap(new NYCEnrichment());enrichedNYCRides.print();

通过Collector此接口提供的功能,该flatmap()方法可以发出任意数量的流元素,包括不发出任何元素。

实践

Flink 的 DataStream API 允许你流式传输任何可以序列化的数据。Flink 自己的序列化器用于

基本类型,即 String、Long、Integer、Boolean、Array
复合类型:Tuples、POJO
对于其他类型,Flink 会回退到 Kryo。Flink 也可以使用其他序列化器。

pom内容如上个内容,此处不再赘述
定义本机变量
连接的 IP 地址为 0.0.0.0(监听所有网络接口)

// 本机String ip = "0.0.0.0";//开启的端口号int port = 8886;

获取flink环境

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

使用套接字-socket流

  DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");

FlatMap-分词
数据转换 - 分词和计数。
使用 flatMap 操作对每行文本进行分词处理;
将每行文本按空白字符分割成单词数组;
为每个单词生成一个 (单词, 1) 的元组(Tuple2);
结果是一个包含 (单词, 计数) 对的流

SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});

分组和窗口计算
keyBy: 按照元组的第一个字段(单词)进行分组;
window: 定义滑动窗口:窗口大小:5秒,滑动间隔:1秒;
sum(1): 对每个窗口内相同单词的计数(元组的第二个字段)求和;

SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);

完整代码

package org.example.snow.demo2;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class startDemo {public static void main(String[] args) throws Exception {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();executionEnvironment.execute("stream!");}
}

启动服务

nc -lk 8886

运行效果
在这里插入图片描述
请添加图片描述

http://www.dtcms.com/wzjs/570685.html

相关文章:

  • 网站建设模块下载深圳网站建设加q5299丶14602推广
  • 微信小程序怎拼做搬家网站网站在线支付
  • 手机访问网站下面电话怎么做电子商务网站建设渠道
  • dede网站地图xml南昌网站建设利润
  • 北京网站建设公司分享网站改版注意事项广告策划公司有哪些
  • 网站快速收录软件西安建筑工程有限公司
  • 个人网站放什么内容厦门网站制作案例
  • 企业微网站怎么做合肥网红打卡地
  • 四川省建设厅官方网站电话大都会下载安装
  • 网站建设教程纯正苏州久远网络企业微信app下载安装官方版
  • 滨海哪家专业做网站做ppt一般在什么网站好
  • 探测器 东莞网站建设白酒招商网站大全
  • 网站推广好做吗如何做网站的关键词
  • 做汽车团购网站app开发技术路线
  • 移动医护网站建设利弊红豆影视传媒有限公司
  • 郴州市网站建设科技免费申请地图定位
  • 做网站用商标吗wordpress widget修改
  • 163k地方门户网站系统奉化网络推广
  • 南宁大型网站开发自己做的网站能备案吗
  • c 做网站后台edd次元的避风港网站代理
  • 做电影网站怎么接广告有域名怎么建网站南宁
  • 静态网站模板下载wordpress最好cms
  • 网站logo的颜色与网页的颜色张雪峰谈建筑学前景
  • 做任务赚话费的网站wordpress页面无法显示
  • 做网站怎样建立服务器网站建设项目分期
  • 徽州网站建设ps怎么做网站分隔线
  • 响应式网站 html重庆软件制作
  • 京东网站建设案例云伙伴小程序开发公司
  • 给人做网站赚钱吗怎么查看网站的dns
  • 什么网站没人做做网站 贴吧