吴恩达机器学习课程(PyTorch 适配)学习笔记:3.3 推荐系统全面解析
3.3 推荐系统全面解析:从基础到实践
推荐系统概述
什么是推荐系统?
推荐系统是信息过滤系统的子类,旨在预测用户对物品的"评分"或"偏好",帮助用户在大量信息中发现相关内容。根据推荐算法的不同,推荐系统可以分为多种类型。
推荐系统的重要性
- 信息过载:帮助用户从海量信息中筛选相关内容
- 商业价值:提升用户参与度(engagement)、增加销售额和用户粘性
- 个性化体验:为每个用户提供定制化的内容和服务
推荐系统分类
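按所利用的信息来源,本节涉及的推荐方法大致可分为以下几类,后文将依次展开:
- 基于内容过滤:利用物品的内容特征与用户的历史偏好
- 协同过滤:利用用户-物品交互数据挖掘群体模式(包括矩阵分解、神经协同过滤等)
- 混合推荐:结合以上两类方法的优势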
核心方法详解
基于内容过滤 (Content-Based Filtering)
基本原理
基于内容过滤通过分析物品的内容特征和用户的历史行为,为用户推荐与其过去喜欢的物品相似的物品。
算法流程
```python
import torch
import torch.nn as nn
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np


class ContentBasedRecommender:
    """基于内容的推荐系统"""

    def __init__(self):
        self.user_profiles = {}
        self.item_features = {}
        self.vectorizer = None

    def extract_item_features(self, items, descriptions):
        """提取物品特征"""
        # 使用TF-IDF向量化文本描述
        self.vectorizer = TfidfVectorizer(max_features=100)
        features = self.vectorizer.fit_transform(descriptions).toarray()
        for i, item_id in enumerate(items):
            self.item_features[item_id] = torch.tensor(features[i], dtype=torch.float32)
        return self.item_features

    def build_user_profile(self, user_id, interacted_items, ratings):
        """构建用户画像"""
        user_profile = torch.zeros_like(list(self.item_features.values())[0])
        total_weight = 0
        for item_id, rating in zip(interacted_items, ratings):
            if item_id in self.item_features:
                # 使用评分作为权重
                user_profile += rating * self.item_features[item_id]
                total_weight += rating
        if total_weight > 0:
            user_profile /= total_weight
        self.user_profiles[user_id] = user_profile
        return user_profile

    def recommend(self, user_id, candidate_items, top_k=10):
        """为用户生成推荐"""
        if user_id not in self.user_profiles:
            return []
        user_profile = self.user_profiles[user_id]
        scores = []
        for item_id in candidate_items:
            if item_id in self.item_features:
                item_vector = self.item_features[item_id]
                # 计算余弦相似度
                similarity = torch.cosine_similarity(
                    user_profile.unsqueeze(0), item_vector.unsqueeze(0)
                )
                scores.append((item_id, similarity.item()))
        # 按相似度排序并返回top_k
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]


# 示例使用
def content_based_demo():
    # 模拟数据
    items = ['item1', 'item2', 'item3', 'item4', 'item5']
    descriptions = [
        "action adventure sci-fi movie",
        "romantic comedy drama film",
        "sci-fi action thriller movie",
        "documentary nature wildlife",
        "comedy romantic drama film"
    ]
    recommender = ContentBasedRecommender()
    item_features = recommender.extract_item_features(items, descriptions)

    # 用户1喜欢科幻动作片
    user1_interactions = ['item1', 'item3']
    user1_ratings = [5.0, 4.5]
    recommender.build_user_profile('user1', user1_interactions, user1_ratings)

    # 生成推荐
    recommendations = recommender.recommend('user1', items)
    print("基于内容的推荐结果:")
    for item_id, score in recommendations:
        print(f"物品 {item_id}: 相似度 {score:.4f}")


content_based_demo()
```
协同过滤 (Collaborative Filtering)
基本原理
协同过滤基于"相似用户喜欢相似物品"的假设,通过分析用户-物品交互矩阵来发现模式和关系。
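为了更直观地理解"用户-物品交互矩阵",下面给出一个极简示意(评分数据为虚构的玩具示例),其中 0 表示该用户尚未对该物品评分:

```python
import torch

# 虚构的玩具评分矩阵:行是用户,列是物品,0 表示缺失评分
ratings = torch.tensor([
    [5.0, 3.0, 0.0, 1.0],
    [4.0, 0.0, 0.0, 1.0],
    [1.0, 1.0, 0.0, 5.0],
    [0.0, 0.0, 5.0, 4.0],
])

# 协同过滤的目标就是利用已有评分的模式,去预测这些 0(缺失)位置上的评分
print(ratings.shape)  # torch.Size([4, 4])
```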
矩阵分解方法
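矩阵分解把每个用户 u 和每个物品 i 分别表示为低维隐向量 p_u 和 q_i,预测评分就是两者的点积 r̂(u, i) = p_u · q_i,训练目标是最小化已观测评分上的均方误差。下面的 `MatrixFactorization` 用 `nn.Embedding` 实现这一点积形式,而 `NeuralCollaborativeFiltering` 则把拼接后的用户、物品嵌入送入 MLP,以学习比点积更复杂的交互。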
```python
class MatrixFactorization(nn.Module):
    """矩阵分解推荐模型"""

    def __init__(self, n_users, n_items, n_factors=20):
        super(MatrixFactorization, self).__init__()
        self.user_factors = nn.Embedding(n_users, n_factors)
        self.item_factors = nn.Embedding(n_items, n_factors)
        # 初始化权重
        self.user_factors.weight.data.uniform_(-0.1, 0.1)
        self.item_factors.weight.data.uniform_(-0.1, 0.1)

    def forward(self, user, item):
        user_vec = self.user_factors(user)
        item_vec = self.item_factors(item)
        # 点积计算预测评分
        return (user_vec * item_vec).sum(1)


class NeuralCollaborativeFiltering(nn.Module):
    """神经协同过滤模型"""

    def __init__(self, n_users, n_items, n_factors=50, hidden_layers=[64, 32, 16]):
        super(NeuralCollaborativeFiltering, self).__init__()
        # 嵌入层
        self.user_embedding = nn.Embedding(n_users, n_factors)
        self.item_embedding = nn.Embedding(n_items, n_factors)
        # MLP层
        layers = []
        input_size = n_factors * 2
        for hidden_size in hidden_layers:
            layers.append(nn.Linear(input_size, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.2))
            input_size = hidden_size
        layers.append(nn.Linear(input_size, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, user, item):
        user_embedded = self.user_embedding(user)
        item_embedded = self.item_embedding(item)
        # 拼接用户和物品嵌入
        vector = torch.cat([user_embedded, item_embedded], dim=-1)
        # 通过MLP;只压缩最后一维,避免 batch=1 时被压成标量
        return self.mlp(vector).squeeze(-1)
```
数据处理技术
二进制标签处理
```python
def process_binary_interactions(interactions, positive_threshold=4.0):
    """处理二进制交互数据"""
    binary_interactions = (interactions >= positive_threshold).float()
    return binary_interactions


class InteractionDataset(torch.utils.data.Dataset):
    """用户-物品交互数据集"""

    def __init__(self, user_ids, item_ids, ratings, binary=False):
        self.user_ids = torch.tensor(user_ids, dtype=torch.long)
        self.item_ids = torch.tensor(item_ids, dtype=torch.long)
        self.ratings = torch.tensor(ratings, dtype=torch.float)
        if binary:
            self.ratings = (self.ratings >= 4.0).float()

    def __len__(self):
        return len(self.user_ids)

    def __getitem__(self, idx):
        return self.user_ids[idx], self.item_ids[idx], self.ratings[idx]
```
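一点补充说明(并非原笔记内容):当标签被处理成 0/1 的二进制交互时,通常用二分类损失(如 PyTorch 自带的 `nn.BCEWithLogitsLoss`)替代 MSE 来训练模型。下面是一个最小示意,其中的打分和标签均为虚构数据:

```python
import torch
import torch.nn as nn

# 假设模型对每个 (user, item) 输出未经过 sigmoid 的原始打分(logits)
criterion = nn.BCEWithLogitsLoss()

logits = torch.tensor([2.1, -0.7, 0.3])   # 模型输出的原始打分(虚构)
labels = torch.tensor([1.0, 0.0, 1.0])    # 二进制交互标签(虚构)
loss = criterion(logits, labels)
print(loss.item())
```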
均值归一化
```python
def mean_normalization(ratings_matrix):
    """均值归一化:减去用户均值,使缺失评分的预测回退到用户均值附近"""
    mask = ratings_matrix != 0
    # 计算每个用户的平均评分(只统计已评分条目,避免把缺失值 0 算进均值)
    rating_counts = mask.sum(dim=1, keepdim=True).clamp(min=1)
    user_means = ratings_matrix.sum(dim=1, keepdim=True) / rating_counts
    # 计算全局平均评分(同样只统计已评分条目)
    global_mean = ratings_matrix[mask].mean()
    # 归一化:仅对已评分条目减去对应用户的均值,缺失位置保持为 0
    normalized_matrix = ratings_matrix.clone()
    normalized_matrix[mask] = ratings_matrix[mask] - user_means[mask.nonzero()[:, 0], 0]
    return normalized_matrix, user_means, global_mean


def denormalize_predictions(predictions, user_means, user_ids):
    """反归一化:预测值加回对应用户的均值,还原到原始评分尺度"""
    return predictions + user_means[user_ids].squeeze(-1)
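```

一个最小的使用示意(评分矩阵为虚构数据),展示"归一化 → 在归一化空间预测 → 反归一化"的流程:

```python
ratings = torch.tensor([
    [5.0, 3.0, 0.0],
    [4.0, 0.0, 2.0],
    [0.0, 1.0, 5.0],
])

normalized, user_means, global_mean = mean_normalization(ratings)
print(normalized)   # 已评分位置减去了对应用户的均值,未评分位置仍为 0
print(user_means)   # 每个用户(行)的平均评分

# 模型在归一化空间中给出预测后,再加回用户均值得到真实评分尺度
preds = torch.tensor([0.5, -0.3])
user_ids = torch.tensor([0, 2])
print(denormalize_predictions(preds, user_means, user_ids))
```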
完整的推荐系统 PyTorch 实现
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import numpy as np


class RecommenderSystem:
    """完整的推荐系统实现"""

    def __init__(self, n_users, n_items, model_type='mf', n_factors=50):
        self.n_users = n_users
        self.n_items = n_items
        self.model_type = model_type

        if model_type == 'mf':
            self.model = MatrixFactorization(n_users, n_items, n_factors)
        elif model_type == 'ncf':
            self.model = NeuralCollaborativeFiltering(n_users, n_items, n_factors)
        else:
            raise ValueError("不支持的模型类型")

        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001, weight_decay=1e-5)

    def train_model(self, train_loader, epochs=50, val_loader=None):
        """训练推荐模型"""
        train_losses = []
        val_losses = []

        for epoch in range(epochs):
            # 训练阶段
            self.model.train()
            epoch_loss = 0
            for batch_idx, (users, items, ratings) in enumerate(train_loader):
                self.optimizer.zero_grad()
                predictions = self.model(users, items)
                loss = self.criterion(predictions, ratings)
                loss.backward()
                self.optimizer.step()
                epoch_loss += loss.item()

            avg_train_loss = epoch_loss / len(train_loader)
            train_losses.append(avg_train_loss)

            # 验证阶段
            if val_loader is not None:
                val_loss = self.evaluate_model(val_loader)
                val_losses.append(val_loss)
                print(f'Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}')
            else:
                print(f'Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}')

        return train_losses, val_losses

    def evaluate_model(self, data_loader):
        """评估模型性能"""
        self.model.eval()
        total_loss = 0
        with torch.no_grad():
            for users, items, ratings in data_loader:
                predictions = self.model(users, items)
                loss = self.criterion(predictions, ratings)
                total_loss += loss.item()
        return total_loss / len(data_loader)

    def predict_ratings(self, user_ids, item_ids):
        """预测用户对物品的评分"""
        self.model.eval()
        with torch.no_grad():
            predictions = self.model(
                torch.tensor(user_ids, dtype=torch.long),
                torch.tensor(item_ids, dtype=torch.long)
            )
        return predictions.numpy()

    def recommend_for_user(self, user_id, candidate_items, top_k=10):
        """为用户生成top-k推荐"""
        user_ids = [user_id] * len(candidate_items)
        scores = self.predict_ratings(user_ids, candidate_items)
        # 组合物品ID和预测分数
        item_scores = list(zip(candidate_items, scores))
        # 按分数降序排序
        item_scores.sort(key=lambda x: x[1], reverse=True)
        return item_scores[:top_k]


# 示例:在MovieLens数据集上训练推荐系统
def movielens_demo():
    # 模拟MovieLens数据
    n_users, n_items = 1000, 2000
    n_interactions = 50000

    # 生成模拟数据
    np.random.seed(42)
    user_ids = np.random.randint(0, n_users, n_interactions)
    item_ids = np.random.randint(0, n_items, n_interactions)
    ratings = np.random.randint(1, 6, n_interactions).astype(np.float32)

    # 创建数据集
    dataset = InteractionDataset(user_ids, item_ids, ratings)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=512, shuffle=False)

    # 训练矩阵分解模型
    print("训练矩阵分解模型...")
    mf_recommender = RecommenderSystem(n_users, n_items, 'mf', n_factors=20)
    mf_train_losses, mf_val_losses = mf_recommender.train_model(train_loader, epochs=30, val_loader=val_loader)

    # 训练神经协同过滤模型
    print("\n训练神经协同过滤模型...")
    ncf_recommender = RecommenderSystem(n_users, n_items, 'ncf', n_factors=50)
    ncf_train_losses, ncf_val_losses = ncf_recommender.train_model(train_loader, epochs=30, val_loader=val_loader)

    # 比较模型性能
    import matplotlib.pyplot as plt

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(mf_train_losses, label='MF Train Loss')
    plt.plot(mf_val_losses, label='MF Val Loss')
    plt.plot(ncf_train_losses, label='NCF Train Loss')
    plt.plot(ncf_val_losses, label='NCF Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training History')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # 生成推荐示例
    test_user_id = 42
    candidate_items = list(range(100))  # 前100个物品作为候选
    mf_recommendations = mf_recommender.recommend_for_user(test_user_id, candidate_items, top_k=5)
    ncf_recommendations = ncf_recommender.recommend_for_user(test_user_id, candidate_items, top_k=5)

    print(f"\n用户 {test_user_id} 的推荐结果:")
    print("矩阵分解推荐:", mf_recommendations)
    print("神经协同过滤推荐:", ncf_recommendations)

    # 绘制最终损失比较
    plt.subplot(1, 2, 2)
    models = ['Matrix Factorization', 'Neural CF']
    final_losses = [mf_val_losses[-1], ncf_val_losses[-1]]
    bars = plt.bar(models, final_losses, color=['skyblue', 'lightcoral'])
    plt.ylabel('Final Validation Loss')
    plt.title('Model Comparison')
    # 在柱状图上添加数值
    for bar, loss in zip(bars, final_losses):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
                 f'{loss:.4f}', ha='center', va='bottom')

    plt.tight_layout()
    plt.show()


movielens_demo()
```
协同过滤与内容过滤对比
详细对比分析
| 特性 | 协同过滤 | 基于内容过滤 |
|---|---|---|
| 数据需求 | 用户-物品交互数据 | 物品内容特征 + 用户历史 |
| 冷启动问题 | 新用户/新物品问题严重 | 新用户问题存在,新物品容易处理 |
| 推荐多样性 | 可能产生意外发现 | 容易陷入内容相似性陷阱 |
| 可解释性 | 较低(“类似用户也喜欢”) | 较高(基于内容特征) |
| 数据稀疏性 | 对稀疏数据敏感 | 相对不敏感 |
| 领域知识 | 不需要领域知识 | 需要内容特征工程 |
混合推荐方法
```python
class HybridRecommender:
    """混合推荐系统"""

    def __init__(self, n_users, n_items, content_features):
        self.cf_model = NeuralCollaborativeFiltering(n_users, n_items)
        self.content_model = ContentBasedRecommender()
        # 让内容模型直接复用外部传入的物品特征
        self.content_model.item_features = content_features
        self.content_features = content_features
        # 混合权重
        self.alpha = 0.7  # CF权重
        self.beta = 0.3   # 内容过滤权重

    def recommend(self, user_id, candidate_items, top_k=10):
        """混合推荐:对两路分数做线性加权融合"""
        # 协同过滤预测(直接调用模型前向传播,对全部候选物品打分)
        self.cf_model.eval()
        with torch.no_grad():
            user_tensor = torch.tensor([user_id] * len(candidate_items), dtype=torch.long)
            item_tensor = torch.tensor(candidate_items, dtype=torch.long)
            cf_scores = self.cf_model(user_tensor, item_tensor).tolist()

        # 基于内容预测(取全部候选,避免提前截断)
        content_scores = self.content_model.recommend(user_id, candidate_items, top_k=len(candidate_items))

        # 混合分数
        hybrid_scores = []
        for i, item_id in enumerate(candidate_items):
            cf_score = cf_scores[i] if i < len(cf_scores) else 0
            content_score = next((score for item, score in content_scores if item == item_id), 0)
            hybrid_score = self.alpha * cf_score + self.beta * content_score
            hybrid_scores.append((item_id, hybrid_score))

        # 排序并返回top_k
        hybrid_scores.sort(key=lambda x: x[1], reverse=True)
        return hybrid_scores[:top_k]
```
大规模推荐场景适配
分布式训练策略
```python
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel


class DistributedRecommender:
    """分布式推荐系统"""

    def __init__(self, n_users, n_items, n_factors=50):
        self.model = NeuralCollaborativeFiltering(n_users, n_items, n_factors)
        # 初始化分布式训练
        self.setup_distributed()
        # 先把模型移动到当前进程对应的GPU,再用DistributedDataParallel包装
        self.model = self.model.cuda()
        self.model = DistributedDataParallel(self.model)
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

    def setup_distributed(self):
        """设置分布式训练环境"""
        dist.init_process_group(backend='nccl')
        torch.cuda.set_device(dist.get_rank())

    def train_distributed(self, train_loader, epochs=50):
        """分布式训练"""
        self.model.train()
        for epoch in range(epochs):
            epoch_loss = 0
            for batch_idx, (users, items, ratings) in enumerate(train_loader):
                users = users.cuda()
                items = items.cuda()
                ratings = ratings.cuda()

                self.optimizer.zero_grad()
                predictions = self.model(users, items)
                loss = self.criterion(predictions, ratings)
                loss.backward()
                self.optimizer.step()
                epoch_loss += loss.item()

            # 只在主进程打印日志
            if dist.get_rank() == 0:
                print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/len(train_loader):.4f}')
```
负采样技术
```python
class NegativeSamplingDataset(Dataset):
    """负采样数据集"""

    def __init__(self, positive_interactions, n_items, negative_ratio=4):
        self.positive_interactions = positive_interactions
        self.n_items = n_items
        self.negative_ratio = negative_ratio

        # 构建用户交互过的物品集合
        self.user_interacted = {}
        for user, item, rating in positive_interactions:
            if user not in self.user_interacted:
                self.user_interacted[user] = set()
            self.user_interacted[user].add(item)

    def __len__(self):
        return len(self.positive_interactions) * (1 + self.negative_ratio)

    def __getitem__(self, idx):
        if idx < len(self.positive_interactions):
            # 正样本
            user, item, rating = self.positive_interactions[idx]
            return user, item, 1.0
        else:
            # 负样本
            neg_idx = idx - len(self.positive_interactions)
            pos_idx = neg_idx // self.negative_ratio
            user, pos_item, _ = self.positive_interactions[pos_idx]
            # 随机选择用户未交互过的物品作为负样本
            neg_item = np.random.randint(0, self.n_items)
            while neg_item in self.user_interacted[user]:
                neg_item = np.random.randint(0, self.n_items)
            return user, neg_item, 0.0
```
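下面是一个使用上述数据集的简单示意(交互数据为虚构示例):每条正样本会按 `negative_ratio` 搭配若干随机负样本,一起送入 `DataLoader`:

```python
from torch.utils.data import DataLoader

# 虚构的正交互:(user_id, item_id, rating)
positive_interactions = [(0, 1, 5.0), (0, 3, 4.0), (1, 2, 5.0)]

neg_dataset = NegativeSamplingDataset(positive_interactions, n_items=10, negative_ratio=4)
neg_loader = DataLoader(neg_dataset, batch_size=4, shuffle=True)

for users, items, labels in neg_loader:
    print(users, items, labels)  # labels 为 1.0(正样本)或 0.0(负样本)
    break
```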
在线学习与增量更新
```python
class OnlineRecommender:
    """支持在线学习的推荐系统"""

    def __init__(self, n_users, n_items, n_factors=50):
        self.model = NeuralCollaborativeFiltering(n_users, n_items, n_factors)
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)
        self.criterion = nn.MSELoss()
        # 用户和物品的最近交互缓存
        self.recent_interactions = {}

    def update_model(self, user_id, item_id, rating):
        """在线更新模型"""
        self.model.train()

        # 准备数据
        user_tensor = torch.tensor([user_id], dtype=torch.long)
        item_tensor = torch.tensor([item_id], dtype=torch.long)
        rating_tensor = torch.tensor([rating], dtype=torch.float)

        # 前向传播
        self.optimizer.zero_grad()
        prediction = self.model(user_tensor, item_tensor)
        loss = self.criterion(prediction, rating_tensor)

        # 反向传播(小批量更新)
        loss.backward()
        self.optimizer.step()

        # 更新最近交互缓存
        if user_id not in self.recent_interactions:
            self.recent_interactions[user_id] = []
        self.recent_interactions[user_id].append((item_id, rating))

        # 保持缓存大小
        if len(self.recent_interactions[user_id]) > 100:
            self.recent_interactions[user_id].pop(0)

        return loss.item()
```
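使用示意(用户、物品 ID 与评分均为虚构):每当有一条新交互到来,就用它对模型做一次小步更新。

```python
# 虚构示例:用户 3 对物品 42 给出 4.5 分的新交互
online_rec = OnlineRecommender(n_users=100, n_items=500)
loss = online_rec.update_model(user_id=3, item_id=42, rating=4.5)
print(f"单步更新损失: {loss:.4f}")
```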
推荐系统伦理问题与应对
主要伦理问题
1. 信息茧房与过滤气泡
```python
class DiversityEnhancer:
    """多样性增强器"""

    def __init__(self, content_features):
        self.content_features = content_features

    def enhance_diversity(self, recommendations, user_history, diversity_weight=0.3):
        """增强推荐多样性"""
        diversified_recs = []
        for item_id, score in recommendations:
            # 计算与用户历史的平均相似度
            similarity_to_history = self._avg_similarity_to_history(item_id, user_history)
            # 多样性调整分数:原始分数 - 多样性惩罚
            diversity_penalty = similarity_to_history * diversity_weight
            adjusted_score = score - diversity_penalty
            diversified_recs.append((item_id, adjusted_score))

        # 重新排序
        diversified_recs.sort(key=lambda x: x[1], reverse=True)
        return diversified_recs

    def _avg_similarity_to_history(self, item_id, user_history):
        """计算物品与用户历史的平均相似度"""
        if not user_history:
            return 0

        item_vector = self.content_features.get(item_id)
        if item_vector is None:
            return 0

        total_similarity = 0
        for hist_item in user_history:
            hist_vector = self.content_features.get(hist_item)
            if hist_vector is not None:
                similarity = torch.cosine_similarity(
                    item_vector.unsqueeze(0), hist_vector.unsqueeze(0)
                ).item()
                total_similarity += similarity

        return total_similarity / len(user_history)
```
2. 公平性与偏见缓解
```python
class FairnessAwareRecommender:
    """公平性感知推荐系统"""

    def __init__(self, n_users, n_items, sensitive_attributes):
        self.model = NeuralCollaborativeFiltering(n_users, n_items)
        self.sensitive_attributes = sensitive_attributes  # 用户敏感属性字典
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

    def fairness_regularization(self, predictions, user_ids, lambda_fair=0.1):
        """公平性正则化:惩罚不同群体平均预测分数之间的最大差异"""
        fairness_loss = 0

        # 按敏感属性将预测分组
        groups = {}
        for user_id, pred in zip(user_ids, predictions):
            group = self.sensitive_attributes.get(int(user_id), 'unknown')
            if group not in groups:
                groups[group] = []
            groups[group].append(pred)

        # 计算各组的平均预测
        group_means = {}
        for group, preds in groups.items():
            group_means[group] = torch.mean(torch.stack(preds))

        # 计算最大组间差异
        if len(group_means) > 1:
            mean_values = list(group_means.values())
            max_diff = max(mean_values) - min(mean_values)
            fairness_loss = lambda_fair * max_diff

        return fairness_loss

    def train_with_fairness(self, train_loader, epochs=50, lambda_fair=0.1):
        """带公平性约束的训练"""
        self.model.train()
        for epoch in range(epochs):
            epoch_loss = 0
            for users, items, ratings in train_loader:
                self.optimizer.zero_grad()
                predictions = self.model(users, items)
                main_loss = self.criterion(predictions, ratings)
                # 添加公平性正则化
                fair_loss = self.fairness_regularization(predictions, users, lambda_fair)
                total_loss = main_loss + fair_loss
                total_loss.backward()
                self.optimizer.step()
                epoch_loss += total_loss.item()

            print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/len(train_loader):.4f}')
```
3. 可解释性推荐
```python
class ExplainableRecommender:
    """可解释推荐系统"""

    def __init__(self, model, user_features, item_features):
        self.model = model
        self.user_features = user_features
        self.item_features = item_features

    def generate_explanation(self, user_id, item_id, top_k_reasons=3):
        """生成推荐解释"""
        explanations = []

        # 基于内容的解释
        if user_id in self.user_features and item_id in self.item_features:
            user_vec = self.user_features[user_id]
            item_vec = self.item_features[item_id]
            # 找出最重要的特征(用户偏好与物品特征的逐元素乘积)
            feature_importance = user_vec * item_vec
            top_features = torch.topk(feature_importance, top_k_reasons)

            feature_names = ['动作', '喜剧', '科幻', '浪漫', '剧情']  # 示例特征名
            for idx, importance in zip(top_features.indices, top_features.values):
                # 防止示例特征名数量少于特征维度导致越界
                if importance > 0 and int(idx) < len(feature_names):
                    explanations.append(f"您喜欢{feature_names[int(idx)]}类内容,此物品在该方面匹配度较高")

        # 基于协同过滤的解释
        explanations.append("与您相似的用户也喜欢此内容")
        return explanations


# 伦理检查工具
class EthicsChecker:
    """伦理检查器"""

    @staticmethod
    def check_recommendation_ethics(recommendations, user_context, item_metadata):
        """检查推荐结果的伦理性"""
        issues = []
        for item_id, score in recommendations:
            item_info = item_metadata.get(item_id, {})

            # 检查内容适宜性
            if not EthicsChecker.is_age_appropriate(item_info, user_context.get('age')):
                issues.append(f"物品 {item_id} 可能不适合用户年龄")

            # 检查多样性
            if EthicsChecker.is_too_similar_to_history(item_id, user_context.get('history')):
                issues.append(f"物品 {item_id} 与用户历史过于相似")

            # 检查公平性
            if EthicsChecker.has_sensitive_bias(item_info, user_context):
                issues.append(f"物品 {item_id} 可能存在偏见问题")

        return issues

    @staticmethod
    def is_age_appropriate(item_info, user_age):
        """检查年龄适宜性"""
        item_age_rating = item_info.get('age_rating', 0)
        return user_age is None or user_age >= item_age_rating

    @staticmethod
    def is_too_similar_to_history(item_id, user_history):
        """检查多样性(此处为占位实现)"""
        # 实现多样性检查逻辑
        return False

    @staticmethod
    def has_sensitive_bias(item_info, user_context):
        """检查偏见(此处为占位实现)"""
        # 实现偏见检测逻辑
        return False
```
实践建议与最佳实践
模型选择指南
```python
def select_recommendation_approach(data_characteristics):
    """根据数据特性选择推荐方法"""
    has_content_features = data_characteristics['has_content_features']
    interaction_sparsity = data_characteristics['interaction_sparsity']
    cold_start_problem = data_characteristics['cold_start_problem']

    if has_content_features and cold_start_problem:
        return "基于内容过滤或混合方法"
    elif interaction_sparsity < 0.95 and not cold_start_problem:
        return "协同过滤方法"
    else:
        return "混合推荐方法"
```
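使用示意(输入的数据特性为虚构示例):

```python
data_characteristics = {
    'has_content_features': True,   # 是否有可用的物品内容特征
    'interaction_sparsity': 0.98,   # 交互矩阵的稀疏度
    'cold_start_problem': True,     # 是否存在明显的冷启动问题
}
print(select_recommendation_approach(data_characteristics))
# 输出: 基于内容过滤或混合方法
```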
评估指标
```python
def evaluate_recommendation_quality(model, test_loader, top_k=10):
    """评估推荐质量"""
    model.eval()
    all_predictions = []
    all_actual = []

    with torch.no_grad():
        for users, items, ratings in test_loader:
            predictions = model(users, items)
            all_predictions.extend(predictions.numpy())
            all_actual.extend(ratings.numpy())

    # 计算各种评估指标
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    import numpy as np

    mse = mean_squared_error(all_actual, all_predictions)
    mae = mean_absolute_error(all_actual, all_predictions)
    rmse = np.sqrt(mse)

    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")

    return {'rmse': rmse, 'mae': mae}
```
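除了 RMSE/MAE 这类评分误差指标,推荐系统还常用排序类指标(如 Precision@K、Recall@K)来衡量 top-k 推荐列表的质量。下面是一个简化的示意实现(假设已经拿到某个用户的推荐列表和其真实喜欢的物品集合,数据为虚构示例):

```python
def precision_recall_at_k(recommended_items, relevant_items, k=10):
    """计算单个用户的 Precision@K 与 Recall@K
    recommended_items: 按分数降序排列的推荐物品ID列表
    relevant_items:    该用户真实喜欢(相关)的物品ID集合
    """
    top_k = recommended_items[:k]
    hits = len(set(top_k) & set(relevant_items))
    precision = hits / k
    recall = hits / len(relevant_items) if relevant_items else 0.0
    return precision, recall


# 虚构示例
recommended = [3, 7, 1, 9, 4]
relevant = {1, 4, 8}
print(precision_recall_at_k(recommended, relevant, k=5))  # (0.4, 0.666...)
```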
总结
推荐系统是现代互联网服务的核心技术之一。本节介绍了以下内容:
- 推荐系统的基本原理和不同类型的方法
- 基于内容过滤和协同过滤的核心算法与实现
- 数据处理技术,包括二进制标签处理和均值归一化
- 完整的PyTorch推荐系统实现框架
- 大规模推荐场景的适配策略
- 推荐系统的伦理问题和应对方案
在实际应用中,推荐系统的成功不仅取决于算法的先进性,还需要考虑业务场景、数据特性、用户体验和伦理责任。建议在实践中持续进行A/B测试,监控推荐效果,并重视用户反馈。