当前位置: 首页 > news >正文

需求:如何高效的推荐产品

需求

hive数据库表中,用户表有5千万条数据,即有5千万个用户;
mysql数据库表中,产品表有1万条数据,即有1万个不同的产品;
hive数据库表中,当前推送记录表(用户和已推送的产品关系表)中100亿条数据;
需求: 如何高效的每天给每个用户筛选出不同的50个产品,且这些产品在之前没有给这个用户推送过;

为了高效地每天为每个用户筛选出50个之前未推送过的产品,考虑到数据量巨大(用户表5千万条、产品表1万条、推送记录表100亿条),建议采用以下方案。该方案利用大数据处理框架(如Spark)进行分布式计算,并通过维护一个用户已推送产品表(user_pushed_products)来增量更新推送状态,从而避免每天处理全量推送记录表。

方案概述

  1. 初始设置:一次性处理历史推送记录表,构建user_pushed_products表,存储每个用户已推送的产品集合。
  2. 每日更新
    • 从推送记录表中提取前一天的推送记录(约25亿条),分组得到每个用户新增的已推送产品。
    • 更新user_pushed_products表,将新增产品合并到每个用户的已推送产品集合中。
  3. 每日推荐
    • 广播所有产品列表(1万条产品ID)。
    • 对于每个用户,计算未推送产品(所有产品减去已推送产品),并随机选择50个产品。
    • 输出推荐结果,并將新推送记录添加到推送记录表中。

详细步骤

1. 初始构建用户已推送产品表

使用Spark处理历史推送记录表(100亿条),生成user_pushed_products表,包含user_idpushed_products(数组类型,存储已推送产品ID)。

val historicalPushRecords = spark.sql("SELECT user_id, product_id FROM push_record WHERE date < 'current_date'")
val historicalPushedByUser = historicalPushRecords.groupBy("user_id").agg(collect_set("product_id").as("pushed_products"))
historicalPushedByUser.write.format("parquet").saveAsTable("user_pushed_products")
2. 每日更新用户已推送产品表
  • 从前一天的推送记录中提取数据(约25亿条),分组聚合每个用户的新增推送产品。
  • 与现有user_pushed_products表全外连接,合并已推送产品列表,并去重(确保无重复产品ID)。
// 读取前一天的推送记录
val yesterdayPushRecords = spark.sql("SELECT user_id, product_id FROM push_record WHERE date = 'yesterday'")// 分组聚合每个用户的新增产品列表
val yesterdayPushedByUser = yesterdayPushRecords.groupBy("user_id").agg(collect_list("product_id").as("new_pushed_products"))// 读取当前user_pushed_products表
val userPushedProducts = spark.sql("SELECT user_id, pushed_products FROM user_pushed_products")// 连接并更新产品列表
val updatedUserPushedProducts = userPushedProducts.join(yesterdayPushedByUser, Seq("user_id"), "full_outer").map { row =>val userId = row.getAs[Long]("user_id")val existingProducts = row.getAs[Seq[Long]]("pushed_products")val newProducts = row.getAs[Seq[Long]]("new_pushed_products")val mergedProducts = if (existingProducts != null) existingProducts else Seq()val addedProducts = if (newProducts != null) newProducts else Seq()val allProducts = (mergedProducts ++ addedProducts).distinct(userId, allProducts)}.toDF("user_id", "pushed_products")// 覆盖写入user_pushed_products表
updatedUserPushedProducts.write.format("parquet").mode("overwrite").saveAsTable("user_pushed_products")
3. 每日推荐未推送产品
  • 从MySQL产品表读取所有产品ID(1万条),并广播到所有节点。
  • 对于每个用户 in user_pushed_products,计算未推送产品(所有产品广播值减去已推送产品集合),并随机选择50个产品。如果未推送产品不足50个,则选择所有可用产品。
// 从MySQL读取所有产品ID
val allProducts = spark.read.jdbc(jdbcUrl, "product_table", properties).select("product_id").collect().map(_.getLong(0)).toSet
val broadcastProducts = spark.sparkContext.broadcast(allProducts)// 生成推荐产品
val recommendations = updatedUserPushedProducts.rdd.map { row =>val userId = row.getAs[Long]("user_id")val pushedProducts = row.getAs[Seq[Long]]("pushed_products").toSetval allProductsSet = broadcastProducts.valueval unpusheedProducts = allProductsSet.diff(pushedProducts)val unpusheedList = unpusheedProducts.toArrayval numUnpusheed = unpusheedList.lengthval random = new Random()val selectedProducts = if (numUnpusheed <= 50) unpusheedList else {random.shuffle(unpusheedList.toList).take(50).toArray}(userId, selectedProducts)
}.toDF("user_id", "recommended_products")// 保存推荐结果
recommendations.write.format("parquet").saveAsTable("daily_recommendations")// 将新推送记录添加到推送记录表(可选,根据实际推送操作决定)
val newPushRecords = recommendations.selectExpr("user_id", "explode(recommended_products) as product_id", "current_date as date")
newPushRecords.write.format("parquet").mode("append").saveAsTable("push_record")

性能优化建议

  • 分区和缓存:对user_pushed_products表按user_id进行分区(例如,哈希分区),以提高查询和更新效率。使用列式存储格式(如Parquet)压缩数据。
  • 广播变量:产品表很小(1万条),广播所有产品ID以减少数据传输。
  • 分布式计算:使用Spark集群,设置适当的分区数(例如,10k分区)以平衡负载。确保集群有足够资源处理Shuffle操作(初始Shuffle约800 GB,每日Shuffle约20 GB)。
  • 监控和调优:监控作业执行时间,调整Spark配置(如spark.sql.shuffle.partitions、内存设置)以避免OOM错误。
  • 处理完全推送用户:在推荐前,检查用户已推送产品数量是否达到1万,如果是则跳过推荐,减少计算量。

注意事项

  • 该方案假设推送记录表包含date字段,用于筛选每日新记录。
  • 初始构建user_pushed_products表需要处理100亿条记录,可能耗时较长,但仅需一次。
  • 每日更新和推荐作业应自动化调度(如使用Airflow或Oozie)。
  • 如果产品表更新(新增或删除产品),需要调整广播产品列表,并可能影响未推送产品计算。

通过维护user_pushed_products表并增量更新,可以高效处理每日推荐,避免全量处理推送记录表,显著降低计算开销。


文章转载自:

http://9DFQUTC6.zkqwk.cn
http://7O8oiRkq.zkqwk.cn
http://r7JvDys4.zkqwk.cn
http://znd3gcys.zkqwk.cn
http://vZHN6aYI.zkqwk.cn
http://WyUOi24P.zkqwk.cn
http://ZdY6Zh5Y.zkqwk.cn
http://Fa0yjyP3.zkqwk.cn
http://vSqhAXtz.zkqwk.cn
http://uOjzocpk.zkqwk.cn
http://bWqnCKCr.zkqwk.cn
http://jnu4LEWT.zkqwk.cn
http://pfwIvpXj.zkqwk.cn
http://HY7yqUTW.zkqwk.cn
http://pc7UPQR3.zkqwk.cn
http://UFXhOhwU.zkqwk.cn
http://Fm6EDb70.zkqwk.cn
http://lZmvJORI.zkqwk.cn
http://cGOUxqh3.zkqwk.cn
http://92wPy6d0.zkqwk.cn
http://ztDeuGb4.zkqwk.cn
http://251PI9Ag.zkqwk.cn
http://yOUPNPb2.zkqwk.cn
http://o9hXPdD0.zkqwk.cn
http://Ilyix4Ln.zkqwk.cn
http://UuUSiNwV.zkqwk.cn
http://Pl8rCeau.zkqwk.cn
http://TGDXITN4.zkqwk.cn
http://N0BB6fsz.zkqwk.cn
http://PqXPBY05.zkqwk.cn
http://www.dtcms.com/a/386214.html

相关文章:

  • java21学习笔记-序列集合
  • Class57 代码实现
  • torch.gather
  • 自学嵌入式第四十二天:单片机-定时器和UART串口
  • 大数据毕业设计选题推荐-基于大数据的旅游网站用户行为数据分析系统-Hadoop-Spark-数据可视化-BigData
  • 深入浅出数据结构:队列(Queue)—— 生活中的排队艺术
  • spring通过Spring Integration实现udp通信
  • Linux内存管理章节十八:内核开发者的武器库:内存分配API实战指南
  • CAD如何输出PDF多页文件
  • 我对 WPF 动摇时的选择:.NET Framework 4.6.2+WPF+Islands+UWP+CompostionApi
  • 1.整流-滤波电路的缺点和PFC的引入
  • QT 项目 线程信号切换 举例
  • 构网型5MW中压储能变流升压一体机技术方案
  • 【数据工程】8. SQL 入门教程
  • C++---前向声明
  • 在Qt项目中使用QtConcurrent::run,实现异步等待和同步调用
  • 经验分享只靠口头传递会带来哪些问题
  • Linux底层-内核数据接口:/proc
  • PEFT+DeepSpeed 1 (微调 分布式 显存优化)
  • Spring Boot 下 Druid 连接池:多维度优化打造卓越性能
  • 提升学术研究能力:从开题构思难题到AI辅助提纲生成
  • spring-kafka的消息拦截器RecordInterceptor
  • VSCode + Python 开发踩坑:虚拟环境不在项目根目录导致包无法识别该怎么办
  • 【MCP】【FastMCP】[特殊字符] 使用 UV 创建 FastMCP 服务完整示例
  • 蓝绿部署(Blue-Green Deployment)介绍(一种用于降低软件发布风险的部署策略)流量切换(金丝雀发布)
  • 羽毛球地板:从专业运动场景到全民健身市场的技术跃迁与产业重构
  • 【实战】预警算法--噪声添加机制
  • Three.js 中如何给 3D 模型添加文字标签?
  • 贪心算法应用:NFV功能部署问题详解
  • 第八章:Jmeter 非GUl命令详解