当前位置: 首页 > news >正文

DataStream实现WordCount

目录

  • 读取文本数据
  • 读取端口数据

事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。

读取文本数据

需要处理数据如下:

hello flink
hello java
hello world

在这里插入图片描述

package com.tsg.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("input/word.txt");SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = dataStreamSource.flatMap((String value, Collector<Tuple2<String, Long>> out) -> {String[] split = value.split(" ");for (String s : split) {out.collect(Tuple2.of(s, 1L));}}).returns(Types.TUPLE(Types.STRING,Types.LONG));
//        KeyedStream<Tuple2<String, Long>, Tuple> tuple2TupleKeyedStream = wordAndOne.keyBy(0);KeyedStream<Tuple2<String, Long>, String> tuple2TupleKeyedStream = wordAndOne.keyBy(data->data.f0);SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2TupleKeyedStream.sum(1);sum.print();env.execute();}
}

在这里插入图片描述

读取端口数据

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

package com.tsg.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamWordCount {public static void main(String[] args) throws Exception {// 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从参数中提取主机名和端口号ParameterTool parameterTool = ParameterTool.fromArgs(args);String hostname = parameterTool.get("host");int port = parameterTool.getInt("port");DataStreamSource<String> lineStream = env.socketTextStream(hostname,port);
//        DataStreamSource<String> lineStream = env.socketTextStream("master", 7777);SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = lineStream.flatMap((String str, Collector<Tuple2<String, Long>> out) -> {// 注意这里的Collector是org.apache.flink.util.Collector;String[] split = str.split(" ");for (String s : split) {out.collect(Tuple2.of(s, 1L));}}).returns(Types.TUPLE(Types.STRING,Types.LONG));KeyedStream<Tuple2<String, Long>, Tuple> tuple2TupleKeyedStream = tuple2SingleOutputStreamOperator.keyBy(0);SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2TupleKeyedStream.sum(1);sum.print();env.execute();}
}
http://www.dtcms.com/a/344919.html

相关文章:

  • 世界模型一种能够对现实世界环境进行仿真,并基于文本、图像、视频和运动等输入数据来生成视频、预测未来状态的生成式 AI 模型
  • LeetCode第1695题 - 删除子数组的最大得分
  • 数字经济浪潮下的刑事法律风险与辩护新路径
  • k8s 简介及部署方法以及各方面应用
  • STM32F1 GPIO介绍及应用
  • Vue2.x核心技术与实战(三)
  • 掌握DRF的serializer_class:高效API开发
  • [激光原理与应用-318]:光学设计 - Solidworks - 草图中常见的操作
  • PCIe 5.0 SSD的发热量到底有多大?如何避免?
  • ubuntu - 终端工具 KConsole安装
  • DL00433-基于深度学习的无人机红外成像系统可视化含数据集
  • 【数据结构】选择排序:直接选择与堆排序详解
  • 【小白笔记】 MNN 移动端大模型部署
  • Java试题-选择题(14)
  • 新能源知识库(83)新能源行业的标准制定机构介绍
  • 期权买沽是什么意思?
  • python3GUI--Joy音乐播放器 在线播放器 播放器 By:PyQt5(附下载地址)
  • DAY01:【DL 第一弹】深度学习的概述
  • 什么是哈希值(hash value)???
  • FFmpeg03:多媒体文件处理基础
  • ffmpeg 中 crc32 源码分析及调试
  • vagrant怎么在宿主机操作虚拟机里面的系统管理和软件安装
  • xilinx的oddr原语是否可以直接使用verilog实现?
  • ingress和service区别
  • 20250822解决荣品RD-RK3588-MID核心板出现插USB开机-长按RESET开机的问题
  • 基于LangChain + Milvus 实现RAG
  • 升级 Docker Compose 到最新版本:从安装到验证全指南
  • SOLIDWORKS 2025智能工具优化设计流程
  • 数据结构: 2-3 树的删除操作 (Deletion)
  • Maven的概念与Maven项目的创建