当前位置: 首页 > wzjs >正文

aspx网站地图url中的参数怎么办游戏推广平台有哪些

aspx网站地图url中的参数怎么办,游戏推广平台有哪些,wordpress修改社交标签,襄樊seoResourceMangaer ResourceManager 是 Flink 集群中的核心组件之一,负责资源的申请、分配与回收。在具备对以下底层机制的理解后: HighAvailabilityServices:提供 Leader 选举、地址监听等高可用能力;RpcSystem:为 Flin…

ResourceMangaer

ResourceManager 是 Flink 集群中的核心组件之一,负责资源的申请、分配与回收。在具备对以下底层机制的理解后:

  • HighAvailabilityServices:提供 Leader 选举、地址监听等高可用能力;
  • RpcSystem:为 Flink 分布式组件之间的通信提供统一抽象;

我们就已经具备了阅读 ResourceManager 启动流程源码的基础。

DefaultDispatcherResourceManagerComponentFactory.create方法

  • 前面初始化RpcSystem,blobServer等组件就不解析了。自己去看初始化的过程。
@Overridepublic DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor ioExecutor,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,BlobServer blobServer,HeartbeatServices heartbeatServices,DelegationTokenManager delegationTokenManager,MetricRegistry metricRegistry,ExecutionGraphInfoStore executionGraphInfoStore,MetricQueryServiceRetriever metricQueryServiceRetriever,Collection<FailureEnricher> failureEnrichers,FatalErrorHandler fatalErrorHandler)throws Exception {//这些都是 监听组件LeaderRetrievalService dispatcherLeaderRetrievalService = null;LeaderRetrievalService resourceManagerRetrievalService = null;WebMonitorEndpoint<?> webMonitorEndpoint = null;ResourceManagerService resourceManagerService = null;DispatcherRunner dispatcherRunner = null;try {dispatcherLeaderRetrievalService =highAvailabilityServices.getDispatcherLeaderRetriever();resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =new RpcGatewayRetriever<>(rpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =new RpcGatewayRetriever<>(rpcService,ResourceManagerGateway.class,ResourceManagerId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));final ScheduledExecutorService executor =WebMonitorEndpoint.createExecutorService(configuration.get(RestOptions.SERVER_NUM_THREADS),configuration.get(RestOptions.SERVER_THREAD_PRIORITY),"DispatcherRestEndpoint");final long updateInterval =configuration.get(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);final MetricFetcher metricFetcher =updateInterval == 0? VoidMetricFetcher.INSTANCE: MetricFetcherImpl.fromConfiguration(configuration,metricQueryServiceRetriever,dispatcherGatewayRetriever,executor);webMonitorEndpoint =restEndpointFactory.createRestEndpoint(configuration,dispatcherGatewayRetriever,resourceManagerGatewayRetriever,blobServer,executor,metricFetcher,highAvailabilityServices.getClusterRestEndpointLeaderElection(),fatalErrorHandler);log.debug("Starting Dispatcher REST endpoint.");webMonitorEndpoint.start();final String hostname = RpcUtils.getHostname(rpcService);resourceManagerService =//这一步是关键,初始化 resourceManagerService。内部就是将 ResourceManager 初始化在 akka上。但是状态是未启动的ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);final HistoryServerArchivist historyServerArchivist =HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint, ioExecutor);final DispatcherOperationCaches dispatcherOperationCaches =new DispatcherOperationCaches(configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));final PartialDispatcherServices partialDispatcherServices =new PartialDispatcherServices(configuration,highAvailabilityServices,resourceManagerGatewayRetriever,blobServer,heartbeatServices,() ->JobManagerMetricGroup.createJobManagerMetricGroup(metricRegistry, hostname),executionGraphInfoStore,fatalErrorHandler,historyServerArchivist,metricRegistry.getMetricQueryServiceGatewayRpcAddress(),ioExecutor,dispatcherOperationCaches,failureEnrichers);log.debug("Starting Dispatcher.");dispatcherRunner =dispatcherRunnerFactory.createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElection(),fatalErrorHandler,new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),ioExecutor,rpcService,partialDispatcherServices);log.debug("Starting ResourceManagerService.");//这里才是启动了状态resourceManagerService.start();//这里启动了监听resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);return new DispatcherResourceManagerComponent(dispatcherRunner,resourceManagerService,dispatcherLeaderRetrievalService,resourceManagerRetrievalService,webMonitorEndpoint,fatalErrorHandler,dispatcherOperationCaches);} catch (Exception exception) {// clean up all started componentsif (dispatcherLeaderRetrievalService != null) {try {dispatcherLeaderRetrievalService.stop();} catch (Exception e) {exception = ExceptionUtils.firstOrSuppressed(e, exception);}}if (resourceManagerRetrievalService != null) {try {resourceManagerRetrievalService.stop();} catch (Exception e) {exception = ExceptionUtils.firstOrSuppressed(e, exception);}}final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);if (webMonitorEndpoint != null) {terminationFutures.add(webMonitorEndpoint.closeAsync());}if (resourceManagerService != null) {terminationFutures.add(resourceManagerService.closeAsync());}if (dispatcherRunner != null) {terminationFutures.add(dispatcherRunner.closeAsync());}final FutureUtils.ConjunctFuture<Void> terminationFuture =FutureUtils.completeAll(terminationFutures);try {terminationFuture.get();} catch (Exception e) {exception = ExceptionUtils.firstOrSuppressed(e, exception);}throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);}}

ResourceManagerServiceImpl

在 Flink 的集群管理中,ResourceManagerServiceImpl资源管理组件 ResourceManager 的封装服务,用于支持高可用部署(HA)。它实现了两个重要接口:

1. ResourceManagerService
  • 提供对外生命周期控制(如 start()closeAsync());
  • DispatcherResourceManagerComponent 所依赖的通用接口;
  • 作为 ResourceManager 的启动入口。
2. LeaderContender
  • 参与高可用 Leader 选举的核心接口;
  • 当当前节点被选为 Leader 时,Flink HA 模块会调用 grantLeadership(UUID) 启动 ResourceManager
  • 当失去 Leader 身份时,调用 revokeLeadership() 触发回收逻辑。

内部结构:延迟创建 ResourceManager

ResourceManagerServiceImpl 内部并不会立即创建 ResourceManager 实例,而是通过一个工厂方法延迟生成:

  • 这是一个工厂接口(泛型类型可对应 YARN/K8s/Standalone 等部署环境);

  • 真正的 ResourceManager 实例(如 StandaloneResourceManager)是在当选 Leader 时,由该工厂调用 createResourceManager() 创建出来;

  • 启动流程会将生成的实例启动并注册 RPC 服务。

ResourceManagerServiceImpl部分源码解析

private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory,ResourceManagerProcessContext rmProcessContext)throws Exception {this.resourceManagerFactory = checkNotNull(resourceManagerFactory);this.rmProcessContext = checkNotNull(rmProcessContext);//在standalone模式下,默认返回一个LeaderElectionthis.leaderElection =rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();this.fatalErrorHandler = rmProcessContext.getFatalErrorHandler();this.ioExecutor = rmProcessContext.getIoExecutor();this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();this.serviceTerminationFuture = new CompletableFuture<>();this.running = false;this.leaderResourceManager = null;this.leaderSessionID = null;this.previousResourceManagerTerminationFuture = FutureUtils.completedVoidFuture();}@Overridepublic void start() throws Exception {synchronized (lock) {if (running) {LOG.debug("Resource manager service has already started.");return;}running = true;}LOG.info("Starting resource manager service.");//this表明自己作为组件启动leaderElection.startLeaderElection(this);}@Overridepublic void start() throws Exception {synchronized (lock) {if (running) {LOG.debug("Resource manager service has already started.");return;}running = true;}LOG.info("Starting resource manager service.");leaderElection.startLeaderElection(this);}@Overridepublic void grantLeadership(UUID newLeaderSessionID) {handleLeaderEventExecutor.execute(() -> {synchronized (lock) {if (!running) {LOG.info("Resource manager service is not running. Ignore granting leadership with session ID {}.",newLeaderSessionID);return;}LOG.info("Resource manager service is granted leadership with session id {}.",newLeaderSessionID);try {//这里启动真正的 ResourceManagerstartNewLeaderResourceManager(newLeaderSessionID);} catch (Throwable t) {fatalErrorHandler.onFatalError(new FlinkException("Cannot start resource manager.", t));}}});}@GuardedBy("lock")private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Exception {stopLeaderResourceManager();this.leaderSessionID = newLeaderSessionID;//工厂类方法,返回一个真正的 leaderResourceManagerthis.leaderResourceManager =resourceManagerFactory.createResourceManager(rmProcessContext, newLeaderSessionID);final ResourceManager<?> newLeaderResourceManager = this.leaderResourceManager;//previousResourceManagerTerminationFuture 是一个 FutureUtils.completedVoidFuture();表示已经完成的 Future。可以立马被运行previousResourceManagerTerminationFuture//对结果调用函数并执行返回的future.thenComposeAsync((ignore) -> {synchronized (lock) {return startResourceManagerIfIsLeader(newLeaderResourceManager);}},handleLeaderEventExecutor)//对上一步返回的结果调用。.thenAcceptAsync((isStillLeader) -> {if (isStillLeader) {leaderElection.confirmLeadershipAsync(newLeaderSessionID, newLeaderResourceManager.getAddress());}},ioExecutor);}@GuardedBy("lock")private CompletableFuture<Boolean> startResourceManagerIfIsLeader(ResourceManager<?> resourceManager) {if (isLeader(resourceManager)) {//这一步关键,调用了 rpcServer.start方法。真正启动了 resourcemanager方法resourceManager.start();forwardTerminationFuture(resourceManager);return resourceManager.getStartedFuture().thenApply(ignore -> true);} else {return CompletableFuture.completedFuture(false);}}
http://www.dtcms.com/wzjs/337239.html

相关文章:

  • 千度网站如何免费制作网站
  • 智能自助建站系统源码信息流推广
  • 做外贸面料哪个网站可以接单seo优化软件购买
  • 怎样查看网站是用什么cms_做的seo网络推广报价
  • 做网站 除了域名网络推广有前途吗
  • 网站建设历史注册域名查询网站官网
  • 网页制作免费网站建设网站外包
  • 网站建设中数据库网站制作企业
  • 做汽车网站怎么挣钱吗zac博客seo
  • 做网站建设电话销售深圳sem竞价托管
  • 成都网站建设报价怎样在平台上发布信息推广
  • 兰州市疫情最新政策公告重庆搜索引擎seo
  • 柳州网站建设多少钱重大新闻事件2023
  • 景安 怎么把网站做别名如何做游戏推广
  • 叙述一个网站的建设过程吸引人气的营销方案
  • 多平台网站建设百度优化服务
  • 网站备案查询 工信部防疫优化措施
  • 免费注册网站怎么做链接二十条优化
  • 如何看出网站用的是什么cms程序短链接在线生成官网
  • 网站建设步骤及分工网络营销案例分析题及答案
  • 涂料厂做网站有用吗seo站长之家
  • php网站建设模板下载网站底部友情链接代码
  • 西峡微网站开发同城发广告的平台有哪些
  • 深圳新冠病毒最新消息上海网站营销seo方案
  • 网站做ssl证书有风险sem 推广软件
  • 网站建设做网站seo引擎优化
  • 江苏省建设类高工申报网站关键词优化是怎样收费的
  • 济南建网站市场广州优化公司哪家好
  • 睢宁网站建设九江seo公司
  • 公司网站的重要性深圳优化公司