CSI-PVController-claimWorker
claimWorker()
claim worker中循环执行workFunc()
- claim worker从claimQueue中取数据,也就是取出的都是PVC
- workFunc首先从队列中取出一个obj,然后拿name去informer缓存中尝试获取
- 如果在informer缓存。说明不是删除事件,执行
updateClaim()
函数 - 如果不在informer缓存中。
- 如果pvc控制器的缓存中不存在,说明pvc已经被删除,打印日志。
- 如果pvc控制器的缓存中存在,就调用
deleteClaim()
在缓存中删除这个pvc 并且将对应的pv加入volume队列。
- 如果在informer缓存。说明不是删除事件,执行
// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.claimQueue.Get()
if quit {
return true
}
defer ctrl.claimQueue.Done(keyObj)
key := keyObj.(string)
klog.V(5).Infof("claimWorker[%s]", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
return false
}
claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
if err == nil {
// The claim still exists in informer cache, the event must have
// been add/update/sync
ctrl.updateClaim(claim)
return false
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
return false
}
// The claim is not in informer cache, the event must have been "delete"
claimObj, found, err := ctrl.claims.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the claim from its cache
klog.V(2).Infof("deletion of claim %q was already processed", key)
return false
}
claim, ok := claimObj.(*v1.PersistentVolumeClaim)
if !ok {
klog.Errorf("expected claim, got %+v", claimObj)
return false
}
ctrl.deleteClaim(claim)
return false
}
for {
if quit := workFunc(); quit {
klog.Infof("claim worker queue shutting down")
return
}
}
}
updateClaim()
-
存储新的PVC版本到缓存:
new, err := ctrl.storeClaimUpdate(claim)
- 这行代码尝试将传入的PVC更新到内部缓存中。如果这是一个新的版本(即之前没有这个PVC的更新或者这是该PVC的一个新版本),
new
将返回true
;否则返回false
。err
用于捕获过程中可能发生的错误。
-
检查是否为新版本:
if !new { return }
- 如果
new
为false
,即这不是一个新的PVC版本,则直接返回,不继续处理。
-
同步PVC:
err = ctrl.syncClaim(claim)
- 尝试同步PVC的状态。这可能涉及与底层的存储系统交互,确保PVC的状态与期望的状态一致。
-
同步错误处理:
-
如果同步过程中出现错误,会进行错误检查和处理。
-
if errors.IsConflict(err) { ... } else { ... }
- 使用
errors.IsConflict(err)
检查错误是否为版本冲突(通常由于并发更新导致)。 - 如果是版本冲突,使用
klog.V(3).Infof
记录一个较低级别的信息日志,因为版本冲突在Kubernetes中是常见的,并且控制器通常可以自动恢复。 - 如果不是版本冲突,则使用
klog.Errorf
记录一个错误日志,因为这可能表示一个更严重的问题。
- 使用
-
总的来说,这段代码的目的是在PersistentVolumeController
中更新一个PVC的状态,并确保其状态与期望的状态同步。它处理了版本控制、错误捕获和日志记录,以确保操作的健壮性和可维护性。
// updateClaim runs in worker thread and handles "claim added",
// "claim updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
// Store the new claim version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeClaimUpdate(claim)//存储该pvc的新版本
if err != nil {
klog.Errorf("%v", err)
}
if !new {
return
}
err = ctrl.syncClaim(claim)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
} else {
klog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
}
}
}
syncClaim()
- 更新迁移注解:
- 调用
updateClaimMigrationAnnotations(claim)
方法尝试更新PVC上的迁移注解。这个方法可能会返回一个新的PVC对象(newClaim
),如果注解有更新,并且这个新的PVC对象需要被保存到API服务器中。 - 如果在更新注解时发生错误(
err != nil
),方法将返回这个错误。这意味着当前的PVC状态不会被进一步处理,而是会在下次调用syncClaim
时再次尝试。
- 调用
- 处理PVC的绑定状态:
- 接下来,代码检查PVC上是否存在
pvutil.AnnBindCompleted
注解。这个注解通常用来表示PVC是否已经完成绑定到一个PersistentVolume
(PV)。 - 如果PVC没有完成绑定(即不存在
AnnBindCompleted
注解),则调用syncUnboundClaim(claim)
方法来处理未绑定的PVC。这可能包括寻找合适的PV进行绑定等操作。 - 如果PVC已经完成绑定(即存在
AnnBindCompleted
注解),则调用syncBoundClaim(claim)
方法来处理已绑定的PVC。这可能包括检查PVC和PV的状态是否一致,以及处理任何需要的后续操作。
- 接下来,代码检查PVC上是否存在
总结来说,syncClaim
方法是PersistentVolumeController
用来同步PVC状态的核心方法。它首先尝试更新PVC的迁移注解,然后根据PVC是否已绑定到PV,分别调用不同的方法来处理PVC的状态。这个方法的设计体现了Kubernetes中控制循环(control loop)的思想,即通过不断检查和更新资源状态来确保系统的一致性和正确性。
// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))
// Set correct "migrated-to" annotations on PVC and update in API server if
// necessary
newClaim, err := ctrl.updateClaimMigrationAnnotations(claim)
if err != nil {
// Nothing was saved; we will fall back into the same
// condition in the next call to this method
return err
}
claim = newClaim
if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
return ctrl.syncUnboundClaim(claim)
} else {
return ctrl.syncBoundClaim(claim)
}
}
syncBoundClaim()
- 检查PVC是否已绑定:
if claim.Spec.VolumeName == ""
- 首先检查PVC的
Spec.VolumeName
是否为空。如果为空,表示这个PVC之前绑定过,但现在不再绑定到任何PersistentVolume(PV)。
- 更新PVC状态为Lost:
- 如果PVC不再绑定到任何PV,则调用
updateClaimStatusWithEvent
方法更新PVC的状态为ClaimLost
,并记录一个警告事件,表明绑定的PV已经丢失,数据也可能丢失。
- 如果PVC不再绑定到任何PV,则调用
- 从存储中获取PV:
obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
- 使用PVC的
VolumeName
从存储中获取对应的PV对象。如果获取过程中出现错误,返回错误。
- 检查PV是否存在:
- 如果未找到PV(
!found
),则更新PVC状态为ClaimLost
,并记录一个警告事件,表明绑定的PV不存在,数据可能丢失。
- 如果未找到PV(
- 类型断言:
volume, ok := obj.(*v1.PersistentVolume)
- 对获取到的对象进行类型断言,确保其类型为
*v1.PersistentVolume
。如果断言失败,返回错误。
- 记录日志:
- 使用
klog.V(4).Infof
记录日志,表明正在同步已绑定的PVC,并找到了对应的PV。
- 使用
- 检查PV的ClaimRef:
if volume.Spec.ClaimRef == nil
- 如果PV的
Spec.ClaimRef
为空,表示PV当前未绑定到任何PVC,或者控制器尚未收到更新的PV信息。此时,尝试重新绑定PV和PVC。
- 重新绑定PV和PVC:
- 调用
ctrl.bind(volume, claim)
方法尝试重新绑定PV和PVC。如果绑定失败,返回错误。
- 调用
- 检查PV的ClaimRef是否与PVC匹配:
else if volume.Spec.ClaimRef.UID == claim.UID
- 如果PV的
Spec.ClaimRef.UID
与PVC的UID
相匹配,表示PVC已正确绑定到PV。此时,仍然调用ctrl.bind
方法(大多数情况下,这个方法此时不会做任何操作,因为所有状态应该已经设置好了)。
- 处理绑定冲突:
- 如果PV的
Spec.ClaimRef.UID
与PVC的UID
不匹配,表示存在绑定冲突(即两个PVC绑定到了同一个PV)。此时,更新PVC的状态为ClaimLost
,并记录一个警告事件,表明存在绑定错误。
- 如果PV的
总结来说,这段代码的主要作用是确保PVC和PV之间的绑定关系是正确的,并在出现问题时进行相应的处理和状态更新。
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
// HasAnnotation(pvc, pvutil.AnnBindCompleted)
// This PVC has previously been bound
// OBSERVATION: pvc is not "Pending"
// [Unit test set 3]
if claim.Spec.VolumeName == "" {
// Claim was bound before but not any more.
if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
return err
}
return nil
}
obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
if err != nil {
return err
}
if !found {
// Claim is bound to a non-existing volume.
if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
return err
}
return nil
} else {
volume, ok := obj.(*v1.PersistentVolume)
if !ok {
return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
}
klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
if volume.Spec.ClaimRef == nil {
// Claim is bound but volume has come unbound.
// Or, a claim was bound and the controller has not received updated
// volume yet. We can't distinguish these cases.
// Bind the volume again and set all states to Bound.
klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
if err = ctrl.bind(volume, claim); err != nil {
// Objects not saved, next syncPV or syncClaim will try again
return err
}
return nil
} else if volume.Spec.ClaimRef.UID == claim.UID {
// All is well
// NOTE: syncPV can handle this so it can be left out.
// NOTE: bind() call here will do nothing in most cases as
// everything should be already set.
klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
if err = ctrl.bind(volume, claim); err != nil {
// Objects not saved, next syncPV or syncClaim will try again
return err
}
return nil
} else {
// Claim is bound but volume has a different claimant.
// Set the claim phase to 'Lost', which is a terminal
// phase.
if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
return err
}
return nil
}
}
}
bind()
- 绑定 PV 到 PVC:
- 调用
ctrl.bindVolumeToClaim(volume, claim)
尝试将 PV 绑定到 PVC。如果操作失败,记录错误日志并返回错误。 - 如果绑定成功,更新
volume
变量为updatedVolume
,以反映绑定后的状态。
- 调用
- 更新 PV 状态:
- 调用
ctrl.updateVolumePhase(volume, v1.VolumeBound, "")
更新 PV 的状态为Bound
,表示它已被绑定到一个 PVC。如果更新失败,记录错误日志并返回错误。 - 更新
volume
变量以反映状态更新后的 PV。
- 调用
- 绑定 PVC 到 PV:
- 调用
ctrl.bindClaimToVolume(claim, volume)
尝试将 PVC 绑定到 PV。如果操作失败,记录错误日志并返回错误。 - 如果绑定成功,更新
claim
变量为updatedClaim
。
- 调用
- 更新 PVC 状态:
- 调用
ctrl.updateClaimStatus(claim, v1.ClaimBound, volume)
更新 PVC 的状态为Bound
,表示它已被绑定到一个 PV。如果更新失败,记录错误日志并返回错误。 - 更新
claim
变量以反映状态更新后的 PVC。
- 调用
// bind saves binding information both to the volume and the claim and marks
// both objects as Bound. Volume is saved first.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
var err error
// use updateClaim/updatedVolume to keep the original claim/volume for
// logging in error cases.
var updatedClaim *v1.PersistentVolumeClaim
var updatedVolume *v1.PersistentVolume
klog.V(4).Infof("binding volume %q to claim %q", volume.Name, claimToClaimKey(claim))
if updatedVolume, err = ctrl.bindVolumeToClaim(volume, claim); err != nil {
klog.V(3).Infof("error binding volume %q to claim %q: failed saving the volume: %v", volume.Name, claimToClaimKey(claim), err)
return err
}
volume = updatedVolume
if updatedVolume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
klog.V(3).Infof("error binding volume %q to claim %q: failed saving the volume status: %v", volume.Name, claimToClaimKey(claim), err)
return err
}
volume = updatedVolume
if updatedClaim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {
klog.V(3).Infof("error binding volume %q to claim %q: failed saving the claim: %v", volume.Name, claimToClaimKey(claim), err)
return err
}
claim = updatedClaim
if updatedClaim, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {
klog.V(3).Infof("error binding volume %q to claim %q: failed saving the claim status: %v", volume.Name, claimToClaimKey(claim), err)
return err
}
claim = updatedClaim
klog.V(4).Infof("volume %q bound to claim %q", volume.Name, claimToClaimKey(claim))
klog.V(4).Infof("volume %q status after binding: %s", volume.Name, getVolumeStatusForLogging(volume))
klog.V(4).Infof("claim %q status after binding: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))
return nil
}
bindVolumeToClaim()
- 检查 PV 是否需要绑定到目标 PVC。
- 如果需要绑定,则生成新的 PV 对象并更新其绑定信息。
- 通过
updateBindVolumeToClaim
完成状态更新和缓存同步。
- 获取绑定信息:
volumeClone, dirty, err := pvutil.GetBindVolumeToClaim(volume, claim)
: 这行代码调用pvutil.GetBindVolumeToClaim
函数,尝试将PVC的信息绑定到PV上。该函数返回三个值:一个PV的克隆(volumeClone
),一个布尔值(dirty
)表示PV是否被修改,以及一个error
对象。如果发生错误,方法将返回nil
和错误对象。
- 更新PV:
if dirty {
: 如果dirty
为true
,表示PV在绑定过程中被修改了。return ctrl.updateBindVolumeToClaim(volumeClone, true)
: 调用ctrl
的updateBindVolumeToClaim
方法,传入修改后的PV克隆和true
(可能表示需要强制更新或确认更新)。该方法负责更新PV的状态,并返回更新后的PV和可能的错误。
- 未修改时的处理:
- 如果
dirty
为false
,表示PV在尝试绑定过程中没有被修改。 klog.V(4).Infof("updating PersistentVolume[%s]: already bound to %q", volume.Name, claimToClaimKey(claim))
: 记录一个信息级别的日志,表明PV已经绑定到了指定的PVC,没有进一步的更新操作需要执行。-return volume, nil
: 返回原始的PV和nil
错误,表示操作成功完成,没有错误。
- 如果
总结来说,这段代码的主要功能是尝试将一个PV绑定到一个PVC。如果绑定过程中PV被修改,则更新PV的状态;如果PV未被修改(可能已经绑定到了该PVC),则记录日志并返回原始的PV。这个过程涉及到日志记录、数据绑定和条件更新操作。
func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q", volume.Name, claimToClaimKey(claim))
volumeClone, dirty, err := pvutil.GetBindVolumeToClaim(volume, claim)
if err != nil {
return nil, err
}
// Save the volume only if something was changed
if dirty {
return ctrl.updateBindVolumeToClaim(volumeClone, true)
}
klog.V(4).Infof("updating PersistentVolume[%s]: already bound to %q", volume.Name, claimToClaimKey(claim))
return volume, nil
}
GetBindVolumeToClaim()
- 初始化
dirty
标志:dirty
初始化为false
,用于标记PV是否被修改。
- 检查PV是否已绑定到PVC:
- 使用
IsVolumeBoundToClaim(volume, claim)
函数检查PV是否已经绑定到PVC。 - 如果未绑定,设置
shouldSetBoundByController
为true
,表示需要通过控制器来绑定。
- 使用
- 创建PV的深度复制:
- 使用
volume.DeepCopy()
创建PV的一个深度复制,以避免直接修改原始对象。
- 使用
- 绑定PV到PVC:
- 检查PV的
Spec.ClaimRef
是否为空,或者是否与给定的PVC不匹配(通过名称、命名空间和UID判断)。 - 如果不匹配,使用
reference.GetReference(scheme.Scheme, claim)
获取PVC的引用,并将其设置为PV复制的Spec.ClaimRef
。 - 设置
dirty
为true
,表示PV已被修改。
- 检查PV的
- 设置
AnnBoundByController
注解:- 如果
shouldSetBoundByController
为true
且PV复制对象上没有AnnBoundByController
注解,则添加该注解并设置其值为 “yes”。 - 同样,设置
dirty
为true
。
- 如果
- 返回结果:
- 返回PV的深度复制对象、
dirty
标志和nil
错误(如果没有发生错误)。
- 返回PV的深度复制对象、
总结
这个函数的核心功能是将一个PV绑定到一个PVC,并在必要时更新PV的状态(如设置绑定注解)。通过返回一个深度复制的PV对象和一个表示是否进行了修改的 dirty
标志,调用者可以决定是否需要进一步处理或持久化这个修改过的PV对象。
// GetBindVolumeToClaim returns a new volume which is bound to given claim. In
// addition, it returns a bool which indicates whether we made modification on
// original volume.
func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) {
dirty := false
// Check if the volume was already bound (either by user or by controller)
shouldSetBoundByController := false
if !IsVolumeBoundToClaim(volume, claim) {
shouldSetBoundByController = true
}
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
volumeClone := volume.DeepCopy()
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
volume.Spec.ClaimRef.Name != claim.Name ||
volume.Spec.ClaimRef.Namespace != claim.Namespace ||
volume.Spec.ClaimRef.UID != claim.UID {
claimRef, err := reference.GetReference(scheme.Scheme, claim)
if err != nil {
return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err)
}
volumeClone.Spec.ClaimRef = claimRef
dirty = true
}
// Set AnnBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, AnnBoundByController) {
metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, AnnBoundByController, "yes")
dirty = true
}
return volumeClone, dirty, nil
}
updateVolumePhase()
- 检查当前 PV 状态是否已与目标状态一致,若一致则直接返回。
- 深拷贝 PV 对象,避免修改原对象。
- 更新状态和消息。
- 调用 Kubernetes API 更新 PV 状态。
- 同步更新本地缓存。
- 记录日志。
// updateVolumePhase saves new volume phase to API server.
func (ctrl *PersistentVolumeController) updateVolumePhase(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {
klog.V(4).Infof("updating PersistentVolume[%s]: set phase %s", volume.Name, phase)
if volume.Status.Phase == phase {
// Nothing to do.
klog.V(4).Infof("updating PersistentVolume[%s]: phase %s already set", volume.Name, phase)
return volume, nil
}
volumeClone := volume.DeepCopy()
volumeClone.Status.Phase = phase
volumeClone.Status.Message = message
newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), volumeClone, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
return newVol, err
}
_, err = ctrl.storeVolumeUpdate(newVol)
if err != nil {
klog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return newVol, err
}
klog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
return newVol, err
}
bindClaimToVolume()
- 检查 PVC 是否需要绑定到目标 PV。
- 更新 PVC 的 Spec.VolumeName 和相关注解(如绑定状态和控制器标识)。
- 通过 Kubernetes API 更新 PVC 并同步本地缓存。
-
检查PVC是否已绑定:
- 通过比较
volume.Name
和claim.Spec.VolumeName
来判断PVC是否已经绑定到某个PV。如果它们不相等,说明需要绑定(shouldBind
设置为true
)。
- 通过比较
-
创建PVC的深拷贝:
- 由于传入的PVC可能指向watcher的缓存,直接修改它可能会导致问题。因此,这里使用
DeepCopy
方法创建了一个PVC的副本(claimClone
),后续的操作都将在这个副本上进行。
- 由于传入的PVC可能指向watcher的缓存,直接修改它可能会导致问题。因此,这里使用
-
执行绑定操作:
- 如果
shouldBind
为true
,则进行绑定操作:- 设置
claimClone.Spec.VolumeName
为volume.Name
,将PVC绑定到PV。 - 如果PVC的元数据中没有
AnnBoundByController
注解,则添加这个注解,值为"yes",表示这个绑定操作是由控制器完成的。
- 设置
- 如果
-
设置
AnnBindCompleted
注解:- 无论是否进行了绑定操作,如果PVC的元数据中没有
AnnBindCompleted
注解,则添加这个注解,值为"yes",表示绑定操作已经完成。
- 无论是否进行了绑定操作,如果PVC的元数据中没有
-
更新PVC:
- 如果进行了任何修改(即
dirty
为true
),则需要更新PVC:- 使用
ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update
方法更新Kubernetes中的PVC对象。 - 更新内部缓存,调用
ctrl.storeClaimUpdate(newClaim)
。 - 记录日志,表示PVC已成功绑定到PV。
- 使用
- 如果进行了任何修改(即
总结来说,这段代码的作用是检查一个PVC是否已经绑定到一个PV,如果没有绑定,则进行绑定操作,并更新PVC对象及其元数据注解,最后更新Kubernetes中的PVC对象和内部缓存。
func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q", claimToClaimKey(claim), volume.Name)
dirty := false
// Check if the claim was already bound (either by controller or by user)
shouldBind := false
if volume.Name != claim.Spec.VolumeName {
shouldBind = true
}
// The claim from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
if shouldBind {
dirty = true
// Bind the claim to the volume
claimClone.Spec.VolumeName = volume.Name
// Set AnnBoundByController if it is not set yet
if !metav1.HasAnnotation(claimClone.ObjectMeta, pvutil.AnnBoundByController) {
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnBoundByController, "yes")
}
}
// Set AnnBindCompleted if it is not set yet
if !metav1.HasAnnotation(claimClone.ObjectMeta, pvutil.AnnBindCompleted) {
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnBindCompleted, "yes")
dirty = true
}
if dirty {
klog.V(2).Infof("volume %q bound to claim %q", volume.Name, claimToClaimKey(claim))
newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err)
return newClaim, err
}
_, err = ctrl.storeClaimUpdate(newClaim)
if err != nil {
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
return newClaim, err
}
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: bound to %q", claimToClaimKey(claim), volume.Name)
return newClaim, nil
}
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: already bound to %q", claimToClaimKey(claim), volume.Name)
return claim, nil
}
updateClaimStatus()
- 根据目标 phase 更新 PVC 的状态。
- 根据是否关联 PV,更新或重置访问模式和容量。
- 通过 Kubernetes API 更新 PVC 状态,并同步本地缓存。
- 深拷贝:为了避免直接修改原始PVC对象,函数创建了
claim
的一个深拷贝claimClone
。 - 状态更新:
- 如果PVC的当前状态与要设置的状态不同,则更新
claimClone
的状态,并将dirty
标志设置为true
,表示有更改。 - 如果
volume
为nil
,表示PVC不与任何PV关联,因此需要清除AccessModes
和Capacity
。 - 如果
volume
不为nil
,则需要根据volume
的信息更新claimClone
的AccessModes
和Capacity
。特别注意,只有在PVC状态发生变化时(即之前不是这个状态),才更新Capacity
。
- 如果PVC的当前状态与要设置的状态不同,则更新
- 无更改处理:如果没有任何更改(
dirty
为false
),则记录一条日志并返回原始的PVC对象和nil
错误。 - 更新状态:如果有更改,使用Kubernetes客户端(
ctrl.kubeClient
)调用UpdateStatus
方法更新PVC的状态。这里使用了context.TODO()
作为上下文,通常在实际调用中需要提供一个具体的上下文。 - 内部缓存更新:更新内部缓存中的PVC信息。这是通过调用
ctrl.storeClaimUpdate
方法实现的。 - 日志记录和返回:最后,记录一条日志表明PVC已经进入了新的状态,并返回更新后的PVC对象和可能的错误。
关键点
- 深拷贝:为了避免对原始对象的意外修改,使用了深拷贝。
- 条件更新:根据
volume
是否为nil
以及PVC的当前状态,有条件地更新AccessModes
和Capacity
。 - 错误处理:在关键操作(如更新状态和内部缓存)中进行了错误检查和处理。
这段代码是Kubernetes中处理PVC状态更新的一个典型例子,展示了如何在复杂的系统中管理资源状态,并确保状态的一致性和正确性。
func (ctrl *PersistentVolumeController) updateClaimStatus(claim *v1.PersistentVolumeClaim, phase v1.PersistentVolumeClaimPhase, volume *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {
klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: set phase %s", claimToClaimKey(claim), phase)
dirty := false
claimClone := claim.DeepCopy()
if claim.Status.Phase != phase {
claimClone.Status.Phase = phase
dirty = true
}
if volume == nil {
// Need to reset AccessModes and Capacity
if claim.Status.AccessModes != nil {
claimClone.Status.AccessModes = nil
dirty = true
}
if claim.Status.Capacity != nil {
claimClone.Status.Capacity = nil
dirty = true
}
} else {
// Need to update AccessModes and Capacity
if !reflect.DeepEqual(claim.Status.AccessModes, volume.Spec.AccessModes) {
claimClone.Status.AccessModes = volume.Spec.AccessModes
dirty = true
}
// Update Capacity if the claim is becoming Bound, not if it was already.
// A discrepancy can be intentional to mean that the PVC filesystem size
// doesn't match the PV block device size, so don't clobber it
if claim.Status.Phase != phase {
volumeCap, ok := volume.Spec.Capacity[v1.ResourceStorage]
if !ok {
return nil, fmt.Errorf("PersistentVolume %q is without a storage capacity", volume.Name)
}
claimCap, ok := claim.Status.Capacity[v1.ResourceStorage]
if !ok || volumeCap.Cmp(claimCap) != 0 {
claimClone.Status.Capacity = volume.Spec.Capacity
dirty = true
}
}
}
if !dirty {
// Nothing to do.
klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: phase %s already set", claimToClaimKey(claim), phase)
return claim, nil
}
newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(context.TODO(), claimClone, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: set phase %s failed: %v", claimToClaimKey(claim), phase, err)
return newClaim, err
}
_, err = ctrl.storeClaimUpdate(newClaim)
if err != nil {
klog.V(4).Infof("updating PersistentVolumeClaim[%s] status: cannot update internal cache: %v", claimToClaimKey(claim), err)
return newClaim, err
}
klog.V(2).Infof("claim %q entered phase %q", claimToClaimKey(claim), phase)
return newClaim, nil
}
syncUnboundClaim()
- 检查PVC是否指定了VolumeName:
- 如果
claim.Spec.VolumeName
为空,表示用户没有指定特定的PV。检查是否为延迟绑定模式。- 寻找最佳匹配的PV:调用
ctrl.volumes.findBestMatchForClaim
尝试找到一个与PVC最匹配的PV。- 如果找到匹配的PV:将PVC与PV绑定。
- 记录绑定操作的指标。
- 如果未找到匹配的PV:
- 根据是否为延迟绑定模式和是否设置了存储类,决定是否进行动态供应或记录事件。,以便后续再次尝试匹配。
- 如果找到匹配的PV:将PVC与PV绑定。
- 寻找最佳匹配的PV:调用
- 如果
claim.Spec.VolumeName
不为空,表示用户指定了特定的PV。- 查找指定的PV:通过
ctrl.volumes.store.GetByKey
尝试获取用户指定的PV。- 如果PV不存在:,以便后续再次尝试。
- 如果PV存在但不是
PersistentVolume
类型:- 返回错误。
- 如果PV存在且未被绑定:进行绑定。
- 如果PV已被绑定:
- 检查PV是否被当前PVC绑定,如果是,则完成绑定操作。
。
注解,这是一个异常情况,返回错误。
- 检查PV是否被当前PVC绑定,如果是,则完成绑定操作。
- 查找指定的PV:通过
- 如果
关键函数和概念
pvutil.IsDelayBindingMode
:检查PVC是否为延迟绑定模式。ctrl.volumes.findBestMatchForClaim
:寻找与PVC最匹配的PV。ctrl.bind
:将PVC与PV绑定。ctrl.updateClaimStatus
:更新PVC的状态。ctrl.eventRecorder.Event
:记录事件,如绑定失败或成功。checkVolumeSatisfyClaim
:检查PV是否满足PVC的要求。
总结
`方法是Kubernetes中PersistentVolumeController的核心方法之一,负责处理未绑定的PVC,尝试将其与合适的PV绑定,或者处理无法绑定的情况。通过详细的逻辑分支和错误处理,确保了PVC和PV的正确匹配和绑定。
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
// This is a new PVC that has not completed binding
// OBSERVATION: pvc is "Pending"
if claim.Spec.VolumeName == "" {
// User did not care which PV they get.
delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
if err != nil {
return err
}
// [Unit test set 1]
volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
if err != nil {
klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
}
if volume == nil {
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
// No PV could be found
// OBSERVATION: pvc is "Pending", will retry
switch {
case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
return err
}
case v1helper.GetPersistentVolumeClaimClass(claim) != "":
if err = ctrl.provisionClaim(claim); err != nil {
return err
}
return nil
default:
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
}
// Mark the claim as Pending and try to find a match in the next
// periodic syncClaim
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
return nil
} else /* pv != nil */ {
// Found a PV for this claim
// OBSERVATION: pvc is "Pending", pv is "Available"
claimKey := claimToClaimKey(claim)
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
if err = ctrl.bind(volume, claim); err != nil {
// On any error saving the volume or the claim, subsequent
// syncClaim will finish the binding.
// record count error for provision if exists
// timestamp entry will remain in cache until a success binding has happened
metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
return err
}
// OBSERVATION: claim is "Bound", pv is "Bound"
// if exists a timestamp entry in cache, record end to end provision latency and clean up cache
// End of the provision + binding operation lifecycle, cache will be cleaned by "RecordMetric"
// [Unit test 12-1, 12-2, 12-4]
metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
return nil
}
} else /* pvc.Spec.VolumeName != nil */ {
// [Unit test set 2]
// User asked for a specific PV.
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
if err != nil {
return err
}
if !found {
// User asked for a PV that does not exist.
// OBSERVATION: pvc is "Pending"
// Retry later.
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
return nil
} else {
volume, ok := obj.(*v1.PersistentVolume)
if !ok {
return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
}
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
if volume.Spec.ClaimRef == nil {
// User asked for a PV that is not claimed
// OBSERVATION: pvc is "Pending", pv is "Available"
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
// send an event
msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
// volume does not satisfy the requirements of the claim
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
} else if err = ctrl.bind(volume, claim); err != nil {
// On any error saving the volume or the claim, subsequent
// syncClaim will finish the binding.
return err
}
// OBSERVATION: pvc is "Bound", pv is "Bound"
return nil
} else if pvutil.IsVolumeBoundToClaim(volume, claim) {
// User asked for a PV that is claimed by this PVC
// OBSERVATION: pvc is "Pending", pv is "Bound"
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
// Finish the volume binding by adding claim UID.
if err = ctrl.bind(volume, claim); err != nil {
return err
}
// OBSERVATION: pvc is "Bound", pv is "Bound"
return nil
} else {
// User asked for a PV that is claimed by someone else
// OBSERVATION: pvc is "Pending", pv is "Bound"
if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
// User asked for a specific PV, retry later
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
return nil
} else {
// This should never happen because someone had to remove
// AnnBindCompleted annotation on the claim.
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
}
}
}
}
}