
Decision Tree Learning

Decision Trees

Basic Decision Tree Algorithm

(Figure: pseudocode of the basic decision tree learning algorithm)

Split Selection

The key question in decision tree learning is how to choose the splitting attribute. Generally, as splitting proceeds, we want the samples contained in each branch node to belong to the same class as far as possible, i.e., the "purity" of the nodes should become higher and higher.

Information Gain

"Information entropy" is the most commonly used measure of the purity of a sample set. Suppose the proportion of class-$k$ samples in the current sample set $D$ is $p_k$ $(k = 1, 2, \cdots, |\mathcal{Y}|)$; then the information entropy of $D$ is defined as
$$\mathrm{Ent}(D) = -\sum_{k=1}^{|\mathcal{Y}|} p_k \log_2 p_k$$
The smaller the value of $\mathrm{Ent}(D)$, the higher the purity of $D$ (one can show the entropy is largest for the uniform distribution and smallest when one of the $p_k$ equals 1).

def calc_entropy(df: pd.DataFrame, label_col: str) -> float:
    if df.empty:
        return 0.0
    counts = df[label_col].value_counts(normalize=True)
    # -(|C_k| / |D|) * log2(|C_k| / |D|)
    return -np.sum(counts * np.log2(counts))
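As a quick sanity check, a uniform two-class sample should give exactly 1 bit. A minimal usage sketch (the toy DataFrame and its label column are made up for illustration; numpy and pandas are assumed to be imported as np and pd):

toy = pd.DataFrame({"label": ["yes", "yes", "no", "no"]})
print(calc_entropy(toy, "label"))  # 0.5/0.5 split -> 1.0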

Suppose the discrete attribute $a$ has $V$ possible values $\{a^1, a^2, \cdots, a^V\}$. Splitting the sample set $D$ on $a$ produces $V$ branch nodes, where the $v$-th branch node contains exactly the samples of $D$ whose value on attribute $a$ is $a^v$, denoted $D^v$. We can compute the information entropy of each $D^v$ with the formula above; then, since different branch nodes contain different numbers of samples, we weight each branch node by $\frac{|D^v|}{|D|}$, so that branches with more samples have a larger influence. The "information gain" obtained by splitting $D$ on attribute $a$ is then
$$\mathrm{Gain}(D, a) = \mathrm{Ent}(D) - \sum_{v=1}^{V} \frac{|D^v|}{|D|} \mathrm{Ent}(D^v)$$

def calc_feature_entropy(df: pd.DataFrame, feature_name: str, label_col: str) -> float:
    if df.empty:
        return 0.0
    counts = df[feature_name].value_counts(normalize=True)
    # (|D_i| / |D|) * entropy(D_i)
    return sum(p * calc_entropy(df[df[feature_name] == feature], label_col)
               for feature, p in counts.items())

In general, the larger the information gain, the larger the "purity improvement" obtained by splitting on attribute $a$. We can therefore select the splitting attribute by information gain, i.e. $a = \arg\max_{a \in A} \mathrm{Gain}(D, a)$. The well-known ID3 decision tree algorithm selects splitting attributes by exactly this criterion.
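A minimal sketch of this selection rule built on the two helpers above (an illustrative drop-in, not the full ID3 implementation given at the end of the post):

def choose_by_information_gain(df: pd.DataFrame, label_col: str):
    # Gain(D, a) = Ent(D) - sum_v |D^v|/|D| * Ent(D^v), maximized over candidate attributes
    total = calc_entropy(df, label_col)
    gains = {a: total - calc_feature_entropy(df, a, label_col)
             for a in df.columns if a != label_col}
    best = max(gains, key=gains.get)
    return best, gains[best]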

Gain Ratio

In fact, the information gain criterion is biased towards attributes with many possible values. To reduce the harm this bias may cause, the well-known C4.5 algorithm does not use information gain directly; it selects the optimal splitting attribute with the "gain ratio", defined as:
$$\mathrm{Gain\_ratio}(D, a) = \frac{\mathrm{Gain}(D, a)}{\mathrm{IV}(a)}$$
where
$$\mathrm{IV}(a) = -\sum_{v=1}^{V} \frac{|D^v|}{|D|} \log_2 \frac{|D^v|}{|D|}$$
(This is just the information entropy computed over the attribute column itself.)
This quantity is called the "intrinsic value" of attribute $a$. The more possible values attribute $a$ has (i.e., the larger $V$ is), the larger $\mathrm{IV}(a)$ usually is.

Note that the gain ratio criterion is biased towards attributes with fewer possible values, so the C4.5 algorithm does not simply pick the candidate attribute with the largest gain ratio. Instead it uses a heuristic: first select the attributes whose information gain is above average, then choose among them the one with the highest gain ratio.
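A sketch of this heuristic, again reusing calc_entropy and calc_feature_entropy. Note that IV(a) is simply the entropy of the attribute column itself, so calc_entropy can be reused for it (an illustrative sketch, not C4.5's exact handling of continuous or missing values):

def choose_by_gain_ratio(df: pd.DataFrame, label_col: str):
    total = calc_entropy(df, label_col)
    features = [a for a in df.columns if a != label_col]
    gains = {a: total - calc_feature_entropy(df, a, label_col) for a in features}
    # Step 1: keep only attributes whose information gain is above average
    mean_gain = np.mean(list(gains.values()))
    candidates = [a for a in features if gains[a] >= mean_gain] or features
    # Step 2: among them, pick the largest gain ratio Gain(D, a) / IV(a)
    ratios = {a: gains[a] / calc_entropy(df, a)
              for a in candidates if calc_entropy(df, a) > 0}
    if not ratios:
        return None, 0.0
    best = max(ratios, key=ratios.get)
    return best, ratios[best]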

Gini Index

CART decision trees use the "Gini index" to select splitting attributes. The purity of a data set $D$ can be measured by its Gini value:
$$\begin{aligned} \mathrm{Gini}(D) &= \sum_{k=1}^{|\mathcal{Y}|} \sum_{k' \neq k} p_k p_{k'} \\ &= 1 - \sum_{k=1}^{|\mathcal{Y}|} p_k^2 \end{aligned}$$
Intuitively, $\mathrm{Gini}(D)$ reflects the probability that two samples drawn at random from $D$ carry different class labels. Hence the smaller $\mathrm{Gini}(D)$ is, the higher the purity of $D$.
The Gini index of attribute $a$ is defined as
$$\mathrm{Gini\_index}(D, a) = \sum_{v=1}^{V} \frac{|D^v|}{|D|} \mathrm{Gini}(D^v)$$
We then select, among the candidate attributes, the one that minimizes the Gini index after splitting, i.e. $a_* = \arg\min_{a \in A} \mathrm{Gini\_index}(D, a)$.
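A corresponding sketch for the Gini criterion; calc_gini and calc_gini_index are illustrative names mirroring the entropy helpers above:

def calc_gini(df: pd.DataFrame, label_col: str) -> float:
    # Gini(D) = 1 - sum_k p_k^2
    p = df[label_col].value_counts(normalize=True)
    return 1.0 - float(np.sum(p ** 2))

def calc_gini_index(df: pd.DataFrame, feature_name: str, label_col: str) -> float:
    # Gini_index(D, a) = sum_v |D^v|/|D| * Gini(D^v); CART picks the attribute minimizing this
    weights = df[feature_name].value_counts(normalize=True)
    return sum(w * calc_gini(df[df[feature_name] == v], label_col)
               for v, w in weights.items())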

Continuous Values

Since a continuous attribute no longer has a finite number of possible values, we cannot split a node directly on those values; continuous-attribute discretization is needed. The simplest strategy is bi-partition, which is exactly the mechanism adopted by C4.5.
Given a sample set $D$ and a continuous attribute $a$, suppose $a$ takes $n$ distinct values on $D$; sort them in ascending order and denote them $\{a^1, a^2, \cdots, a^n\}$. A split point $t$ partitions $D$ into subsets $D_t^-$ and $D_t^+$, where $D_t^-$ contains the samples whose value on $a$ is at most $t$ and $D_t^+$ contains those whose value is greater than $t$. Clearly, for adjacent values $a^i$ and $a^{i+1}$, any $t$ in the interval $[a^i, a^{i+1})$ yields the same partition, so for a continuous attribute $a$ we only need to examine the candidate split-point set with $n-1$ elements
$$T_a = \left\{ \frac{a^i + a^{i+1}}{2} : 1 \le i \le n-1 \right\}$$
Therefore
$$\begin{aligned} \mathrm{Gain}(D, a) &= \max_{t \in T_a} \mathrm{Gain}(D, a, t) \\ &= \max_{t \in T_a} \mathrm{Ent}(D) - \sum_{\lambda \in \{-, +\}} \frac{|D_t^{\lambda}|}{|D|} \mathrm{Ent}(D_t^{\lambda}) \end{aligned}$$
The intrinsic value is
$$\mathrm{IV}(a, t) = -\sum_{\lambda \in \{-, +\}} \frac{|D_t^{\lambda}|}{|D|} \log_2 \frac{|D_t^{\lambda}|}{|D|}$$
and the gain ratio is
$$\begin{aligned} \mathrm{Gain\_ratio}(D, a) &= \max_{t \in T_a} \mathrm{Gain\_ratio}(D, a, t) \\ &= \max_{t \in T_a} \frac{\mathrm{Gain}(D, a, t)}{\mathrm{IV}(a, t)} \end{aligned}$$

The Gini index is
$$\mathrm{Gini\_index}(D, a) = \min_{t \in T_a} \sum_{\lambda \in \{-, +\}} \frac{|D_t^{\lambda}|}{|D|} \mathrm{Gini}(D_t^{\lambda})$$
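A sketch of the bi-partition search for one continuous attribute by information gain, reusing calc_entropy (candidate thresholds are the midpoints of adjacent sorted values, exactly the set $T_a$ above):

def best_threshold_by_gain(df: pd.DataFrame, feature: str, label_col: str):
    values = np.sort(df[feature].unique())
    if len(values) < 2:
        return None, 0.0
    thresholds = (values[:-1] + values[1:]) / 2  # midpoints, i.e. the candidate set T_a
    total = calc_entropy(df, label_col)
    best_t, best_gain = None, -np.inf
    for t in thresholds:
        left, right = df[df[feature] <= t], df[df[feature] > t]
        cond = (len(left) * calc_entropy(left, label_col)
                + len(right) * calc_entropy(right, label_col)) / len(df)
        if total - cond > best_gain:
            best_t, best_gain = t, total - cond
    return best_t, best_gain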

Missing Values

Below, only the discrete case is written out; once a threshold is chosen, a continuous attribute can be treated as splitting into two groups, i.e. $V$ simply has 2 values.

Two problems must be solved: (1) how to select the splitting attribute when some attribute values are missing; (2) given the splitting attribute, how to route a sample whose value on that attribute is missing.

Given a training set $D$ and an attribute $a$, let $\tilde{D}$ denote the subset of samples in $D$ that have no missing value on attribute $a$. For problem (1), we obviously can only judge the quality of attribute $a$ from $\tilde{D}$. Suppose attribute $a$ has $V$ possible values $\{a^1, a^2, \cdots, a^V\}$; let $\tilde{D}^v$ denote the subset of $\tilde{D}$ whose value on $a$ is $a^v$, and let $\tilde{D}_k$ denote the subset of $\tilde{D}$ belonging to class $k$ ($k = 1, 2, \cdots, |\mathcal{Y}|$). Clearly $\tilde{D} = \cup_{k=1}^{|\mathcal{Y}|} \tilde{D}_k = \cup_{v=1}^{V} \tilde{D}^v$. Suppose each sample $\mathbf{x}$ is given a weight $w_{\mathbf{x}}$ (all ones is a reasonable initialization), and define
$$\rho = \frac{\sum_{\mathbf{x} \in \tilde{D}} w_{\mathbf{x}}}{\sum_{\mathbf{x} \in D} w_{\mathbf{x}}}$$
$$\tilde{p}_k = \frac{\sum_{\mathbf{x} \in \tilde{D}_k} w_{\mathbf{x}}}{\sum_{\mathbf{x} \in \tilde{D}} w_{\mathbf{x}}} \quad (1 \le k \le |\mathcal{Y}|)$$
$$\tilde{r}_v = \frac{\sum_{\mathbf{x} \in \tilde{D}^v} w_{\mathbf{x}}}{\sum_{\mathbf{x} \in \tilde{D}} w_{\mathbf{x}}} \quad (1 \le v \le V)$$
$$\tilde{p}_v^k = \frac{\sum_{\mathbf{x} \in \tilde{D}_k^v} w_{\mathbf{x}}}{\sum_{\mathbf{x} \in \tilde{D}^v} w_{\mathbf{x}}} \quad (1 \le k \le |\mathcal{Y}|,\ 1 \le v \le V)$$
Intuitively, $\rho$ is the proportion of samples without missing values;
$\tilde{p}_k$ is the proportion of class-$k$ samples among the samples without missing values;
$\tilde{r}_v$ is the proportion of samples taking value $a^v$ on attribute $a$ among the samples without missing values;
$\tilde{p}_v^k$ is the proportion of class-$k$ samples among the samples whose value on attribute $a$ is $a^v$.

$$\mathrm{Ent}(\tilde{D}) = -\sum_{k=1}^{|\mathcal{Y}|} \tilde{p}_k \log_2 \tilde{p}_k$$
$$\mathrm{Ent}(\tilde{D}^v) = -\sum_{k=1}^{|\mathcal{Y}|} \tilde{p}_v^k \log_2 \tilde{p}_v^k$$

def entropy(categories: pd.Series, weight: np.ndarray = None) -> float:
    if categories.empty:
        return 0.0
    if weight is None:
        counts = categories.value_counts(normalize=True)
        return -np.sum(counts * np.log2(counts))
    # Convert the categories to integer labels and compute weighted counts
    labels, _ = pd.factorize(categories, sort=False)
    counts = np.bincount(labels, weights=weight, minlength=0)
    # Total weight and class probabilities
    total_weight = counts.sum()
    if total_weight <= 0:
        return 0.0
    probabilities = counts / total_weight
    # Only keep non-zero probabilities to avoid log(0) warnings
    non_zero_probs = probabilities[probabilities > 0]
    return -np.sum(non_zero_probs * np.log2(non_zero_probs))

$$\begin{aligned} \mathrm{Gain}(D, a) &= \rho \times \mathrm{Gain}(\tilde{D}, a) \\ &= \rho \left( \mathrm{Ent}(\tilde{D}) - \sum_{v=1}^{V} \tilde{r}_v \, \mathrm{Ent}(\tilde{D}^v) \right) \end{aligned}$$
$$\mathrm{IV}(a) = \rho \times \mathrm{IV}(\tilde{D}) = \rho \left( -\sum_{v=1}^{V} \tilde{r}_v \log_2 \tilde{r}_v \right)$$

# df_feature and weight_feature contain no NA values
feature_values = df_feature[feature].unique()
# r_k * entropy(\tilde{D}^v)
feature_entropy = 0.0
for value in feature_values:
    mask = df_feature[feature] == value
    value_weight = weight_feature[mask]
    value_entropy = entropy(df_feature.loc[mask, label_col], value_weight)
    rk = value_weight.sum() / weight_feature_sum
    feature_entropy += rk * value_entropy
info_gain = rho * (total_entropy - feature_entropy)
iv = rho * entropy(df_feature[feature], weight_feature)  # intrinsic value

$$\mathrm{Gain\_ratio}(D, a) = \frac{\mathrm{Gain}(D, a)}{\mathrm{IV}(a)}$$
$$\mathrm{Gini}(\tilde{D}) = 1 - \sum_{k=1}^{|\mathcal{Y}|} \tilde{p}_k^2$$
$$\mathrm{Gini}(\tilde{D}^v) = 1 - \sum_{k=1}^{|\mathcal{Y}|} \left( \tilde{p}_v^k \right)^2$$

def gini(categories: pd.Series, weight: np.ndarray = None) -> float:
    if categories.empty:
        return 0.0
    if weight is None:
        counts = categories.value_counts(normalize=True)
        # 1 - sum(p_k^2)
        return 1 - np.sum(counts**2)
    # Convert the categories to integer labels and compute weighted counts
    labels, _ = pd.factorize(categories, sort=False)
    counts = np.bincount(labels, weights=weight, minlength=0)
    # Total weight and class probabilities
    total_weight = counts.sum()
    if total_weight <= 0:
        return 0.0
    probabilities = counts / total_weight
    return 1 - np.sum(probabilities**2)

$$\mathrm{Gini\_index}(D, a) = \rho \sum_{v=1}^{V} \tilde{r}_v \, \mathrm{Gini}(\tilde{D}^v)$$
For problem (2): if sample $\mathbf{x}$'s value on the splitting attribute $a$ is known, $\mathbf{x}$ is routed to the child node matching its value, keeping its weight $w_{\mathbf{x}}$ in that child. If $\mathbf{x}$'s value on $a$ is unknown, $\mathbf{x}$ is routed into every child node, and its weight in the child corresponding to value $a^v$ is adjusted to $\tilde{r}_v \cdot w_{\mathbf{x}}$. Intuitively, this lets the same sample enter different child nodes with different probabilities.

The C4.5 algorithm uses the solution described above.

Inference works with the same weights: a sample is routed to the child nodes by weight, weights of the same class are summed at the leaves, and the class with the highest total weight is returned.
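A sketch of this weighted inference for the discrete case, assuming a Node object with is_leaf(), children, ratio, category and majority_class fields like the ones used in the C4.5 implementation further below:

from collections import defaultdict

def predict_with_missing(root, row: pd.Series):
    scores = defaultdict(float)
    stack = [(root, 1.0)]
    while stack:
        node, w = stack.pop()
        if node.is_leaf():
            # accumulate this path's weight on the leaf's class
            scores[node.category if node.category is not None else node.majority_class] += w
            continue
        value = row.get(node.feature_name)
        if pd.isna(value) or value not in node.children:
            # missing (or unseen) value: descend into every child, scaled by r_v
            for v, child in node.children.items():
                stack.append((child, w * node.ratio[v]))
        else:
            stack.append((node.children[value], w))
    return max(scores, key=scores.get)  # class with the largest total weight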

CART

The CART algorithm produces a binary tree.

Classification

Suppose the discrete attribute $a$ has $V$ possible values $\{a^1, a^2, \cdots, a^V\}$. Any value $a^i$ splits $D$ into two parts: $D^1$, the samples whose value is $a^i$, and $D^2 = D - D^1$, all the rest.
For a continuous attribute the bi-partition is already binary by nature.
$$\mathrm{Gini\_index}(D, a) = \min_{a^i \in \{a^1, a^2, \cdots, a^V\}} \rho \sum_{v=1}^{2} \tilde{r}_v \, \mathrm{Gini}(\tilde{D}^v)$$
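A sketch of this one-vs-rest split search for a discrete attribute (without the missing-value weighting; calc_gini is the helper sketched in the Gini index section above):

def best_binary_split(df: pd.DataFrame, feature: str, label_col: str):
    # try each value a^i as the split "== a^i vs != a^i" and keep the smallest Gini index
    best_v, best_index = None, np.inf
    for v in df[feature].unique():
        d1, d2 = df[df[feature] == v], df[df[feature] != v]
        index = (len(d1) * calc_gini(d1, label_col)
                 + len(d2) * calc_gini(d2, label_col)) / len(df)
        if index < best_index:
            best_v, best_index = v, index
    return best_v, best_index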

Regression

The split is again binary; let $s$ denote the split point.
$$\min_{a, s} \rho \left[ \min_{c_1} \sum_{\mathbf{x}_i \in R_1} w_i (y_i - c_1)^2 + \min_{c_2} \sum_{\mathbf{x}_i \in R_2} w_i (y_i - c_2)^2 \right]$$
where $R_1$ and $R_2$ contain no missing values.

Let $\mathbf{W} = \mathrm{diag}(\mathbf{w})$. Then
$$L(c) = \sum_i w_i (y_i - c)^2 = (\mathbf{y} - c\mathbf{1})^{T} \mathbf{W} (\mathbf{y} - c\mathbf{1})$$
$$\nabla_c L = -2\,\mathbf{1}^{T} \mathbf{W} (\mathbf{y} - c\mathbf{1}) = 0 \;\Rightarrow\; c = \frac{\mathbf{1}^{T} \mathbf{W} \mathbf{y}}{\mathbf{1}^{T} \mathbf{W} \mathbf{1}} = \frac{\sum_i w_i y_i}{\sum_i w_i}$$
The value of each leaf node is therefore $\frac{\sum_i w_i y_i}{\sum_i w_i}$, i.e. the weighted mean.
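A small sketch of the weighted least-squares leaf value and its residual cost (the same quantity the calculate_r helper computes in the regression code at the end of the post):

def weighted_leaf_value_and_cost(y: np.ndarray, w: np.ndarray):
    # c* = sum(w_i * y_i) / sum(w_i); cost = sum_i w_i (y_i - c*)^2
    c = np.average(y, weights=w)
    return c, float(np.sum(w * (y - c) ** 2))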

Pruning

Pruning is the main tool decision tree learning uses against overfitting. To classify the training samples as correctly as possible, node splitting is repeated again and again, which can produce too many branches; the tree may then fit the training set "too well", mistaking peculiarities of the training data for general properties of all data, i.e. overfitting. Actively removing some branches reduces this risk.

Pre-pruning

First compute the validation-set accuracy without the split, then the validation-set accuracy with the split; if the accuracy before splitting is >= the accuracy after splitting, do not split.

A convenient implementation: compute the accuracy, temporarily add the child nodes and compute the accuracy again, then clear them.

Post-pruning

Accuracy-based

For every node whose children are all leaf nodes, first compute the validation-set accuracy with the split kept, then the accuracy after the split is cut off; prune if the post-pruning accuracy is greater than the pre-pruning accuracy.
Then repeat this process.

The procedure is similar to pre-pruning; it amounts to a post-order traversal that performs the check after all children have been visited, as in the sketch below.
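A sketch of this accuracy-based post-pruning as a post-order traversal. It assumes a Node with children/ratio dicts whose collapsed form predicts its majority_class, plus a calc_acc(val_df, label_col, root) helper that evaluates the whole tree on the validation set; both mirror the full implementations below:

def post_prune(node, root, val_df, label_col, calc_acc):
    if node.is_leaf():
        return
    for child in node.children.values():
        post_prune(child, root, val_df, label_col, calc_acc)  # prune bottom-up first
    acc_before = calc_acc(val_df, label_col, root)
    saved_children, saved_ratio = node.children, node.ratio
    node.children, node.ratio = {}, {}  # temporarily collapse this node into a leaf
    if calc_acc(val_df, label_col, root) > acc_before:
        return  # pruning improved validation accuracy: keep the node as a leaf
    node.children, node.ratio = saved_children, saved_ratio  # otherwise restore the subtree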

Minimal Cost-Complexity Pruning

Below, lowercase letters denote nodes, e.g. $t$;
uppercase letters denote trees, e.g. $T$;
$T_t$ denotes the subtree rooted at node $t$;
$|T_t|$ denotes the number of leaf nodes of $T_t$;
$\mathrm{leaf}(T)$ denotes the leaf nodes of $T$.

The cost function is
$$C_{\alpha}(T) = C(T) + \alpha |T| = \left( \sum_{t \in \mathrm{leaf}(T)} p(t)\, R(t) \right) + \alpha |T|$$
where $p(t)$ is the proportion of training samples routed to node $t$; for example, if 25 out of 100 samples reach the node, then $p(t) = \frac{25}{100} = \frac{1}{4}$. With missing values one can instead use the weight sum of the samples routed to the node divided by the total weight of the training set.
$R(t)$ is the loss of the training samples routed to node $t$, e.g. the Gini index, MSE, entropy, or error rate.
$$C_{\alpha}(t) = C(t) + \alpha = p(t)\, R(t) + \alpha$$

If $C_{\alpha}(T_t) = C_{\alpha}(t)$, the subtree and the single node contribute roughly the same cost; since the version with fewer nodes usually generalizes a bit better, the children of $t$ can be cut.

$$C_{\alpha}(T_t) = C_{\alpha}(t) \;\Rightarrow\; \alpha = \frac{C(t) - C(T_t)}{|T_t| - 1}$$
Algorithm:
(1) Set $k = 0$, $T = T_0$.
(2) Set $\alpha = +\infty$.
(3) Bottom-up, for each internal node $t$ compute $C(T_t)$, $|T_t|$, and
$$g(t) = \frac{C(t) - C(T_t)}{|T_t| - 1}$$
$$\alpha = \min(\alpha, g(t))$$
(4) Top-down, visit the internal nodes $t$; if $g(t) = \alpha$, prune at $t$ and decide the class of the resulting leaf $t$ by majority vote, obtaining a tree $T$.
(5) Set $k = k + 1$, $\alpha_k = \alpha$, $T_k = T$.
(6) If $T$ is not a tree consisting of the root node alone, go back to step (2).
(7) Select the optimal subtree $T_\alpha$ from the subtree sequence $T_0, T_1, \cdots, T_n$ by cross-validation.
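A sketch of one pruning round: compute $g(t)$ for every internal node from cached per-node costs and cut the weakest link. The R, R_T, leaf_count and clear_child names are taken from the CART implementation below; ancestors' R_T and leaf_count would need to be recomputed before the next round:

def collect_g(node, out):
    # g(t) = (C(t) - C(T_t)) / (|T_t| - 1) for each internal node t
    if node.is_leaf():
        return
    out.append(((node.R - node.R_T) / (node.leaf_count - 1), node))
    for child in node.children.values():
        collect_g(child, out)

def prune_one_round(root):
    g_values = []
    collect_g(root, g_values)
    if not g_values:
        return None  # the tree is already a single leaf
    alpha, weakest = min(g_values, key=lambda x: x[0])
    weakest.clear_child()  # prune at the node with the smallest g(t)
    return alpha  # this alpha_k is paired with the resulting subtree T_k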

Code

ID3

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import numpy as np
import pandas as pdclass Node:def __init__(self, feature_name=None, category=None, majority_class=None):self.feature_name = feature_name # 特征self.category = category # 类别self.majority_class = majority_class # 多数类(备用类别)self.children = {} # 子节点,键为特征值,值为子节点def add_child(self, feature_value, child_node):self.children[feature_value] = child_nodedef is_leaf(self):return len(self.children) == 0def __repr__(self):if self.category:return f"Leaf({self.category})"return f"Node({self.feature_name}, majority={self.majority_class})"def calc_entropy(df: pd.DataFrame, label_col: str) -> float:if df.empty:return 0.0counts = df[label_col].value_counts(normalize=True)# -(|C_k| / |D|) * log2(|C_k| / |D|)return -np.sum(counts * np.log2(counts))def calc_feature_entropy(df: pd.DataFrame, feature_name: str, label_col: str) -> float:if df.empty:return 0.0counts = df[feature_name].value_counts(normalize=True)# (|D_i| / |D|) * entropy(D_i)return sum(p * calc_entropy(df[df[feature_name] == feature], label_col)for feature, p in counts.items())def calc_information_gain(total_entropy: float, df: pd.DataFrame, feature_name: str, label_col: str
) -> float:return total_entropy - calc_feature_entropy(df, feature_name, label_col)def choose_best_feature(df: pd.DataFrame, label_col: str) -> str:total_entropy = calc_entropy(df, label_col)  # 预计算总熵best_gain = -1best_feature = Nonefor feature in df.columns:if feature == label_col:continuegain = calc_information_gain(total_entropy, df, feature, label_col)print(f"Feature: {feature}, Information Gain: {gain}")if gain > best_gain:best_gain = gainbest_feature = featureprint(f"Best Feature: {best_feature}, Max Information Gain: {best_gain}")return best_feature, best_gainclass DecisionTreeID3:def __init__(self, epsilon: float = 1e-6):self.root = Noneself.epsilon = epsilondef _build(self, df: pd.DataFrame, label_col: str):# 全属于同一类labels = df[label_col].unique()if len(labels) == 1:return Node(category=labels[0])# 样本最多的类majority_class = df[label_col].mode()[0]# 没有特征可分if df.shape[1] == 1:return Node(majority_class=majority_class)best_feature, max_info_gain = choose_best_feature(df, label_col)node = Node(feature_name=best_feature, majority_class=majority_class)if max_info_gain < self.epsilon:return nodefor feature_value, subset in df.groupby(best_feature):child_node = self._build(subset.drop(columns=best_feature), label_col)node.add_child(feature_value, child_node)return nodedef fit(self, df: pd.DataFrame, label_col: str):if df.empty or label_col not in df.columns:raise ValueError("DataFrame is empty or label column is missing.")if df.isna().values.any():raise ValueError("DataFrame contains NaN values. Please handle missing data before fitting the model.")numeric = df.select_dtypes(include=[np.number])if not np.isfinite(numeric.values).all():raise ValueError("DataFrame contains non-finite values (inf, -inf). Please handle these values before fitting the model.")self.root = self._build(df, label_col)def predict_row(self, row: pd.Series) -> str:node = self.rootwhile not node.is_leaf():if node.feature_name in row.index:feature_value = row[node.feature_name]if feature_value in node.children:node = node.children[feature_value]else:return node.majority_classelse:print(f"Warning: Feature {node.feature_name} not found in row, returning majority class.")return node.majority_classreturn node.category if node.category is not None else node.majority_classdef predict(self, df: pd.DataFrame) -> pd.Series:if self.root is None:raise ValueError("The model has not been fitted yet.")return df.apply(self.predict_row, axis=1)def print_tree(self, node=None, indent="", feature_value=None):if node is None:if self.root is None:print("Tree not built yet.")returnnode = self.root# 打印当前节点prefix = f"{indent}{feature_value} → " if feature_value is not None else indentif node.is_leaf():value = node.category if node.category is not None else node.majority_classprint(f"{prefix}{value}")else:print(f"{prefix}[{node.feature_name}] (majority: {node.majority_class})")# 递归打印子节点for value, child in node.children.items():self.print_tree(child, indent + "    ", value)if __name__ == "__main__":df = pd.read_csv("PlayTennis.csv")df = pd.read_csv("watermelon.csv")df.drop(columns=["编号", "密度", "含糖率"], inplace=True)  # 删除不需要的列tree = DecisionTreeID3()tree.fit(df, label_col="好坏")tree.print_tree()predictions = tree.predict(df.drop(columns=["好坏"]))print("Predictions:")print(predictions)

C4.5

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from collections import defaultdict
from queue import Queueimport numpy as np
import pandas as pdclass Node:def __init__(self, feature_name=None, threshold=None, category=None, majority_class=None):self.feature_name = feature_name  # 特征self.threshold = threshold  # 阈值(用于连续特征)self.category = category  # 类别self.majority_class = majority_class  # 多数类(备用类别)self.ratio = {}  # 特征值对应的比例self.children = {}  # 子节点,键为特征值或者("<=", ">"),值为子节点def add_child(self, feature_value, child_node, child_ratio):self.children[feature_value] = child_nodeself.ratio[feature_value] = child_ratiodef clear_child(self):self.ratio = {}self.children = {}def is_continuous(self):return self.threshold is not Nonedef is_leaf(self):return len(self.children) == 0def entropy(categories: pd.Series, weight: np.ndarray = None) -> float:if categories.empty:return 0.0if weight is None:counts = categories.value_counts(normalize=True)return -np.sum(counts * np.log2(counts))# 将类别转换为整数标签并计算加权频数labels, _ = pd.factorize(categories, sort=False)counts = np.bincount(labels, weights=weight, minlength=0)# 计算总权重和概率total_weight = counts.sum()if total_weight <= 0:return 0.0probabilities = counts / total_weight# 仅处理非零概率避免log(0)警告non_zero_probs = probabilities[probabilities > 0]return -np.sum(non_zero_probs * np.log2(non_zero_probs))def choose_best_feature(df: pd.DataFrame, label_col: str, weight: np.ndarray) -> str:stats = []  # (特征, 信息增益, 信息增益率, 是否连续特征,阈值, 比例)if (weight <= 0).any():return None, None, None, None, None, Noneweight_sum = weight.sum()for feature in df.columns:if feature == label_col:continuefeature_mask = df[feature].notna()weight_feature = weight[feature_mask]  # 过滤掉缺失值的权重df_feature = df.loc[feature_mask]  # \tilde{D}weight_feature_sum = weight_feature.sum()rho = weight_feature_sum / weight_sum  # 权重total_entropy = entropy(df_feature[label_col], weight_feature)  # entropy(\tilde{D})# 连续特征if pd.api.types.is_numeric_dtype(df_feature[feature]):values = df_feature[feature].sort_values().unique()if len(values) <= 1:  # 值不足无法划分continuethresholds = (values[:-1] + values[1:]) / 2  # 划分点max_info_gain = -np.inf  # 最大信息增益best_info_gain_rate = -np.inf  # 最佳信息增益率best_threshold = None  # 最佳划分点best_rk = Nonefor threshold in thresholds:left_mask = df_feature[feature] <= thresholdleft_weight = weight_feature[left_mask]left_weight_sum = left_weight.sum()right_mask = ~left_maskright_weight = weight_feature[right_mask]right_weight_sum = right_weight.sum()rk_left = left_weight_sum / weight_feature_sumrk_right = right_weight_sum / weight_feature_sumleft_entropy = entropy(df_feature.loc[left_mask, label_col], left_weight)right_entropy = entropy(df_feature.loc[right_mask, label_col], right_weight)feature_entropy = rk_left * left_entropy + rk_right * right_entropyinfo_gain = rho * (total_entropy - feature_entropy)if info_gain > max_info_gain:max_info_gain = info_gainbest_threshold = thresholdiv = rho * (-rk_left * np.log2(rk_left) - rk_right * np.log2(rk_right))  # 固有值if iv <= 0:continuebest_info_gain_rate = info_gain / ivbest_rk = {"<=": rk_left, ">": rk_right}if max_info_gain > -np.inf:  # 仅添加有效划分点stats.append((feature,max_info_gain,best_info_gain_rate,True,best_threshold,best_rk,))else:# 离散特征feature_values = df_feature[feature].unique()if len(feature_values) <= 1:continueall_rk = {}# r_k * entropy(\tilde{D}^v)feature_entropy = 0.0for value in feature_values:mask = df_feature[feature] == valuevalue_weight = weight_feature[mask]value_entropy = entropy(df_feature.loc[mask, label_col], value_weight)rk = value_weight.sum() / weight_feature_sumall_rk[value] = rkfeature_entropy += rk * value_entropyinfo_gain = rho * (total_entropy - 
feature_entropy)iv = rho * entropy(df_feature[feature], weight_feature)  # 固有值if iv <= 0:continueinfo_gain_rate = info_gain / ivstats.append((feature, info_gain, info_gain_rate, False, None, all_rk))# 按照信息增益率排序,选择最优特征if not stats:return None, None, None, None, None, Nonemean_info_gain = np.mean([s[1] for s in stats])useful_stats = [s for s in stats if s[1] > mean_info_gain]# if not useful_stats:#     return max(stats, key=lambda x: x[2])return max(useful_stats or stats, key=lambda x: x[2])class DecisionTreeC45:def __init__(self,epsilon: float = 1e-6,pre_prune=False,post_prune=False,reuse_feature: bool = False,):self.root = Noneself.epsilon = epsilonself.pre_prune = pre_pruneself.post_prune = post_pruneself.reuse_feature = reuse_featuredef _build(self,df: pd.DataFrame,label_col: str,weight: np.ndarray,root=None,val_df=None,) -> Node:# 全属于同一类labels = df[label_col].unique()if len(labels) == 1:return Node(category=labels[0])# 样本最多的类majority_class = df[label_col].mode()[0]# 没有特征可分if df.shape[1] == 1:return Node(majority_class=majority_class)(best_feature,max_info_gain,best_info_gain_rate,is_continuous,threshold,best_rk,) = choose_best_feature(df, label_col, weight)node = Node(feature_name=best_feature,threshold=threshold,majority_class=majority_class,)if root is None:root = nodeif best_feature is None or best_info_gain_rate < self.epsilon:# return Node(majority_class=majority_class)return node# 预剪枝if val_df and self.pre_prune:pre_acc = self.calculate_acc(val_df, label_col, root)# 创建临时子树if is_continuous:node.add_child("<=",Node(majority_class=df.loc[df[best_feature] <= threshold, label_col].mode()[0]),best_rk["<="],)node.add_child(">",Node(majority_class=df.loc[df[best_feature] > threshold, label_col].mode()[0]),best_rk[">"],)else:for value, rk in best_rk.items():node.add_child(value,Node(majority_class=df.loc[df[best_feature] == value, label_col].mode()[0]),rk,)prune_acc = self.calculate_acc(val_df, label_col, root)# 清除临时子树node.clear_child()# 如果剪枝后准确率没有提升,则终止划分if pre_acc >= prune_acc:return nodena_mask = df[best_feature].isna()if is_continuous:  # 连续特征left_mask = df[best_feature] <= threshold  # 小于等于阈值的样本(不包含na)right_mask = df[best_feature] > thresholdleft_weight = weight.copy()left_weight[na_mask] *= best_rk["<="]  # NA权重分配给左子树left_weight = left_weight[left_mask | na_mask]right_weight = weight.copy()right_weight[na_mask] *= best_rk[">"]right_weight = right_weight[right_mask | na_mask]if not self.reuse_feature:df = df.drop(columns=[best_feature])left_df = df.loc[left_mask | na_mask]if self.reuse_feature and df.loc[left_mask, best_feature].nunique() == 1:left_df = left_df.drop(columns=[best_feature])right_df = df.loc[right_mask | na_mask]if self.reuse_feature and df.loc[right_mask, best_feature].nunique() == 1:right_df = right_df.drop(columns=[best_feature])left_node = self._build(left_df, label_col, left_weight, root, val_df)right_node = self._build(right_df, label_col, right_weight, root, val_df)node.add_child("<=", left_node, best_rk["<="])node.add_child(">", right_node, best_rk[">"])else:  # 离散特征for value, rk in best_rk.items():mask = df[best_feature] == valuecur_weight = weight.copy()cur_weight[na_mask] *= rkcur_weight = cur_weight[mask | na_mask]cur_df = df.loc[mask | na_mask].drop(columns=[best_feature])next_node = self._build(cur_df, label_col, cur_weight, root, val_df)node.add_child(value, next_node, rk)# 后剪枝if val_df and self.post_prune and node.children:flag = Truefor child in node.children.values():if not child.is_leaf():flag = Falsebreakif not flag:return nodepre_acc = 
self.calculate_acc(val_df, label_col, root)children = node.childrenratio = node.rationode.clear_child()prune_acc = self.calculate_acc(val_df, label_col, root)if pre_acc >= prune_acc:return nodenode.children = childrennode.ratio = ratioreturn nodedef fit(self, df: pd.DataFrame, label_col: str, val_df=None, weight=None):if df.empty or label_col not in df.columns:raise ValueError("DataFrame is empty or label column is missing.")if df[label_col].isna().any():raise ValueError("Label column contains NaN values.")if weight is None:weight = np.ones(len(df))  # 初始权重为1elif (weight <= 0).any():raise ValueError("Weight is not positive.")weight = np.ones(len(df))  # 初始权重为1self.root = self._build(df, label_col, weight, None, val_df)def predict_row(self, row: pd.Series, node) -> str:q = Queue()q.put((node, 1.0))res = defaultdict(float)while not q.empty():current_node, current_weight = q.get()if current_node.is_leaf():res[current_node.categoryif current_node.category is not Noneelse current_node.majority_class] += current_weightcontinueif current_node.feature_name not in row.index:print(f"Warning: Feature {current_node.feature_name} not found in row, returning majority class.")res[current_node.majority_class] += current_weightcontinuefeature_value = row[current_node.feature_name]if pd.notna(feature_value):if current_node.is_continuous():if feature_value <= current_node.threshold:child_node = current_node.children["<="]else:child_node = current_node.children[">"]q.put((child_node, current_weight))else:if feature_value in current_node.children:child_node = current_node.children[feature_value]q.put((child_node, current_weight))else:# 如果特征值不在子节点中,返回多数类print(f"Warning: Feature {current_node.feature_name} has value {feature_value} not found in children, returning majority class.")res[current_node.majority_class] += current_weightelse:for value, child_node in current_node.children.items():q.put((child_node, current_weight * current_node.ratio[value]))# Combine results from all pathsif not res:return self.root.majority_classreturn max(res, key=res.get)def calculate_acc(self, df, label_col, node):if not df or df.empty:return 0.0preds = df.apply(lambda row: self.predict_row(row, node), axis=1)return (preds == df[label_col]).mean()def predict(self, df: pd.DataFrame) -> pd.Series:if self.root is None:raise ValueError("The model has not been fitted yet.")return df.apply(lambda row: self.predict_row(row, self.root), axis=1)def print_tree(self, node=None, indent: str = "", prefix: str = ""):"""打印决策树结构"""if node is None:node = self.rootprint("Decision Tree:")# 叶子节点打印类别if node.is_leaf():leaf_class = node.category if node.category else node.majority_classprint(f"{indent}{prefix} Leaf: {leaf_class}")return# 内部节点打印特征if node.is_continuous():print(f"{indent}{prefix} {node.feature_name} <= {node.threshold:.3f}")# 递归打印子节点self.print_tree(node.children.get("<="), indent + "  ", "├── <=: ")self.print_tree(node.children.get(">"), indent + "  ", "└── >: ")else:print(f"{indent}{prefix} {node.feature_name}")# 递归打印所有子节点children = list(node.children.items())for i, (value, child_node) in enumerate(children):last_child = i == len(children) - 1branch = "└──" if last_child else "├──"self.print_tree(child_node, indent + "  ", f"{branch} {value}: ")if __name__ == "__main__":df = pd.read_csv("watermelon_na.csv")df.drop(columns=["编号"], inplace=True)  # 删除不需要的列tree = DecisionTreeC45(epsilon=1e-6, reuse_feature=False)tree.fit(df, label_col="好坏")tree.print_tree()

CART

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from collections import defaultdict
from copy import deepcopy
from queue import Queueimport numpy as np
import pandas as pdclass Node:def __init__(self,feature_name=None,threshold=None,category=None,majority_class=None,is_continuous=False,node_weight=None,gini=None,R=None,R_T=None,leaf_count=None,):self.feature_name = feature_name  # 特征self.threshold = threshold  # 阈值self.category = category  # 类别self.majority_class = majority_class  # 多数类(备用类别)self.is_continuous = is_continuous  # 连续self.ratio = {}  # 特征值对应的比例self.children = {}  # 子节点,键为特征值或者("<=", ">"),值为子节点 (>表示大于或者!=)self.node_weight = node_weight  # 节点权重(样本权重和)self.gini = gini  # 节点基尼指数self.R = R  # 节点的不纯度代价 R(t)self.R_T = R_T  # 子树的不纯度代价 R(T_t)self.leaf_count = leaf_count  # 子树叶子节点数量def add_child(self, feature_value, child_node, child_ratio):self.children[feature_value] = child_nodeself.ratio[feature_value] = child_ratiodef clear_child(self):self.ratio = {}self.children = {}# 剪枝后变为叶节点,R_T等于节点自身Rself.R_T = self.R_Tself.leaf_count = 1def is_leaf(self):return len(self.children) == 0def gini(categories: pd.Series, weight: np.ndarray = None) -> float:if categories.empty:return 0.0if weight is None:counts = categories.value_counts(normalize=True)# 1 - sum(p_k^2)return 1 - np.sum(counts**2)# 将类别转换为整数标签并计算加权频数labels, _ = pd.factorize(categories, sort=False)counts = np.bincount(labels, weights=weight, minlength=0)# 计算总权重和概率total_weight = counts.sum()if total_weight <= 0:return 0.0probabilities = counts / total_weightreturn 1 - np.sum(probabilities**2)def choose_best_feature(df: pd.DataFrame, label_col: str, weight: np.ndarray) -> str:if (weight <= 0).any():return None, None, None, None, Nonebest_feature = Nonemin_gini_index = np.infbest_is_continuous = Nonebest_threshold = Nonebest_rk = Noneweight_sum = weight.sum()for feature in df.columns:if feature == label_col:continuefeature_mask = df[feature].notna()weight_feature = weight[feature_mask]  # 过滤掉缺失值的权重df_feature = df.loc[feature_mask]  # \tilde{D}weight_feature_sum = weight_feature.sum()rho = weight_feature_sum / weight_sum  # 权重is_continuous = pd.api.types.is_numeric_dtype(df_feature[feature])splits = Noneif is_continuous:values = df_feature[feature].sort_values().unique()if len(values) <= 1:  # 值不足无法划分continuesplits = (values[:-1] + values[1:]) / 2  # 划分点else:feature_values = df_feature[feature].unique()if len(feature_values) <= 1:continuesplits = feature_valuesfor value in splits:left_mask = ((df_feature[feature] <= value)if is_continuouselse (df_feature[feature] == value))left_weight = weight_feature[left_mask]left_weight_sum = left_weight.sum()right_mask = ~left_maskright_weight = weight_feature[right_mask]right_weight_sum = right_weight.sum()rk_left = left_weight_sum / weight_feature_sumrk_right = right_weight_sum / weight_feature_sumleft_gini = gini(df_feature.loc[left_mask, label_col], left_weight)right_gini = gini(df_feature.loc[right_mask, label_col], right_weight)feature_gini = rk_left * left_gini + rk_right * right_gini# rho * (rk * gini(\tilde{D}^{v}))gini_index = rho * feature_giniif gini_index < min_gini_index:best_feature = featuremin_gini_index = gini_indexbest_is_continuous = is_continuousbest_threshold = valuebest_rk = {"<=": rk_left, ">": rk_right}return best_feature, min_gini_index, best_is_continuous, best_threshold, best_rkclass DecisionTreeCart:def __init__(self,epsilon: float = 1e-6,pre_prune=False,post_prune=False,reuse_feature: bool = True,ccp_alpha=0.0,):self.root = Noneself.epsilon = epsilonself.pre_prune = pre_pruneself.post_prune = post_pruneself.reuse_feature = reuse_featureself.ccp_alpha = ccp_alphadef _build(self,df: pd.DataFrame,label_col: str,weight: 
np.ndarray,root=None,val_df=None,total_weight=None,) -> Node:# 计算当前节点权重和基尼指数node_weight = weight.sum() / total_weightnode_gini = gini(df[label_col], weight)node_R = node_weight * node_gini# 全属于同一类labels = df[label_col].unique()if len(labels) == 1:return Node(category=labels[0],node_weight=node_weight,gini=node_gini,R=node_R,R_T=node_R,leaf_count=1,)# 样本最多的类majority_class = df[label_col].mode()[0]# 没有特征可分if df.shape[1] == 1:return Node(majority_class=majority_class,node_weight=node_weight,gini=node_gini,R=node_R,R_T=node_R,leaf_count=1,)best_feature, min_gini_index, best_is_continuous, best_threshold, best_rk = (choose_best_feature(df, label_col, weight))node = Node(feature_name=best_feature,threshold=best_threshold,majority_class=majority_class,is_continuous=best_is_continuous,node_weight=node_weight,gini=node_gini,R=node_R,)if root is None:root = nodeif best_feature is None or min_gini_index < self.epsilon:# return Node(majority_class=majority_class)node.R_T = node_Rnode.leaf_count = 1return nodena_mask = df[best_feature].isna()left_mask = Noneif best_is_continuous:left_mask = (df[best_feature] <= best_threshold)  # 小于等于阈值的样本(不包含na)else:left_mask = df[best_feature] == best_threshold  # 等于阈值的样本(不包含na)right_mask = (~left_mask) & (~na_mask)  # 其他样本(不包含na)# 预剪枝if val_df and self.pre_prune:pre_acc = self.calculate_acc(val_df, label_col, root)# 创建临时子树node.add_child("<=",Node(majority_class=df.loc[left_mask, label_col].mode()[0]),best_rk["<="],)node.add_child(">",Node(majority_class=df.loc[right_mask, label_col].mode()[0]),best_rk[">"],)prune_acc = self.calculate_acc(val_df, label_col, root)# 清除临时子树node.clear_child()# 如果剪枝后准确率没有提升,则终止划分if pre_acc >= prune_acc:node.R_T = node_Rnode.leaf_count = 1return nodeleft_weight = weight.copy()left_weight[na_mask] *= best_rk["<="]  # NA权重分配给左子树left_weight = left_weight[left_mask | na_mask]right_weight = weight.copy()right_weight[na_mask] *= best_rk[">"]right_weight = right_weight[right_mask | na_mask]if not self.reuse_feature:df = df.drop(columns=[best_feature])left_df = df.loc[left_mask | na_mask]if self.reuse_feature and df.loc[left_mask, best_feature].nunique() == 1:left_df = left_df.drop(columns=[best_feature])right_df = df.loc[right_mask | na_mask]if self.reuse_feature and df.loc[right_mask, best_feature].nunique() == 1:right_df = right_df.drop(columns=[best_feature])left_node = self._build(left_df, label_col, left_weight, root, val_df, total_weight)right_node = self._build(right_df, label_col, right_weight, root, val_df, total_weight)node.add_child("<=", left_node, best_rk["<="])node.add_child(">", right_node, best_rk[">"])node.R_T = left_node.R_T + right_node.R_Tnode.leaf_count = left_node.leaf_count + right_node.leaf_countreturn nodedef fit(self, df: pd.DataFrame, label_col: str, val_df=None, weight=None):if df.empty or label_col not in df.columns:raise ValueError("DataFrame is empty or label column is missing.")if df[label_col].isna().any():raise ValueError("Label column contains NaN values.")if weight is None:weight = np.ones(len(df))  # 初始权重为1elif (weight <= 0).any():raise ValueError("Weight is not positive.")total_weight = weight.sum()self.root = self._build(df, label_col, weight, None, val_df, total_weight)if self.post_prune and self.ccp_alpha > 0 and self.val_df:self._cost_complexity_pruning(val_df, label_col)def _cost_complexity_pruning(self, val_df, label_col):"""执行代价复杂度剪枝算法步骤:1. 初始化: k=0, T=T_0, alpha=+inf2. 自底向上计算每个内部节点t的g(t)3. 找到最小的g(t)作为alpha_k,并剪枝对应的节点4. 重复直到只剩根节点5. 
使用验证集选择最优子树"""# 步骤1: 初始化k = 0T = deepcopy(self.root)  # 当前子树alpha = float("inf")self.pruned_trees = []  # 存储子树序列 T0, T1, ..., Tnself.alphas = []  # 存储对应的alpha序列# 添加原始树self.pruned_trees.append(deepcopy(T))self.alphas.append(alpha)# 步骤2-6: 循环剪枝直到只剩根节点while not T.is_leaf():min_g = float("inf")min_g_nodes = []q = Queue()q.put(T)while not q.empty():p = q.get()for child in p.children.values():if child.is_leaf():continueassert child.leaf_count > 1q.put(child)cur_g = (child.R - child.R_T) / (child.leaf_count - 1)if cur_g < min_g:min_g = cur_gmin_g_nodes.append(child)elif abs(cur_g - min_g) < 1e-6:  # cur_g == min_gmin_g_nodes.append(child)# 如果没有找到可剪枝的节点,停止循环if not min_g_nodes:break# 更新alphaalpha = min_g# 剪枝所有g(t)=alpha的节点for node in min_g_nodes:node.clear_child()# 更新整棵树的R_T和leaf_countself._update_tree_metrics(T)# 保存当前子树和alphak += 1self.pruned_trees.append(deepcopy(T))self.alphas.append(alpha)# 如果达到ccp_alpha阈值则停止if alpha > self.ccp_alpha:break# 步骤7: 使用验证集选择最优子树if val_df is not None:best_acc = -1best_tree_idx = -1for i, tree in enumerate(self.pruned_trees):acc = self.calculate_acc(val_df, label_col, tree)if acc > best_acc:best_acc = accbest_tree_idx = i# 选择最优子树self.root = self.pruned_trees[best_tree_idx]print(f"Selected subtree T_{best_tree_idx} with alpha={self.alphas[best_tree_idx]:.6f}, accuracy={best_acc:.4f}")def _update_tree_metrics(self, node):if node.is_leaf():returntotal_R_T = 0.0total_leaf_count = 0for child in node.children.values():self._update_tree_metrics(child)total_R_T += child.R_Ttotal_leaf_count += child.leaf_countnode.R_T = total_R_Tnode.leaf_count = total_leaf_countdef predict_row(self, row: pd.Series, node) -> str:q = Queue()q.put((node, 1.0))res = defaultdict(float)while not q.empty():current_node, current_weight = q.get()if current_node.is_leaf():res[current_node.categoryif current_node.category is not Noneelse current_node.majority_class] += current_weightcontinueif current_node.feature_name not in row.index:print(f"Warning: Feature {current_node.feature_name} not found in row, returning majority class.")res[current_node.majority_class] += current_weightcontinuefeature_value = row[current_node.feature_name]if pd.notna(feature_value):if current_node.is_continuous:if feature_value <= current_node.threshold:child_node = current_node.children["<="]else:child_node = current_node.children[">"]q.put((child_node, current_weight))else:if feature_value == current_node.threshold:child_node = current_node.children["<="]else:child_node = current_node.children[">"]q.put((child_node, current_weight))else:for value, child_node in current_node.children.items():q.put((child_node, current_weight * current_node.ratio[value]))# Combine results from all pathsif not res:return self.root.majority_classreturn max(res, key=res.get)def calculate_acc(self, df, label_col, node):if not df or df.empty:return 0.0preds = df.apply(lambda row: self.predict_row(row, node), axis=1)return (preds == df[label_col]).mean()def predict(self, df: pd.DataFrame) -> pd.Series:if self.root is None:raise ValueError("The model has not been fitted yet.")return df.apply(lambda row: self.predict_row(row, self.root), axis=1)def print_tree(self, node=None, indent: str = "", prefix: str = ""):"""打印决策树结构"""if node is None:node = self.rootprint("Decision Tree:")print(f"Total leaves: {node.leaf_count}")# 叶子节点打印类别if node.is_leaf():leaf_class = node.category if node.category else node.majority_classprint(f"{indent}{prefix} Leaf: {leaf_class} "f"(R={node.R:.4f}, samples={node.node_weight:.3f})")return# 内部节点打印特征if 
node.is_continuous:print(f"{indent}{prefix} {node.feature_name} <= {node.threshold:.3f} "f"[R_T={node.R_T:.4f}, leaves={node.leaf_count}]")# 递归打印子节点self.print_tree(node.children.get("<="), indent + "  ", "├── <=: ")self.print_tree(node.children.get(">"), indent + "  ", "└── >: ")else:print(f"{indent}{prefix} {node.feature_name} == {node.threshold} "f"[R_T={node.R_T:.4f}, leaves={node.leaf_count}]")self.print_tree(node.children.get("<="), indent + "  ", "├── ==: ")self.print_tree(node.children.get(">"), indent + "  ", "└── !=: ")if __name__ == "__main__":df = pd.read_csv("watermelon.csv")df.drop(columns=["编号"], inplace=True)  # 删除不需要的列tree = DecisionTreeCart(epsilon=1e-6, pre_prune=False, post_prune=False, reuse_feature=True, ccp_alpha=0)tree.fit(df, label_col="好坏")tree.print_tree()

CART Regression

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from copy import deepcopy
from queue import Queueimport numpy as np
import pandas as pdclass Node:def __init__(self,feature_name=None,threshold=None,value=None,is_continuous=False,node_weight=None,gini=None,R=None,R_T=None,leaf_count=None,):self.feature_name = feature_name  # 特征self.threshold = threshold  # 阈值self.value = value  # 叶节点的预测值self.is_continuous = is_continuous  # 连续self.ratio = {}  # 特征值对应的比例self.children = {}  # 子节点,键为特征值或者("<=", ">"),值为子节点 (>表示大于或者!=)self.node_weight = node_weight  # 节点权重(样本权重和)self.gini = gini  # 节点基尼指数self.R = R  # 节点的不纯度代价 R(t)self.R_T = R_T  # 子树的不纯度代价 R(T_t)self.leaf_count = leaf_count  # 子树叶子节点数量def add_child(self, feature_value, child_node, child_ratio):self.children[feature_value] = child_nodeself.ratio[feature_value] = child_ratiodef clear_child(self):self.ratio = {}self.children = {}# 剪枝后变为叶节点,R_T等于节点自身Rself.R_T = self.R_Tself.leaf_count = 1def is_leaf(self):return len(self.children) == 0def calculate_r(y: np.ndarray, weight: np.ndarray = None):if len(y) == 0:return 0.0if weight is None:weight = np.ones(len(y))if weight.sum() <= 0:return 0.0# min_c \sum w_i (y_i - c)^2 = (y - c1)^T W (y - c1) => c = (1^T W y) / (1^T W 1) = (\sum w_i y_i) / (\sum w_i)c = np.average(y, weights=weight)# \sum w_i (y_i - c)^2return np.sum(weight * ((y - c) ** 2))def choose_best_feature(df: pd.DataFrame, label_col: str, weight: np.ndarray) -> str:if (weight <= 0).any():return None, None, None, None, Nonebest_feature = Nonemin_mse = np.infbest_is_continuous = Nonebest_threshold = Nonebest_rk = Noneweight_sum = weight.sum()for feature in df.columns:if feature == label_col:continuefeature_mask = df[feature].notna()weight_feature = weight[feature_mask]  # 过滤掉缺失值的权重df_feature = df.loc[feature_mask]  # \tilde{D}weight_feature_sum = weight_feature.sum()rho = weight_feature_sum / weight_sum  # 权重is_continuous = pd.api.types.is_numeric_dtype(df_feature[feature])splits = Noneif is_continuous:values = df_feature[feature].sort_values().unique()if len(values) <= 1:  # 值不足无法划分continuesplits = (values[:-1] + values[1:]) / 2  # 划分点else:feature_values = df_feature[feature].unique()if len(feature_values) <= 1:continuesplits = feature_valuesfor value in splits:left_mask = ((df_feature[feature] <= value)if is_continuouselse (df_feature[feature] == value))left_weight = weight_feature[left_mask]left_weight_sum = left_weight.sum()right_mask = ~left_maskright_weight = weight_feature[right_mask]right_weight_sum = right_weight.sum()rk_left = left_weight_sum / weight_feature_sumrk_right = right_weight_sum / weight_feature_sumleft_mse = calculate_r(df_feature.loc[left_mask, label_col].to_numpy(), left_weight)right_mse = calculate_r(df_feature.loc[right_mask, label_col].to_numpy(), right_weight)feature_mse = rk_left * left_mse + rk_right * right_mse# rho * (rk * gini(\tilde{D}^{v}))weighted_mse = rho * feature_mseif weighted_mse < min_mse:best_feature = featuremin_mse = weighted_msebest_is_continuous = is_continuousbest_threshold = valuebest_rk = {"<=": rk_left, ">": rk_right}return best_feature, min_mse, best_is_continuous, best_threshold, best_rkclass DecisionTreeCartRegression:def __init__(self,epsilon: float = 1e-6,pre_prune=False,post_prune=False,reuse_feature: bool = True,ccp_alpha=0.0,):self.root = Noneself.epsilon = epsilonself.pre_prune = pre_pruneself.post_prune = post_pruneself.reuse_feature = reuse_featureself.ccp_alpha = ccp_alphadef _build(self,df: pd.DataFrame,label_col: str,weight: np.ndarray,root=None,val_df=None,total_weight=None,) -> Node:# 计算当前节点权重和基尼指数node_weight = weight.sum() / total_weightnode_value = np.average(df[label_col].to_numpy(), 
weights=weight)node_R = node_weight * calculate_r(df[label_col].to_numpy(), weight)# y一样或者没有特征可分if df[label_col].max() - df[label_col].min() < 1e-6 or df.shape[1] == 1:return Node(value=node_value,node_weight=node_weight,gini=node_value,R=node_R,R_T=node_R,leaf_count=1,)best_feature, min_gini_index, best_is_continuous, best_threshold, best_rk = (choose_best_feature(df, label_col, weight))node = Node(feature_name=best_feature,threshold=best_threshold,value=node_value,is_continuous=best_is_continuous,node_weight=node_weight,gini=node_value,R=node_R,)if root is None:root = nodeif best_feature is None:# return Node(majority_class=majority_class)node.R_T = node_Rnode.leaf_count = 1return nodena_mask = df[best_feature].isna()left_mask = Noneif best_is_continuous:left_mask = (df[best_feature] <= best_threshold)  # 小于等于阈值的样本(不包含na)else:left_mask = df[best_feature] == best_threshold  # 等于阈值的样本(不包含na)right_mask = (~left_mask) & (~na_mask)  # 其他样本(不包含na)# 预剪枝if val_df and self.pre_prune:pre_mse = self.calculate_mse_from_node(val_df, label_col, root)# 创建临时子树(一层)left_value = (np.average(df.loc[left_mask, label_col].to_numpy(), weights=weight[left_mask])if left_mask.any()else node_value)right_value = (np.average(df.loc[right_mask, label_col].to_numpy(), weights=weight[right_mask])if right_mask.any()else node_value)node.add_child("<=", Node(value=left_value), best_rk["<="])node.add_child(">", Node(value=right_value), best_rk[">"])prune_mse = self.calculate_mse_from_node(val_df, label_col, root)# 清除临时子树node.clear_child()# 如果剪枝后准确率没有提升,则终止划分if pre_mse <= prune_mse:node.R_T = node_Rnode.leaf_count = 1return nodeleft_weight = weight.copy()left_weight[na_mask] *= best_rk["<="]  # NA权重分配给左子树left_weight = left_weight[left_mask | na_mask]right_weight = weight.copy()right_weight[na_mask] *= best_rk[">"]right_weight = right_weight[right_mask | na_mask]if not self.reuse_feature:df = df.drop(columns=[best_feature])left_df = df.loc[left_mask | na_mask]if self.reuse_feature and df.loc[left_mask, best_feature].nunique() == 1:left_df = left_df.drop(columns=[best_feature])right_df = df.loc[right_mask | na_mask]if self.reuse_feature and df.loc[right_mask, best_feature].nunique() == 1:right_df = right_df.drop(columns=[best_feature])left_node = self._build(left_df, label_col, left_weight, root, val_df, total_weight)right_node = self._build(right_df, label_col, right_weight, root, val_df, total_weight)node.add_child("<=", left_node, best_rk["<="])node.add_child(">", right_node, best_rk[">"])node.R_T = left_node.R_T + right_node.R_Tnode.leaf_count = left_node.leaf_count + right_node.leaf_countreturn nodedef fit(self, df: pd.DataFrame, label_col: str, val_df=None, weight=None):if df.empty or label_col not in df.columns:raise ValueError("DataFrame is empty or label column is missing.")if df[label_col].isna().any():raise ValueError("y column contains NaN values.")if not pd.api.types.is_numeric_dtype(df[label_col]):raise ValueError("y column only support numeric values.")if weight is None:weight = np.ones(len(df))  # 初始权重为1elif (weight <= 0).any():raise ValueError("Weight is not positive.")total_weight = weight.sum()self.root = self._build(df, label_col, weight, None, val_df, total_weight)if self.post_prune and self.ccp_alpha > 0 and self.val_df:self._cost_complexity_pruning(val_df, label_col)def _cost_complexity_pruning(self, val_df, label_col):"""执行代价复杂度剪枝算法步骤:1. 初始化: k=0, T=T_0, alpha=+inf2. 自底向上计算每个内部节点t的g(t)3. 找到最小的g(t)作为alpha_k,并剪枝对应的节点4. 重复直到只剩根节点5. 
使用验证集选择最优子树"""# 步骤1: 初始化k = 0T = deepcopy(self.root)  # 当前子树alpha = float("inf")self.pruned_trees = []  # 存储子树序列 T0, T1, ..., Tnself.alphas = []  # 存储对应的alpha序列# 添加原始树self.pruned_trees.append(deepcopy(T))self.alphas.append(alpha)# 步骤2-6: 循环剪枝直到只剩根节点while not T.is_leaf():min_g = float("inf")min_g_nodes = []q = Queue()q.put(T)while not q.empty():p = q.get()for child in p.children.values():if child.is_leaf():continueassert child.leaf_count > 1q.put(child)cur_g = (child.R - child.R_T) / (child.leaf_count - 1)if cur_g < min_g:min_g = cur_gmin_g_nodes.append(child)elif abs(cur_g - min_g) < 1e-6:  # cur_g == min_gmin_g_nodes.append(child)# 如果没有找到可剪枝的节点,停止循环if not min_g_nodes:break# 更新alphaalpha = min_g# 剪枝所有g(t)=alpha的节点for node in min_g_nodes:node.clear_child()# 更新整棵树的R_T和leaf_countself._update_tree_metrics(T)# 保存当前子树和alphak += 1self.pruned_trees.append(deepcopy(T))self.alphas.append(alpha)# 如果达到ccp_alpha阈值则停止if alpha > self.ccp_alpha:break# 步骤7: 使用验证集选择最优子树if val_df is not None:best_mse = float("inf")best_tree_idx = -1for i, tree in enumerate(self.pruned_trees):cur_mse = self.calculate_mse_from_node(val_df, label_col, tree)if cur_mse < best_mse:best_mse = cur_msebest_tree_idx = i# 选择最优子树self.root = self.pruned_trees[best_tree_idx]print(f"Selected subtree T_{best_tree_idx} with alpha={self.alphas[best_tree_idx]:.6f}, accuracy={best_mse:.4f}")def _update_tree_metrics(self, node):if node.is_leaf():returntotal_R_T = 0.0total_leaf_count = 0for child in node.children.values():self._update_tree_metrics(child)total_R_T += child.R_Ttotal_leaf_count += child.leaf_countnode.R_T = total_R_Tnode.leaf_count = total_leaf_countdef predict_row(self, row: pd.Series, node) -> str:q = Queue()q.put((node, 1.0))ans = []while not q.empty():current_node, current_weight = q.get()if current_node.is_leaf():ans.append(current_weight * current_node.value)continueif current_node.feature_name not in row.index:print(f"Warning: Feature {current_node.feature_name} not found in row.")ans.append(current_weight * current_node.value)continuefeature_value = row[current_node.feature_name]if pd.notna(feature_value):if current_node.is_continuous:if feature_value <= current_node.threshold:child_node = current_node.children["<="]else:child_node = current_node.children[">"]q.put((child_node, current_weight))else:if feature_value == current_node.threshold:child_node = current_node.children["<="]else:child_node = current_node.children[">"]q.put((child_node, current_weight))else:for value, child_node in current_node.children.items():q.put((child_node, current_weight * current_node.ratio[value]))# Combine results from all pathsassert len(ans) > 0return sum(ans)def calculate_mse_from_node(self, df, label_col, node):if not df or df.empty:return 0.0preds = df.apply(lambda row: self.predict_row(row, node), axis=1)return ((preds - df[label_col]) ** 2).mean()def predict(self, df: pd.DataFrame) -> pd.Series:if self.root is None:raise ValueError("The model has not been fitted yet.")return df.apply(lambda row: self.predict_row(row, self.root), axis=1)def print_tree(self, node=None, indent: str = "", prefix: str = ""):"""打印决策树结构"""if node is None:node = self.rootprint("Decision Tree:")print(f"Total leaves: {node.leaf_count}")# 叶子节点打印预测值if node.is_leaf():print(f"{indent}{prefix} Leaf: value={node.value:.4f} "f"(R={node.R:.2f}, samples={node.node_weight:.3f})")return# 内部节点打印特征信息if node.is_continuous:print(f"{indent}{prefix} {node.feature_name} <= {node.threshold:.4f} "f"[R_T={node.R_T:.2f}, leaves={node.leaf_count}]")# 
递归打印子节点self.print_tree(node.children.get("<="), indent + "  ", "├── <=: ")self.print_tree(node.children.get(">"), indent + "  ", "└── >: ")else:print(f"{indent}{prefix} {node.feature_name} == {node.threshold} "f"[R_T={node.R_T:.2f}, leaves={node.leaf_count}]")self.print_tree(node.children.get("<="), indent + "  ", "├── ==: ")self.print_tree(node.children.get(">"), indent + "  ", "└── !=: ")if __name__ == "__main__":pass