day23 pipeline管道
目录
转换器(transformer)
估计器(estimator)
管道(pipeline)
pipeline代码教学
导入库和数据加载
分离特征和标签,划分数据集
定义预处理步骤
构建完整pipeline
使用Pipeline进行训练和评估
作业:整理下全部逻辑的先后顺序,看看能不能制作出适合所有机器学习的通用pipeline
通用pipeline的逻辑步骤
通用pipeline实例代码
pipeline在机器学习领域中可以翻译为“管道”,也可以翻译为”流水线“,是机器学习中一个重要的概念。
在机器学习中,通常会按照一定的顺序对数据进行预处理、特征提取、模型训练和评估等步骤,以实现机器学习模型的训练和评估。为了方便管理这些步骤,我们可以使用pipeline来构建一个完整的机器学习流水线。
pipeline是一个用于组合多个估计器(estimator)的estimator,它实现了一个流水线,其中每个估计器都实现了fit和transform方法,fit方法用于训练模型,transform方法用来对数据进行预处理和特征提取。
在此之前我们先介绍一下转换器(transformer)和估计器(estimator)的概念。
转换器(transformer)
转换器(transformer)是一个用于对数据进行预处理和特征提取的 estimator,它实现一个 transform 方法,用于对数据进行预处理和特征提取。转换器通常用于对数据进行预处理,例如对数据进行归一化、标准化、缺失值填充等。转换器也可以用于对数据进行特征提取,例如对数据进行特征选择、特征组合等。转换器的特点是无状态的,即它们不会存储任何关于数据的状态信息(指的是不存储内参)。转换器仅根据输入数据学习转换规则(比如函数规律、外参),并将其应用于新的数据。因此,转换器可以在训练集上学习转换规则,并在训练集之外的新数据上应用这些规则。
常见的转换器包括数据缩放器(如StandardScaler、MinMaxScaler)、特征选择器(如SelectKBest、PCA)、特征提取器(如CountVectorizer、TF-IDFVectorizer)等。
之前我们都是说对xxxx类进行实例化,现在可以换一个更加准确的说法,如下:
# 导入StandardScaler转换器
from sklearn.preprocessing import StandardScaler# 初始化转换器
scaler = StandardScaler()# 1. 学习训练数据的缩放规则(计算均值和标准差),本身不存储数据
scaler.fit(X_train)# 2. 应用规则到训练数据和测试数据
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)# 也可以使用fit_transform一步完成
# X_train_scaled = scaler.fit_transform(X_train)
估计器(estimator)
估计器(Estimator)是实现机器学习算法的对象或类。它用于拟合(fit)数据并进行预测(predict)。估计器是机器学习模型的基本组成部分,用于从数据中学习模式、进行预测和进行模型评估。
估计器的主要方法是fit和predict。fit方法用于根据输入数据学习模型的参数和规律,而predict方法用于对新的未标记样本进行预测。估计器的特点是有状态的,即它们在训练过程中存储了关于数据的状态信息,以便在预测阶段使用。估计器通过学习训练数据中的模式和规律来进行预测。因此,估计器需要在训练集上进行训练,并使用训练得到的模型参数对新数据进行预测。
常见的估计器包括分类器(classifier)、回归器(regresser)、聚类器(clusterer)。
from sklearn.linear_model import LinearRegression
# 创建一个回归器
model = LinearRegression()
# 在训练集上训练模型
model.fit(X_train_scaled, y_train)
# 对测试集进行预测
y_pred = model.predict(X_test_scaled)
管道(pipeline)
了解了分类器和估计器,所以可以理解为在机器学习中是由转换器和估计器按照一定顺序组合在一起来完成了整个流程。
机器学习的管道机制通过将多个转换器和估计器按顺序连接在一起,可以构建一个完整的数据处理和模型训练流程。在管道机制中,可以使用Pipeline类来组织和连接不同的转换器和估计器。Pipeline类提供了一种简单的方式来定义和管理机器学习任务的流程。
管道机制是按照封装顺序依次执行的一种机制,在机器学习算法中得以应用的根源在于,参数集在新数据集(比如测试集)上重复使用。且代码看上去更加简洁明确。这也意味着,很多个不同数据集,只要处理成管道的输入形式,后续的代码就可以复用,(这里为我们未来的python文件拆分作做铺垫),也就是把很多个类和函数操作写进一个新的pipeline中。
这符合编程中的一个非常经典的思想:don't repeat yourself。(dry原则),也叫做封装思想,我们之前提到过类似的思想应用:函数、类。
Pipeline最大的价值和核心应用场景之一,就是与交叉验证和网格搜索等结合使用,来:
1.防止数据集泄漏:这是在使用交叉验证时,Pipeline自动完成预处理并在每个折叠内独立fit/transform的关键优势。
2.简化超参数调优:可以方便地同时调优预处理步骤和模型参数。
pipeline代码教学
导入库和数据加载
# 导入基础库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time # 导入 time 库
import warnings# 忽略警告
warnings.filterwarnings("ignore")# 设置中文字体和负号正常显示
plt.rcParams['font.sans-serif'] = ['Heiti TC']
plt.rcParams['axes.unicode_minus'] = False# 导入 Pipeline 和相关预处理工具
from sklearn.pipeline import Pipeline # 用于创建机器学习工作流
from sklearn.compose import ColumnTransformer # 用于将不同的预处理应用于不同的列
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder, StandardScaler # 用于数据预处理(有序编码、独热编码、标准化)
from sklearn.impute import SimpleImputer # 用于处理缺失值# 导入机器学习模型和评估工具
from sklearn.ensemble import RandomForestClassifier # 随机森林分类器
from sklearn.metrics import classification_report, confusion_matrix # 用于评估分类器性能
from sklearn.model_selection import train_test_split # 用于划分训练集和测试集# --- 加载原始数据 ---
# 我们加载原始数据,不对其进行任何手动预处理
data = pd.read_csv('data.csv')print("原始数据加载完成,形状为:", data.shape) # (7500,18)
# print(data.head()) # 可以打印前几行看看原始数据
分离特征和标签,划分数据集
# --- 分离特征和标签 (使用原始数据) ---
y = data['Credit Default'] # 标签
X = data.drop(['Credit Default'], axis=1) # 特征 (axis=1 表示按列删除)print("\n特征和标签分离完成。")
print("特征 X 的形状:", X.shape)
print("标签 y 的形状:", y.shape)# --- 划分训练集和测试集 (在任何预处理之前划分) ---
# 按照8:2划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # 80%训练集,20%测试集print("\n数据集划分完成 (预处理之前)。")
print("X_train 形状:", X_train.shape)
print("X_test 形状:", X_test.shape)
print("y_train 形状:", y_train.shape)
print("y_test 形状:", y_test.shape)
特征和标签分离完成。
特征 X 的形状: (7500, 17)
标签 y 的形状: (7500,)
数据集划分完成 (预处理之前)。
X_train 形状: (6000, 17)
X_test 形状: (1500, 17)
y_train 形状: (6000,)
y_test 形状: (1500,)
定义预处理步骤
# --- 定义不同列的类型和它们对应的预处理步骤 ---
# 这些定义是基于原始数据 X 的列类型来确定的# 识别原始的 object 列 (对应你原代码中的 discrete_features 在预处理前)
object_cols = X.select_dtypes(include=['object']).columns.tolist()
# 识别原始的非 object 列 (通常是数值列)
numeric_cols = X.select_dtypes(exclude=['object']).columns.tolist()# 有序分类特征 (对应你之前的标签编码)
# 注意:OrdinalEncoder默认编码为0, 1, 2... 对应你之前的1, 2, 3...需要在模型解释时注意
# 这里的类别顺序需要和你之前映射的顺序一致
ordinal_features = ['Home Ownership', 'Years in current job', 'Term']
# 定义每个有序特征的类别顺序,这个顺序决定了编码后的数值大小
ordinal_categories = [['Own Home', 'Rent', 'Have Mortgage', 'Home Mortgage'], # Home Ownership 的顺序 (对应1, 2, 3, 4)['< 1 year', '1 year', '2 years', '3 years', '4 years', '5 years', '6 years', '7 years', '8 years', '9 years', '10+ years'], # Years in current job 的顺序 (对应1-11)['Short Term', 'Long Term'] # Term 的顺序 (对应0, 1)
]
# 构建处理有序特征的 Pipeline: 先填充缺失值,再进行有序编码
ordinal_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='most_frequent')), # 用众数填充分类特征的缺失值('encoder', OrdinalEncoder(categories=ordinal_categories, handle_unknown='use_encoded_value', unknown_value=-1)) # 进行有序编码
])
print("有序特征处理 Pipeline 定义完成。")# 标签分类特征 (对应你之前的独热编码)
nominal_features = ['Purpose'] # 使用原始列名
# 构建处理标签特征的 Pipeline: 先填充缺失值,再进行独热编码
nominal_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='most_frequent')), # 用众数填充分类特征的缺失值('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)) # 进行独热编码, sparse_output=False 使输出为密集数组
])
print("标签特征处理 Pipeline 定义完成。")# 连续特征 (对应你之前的众数填充 + 添加标准化)
# 从所有列中排除掉分类特征,得到连续特征列表
# continuous_features = X.columns.difference(object_cols).tolist() # 原始X中非object类型的列
# 也可以直接从所有列中排除已知的有序和标签特征
continuous_features = [f for f in X.columns if f not in ordinal_features + nominal_features]# 构建处理连续特征的 Pipeline: 先填充缺失值,再进行标准化
continuous_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='most_frequent')), # 用众数填充缺失值 (复现你的原始逻辑)('scaler', StandardScaler()) # 标准化,一个好的实践 (如果你严格复刻原代码,可以移除这步)
])
print("连续特征处理 Pipeline 定义完成。")
# --- 构建 ColumnTransformer ---
# 将不同的预处理应用于不同的列子集,构造一个完备的转化器
# ColumnTransformer 接收一个 transformers 列表,每个元素是 (名称, 转换器对象, 列名列表)
preprocessor = ColumnTransformer(transformers=[('ordinal', ordinal_transformer, ordinal_features), # 对 ordinal_features 列应用 ordinal_transformer('nominal', nominal_transformer, nominal_features), # 对 nominal_features 列应用 nominal_transformer('continuous', continuous_transformer, continuous_features) # 对 continuous_features 列应用 continuous_transformer],remainder='passthrough' # 如何处理没有在上面列表中指定的列。# 'passthrough' 表示保留这些列,不做任何处理。# 'drop' 表示丢弃这些列。
)print("\nColumnTransformer (预处理器) 定义完成。")
# print(preprocessor) # 可以打印 preprocessor 对象看看它的结构
构建完整pipeline
# --- 构建完整的 Pipeline ---
# 将预处理器和模型串联起来
# 使用你原代码中 RandomForestClassifier 的默认参数和 random_state
pipeline = Pipeline(steps=[('preprocessor', preprocessor), # 第一步:应用所有的预处理 (我们刚刚定义的 ColumnTransformer 对象)('classifier', RandomForestClassifier(random_state=42)) # 第二步:随机森林分类器 (使用默认参数和指定的 random_state)
])print("\n完整的 Pipeline 定义完成。")
# print(pipeline) # 可以打印 pipeline 对象看看它的结构
使用Pipeline进行训练和评估
# --- 1. 使用 Pipeline 在划分好的训练集和测试集上评估 ---
# 完全模仿你原代码的第一个评估步骤print("\n--- 1. 默认参数随机森林 (训练集 -> 测试集) ---") # 使用你原代码的输出文本
# import time # 引入 time 库 (已在文件顶部引入)start_time = time.time() # 记录开始时间# 在原始的 X_train, y_train 上拟合整个Pipeline
# Pipeline会自动按顺序执行 preprocessor 的 fit_transform(X_train),
# 然后用处理后的数据和 y_train 拟合 classifier
pipeline.fit(X_train, y_train)# 在原始的 X_test 上进行预测
# Pipeline会自动按顺序执行 preprocessor 的 transform(X_test),
# 然后用处理后的数据进行 classifier 的 predict
pipeline_pred = pipeline.predict(X_test)end_time = time.time() # 记录结束时间print(f"训练与预测耗时: {end_time - start_time:.4f} 秒") # 使用你原代码的输出格式print("\n默认随机森林 在测试集上的分类报告:") # 使用你原代码的输出文本
print(classification_report(y_test, pipeline_pred))
print("默认随机森林 在测试集上的混淆矩阵:") # 使用你原代码的输出文本
print(confusion_matrix(y_test, pipeline_pred))
作业:整理下全部逻辑的先后顺序,看看能不能制作出适合所有机器学习的通用pipeline
通用pipeline的逻辑步骤
1. 问题定义与数据理解
目标:明确业务问题、确定机器学习任务(分类 / 回归 / 聚类等),理解数据背景。
-
步骤:
-
明确项目目标(如预测房价、用户分类等)。
-
分析数据来源、规模、字段含义(如特征、标签)。
-
评估数据质量(是否存在缺失、异常、噪声等)。
-
2. 数据收集与加载
目标:获取原始数据并加载到环境中。
-
步骤:
-
从数据库、文件(CSV/Excel 等)、API 等渠道读取数据。
-
划分数据集(通常分为训练集、验证集、测试集,比例如 7:1:2)。
-
3. 数据预处理(核心环节)
目标:清洗、转换数据,使其适合模型训练。
-
子步骤及顺序:
-
数据清洗
-
处理缺失值(删除、插值、填充默认值等)。
-
处理异常值(识别离群点,删除或修正)。
-
-
特征工程
-
特征处理:
-
离散型特征:独热编码(One-Hot Encoding)、标签编码(Label Encoding)。
-
连续型特征:标准化(Standardization)、归一化(Normalization)。
-
-
特征衍生:根据业务逻辑创建新特征(如时间差、比率等)。
-
特征选择:过滤无关或冗余特征(如相关性分析、方差阈值、模型重要性排序)。
-
-
样本处理
-
处理类别不平衡(过采样 / 欠采样,如 SMOTE、随机欠采样)。
-
打乱数据顺序(避免数据顺序影响模型训练)。
-
-
4. 模型选择与训练
目标:选择合适的算法并训练模型。
-
步骤:
-
根据任务类型选择模型(如分类用逻辑回归、随机森林;回归用线性回归、XGBoost)。
-
初始化模型参数(可使用默认参数或预定义参数)。
-
使用训练集训练模型(调用
model.fit()
)。
-
5. 模型验证与调优
目标:评估模型性能并优化参数。
-
子步骤:
-
性能评估
-
使用验证集计算指标(分类:准确率、精确率、召回率、F1 值、AUC-ROC;回归:MSE、RMSE、MAE)。
-
可视化结果(如混淆矩阵、ROC 曲线、预测值与真实值对比图)。
-
-
超参数调优
-
使用交叉验证(Cross-Validation)避免过拟合。
-
调优方法:网格搜索(Grid Search)、随机搜索(Random Search)、贝叶斯优化。
-
-
模型调整
-
根据评估结果调整模型(如更换算法、增加正则化、调整特征)。
-
-
6. 模型测试与部署
目标:在未知数据上验证模型,并将其投入实际应用。
-
步骤:
-
使用测试集进行最终评估(确保测试集未参与训练和调优)。
-
部署模型(如封装为 API、嵌入应用程序、使用 Docker 容器等)。
-
监控模型在生产环境中的性能(如延迟、准确率变化)。
-
通用pipeline实例代码
# 伪代码示例
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.impute import SimpleImputer # 1. 数据划分
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)# 2. 特征预处理
ordinal_features = ['年龄', '工龄'] # 有序特征
nominal_features = ['性别', '职业'] # 无序特征
continuous_features = ['收入','存款'] # 连续特征ordinal_categories = [] # 对应的标签编码
ordinal_transformer = Pipeline(steps=[('imputer',SimpleImputer(strategy='most_frequent')), # 众数填补缺失值('encoder',OrdinalEncoder(categories=ordinal_categories, handle_unknown='use_encoder_value', unknown_value=-1))
])nominal_transformer = Pipeline(steps=[('imputer',SimpleImputer(strategy='most_frequent')), # 众数填补缺失值('onehot',OnehotEncoder(handle_unknown='ignore', sparse_output=False))
])continuous_transformer = Pipeline(steps=[('imputer',SimpleImputer(strategy='most_frequent')), # 众数填补缺失值('scaler',StandardScaler())
])preprocessor = ColumnTransformer(transformers=[('ordinal', ordinal_transformer, ordinal_features),('nominal', nominal_transformer, nominal_features),('continuous',continuous_transformer,continuous_features)],reminder='passthrough') # 处理没有在上面列表中的列 passthrough表示保留# 3. 构建Pipeline
pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', RandomForestClassifier(random_state=42))
])# 4. 超参数调优
param_grid = {'classifier__n_estimators': [100, 200],'classifier__max_depth': [None, 10, 20]
}grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='f1_macro')
grid_search.fit(X_train, y_train)# 5. 模型评估
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
print(classification_report(y_test, y_pred))
@浙大疏锦行