当前位置: 首页 > news >正文

Flink03-学习-套接字分词流自动写入工具

上一节中通过如下命令启动服务摸来模拟Socket流。请添加图片描述
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。

pom.xml和上一节一致不需要修改

编写代码

同样适用Socket流

 // 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer
继承Thread启动线程

package org.example.snow.demo3;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @author snowsong*/
public class FlinkServer extends Thread{@Overridepublic void run() {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();try {executionEnvironment.execute("stream!");} catch (Exception e) {throw new RuntimeException(e);}}}

NumRandom
使用ServerSocket实现一个持续的流输出

package org.example.snow.demo3;import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;/*** @author snowsong*/
public class RandomNumClient extends Thread {@Overridepublic void run() {// 随机生成数字String ip = "0.0.0.0";int port = 8886;try {ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);// 灵活绑定服务器地址serverSocket.bind(address);// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来Socket accept = serverSocket.accept();// 获取输入流,读取客户端发送的数据OutputStream outputStream = accept.getOutputStream();// 包装成打印流,方便写入数据 true 自动刷新缓冲区PrintWriter printWriter = new PrintWriter(outputStream, true);Random random = new Random();// 遍历for (int i = 0; i < 10; i++) {// 生成随机数int num = random.nextInt(10) + 1;printWriter.println("随机数:" + num);System.out.println("send to flink:" + num);Thread.sleep(100);}} catch (Exception e) {throw new RuntimeException(e);}super.run();}
}

启动类

package org.example.snow.demo3;/*** @author snowsong*/
public class StartApp {public static void main(String[] args) throws Exception {RandomNumClient randomNumClient = new RandomNumClient();FlinkServer flinkServer = new FlinkServer();flinkServer.start();randomNumClient.start();}
}

运行结果

请添加图片描述

相关文章:

  • 为何选择Spring框架学习设计模式与编码技巧?
  • 穿越文件之海:Linux链接与库的奇幻旅程,软硬连接与动静态库
  • 编译 Linux openssl
  • 高通SoC阵列服务器
  • 鸿蒙UI开发——组件的自适应拉伸
  • C++ try{}catch{} 语句块中潜藏问题排查指南
  • 第十二节:第六部分:集合框架:LinkedHashSet集合底层原理、TreeSet集合
  • Android 中的 DataBinding 详解
  • 利用 Scrapy 构建高效网页爬虫:框架解析与实战流程
  • 谷歌地图手机版(Google maps)v11.152.0100安卓版 - 前端工具导航
  • 嵌入式笔试题+面试题
  • SKUA-GOCAD入门教程-第八节 线的创建与编辑2
  • 谷歌地图2022高清卫星地图手机版v10.38.2 安卓版 - 前端工具导航
  • 数据挖掘顶刊《IEEE Transactions on Knowledge and Data Engineering》2025年5月研究热点都有些什么?
  • 服装产品属性描述数据集(19197条),AI智能体知识库收集~
  • Hadoop 3.x 伪分布式 8088端口无法访问问题处理
  • Stone 3D新版本发布,添加玩家控制和生物模拟等组件,增强路径编辑功能,优化材质编辑
  • Could not get unknown property ‘mUser‘ for Credentials [username: null]
  • uniapp 开发企业微信小程序,如何区别生产环境和测试环境?来处理不同的服务请求
  • AWS VPC 网络详解:理解云上专属内网的关键要素
  • 公司网站首页制作教程/临沂今日头条新闻最新
  • 菏泽 做网站 多少钱/下载爱城市网app官方网站
  • 佛山专业的网站建设公司/百度网络营销中心app
  • 网站开发超链接点击后变色/重庆seo小z博客
  • 网站建设对电子商务的作用/中国制造网
  • 南通网站建设价格/最近有哪些新闻