MaxCompute开发UDF和UDTF案例
文章目录
- 一、Java开发UDF
- 1、创建Maven项目
- 2、创建UDF类
- 3、打包上传资源
- 4、创建函数MyUDF
- 5、SQL验证
- 二、Java开发UDTF
- 1、创建Maven项目
- 2、创建UDTF类
- 3、打包上传更新资源
- 4、创建函数MyUDTF
- 5、SQL验证
- 三、常见问题
- 1、发布函数报错
一、Java开发UDF
1、创建Maven项目
创建Maven项目,名称为 MaxComputeUDF 配置pom文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>MaxComputeUDF</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><dependencies><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.5</version></dependency><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.11</version></dependency><dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.7.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.10.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency><!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-core --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.2.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>odps-sdk-core</artifactId><version>${sdk.version}</version></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>odps-sdk-udf</artifactId><version>${sdk.version}</version></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>odps-udf-local</artifactId><version>${sdk.version}</version></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>odps-sdk-mapred</artifactId><version>${sdk.version}</version></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>odps-mapred-local</artifactId><version>${sdk.version}</version></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>odps-sdk-graph</artifactId><version>${sdk.version}</version></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>odps-graph-local</artifactId><version>${sdk.version}</version></dependency></dependencies><properties><sdk.version>0.38.3-public</sdk.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><build><plugins><!-- Scala Compiler --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions><configuration><args><arg>-nobootcp</arg></args></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><!-- <exclude>org.slf4j:*</exclude>--><!-- <exclude>log4j:*</exclude>--></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>
2、创建UDF类
逻辑简单,仅供测试使用
package com.aliyun;import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;@Resolve({"string->string"})
public final class MyUDF extends UDF {public String evaluate(String s) {if (s == null) {return null;}return s.toLowerCase();}
}
3、打包上传资源
在Dataworks数据开发页面上传资源并发布
4、创建函数MyUDF
配置如下
5、SQL验证
--函数名不区分大小写
SELECT myudf("HELLO")
二、Java开发UDTF
1、创建Maven项目
上面已创建项目,pom文件也配置好了,直接跳过该步骤
2、创建UDTF类
package com.aliyun;import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.alibaba.fastjson.*;@Resolve({"string->bigint,string,string"})
public class MyUDTF extends UDTF {@Overridepublic void process(Object[] args) throws UDFException {String event = (String) args[0];JSONArray jsonArray = JSON.parseArray(event);for (int i = 0; i < jsonArray.size(); i++) {JSONObject jsonObject = jsonArray.getJSONObject(i);String ett = (String) jsonObject.getString("aa");String eventName = (String) jsonObject.getString("bb");String eventJson = (String) jsonObject.getString("cc");forward(Long.parseLong(ett), eventName, eventJson);}}
}
3、打包上传更新资源
打包后直接更新第一次上传的jar包,重新发布
4、创建函数MyUDTF
配置如下
5、SQL验证
测试数据如下
{"a": "","b": "app","key": [{"aa": "1","bb": "张三","cc": {"age": "21","des": "工人"}},{"aa": "2","bb": "李四","cc": {"age": 24,"des": "大学生"}},{"aa": "3","bb": "王五","cc": {"age": "33","des": "老师"}}]
}
测试SQL如下
SELECT MyUDTF(GET_JSON_OBJECT('{"a": "","b": "app","key": [{"aa": "1","bb": "张三","cc": {"age": "21","des": "工人"}},{"aa": "2","bb": "李四","cc": {"age": 24,"des": "大学生"}},{"aa": "3","bb": "王五","cc": {"age": "33","des": "老师"}}]
}','$.key')
) AS (id,name,jsonvalue)
;
测试结果如下
三、常见问题
1、发布函数报错
失败原因:Fail to add or update function MyUDF. Error message is MaxCompute exception happened. ErrorCode: InvalidParameter, ErrorMessage: ODPS-0421111: Resource not found - ‘maxcomputeudf.jar’.
问题原因: 引用的资源没有发布,导致找不到资源
解决方案: 先发布引用的资源, 再重试发布函数即可,这里注意未发布的资源会有上传的符号