首页 > 其他分享 >亿级地址关联匹配如何实现每天全量更新?大数据环境下hive+addresstool解决方案

亿级地址关联匹配如何实现每天全量更新?大数据环境下hive+addresstool解决方案

时间:2024-04-01 20:13:36浏览次数:24  
标签:getString res hive put 全量 && address 亿级 null

在政务系统中有许多需要将业务地址关联到标准地址的场景,addresstool致力于解决地址关联匹配算法中的速度和准确性问题。
最近遇到一个业务痛点,由于客户标准地址在持续更新,导致历史上业务地址关联到的标准地址无法与最新的标准地址挂接,于是客户要求每日对全量业务地址进行挂接标准地址。为了解决之前单机版地址关联的效率问题,本次采用addresstool与大数据结合的方式,提高地址关联速度。
经实测,单核addresstool的地址关联速度在5000/秒-20000/秒之间(取决于业务地址质量),关联匹配正确率达到98%。hadoop分布式环境地址关联匹配速度能达到10万+/秒(具体取决与大数据节点数和地址质量,节点足够。每秒实现百万级地址关联)。

本文大数据环境为hive数据库,通过udf将addresstool封装,最后实现分布式计算。
直接上代码

public class AddressLink extends GenericUDF {
private PrimitiveObjectInspector addressIO;
private static AddressTool addressTool;

private String bld(String building){
    if(building!=null&&!building.isEmpty() ){
        if(building.endsWith("栋")||building.endsWith("幢")){
            return building.substring(0,building.length()-1);
        }else if(building.endsWith("号楼")){
            return building.substring(0,building.length()-2);
        }
    }

    return building;
}

private String unit(String unit){
    if(unit!=null&&!unit.isEmpty() ){
        if(unit.endsWith("单元")){
            return unit.substring(0,unit.length()-2);
        }
    }

    return unit;
}

private String room(String room){
    if(room!=null&&!room.isEmpty() ){
        if(room.endsWith("室")||room.endsWith("户")){
            return room.substring(0,room.length()-1);
        }
    }

    return room;
}


@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments[0] instanceof ObjectInspector) {
        addressIO = (PrimitiveObjectInspector) arguments[0];
    }else{
        throw new UDFArgumentLengthException("The function GetMapValue accepts  1 argument. simple: GetSqName(sq_name)");
    }
    addressTool = new AddressTool();
    DataTable data = new DataTable();
    try{
        //注册Driver
        String driver = "org.postgresql.Driver";//prop.getProperty("driver");
        String url = "jdbc:postgresql://*****:5432/postgres";//prop.getProperty("url");
        String username = "******";//prop.getProperty("user");
        String password = "******";//prop.getProperty("password");
        Class.forName(driver);
        Connection connection = DriverManager.getConnection(url, username, password);
        Statement statement = connection.createStatement();

        // 数据初始化
        ResultSet res = statement.executeQuery("select id,province,city,county,town,community,road,road_no,aoi,sub_aoi,building,unit,room,address from st_address order by aoi,road,road_no");
        int cnt = 0;
        while (res.next()) {
            HashMap<String,String> mp = new HashMap<>();
            if(res.getString("id")!=null&& !Objects.equals(res.getString("id"), "")){mp.put("id",res.getString("id"));}
            if(res.getString("province")!=null&& !Objects.equals(res.getString("province"), "")){mp.put("province",res.getString("province"));}
            if(res.getString("city")!=null&& !Objects.equals(res.getString("city"), "")){mp.put("city",res.getString("city"));}
            if(res.getString("county")!=null&& !Objects.equals(res.getString("county"), "")){mp.put("county",res.getString("county"));}
            if(res.getString("town")!=null&& !Objects.equals(res.getString("town"), "")){mp.put("town",res.getString("town"));}
            if(res.getString("community")!=null&& !Objects.equals(res.getString("community"), "")){mp.put("community",res.getString("community"));}
            if(res.getString("road")!=null&& !Objects.equals(res.getString("road"), "")){mp.put("road",res.getString("road"));}
            if(res.getString("road_no")!=null&& !Objects.equals(res.getString("road_no"), "")){mp.put("road_no",res.getString("road_no"));}
            if(res.getString("aoi")!=null&& !Objects.equals(res.getString("aoi"), "")){mp.put("aoi",res.getString("aoi"));}
            if(res.getString("sub_aoi")!=null&& !Objects.equals(res.getString("sub_aoi"), "")){mp.put("sub_aoi",res.getString("sub_aoi"));}
            if(res.getString("building")!=null&& !Objects.equals(res.getString("building"), "")){mp.put("building",bld(res.getString("building")));}
            if(res.getString("unit")!=null&& !Objects.equals(res.getString("unit"), "")){mp.put("unit",unit(res.getString("unit")));}
            if(res.getString("room")!=null&& !Objects.equals(res.getString("room"), "")){mp.put("room",room(res.getString("room")));}
            if(res.getString("address")!=null&& !Objects.equals(res.getString("address"), "")){mp.put("address",res.getString("address"));}
            data.addAddressDic(mp);
            cnt = cnt + 1;
        }

        //标准数据地址数据加载到addresstool
        data.initData(addressTool);
        data = null;
        statement.close();
        connection.close();

    } catch (Exception throwables) {
        throwables.printStackTrace();
    }

    return ObjectInspectorFactory.getStandardMapObjectInspector(
            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector);
}

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
    if(arguments[0].get()==null){
        return null;
    }


    String address =  PrimitiveObjectInspectorUtils.getString(arguments[0].get(), this.addressIO);
    // 中文地址中的异常字符预处理
    while(address.contains(" ")){address = address.replace(" ","");}
    while(address.contains("--")){address = address.replace("--","-");}
    while(address.contains("——")){address = address.replace("——","-");}
    while(address.contains("- ")){address = address.replace("- ","-");}
    while(address.contains(" -")){address = address.replace(" -","-");}
    while(address.contains("— ")){address = address.replace("— ","-");}
    while(address.contains(" —")){address = address.replace(" —","-");}


    // 地址关联
    StandardAddress stdAddress = addressTool.getStdAddress(address);
    Map<String,String> result = stdAddress.getStdAddress();
    // 地址级别判断
    if(stdAddress.addressLevel!=null&& !stdAddress.addressLevel.equals("")){
        result.put("addressLevel",stdAddress.addressLevel);
    }else{
        result.put("addressLevel","未知");
    }

    // 地址关联级别判断
    if(stdAddress.linkLevel!=null&& !stdAddress.linkLevel.equals("")){
        result.put("linkLevel",stdAddress.linkLevel);
    }else{
        result.put("linkLevel","未关联");
    }


    return result;
}


@Override
public String getDisplayString(String[] children) {
    return "Address(" + children[0] + ")";
}

}
addresstool在分布式节点下计算速度超级快,经实测,1千万地址数据在sparksql调用udf方式,耗时3.5分钟。5千万数据耗时8分钟。

通过addresstool与大数据结合,成功实现每日千万级业务地址全量关联更新。

java资源下载
https://download.csdn.net/download/u011024436/89035851
源码学习
https://gitee.com/addresstool/address

欢迎共同学习讨论

使用中有问题或者建议,欢迎联系邮箱addresstool@163.com

标签:getString,res,hive,put,全量,&&,address,亿级,null
From: https://www.cnblogs.com/addresstool/p/18109267

相关文章

  • Hive详解(5)
    Hive窗口函数案例需求:连续三天登陆的用户数据步骤:--建表createtablelogins( usernamestring, log_datestring)rowformatdelimitedfieldsterminatedby'';--加载数据loaddatalocalinpath'/opt/hive_data/login'intotablelogins;--......
  • Hive的row_number和regexp_extract结合带来的乱码问题
    selectuserid,from_unixtime(createtime,'yyyy-MM-dd')asdateid,regexp_extract(browser,'^([^\\(]*).*$',1)asbrowser,operationsystem,device,row_number()over......
  • Hive-技术补充-ANTLR的真实语法世界
    一、上下文上一篇博客<Hive-技术补充-ANTLR语法编写>,我们了解了如何使用ANTLR语法来表达词法结构和语法结构,下面我们循循渐进的处理身边用过的一些文件或语言:CSV、JSON、DOT、Cymbol、R 二、解析CSV文件有这样一份csv文件vidata.csvDetails,Month,AmountMidBonus,Ju......
  • openGauss 全量迁移gs_mysync
    全量迁移gs_mysync可获得性本特性自openGauss5.0.0版本开始引入。特性简介gs_mysync工具是一个基于Python语言的MySQL到openGauss的复制工具。该工具提供了初始全量数据及对象(视图、触发器、函数、存储过程)的复制能力,可实现数据及对象从MySQL迁移至openGauss。对于数据的全量......
  • 万字详解PHP+Sphinx中文亿级数据全文检索实战(实测亿级数据0.1秒搜索耗时)
    Sphinx官方文档:http://sphinxsearch.com/docs/sphinx3.html极简概括:由C++编写的高性能全文搜索引擎的开源组件,C/S架构,跨平台(支持Linux、Windows、MacOS),支持分布式部署,并可直接适配MySQL。解决问题:因为MySQL的like%keyword%不走索引,且全文索引不支持中文,所以需要借助其它......
  • Hive 刷题—— 每年的在校人数
    问题描述 year表示学生入学年度,num表示对应年度录取学生人数,stu_len表示录取学生的学制;说明:例如录取年度2018学制是3年,表示该批学生在校年份为2018~2019、2019~2020、2020-2021,在算每年的在校人数时,2018/2019/2020/2021年份都需要算上。示例数据 idyearnumstu_l......
  • Hive 刷题——HiveSql 实现分钟级的趋势图
    问题描述在Hive中,怎么用sql实现分钟级的趋势图?比如从交易表中,如何统计0点到每分钟的交易趋势图?原表:trade_A(trade_id,pay_time(格式是2020-08-0510:30:28),pay_gmv)。希望用sql实现分钟级的0点到当前分钟的GMV。结果表:result_A(minute_rn(分钟顺序),pay_gmv_td(每分钟的交易额,都是......
  • Oracle重做日志文件clear logfile与clear unarchived logfile浅析
    首先,从v$log动态视图中观察到ARC和STATUS两个字段STATUS:分为CURRENT、ACTIVE和INACTIVE三种,当数据库进程DBWn进行一次写入,脏数据从内存刷写到redologfile中,这时承载数据写入的redologfile状态即为CURRENT;而数据从redologfile拷贝到归档目录下时处于ACTIVE状态,完成数据从内存......
  • hiveserver2拒绝连接
    一、报错内容 二、解决办法基本都是core-site.xml文件中没做好代理导致的。在文件中添加如下配置<property><name>hadoop.proxyuser.xxx.hosts</name><value>*</value></property><property><name>hadoop.proxyuser.x......
  • 第1章 Hive基本概念
    1.1什么是Hivehive简介Hive:由facebook开源用于解决海量结构化日志的数据统计工具。Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类SQL的查询功能。2)Hive本质:将HQL转化成MapReduce程序。3)Hive的三个要点:Hive处理的数据存储在HDFS......