
Parsing Resumes with MapReduce and Storing Them in Postgres


Contents

I. Functionality

II. Implementation

1. Code structure

2. ResumeDBWritable

3. ResumeWritable

4. ResumeSDK

5. ResumeDBMapper

6. ResumeDBReducer

7. ResumeDBPartationer

8. ResumeDBDriver

I. Functionality

The resume data has already been uploaded to HDFS by an upstream ETL process; the resume content now needs to be parsed and stored in a database.

This example uses ResumeSDK for the parsing and Postgres as the target database.
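The target table must exist in Postgres before the job runs. The original post does not show its DDL, so the following is a minimal sketch that creates it over JDBC; the helper class, connection settings, and column types are assumptions, chosen to match the f_name and f_age fields the driver registers with DBOutputFormat.

package com.soft863.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper, not part of the original post.
public class CreateResumeTable {
    public static void main(String[] args) throws Exception {
        // Connection settings mirror the ones used in ResumeDBDriver below.
        String url = "jdbc:postgresql://127.0.0.1:5432/skynet";
        try (Connection conn = DriverManager.getConnection(url, "postgres", "postgres");
             Statement stmt = conn.createStatement()) {
            // Column names must match the field list passed to DBOutputFormat.setOutput(...).
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS tb_resume ("
                    + "f_name VARCHAR(100), "
                    + "f_age  INT)");
        }
    }
}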

II. Implementation

1. Code structure

The project is organized into three packages:

  • com.soft863.writable: ResumeWritable, ResumeDBWritable
  • com.soft863.util: ResumeSDK
  • com.soft863.resumedb: ResumeDBMapper, ResumeDBReducer, ResumeDBPartationer, ResumeDBDriver

2. ResumeDBWritable

package com.soft863.writable;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class ResumeDBWritable implements DBWritable, Writable {
    private String name;
    private Integer age;

    // Hadoop creates Writable instances reflectively, so a no-arg constructor is required.
    public ResumeDBWritable() {
    }

    public ResumeDBWritable(ResumeWritable resume) {
        this.name = resume.getName();
        // age defaults to "0" in ResumeWritable, so parseInt is safe for unparsed resumes
        this.age = Integer.parseInt(resume.getAge());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.age = in.readInt();
    }

    // Binds the fields to the INSERT statement that DBOutputFormat generates.
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, name);
        statement.setInt(2, age);
    }

    // Only used when reading rows back via DBInputFormat; not needed in this job.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
    }
}
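With the table and field names registered in the driver, DBOutputFormat generates the statement INSERT INTO tb_resume (f_name, f_age) VALUES (?, ?) and calls write(PreparedStatement) once per output record to fill in the placeholders; the readFields counterparts are only exercised when reading from a database with DBInputFormat.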

3. ResumeWritable

package com.soft863.writable;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class ResumeWritable implements Writable {
    private String name = "";
    private String sex = "";
    private String age = "0";
    private String phone = "";
    private String location = "未知"; // "未知" = unknown; default when no address is parsed

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }


    @Override
    public String toString() {
        return "ResumeWritable{" +
                "name='" + name + '\'' +
                ", sex='" + sex + '\'' +
                ", age='" + age + '\'' +
                ", phone='" + phone + '\'' +
                ", location='" + location + '\'' +
                '}';
    }

    // location must be serialized as well, or the field is lost during the shuffle.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.name);
        out.writeUTF(this.sex);
        out.writeUTF(this.age);
        out.writeUTF(this.phone);
        out.writeUTF(this.location);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.sex = in.readUTF();
        this.age = in.readUTF();
        this.phone = in.readUTF();
        this.location = in.readUTF();
    }
}

4. ResumeSDK

package com.soft863.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.soft863.writable.ResumeWritable;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Consts;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.UUID;

public class ResumeSDK {


    public static String parserStr(String content) {
        try {
            String url = "http://resumesdk.market.alicloudapi.com/ResumeParser";
            // Set request headers; replace the AppCode with your own.
            HttpPost httpPost = new HttpPost(url);
            httpPost.setHeader("Authorization", "APPCODE " + "7619614c299a42b080cbeebb0cf7659e");
            httpPost.setHeader("Content-Type", "application/json; charset=UTF-8");

            // Base64-encode the resume content.
            byte[] bytes = content.getBytes(Consts.UTF_8);
            String data = new String(Base64.encodeBase64(bytes), Consts.UTF_8);

            // Build the request body.
            JSONObject json = new JSONObject();
            json.put("file_name", UUID.randomUUID());   // file name
            json.put("file_cont", data);                // base64-encoded file content
            json.put("need_avatar", 0);                 // whether to extract the avatar
            json.put("ocr_type", 1);                    // 1 = advanced OCR
            StringEntity params = new StringEntity(json.toString(), Consts.UTF_8);
            httpPost.setEntity(params);

            // Send the request (DefaultHttpClient is deprecated; acceptable for a demo).
            HttpClient httpclient = new DefaultHttpClient();
            HttpResponse response = httpclient.execute(httpPost);

            // Read the response body.
            String resCont = EntityUtils.toString(response.getEntity(), Consts.UTF_8);
            System.out.println(resCont);
            return resCont;
        } catch (IOException e) {
            System.out.println(e);
            return "";
        }
    }

    public static ResumeWritable parser(String content) {
        try {
            ResumeWritable resume = new ResumeWritable();
            String jsonString = parserStr(content);

            // Parse the JSON response with FastJSON; only overwrite the defaults
            // when the corresponding field is present.
            JSONObject jsonObject = JSON.parseObject(jsonString);
            JSONObject result = jsonObject.getJSONObject("result");
            String name = result.getString("name");
            if (name != null) {
                resume.setName(name);
            }
            String sex = result.getString("sex");
            if (sex != null) {
                resume.setSex(sex);
            }
            String phone = result.getString("phone");
            if (phone != null) {
                resume.setPhone(phone);
            }
            String location = result.getString("hometown_address");
            if (location != null) {
                resume.setLocation(location);
            }
            String age = result.getString("age");
            if (age != null) {
                resume.setAge(age);
            }
            return resume;
        } catch (Exception ex) {
            // Signal unparseable records with null; the mapper skips them.
            System.out.println(ex);
            return null;
        }
    }

    public static String parserJson(String content) {
        try {
            ResumeWritable resume = new ResumeWritable();
            String jsonString = parserStr(content);

            // Parse the JSON response with FastJSON
            JSONObject jsonObject = JSON.parseObject(jsonString);
            JSONObject result = jsonObject.getJSONObject("result");

            resume.setName(result.getString("name"));
            resume.setSex(result.getString("gender"));
            resume.setPhone(result.getString("phone"));
            resume.setAge(result.getString("age"));
            return resume.toString();
//            return JSONObject.toJSONString(resume);
        } catch (Exception e) {
            System.out.println(e);
            return "";
        }
    }

    public static void main(String[] args) throws Exception {

        // Sample skills text for a quick smoke test of the HTTP call
        String log = "读写能力一般英语:读写能力熟练|听说能力可沟通";
        String resume = ResumeSDK.parserStr(log);
        System.out.println(resume);
    }
}
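As the parsing code implies, the SDK's response is a JSON document whose top-level result object carries flat fields such as name, sex, age, phone, and hometown_address; parser copies them into a ResumeWritable only when they are present, so the defaults (for example location = "未知") survive for incomplete resumes. Consult the ResumeSDK documentation for the full response schema.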

5. ResumeDBMapper

package com.soft863.resumedb;

import com.soft863.util.ResumeSDK;
import com.soft863.writable.ResumeWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class ResumeDBMapper extends Mapper<Object, Text, Text, ResumeWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        try {
            String strResume = value.toString();
            // Parse the record as a resume and key the output by location,
            // so the partitioner can route it by region.
            ResumeWritable resume = ResumeSDK.parser(strResume);
            System.out.println(resume);
            if (resume != null) {
                context.write(new Text(resume.getLocation()), resume);
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

6. ResumeDBReducer

package com.soft863.resumedb;

import com.soft863.writable.ResumeWritable;
import com.soft863.writable.ResumeDBWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ResumeDBReducer extends Reducer<Text, ResumeWritable, ResumeDBWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<ResumeWritable> values, Context context) throws IOException, InterruptedException {
        // One row per resume: convert to a DBWritable and let DBOutputFormat insert it.
        for (ResumeWritable value : values) {
            context.write(new ResumeDBWritable(value), NullWritable.get());
        }
    }
}

7. ResumeDBPartationer

package com.soft863.resumedb;

import com.soft863.writable.ResumeWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ResumeDBPartationer extends Partitioner<Text, ResumeWritable> {

    // Send resumes whose location contains "河南" (Henan) to reducer 1, all others to reducer 0.
    @Override
    public int getPartition(Text text, ResumeWritable resumeWritable, int numPartitions) {
        if (text.toString().contains("河南")) {
            return 1;
        }
        return 0;
    }
}
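Because getPartition can return 1, the job must run with at least two reduce tasks; this is why the driver below calls job.setNumReduceTasks(2).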

8. ResumeDBDriver

package com.soft863.resumedb;

import com.soft863.writable.ResumeWritable;
import com.soft863.writable.ResumeDBWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;
public class ResumeDBDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String path = args[0];
        start(path);
    }

    public static void start(String sourcePath) {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://node11:9000");
            DBConfiguration.configureDB(conf, "org.postgresql.Driver", "jdbc:postgresql://127.0.0.1:5432/skynet",
                    "postgres", "postgres");

            Job job = Job.getInstance(conf);

            // Important: tell Hadoop which jar contains this job's classes
            job.setJarByClass(ResumeDBDriver.class);
            // Mapper for this job
            job.setMapperClass(ResumeDBMapper.class);
            // Reducer for this job
            job.setReducerClass(ResumeDBReducer.class);

            // Map-stage output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(ResumeWritable.class);
            // Final (reducer) output key/value types
            job.setOutputKeyClass(ResumeDBWritable.class);
            job.setOutputValueClass(NullWritable.class);

            job.setPartitionerClass(ResumeDBPartationer.class);
            job.setNumReduceTasks(2);

            // Path on HDFS of the resume text to process
            Path sourceFile = new Path(sourcePath);
            FileInputFormat.setInputPaths(job, sourceFile);

            job.setOutputFormatClass(DBOutputFormat.class);
            DBOutputFormat.setOutput(job, "tb_resume",
                    "f_name", "f_age");

            // Submit the job to the Hadoop cluster and wait for completion
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
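To run the job, package these classes together with their dependencies (fastjson, httpclient, commons-codec, and the PostgreSQL JDBC driver) into a jar and submit it with the HDFS input path as the only argument; the jar name and input path here are placeholders:

hadoop jar resume-db.jar com.soft863.resumedb.ResumeDBDriver /data/resumes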

From: https://blog.csdn.net/taogumo/article/details/143717514
