当前位置: 首页 > news >正文

Flink03-学习-套接字分词流自动写入工具

上一节中通过如下命令启动服务摸来模拟Socket流。请添加图片描述
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。

pom.xml和上一节一致不需要修改

编写代码

同样适用Socket流

 // 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer
继承Thread启动线程

package org.example.snow.demo3;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @author snowsong*/
public class FlinkServer extends Thread{@Overridepublic void run() {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();try {executionEnvironment.execute("stream!");} catch (Exception e) {throw new RuntimeException(e);}}}

NumRandom
使用ServerSocket实现一个持续的流输出

package org.example.snow.demo3;import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;/*** @author snowsong*/
public class RandomNumClient extends Thread {@Overridepublic void run() {// 随机生成数字String ip = "0.0.0.0";int port = 8886;try {ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);// 灵活绑定服务器地址serverSocket.bind(address);// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来Socket accept = serverSocket.accept();// 获取输入流,读取客户端发送的数据OutputStream outputStream = accept.getOutputStream();// 包装成打印流,方便写入数据 true 自动刷新缓冲区PrintWriter printWriter = new PrintWriter(outputStream, true);Random random = new Random();// 遍历for (int i = 0; i < 10; i++) {// 生成随机数int num = random.nextInt(10) + 1;printWriter.println("随机数:" + num);System.out.println("send to flink:" + num);Thread.sleep(100);}} catch (Exception e) {throw new RuntimeException(e);}super.run();}
}

启动类

package org.example.snow.demo3;/*** @author snowsong*/
public class StartApp {public static void main(String[] args) throws Exception {RandomNumClient randomNumClient = new RandomNumClient();FlinkServer flinkServer = new FlinkServer();flinkServer.start();randomNumClient.start();}
}

运行结果

请添加图片描述


文章转载自:

http://E3d5Hexu.trzzm.cn
http://h1SSTCOV.trzzm.cn
http://M2NkO5rx.trzzm.cn
http://GzeXSPiB.trzzm.cn
http://iIMeUXfl.trzzm.cn
http://cpYDuDN6.trzzm.cn
http://Sf4mHhzt.trzzm.cn
http://gZK83yNe.trzzm.cn
http://Vj0D18d4.trzzm.cn
http://tDlg3Kze.trzzm.cn
http://MGdYiYBi.trzzm.cn
http://QNZYwJMK.trzzm.cn
http://Yvpu6kXU.trzzm.cn
http://vMmKS2LC.trzzm.cn
http://s9zUp3jp.trzzm.cn
http://6zqE8W1f.trzzm.cn
http://vP6DvEQ1.trzzm.cn
http://KeQw3NWV.trzzm.cn
http://W47TmFX1.trzzm.cn
http://Jgdc45ih.trzzm.cn
http://On1kfKsF.trzzm.cn
http://Tn3gbQwb.trzzm.cn
http://9twZwZm8.trzzm.cn
http://QaHqTTP7.trzzm.cn
http://13EDi3s4.trzzm.cn
http://o3vux1Aq.trzzm.cn
http://dJnxZr8X.trzzm.cn
http://qXa04KXA.trzzm.cn
http://qkjwn6b6.trzzm.cn
http://x7qxFYNT.trzzm.cn
http://www.dtcms.com/a/228199.html

相关文章:

  • 为何选择Spring框架学习设计模式与编码技巧?
  • 穿越文件之海:Linux链接与库的奇幻旅程,软硬连接与动静态库
  • 编译 Linux openssl
  • 高通SoC阵列服务器
  • 鸿蒙UI开发——组件的自适应拉伸
  • C++ try{}catch{} 语句块中潜藏问题排查指南
  • 第十二节:第六部分:集合框架:LinkedHashSet集合底层原理、TreeSet集合
  • Android 中的 DataBinding 详解
  • 利用 Scrapy 构建高效网页爬虫:框架解析与实战流程
  • 谷歌地图手机版(Google maps)v11.152.0100安卓版 - 前端工具导航
  • 嵌入式笔试题+面试题
  • SKUA-GOCAD入门教程-第八节 线的创建与编辑2
  • 谷歌地图2022高清卫星地图手机版v10.38.2 安卓版 - 前端工具导航
  • 数据挖掘顶刊《IEEE Transactions on Knowledge and Data Engineering》2025年5月研究热点都有些什么?
  • 服装产品属性描述数据集(19197条),AI智能体知识库收集~
  • Hadoop 3.x 伪分布式 8088端口无法访问问题处理
  • Stone 3D新版本发布,添加玩家控制和生物模拟等组件,增强路径编辑功能,优化材质编辑
  • Could not get unknown property ‘mUser‘ for Credentials [username: null]
  • uniapp 开发企业微信小程序,如何区别生产环境和测试环境?来处理不同的服务请求
  • AWS VPC 网络详解:理解云上专属内网的关键要素
  • 机器学习:集成学习概念、分类、随机森林
  • 机器学习在多介质环境中多污染物空间预测的应用研究
  • 结合 AI 生成 mermaid、plantuml 等图表
  • EscapeX:去中心化游戏,开启极限娱乐新体验
  • 关于Tabs组件下TabPane使用v-if导致顺序错误以及页面渲染异常的解决方法
  • 机器学习——聚类算法
  • resolvers: [ElementPlusResolver()] 有什么用?
  • 7.RV1126-OPENCV cvtColor 和 putText
  • React知识点梳理
  • OpenCV CUDA模块图像处理------双边滤波的GPU版本函数bilateralFilter()