当前位置: 首页 > news >正文

CSI-PVController

startPersistentVolumeBinderController()

  • 加载内置插件 ProbeControllerVolumePlugins()
  • 初始化PVController:组装ControllerParameters,调用NewController(),并在goroutine中执行Run()
// startPersistentVolumeBinderController constructs the PersistentVolume
// controller from the controller-manager context and starts it in its own
// goroutine.
//
// The steps are: probe the volume plugins, parse the volume host filters,
// assemble the ControllerParameters, build the controller with NewController
// and finally launch Run with ctx.Stop as the stop channel.
//
// The returned handler is always nil and the boolean is always true. A non-nil
// error is returned when plugin probing, host filter parsing or controller
// construction fails; in that case the controller is not started.
func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
    binderCfg := ctx.ComponentConfig.PersistentVolumeBinderController

    volumePlugins, err := ProbeControllerVolumePlugins(ctx.Cloud, binderCfg.VolumeConfiguration)
    if err != nil {
        return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
    }

    dialOptions, err := options.ParseVolumeHostFilters(binderCfg.VolumeHostCIDRDenylist, binderCfg.VolumeHostAllowLocalLoopback)
    if err != nil {
        return nil, true, err
    }

    pvController, err := persistentvolumecontroller.NewController(persistentvolumecontroller.ControllerParameters{
        KubeClient:                ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
        SyncPeriod:                binderCfg.PVClaimBinderSyncPeriod.Duration,
        VolumePlugins:             volumePlugins,
        Cloud:                     ctx.Cloud,
        ClusterName:               ctx.ComponentConfig.KubeCloudShared.ClusterName,
        VolumeInformer:            ctx.InformerFactory.Core().V1().PersistentVolumes(),
        ClaimInformer:             ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
        ClassInformer:             ctx.InformerFactory.Storage().V1().StorageClasses(),
        PodInformer:               ctx.InformerFactory.Core().V1().Pods(),
        NodeInformer:              ctx.InformerFactory.Core().V1().Nodes(),
        EnableDynamicProvisioning: binderCfg.VolumeConfiguration.EnableDynamicProvisioning,
        FilteredDialOptions:       dialOptions,
    })
    if err != nil {
        return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", err)
    }

    // Run blocks until ctx.Stop is closed, so it gets its own goroutine.
    go pvController.Run(ctx.Stop)
    return nil, true, nil
}

NewController()

  • 初始化PersistentVolumeController对象
  • 初始化了所有的VolumePlugin 我们使用的是CSI
  • Informer主要关注了PVC和PV的变化,监听产生变化的对象加到队列中处理
  • 然后又初始化了claims和volumes:这是PV控制器在informer之外自建的缓存,主要用来存储PVC和PV,后面claimWorker和volumeWorker会用到
// NewController creates a new PersistentVolume controller from the given
// parameters.
//
// It performs the following steps:
//   - uses p.EventRecorder, or creates a recorder backed by a new broadcaster
//     when none is supplied;
//   - builds the PersistentVolumeController with its own volume/claim caches
//     and the "claims" and "volumes" work queues;
//   - initializes the volume plugins with the controller as the host;
//   - registers add/update/delete handlers on the PV and PVC informers that
//     enqueue the changed object into the matching work queue;
//   - wires up the listers and HasSynced functions for PVs, PVCs, storage
//     classes, pods and nodes;
//   - makes sure the pod indexer has the pod-by-PVC index;
//   - sets up the CSI translator and the CSI migration plugin manager.
//
// It returns an error when the volume plugins cannot be initialized or when
// the pod PVC index cannot be added.
func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
    // The event recorder publishes Events through the API server with the
    // source component "persistentvolume-controller". Create one only when
    // the caller did not provide its own.
    eventRecorder := p.EventRecorder
    if eventRecorder == nil {
        broadcaster := record.NewBroadcaster()
        broadcaster.StartStructuredLogging(0)
        broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
        eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
    }

    // Build the controller object, including the controller-owned caches
    // (volumes, claims) and the two work queues.
    controller := &PersistentVolumeController{
        volumes:                       newPersistentVolumeOrderedIndex(),
        claims:                        cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
        kubeClient:                    p.KubeClient,
        eventRecorder:                 eventRecorder,
        runningOperations:             goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
        cloud:                         p.Cloud,
        enableDynamicProvisioning:     p.EnableDynamicProvisioning,
        clusterName:                   p.ClusterName,
        createProvisionedPVRetryCount: createProvisionedPVRetryCount,
        createProvisionedPVInterval:   createProvisionedPVInterval,
        claimQueue:                    workqueue.NewNamed("claims"),
        volumeQueue:                   workqueue.NewNamed("volumes"),
        resyncPeriod:                  p.SyncPeriod,
        operationTimestamps:           metrics.NewOperationStartTimeCache(),
    }

    // Prober is nil because PV is not aware of Flexvolume.
    if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
        return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
    }

    // Every PV add/update/delete event enqueues the object into volumeQueue.
    p.VolumeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
        },
    )
    controller.volumeLister = p.VolumeInformer.Lister()
    controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced

    // Every PVC add/update/delete event enqueues the object into claimQueue.
    p.ClaimInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
        },
    )
    controller.claimLister = p.ClaimInformer.Lister()
    controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced

    // Storage classes, pods and nodes are only read through listers; no event
    // handlers are registered for them here.
    controller.classLister = p.ClassInformer.Lister()
    controller.classListerSynced = p.ClassInformer.Informer().HasSynced
    controller.podLister = p.PodInformer.Lister()
    controller.podIndexer = p.PodInformer.Informer().GetIndexer()
    controller.podListerSynced = p.PodInformer.Informer().HasSynced
    controller.NodeLister = p.NodeInformer.Lister()
    controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced

    // This custom indexer will index pods by its PVC keys. Then we don't need
    // to iterate all pods every time to find pods which reference given PVC.
    if err := common.AddPodPVCIndexerIfNotPresent(controller.podIndexer); err != nil {
        // The message names this controller; it previously blamed the
        // attach detach controller, which this code does not construct.
        return nil, fmt.Errorf("Could not initialize pod PVC indexer for PersistentVolume Controller: %v", err)
    }

    csiTranslator := csitrans.New()
    controller.translator = csiTranslator
    controller.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator)
    controller.filteredDialOptions = p.FilteredDialOptions
    return controller, nil
}

AddPodPVCIndexerIfNotPresent()

为Pod的Indexer增加按PVC查找Pod的索引

  • claimName=带pvc直接用claimName
  • claimName=Ephemeral用pod.name+volumeName
  • 最后返回key=namespace+claimName
// AddPodPVCIndexerIfNotPresent registers the PodPVCIndex index on the given
// indexer through AddIndexerIfNotPresent. The index function is produced by
// PodPVCIndexFunc using the current global setting of the
// GenericEphemeralVolume feature gate. Any error from AddIndexerIfNotPresent
// is returned unchanged.
func AddPodPVCIndexerIfNotPresent(indexer cache.Indexer) error {
    ephemeralEnabled := utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume)
    indexFunc := PodPVCIndexFunc(ephemeralEnabled)
    return AddIndexerIfNotPresent(indexer, PodPVCIndex, indexFunc)
}

Run()

// Run starts all of this controller's control loops and blocks until stopCh
// is closed.
//
// It first waits for the volume, claim, class, pod and node informer caches
// to sync and returns early if that wait fails. It then seeds the
// controller's own caches, starts the resync, volume and claim loops, and
// registers the metrics.
//
// The deferred calls run in reverse order on return: the shutdown message is
// logged, both work queues are shut down, and finally HandleCrash runs.
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()
    klog.Infof("Starting persistent volume controller")
    defer klog.Infof("Shutting down persistent volume controller")
    // Do not start any loop until every informer cache has synced.
    if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
        return
    }
    // Fill the controller-owned volume and claim caches from the listers.
    ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

    // Re-enqueue all PVs and PVCs once per resyncPeriod.
    go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
    // Keep processing the PersistentVolumes taken from volumeQueue.
    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
    // Keep processing the PersistentVolumeClaims taken from claimQueue.
    go wait.Until(ctrl.claimWorker, time.Second, stopCh)
    metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)
    // Block until shutdown is requested; the deferred cleanup then runs.
    <-stopCh
}

initializeCaches()

初始化缓存

  • pvLister遍历所有数据,copy一份,存储到内存中作为缓存
    • 判断缓存中是否有这个对象,如果没有就直接添加到缓存中
    • 如果存在就比较缓存中的resourceVersion和事件中的resourceVersion的大小
      • 如果比缓存中新,就更新缓存中的对象
  • pvcLister遍历所有数据,copy一份,存储到内存中作为缓存
    • 判断缓存中是否有这个对象,如果没有就直接添加到缓存中
    • 如果存在就比较缓存中的resourceVersion和事件中的resourceVersion的大小
      • 如果比缓存中新,就更新缓存中的对象
// initializeCaches seeds the controller's own volume and claim caches with
// everything the given listers currently return, so that the caches are
// already populated when the first addClaim/addVolume performs the initial
// synchronization of the controller.
//
// Each listed object is deep-copied before it is handed to storeVolumeUpdate
// or storeClaimUpdate. A failed List is logged and aborts the initialization;
// a failed store update is logged and the remaining objects are still
// processed.
func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
    pvs, listErr := volumeLister.List(labels.Everything())
    if listErr != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", listErr)
        return
    }
    for i := range pvs {
        if _, storeErr := ctrl.storeVolumeUpdate(pvs[i].DeepCopy()); storeErr != nil {
            klog.Errorf("error updating volume cache: %v", storeErr)
        }
    }

    pvcs, listErr := claimLister.List(labels.Everything())
    if listErr != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", listErr)
        return
    }
    for i := range pvcs {
        if _, storeErr := ctrl.storeClaimUpdate(pvcs[i].DeepCopy()); storeErr != nil {
            klog.Errorf("error updating claim cache: %v", storeErr)
        }
    }

    klog.V(4).Infof("controller initialized")
}

resync()

周期性地List PV和PVC,放到对应的claimQueue和volumeQueue里面重新处理

// resync re-enqueues every known claim and volume for processing. It exists
// so that the shared PV/PVC informers do not need a short resync period of
// their own, which would affect all of their consumers; the controller runs
// this periodic pass itself instead.
//
// Claims are listed and enqueued first, then volumes. If either List call
// fails, a warning is logged and the pass stops at that point.
func (ctrl *PersistentVolumeController) resync() {
    klog.V(4).Infof("resyncing PV controller")

    claims, claimErr := ctrl.claimLister.List(labels.NewSelector())
    if claimErr != nil {
        klog.Warningf("cannot list claims: %s", claimErr)
        return
    }
    for i := range claims {
        ctrl.enqueueWork(ctrl.claimQueue, claims[i])
    }

    volumes, volumeErr := ctrl.volumeLister.List(labels.NewSelector())
    if volumeErr != nil {
        klog.Warningf("cannot list persistent volumes: %s", volumeErr)
        return
    }
    for i := range volumes {
        ctrl.enqueueWork(ctrl.volumeQueue, volumes[i])
    }
}

相关文章:

  • 面向对象编程基础:从方法论到实践的全面解析
  • 【BEPU V1物理】BEPUphysics v1 入门指南 汉化笔记#1
  • JavaScript 基础语法系统学习笔记
  • 使用MPI-IO并行读写HDF5文件
  • 操作系统简要概述
  • 深入解析 Android 图形系统:Canvas、Skia、OpenGL 与 SurfaceFlinger 的协作
  • Vue 3 自定义指令
  • Mac配置开发环境
  • 【Hadoop入门】Hadoop生态之Pig简介
  • 一体化关节模组核心芯片(人形机器人)
  • 双指针、滑动窗口
  • QScrCpy源码解析(4)获取手机端数据知识补充
  • 文章记单词 | 第30篇(六级)
  • 帆软 FCA-FineBI 认证:迈向商业智能专家之路
  • 文章记单词 | 第28篇(六级)
  • ROS第十梯:ROS+VSCode+Python+C++利用launch自启动节点
  • 几何与游标
  • 大模型时代的“数字共生”:AI如何塑造人类社会的新范式
  • Python中try用法、内置异常类型与自定义异常类型拓展
  • # 基于OpenCV与Dlib的人脸融合技术实现
  • 车建兴被留置:跌落的前常州首富和红星系重整迷路
  • 董军同德国国防部长举行会谈
  • 上海锦江乐园摩天轮正在拆除中,预计5月底6月初拆完
  • 来沪一个月几乎未花住宿钱,女子虚构卫生问题屡薅酒店羊毛被刑拘
  • 加拿大新政府宣誓就职
  • 排污染黑海水后用沙土覆盖黑泥?汕尾环保部门:非欲盖弥彰