《大数据技术原理与应用》实验报告六 Flink编程实践
目 录
一、实验目的
二、实验环境
三、实验内容与完成情况
3.1 使用IntelliJIDEA 工具开发WordCount 程序。
3.2 数据流词频统计。
四、问题和解决方法
五、心得体会
一、实验目的
1. 通过实验掌握基本的 Flink 编程方法。
2. 掌握用 IntelliJ IDEA 工具编写 Flink 程序的方法。
二、实验环境
1. 硬件要求:笔记本电脑一台
2. 软件要求:VMWare虚拟机、Ubuntu 18.04 64、JDK1.8、Hadoop-3.1.3、Hive-3.1.2、Windows11操作系统、Eclipse、Flink1.9.1、IntelliJ IDEA
三、实验内容与完成情况
3.1 使用IntelliJIDEA 工具开发WordCount 程序。
在 Linux 系统中安装 IntelliJ IDEA,然后使用 IntelliJ IDEA 工具开发 WordCount 程序,并打 包成JAR 文件,提交到 Flink 中运行。
(1)进行IntelliJ IDEA 工具在虚拟机的安装。
①启动IDEA并在IDEA中点击File->New->Project进行项目的创建:
②在GroupId中填写dblab、ArtifactId中填写FlinkWordCount后点击next进入下一步:
dblabFlinkWordCount
③对Project name进行设置,将其设置为FlinkWordCount后点击finish完成项目的创建:
FlinkWordCount
④进行项目文件信息的配置,点击pom.xml进行配置信息的填写:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>dblab</groupId><artifactId>FlinkWordCount</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- Flink Java API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.9.1</version></dependency><!-- Flink Streaming Java API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.9.1</version></dependency><!-- Flink Clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.9.1</version></dependency></dependencies>
</project>
(2)进行WordCount 程序的开发。
①右击java文件夹后点击New进行java数据包的创建,输入以下包名即可:
cn.edu.xmu
②右击对应的包名后点击New进行java Class的创建,输入以下类名即可:
WordCount
③创建WordCountData.java类,并在类中进行相关代码的编写和实现:
package cn.edu.xmu;import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;public class WordCountData {public static final String[] WORDS = new String[] {"To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despised love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscovered country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."};public WordCountData() {}public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env) {return env.fromElements(WORDS);}
}
④创建WordCountTokenizer.java类,并在类中进行相关代码的编写和实现:
package cn.edu.xmu;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {public WordCountTokenizer() {}@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 将输入的字符串转为小写并按非字母数字字符分割String[] tokens = value.toLowerCase().split("\\W+");// 遍历所有的单词for (String token : tokens) {// 如果单词不为空,则输出 (word, 1)if (token.length() > 0) {out.collect(new Tuple2<>(token, 1));}}}
}
⑤创建WordCount.java类,并在类中进行相关代码的编写和实现:
package cn.edu.xmu;import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.utils.ParameterTool;public class WordCount {public WordCount() {}public static void main(String[] args) throws Exception {// 解析命令行参数ParameterTool params = ParameterTool.fromArgs(args);ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(params);Object text;// 如果没有指定输入路径,则默认使用WordCountData中的数据if (params.has("input")) {text = env.readTextFile(params.get("input"));} else {System.out.println("Executing WordCount example with default input data set.");System.out.println("Use --input to specify file input.");text = WordCountData.getDefaultTextLineDataset(env);}// 执行词频统计AggregateOperator counts = ((DataSet<String>) text) // 强制转换.flatMap(new WordCountTokenizer()) // 进行分词处理.groupBy(0) // 按单词分组.sum(1); // 对每个单词进行计数// 如果指定了输出路径,则写入文件,否则输出到控制台if (params.has("output")) {counts.writeAsCsv(params.get("output"), "\n", " ");env.execute();} else {System.out.println("Printing result to stdout. Use --output to specify output path.");counts.print();}}
}
⑥打开WordCount.java代码文件,在这个代码文件的代码区域鼠标右键单击,弹出菜单中选中“Run WordCount.main()”进行代码运行:
⑦运行代码进行词频统计并将相关的统计结果输出到控制台:
(3)将项目打包成JAR文件后提交到Flink中进行运行。
①将编写的代码编译打包为JAR包,编译打包成功以后可以看到生成的FlinkWordCount.jar文件:
②对Flink的压缩包进行上传并解压安装:
sudo tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -c /usr/local
③配置Flink的相关信息并进行启动运行:
sudo mv ./flink-1.9.1 ./flinksudo chown -R hadoop:hadoop ./flinkvim ~/.bashrcsource ~/.bashrcstart-cluster.sh
④将编译打包的JAR包上传至Flink:
/usr/local/flink/bin/flink run --class cn.edu.xmu.wordcount~/flinkapp/target/simple-project-1.0.jar
⑤使用flink启动编译打包后的FlinkWordCount.jar包,其最后的运行结果如下图所示:
3.2 数据流词频统计。
使用 Linux 系统自带的 NC 程序模拟生成数据流,不断产生单词并发送出去。编写 Flink 程序对 NC程序发来的单词进行实时处理,计算词频,并把词频统计结果输出。要求首先在 IntelliJ IDEA 中开发和调试程序,然后再打成 JAR 包部署到 Flink 中运行。
(1)在IntelliJ IDEA中新建一个项目,名称为“FlinkWordCount2”。新建一个pom.xm文件,内容和前面的FlinkWordCount项目中的pom.xml一样。
package cn.edu.xmu;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {// 定义socket的端口号int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");} catch (Exception e) {System.err.println("没有指定port参数,默认值为9000");port = 9000;}// 获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 连接socket获取输入的数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");// 计算数据DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {@Overridepublic void flatMap(String value, Collector<WordWithCount> out) {// 分割每行数据String[] splits = value.split("\\s");for (String word : splits) {out.collect(new WordWithCount(word, 1L)); // 为每个单词生成(单词, 1)元组}}}).keyBy("word") // 针对相同的word数据进行分组.timeWindow(Time.seconds(2), Time.seconds(1)) // 窗口大小2秒,滑动窗口大小1秒.sum("count"); // 对每个单词的count进行求和// 打印结果到控制台windowCount.print().setParallelism(1); // 使用一个并行度// 调用execute方法来启动流处理作业env.execute("Streaming Word Count");}/*** 用于存储单词及其计数的类*/public static class WordWithCount {public String word;public long count;public WordWithCount() {}public WordWithCount(String word, long count) {this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}
}
(2)启动Flink系统后打开一个Linux终端,使用如下命令启动NC程序。
nc -lk 9999
(3)新建一个Linux终端,使用如下命令启动FlinkWordCount2词频统计程序。
cd /usr/local/flink./bin/flink run --class cn.edu.xmu.WordCount/home/hadoop/IdeaProjects/FlinkWordCount2/out/artifacts/FlinkWordCount2_jar/FlinkWordCount2.jar --port 9999
(4)在Linux系统中打开一个浏览器,在里面输入“http://localhost:8081”进入Flink的WEB管理页面,然后点击左侧的“Task Managers”会弹出新页面,在flink控制台查看词频统计输出结果情况。
四、问题和解决方法
1. 实验问题:在IntelliJ IDEA中无法正确导入Flink库。
解决方法:确保项目的构建工具配置正确,且pom.xml或build.gradle文件中已添加了正确的Flink依赖。重新同步项目,如果问题仍然存在,尝试清除缓存并重启IntelliJ IDEA。
2. 实验问题:程序打包成JAR后无法运行。
解决方法:确保JAR文件包含了所有的依赖,并且主类是正确设置的,如果使用了Maven或Gradle,确认是否已经包含了所有必要的插件和配置。
3. 实验问题:Flink集群环境搭建失败。
解决方法:检查集群的硬件配置是否满足Flink的最低要求,并确保所有节点之间的网络连接是畅通的,此外检查Flink的配置文件是否正确设置。
4. 实验问题:数据流无法正确接收或处理。
解决方法:首先检查数据源是否正常工作,并确保数据流的格式与程序期望的格式相匹配,然后逐步调试程序的各个部分,找出问题所在。
5. 实验问题:词频统计结果不准确。
解决方法:检查程序中的状态管理逻辑是否正确,确保每个单词的计数是准确更新的。此外如果是在分布式环境下运行,还需要检查数据的一致性和容错处理。
6. 实验问题:IntelliJ IDEA中程序调试困难。
解决方法:熟悉IntelliJ IDEA的调试工具和功能,如断点、变量查看、堆栈跟踪等,对于复杂的逻辑或数据流可以使用日志或打印语句来辅助调试。
7. 实验问题:Flink程序运行时资源消耗过大。
解决方法:根据实际情况调整Flink的配置参数,如任务并行度、状态后端存储等。此外优化程序的逻辑和数据结构也可以减少资源消耗。
8. 实验问题:数据流处理速度缓慢。
解决方法:检查程序的瓶颈所在,可能是数据处理逻辑、状态更新或是网络通信。针对瓶颈进行优化,如使用更高效的数据结构或算法、增加并行度等。
9. 实验问题:Flink集群出现故障或节点失效。
解决方法:定期检查集群的健康状态,包括硬件、网络和软件配置。对于重要的任务,可以使用Flink的容错机制,如checkpoint和savepoint,来恢复状态。
10. 实验问题:IntelliJ IDEA界面卡顿或不响应。
解决方法:检查集群环境和本地环境的差异,包括硬件配置、网络设置和软件版本等,确保程序在集群上的部署和配置是正确的。
11. 实验问题:无法有效地监控和管理Flink任务。
解决方法:使用Flink提供的Web UI或其他监控工具来实时查看任务的状态和性能指标,根据需要设置告警和通知机制,以便及时发现和处理问题。
12. 实验问题:数据流中存在脏数据或异常值。
解决方法:在数据处理逻辑中添加清洗和过滤步骤,以去除脏数据或异常值。此外可以使用异常检测算法来自动识别和处理异常值。
13. 实验问题:程序在处理大量数据时出现内存溢出。
解决方法:优化程序的内存管理逻辑,如减少对象创建、使用更高效的数据结构等。此外可以考虑增加硬件资源或使用分布式处理来分担负载。
14. 实验问题:Flink集群扩展困难。
解决方法:在设计程序时考虑可扩展性,如使用参数化的配置、模块化的代码结构等。对于集群本身,可以使用容器化技术(如Kubernetes)来简化扩展过程。
15. 实验问题:数据流中的时间戳不准确或缺失。
解决方法:在数据源端确保时间戳的准确性和一致性,在程序中使用时间窗口来处理可能的时间戳偏差。
16. 实验问题:多个任务之间存在依赖关系,难以协调和管理。
解决方法:使用Flink的作业链功能来管理和协调多个任务之间的依赖关系,确保任务之间的数据传递和状态更新是正确的。
17. 实验问题:在处理敏感数据时需要考虑安全性和隐私保护。
解决方法:使用加密和匿名化技术来保护敏感数据的安全性和隐私性,在程序中实施访问控制和审计机制来防止未经授权的访问和操作。
五、心得体会
1、在开始编程前确保充分理解实验的需求和目标,对于WordCount实验要明确需要统计的单词范围以及输入数据的格式。对于数据流词频统计,要明确数据源、数据流的特点以及统计结果的具体要求。利用IntelliJ IDEA的强大功能,如代码提示、自动补全、调试工具等,可以大大提高编程效率。对于Flink,要熟悉其提供的API和相关库,如DataSet API、DataStream API、Stateful Functions等,以便更好地处理数据流和状态管理。
2、在编写代码时要考虑到各种异常情况,如输入数据格式错误、网络中断等。对于异常情况要提前进行预防和捕获处理,避免程序崩溃或数据丢失。注意优化资源利用,避免不必要的计算和内存消耗,可以使用懒加载、缓存等技术来提高程序的性能。同时要根据实验需求合理设置Flink的配置参数,如任务并行度、状态后端存储等。
3、参加编程实践是提高编程能力和解决问题能力的好方法,在实践中可以学习到新的技术和工具,可以接触到实际的项目需求和问题,可以锻炼自己的编程能力和解决问题的能力。同时也要不断学习和实践新的编程范式和设计模式,以适应不断变化的技术环境。
4、编写完代码后要进行充分的测试和调试,确保程序的正确性和稳定性,可以使用单元测试、集成测试等技术来验证程序的正确性,使用调试工具来跟踪程序的运行过程。在设计程序时要考虑到未来的扩展需求,可以设计成支持多种数据源的输入方式,可以添加新的统计指标或过滤器等。同时要考虑到集群环境的扩展性,如使用分布式文件系统来存储状态数据。
5、在进行词频统计时,要确保数据的一致性,对于分布式环境下运行的任务要使用一致性哈希或其他技术来保证数据分发的均衡和一致性。同时要利用Flink的容错机制,如checkpoint和savepoint来确保任务的状态安全。
6、一个好的程序结构可以提高可读性、可维护性和可扩展性,在编写代码之前,先画出程序的流程图,并尽量采用模块化、函数式编程方法,将代码分解为小块,每个小块完成特定的功能。对于WordCount实验和数据流词频统计这样的计算密集型任务,要选择合适的算法和数据结构来提高程序的性能,例如可以使用哈希表来存储单词和对应的计数器,使用红黑树等平衡树来维护数据结构等。