
Loading Data into StarRocks with Flink

1. POM dependencies


  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

  <dependencies>
    <!-- Apache Flink dependencies. In production these can be given <scope>provided</scope> so they are not packaged into the job JAR. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- flink-connector-starrocks -->
    <dependency>
      <groupId>com.starrocks</groupId>
      <artifactId>flink-connector-starrocks</artifactId>
      <version>1.2.5_flink-1.13_2.12</version>
    </dependency>
  </dependencies>

2. Job code

// Imports assume Flink 1.13 and flink-connector-starrocks 1.2.x; package paths may differ in other connector versions.
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class LoadJsonRecords {
    public static void main(String[] args) throws Exception {
        // To run the example, you should prepare in the following steps
        // 1. create a primary key table in your StarRocks cluster. The DDL is
        //  CREATE DATABASE `test`;
        //    CREATE TABLE `test`.`score_board`
        //    (
        //        `id` int(11) NOT NULL COMMENT "",
        //        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
        //        `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
        //    )
        //    ENGINE=OLAP
        //    PRIMARY KEY(`id`)
        //    COMMENT "OLAP"
        //    DISTRIBUTED BY HASH(`id`)
        //    PROPERTIES(
        //        "replication_num" = "1"
        //    );
        //
        // 2. replace the connector options "jdbc-url" and "load-url" with your cluster configurations
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");
        String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generate JSON-format records. Each record has three fields corresponding to
        // the columns `id`, `name`, and `score` in the StarRocks table.
        String[] records = new String[]{
                "{\"id\":1111, \"name\":\"starrocks-json\", \"score\":100}",
                "{\"id\":2222, \"name\":\"flink-json\", \"score\":100}",
        };
        DataStream<String> source = env.fromElements(records);

        // Configure the connector with the required properties. You also need to add the properties
        // "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector that
        // the input records are in JSON format.
        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", jdbcUrl)
                .withProperty("load-url", loadUrl)
                .withProperty("database-name", "tmp")
                .withProperty("table-name", "score_board")
                .withProperty("username", "")
                .withProperty("password", "")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .withProperty("sink.parallelism","1")
                //.withProperty("sink.version","V1")
                .build();
        // Create the sink with the options
        SinkFunction<String> starRocksSink = StarRocksSink.sink(options);
        source.addSink(starRocksSink);

        env.execute("LoadJsonRecords");
    }
}
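
After the job finishes, you can check the result by querying the table through the FE over the MySQL protocol. The sketch below is illustrative only: it assumes a MySQL JDBC driver (for example mysql-connector-java) is on the classpath, and it reuses the placeholder address fe-ip:9030 from the sink options above together with an assumed root user and empty password.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyScoreBoard {
    public static void main(String[] args) throws Exception {
        // Connect to the StarRocks FE over the MySQL protocol (same address as "jdbc-url" above).
        // "root" with an empty password is an assumption; replace with your cluster's credentials.
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://fe-ip:9030/test", "root", "");
             Statement stmt = conn.createStatement();
             // Read back the two rows written by LoadJsonRecords.
             ResultSet rs = stmt.executeQuery("SELECT id, name, score FROM score_board ORDER BY id")) {
            while (rs.next()) {
                System.out.printf("%d\t%s\t%d%n",
                        rs.getInt("id"), rs.getString("name"), rs.getInt("score"));
            }
        }
    }
}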
