当前位置: 首页 > news >正文

flink使用demo

1、添加不同数据源

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.WordCountData;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Demonstrates how to build a StreamExecutionEnvironment and read from
 * several interchangeable sources (elements, collection, file, Kafka,
 * data generator), then run a simple word-count pipeline.
 */
public class EnvDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Pin the local Flink web UI to port 8082.
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Select streaming (vs. batch) execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // Read data from one of several possible sources.
//        DataStream<String> text = env.fromElements(WordCountData.WORDS);
//        DataStream<String> text = env.fromCollection(Arrays.asList("hello world", "hello flink"));
//        DataStream<String> text = env.fromSource(crateFileSource(), WatermarkStrategy.noWatermarks(), "file source");
//        DataStream<String> text = env.fromSource(crateKafkaSource(), WatermarkStrategy.noWatermarks(), "kafka source");
        DataStream<String> text = env.fromSource(createDataGeneratorSource(), WatermarkStrategy.noWatermarks(), "datagen source");

        // Word-count pipeline: tokenize, group by word, sum per-word counts.
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new WordCount.Tokenizer())
                        // FIX: tuple-index keyBy(0) is deprecated in recent Flink;
                        // use an explicit key selector on the tuple's first field.
                        .keyBy(value -> value.f0)
                        .sum(1);
        counts.print();
        env.execute("WordCount");
    }

    /**
     * Builds a file source that reads {@code input/word.txt} line by line.
     * FIX: return the parameterized {@code FileSource<String>} instead of the raw type.
     */
    public static FileSource<String> crateFileSource() {
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),
                new Path("input/word.txt")).build();
        return fileSource;
    }

    /**
     * Builds a Kafka source; bootstrap servers / topic / group id are placeholders
     * to be replaced with real connection settings.
     */
    public static KafkaSource<String> crateKafkaSource() {
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("ip-port")
                .setTopics("topic")
                .setGroupId("groupId")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        return kafkaSource;
    }

    /**
     * Builds a generator source emitting "hello0" .. "hello99",
     * rate-limited to 5 records per second.
     */
    public static DataGeneratorSource<String> createDataGeneratorSource() {
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                (GeneratorFunction<Long, String>) value -> "hello" + value,
                100, // 100 records in total
                RateLimiterStrategy.perSecond(5), // 5 records per second
                Types.STRING
        );
        return dataGeneratorSource;
    }
}

2、数据处理

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * Demonstrates the basic Flink DataStream transformation operators
 * (map, filter, flatMap, keyBy, reduce) on a small in-memory stream
 * of {@code WaterSensor} records.
 */
public class DataProcessDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Pin the local Flink web UI to port 8082.
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Select streaming (vs. batch) execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // Bounded demo input: four in-memory WaterSensor records.
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        // map operator: one-to-one transformation applied to every element.
        SingleOutputStreamOperator<String> map = sensorDs.map(new MapFunction<WaterSensor, String>() {
            @Override
            public String map(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId() + " : " + waterSensor.getVc();
            }
        });
//        map.print();


        // filter operator: keeps only elements for which the predicate is true.
        SingleOutputStreamOperator<WaterSensor> filter = sensorDs.filter(new FilterFunction<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor waterSensor) throws Exception {
                return waterSensor.getVc() > 1; // keep only elements with vc > 1
            }
        });
//        filter.print();

        // flatMap operator: one input element may produce zero or more outputs,
        // all emitted through the Collector, then flattened into the stream.
        SingleOutputStreamOperator<String> flatMap = sensorDs.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {
                // Everything passed to the collector becomes an output element.
                if ("s1".equals(waterSensor.getId())) {
                    collector.collect(waterSensor.getId());
                } else {
                    collector.collect(waterSensor.getId());
                    collector.collect(waterSensor.getVc().toString());
                }
            }
        });
//        flatMap.print();

        // keyBy: routes records with the same key to the same partition so that
        // large-scale per-key aggregation is efficient; it only repartitions,
        // it does not transform the data.
        KeyedStream<WaterSensor, String> keyBy = sensorDs.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId(); // group by sensor id
            }
        });
//        keyBy.print();
        // After keyBy, aggregation operators such as sum/max/min become available.
//        keyBy.sum("vc").print(); // pass the POJO field name
//        keyBy.maxBy("vc").print(); // pass the POJO field name


        // reduce operator: pairwise aggregation; only available after keyBy.
        keyBy.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t2, WaterSensor t1) throws Exception {
                System.out.println("t1=" + t1);
                System.out.println("t2=" + t2);
                return new WaterSensor(t1.getId(), t1.getTs() + t2.getTs(), t1.getVc() + t2.getVc());
            }
        }).print();
        env.execute("WordCount");
    }
}

3、分流/合流

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class FenliuDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // 设置流处理还是批处理
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // 读取数据
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        SingleOutputStreamOperator<WaterSensor> oushu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 0);
        SingleOutputStreamOperator<WaterSensor> jishu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 1);
        oushu.print("偶数流");
        jishu.print("奇数流");
        // 偶数流和奇数流合并
        oushu.union(jishu).print("合并流");

        env.execute("WordCount");
    }
}

4、输出流 sink

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Writes a stream of WaterSensor records to local files using the
 * row-format FileSink.
 */
public class SinkDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Select streaming (vs. batch) execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // Bounded demo input: four in-memory WaterSensor records.
        DataStream<WaterSensor> sensors = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        // Row-format sink: each record is written via toString, UTF-8 encoded.
        Path outputDir = new Path("/Users/chengyong03/Downloads/output/flink");
        SimpleStringEncoder<WaterSensor> encoder = new SimpleStringEncoder<>("UTF-8");
        FileSink<WaterSensor> fileSink = FileSink.forRowFormat(outputDir, encoder).build();
        sensors.sinkTo(fileSink);
        env.execute("WordCount");
    }
}

5、flink流表互转,flink sql

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demonstrates DataStream &lt;-&gt; Table round-tripping: convert a stream to a
 * table, query it with Flink SQL, and convert the result back to a stream.
 */
public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Create a table environment bridged to the DataStream API.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<WaterSensor> sensorStream = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );
        sensorStream.print();
        // 2. Convert the stream into a Table.
        Table sensorTable = tableEnv.fromDataStream(sensorStream);
        // 3. Register it as a temporary view so SQL can address it by name.
        tableEnv.createTemporaryView("sensorTable", sensorTable);

        Table resultTable = tableEnv.sqlQuery("select * from sensorTable where vc > 10");

        // 4. Convert the result Table back into a typed DataStream.
        DataStream<WaterSensor> resultStream = tableEnv.toDataStream(resultTable, WaterSensor.class);
        resultStream.print();
        env.execute();
    }
}

相关文章:

  • 前端面试之Flex布局:核心机制与高频考点全解析
  • MySQL MHA 部署全攻略:从零搭建高可用数据库架构
  • 第十章 Kubernetes Ingress
  • Windows使用docker部署fastgpt出现的一些问题
  • PyTorch torch.logsumexp 详解:数学原理、应用场景与性能优化(中英双语)
  • DPVS-2:单臂负载均衡测试
  • Springboot应用开发工具类整理
  • Unity Excel导表工具转Lua文件
  • ios UICollectionView使用自定义UICollectionViewCell
  • 输入框元素覆盖冲突
  • 2024年数学SCI1区TOP:改进海洋捕食者算法MMPA用于UAV路径规划,深度解析+性能实测
  • Python解析 Flink Job 依赖的checkpoint 路径
  • 【Erdas实验教程】010:监督分类及后处理、精度评价
  • 基于SpringBoot的“高校网上缴费综合务系统”的设计与实现(源码+数据库+文档+PPT)
  • C++/JavaScript ⭐算法OJ⭐链表的反转
  • 刺客信条 枭雄 画质设置以及【锁帧60帧】的办法
  • 【笔记ing】C语言补充、组成原理数据表示与汇编实战、操作系统文件实战(高级阶段)
  • SpringBoot【十一】mybatis-plus实现多数据源配置,开箱即用!
  • 数据结构:栈和队列详解(下)
  • 【python】pip命令合集
  • 前端做网站要会什么/什么样的人适合做策划
  • 门户网站建设课程设计/关键词优化排名软件流量词
  • java网站开发环境部署/站长工具查询网
  • 临朐门户网站/下载百度2023最新版安装
  • 蛋糕网站模板/随州网络推广
  • 浙江省住房建设局网站首页/seo提升排名