ODPS SQL:对 GROUP BY 的每个分组用 Python(UDAF)进行处理
python代码
from odps.udf import annotate
from odps.udf import BaseUDAF


@annotate('* -> string')
class GroupCalculator(BaseUDAF):
    """Aggregate function that gathers every record of a group.

    Each input row is stored as a list of its argument values; the final
    result for a group is the string form of the list of all its rows.

    NOTE(review): the buffer grows with the number of rows in a group.
    MaxCompute's UDAF guidance advises keeping buffers small -- confirm
    that group sizes are bounded before using this on large data.
    """

    def new_buffer(self):
        """Return a new, empty list used as the aggregation buffer."""
        return list()

    def iterate(self, buffer, *args):
        """Append one input row, as a list of its values, to ``buffer``."""
        row = list(args)
        buffer.append(row)

    def merge(self, buffer, pbuffer):
        """Add all rows held in the partial buffer ``pbuffer`` to ``buffer``."""
        buffer.extend(pbuffer)

    def terminate(self, buffer):
        """Return ``str(buffer)``, or ``None`` when no rows were collected."""
        return str(buffer) if buffer else None
SQL代码(执行前需先将上面的 Python 文件作为资源上传,并通过 CREATE FUNCTION 将 GroupCalculator 注册为函数 GROUP_CALCULATOR)
-- Demo table: one row per sales record.
CREATE TABLE sales_data (
    region       STRING,
    category     STRING,
    product_id   STRING,
    sales_amount DOUBLE,
    quantity     BIGINT,
    cost         DOUBLE
);

-- Sample data: two rows for each of the three regions.
INSERT INTO sales_data VALUES
    ('北京', '电子产品', 'P001', 1000.0, 5, 800.0),
    ('北京', '电子产品', 'P002', 1500.0, 3, 1200.0),
    ('上海', '电子产品', 'P001', 800.0, 4, 600.0),
    ('上海', '服装', 'C001', 300.0, 2, 200.0),
    ('广州', '电子产品', 'P003', 2000.0, 2, 1600.0),
    ('广州', '服装', 'C002', 500.0, 1, 350.0);

-- For each region, hand every row's amount, quantity, cost and unit price
-- (sales_amount / quantity) to the GROUP_CALCULATOR aggregate function.
SELECT
    region,
    GROUP_CALCULATOR(
        sales_amount,
        quantity,
        cost,
        sales_amount / quantity
    ) AS aggregated_analysis
FROM sales_data
GROUP BY region;
select结果
region aggregated_analysis
上海 [[800.0, 4, 600.0, 200.0], [300.0, 2, 200.0, 150.0]]
北京 [[1000.0, 5, 800.0, 200.0], [1500.0, 3, 1200.0, 500.0]]
广州 [[2000.0, 2, 1600.0, 1000.0], [500.0, 1, 350.0, 500.0]]