Koordinator-nodeMetricController
Reconcile()
- clientgo获取node名称对应node和nodeMetric,记录在nodeExist/nodeMetricExist
- 若!nodeExist,删除对应nodeMetric
- 若!nodeMetricExist,初始化nodeMetric,然后创建nodeMetric
- node和nodeMetric都存在时
- 获取nodeMetricSpec,若spec有变化,则更新nodeMetric。因为ColocationStrategy里的策略发生变化时,要同步更新nodeMetric里的相关属性。
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For the Node named by req it keeps the same-named NodeMetric in sync:
//   - neither object exists: nothing to do;
//   - only the NodeMetric exists: the NodeMetric is deleted;
//   - only the Node exists: a NodeMetric is initialized and created;
//   - both exist: the NodeMetric spec is recomputed and updated when it differs.
//
// Requests are dropped (not requeued) while the colocation config cache is
// unavailable. Every API call is issued with the ctx supplied by the caller so
// that the caller's cancellation and deadlines reach the client.
func (r *NodeMetricReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx, "node-metric-reconciler", req.NamespacedName)

	// if cache unavailable, drop the req
	if !r.cfgCache.IsCfgAvailable() {
		// all nodes would be enqueued once the config is available, so here we just drop the req
		klog.Warningf("colocation config is not available, drop the req %v until a valid config is set",
			req.NamespacedName)
		return ctrl.Result{}, nil
	}

	node, nodeMetric := &corev1.Node{}, &slov1alpha1.NodeMetric{}
	nodeExist, nodeMetricExist := true, true
	// The NodeMetric shares its name with the Node it describes.
	nodeName, nodeMetricName := req.Name, req.Name

	if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil {
		if !errors.IsNotFound(err) {
			metrics.RecordNodeMetricReconcileCount(false, "reconcileNodeGetError")
			klog.Errorf("failed to find node %v, error: %v", nodeName, err)
			return ctrl.Result{Requeue: true}, err
		}
		nodeExist = false
	}

	if err := r.Client.Get(ctx, req.NamespacedName, nodeMetric); err != nil {
		if !errors.IsNotFound(err) {
			metrics.RecordNodeMetricReconcileCount(false, "reconcileNodeMetricGetError")
			klog.Errorf("failed to find nodeMetric %v, error: %v", nodeMetricName, err)
			return ctrl.Result{Requeue: true}, err
		}
		nodeMetricExist = false
	}

	switch {
	case !nodeExist && !nodeMetricExist:
		// return if neither Node nor NodeMetric exists.
		return ctrl.Result{}, nil

	case !nodeExist:
		// if !nodeExist && nodeMetricExist, delete NodeMetric.
		if err := r.Client.Delete(ctx, nodeMetric); err != nil {
			// The object vanished between Get and Delete: the desired state is
			// already reached, so this is not reported as an error.
			if errors.IsNotFound(err) {
				return ctrl.Result{}, nil
			}
			klog.Errorf("failed to delete nodeMetric %v, error: %v", nodeMetricName, err)
			metrics.RecordNodeMetricReconcileCount(false, "deleteNodeMetric")
			return ctrl.Result{Requeue: true}, err
		}
		metrics.RecordNodeMetricReconcileCount(true, "deleteNodeMetric")
		return ctrl.Result{}, nil

	case !nodeMetricExist:
		// if nodeExist && !nodeMetricExist, create an empty NodeMetric CR.
		if err := r.initNodeMetric(node, nodeMetric); err != nil {
			klog.Errorf("failed to init nodeMetric %v, error: %v", nodeMetricName, err)
			return ctrl.Result{Requeue: true}, err
		}
		if err := r.Client.Create(ctx, nodeMetric); err != nil {
			metrics.RecordNodeMetricReconcileCount(false, "createNodeMetric")
			klog.Errorf("failed to create nodeMetric %v, error: %v", nodeMetricName, err)
			return ctrl.Result{Requeue: true}, err
		}
		metrics.RecordNodeMetricReconcileCount(true, "createNodeMetric")

	default:
		// update nodeMetric spec if both exists
		nodeMetricSpec, err := r.getNodeMetricSpec(node, &nodeMetric.Spec)
		if err != nil {
			klog.Errorf("failed to get nodeMetric spec of %v: %v", nodeMetricName, err)
			return ctrl.Result{Requeue: true}, err
		}
		// Only write to the API server when the computed spec actually differs.
		if !reflect.DeepEqual(nodeMetricSpec, &nodeMetric.Spec) {
			nodeMetric.Spec = *nodeMetricSpec
			if err := r.Client.Update(ctx, nodeMetric); err != nil {
				metrics.RecordNodeMetricReconcileCount(false, "updateNodeMetric")
				klog.Errorf("failed to update nodeMetric %v, error: %v", nodeMetricName, err)
				return ctrl.Result{Requeue: true}, err
			}
			metrics.RecordNodeMetricReconcileCount(true, "updateNodeMetric")
		}
	}

	klog.V(6).Infof("nodemetric-controller succeeded to update nodeMetric %v", nodeMetricName)
	return ctrl.Result{}, nil
}
getNodeMetricSpec()
获取NodeColocationStrategy,再从中获取NodeMetricCollectPolicy,其实就3个字段:aggregateDurationSeconds、reportIntervalSeconds、nodeAggregatePolicy
// getNodeMetricSpec computes the NodeMetricSpec that should be applied for node.
//
// It starts from getDefaultSpec() and, when a collect policy can be derived
// from the node's merged colocation strategy, stores that policy in the spec's
// CollectPolicy field. If the policy cannot be derived, the failure is recorded
// and logged, and the spec from getDefaultSpec() is returned as is.
//
// If the config cache reports an error status and oldSpec is non-nil, oldSpec
// is returned unchanged. An error is returned only when node is nil.
func (r *NodeMetricReconciler) getNodeMetricSpec(node *corev1.Node, oldSpec *slov1alpha1.NodeMetricSpec) (*slov1alpha1.NodeMetricSpec, error) {
	if node == nil {
		klog.Errorf("getNodeMetricSpec failed to get spec for nil node")
		return nil, fmt.Errorf("invalid node input")
	}

	// Keep the previous spec while the cached config is in an error status.
	if r.cfgCache.IsErrorStatus() && oldSpec != nil {
		return oldSpec, nil
	}

	spec := getDefaultSpec()
	strategy := sloconfig.GetNodeColocationStrategy(r.cfgCache.GetCfgCopy(), node)

	policy, err := getNodeMetricCollectPolicy(strategy)
	if err != nil {
		// Not fatal: the spec from getDefaultSpec() is returned without an override.
		metrics.RecordNodeMetricSpecParseCount(false, "getNodeMetricCollectPolicy")
		klog.Warningf("getNodeMetricSpec(): failed to get nodeMetricCollectPolicy for node %s, set the default error: %v", node.Name, err)
		return spec, nil
	}

	metrics.RecordNodeMetricSpecParseCount(true, "getNodeMetricCollectPolicy")
	spec.CollectPolicy = policy
	return spec, nil
}