Flink原理与实战(java版)#第1章 Flink快速入门(第一节IDE词频统计)
电子书 Flink原理与实战(java版)专栏文章入口:电子书 Flink原理与实战(java版)- 目录结构
文章目录
- 第1章 Flink快速入门
- 1.1 IDE 词频统计
- 1.1.1 代码依赖与源代码
- 1.1.2 运行程序及验证
第1章 Flink快速入门
大数据技术框架经典入门案例是统计单词的词频,就像其他语言使用“Hello world”作为入门案例一样。Flink作为一个大数据的开发技术框架,同样我们介绍Flink的入门案例也是使用的统计单词的词频作为入门示例,让读者感受一下其中的差异。Flink支持程序开发和SQL 客户端,本节会以两种形式展示入门示例-wordcount。程序开发只需要在IDE中引入Flink的maven依赖即可,当然也可以使用其他的配置管理工具,比如Gradle;SQL 客户端则需要启动Flink的SQL 客户端服务。
Flink支持的语言有java和scala API,本书只介绍java的实现,JDK 使用的是8(如果本机没有则需要提前安装),以下不再赘述。如有特殊情况,将在具体情况下进行说明。
本示例是以Flink 1.17版本进行介绍的。IDE工具使用的是Visual Studio Code(IDE的运行环境是windows 11),shell客户端选择根据你的需要,本书作者选择的是Final Shell。
本书的读者需要有一定的java基础和其他大数据组件的基础,比如Hadoop、Spark、Hive、Kafka等,常规的创建项目等和本书介绍内容关系不是非常密切的过程将不再赘述。
1.1 IDE 词频统计
本示例是通过DataStream流式统计单词的词频,下面只列出实现的代码和代码的maven依赖。概括的来说就是通过流式程序统计有界数据的词频,关于有界和无界数据将在下面有关章节进行介绍。
本示例功能是模拟按行读取单词,其中单词是通过“,”进行分割的,分割后将每个单词计数为1,然后根据单词进行分组,最后根据单词分组进行求和。
1.1.1 代码依赖与源代码
Pom.xml文件中的properties和dependencies如下。
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.1</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency></dependencies>
示例代码如下。
package com.alanchan2win.chapter1;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/** @Author: alan.chan.chn@163.com*/
public class WordCountDemo {public static void main(String[] args) throws Exception {// 1.准备环境-envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.准备数据-sourceDataStream<String> linesDS = env.fromElements("flink,hadoop,hive", "flink,hadoop,hive", "flink,hadoop","flink");// 3.处理数据-transformationDataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {public void flatMap(String value, Collector<String> out) throws Exception {// 将切割处理的一个个的单词收集起来并返回String[] words = value.split(",");for (String word : words) {out.collect(word);}}});DataStream<Tuple2<String, Integer>> word_OnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {//将每个单词计数为1public Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});KeyedStream<Tuple2<String, Integer>, String> groupedDS = word_OnesDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {// 按照单词进行分组public String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});// 根据分组进行求和 DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);// 4.输出结果-sinkresult.print();// 5.触发执行-executeenv.execute();}
}
程序说明:
使用flink DataStream一般的编程模型或程序的组成部分由以下几个部分。
- 1.Environment,获取一个执行环境(execution environment),本示例使用默认的流模式运行,系统自动区分有界或无界数据,也可自行设置为批处理模式。
- 2.Source,加载/创建初始数据,也就是需要处理的数据源,本示例使用的是集合数组定义的数据源,是一个有界的数据。当然可以使用系统自带的source,也可以自己定义source。
- 3.Transformation,指定数据相关的转换。转换就是Flink的处理逻辑,本示例中的处理逻辑有切词、计数、分组和汇总,分别用到了flatMap、map、keyBy和sum几个算子。
- 4.Sink,指定计算结果的存储位置。Sink就是将计算或处理的结果输出到指定的位置,本示例使用的是直接控制台输出。可以使用Flink自带的sink,也可以自己实现sink。
- 5.Execute,触发程序执行。应用程序的启动入口,告诉应用程序完成了上述逻辑后,可以启动了。
1.1.2 运行程序及验证
验证比较简单,即在IDE中启动程序,查看控制台输出结果即可。下面是IDE的控制台输出结果。
# IDE运行的控制台输出如下:
13> (flink,1)15> (hadoop,1)
2> (hive,1)
15> (hadoop,2)
2> (hive,2)
13> (flink,2)
15> (hadoop,3)
13> (flink,3)
13> (flink,4)
以上,就完成了Flink的第一个应用程序-词频统计。
