首页 > 其他分享 >第92课作业,通过SerDes的方式对一下数据进行Hive的存储和查询操作

第92课作业,通过SerDes的方式对一下数据进行Hive的存储和查询操作

时间:2023-05-17 21:00:39浏览次数:32  
标签:SerDes America hadoop Hive hive org apache import 92


 

第92课作业,通过SerDes的方式对一下数据进行Hive的存储和查询操作:

 

0^^Hadoop^^America^^5000|8000|12000|level8^^male
1^^Spark^^America^^8000|10000|15000|level9^^famale
2^^Flink^^America^^7000|8000|13000|level10^^male
3^^Hadoop^^America^^9000|11000|12000|level10^^famale
4^^Spark^^America^^10000|11000|12000|level12^^male
5^^Flink^^America^^11000|12000|18000|level18^^famale
6^^Hadoop^^America^^15000|16000|19000|level16^^male
7^^Spark^^America^^18000|19000|20000|level20^^male
8^^Flink^^America^^15000|16000|19000|level19^^male

 

实现:inputformat格式编码解析,灵活对hive源数据进行清洗

1,按^^进行分割

2,同时也按|进行切分

 

实现步骤:

1,源数据位置:

root@master:/usr/local/IMF_testdata/hivestudy#ls

  employeesinputformat.txt  IMFInputFormat2.jar

 

2,查看文件内容

root@master:/usr/local/IMF_testdata/hivestudy#cat employeesinputformat.txt

0^^Hadoop^^America^^5000|8000|12000|level8^^male

1^^Spark^^America^^8000|10000|15000|level9^^famale

2^^Flink^^America^^7000|8000|13000|level10^^male

3^^Hadoop^^America^^9000|11000|12000|level10^^famale

4^^Spark^^America^^10000|11000|12000|level12^^male

5^^Flink^^America^^11000|12000|18000|level18^^famale

6^^Hadoop^^America^^15000|16000|19000|level16^^male

7^^Spark^^America^^18000|19000|20000|level20^^male

8^^Flink^^America^^15000|16000|19000|level19^^male

 

3,开发inputformat代码,源代码附后.导出jar包IMFInputFormat2.jar

代码中使用了正则表达式对文本进行了解析:

String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";

按^^及|进行解析,解析以后进行分组,依次获取各分组的值,然后使用"\u001"组拼接成字符串.

问题:使用"\t"拼接在hive中导入数据为null;

解决:使用"\u001"组拼接成字符串.,顺利导入数据到hive。

 


4,在hive中的操作:

删表:

drop table employee_inputformat;

 

导入jar包

add jar/usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar;

 

建立表
CREATE TABLEemployee_InputFormat(userid  INT,nameString,address String, salarys1 int ,salarys2 int ,salarys3 int ,salarys4string , gendre string)  stored asINPUTFORMAT 'com.dt.spark.hive.IMFInputFormat' OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
 
加载数据
LOAD DATA LOCAL INPATH'/usr/local/IMF_testdata/hivestudy/employeesinputformat.txt' INTO TABLEemployee_InputFormat;
 
数据查询
select * from employee_InputFormat;

 

5,运行结果如下:

 

第92课作业,通过SerDes的方式对一下数据进行Hive的存储和查询操作_hadoop

 

 

 

 

hive>    desc formatted employee_inputformat;
OK
# col_name              data_type               comment             
                 
userid                  int                                         
name                    string                                      
address                 string                                      
salarys1                int                                         
salarys2                int                                         
salarys3                int                                         
salarys4                string                                      
gendre                  string                                      
                 
# Detailed Table Information             
Database:               default                  
Owner:                  root                     
CreateTime:             Sun Dec 11 20:47:21 CST 2016     
LastAccessTime:         UNKNOWN                  
Protect Mode:           None                     
Retention:              0                        
Location:              hdfs://master:9000/user/hive/warehouse/employee_inputformat     
Table Type:             MANAGED_TABLE            
Table Parameters:                
       COLUMN_STATS_ACCURATE   true                
       numFiles                1                   
       totalSize               467                 
       transient_lastDdlTime  1481460441          
                 
# Storage Information            
SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
InputFormat:           com.dt.spark.hive.IMFInputFormat        
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat      
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
       serialization.format    1                   
Time taken: 0.111 seconds, Fetched: 36row(s)
hive>

 

附件源代码:

 

package com.dt.spark.hive;
 
 
import java.io.IOException;
 
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
importorg.apache.hadoop.mapred.JobConfigurable;
importorg.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
importorg.apache.hadoop.mapred.TextInputFormat; 
 
public class IMFInputFormat extends  TextInputFormat implements    
JobConfigurable 
     {
            public RecordReader<LongWritable,Text> getRecordReader(    
                     InputSplit genericSplit, JobConfjob, Reporter reporter)   
                     throws IOException {    
             
                     
                reporter.setStatus(genericSplit.toString());    
                 return new IMFRecordReader((FileSplit)genericSplit,job);    
             }    
  
  
  
}

源代码:

package com.dt.spark.hive;
 
import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;
 
public  class IMFRecordReader implements RecordReader<LongWritable,Text> {
 
    private CompressionCodecFactorycompressionCodecs = null;
    private  long start;
    private  long pos;
    private  long end;
    private LineReaderlineReader;
    int  maxLineLength;
 
    public IMFRecordReader(FileSplitinputSplit, Configuration job) throws IOException {
maxLineLength =  job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
start =  inputSplit.getStart();
end =  start + inputSplit.getLength();
       final Pathfile = inputSplit.getPath();
compressionCodecs = new CompressionCodecFactory(job);
       final CompressionCodeccodec = compressionCodecs.getCodec(file);
 
// Open file and seek to thestart of the split
fs =  file.getFileSystem(job);
fileIn =fs.open(file);
       booleanskipFirstLine = false;
       if (codec !=null) {
lineReader = new LineReader(codec.createInputStream(fileIn),job);
end = Long.MAX_VALUE;
       } else {
           if (start
skipFirstLine = true;
start;
fileIn.seek(start);
           }
lineReader = new LineReader(fileIn,job);
       }
       if (skipFirstLine) {
start +=  lineReader.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE,end - start));
       }
       this.pos =start;
    }
 
    public IMFRecordReader(InputStreamin, longoffset, longendOffset, intmaxLineLength) {
       this.maxLineLength =maxLineLength;
       this.lineReader =new LineReader(in);
       this.start =offset;
       this.pos =offset;
       this.end =endOffset;
    }
 
    public IMFRecordReader(InputStreamin, longoffset, longendOffset, Configuration job) throws IOException {
       this.maxLineLength =job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
       this.lineReader =new LineReader(in,job);
       this.start =offset;
       this.pos =offset;
       this.end =endOffset;
    }
 
    public LongWritable createKey() {
       return  new LongWritable();
    }
 
    public Text createValue() {
       return  new Text();
    }
 
/**
     * Reads the next record inthe split. getusefull fields from the raw nginx
     * log.
     * 
     * @param key
     *            key of the record which will map tothe byte offset of the
     *            record's line
     * @param value
     *            the record in text format
     * @return true if a recordexisted, false otherwise
     * @throws IOException
     */
 
    public  synchronized boolean next(LongWritablekey, Text value)throws IOException {
// Stay within the split
       while (pos <end) {
key.set(pos);
           intnewSize = lineReader.readLine(value, maxLineLength,
                  Math.max((int) Math.min(Integer.MAX_VALUE,end - pos),maxLineLength));
 
           if (newSize
              return  false;
patternhive ="^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
 
phive = Pattern.compile(patternhive);
strhive =  value.toString();
mhive =  phive.matcher(strhive);
resultstr =  "defaultisblank";
           while (mhive.find()) {
resultstr =  mhive.group(1) + "\001" +  mhive.group(2) + "\001" +  mhive.group(3) + "\001" +  mhive.group(4)
"\001" +mhive.group(5) + "\001" +mhive.group(6) + "\001" +"IMF" + mhive.group(7) +"\001"
mhive.group(8);
 
           }
           ;
 
           if (resultstr ==null || resultstr == "defaultisblank") {
           } else {
value.set(resultstr);
pos +=  newSize;
 
              if (newSize <maxLineLength)
                  returntrue;
 
           }
       }
 
       return  false;
    }
 
    public  float getProgress() {
       if (start ==end) {
           return 0.0f;
       } else {
           return Math.min(1.0f, (pos -start) / (float) (end -start));
       }
    }
 
    public  synchronized long getPos() throws IOException {
       returnpos;
    }
 
    public  synchronized void close() throws IOException {
       if (lineReader !=null)
lineReader.close();
    }
 
}

 

 


 

 

标签:SerDes,America,hadoop,Hive,hive,org,apache,import,92
From: https://blog.51cto.com/u_10561036/6293797

相关文章

  • hive(一)
    数据仓库数据仓库,英文名称为DataWarehouse,可简写为DW。是一个用于存储,分析,报告的数据系统.数据仓库的目的是构建面向分析的集成化数据环境,分析结果为企业提供决策支持.数据库和数据仓库区别数据库和数据仓库的区别实际就是OLTP和OLAP的区别OLTP系统的典型应用就是RDBMS,也......
  • HIVE跨集群迁移
    查看mysql使用端口ps-ef|grepmysqlss-antp|grep[ps查出来的pid]停止HIVE写入服务创建备份路径mkdir-p/root/hivebackup/执行备份数据库命令:(在目标集群执行)mysqldump-uroot-pPassword-h1.1.1.1-P3306--databaseshive_prode>/root/jws/hiveba......
  • impala jdbc导出hive数据字典
    业务需求太多了,给完整导出为html文件,以及之前搞的publicstaticvoidmain(String[]args)throwsException{kerberos();}publicstaticvoidkerberos(){URLresource=Thread.currentThread().getContextClassLoader().getResource("");......
  • GYM102392 简要题解
    自己下午闲着没事单挑了一下,两小时左右一度rk1,但后继无力了。。。。A.MaxorMin肯定沿着出现过的数操作;然后发现如果a[i]=k,a[j]>k,a[k]<k就会增加一次操作所以维护一下差分序列即可。B.LevelUp两维DP,这个疑似edu出过。要注意的是:需要关于x排个序,不然会漏一些转移。D.......
  • 1、通过亿级数据量在hive和impala中查询比较text、orc和parquet性能表现(二)
    文章目录9、分别在hive和impala中查询验证结果(比較HDFS存儲三種格式文件的查詢性能textfile、orc、parquet)1)、查詢總條數2)、隨便找一條信息,按照name查詢3)、按照多条件查询4)、按照時間區間查詢5)、兩張表join6)、總結1、文件存儲2、hive查詢與impala查詢速度3、不同查詢類型的查詢......
  • ASEMI代理LTC6992IS6-1#TRMPBF原装ADI车规级LTC6992IS6-1#TRMPBF
    编辑:llASEMI代理LTC6992IS6-1#TRMPBF原装ADI车规级LTC6992IS6-1#TRMPBF型号:LTC6992IS6-1#TRMPBF品牌:ADI/亚德诺封装:TSOT-23-6批号:2023+引脚数量:6工作温度:-55°C~125°C安装类型:表面贴装型LTC6992IS6-1#TRMPBF汽车芯片LTC6992IS6-1#TRMPBF特性脉宽调制(PWM)由控制简单的......
  • 小知识:设置archive_lag_target参数强制日志切换
    为客户测试一个ADG场景问题,发现测试环境的日志切换频率过低,总是需要定期手工切换,这非常影响测试心情。实际上,可以设置archive_lag_target参数强制日志切换。比如设置:altersystemsetarchive_lag_target=1800;这样即使库没任何压力,半小时也会切换一次日志。该设置同时也适......
  • P9235网络稳定性
    前置知识最近公共祖先(LCA)\(\text{Kruskal}\)重构树简化题意P9235网络稳定性给一个有边权的无向图,给你点\(u\)到另一个点\(v\)所有的路径上最大的边权最小是多少。solution先来介绍一下\(\text{Kruskal}\)重构树。这个算法是最小生成树的\(\text{Kruskal......
  • JCO 1921路由计划
    XJCO1921-ProgrammingProjectCoursework2–RoutePlanningDeadline:12PMBSTonMonday,15May2023Thisworkisthesecondcourseworkforthismodule.Itcorrespondsto60%oftheoverallassessmentforthismodule.SubmissionsshouldbemadeviaGradesc......
  • 22092133《Java程序设计》第一周学习总结
    1本周学习总结: 一个Java源文件可能编译出多个字节码文件。Scanner是Java的一个类,使用Scanner对象读取数据的时候,要注意next()方法只能读取到有效字符之前遇到的空白,并不能得到带有空格的字符串,nextLine()方法以Enter为结束符,返回输入回车之前的字符就可以获得空白2.书面作业......