I. Functional Description
Resume data has already been uploaded to HDFS through an ETL pipeline; the job below parses the resume content and stores the extracted fields in a database.
This example uses ResumeSDK for parsing and Postgres as the database.
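The driver (section 8) writes two columns, f_name and f_age, into a table named tb_resume. A minimal Postgres DDL for that target table might look like this; the column types and the surrogate key are assumptions, so adjust them to your schema:

-- Hypothetical DDL: the job itself only references the table name
-- tb_resume and the columns f_name and f_age.
CREATE TABLE tb_resume (
    id     SERIAL PRIMARY KEY, -- assumed surrogate key, unused by the job
    f_name VARCHAR(100),       -- parsed name
    f_age  INTEGER             -- parsed age
);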
II. Code Implementation
1. Code Structure
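The classes are organized into three packages (reconstructed from the package and import statements in the code below):

com.soft863
├── writable
│   ├── ResumeDBWritable.java   // reduce output key, written to Postgres
│   └── ResumeWritable.java     // map output value
├── util
│   └── ResumeSDK.java          // HTTP client for the ResumeSDK API
└── resumedb
    ├── ResumeDBMapper.java
    ├── ResumeDBReducer.java
    ├── ResumeDBPartationer.java
    └── ResumeDBDriver.java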
2. ResumeDBWritable
package com.soft863.writable;

import org.apache.hadoop.io.Writable;
// use the new-API DBWritable so it matches the mapreduce.lib.db.DBOutputFormat in the driver
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Reduce-side output key. DBOutputFormat calls write(PreparedStatement)
 * to bind each record to the generated INSERT statement.
 */
public class ResumeDBWritable implements DBWritable, Writable {

    private String name;
    private Integer age;

    public ResumeDBWritable() {
        // no-arg constructor, required if Hadoop ever instantiates the class reflectively
    }

    public ResumeDBWritable(ResumeWritable resume) {
        this.name = resume.getName();
        this.age = Integer.parseInt(resume.getAge()); // ResumeWritable defaults age to "0"
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // write-only record: never deserialized from a Hadoop stream
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        // parameter indices match the column order passed to DBOutputFormat.setOutput
        statement.setString(1, name); // f_name
        statement.setInt(2, age);     // f_age
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        // write-only record: never read back from the database
    }
}
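With the driver configuration in section 8 (table tb_resume, columns f_name and f_age), DBOutputFormat generates an INSERT statement of the form below; write(PreparedStatement) binds the two placeholders in the same column order:

INSERT INTO tb_resume (f_name, f_age) VALUES (?, ?);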
3. ResumeWritable
package com.soft863.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Map output value holding the fields extracted from one resume.
 * All fields default to non-null values so serialization never NPEs.
 */
public class ResumeWritable implements Writable {

    private String name = "";
    private String sex = "";
    private String age = "0";
    private String phone = "";
    private String location = "未知"; // "unknown"

    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }

    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }

    @Override
    public String toString() {
        return "ResumeWritable{" +
                "name='" + name + '\'' +
                ", sex='" + sex + '\'' +
                ", age='" + age + '\'' +
                ", phone='" + phone + '\'' +
                ", location='" + location + '\'' +
                '}';
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.name);
        out.writeUTF(this.sex);
        out.writeUTF(this.age);
        out.writeUTF(this.phone);
        // location must be serialized too, otherwise it is silently lost
        // when values are shuffled from map to reduce
        out.writeUTF(this.location);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in exactly the order write() emitted them
        this.name = in.readUTF();
        this.sex = in.readUTF();
        this.age = in.readUTF();
        this.phone = in.readUTF();
        this.location = in.readUTF();
    }
}
4. ResumeSDK
package com.soft863.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.soft863.writable.ResumeWritable;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Consts;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.UUID;

public class ResumeSDK {

    public static String parserStr(String content) {
        String url = "http://resumesdk.market.alicloudapi.com/ResumeParser";

        // Set header fields; replace the AppCode with your own
        HttpPost httpPost = new HttpPost(url);
        httpPost.setHeader("Authorization", "APPCODE " + "7619614c299a42b080cbeebb0cf7659e");
        httpPost.setHeader("Content-Type", "application/json; charset=UTF-8");

        // Base64-encode the resume content
        byte[] bytes = content.getBytes(Consts.UTF_8);
        String data = new String(Base64.encodeBase64(bytes), Consts.UTF_8);

        // Build the request body
        JSONObject json = new JSONObject();
        json.put("file_name", UUID.randomUUID()); // arbitrary file name
        json.put("file_cont", data);              // base64-encoded file content
        json.put("need_avatar", 0);               // whether to extract the avatar
        json.put("ocr_type", 1);                  // 1 = advanced OCR
        httpPost.setEntity(new StringEntity(json.toString(), Consts.UTF_8));

        // Send the request and read the response body
        try (CloseableHttpClient httpclient = HttpClients.createDefault();
             CloseableHttpResponse response = httpclient.execute(httpPost)) {
            String resCont = EntityUtils.toString(response.getEntity(), Consts.UTF_8);
            System.out.println(resCont);
            return resCont;
        } catch (IOException e) {
            System.out.println(e);
            return "";
        }
    }

    public static ResumeWritable parser(String content) {
        try {
            ResumeWritable resume = new ResumeWritable();
            String jsonString = parserStr(content);
            // Parse the JSON response with fastjson
            JSONObject jsonObject = JSON.parseObject(jsonString);
            JSONObject result = jsonObject.getJSONObject("result");
            String name = result.getString("name");
            if (name != null) {
                resume.setName(name);
            }
            // Note: parserJson below reads "gender" instead; verify which
            // field name your SDK version actually returns
            String sex = result.getString("sex");
            if (sex != null) {
                resume.setSex(sex);
            }
            String phone = result.getString("phone");
            if (phone != null) {
                resume.setPhone(phone);
            }
            String location = result.getString("hometown_address");
            if (location != null) {
                resume.setLocation(location);
            }
            String age = result.getString("age");
            if (age != null) {
                resume.setAge(age);
            }
            return resume;
        } catch (Exception ex) {
            // malformed or empty response: return null so the caller can skip the record
            return null;
        }
    }

    public static String parserJson(String content) {
        try {
            ResumeWritable resume = new ResumeWritable();
            String jsonString = parserStr(content);
            // Parse the JSON response with fastjson
            JSONObject jsonObject = JSON.parseObject(jsonString);
            JSONObject result = jsonObject.getJSONObject("result");
            resume.setName(result.getString("name"));
            resume.setSex(result.getString("gender"));
            resume.setPhone(result.getString("phone"));
            resume.setAge(result.getString("age"));
            return resume.toString();
            // return JSONObject.toJSONString(resume);
        } catch (Exception e) {
            System.out.println(e);
            return "";
        }
    }

    public static void main(String[] args) throws Exception {
        // sample resume fragment (Chinese) used as test input
        String log = "读写能力一般英语:读写能力熟练|听说能力可沟通";
        String resume = ResumeSDK.parserStr(log);
        System.out.println(resume);
    }
}
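For reference, a successful response from the parsing service is a JSON document whose result object carries the fields read above. The shape below is illustrative only (field values invented), assuming exactly the fields this code consumes:

{
  "status": { "code": 200, "message": "success" },
  "result": {
    "name": "张三",
    "gender": "男",
    "age": "28",
    "phone": "13800000000",
    "hometown_address": "河南省郑州市"
  }
}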
5. ResumeDBMapper
package com.soft863.resumedb;

import com.soft863.util.ResumeSDK;
import com.soft863.writable.ResumeWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ResumeDBMapper extends Mapper<Object, Text, Text, ResumeWritable> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        try {
            // with the default TextInputFormat each call receives one line,
            // so every resume must be stored as a single line of text
            String strResume = value.toString();
            ResumeWritable resume = ResumeSDK.parser(strResume);
            System.out.println(resume);
            if (resume != null) {
                // key by location so the partitioner can route records by province
                context.write(new Text(resume.getLocation()), resume);
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
6. ResumeDBReducer
package com.soft863.resumedb;

import com.soft863.writable.ResumeDBWritable;
import com.soft863.writable.ResumeWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ResumeDBReducer extends Reducer<Text, ResumeWritable, ResumeDBWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<ResumeWritable> values, Context context) throws IOException, InterruptedException {
        // one INSERT per resume; DBOutputFormat writes the key, the NullWritable value is ignored
        for (ResumeWritable value : values) {
            context.write(new ResumeDBWritable(value), NullWritable.get());
        }
    }
}
7. ResumeDBPartationer
package com.soft863.resumedb;

import com.soft863.writable.ResumeWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ResumeDBPartationer extends Partitioner<Text, ResumeWritable> {

    @Override
    public int getPartition(Text text, ResumeWritable resumeWritable, int numPartitions) {
        // route resumes whose location mentions 河南 (Henan) to reducer 1,
        // everything else to reducer 0; requires numReduceTasks >= 2
        if (text.toString().contains("河南")) {
            return 1;
        }
        return 0;
    }
}
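A quick standalone check of the routing logic (PartitionCheck is a hypothetical helper class, assumed to sit in the same package as the partitioner; it is not part of the job):

package com.soft863.resumedb;

import org.apache.hadoop.io.Text;

public class PartitionCheck {
    public static void main(String[] args) {
        ResumeDBPartationer p = new ResumeDBPartationer();
        // Henan resumes go to reducer 1, everything else to reducer 0
        System.out.println(p.getPartition(new Text("河南省郑州市"), null, 2)); // 1
        System.out.println(p.getPartition(new Text("未知"), null, 2));         // 0
    }
}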
8. ResumeDBDriver
package com.soft863.resumedb;

import com.soft863.writable.ResumeDBWritable;
import com.soft863.writable.ResumeWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
// new-API DB classes: the old mapred.lib.db versions do not compile against Job
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ResumeDBDriver {

    public static void main(String[] args) throws Exception {
        String path = args[0]; // HDFS input directory
        start(path);
    }

    public static void start(String sourcePath) {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://node11:9000");
            // JDBC connection used by DBOutputFormat on the reduce side
            DBConfiguration.configureDB(conf, "org.postgresql.Driver",
                    "jdbc:postgresql://127.0.0.1:5432/skynet",
                    "postgres", "postgres");

            Job job = Job.getInstance(conf);
            // important: the jar containing this job
            job.setJarByClass(ResumeDBDriver.class);
            // mapper and reducer classes for this job
            job.setMapperClass(ResumeDBMapper.class);
            job.setReducerClass(ResumeDBReducer.class);
            // map-phase output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(ResumeWritable.class);
            // final output key/value types
            job.setOutputKeyClass(ResumeDBWritable.class);
            job.setOutputValueClass(NullWritable.class);
            // two reducers so the partitioner can split 河南 from the rest
            job.setPartitionerClass(ResumeDBPartationer.class);
            job.setNumReduceTasks(2);

            // input path of the resume text files on HDFS
            FileInputFormat.setInputPaths(job, new Path(sourcePath));
            // write the output to Postgres: table tb_resume, columns f_name and f_age
            job.setOutputFormatClass(DBOutputFormat.class);
            DBOutputFormat.setOutput(job, "tb_resume", "f_name", "f_age");

            // submit the job to the Hadoop cluster and wait
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
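A sketch of the Maven dependencies this project needs, derived from the imports above; the versions are assumptions, so align them with your cluster and JDK:

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.11</version>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.27</version>
    </dependency>
</dependencies>

Package the job jar (bundle the non-Hadoop dependencies, or ship them with -libjars) and submit it; the jar name and input path below are placeholders:

hadoop jar resume-db.jar com.soft863.resumedb.ResumeDBDriver /resume/input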
From: https://blog.csdn.net/taogumo/article/details/143717514