当前位置: 首页 > news >正文

Flink 1.19 REST API

Flink 1.19 REST API 集成设计文档

1. 概述

本文档详细描述了如何使用 Spring Boot (JDK 17) 开发一套用于与 Apache Flink 1.19 REST API 进行交互的服务,以实现以下功能:

  • 获取指定任务的日志信息
  • 获取作业ID列表
  • 获取任务数量统计
  • 获取任务状态等详细信息

该系统将通过 RESTful 方式远程调用 Flink 的监控 API,并提供统一的接口供上层应用调用。

2. 技术栈

  • Java 版本: JDK 17
  • 框架: Spring Boot 2.7.x 或更高版本
  • HTTP 客户端: RestTemplate (Spring 提供的同步客户端)
  • JSON 处理: Jackson (Spring Boot 默认集成)
  • 构建工具: Maven 或 Gradle

3. Flink REST API 分析

根据 Flink 1.19 官方文档,Flink 提供了丰富的 REST API 来监控和管理集群及作业。主要特性包括:

  • 默认端口: 8081 (可通过 rest.port 配置项修改)
  • API 前缀: /v1 (可选,默认使用最老的兼容版本)
  • 核心功能:
    • 作业管理 (提交、取消、查询状态)
    • 集群信息查询
    • 日志和指标监控
    • JAR 包管理

3.1 关键 API 端点

功能HTTP 方法路径描述
获取所有作业GET/jobs返回当前所有作业的概览信息
获取作业详情GET/jobs/{jobid}返回特定作业的详细信息
获取作业异常GET/jobs/{jobid}/exceptions返回作业的异常信息
获取作业检查点GET/jobs/{jobid}/checkpoints返回作业的检查点信息
获取作业配置GET/jobs/{jobid}/config返回作业的配置信息
获取任务管理器信息GET/taskmanagers返回所有任务管理器的信息
获取任务管理器详情GET/taskmanagers/{taskmanagerid}返回特定任务管理器的详细信息

3.2 日志相关 API

Flink 的日志访问在标准 REST API 中并不直接暴露,通常需要通过以下方式获取:

  1. 作业日志: 可以通过作业异常接口 (/jobs/{jobid}/exceptions) 获取部分错误日志
  2. TaskManager 日志: 通过 /taskmanagers/{taskmanagerid}/log 获取
  3. JobManager 日志: 通过 /jobmanager/log 获取
  4. stdout/stderr: 分别通过 /jobmanager/stdout 和 TaskManager 对应路径获取

4. 系统架构设计

4.1 整体架构图

Flink集群
通信层
应用层
Flink REST API
JobManager
TaskManager
RestTemplate
客户端应用
Flink监控服务

4.2 核心组件

  1. FlinkRestClient: 封装对 Flink REST API 的调用
  2. JobService: 提供作业相关业务逻辑
  3. LogService: 提供日志相关业务逻辑
  4. FlinkConfig: 配置类,管理 Flink 集群地址等参数

5. 接口设计

5.1 获取作业ID列表接口

接口路径: GET /api/flink/jobs

请求参数: 无

响应格式:

{"jobs": [{"id": "123e4567-e89b-12d3-a456-426614174000","name": "MyFlinkJob","status": "RUNNING","start-time": 1618759200000,"end-time": -1,"duration": 3600000,"last-modification": 1618759200000,"tasks": {"total": 5,"created": 0,"scheduled": 0,"deploying": 0,"running": 5,"finished": 0,"canceling": 0,"canceled": 0,"failed": 0,"reconciling": 0}}]
}

5.2 获取任务数量统计接口

接口路径: GET /api/flink/job-count

请求参数: 无

响应格式:

{"totalJobs": 15,"runningJobs": 8,"finishedJobs": 5,"failedJobs": 2,"cancelledJobs": 0
}

5.3 获取任务状态详细信息接口

接口路径: GET /api/flink/jobs/{jobId}

请求参数:

  • jobId (路径参数): 作业ID

响应格式:

{"jid": "123e4567-e89b-12d3-a456-426614174000","name": "MyFlinkJob","isStoppable": false,"state": "RUNNING","start-time": 1618759200000,"end-time": -1,"duration": 3600000,"maxParallelism": -1,"now": 1618762800000,"timestamps": {"CREATED": 1618759100000,"RUNNING": 1618759200000},"vertices": [{"id": "vertex1","name": "Source: Custom Source","maxParallelism": 128,"parallelism": 2,"status": "RUNNING","start-time": 1618759200000,"end-time": -1,"duration": 3600000,"tasks": {"CREATED": 0,"SCHEDULED": 0,"DEPLOYING": 0,"RUNNING": 2,"FINISHED": 0,"CANCELING": 0,"CANCELED": 0,"FAILED": 0,"RECONCILING": 0},"metrics": {"read-bytes": 0,"read-bytes-complete": true,"write-bytes": 1024000,"write-bytes-complete": true,"read-records": 0,"read-records-complete": true,"write-records": 10000,"write-records-complete": true}}],"status-counts": {"CREATED": 0,"SCHEDULED": 0,"DEPLOYING": 0,"RUNNING": 5,"FINISHED": 0,"CANCELING": 0,"CANCELED": 0,"FAILED": 0,"RECONCILING": 0},"plan": {"jid": "123e4567-e89b-12d3-a456-426614174000","name": "MyFlinkJob","nodes": [{"id": "node1","parallelism": 2,"operator": "Source: Custom Source","operator_strategy": "","description": "Source: Custom Source","inputs": [],"optimizer_properties": {}}]}
}

5.4 获取任务日志信息接口

接口路径: GET /api/flink/jobs/{jobId}/logs

请求参数:

  • jobId (路径参数): 作业ID
  • taskId (可选): 特定任务ID
  • lines (可选): 返回日志行数,默认100

响应格式:

{"jobId": "123e4567-e89b-12d3-a456-426614174000","taskId": "vertex1","logs": ["2023-04-18 17:20:00,000 INFO  org.apache.flink.runtime.taskmanager.TaskImpl [] - Loading JAR files for task.","2023-04-18 17:20:01,250 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default state backend.","2023-04-18 17:20:02,100 INFO  org.apache.flink.runtime.taskmanager.TaskImpl [] - Registering task at TaskManager."]
}

5.5 获取JobManager日志接口

接口路径: GET /api/flink/jobmanager/logs

请求参数:

  • lines (可选): 返回日志行数,默认100

响应格式:

{"component": "JobManager","logs": ["2023-04-18 17:15:00,000 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Starting JobMaster.","2023-04-18 17:15:01,500 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC service.","2023-04-18 17:15:02,300 INFO  org.apache.flink.runtime.blob.BlobServer [] - Started BLOB server at 0.0.0.0:6124."]
}

6. 实现细节

6.1 配置文件设计

application.yml 中添加以下配置:

flink:rest:# Flink JobManager 地址base-url: http://localhost:8081# 连接超时时间(毫秒)connect-timeout: 5000# 读取超时时间(毫秒)read-timeout: 10000

6.2 核心实体类设计

6.2.1 作业概览实体类 (JobOverview.java)
public class JobOverview {private String id;private String name;private String status;private long startTime;private long endTime;private long duration;private long lastModification;private JobTasks tasks;// getters and setters
}public class JobTasks {private int total;private int created;private int scheduled;private int deploying;private int running;private int finished;private int canceling;private int canceled;private int failed;private int reconciling;// getters and setters
}
6.2.2 作业详情实体类 (JobDetails.java)
public class JobDetails {private String jid;private String name;private boolean isStoppable;private String state;private long startTime;private long endTime;private long duration;private int maxParallelism;private long now;private Map<String, Long> timestamps;private List<Vertex> vertices;private Map<String, Integer> statusCounts;private ExecutionPlan plan;// getters and setters
}
6.2.3 任务顶点实体类 (Vertex.java)
public class Vertex {private String id;private String name;private int maxParallelism;private int parallelism;private String status;private long startTime;private long endTime;private long duration;private TaskStatusCount tasks;private VertexMetrics metrics;// getters and setters
}
6.2.4 日志实体类 (LogInfo.java)
public class LogInfo {private String jobId;private String taskId;private List<String> logs;// getters and setters
}

6.3 核心服务类设计

6.3.1 Flink REST 客户端 (FlinkRestClient.java)
@Component
public class FlinkRestClient {@Value("${flink.rest.base-url}")private String baseUrl;private final RestTemplate restTemplate;public FlinkRestClient(RestTemplateBuilder builder,@Value("${flink.rest.connect-timeout}") int connectTimeout,@Value("${flink.rest.read-timeout}") int readTimeout) {this.restTemplate = builder.setConnectTimeout(Duration.ofMillis(connectTimeout)).setReadTimeout(Duration.ofMillis(readTimeout)).build();}/*** 获取所有作业概览信息*/public JobsOverview getJobsOverview() {String url = baseUrl + "/jobs";return restTemplate.getForObject(url, JobsOverview.class);}/*** 获取特定作业的详细信息*/public JobDetails getJobDetails(String jobId) {String url = baseUrl + "/jobs/" + jobId;return restTemplate.getForObject(url, JobDetails.class);}/*** 获取作业异常信息*/public JobExceptions getJobExceptions(String jobId) {String url = baseUrl + "/jobs/" + jobId + "/exceptions";return restTemplate.getForObject(url, JobExceptions.class);}/*** 获取作业日志信息*/public String getJobLogs(String jobId) {String url = baseUrl + "/jobs/" + jobId + "/exceptions";// 注意:实际Flink API可能不直接提供作业级别的完整日志// 可能需要从TaskManager获取或通过其他方式return restTemplate.getForObject(url, String.class);}/*** 获取JobManager日志*/public String getJobManagerLogs() {String url = baseUrl + "/jobmanager/log";return restTemplate.getForObject(url, String.class);}
}
6.3.2 作业服务类 (JobService.java)
@Service
public class JobService {@Autowiredprivate FlinkRestClient flinkRestClient;/*** 获取所有作业ID列表*/public List<String> getAllJobIds() {JobsOverview overview = flinkRestClient.getJobsOverview();return overview.getJobs().stream().map(JobOverview::getId).collect(Collectors.toList());}/*** 获取作业数量统计*/public JobCountStats getJobCountStats() {JobsOverview overview = flinkRestClient.getJobsOverview();JobCountStats stats = new JobCountStats();long total = overview.getJobs().size();long running = overview.getJobs().stream().filter(job -> "RUNNING".equals(job.getStatus())).count();long finished = overview.getJobs().stream().filter(job -> "FINISHED".equals(job.getStatus())).count();long failed = overview.getJobs().stream().filter(job -> "FAILED".equals(job.getStatus())).count();long cancelled = overview.getJobs().stream().filter(job -> "CANCELED".equals(job.getStatus())).count();stats.setTotalJobs(total);stats.setRunningJobs(running);stats.setFinishedJobs(finished);stats.setFailedJobs(failed);stats.setCancelledJobs(cancelled);return stats;}/*** 获取作业详细状态信息*/public JobDetails getJobDetails(String jobId) {return flinkRestClient.getJobDetails(jobId);}
}
6.3.3 日志服务类 (LogService.java)
@Service
public class LogService {@Autowiredprivate FlinkRestClient flinkRestClient;/*** 获取作业日志信息*/public LogInfo getJobLogs(String jobId, String taskId, int lines) {// 实际实现可能需要结合多个API调用// 因为Flink的REST API不直接提供完整的作业级别日志LogInfo logInfo = new LogInfo();logInfo.setJobId(jobId);logInfo.setTaskId(taskId);// 这里只是一个示例实现,实际需要根据Flink的具体API进行调整String rawLogs = flinkRestClient.getJobLogs(jobId);List<String> logs = parseLogs(rawLogs, lines);logInfo.setLogs(logs);return logInfo;}/*** 获取JobManager日志*/public LogInfo getJobManagerLogs(int lines) {String rawLogs = flinkRestClient.getJobManagerLogs();List<String> logs = parseLogs(rawLogs, lines);LogInfo logInfo = new LogInfo();logInfo.setLogs(logs);return logInfo;}/*** 解析并截取最新的日志行*/private List<String> parseLogs(String rawLogs, int lines) {if (rawLogs == null || rawLogs.isEmpty()) {return new ArrayList<>();}String[] logLines = rawLogs.split("\n");int start = Math.max(0, logLines.length - lines);return Arrays.asList(Arrays.copyOfRange(logLines, start, logLines.length));}
}

6.4 控制器类设计

6.4.1 Flink监控控制器 (FlinkMonitorController.java)
@RestController
@RequestMapping("/api/flink")
public class FlinkMonitorController {@Autowiredprivate JobService jobService;@Autowiredprivate LogService logService;/*** 获取所有作业列表*/@GetMapping("/jobs")public ResponseEntity<JobsOverview> getAllJobs() {JobsOverview jobs = jobService.getAllJobs();return ResponseEntity.ok(jobs);}/*** 获取作业数量统计*/@GetMapping("/job-count")public ResponseEntity<JobCountStats> getJobCount() {JobCountStats stats = jobService.getJobCountStats();return ResponseEntity.ok(stats);}/*** 获取特定作业的详细信息*/@GetMapping("/jobs/{jobId}")public ResponseEntity<JobDetails> getJobDetails(@PathVariable String jobId) {JobDetails details = jobService.getJobDetails(jobId);return ResponseEntity.ok(details);}/*** 获取作业日志信息*/@GetMapping("/jobs/{jobId}/logs")public ResponseEntity<LogInfo> getJobLogs(@PathVariable String jobId,@RequestParam(required = false) String taskId,@RequestParam(defaultValue = "100") int lines) {LogInfo logs = logService.getJobLogs(jobId, taskId, lines);return ResponseEntity.ok(logs);}/*** 获取JobManager日志*/@GetMapping("/jobmanager/logs")public ResponseEntity<LogInfo> getJobManagerLogs(@RequestParam(defaultValue = "100") int lines) {LogInfo logs = logService.getJobManagerLogs(lines);return ResponseEntity.ok(logs);}
}

7. 错误处理机制

7.1 异常类型定义

public class FlinkApiException extends RuntimeException {private int statusCode;public FlinkApiException(String message, int statusCode) {super(message);this.statusCode = statusCode;}public FlinkApiException(String message, Throwable cause, int statusCode) {super(message, cause);this.statusCode = statusCode;}// getters
}

7.2 全局异常处理器

@ControllerAdvice
public class FlinkApiExceptionHandler {@ExceptionHandler(FlinkApiException.class)public ResponseEntity<ErrorResponse> handleFlinkApiException(FlinkApiException ex) {ErrorResponse error = new ErrorResponse();error.setMessage(ex.getMessage());error.setCode(ex.getStatusCode());error.setTimestamp(System.currentTimeMillis());return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);}@ExceptionHandler(Exception.class)public ResponseEntity<ErrorResponse> handleGenericException(Exception ex) {ErrorResponse error = new ErrorResponse();error.setMessage("Internal server error");error.setCode(500);error.setTimestamp(System.currentTimeMillis());return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);}
}

8. 部署与配置

8.1 Maven依赖配置

pom.xml 中添加必要的依赖:

<dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Configuration Processor --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

8.2 应用启动类

@SpringBootApplication
public class FlinkMonitorApplication {public static void main(String[] args) {SpringApplication.run(FlinkMonitorApplication.class, args);}
}

9. 测试方案

9.1 单元测试

为每个服务类编写单元测试,验证业务逻辑正确性。

9.2 集成测试

使用 MockWebServer 或类似工具模拟 Flink REST API 响应,测试整个调用链路。

9.3 性能测试

评估在高并发情况下的接口响应时间和系统稳定性。

10. 安全考虑

10.1 认证与授权

如果 Flink 集群启用了安全认证,需要在 RestTemplate 中配置相应的认证信息。

10.2 数据传输安全

建议通过 HTTPS 连接到 Flink REST API,确保数据传输的安全性。

10.3 输入验证

对接口输入参数进行严格验证,防止恶意输入导致系统异常。

11. 监控与日志

11.1 接口调用监控

记录每次对 Flink REST API 的调用情况,包括响应时间、成功率等指标。

11.2 错误日志记录

详细记录系统运行过程中的错误信息,便于问题排查。

12. 扩展性考虑

12.1 缓存机制

对于不经常变化的数据(如作业配置),可以引入缓存机制减少对 Flink API 的频繁调用。

12.2 异步处理

对于耗时较长的操作,可以采用异步处理方式提高系统响应速度。

12.3 多集群支持

设计支持多个 Flink 集群的监控能力,满足复杂部署环境的需求。

13. 总结

本文档详细设计了一个基于 Spring Boot 的 Flink 1.19 REST API 集成方案,提供了获取作业信息、日志信息、状态统计等核心功能。通过合理的架构设计和服务划分,确保了系统的可维护性和扩展性。后续开发过程中可以根据实际需求进行适当调整和完善。

http://www.dtcms.com/a/508631.html

相关文章:

  • RoniaKit QML仪表盘开发指南:从零开始创建专业仪表板
  • 版本控制与GitLab完整实践指南
  • bash 基础编程的核心语法
  • 中山品牌网站建设报价做网站首页置顶多少钱
  • 京紫元年深圳网站建设欧美风格网站特点
  • 企业网站开发制作合同wordpress禁止图片点击
  • 做英文网站2014上海画册设计
  • 美颜SDK集成实录:打造兼容多端的直播一键美颜系统
  • 资料分析-增长量
  • 网站开发种类视频号推广入口
  • 南通做网站ntwsd椒江设计公司
  • 建设l旅游网站目的及功能定位域名访问过程会不会影响网站访问
  • 做门户网站建设多少钱wordpress js调用
  • 广州企业网站设计制作佛山网站的建设
  • 网络地址转换(NAT)和ISP(互联网服务提供商)
  • a5站长网宁波建网站选哪家好点
  • 三菱PLC与汇川伺服驱动器通讯实现:EtherCAT转CC-Link IE FB协议转换网关配置案例
  • 竞逐AI内容,爱奇艺先出手了
  • 淄博哪有做网站的微信同步wordpress
  • 【AI赋能未来】探索学前教育研究的智能化新范式
  • 网站模板 招聘单位网站建设管理工作总结
  • 在 Ubuntu24.04 上安装 JDK 21(Java 21)
  • 容县网站建设微信公众号流程图
  • Aiseesoft_iPhone_Unlocker
  • 网站开发规范拼多多福利券小程序怎么赚钱
  • 网站建设费用包括哪些内容wordpress边框
  • YOLO V3 目标检测教程:从样本分类到性能评价
  • 本地利用wordpress建站老域名重新做网站
  • [ SpringBoot ] 新手小白的详细使用方法
  • 私人免费网站怎么下载珠海网站制作计划