当前位置: 首页 > news >正文

移动端优秀网站上海传媒公司名字

移动端优秀网站,上海传媒公司名字,深圳企业注销一窗通,python网站开发基础问题描述 我们需要确定"ABC_manage_channel"列的逻辑,该列的值在客户连续在同一渠道下单时更新为当前渠道,否则保留之前的值。具体规则如下: 初始值为第一个订单的渠道如果客户连续两次在同一渠道下单,则更新为当前渠…

问题描述

我们需要确定"ABC_manage_channel"列的逻辑,该列的值在客户连续在同一渠道下单时更新为当前渠道,否则保留之前的值。具体规则如下:

  • 初始值为第一个订单的渠道
  • 如果客户连续两次在同一渠道下单,则更新为当前渠道
  • 否则保持前一个值不变

数据准备

首先创建orders表并插入测试数据:

CREATE OR REPLACE TABLE orders (customerid INTEGER,channel VARCHAR(20),order_date DATE
);INSERT INTO orders (customerid, channel, order_date) VALUES
(1, 'TMALL', '2024-11-01'),
(1, 'TMALL', '2024-11-02'),
(1, 'TMALL', '2024-11-03'),
(1, 'douyin', '2024-11-25'),
(1, 'JD', '2025-01-13'),
(1, 'JD', '2025-01-14'),
(1, 'douyin', '2025-03-02'),
(1, 'douyin', '2025-03-27'),
(3, 'JD', '2024-04-23'),
(4, 'JD', '2025-02-15'),
(5, 'JD', '2024-08-30'),
(6, 'douyin', '2024-10-05'),
(7, 'JD', '2024-05-29'),
(7, 'douyin', '2024-09-15'),
(7, 'Wholesale', '2024-12-22'),
(7, 'JD', '2025-03-19'),
(8, 'douyin', '2024-08-01'),
(8, 'douyin', '2024-08-07'),
(8, 'douyin', '2024-11-15'),
(9, 'JD', '2025-03-19'),
(10, 'douyin', '2024-07-30'),
(10, 'douyin', '2024-12-27'),
(10, 'douyin', '2025-03-21'),
(10, 'douyin', '2025-03-23');

解决方案

方法一:使用SparkSQL(结合UDF)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, DateType# 初始化Spark会话
spark = SparkSession.builder.appName("ABCManageChannel").getOrCreate()# 创建并插入测试数据
spark.sql("""
CREATE OR REPLACE TABLE orders (customerid INTEGER,channel VARCHAR(20),order_date DATE
) USING parquet;INSERT INTO orders VALUES
(1, 'TMALL', '2024-11-01'),
(1, 'TMALL', '2024-11-02'),
(1, 'TMALL', '2024-11-03'),
(1, 'douyin', '2024-11-25'),
(1, 'JD', '2025-01-13'),
(1, 'JD', '2025-01-14'),
(1, 'douyin', '2025-03-02'),
(1, 'douyin', '2025-03-27'),
(3, 'JD', '2024-04-23'),
(4, 'JD', '2025-02-15'),
(5, 'JD', '2024-08-30'),
(6, 'douyin', '2024-10-05'),
(7, 'JD', '2024-05-29'),
(7, 'douyin', '2024-09-15'),
(7, 'Wholesale', '2024-12-22'),
(7, 'JD', '2025-03-19'),
(8, 'douyin', '2024-08-01'),
(8, 'douyin', '2024-08-07'),
(8, 'douyin', '2024-11-15'),
(9, 'JD', '2025-03-19'),
(10, 'douyin', '2024-07-30'),
(10, 'douyin', '2024-12-27'),
(10, 'douyin', '2025-03-21'),
(10, 'douyin', '2025-03-23');
""")# 读取数据并按客户分组排序
orders_df = spark.table("orders")# 定义UDF处理渠道序列
def calculate_abc(channels):abc = []prev_channel = Nonecurrent_abc = Nonefor idx, c in enumerate(channels):if idx == 0:current_abc = celse:if c == prev_channel:current_abc = c# 否则保持前一个current_abcabc.append(current_abc)prev_channel = creturn abcudf_calculate_abc = F.udf(calculate_abc, ArrayType(StringType()))# 使用SparkSQL处理
result_sql = spark.sql("""
WITH sorted_orders AS (SELECT customerid, channel, order_date,ROW_NUMBER() OVER (PARTITION BY customerid ORDER BY order_date) AS rnFROM orders
),
grouped AS (SELECT customerid, COLLECT_LIST(channel) OVER (PARTITION BY customerid ORDER BY order_date) AS channels,COLLECT_LIST(order_date) OVER (PARTITION BY customerid ORDER BY order_date) AS order_datesFROM sorted_orders
)
SELECT customerid, order_date, channel, abc
FROM (SELECT customerid, EXPLODE(ARRAYS_ZIP(order_dates, channels, abc_list)) AS dataFROM (SELECT customerid, order_dates, channels,udf_calculate_abc(channels) AS abc_listFROM grouped)
)
SELECT customerid, data.order_dates AS order_date,data.channels AS channel,data.abc_list AS ABC_manage_channel
""")result_sql.show()

方法二:不使用SparkSQL(使用DataFrame API)

# 使用DataFrame API处理
window_spec = Window.partitionBy("customerid").orderBy("order_date")# 收集每个客户的订单渠道并按时间排序
grouped_df = orders_df.withColumn("rn", F.row_number().over(window_spec)) \.groupBy("customerid") \.agg(F.collect_list(F.struct("order_date", "channel")).alias("orders"))# 定义UDF处理订单序列
schema = ArrayType(StructType([StructField("order_date", DateType()),StructField("channel", StringType()),StructField("ABC_manage_channel", StringType())
]))def process_orders(orders):abc_list = []prev_channel = Nonecurrent_abc = Nonesorted_orders = sorted(orders, key=lambda x: x.order_date)for idx, order in enumerate(sorted_orders):if idx == 0:current_abc = order.channelelse:if order.channel == prev_channel:current_abc = order.channelabc_list.append((order.order_date, order.channel, current_abc))prev_channel = order.channelreturn abc_listudf_process_orders = F.udf(process_orders, schema)# 应用UDF并展开结果
result_df = grouped_df.withColumn("processed", udf_process_orders("orders")) \.select(F.explode("processed").alias("data")) \.select(F.col("data.order_date").alias("order_date"),F.col("data.channel").alias("channel"),F.col("data.ABC_manage_channel").alias("ABC_manage_channel"))result_df.show()

解释

  • 方法一使用SparkSQL结合UDF,通过窗口函数排序并收集渠道数据,使用UDF处理每个客户的订单序列,生成ABC管理渠道列。
  • 方法二使用DataFrame API,通过分组和聚合操作收集订单数据,利用UDF处理每个分组内的订单序列,最后展开结果。
http://www.dtcms.com/a/573618.html

相关文章:

  • 建设一个网站的需求分析一个网站开发流程
  • PsPasswd(7.19):远程修改密码的边界与合规建议
  • 【钉钉多元表格(自动化)】钉钉群根据表格 自动推送当天值日生信息
  • LangFlow源码深度解析:Component核心机制与生态体系
  • dede织梦仿站网站建设做网站赚谁的钱
  • DropLoRA技术详解:克服大模型微调过拟合的创新方法
  • 【剑斩OFFER】算法的暴力美学——串联所有单词的字串
  • 学习Linux——进程管理
  • 在k8s中部署seaweedfs,上传文件到seaweedfs方法
  • 极氪与火山引擎深化合作,Data Agent赋能车辆数据管理效率
  • Kotlin 使用命令行编译
  • 1450dpi+93% 相似度,这款发光纳米纤维让皮肤纹理“复印”更精准
  • 匠魂(1)
  • LeetCode Hot100 自用
  • 做婚介网站可行性报告模板绵阳网站建设多少钱
  • 单位服务器网站打不开网站参考页面设计
  • 陇南建设网站大良营销网站建设信息
  • mac M系列芯片 unity 安装会遇到的错误以及解决
  • Reka UI - 一款免费开源的 Vue 无头 UI 组件库,样式定制开发项目的绝佳选择
  • 个人二级网站怎么做营销咨询服务合同
  • UDP-复用分用
  • 做网站需要什么特色网站制作入门
  • QListWidget的图标模式
  • 【大模型实战笔记 6】Prompt Engineering 提示词工程
  • 能源生态系统的架构设计:利益相关方治理与跨行业协作
  • 潍坊seo管理浙江seo外包
  • BuildingAI二开 用户信息增加Coze套餐名称和剩余天数技术架构
  • 韦东山嵌入式Linux学习第3篇环境搭建与开发板操作:IMX6ULL_Pro
  • 自动驾驶深度学习模型的SOTIF优化方案
  • 对销售和营销的思考