当前位置: 首页 > news >正文

flink1.19.2+cdc-3.2.1遇到的问题及解决方案

No operators defined in streaming topology. Cannot generate StreamGraph

原因是博主想通过在flinkjar中写flinksql,以jar的方式提交application模式的作业(因为无法直接使用yarn-application提交flinksql)。
代码中有create和insert intosql,但是运行时报上面错误,各方面查找资料,其原因是flink没有识别insert操作,解决方案就是显示提交insert
错误代码

public static void execute(String content, TableEnvironment tEnv) {List<String> insertSql = new ArrayList<>();// 分割SQL语句Arrays.stream(content.split(";")).map(String::trim).filter(s -> !s.isEmpty()).forEach(sql -> {sql = sql.replaceAll(SEMICOLON, ";").replaceAll(SY, "\"").replaceAll(SDY, "`");if (!sql.endsWith(";")) {sql = sql + ";"; // 加上分号}System.out.println("执行SQL>>>" + sql);tEnv.executeSql(sql);});
}

修改后

public static void execute(String content, TableEnvironment tEnv) {List<String> insertSql = new ArrayList<>();// 分割SQL语句Arrays.stream(content.split(";")).map(String::trim).filter(s -> !s.isEmpty()).forEach(sql -> {if (!sql.endsWith(";")) {sql = sql + ";"; // 加上分号}if (sql.toUpperCase().startsWith("INSERT")) {insertSql.add(sql);} else {System.out.println("执行SQL>>>" + sql);tEnv.executeSql(sql);}});// 统一执行insert// 显式触发, 否则会报错The main method caused an error: No operators defined in streaming topology. Cannot execute.if (insertSql.size() > 0) {StatementSet stmtSet = tEnv.createStatementSet();for (String insert : insertSql) {// 添加多个 INSERT 语句System.out.println("执行SQL>>>" + insert);stmtSet.addInsertSql(insert);}try {// 执行并设置作业名称TableResult result = stmtSet.execute();JobClient jobClient = result.getJobClient().orElseThrow(() -> new RuntimeException("无法获取JobClient"));System.out.println("等待作业启动...");// 等待作业进入运行状态JobStatus status = jobClient.getJobStatus().get(STATUS_TIMEOUT, TimeUnit.SECONDS);if (status == JobStatus.RUNNING) {System.out.println("作业已正常运行");// 添加2秒延迟确保状态同步Thread.sleep(2000);} else {throw new IllegalStateException("作业启动失败,状态: " + status);}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}}

ERROR org.apache.flink.shaded.curator5.org.apache.curator.ConnectionState - Authentication failed

在flink的conf.yml添加

zookeeper:sasl:disable: true

hadoop日志里中文显示成了问号

在flink的conf.yml添加

env:java:opts:all: -Dfile.encoding=UTF-8

Invalid event: APP_UPDATE_SAVED at ACCEPTED

flink/lib下添加flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;

flink/lib下添加commons-cli-1.5.0.jar

yarn启动容器使用随机端口的问题

需要节点间端口互通,否则经常会因为端口不通而导致容器启动不起来
#假设要允许192.168.0.66和192.168.0.67访问本机所有端口

sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.0.66" accept'
sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.0.67" accept'
sudo firewall-cmd --reload

相关文章:

  • 【AI大模型入门指南】概念与专有名词详解 (二)
  • 达梦数据库单机部署dmhs同步复制(dm8->kafka)
  • mac电脑.sh文件,用来清除git当前分支
  • 代码填空题技术实现:突破 highlight.js 安全限制的工程实践
  • 数值偏微分方程的代数骨架:线性代数及其挑战-AI云计算
  • Cilium动手实验室: 精通之旅---23.Advanced Gateway API Use Cases
  • 机器学习与深度学习18-线性代数01
  • 老飞飞bug及原理修复方法
  • android studio向左向右滑动页面
  • 【Zephyr 系列 19】打造 BLE 模块完整 SDK:AT 命令系统 + 状态机 + NVS + OTA 一体化构建
  • Kotlin基础语法五
  • 调试`build.sh` 和用 `CMake` 编译出来的 `.elf` / `.bin` / `.hex` 文件大小或行为不同?
  • 大模型在输尿管下段积水预测及临床应用的研究
  • uni-app学习笔记三十六--分段式选项卡组件的使用
  • 电机控制基础,小白入门篇
  • Windows笔记之Win11让非焦点窗口程序也能获得流畅性能的方法
  • 计算机网络第3章(上):数据链路层全解析——组帧、差错控制与信道效率
  • Monkey 测试的基本概念及常用命令(Android )
  • uniapp 云打包 iOS 应用上传到 app store 商店的过程
  • 数据库连接池——关键技术点介绍
  • 现在很多网站都是wordpress/如何做网络宣传推广
  • 绿化公司网站建设/找资源最好的是哪个软件
  • 网站建设内容工作总结/北京软件培训机构前十名
  • 网站建设数据库系统/郑州粒米seo顾问
  • 做网站 创业 流程/网络营销的50种方法
  • 专门用于网页制作的软件/seo优化上海牛巨微