Lesson 92 homework: use a SerDe-style approach to store and query the following data in Hive:
0^^Hadoop^^America^^5000|8000|12000|level8^^male
1^^Spark^^America^^8000|10000|15000|level9^^famale
2^^Flink^^America^^7000|8000|13000|level10^^male
3^^Hadoop^^America^^9000|11000|12000|level10^^famale
4^^Spark^^America^^10000|11000|12000|level12^^male
5^^Flink^^America^^11000|12000|18000|level18^^famale
6^^Hadoop^^America^^15000|16000|19000|level16^^male
7^^Spark^^America^^18000|19000|20000|level20^^male
8^^Flink^^America^^15000|16000|19000|level19^^male
Goal: implement a custom InputFormat that parses this encoding and flexibly cleans the raw Hive data:
1. Split fields on ^^
2. Also split the salary field on |
Implementation steps:
1. Location of the source data:
root@master:/usr/local/IMF_testdata/hivestudy# ls
employeesinputformat.txt  IMFInputFormat2.jar
2. View the file contents:
root@master:/usr/local/IMF_testdata/hivestudy# cat employeesinputformat.txt
0^^Hadoop^^America^^5000|8000|12000|level8^^male
1^^Spark^^America^^8000|10000|15000|level9^^famale
2^^Flink^^America^^7000|8000|13000|level10^^male
3^^Hadoop^^America^^9000|11000|12000|level10^^famale
4^^Spark^^America^^10000|11000|12000|level12^^male
5^^Flink^^America^^11000|12000|18000|level18^^famale
6^^Hadoop^^America^^15000|16000|19000|level16^^male
7^^Spark^^America^^18000|19000|20000|level20^^male
8^^Flink^^America^^15000|16000|19000|level19^^male
3. Develop the InputFormat code (full source is attached below) and export it as IMFInputFormat2.jar.
The code parses each line with a regular expression:
String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
It splits on ^^ and |, captures each field as a group, reads the groups in order, and joins them into a single string separated by "\001" (Ctrl-A).
Problem: when the fields were joined with "\t", the loaded data showed up as NULL in Hive.
Solution: join the fields with "\001", Hive's default field delimiter; the data then loads into Hive correctly.
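For reference, below is a minimal standalone sketch of that parsing step. It is my own illustration, not part of the exported jar; the class name IMFParseSketch and the sample line are assumptions. It applies the same regular expression to one raw line and joins the captured groups with the \001 separator (unlike the attached RecordReader, it does not prepend the "IMF" marker to group 7):

package com.dt.spark.hive;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: shows how the regex splits one raw line and rebuilds it
// with \001 (Ctrl-A), the delimiter Hive's LazySimpleSerDe expects by default.
public class IMFParseSketch {
    public static void main(String[] args) {
        String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
        String line = "0^^Hadoop^^America^^5000|8000|12000|level8^^male";
        Matcher m = Pattern.compile(patternhive).matcher(line);
        if (m.find()) {
            StringBuilder sb = new StringBuilder();
            for (int i = 1; i <= 8; i++) {
                if (i > 1) {
                    sb.append('\001');          // Hive's default field separator
                }
                sb.append(m.group(i));
            }
            // Print with a visible separator so the field boundaries can be checked.
            System.out.println(sb.toString().replace('\001', '|'));
        }
    }
}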
4. Operations in Hive:
Drop the table:
drop table employee_inputformat;
Add the jar:
add jar /usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar;
Create the table:
CREATE TABLE employee_InputFormat(userid INT, name STRING, address STRING, salarys1 INT, salarys2 INT, salarys3 INT, salarys4 STRING, gendre STRING) STORED AS INPUTFORMAT 'com.dt.spark.hive.IMFInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
Load the data:
LOAD DATA LOCAL INPATH '/usr/local/IMF_testdata/hivestudy/employeesinputformat.txt' INTO TABLE employee_InputFormat;
Query the data:
select * from employee_InputFormat;
5. The run results are as follows:
hive> desc formatted employee_inputformat;
OK
# col_name data_type comment
userid int
name string
address string
salarys1 int
salarys2 int
salarys3 int
salarys4 string
gendre string
# Detailed Table Information
Database: default
Owner: root
CreateTime: Sun Dec 11 20:47:21 CST 2016
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://master:9000/user/hive/warehouse/employee_inputformat
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE true
numFiles 1
totalSize 467
transient_lastDdlTime 1481460441
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: com.dt.spark.hive.IMFInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.111 seconds, Fetched: 36 row(s)
hive>
Attached source code (IMFInputFormat.java):
package com.dt.spark.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

// Custom InputFormat for Hive: hands each split to IMFRecordReader,
// which does the ^^ / | parsing.
public class IMFInputFormat extends TextInputFormat implements JobConfigurable {

    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new IMFRecordReader((FileSplit) genericSplit, job);
    }
}
Source code (IMFRecordReader.java):
package com.dt.spark.hive;
import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;
public class IMFRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public IMFRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open the file and seek to the start of the split.
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // Not the first split: back up one byte and skip the partial first line.
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }
    public IMFRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.lineReader = new LineReader(in);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    public IMFRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException {
        this.maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }
    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }
    /**
     * Reads the next record in the split and extracts the useful fields
     * from the raw line.
     *
     * @param key
     *            key of the record, which will map to the byte offset of the
     *            record's line
     * @param value
     *            the record in text format
     * @return true if a record existed, false otherwise
     * @throws IOException
     */
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        // Stay within the split.
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0) {
                return false;
            }
            String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
            Pattern phive = Pattern.compile(patternhive);
            String strhive = value.toString();
            Matcher mhive = phive.matcher(strhive);
            String resultstr = "defaultisblank";
            while (mhive.find()) {
                resultstr = mhive.group(1) + "\001" + mhive.group(2) + "\001" + mhive.group(3) + "\001"
                        + mhive.group(4) + "\001" + mhive.group(5) + "\001" + mhive.group(6) + "\001"
                        + "IMF" + mhive.group(7) + "\001" + mhive.group(8);
            }
            if (resultstr != null && !"defaultisblank".equals(resultstr)) {
                // The line matched: emit the \001-delimited record.
                value.set(resultstr);
                pos += newSize;
                if (newSize < maxLineLength) {
                    return true;
                }
            }
        }
        return false;
    }
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    public synchronized long getPos() throws IOException {
        return pos;
    }

    public synchronized void close() throws IOException {
        if (lineReader != null) {
            lineReader.close();
        }
    }
}
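For completeness, here is a minimal local smoke-test sketch. It is my own addition, not part of the original assignment; the class name IMFRecordReaderLocalTest is purely illustrative, and it assumes the two classes above are compiled and on the classpath together with the Hadoop client libraries. It feeds one raw line through the InputStream-based constructor of IMFRecordReader and prints the resulting \001-delimited record:

package com.dt.spark.hive;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Illustrative local test only: exercises IMFRecordReader outside of Hive.
public class IMFRecordReaderLocalTest {
    public static void main(String[] args) throws IOException {
        String line = "0^^Hadoop^^America^^5000|8000|12000|level8^^male\n";
        byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
        IMFRecordReader reader =
                new IMFRecordReader(new ByteArrayInputStream(bytes), 0L, bytes.length, Integer.MAX_VALUE);
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
            // Replace \001 with a visible character so the field boundaries can be checked.
            System.out.println(value.toString().replace('\001', '|'));
        }
        reader.close();
    }
}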