随着大数据平台发展,现已可以处理多类型的非结构化、半结构化数据,其中将IP地址转换为归属地是常见的一种场景。本文为您介绍如何通过MaxCompute UDF实现将IPv4或IPv6地址转换为归属地。
前提条件
请确认已满足如下条件:- 已安装MaxCompute客户端。
更多安装并配置MaxCompute客户端信息,请参见安装并配置MaxCompute客户端。
- 已安装MaxCompute Studio,操作详情请参见安装MaxCompute Studio。
背景信息
要实现将IPv4或IPv6地址转换为归属地,必须要有IP地址库,您需要下载IP地址库文件并以资源形式上传至MaxCompute项目。开发MaxCompute UDF,并基于IP地址库文件注册函数,从而在SQL语句中调用函数将IP地址转换为归属地。
注意事项
本文提供的IP地址库文件,仅供验证该最佳实践使用,请您结合实际业务情况,自行维护IP地址库文件。
操作流程
基于MaxCompute UDF将IPv4或IPv6地址转换为归属地的操作流程如下:
- 步骤一:上传IP地址库文件将IP地址库文件作为资源上传至MaxCompute项目,后续创建的MaxCompute UDF会依赖此资源。
- 步骤二:建立项目连接连接MaxCompute项目,并创建MaxCompute Java Module。
- 步骤三:编写MaxCompute UDF在IntelliJ IDEA上编写MaxCompute UDF代码。
- 步骤四:注册MaxCompute UDF将MaxCompute UDF注册为函数。
- 步骤五:调用MaxCompute UDF转换IP地址为归属地在SQL语句中调用注册好的函数将IP地址转换为归属地。
步骤一:上传IP地址库文件
- 下载IP地址库文件至本地,解压得到ipv4.txt和ipv6.txt,并放置于MaxCompute客户端的安装目录
...\odpscmd_public\bin
下。本文提供的IP地址库文件,仅供验证该最佳实践使用,请您结合实际业务情况,自行维护IP地址库文件。
- 安装并登录MaxCompute客户端,进入目标MaxCompute项目。
- 执行
add file
命令,将ipv4.txt和ipv6.txt以File类型资源上传至MaxCompute项目。命令示例如下。
更多添加资源信息,请参见添加资源。add file ipv4.txt -f; add file ipv6.txt -f;
- (用于本地调试)将ipv4.txt和ipv6.txt复制到本地项目的
warehouse/example_project/_resources_
目录下。
步骤二:建立项目连接
- 连接MaxCompute项目。操作详情请参见管理项目连接。
- 创建MaxCompute Java Module。操作详情请参见创建MaxCompute Java Module。
步骤三:编写MaxCompute UDF
- 创建Java Class对象。后续步骤中编写的MaxCompute UDF代码会用到此处创建的Java Class。
- 进入IntelliJ IDEA界面,在Project区域,右键单击Module的源码目录(即src > main > java),选择new > Java Class。
- 在New Java Class对话框,输入Class名称,按下Enter键并在代码编辑区域输入代码。您需要依次创建3个Java Class对象,Class名称及对应代码如下,代码可直接复制使用,无需修改。
- IpUtils
package com.aliyun.odps.udf.utils; import java.math.BigInteger; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; public class IpUtils { /** * 将字符串形式的ip地址转换为long * * @param ipInString * 字符串形式的ip地址 * @return 返回long形式的ip地址 */ public static long StringToLong(String ipInString) { ipInString = ipInString.replace(" ", ""); byte[] bytes; if (ipInString.contains(":")) bytes = ipv6ToBytes(ipInString); else bytes = ipv4ToBytes(ipInString); BigInteger bigInt = new BigInteger(bytes); // System.out.println(bigInt.toString()); return bigInt.longValue(); } /** * 将字符串形式的ip地址转换为long * * @param ipInString * 字符串形式的ip地址 * @return bigint的string形式的ip地址 */ public static String StringToBigIntString(String ipInString) { ipInString = ipInString.replace(" ", ""); byte[] bytes; if (ipInString.contains(":")) bytes = ipv6ToBytes(ipInString); else bytes = ipv4ToBytes(ipInString); BigInteger bigInt = new BigInteger(bytes); return bigInt.toString(); } /** * 将整数形式的ip地址转换为字符串形式 * * @param ipInBigInt * 整数形式的ip地址 * @return 字符串形式的ip地址 */ public static String BigIntToString(BigInteger ipInBigInt) { byte[] bytes = ipInBigInt.toByteArray(); byte[] unsignedBytes = Arrays.copyOfRange(bytes, 1, bytes.length); // 去除符号位 try { String ip = InetAddress.getByAddress(unsignedBytes).toString(); return ip.substring(ip.indexOf('/') + 1).trim(); } catch (UnknownHostException e) { throw new RuntimeException(e); } } /** * ipv6地址转有符号byte[17] */ private static byte[] ipv6ToBytes(String ipv6) { byte[] ret = new byte[17]; ret[0] = 0; int ib = 16; boolean comFlag = false;// ipv4混合模式标记 if (ipv6.startsWith(":"))// 去掉开头的冒号 ipv6 = ipv6.substring(1); String groups[] = ipv6.split(":"); for (int ig = groups.length - 1; ig > -1; ig--) {// 反向扫描 if (groups[ig].contains(".")) { // 出现ipv4混合模式 byte[] temp = ipv4ToBytes(groups[ig]); ret[ib--] = temp[4]; ret[ib--] = temp[3]; ret[ib--] = temp[2]; ret[ib--] = temp[1]; comFlag = true; } else if ("".equals(groups[ig])) { // 出现零长度压缩,计算缺少的组数 int zlg = 9 - (groups.length + (comFlag ? 1 : 0)); while (zlg-- > 0) {// 将这些组置0 ret[ib--] = 0; ret[ib--] = 0; } } else { int temp = Integer.parseInt(groups[ig], 16); ret[ib--] = (byte) temp; ret[ib--] = (byte) (temp >> 8); } } return ret; } /** * IPv4地址转有符号byte[5] */ private static byte[] ipv4ToBytes(String ipv4) { byte[] ret = new byte[5]; ret[0] = 0; // 先找到ip地址字符串中.的位置 int position1 = ipv4.indexOf("."); int position2 = ipv4.indexOf(".", position1 + 1); int position3 = ipv4.indexOf(".", position2 + 1); // 将每个.之间的字符串转换成整型 ret[1] = (byte) Integer.parseInt(ipv4.substring(0, position1)); ret[2] = (byte) Integer.parseInt(ipv4.substring(position1 + 1, position2)); ret[3] = (byte) Integer.parseInt(ipv4.substring(position2 + 1, position3)); ret[4] = (byte) Integer.parseInt(ipv4.substring(position3 + 1)); return ret; } /** * @param ipAdress ipv4或ipv6字符串 * @return 4:ipv4, 6:ipv6, 0:地址不对 * @throws Exception */ public static int isIpV4OrV6(String ipAdress) throws Exception { InetAddress address = InetAddress.getByName(ipAdress); if (address instanceof Inet4Address) return 4; else if (address instanceof Inet6Address) return 6; return 0; } /* * 验证ip是否属于某个IP段 * * ipSection ip段(以'-'分隔) * * ip 所验证的ip号码 */ public static boolean ipExistsInRange(String ip, String ipSection) { ipSection = ipSection.trim(); ip = ip.trim(); int idx = ipSection.indexOf('-'); String beginIP = ipSection.substring(0, idx); String endIP = ipSection.substring(idx + 1); return getIp2long(beginIP) <= getIp2long(ip) && getIp2long(ip) <= getIp2long(endIP); } public static long getIp2long(String ip) { ip = ip.trim(); String[] ips = ip.split("\\."); long ip2long = 0L; for (int i = 0; i < 4; ++i) { ip2long = ip2long << 8 | Integer.parseInt(ips[i]); } return ip2long; } public static long getIp2long2(String ip) { ip = ip.trim(); String[] ips = ip.split("\\."); long ip1 = Integer.parseInt(ips[0]); long ip2 = Integer.parseInt(ips[1]); long ip3 = Integer.parseInt(ips[2]); long ip4 = Integer.parseInt(ips[3]); long ip2long = 1L * ip1 * 256 * 256 * 256 + ip2 * 256 * 256 + ip3 * 256 + ip4; return ip2long; } public static void main(String[] args) { System.out.println(StringToLong("2002:7af3:f3be:ffff:ffff:ffff:ffff:ffff")); System.out.println(StringToLong("54.38.72.63")); } private class Invalid{ private Invalid() { } } }
- IpV4Obj
package com.aliyun.odps.udf.objects; public class IpV4Obj { public long startIp ; public long endIp ; public String city; public String province; public IpV4Obj(long startIp, long endIp, String city, String province) { this.startIp = startIp; this.endIp = endIp; this.city = city; this.province = province; } @Override public String toString() { return "IpV4Obj{" + "startIp=" + startIp + ", endIp=" + endIp + ", city='" + city + '\'' + ", province='" + province + '\'' + '}'; } public void setStartIp(long startIp) { this.startIp = startIp; } public void setEndIp(long endIp) { this.endIp = endIp; } public void setCity(String city) { this.city = city; } public void setProvince(String province) { this.province = province; } public long getStartIp() { return startIp; } public long getEndIp() { return endIp; } public String getCity() { return city; } public String getProvince() { return province; } }
- IpV6Obj
package com.aliyun.odps.udf.objects; public class IpV6Obj { public String startIp ; public String endIp ; public String city; public String province; public String getStartIp() { return startIp; } @Override public String toString() { return "IpV6Obj{" + "startIp='" + startIp + '\'' + ", endIp='" + endIp + '\'' + ", city='" + city + '\'' + ", province='" + province + '\'' + '}'; } public IpV6Obj(String startIp, String endIp, String city, String province) { this.startIp = startIp; this.endIp = endIp; this.city = city; this.province = province; } public void setStartIp(String startIp) { this.startIp = startIp; } public String getEndIp() { return endIp; } public void setEndIp(String endIp) { this.endIp = endIp; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getProvince() { return province; } public void setProvince(String province) { this.province = province; } }
- IpUtils
- 编写MaxCompute UDF代码。
- 在Project区域,右键单击Module的源码目录(即src > main > java),选择new > MaxCompute Java。
- 在Create new MaxCompute java class对话框,单击UDF并填写Name后,按Enter键并在代码编写区域输入代码。例如Java Class名称为IpLocation。代码内容如下,代码可直接复制使用,无需修改。
package com.aliyun.odps.udf.udfFunction; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDF; import com.aliyun.odps.udf.UDFException; import com.aliyun.odps.udf.utils.IpUtils; import com.aliyun.odps.udf.objects.IpV4Obj; import com.aliyun.odps.udf.objects.IpV6Obj; import java.io.*; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; public class IpLocation extends UDF { public static IpV4Obj[] ipV4ObjsArray; public static IpV6Obj[] ipV6ObjsArray; public IpLocation() { super(); } @Override public void setup(ExecutionContext ctx) throws UDFException, IOException { //IPV4 if(ipV4ObjsArray==null) { BufferedInputStream bufferedInputStream = ctx.readResourceFileAsStream("ipv4.txt"); BufferedReader br = new BufferedReader(new InputStreamReader(bufferedInputStream)); ArrayList<IpV4Obj> ipV4ObjArrayList=new ArrayList<>(); String line = null; while ((line = br.readLine()) != null) { String[] f = line.split("\\|", -1); if(f.length>=5) { long startIp = IpUtils.StringToLong(f[0]); long endIp = IpUtils.StringToLong(f[1]); String city=f[3]; String province=f[4]; IpV4Obj ipV4Obj = new IpV4Obj(startIp, endIp, city, province); ipV4ObjArrayList.add(ipV4Obj); } } br.close(); List<IpV4Obj> collect = ipV4ObjArrayList.stream().sorted(Comparator.comparing(IpV4Obj::getStartIp)).collect(Collectors.toList()); ArrayList<IpV4Obj> basicIpV4DataList=(ArrayList)collect; IpV4Obj[] ipV4Objs = new IpV4Obj[basicIpV4DataList.size()]; ipV4ObjsArray = basicIpV4DataList.toArray(ipV4Objs); } //IPV6 if(ipV6ObjsArray==null) { BufferedInputStream bufferedInputStream = ctx.readResourceFileAsStream("ipv6.txt"); BufferedReader br = new BufferedReader(new InputStreamReader(bufferedInputStream)); ArrayList<IpV6Obj> ipV6ObjArrayList=new ArrayList<>(); String line = null; while ((line = br.readLine()) != null) { String[] f = line.split("\\|", -1); if(f.length>=5) { String startIp = IpUtils.StringToBigIntString(f[0]); String endIp = IpUtils.StringToBigIntString(f[1]); String city=f[3]; String province=f[4]; IpV6Obj ipV6Obj = new IpV6Obj(startIp, endIp, city, province); ipV6ObjArrayList.add(ipV6Obj); } } br.close(); List<IpV6Obj> collect = ipV6ObjArrayList.stream().sorted(Comparator.comparing(IpV6Obj::getStartIp)).collect(Collectors.toList()); ArrayList<IpV6Obj> basicIpV6DataList=(ArrayList)collect; IpV6Obj[] ipV6Objs = new IpV6Obj[basicIpV6DataList.size()]; ipV6ObjsArray = basicIpV6DataList.toArray(ipV6Objs); } } public String evaluate(String ip){ if(ip==null||ip.trim().isEmpty()||!(ip.contains(".")||ip.contains(":"))) { return null; } int ipV4OrV6=0; try { ipV4OrV6= IpUtils.isIpV4OrV6(ip); } catch (Exception e) { return null; } //如果是IPv4 if(ipV4OrV6==4) { int i = binarySearch(ipV4ObjsArray, IpUtils.StringToLong(ip)); if(i>=0) { IpV4Obj ipV4Obj = ipV4ObjsArray[i]; return ipV4Obj.city+","+ipV4Obj.province; }else{ return null; } }else if(ipV4OrV6==6)//如果是IPv6 { int i = binarySearchIPV6(ipV6ObjsArray, IpUtils.StringToBigIntString(ip)); if(i>=0) { IpV6Obj ipV6Obj = ipV6ObjsArray[i]; return ipV6Obj.city+","+ipV6Obj.province; }else{ return null; } }else{//如果不符合IPv4或IPv6格式 return null; } } @Override public void close() throws UDFException, IOException { super.close(); } private static int binarySearch(IpV4Obj[] array,long ip){ int low=0; int hight=array.length-1; while (low<=hight) { int middle=(low+hight)/2; if((ip>=array[middle].startIp)&&(ip<=array[middle].endIp)) { return middle; } if (ip < array[middle].startIp) hight = middle - 1; else { low = middle + 1; } } return -1; } private static int binarySearchIPV6(IpV6Obj[] array,String ip){ int low=0; int hight=array.length-1; while (low<=hight) { int middle=(low+hight)/2; if((ip.compareTo(array[middle].startIp)>=0)&&(ip.compareTo(array[middle].endIp)<=0)) { return middle; } if (ip.compareTo(array[middle].startIp) < 0) hight = middle - 1; else { low = middle + 1; } } return -1; } private class Invalid{ private Invalid() { } } }
- 准备本地调试数据。
- 在本地项目的
warehouse/example_project/__tables__/wc_in2/p1=2/p2=1/
目录下,打开data文件。 - 将data文件的最后一列数据修改为ipv4.txt中的IP地址(可在ipv4.txt中任选3个IP地址填入),并保存。
- 在本地项目的
- 调试MaxCompute UDF,确保代码可以运行成功。更多调试操作,请参见通过本地运行调试UDF。
- 右键单击编写完成的MaxCompute UDF脚本,选择Run。
- 在Run/Debug Configurations对话框,配置下图红框所示运行参数,单击OK。返回无报错,说明代码运行成功,即可继续执行后续步骤。如有报错,请按照IntelliJ IDEA报错信息处理。说明 运行参数可参照图示数据填写。
步骤四:注册MaxCompute UDF
- 右键单击已经编译成功的MaxCompute UDF脚本,选择Deploy to server…。
- 在Package a jar, submit resource and register function对话框中,配置参数信息。
步骤五:调用MaxCompute UDF转换IP地址为归属地
- 安装并登录MaxCompute客户端。
- 执行SQL SELECT语句调用MaxCompute UDF将IPv4或IPv6地址转换为归属地。命令示例如下。
- 转换IPv4地址为归属地
返回结果如下。select ipv4_ipv6_aton('116.11.XX.XX');
北海市,广西壮族自治区
- 转换IPv6地址为归属地
返回结果如下。select ipv4_ipv6_aton('2001:0250:080b:0:0:0:0:0');
保定市,河北省
- 转换IPv4地址为归属地