当前位置: 首页 > news >正文

flink UTDF函数

代码示例IP解析demo案例
https://help.aliyun.com/zh/flink/developer-reference/udtfs

package com.xxx.udx;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.maxmind.db.CHMCache;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CountryResponse;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
//import org.apache.flink.table.sources.parquet.update.UpdateVectorizedColumnRowInputParquetFormat;
import org.apache.flink.table.types.DataType;
//import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;/*** 使用节点ip解析服务*/public class GetResolveIp4Ip6AP extends TableFunction<Row> {CloseableHttpClient client = null;private static RequestConfig requestConfig = null;InputStream path =  null;DatabaseReader reader = null;@Overridepublic void open(FunctionContext context) throws Exception {super.open(context);requestConfig = RequestConfig.custom().setConnectTimeout(50000) //一、连接超时:connectionTimeout-->指的是连接一个url的连接等待时间.setSocketTimeout(5000)  // 二、读取数据超时:SocketTimeout-->指的是连接上一个url,获取response的返回等待时间.setConnectionRequestTimeout(5000).setMaxRedirects(0).build();client = getConnection();//client.getParams().setParameter(HttpMethodParams.HTTP_CONTENT_CHARSET, "UTF-8");path=GetResolveIp4Ip6AP.class.getResourceAsStream("/GeoLite2-Country.mmdb");reader = new DatabaseReader.Builder(path).withCache(new CHMCache()).build();}public void eval (String ip) throws Exception{String countryCode = "";String countryName =""; //todo 新加String city = "";String stateprov = "";//todoRow row = new Row(3);String res="";if(null != ip  && !ip.isEmpty() && ip.split("\\.").length==4 ){ //todo IP4解析res = doGet(ip,client);}else if(null != ip  && !ip.isEmpty() && !ip.equals("127.0.0.1")){//todo IP6解析try {InetAddress ipAddress = InetAddress.getByName(ip);//-com.maxmind.geoip2.record.Country country=null;CountryResponse response = reader.country(ipAddress);com.maxmind.geoip2.record.Country country = response.getCountry();countryCode= country.getIsoCode();}catch (Exception e){System.out.println("IP:"+ip);e.printStackTrace();}String deal_countryCode;if(countryCode==null|| countryCode.isEmpty()){deal_countryCode="UNKNOWN";}else{deal_countryCode=countryCode;}row.setField(0,deal_countryCode);//row.setField(1,countryName);row.setField(1,"未知");row.setField(2,"未知");collect(row);return;}if(res ==""){row.setField(0,"UNKNOWN");//row.setField(1,"未知");//todo 新加row.setField(1,"未知");row.setField(2,"未知");collect(row);return;}try{JSONObject json = JSON.parseObject(res);countryCode = json.getString("country_code");countryName = "";//todo 新加stateprov = json.getString("region_name");city = json.getString("city");}catch (Exception e){e.printStackTrace();}if(countryCode==null || countryCode.isEmpty()){countryCode="UNKNOWN";}if(stateprov==null || stateprov.isEmpty()){stateprov="未知";}if(city==null || city.isEmpty()){city="未知";}row.setField(0,countryCode);//row.setField(1,countryName);//todo 新加row.setField(1,stateprov);row.setField(2,city);collect(row);}@Overridepublic void close() throws Exception {if(null != client){client.close();reader.close();path.close();}super.close();}//todo flink 注掉
//    @Override
//    // 如果返回值是Row,则必须重载实现getResultType方法,显式地声明返回的字段类型。
//    public DataType getResultType(Object[] arguments, Class[] argTypes) {
//        return DataTypes.createRowType(DataTypes.STRING, DataTypes.STRING, DataTypes.STRING);
//    }private static CloseableHttpClient getConnection() {HttpHost target = new HttpHost("0.0.0.0", 80);PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();connectionManager.setMaxTotal(2000);//客户端总并行链接最大数connectionManager.setDefaultMaxPerRoute(2000);//每个主机的最大并行链接数connectionManager.setMaxPerRoute(new HttpRoute(target), 2000);HttpClientBuilder httpBuilder = HttpClients.custom();httpBuilder.setConnectionManager(connectionManager);CloseableHttpClient httpClient = httpBuilder.build();return httpClient;}private static String doGet(String ip, CloseableHttpClient client) {//简单的对ip地址的合法性做一下验证if(null == ip  || ip.isEmpty() || ip.split("\\.").length!=4 ){return "";}if (null == client) {client =  getConnection();}HttpGet getMethod = new HttpGet("http://0.0.0.0:80/json/" + ip);getMethod.setConfig(requestConfig);String res = "";try {CloseableHttpResponse response = client.execute(getMethod);if (response.getStatusLine().getStatusCode() == 200) {res =  EntityUtils.toString(response.getEntity());
//                System.out.println(res);}} catch (IOException e) {e.printStackTrace();} finally {getMethod.releaseConnection();}return res;}}
http://www.dtcms.com/a/456581.html

相关文章:

  • 乐陵网站开发贾汪区建设局网站
  • VS安装EEPlus库及解决[弃用的]CS0618问题
  • 《算法闯关指南:优选算法--滑动窗口》--15.串联所有单词的子串,16.最小覆盖子串
  • 行驶证识别技术通过OCR和AI实现信息自动化采集与处理,涵盖图像预处理、文字识别及结构化校验,提升效率与准确性
  • 第十七篇:数组与链表:结构特性、操作与经典题目
  • 营销型网站的优点深圳推广系统
  • 攻防世界-Web-easyupload
  • 符号主义对自然语言处理深层语义分析的影响与启示
  • 高要区住房和城乡建设局网站西安建设市场信息平台
  • 新手可以做网站营运吗成都手机模板建站
  • 成都市做网站的公司网站开发客户端
  • 农业机械网站模板网站建设图片像素是多大的
  • 素材网站php程序源码公司简介介绍
  • 网站规划的一般步骤建设企业网站包含什么
  • 奉贤做网站建设wordpress访问很慢吗?
  • 大学生网站建设开题报告免费交流网站建设
  • 邢台做网站优化哪儿好怎样建网站
  • 企业网站 asp php修改wordpress邮件
  • 宠物网站开发背景怎样自己制作效果图
  • 适合企业做外贸的几个网站商会网站模板
  • 遂溪网站开发公司建设网站需要哪些材料
  • 手机 网站开发软件ui培训班有用吗
  • php网站开发需求分析海南乐秀同城群软件下载
  • 佛山顺德网站设计公司关键词排名优化报价
  • 自己做网站怎么做的浙江网站建设公司名单
  • 网络推广什么做seo1短视频网页入口营销
  • 东莞公司网站搭建多少钱深圳品牌床垫有哪些
  • 网站广告动态图怎么做本机做网站如何访问
  • 万网买的网站备案淘宝刷单网站开发
  • 睢宁建设局网站网址英文