当前位置: 首页 > news >正文

模拟flink处理无限数据流

如果没有在 linux 环境下安装 flink ,先看我的上一篇文章:如何搭建Linux环境下的flink本地集群-CSDN博客

使用工具:IntelliJ IDEA 2021,Maven 3.6.1

第一步,创建一个空的 Maven 项目,导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinkLearn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.11</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.17.0</flink.version></properties><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

等等依赖下载完毕

第二步,编写分词处理无界流代码

此刻先不要运行,因为还没有数据源

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamNoBundedWordCount {public static void main(String[] args) throws Exception{//拿到执行环境StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment();//数据源 source//注:加 returns 是因为 lambda 表达式无法识别二元组中的类型,故手动指定以避免报错DataStreamSource<String> source = evn.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy((Tuple2<String, Integer> value) -> value.f0).sum(1);//写入,sinksum.print();//执行evn.execute();}}

 

第三步,进入 linux 环境,安装 netcat 模拟数据源发送数据

安装 netcat

sudo apt update
sudo apt install netcat

验证安装情况

nc --version

模拟数据源,监听 8888 端口

nc -lk 8888

第四步,启动程序

点击 绿色按钮 启动 Main 程序后

在 netcat 中发送数据比如 hello world

可以看到程序可以正常运行

第五步,打包并上传到 linux 中的 flink 集群中

 点开右侧的 Maven 选项,找到生命周期,先 clean 一下,将多余文件清除

再 packge

成功后会生成一个 target 文件夹,找到其中的 jar 包,会生成两个

两个都能用,上面那个东西要少一些 

然后我们将 jar 包上传到 flink 的 web UI 上

点击 add new

找到我们刚刚生成的 jar 包 并点击 "打开" 上传

点击 jar 包,为其指定启动类,指定并行度,这里我写的 2

 可以看到 作业启动成功

接下来,我们再 netcat 中发送数据 hello

可以看到, 集群中的 task Managers 成功接收到了数据并进行词频统计

 

http://www.dtcms.com/a/298634.html

相关文章:

  • WAIC2025预告|英码深元AI一体机将亮相华为昇腾展区,以灵活部署的能力赋能行业智能化转型
  • 学习:JS[6]环境对象+回调函数+事件流+事件委托+其他事件+元素尺寸位置
  • ReVQ (Quantize-then-Rectify,量化后修正)
  • 笛卡尔积规避:JOIN条件完整性检查要点
  • FreeRTOS—互斥信号量
  • Sweet Home 3D:一款免费的室内装修辅助设计软件
  • 【集合】JDK1.8 HashMap 底层数据结构深度解析
  • 第二章: 解密“潜在空间”:AI是如何“看见”并“记住”世界的?
  • 深入解析C语言三路快速排序算法
  • 动态规划:从入门到精通
  • 多品种小批量如何实现柔性排产?
  • 无感交互,创意飞扬:AI摄像头动捕赋能中小学AI人工智能实训室
  • Python Requests-HTML库详解:从入门到实战
  • 环境变量-进程概念(7)
  • 对自定义域和 GitHub 页面进行故障排除(Windows)
  • 批改作业小工具(一)-read report
  • InfluxDB Line Protocol 协议深度剖析(一)
  • 07 51单片机之定时器
  • 10BASE-T1S核心机制——PLCA参数详解
  • 关于AI编程的分析报告
  • 【通识】算法案例
  • 【电赛学习笔记】MaxiCAM 项目实践——与单片机的串口通信
  • 日语学习-日语知识点小记-构建基础-JLPT-N3阶段(10):ような复习
  • [科普] 快速傅里叶变换(FFT)和离散傅里叶变换(DFT)的差异
  • WordPress WPBookit插件任意文件上传漏洞(CVE-2025-6058)
  • 魔百和M401H_国科GK6323V100C_安卓9_不分地区免拆卡刷固件包
  • 一键搭建博客脚本LNMP(非编译)Wordpress
  • 【论文解读】MambaVision: A Hybrid Mamba-Transformer Vision Backbone
  • 深度学习入门(1)
  • 深度学习篇---剪裁缩放