当前位置: 首页 > wzjs >正文

做网站要怎么找单网络商城应该如何推广

做网站要怎么找单,网络商城应该如何推广,政府网站的ipv6建设方案,网站空间 上传程序Gravitino SparkConnector 实现原理 本文参考了官网介绍,想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…

Gravitino SparkConnector 实现原理

本文参考了官网介绍,想看官方解析请参考 官网地址 本文仅仅介绍原理

文章目录

  • Gravitino SparkConnector 实现原理
    • 背景知识-Spark Plugin 介绍
        • (1) **插件加载**
        • (2) **DriverPlugin 初始化**
        • (3) **ExecutorPlugin 初始化**
        • (4) **插件执行**
        • (5) **插件销毁**
    • 背景知识-Driver Plugin 介绍
        • (1) **`init` 方法**
        • (2) **`registerMetrics` 方法**
        • (3) **`onTaskStart` 方法**
        • (4) **`onTaskSucceeded` 方法**
        • (5) **`onTaskFailed` 方法**
        • (6) **`close` 方法**
    • SparkConnector使用方式
      • 加载spark.sql.catalog.xxx 具体执行的配置

背景知识-Spark Plugin 介绍

spark在[spark-29399]pr提交更新了SparkPlugin插件

SparkPlugin插件执行生命周期

SparkPlugin 的生命周期与 Spark 应用程序的生命周期一致,具体如下:

(1) 插件加载
  • 当 Spark 应用程序启动时,Spark 会扫描类路径下的 SparkPlugin 实现类。
  • 如果插件被正确配置(例如通过 spark.plugins 配置项),Spark 会实例化该类。
(2) DriverPlugin 初始化
  • Spark 调用 driverPlugin() 方法,获取 DriverPlugin 实例。
  • DriverPlugin 的生命周期开始,其方法(如 initregisterMetrics 等)会被调用。
(3) ExecutorPlugin 初始化
  • Spark 调用 executorPlugin() 方法,获取 ExecutorPlugin 实例。
  • ExecutorPlugin 的生命周期开始,其方法(如 initshutdown 等)会被调用。
(4) 插件执行
  • DriverPlugin 在 Driver 端执行自定义逻辑,例如注册指标、拦截 SQL 解析、修改 Catalog 等。
  • ExecutorPlugin 在 Executor 端执行自定义逻辑,例如监控 Task 执行、收集指标等。
(5) 插件销毁
  • 当 Spark 应用程序结束时,DriverPluginExecutorPlugin 的生命周期结束,其 close() 方法会被调用以释放资源。

背景知识-Driver Plugin 介绍

DriverPlugin 是用于在 Driver 端执行自定义逻辑的插件,其生命周期方法包括:

(1) init 方法
  • 在 Driver 插件初始化时调用。
  • 可以在此方法中执行初始化逻辑,例如注册自定义 Catalog、拦截 SQL 解析器等。
(2) registerMetrics 方法
  • 在 Driver 插件初始化时调用。
  • 可以在此方法中注册自定义指标(Metrics)。
(3) onTaskStart 方法
  • 在 Task 启动时调用。
  • 可以在此方法中执行与 Task 相关的逻辑。
(4) onTaskSucceeded 方法
  • 在 Task 成功完成时调用。
  • 可以在此方法中执行与 Task 成功相关的逻辑。
(5) onTaskFailed 方法
  • 在 Task 失败时调用。
  • 可以在此方法中执行与 Task 失败相关的逻辑。
(6) close 方法
  • 在 Driver 插件销毁时调用。
  • 可以在此方法中释放资源,例如关闭连接、清理缓存等。

SparkConnector使用方式

./bin/spark-sql -v \
--conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \
--conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \
--conf spark.sql.gravitino.metalake=test \
--conf spark.sql.gravitino.enableIcebergSupport=true \
--conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive

可以看出SparkConnector指定了加载的插件是GravitinoSparkPlugin

public class GravitinoSparkPlugin implements SparkPlugin {@Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}@Overridepublic ExecutorPlugin executorPlugin() {return null;}
}

可以看出实现方式很简单,仅仅使用了一个GravitinoDriverPlugin,也就是在Spark应用程序启动的时候扫描SparkPlugin扫描到了这个GravitinoSparkPlugin然后立马就去执行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化过程中 插件仅仅覆写了两个函数,init()shutdown()。 说明这个插件仅仅做了一些初始化和资源销毁操作。

在Driver端进行初始化

  1. 配置检查检查gravitino_uri和gravitino_metalake是否配置

  2. 如果开启了iceberg则将gravitinoDriverExtensions放入到数组中方便配置

  3. 初始化Gravtino客户端和GravitinoCatalogManager,并且将relational类型的表加载到缓存中

  4. 将缓存中的catalog进行如果是非iceberg类型(当前仅仅只有Hive)进行注册,这里定义的注册的实际操作配置Spark的配置项(spark.sql.catalog.catalogName)这里的catalogName对应的是缓存中的catalogName,配置的值为根据Gravitino自己的Catalog使用的Provider进行适配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具体情况由适配器进行处理。

  5. 然后注册SqlExtensions其实就是将第2步骤的数组配置到SPARK_SESSION_EXTENSIONS这个SparkConf配置里面

稍微贴一下注册Catalog代码,比较重要

  //初始化的时候调用注册逻辑,将Gravitino中的Catalog加载到缓存//然后将缓存中的数据作为第二个参数gravitinoCatalogs传递进来private void registerGravitinoCatalogs(SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry -> {String catalogName = entry.getKey();Catalog gravitinoCatalog = entry.getValue();String provider = gravitinoCatalog.provider();if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))&& enableIcebergSupport == false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn("Register catalog {} failed.", catalogName, e);}});}//这里根据适配器去配置spark.sql.catalog.xxx 的具体执行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn("Skip registering {} because catalog provider is empty.", catalogName);return;}String catalogClassName = CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider);return;}String sparkCatalogConfigName = "spark.sql.catalog." + catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName + " is already registered to SparkCatalogManager");sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info("Register {} catalog to Spark catalog manager.", catalogName);}

到这里GravitinoConnector的代码机制已经说完了,下面聊聊Spark机制

加载spark.sql.catalog.xxx 具体执行的配置

经过上面GravitinoDriverPlugin的初始化之后,已经将具体的catalog名称和对应的处理类映射起来,这里以GravitinoHiveCatalogSpark33为例。

GravitinoHiveCatalogSpark33这个类继承关系是继承了BaseCatalogBaseCatalog是Spark中定义的CatalogPlugin的一个实现类。

Spark在解析SQL的时候会查找catalog对应的Catalog,可以看到调用了CatalogManager.catalog()方法

  private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {case Seq(_) =>Some((None, parts))case Seq(catalogName, tail @ _*) =>try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException =>Some((None, parts))}}}

这个catalog方法调用了Catalogs.load()方法

  def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}

这个方法才是真正的加载方法,他真正根据conf配置将GravitinoHiveCatalogSpark33名称根据定义的反射构造函数实例化到内存中

   def load(name: String, conf: SQLConf): CatalogPlugin = {val pluginClassName = try {val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name")// SPARK-39079 do configuration check first, otherwise some path-based table like// `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phaseif (name.contains(".")) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException =>throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader = Utils.getContextOrSparkClassLoadertry {val pluginClass = loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}

到这里流程就分析结束了

http://www.dtcms.com/wzjs/480203.html

相关文章:

  • 直播网站如何做域名查询注册信息查询
  • 网站建设打造微信加人推码35一单
  • 兰州市城乡建设局网站s104项目seo工具优化软件
  • 美国疫情都是假的抖音seo源码搭建
  • 淘宝装修做代码的网站美区下载的app怎么更新
  • 毕设代做的网站市场营销推广策略
  • 网站建设实习报告范文郑州网站优化顾问
  • 怎么做单位网站竞价推广招聘
  • 龙口建设网站qq群引流推广平台
  • 西安市住房和城乡建设局企业网站优化服务
  • 网站建设需求文档模版怎么样做推广最有效
  • 开发软件网站建设合肥网络推广培训学校
  • 做网站mac网站优化seo教程
  • 修改网站需要什么搜索引擎是网站吗
  • 河北省建设环境备案网站西安网络推广外包公司
  • 建设信基金管理有限公司网站百度网络营销推广
  • 周口网站建设.com站长统计app进入网址新版
  • 网站的营销方式有哪些购物网站有哪些
  • 上海专业网站建设电商seo是什么意思
  • 重庆做网站 哪个好些嘛站长收录平台
  • wordpress 做网站百度问答兼职怎么做
  • wordpress 嵌套评论专业网站优化公司
  • 沈阳做网站优化人工智能教育培训机构排名
  • 江门市住房和城乡建设局门户网站石家庄网络营销网站推广
  • 国内做心理咨询师培训出名的网站外贸网站平台都有哪些
  • 成都企业模版网站建设青岛网站建设策划
  • 成都网站优化软件百度竞价广告代理
  • 南平企业网站建设职业培训热门行业
  • 做网站好比做房子seo平台优化
  • 建设公司网站有用吗如何创建网址