需求:如何高效的推荐产品
需求
hive数据库表中,用户表有5千万条数据,即有5千万个用户;
mysql数据库表中,产品表有1万条数据,即有1万个不同的产品;
hive数据库表中,当前推送记录表(用户和已推送的产品关系表)中100亿条数据;
需求: 如何高效的每天给每个用户筛选出不同的50个产品,且这些产品在之前没有给这个用户推送过;
为了高效地每天为每个用户筛选出50个之前未推送过的产品,考虑到数据量巨大(用户表5千万条、产品表1万条、推送记录表100亿条),建议采用以下方案。该方案利用大数据处理框架(如Spark)进行分布式计算,并通过维护一个用户已推送产品表(user_pushed_products
)来增量更新推送状态,从而避免每天处理全量推送记录表。
方案概述
- 初始设置:一次性处理历史推送记录表,构建
user_pushed_products
表,存储每个用户已推送的产品集合。 - 每日更新:
- 从推送记录表中提取前一天的推送记录(约25亿条),分组得到每个用户新增的已推送产品。
- 更新
user_pushed_products
表,将新增产品合并到每个用户的已推送产品集合中。
- 每日推荐:
- 广播所有产品列表(1万条产品ID)。
- 对于每个用户,计算未推送产品(所有产品减去已推送产品),并随机选择50个产品。
- 输出推荐结果,并將新推送记录添加到推送记录表中。
详细步骤
1. 初始构建用户已推送产品表
使用Spark处理历史推送记录表(100亿条),生成user_pushed_products
表,包含user_id
和pushed_products
(数组类型,存储已推送产品ID)。
val historicalPushRecords = spark.sql("SELECT user_id, product_id FROM push_record WHERE date < 'current_date'")
val historicalPushedByUser = historicalPushRecords.groupBy("user_id").agg(collect_set("product_id").as("pushed_products"))
historicalPushedByUser.write.format("parquet").saveAsTable("user_pushed_products")
2. 每日更新用户已推送产品表
- 从前一天的推送记录中提取数据(约25亿条),分组聚合每个用户的新增推送产品。
- 与现有
user_pushed_products
表全外连接,合并已推送产品列表,并去重(确保无重复产品ID)。
// 读取前一天的推送记录
val yesterdayPushRecords = spark.sql("SELECT user_id, product_id FROM push_record WHERE date = 'yesterday'")// 分组聚合每个用户的新增产品列表
val yesterdayPushedByUser = yesterdayPushRecords.groupBy("user_id").agg(collect_list("product_id").as("new_pushed_products"))// 读取当前user_pushed_products表
val userPushedProducts = spark.sql("SELECT user_id, pushed_products FROM user_pushed_products")// 连接并更新产品列表
val updatedUserPushedProducts = userPushedProducts.join(yesterdayPushedByUser, Seq("user_id"), "full_outer").map { row =>val userId = row.getAs[Long]("user_id")val existingProducts = row.getAs[Seq[Long]]("pushed_products")val newProducts = row.getAs[Seq[Long]]("new_pushed_products")val mergedProducts = if (existingProducts != null) existingProducts else Seq()val addedProducts = if (newProducts != null) newProducts else Seq()val allProducts = (mergedProducts ++ addedProducts).distinct(userId, allProducts)}.toDF("user_id", "pushed_products")// 覆盖写入user_pushed_products表
updatedUserPushedProducts.write.format("parquet").mode("overwrite").saveAsTable("user_pushed_products")
3. 每日推荐未推送产品
- 从MySQL产品表读取所有产品ID(1万条),并广播到所有节点。
- 对于每个用户 in
user_pushed_products
,计算未推送产品(所有产品广播值减去已推送产品集合),并随机选择50个产品。如果未推送产品不足50个,则选择所有可用产品。
// 从MySQL读取所有产品ID
val allProducts = spark.read.jdbc(jdbcUrl, "product_table", properties).select("product_id").collect().map(_.getLong(0)).toSet
val broadcastProducts = spark.sparkContext.broadcast(allProducts)// 生成推荐产品
val recommendations = updatedUserPushedProducts.rdd.map { row =>val userId = row.getAs[Long]("user_id")val pushedProducts = row.getAs[Seq[Long]]("pushed_products").toSetval allProductsSet = broadcastProducts.valueval unpusheedProducts = allProductsSet.diff(pushedProducts)val unpusheedList = unpusheedProducts.toArrayval numUnpusheed = unpusheedList.lengthval random = new Random()val selectedProducts = if (numUnpusheed <= 50) unpusheedList else {random.shuffle(unpusheedList.toList).take(50).toArray}(userId, selectedProducts)
}.toDF("user_id", "recommended_products")// 保存推荐结果
recommendations.write.format("parquet").saveAsTable("daily_recommendations")// 将新推送记录添加到推送记录表(可选,根据实际推送操作决定)
val newPushRecords = recommendations.selectExpr("user_id", "explode(recommended_products) as product_id", "current_date as date")
newPushRecords.write.format("parquet").mode("append").saveAsTable("push_record")
性能优化建议
- 分区和缓存:对
user_pushed_products
表按user_id
进行分区(例如,哈希分区),以提高查询和更新效率。使用列式存储格式(如Parquet)压缩数据。 - 广播变量:产品表很小(1万条),广播所有产品ID以减少数据传输。
- 分布式计算:使用Spark集群,设置适当的分区数(例如,10k分区)以平衡负载。确保集群有足够资源处理Shuffle操作(初始Shuffle约800 GB,每日Shuffle约20 GB)。
- 监控和调优:监控作业执行时间,调整Spark配置(如
spark.sql.shuffle.partitions
、内存设置)以避免OOM错误。 - 处理完全推送用户:在推荐前,检查用户已推送产品数量是否达到1万,如果是则跳过推荐,减少计算量。
注意事项
- 该方案假设推送记录表包含
date
字段,用于筛选每日新记录。 - 初始构建
user_pushed_products
表需要处理100亿条记录,可能耗时较长,但仅需一次。 - 每日更新和推荐作业应自动化调度(如使用Airflow或Oozie)。
- 如果产品表更新(新增或删除产品),需要调整广播产品列表,并可能影响未推送产品计算。
通过维护user_pushed_products
表并增量更新,可以高效处理每日推荐,避免全量处理推送记录表,显著降低计算开销。