Starting a Flink Job
Flink
This walkthrough traces local execution, based on Flink 1.16.0.
Contents
- Flink
- StreamExecutionEnvironment
- LocalExecutor
- MiniCluster
- Starting the MiniCluster
- TaskManager
- TaskExecutor
- StreamGraph
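StreamExecutionEnvironment
The stream execution environment. Local runs use LocalStreamEnvironment; remote runs use RemoteStreamEnvironment.
1.1 The job starts when execute(String jobName) is called on the StreamExecutionEnvironment.
1.1.1 Copy the transformations registered so far (a job may have more than one sink), so the originals are available if execution has to be retried.
1.1.2 Call getStreamGraph() to convert the transformations into a stream graph (StreamGraph).
1.1.3 Call execute(StreamGraph streamGraph).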
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }
    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}
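The retry in the catch block can be pictured with a small stand-alone model. The sketch below is not Flink code: `RetryWithoutCacheSketch`, `runWithRetry`, and the string "graphs" are hypothetical names invented for illustration, and it catches only `RuntimeException` where the original catches `Throwable`. It shows the control flow only: build once from possibly cached inputs, and on failure rebuild from the saved originals and try exactly once more.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-alone model of the "retry once without cache" control flow. */
final class RetryWithoutCacheSketch {

    /**
     * Runs {@code job} on a graph built with the cache. If that throws, rebuilds the
     * graph from a copy of the original inputs and runs the job one more time.
     * Every graph that was tried is appended to {@code attempts}.
     */
    static String runWithRetry(
            List<String> transformations,
            Function<String, String> job,
            List<String> attempts) {
        // Snapshot taken up front, mirroring originalTransformations in execute(String).
        List<String> original = new ArrayList<>(transformations);
        String graph = "cached:" + String.join(",", transformations);
        attempts.add(graph);
        try {
            return job.apply(graph);
        } catch (RuntimeException e) {
            // Second and last attempt; a failure here propagates to the caller.
            graph = "fresh:" + String.join(",", original);
            attempts.add(graph);
            return job.apply(graph);
        }
    }

    public static void main(String[] args) {
        List<String> attempts = new ArrayList<>();
        String result = runWithRetry(
                Arrays.asList("source", "map", "sink"),
                g -> {
                    if (g.startsWith("cached:")) {
                        throw new IllegalStateException("corrupted cache");
                    }
                    return "ran " + g;
                },
                attempts);
        System.out.println(result + " after " + attempts.size() + " attempts");
    }
}
```

Note that the second attempt is not guarded: if it fails as well, the exception reaches the caller, just as in the listing above.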
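1.2 execute(String jobName) delegates to execute(StreamGraph streamGraph).
1.2.1 executeAsync(StreamGraph streamGraph) submits the stream graph asynchronously.
1.2.2 The returned JobClient lets the caller control the job. In attached mode the method waits for the job result; otherwise it returns a DetachedJobExecutionResult right away.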
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    final JobClient jobClient = executeAsync(streamGraph);
    try {
        final JobExecutionResult jobExecutionResult;
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }
        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
        return jobExecutionResult;
    } catch (Throwable t) {
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);
        jobListeners.forEach(
                jobListener -> {
                    jobListener.onJobExecuted(null, strippedException);
                });
        ExceptionUtils.rethrowException(strippedException);
        return null;
    }
}
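The attached/detached branch in this method comes down to "wait on a future, or return a placeholder immediately". The following is a minimal sketch of that idea using only the JDK. `AttachedDetachedSketch`, `Handle`, and `await` are hypothetical stand-ins and not Flink types; an empty `Optional` plays the role of the detached result.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the attached/detached branch; none of these types are Flink classes. */
final class AttachedDetachedSketch {

    /** Minimal stand-in for a job handle: an id plus a future that completes with the result. */
    static final class Handle {
        final String jobId;
        final CompletableFuture<String> result;

        Handle(String jobId, CompletableFuture<String> result) {
            this.jobId = jobId;
            this.result = result;
        }
    }

    /**
     * Attached: block until the job's future completes and return its value.
     * Detached: return immediately without touching the future.
     */
    static Optional<String> await(Handle handle, boolean attached) {
        if (attached) {
            return Optional.of(handle.result.join());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Handle finished = new Handle("job-1", CompletableFuture.completedFuture("42 records"));
        System.out.println(await(finished, true));

        // This future never completes, yet the detached call still returns at once.
        Handle stillRunning = new Handle("job-2", new CompletableFuture<>());
        System.out.println(await(stillRunning, false));
    }
}
```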
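1.3 execute(StreamGraph streamGraph) in turn calls executeAsync(StreamGraph streamGraph) on the StreamExecutionEnvironment.
1.3.1 getPipelineExecutor() returns the PipelineExecutor, which is a LocalExecutor for local runs.
1.3.2 The LocalExecutor executes the given stream graph and yields a JobClient, which allows interaction with the running job, for example cancelling it or triggering a savepoint.
1.3.3 Every registered JobListener is notified of the submission through onJobSubmitted.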
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    final PipelineExecutor executor = getPipelineExecutor();
    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);
    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
        collectIterators.clear();
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException =
                ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(
                jobListener -> jobListener.onJobSubmitted(null, strippedException));
        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}
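Two details of this method are easy to miss: the failure is unwrapped before anyone sees it, and listeners always receive exactly one non-null argument. The sketch below models both with JDK classes only. `SubmitNotificationSketch`, `strip`, and `submit` are hypothetical names, a `BiConsumer` stands in for a job listener, and `strip` is an assumption about what unwrapping an `ExecutionException` amounts to, not a copy of Flink's utility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

/** Hypothetical model of "unwrap the failure, then notify listeners"; not Flink code. */
final class SubmitNotificationSketch {

    /** Peels off ExecutionException wrappers until a different (or cause-less) throwable remains. */
    static Throwable strip(Throwable t) {
        Throwable current = t;
        while (current instanceof ExecutionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Waits for the submission future. Listeners receive (client, null) on success
     * and (null, cause) on failure, so exactly one argument is non-null.
     */
    static String submit(
            CompletableFuture<String> clientFuture,
            List<BiConsumer<String, Throwable>> listeners) {
        try {
            String client = clientFuture.get();
            listeners.forEach(l -> l.accept(client, null));
            return client;
        } catch (ExecutionException e) {
            Throwable cause = strip(e);
            listeners.forEach(l -> l.accept(null, cause));
            throw new IllegalStateException("Failed to execute job", cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for submission", e);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<BiConsumer<String, Throwable>> listeners = new ArrayList<>();
        listeners.add(
                (client, error) ->
                        log.add(client != null
                                ? "submitted " + client
                                : "failed: " + error.getMessage()));

        submit(CompletableFuture.completedFuture("client-1"), listeners);

        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalArgumentException("no slots"));
        try {
            submit(failing, listeners);
        } catch (IllegalStateException expected) {
            // The wrapper carries the stripped cause, not the ExecutionException.
        }
        System.out.println(log);
    }
}
```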
LocalExecutor
Implements PipelineExecutor and is responsible for executing the StreamGraph, that is, the job submitted by the user.
- getJobGraph(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) translates the StreamGraph into a JobGraph.
- submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) creates a MiniCluster and submits the job to it.
public CompletableFuture<JobClient> execute(
        Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
        throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);
    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);
    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);
    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
            .submitJob(jobGraph, userCodeClassloader);
}
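The two addAll calls build the effective configuration in layers, and their order decides which value wins. A minimal sketch, assuming plain maps as a stand-in for Flink's Configuration (`EffectiveConfigSketch` and `merge` are hypothetical names, and the keys are only illustrative):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of layering two configurations; plain maps stand in for Configuration. */
final class EffectiveConfigSketch {

    /** Copies the executor's settings first, then the per-call settings, so the latter win on conflicts. */
    static Map<String, String> merge(
            Map<String, String> executorConfig, Map<String, String> callConfig) {
        Map<String, String> effective = new HashMap<>();
        effective.putAll(executorConfig);
        effective.putAll(callConfig);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> executorConfig = new HashMap<>();
        executorConfig.put("parallelism.default", "1");
        executorConfig.put("execution.attached", "true");

        Map<String, String> callConfig = new HashMap<>();
        callConfig.put("parallelism.default", "4");

        // The per-call parallelism overrides the executor's; the attached flag is inherited.
        System.out.println(merge(executorConfig, callConfig));
    }
}
```

Merging into a fresh map leaves both inputs untouched, which matches the listing: effectiveConfig is a new Configuration rather than a mutation of either source.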
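MiniCluster
Runs the job locally.
Starting the MiniCluster
1.1 Create workingDirectory. WorkingDirectory manages the working directory of a process or instance; once instantiated, it guarantees that the specified directory exists.
1.2 Create metricRegistry. MetricRegistry keeps track of all registered metrics and is the link between MetricGroups and MetricReporters.
1.3 Create commonRpcService. An RpcService is used to start RpcEndpoints and connect to them; connecting to an RPC server returns an RpcGateway that can be used to call remote procedures.
1.4 Set taskManagerRpcServiceFactory. When the RPC service is shared, it is a CommonRpcServiceFactory wrapping commonRpcService.
1.5 Create metricQueryServiceRpcService, another RpcService.
1.6 Pass metricQueryServiceRpcService to metricRegistry, which uses it to start the MetricQueryService.
1.7 Create processMetricGroup (a ProcessMetricGroup) on top of metricRegistry.
1.8 Create ioExecutor, an ExecutorService backed by a fixed-size thread pool whose threads are named "mini-cluster-io".
1.9 Create haServicesFactory, the HighAvailabilityServicesFactory.
1.10 Create haServices through that factory, passing in ioExecutor. HighAvailabilityServices gives access to all services needed for a highly available setup: highly available storage and registries, as well as distributed counters and leader election. They cover:
- ResourceManager leader election and leader retrieval
- JobManager leader election and leader retrieval
- Persistence for checkpoint metadata
- Registering the latest completed checkpoint(s)
- Persistence for the BLOB store
- Registry that marks a job's status
- Naming of RPC endpoints
1.11 Create and start blobServer. The BlobServer listens for incoming requests and spawns threads to handle them. It is also responsible for creating the directory structure in which BLOBs are stored or temporarily cached.
1.12 Create heartbeatServices. HeartbeatServices provides everything heartbeating needs, including the creation of heartbeat receivers and heartbeat senders.
1.13 Create delegationTokenManager, passing in commonRpcService.getScheduledExecutor() and ioExecutor.
DelegationTokenManager: the manager for all delegation tokens in a Flink cluster. Once started, it ensures that long-running applications can access secured services without interruption. It must contact every configured secure service to obtain the delegation tokens that are distributed to the rest of the application.
1.14 Create blobCacheService from workingDirectory.getBlobStorageDirectory(), haServices.createBlobStore(), and an InetSocketAddress pointing at the blobServer port on the local host.
BlobCacheService: caches permanent and transient BLOBs and provides access to the BLOB server.
1.15 Call startTaskManagers() to start the TaskManagers.
1.16 Set up the Dispatcher and ResourceManager components, then start the leader retrievers for the ResourceManager, the Dispatcher, and the REST endpoint.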
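How taskManagerRpcServiceFactory behaves depends on whether RPC services are shared. In shared mode every component gets the same service; otherwise each request produces a dedicated one. The sketch below models only that difference. `RpcFactorySketch`, `Service`, `ServiceFactory`, `shared`, and `dedicated` are hypothetical names and deliberately much simpler than Flink's own factories.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of shared versus dedicated service factories; not Flink's RpcServiceFactory. */
final class RpcFactorySketch {

    /** Stand-in for an RPC service, identified only by a name. */
    static final class Service {
        final String name;

        Service(String name) {
            this.name = name;
        }
    }

    /** Stand-in for a factory that components call when they need a service. */
    interface ServiceFactory {
        Service create();
    }

    /** Shared mode: every caller receives the same pre-built instance. */
    static ServiceFactory shared(Service common) {
        return () -> common;
    }

    /** Dedicated mode: every call builds a fresh instance with its own name. */
    static ServiceFactory dedicated(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return () -> new Service(prefix + "-" + counter.getAndIncrement());
    }

    public static void main(String[] args) {
        ServiceFactory sharedFactory = shared(new Service("common"));
        System.out.println(sharedFactory.create() == sharedFactory.create());

        ServiceFactory dedicatedFactory = dedicated("taskmanager");
        System.out.println(dedicatedFactory.create().name + ", " + dedicatedFactory.create().name);
    }
}
```

Because callers only see the factory interface, the code that starts a TaskManager does not need to know which mode is active.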
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");
        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService =
                miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
        try {
            // Manages the working directory of this process or instance
            workingDirectory =
                    WorkingDirectory.create(
                            ClusterEntrypointUtils.generateWorkingDirectoryFile(
                                    configuration,
                                    Optional.of(PROCESS_WORKING_DIR_BASE),
                                    "minicluster_" + ResourceID.generate()));
            initializeIOFormatClasses(configuration);
            rpcSystem = rpcSystemSupplier.get();
            LOG.info("Starting Metrics Registry");
            // Tracks all registered metrics; links MetricGroups and MetricReporters
            metricRegistry =
                    createMetricRegistry(
                            configuration,
                            rpcSystem.deref().getMaximumMessageSizeInBytes(configuration));
            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");
            final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
            final RpcService metricQueryServiceRpcService;
            if (useSingleRpcService) {
                // we always need the 'commonRpcService' for auxiliary calls
                // The RpcService starts RpcEndpoints and connects to them; connecting
                // returns an RpcGateway that can be used for remote calls
                commonRpcService = createLocalRpcService(configuration, rpcSystem.deref());
                final CommonRpcServiceFactory commonRpcServiceFactory =
                        new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
                metricQueryServiceRpcService =
                        MetricUtils.startLocalMetricsRpcService(
                                configuration, rpcSystem.deref());
            } else {
                // start a new service per component, possibly with custom bind addresses
                final String jobManagerExternalAddress =
                        miniClusterConfiguration.getJobManagerExternalAddress();
                final String taskManagerExternalAddress =
                        miniClusterConfiguration.getTaskManagerExternalAddress();
                final String jobManagerExternalPortRange =
                        miniClusterConfiguration.getJobManagerExternalPortRange();
                final String taskManagerExternalPortRange =
                        miniClusterConfiguration.getTaskManagerExternalPortRange();
                final String jobManagerBindAddress =
                        miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress =
                        miniClusterConfiguration.getTaskManagerBindAddress();
                dispatcherResourceManagerComponentRpcServiceFactory =
                        new DedicatedRpcServiceFactory(
                                configuration,
                                jobManagerExternalAddress,
                                jobManagerExternalPortRange,
                                jobManagerBindAddress,
                                rpcSystem.deref());
                taskManagerRpcServiceFactory =
                        new DedicatedRpcServiceFactory(
                                configuration,
                                taskManagerExternalAddress,
                                taskManagerExternalPortRange,
                                taskManagerBindAddress,
                                rpcSystem.deref());
                // we always need the 'commonRpcService' for auxiliary calls
                // bind to the JobManager address with port 0
                commonRpcService =
                        createRemoteRpcService(
                                configuration, jobManagerBindAddress, 0, rpcSystem.deref());
                metricQueryServiceRpcService =
                        MetricUtils.startRemoteMetricsRpcService(
                                configuration,
                                commonRpcService.getAddress(),
                                null,
                                rpcSystem.deref());
            }
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);
            processMetricGroup =
                    MetricUtils.instantiateProcessMetricGroup(
                            metricRegistry,
                            RpcUtils.getHostname(commonRpcService),
                            ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                    configuration));
            // Fixed-size thread pool for I/O work
            ioExecutor =
                    Executors.newFixedThreadPool(
                            ClusterEntrypointUtils.getPoolSize(configuration),
                            new ExecutorThreadFactory("mini-cluster-io"));
            // High-availability services factory
            haServicesFactory = createHighAvailabilityServicesFactory(configuration);
            // High-availability services
            haServices = createHighAvailabilityServices(configuration, ioExecutor);
            // Create and start the BLOB (binary large object) server
            blobServer =
                    BlobUtils.createBlobServer(
                            configuration,
                            Reference.borrowed(workingDirectory.getBlobStorageDirectory()),
                            haServices.createBlobStore());
            blobServer.start();
            // Heartbeat services used by the cluster components
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
            // Delegation token manager
            delegationTokenManager =
                    KerberosDelegationTokenManagerFactory.create(
                            getClass().getClassLoader(),
                            configuration,
                            commonRpcService.getScheduledExecutor(),
                            ioExecutor);
            // BLOB cache service: caches permanent and transient BLOBs and provides
            // access to the BLOB server
            blobCacheService =
                    BlobUtils.createBlobCacheService(
                            configuration,
                            Reference.borrowed(workingDirectory.getBlobStorageDirectory()),
                            haServices.createBlobStore(),
                            new InetSocketAddress(
                                    InetAddress.getLocalHost(), blobServer.getPort()));
            startTaskManagers();
            MetricQueryServiceRetriever metricQueryServiceRetriever =
                    new RpcMetricQueryServiceRetriever(
                            metricRegistry.getMetricQueryServiceRpcService());
            setupDispatcherResourceManagerComponents(
                    configuration,
                    dispatcherResourceManagerComponentRpcServiceFactory,
                    metricQueryServiceRetriever);
            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            clusterRestEndpointLeaderRetrievalService =
                    haServices.getClusterRestEndpointLeaderRetriever();
            dispatcherGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            commonRpcService,
                            DispatcherGateway.class,
                            DispatcherId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
            resourceManagerGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            commonRpcService,
                            ResourceManagerGateway.class,
                            ResourceManagerId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
            webMonitorLeaderRetriever = new LeaderRetriever();
            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
        } catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }
        // create a new termination future
        terminationFuture = new CompletableFuture<>();
        // now officially mark this as running
        running = true;
        LOG.info("Flink Mini Cluster started successfully");
    }
}
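The catch block at the end of start() follows a common pattern: if any component fails to come up, close whatever was already started, attach a cleanup failure as a suppressed exception, and rethrow the original error. A minimal sketch of that pattern, assuming nothing from Flink (`StartupCleanupSketch` is a hypothetical class, and the service name "bad" merely simulates a failing component):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of "start several services, close everything if any start fails". */
final class StartupCleanupSketch {

    private final List<String> started = new ArrayList<>();
    private boolean running;

    /** Starts each named service in order; the name "bad" simulates a startup failure. */
    void start(List<String> services) {
        if (running) {
            throw new IllegalStateException("already running");
        }
        try {
            for (String service : services) {
                if (service.equals("bad")) {
                    throw new IllegalStateException("cannot start " + service);
                }
                started.add(service);
            }
        } catch (RuntimeException e) {
            // Release whatever was started, but keep the original failure as the primary one.
            try {
                close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // Only reached when every service came up.
        running = true;
    }

    /** Releases all started services and clears the running flag. */
    void close() {
        started.clear();
        running = false;
    }

    boolean isRunning() {
        return running;
    }

    int startedCount() {
        return started.size();
    }

    public static void main(String[] args) {
        StartupCleanupSketch cluster = new StartupCleanupSketch();
        try {
            cluster.start(Arrays.asList("rpc", "bad", "blob"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", still started: " + cluster.startedCount());
        }
    }
}
```

Setting the running flag only after the try block mirrors the listing, where running = true is the last step before the success log line.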
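TaskManager
start() in MiniCluster calls the methods below.
- TaskManagers are started according to the configured count, which defaults to 1.
- Each call creates and starts a TaskExecutor.
- The TaskExecutor is then added to the taskManagers list.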
private void startTaskManagers() throws Exception {
    final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
    for (int i = 0; i < numTaskManagers; i++) {
        startTaskManager();
    }
}
public void startTaskManager() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final TaskExecutor taskExecutor =
                TaskManagerRunner.startTaskManager(
                        configuration,
                        new ResourceID(UUID.randomUUID().toString()),
                        taskManagerRpcServiceFactory.createRpcService(),
                        haServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        useLocalCommunication(),
                        ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                        workingDirectory.createSubWorkingDirectory("tm_" + taskManagers.size()),
                        taskManagerTerminatingFatalErrorHandlerFactory.create(
                                taskManagers.size()));
        taskExecutor.start();
        taskManagers.add(taskExecutor);
    }
}
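In startTaskManager() the current size of the taskManagers list doubles as the index of the new TaskManager (it names the "tm_" sub-directory and selects the error handler), which is safe only because the whole method runs under the lock. The sketch below isolates that indexing idea. `WorkerStartupSketch` and its methods are hypothetical, and strings stand in for TaskExecutor instances.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of starting a configured number of workers; not Flink's MiniCluster. */
final class WorkerStartupSketch {

    private final Object lock = new Object();
    private final List<String> workers = new ArrayList<>();

    /** Starts {@code count} workers one after another. */
    void startWorkers(int count) {
        for (int i = 0; i < count; i++) {
            startWorker();
        }
    }

    /**
     * Starts a single worker. The current list size doubles as the next index,
     * so names stay unique as long as additions happen under the lock.
     */
    String startWorker() {
        synchronized (lock) {
            String name = "tm_" + workers.size();
            workers.add(name);
            return name;
        }
    }

    /** Returns a snapshot of the worker names in start order. */
    List<String> workers() {
        synchronized (lock) {
            return new ArrayList<>(workers);
        }
    }

    public static void main(String[] args) {
        WorkerStartupSketch cluster = new WorkerStartupSketch();
        cluster.startWorkers(2);
        // A worker can also be added after the initial batch, as with the public startTaskManager().
        cluster.startWorker();
        System.out.println(cluster.workers());
    }
}
```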
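TaskExecutor
The task executor; it is responsible for running multiple Tasks.
StreamGraph
The stream graph generated from the job's transformations; as shown above, it is later translated into a JobGraph for execution.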