当前位置: 首页 > wzjs >正文

铜川矿业公司网站网络营销知名企业

铜川矿业公司网站,网络营销知名企业,咋做网站,做英文网站的心得文章目录 方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)方法二:使用 StarRocks Flink Connector(推荐)2.1 通过 Flink SQL 直接注册 StarRocks 表:2.2 使用 Flink DataStream 读取数据 在 Fl…

文章目录

      • 方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)
      • 方法二:使用 StarRocks Flink Connector(推荐)
        • 2.1 通过 Flink SQL 直接注册 StarRocks 表:
        • 2.2 使用 Flink DataStream 读取数据

  在 Flink 1.17.2 中使用 Java 读取 StarRocks 数据,可以通过 JDBC 连接器 或 StarRocks 官方提供的 Flink Connector 实现。以下是两种方法的详细步骤:

方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)

  StarRocks 兼容 MySQL 协议,可通过 Flink 的 JDBC 连接器读取数据。在 pom.xml 中添加以下依赖:

<!-- Flink JDBC 连接器 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version>
</dependency>
<!-- MySQL 驱动(兼容 StarRocks) -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version>
</dependency>

  编写 Java 代码:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;public class ReadStarRocksJDBC {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();TypeInformation[] fieldTypes = {Types.STRING,Types.STRING,Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat().setDrivername("com.mysql.cj.jdbc.Driver").setDBUrl("jdbc:mysql://<starrocks-fe-host>:<port>/<database>").setUsername("<username>").setPassword("<password>").setQuery("SELECT teacher_id, student_id, student_num FROM dwd_student = limit 10").setRowTypeInfo(rowTypeInfo).finish();DataSet<Row> dataSet = env.createInput(inputFormat);dataSet.print();}
}

  输出:

+I[teacher03, abc01, 2]
+I[teacher01, abc01, 3]

方法二:使用 StarRocks Flink Connector(推荐)

参考:使用 Flink Connector 读取数据

  StarRocks 提供官方 Connector,支持高效读写。在 pom.xml 中添加以下依赖:

<dependency><groupId>com.starrocks.connector</groupId><artifactId>flink-connector-starrocks</artifactId><version>1.2.9_flink-1.17</version>
</dependency>
2.1 通过 Flink SQL 直接注册 StarRocks 表:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class ReadStarRocksSQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);String createTableSQL = "CREATE TABLE starrocks_table (\n" +"  id INT,\n" +"  name STRING,\n" +"  score INT\n" +") WITH (\n" +"  'connector' = 'starrocks',\n" +"  'jdbc-url' = 'jdbc:mysql://192.168.101.xx:9030',\n" +"  'scan-url' = '192.168.101.xx:8030',\n" +"  'database-name' = 'mydatabase',\n" +"  'table-name' = 'table1',\n" +"  'username' = 'root',\n" +"  'password' = ''\n" +")";tableEnv.executeSql(createTableSQL);tableEnv.executeSql("SELECT * FROM starrocks_table").print();}
}

  输出:

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |       score |
+----+-------------+--------------------------------+-------------+
| +I |           4 |                          Julia |          25 |
| +I |           2 |                           Rose |          23 |
| +I |           3 |                          Alice |          24 |
| +I |           1 |                           Lily |          23 |
+----+-------------+--------------------------------+-------------+
4 rows in set

  备注:StarRocks 表 table1 结构

CREATE TABLE `table1` (`id` int(11) NOT NULL COMMENT "用户 ID",`name` varchar(65533) NULL COMMENT "用户姓名",`score` int(11) NOT NULL COMMENT "用户得分"
) ENGINE=OLAP 
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);INSERT INTO mydatabase.table1(id, name, score) VALUES(3, 'Alice', 24);
INSERT INTO mydatabase.table1(id, name, score) VALUES(2, 'Rose', 23);
INSERT INTO mydatabase.table1(id, name, score) VALUES(4, 'Julia', 25);
INSERT INTO mydatabase.table1(id, name, score) VALUES(1, 'Lily', 23);

注:但这遇到了一个问题就是这个映射关系必须和原表一一对应,那如果原表有一百多个字段的话这里还得映射这么多字段感觉很不方便,在网上查了能不能只映射部分字段,但并没有成功。

2.2 使用 Flink DataStream 读取数据

  在 pom.xml 文件中添加依赖,如下所示:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.2</version></dependency><dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>1.2.8_flink-1.17</version></dependency>

  调用 Flink Connector,读取 StarRocks 中的数据,如下所示:

import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;public class StarRocksSourceApp {public static void main(String[] args) throws Exception {StarRocksSourceOptions options = StarRocksSourceOptions.builder().withProperty("scan-url", "192.168.xxx.xxx:8030").withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030").withProperty("username", "root").withProperty("password", "").withProperty("table-name", "table1").withProperty("database-name", "mydatabase").build();TableSchema tableSchema = TableSchema.builder().field("id", DataTypes.INT()).field("name", DataTypes.STRING()).field("score", DataTypes.INT()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();env.execute("StarRocks flink source");}}

  输出:

11> +I(4,Julia,25)
8> +I(1,Lily,23)
16> +I(2,Rose,23)
9> +I(3,Alice,24)

在这里插入图片描述

        StarRocksSourceOptions options = StarRocksSourceOptions.builder().withProperty("scan-url", "192.168.37.11:8030").withProperty("jdbc-url", "jdbc:mysql://192.168.37.11:9030").withProperty("username", "root").withProperty("password", "").withProperty("table-name", "table1").withProperty("database-name", "mydatabase").withProperty("scan.columns", "id, name").withProperty("scan.filter", "id = 4 or id = 2").build();TableSchema tableSchema = TableSchema.builder().field("id", DataTypes.INT()).field("name", DataTypes.STRING()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();env.execute("StarRocks flink source");

  输出:

14> +I(2,Rose)
7> +I(4,Julia)
http://www.dtcms.com/wzjs/161419.html

相关文章:

  • 有专门做ppt的网站有哪些网络排名优化软件
  • 郑州做网站公司谷歌商店下载安装
  • 网站建设问题大全宁波seo在线优化哪家好
  • 有哪些免费做外贸网站国际新闻
  • saas建站平台有哪些seo推广营销靠谱
  • 宣传策划方案重庆seo主管
  • 无锡公司做网站深圳做网站的
  • 政府内部网站建设方案网页自助建站
  • 乐山北京网站建设网络营销自学网站
  • 农业公园网站建设关键词热度分析
  • 如何做赌博网站代理东莞网站营销策划
  • 学生模拟网站开发项目专业关键词排名优化软件
  • 腾讯cos wordpressseo顾问推推蛙
  • 国内好的网站建设8大营销工具
  • 嘉兴门户网站建设3d建模培训学校哪家好
  • 网站让百度收录应该怎么做台州专业关键词优化
  • wordpress这么设置导航免费seo在线优化
  • 比较好的前端网站推广运营怎么做
  • 潘家园做网站公司网络营销的盈利模式
  • 北京 企业网站开发郑州网站推广
  • 网站开发技术试验教程最厉害的搜索引擎
  • 济南做网站找哪家好优化大师百科
  • 网站如何做se百度产品
  • 网站建设的公司有发展吗厦门网络关键词排名
  • 建筑工程网上报建网站企业线上培训平台有哪些
  • 长春网站建设外包网站seo如何优化
  • 做销售在哪个网站找客户端谷歌paypal官网注册入口
  • 做flash的网站seo整站优化外包
  • 做外贸的有哪些网站搜索引擎优化的特点
  • 免费网站空间怎么做百度智能小程序怎么优化排名