首页 > 其他分享 >【hadoop】 3005-hadoop对象序列化编码

【hadoop】 3005-hadoop对象序列化编码

时间:2023-05-17 22:32:51浏览次数:41  
标签:mobileNumber downFlow upFlow hadoop 3005 import 序列化 public


一、hadoop序列化操作

Writable接口, 是根据  DataInput  和  DataOutput  实现的简单、有效的序列化对象





MR的任意Key和Value必须实现Writable接口 .


MR的任意key必须实现WritableComparable接口












二、自定义Writable,实现MapReduce程序



1、需求内容



日期                         手机号             流量1          流量2



20150403            1380013800           201                 51



20150403            1390013800           202                 52



20150403            1370013800           203                 54



20150403            1360013800           204                 55



统计  每个用户、上行流量、下行流量、总流量





2、程序实现代码



package serializable;
  

 import java.io.DataInput;
  
 import java.io.DataOutput;
  
 import java.io.IOException;
  

 import org.apache.hadoop.io.Writable;
  

 public class FlowBean implements Writable{
  
     
  
      private String mobileNumber;
  
      private long upFlow;
  
      private long downFlow;
  
     
  
      public FlowBean() {
  
      }
  
      public FlowBean(long upFlow, long downFlow,String mobileNumber) {
  
 
 
  
           this.mobileNumber = mobileNumber;
  
           this.upFlow = upFlow;
  
           this.downFlow = downFlow;
  
      }
  

      @Override
  
      public void write(DataOutput out) throws IOException {
  
          
  
           out.writeUTF(mobileNumber);
  
           out.writeLong(upFlow);
  
           out.writeLong(downFlow);
  
      }
  

      @Override
  
      public void readFields(DataInput in) throws IOException {
  
          
  
           this.mobileNumber = in.readUTF();
  
           this.upFlow = in.readLong();
  
           this.downFlow = in.readLong();
  
      }
  

      public String getMobileNumber() {
  
           return mobileNumber;
  
      }
  

      public void setMobileNumber(String mobileNumber) {
  
           this.mobileNumber = mobileNumber;
  
      }
  

      public long getUpFlow() {
  
           return upFlow;
  
      }
  

      public void setUpFlow(long upFlow) {
  
           this.upFlow = upFlow;
  
      }
  

      public long getDownFlow() {
  
           return downFlow;
  
      }
  

      public void setDownFlow(long downFlow) {
  
           this.downFlow = downFlow;
  
      }
  
      @Override
  
      public String toString() {
  
           return  this.upFlow + "\t" + this.downFlow + "\t" + (this.upFlow + this.downFlow);
  
      }
  
 }     
   

 
 

 
 

 
 

 
 
package 
 serializable;
 

 
import 
 java.io.IOException;
 
import 
 java.net.URI;
 

 
import 
 org.apache.commons.lang.StringUtils;
 
import 
 org.apache.hadoop.conf.Configuration;
 
import 
 org.apache.hadoop.fs.FileSystem;
 
import 
 org.apache.hadoop.fs.Path;
 
import 
 org.apache.hadoop.io.LongWritable;
 
import 
 org.apache.hadoop.io.Text;
 
import 
 org.apache.hadoop.mapreduce.Job;
 
import 
 org.apache.hadoop.mapreduce.Mapper;
 
import 
 org.apache.hadoop.mapreduce.Reducer;
 
import 
 org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
import 
 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 

 
public 
 class 
 FlowSumRunner {
 
                
 
                 
 private 
 static 
 final 
 String 
 HDFS_PATH 
 = 
 "hdfs://cloud01:9000" 
 ;
 
                
 
                 
 public 
 static 
 class 
 FlowSumMapper 
 extends 
 Mapper<LongWritable, Text, Text, FlowBean> {
 

 
                                FlowBean 
 flowBean 
 = 
 null
 ;
 
                                 
 @Override
 
                                 
 protected 
 void 
 map(LongWritable 
 key
 , Text 
 value
 , Context 
 context 
 )
 
                                                                 
 throws 
 IOException, InterruptedException {
 
                                                
 
                                                String[] 
 values 
 = StringUtils.split( 
 value
 .toString(), 
 "\t" 
 );
 
//根据实际位置调整         
 
                                                String 
 mobileNumber 
 = 
 values 
 [1];
 
                                                 
 long 
 upFlow 
 = 
 new 
 Long( 
 values
 [7]);
 
                                                 
 long 
 downFlow 
 = 
 new 
 Long( 
 values
 [8]);
 
                                    
 flowBean 
 = 
 new 
 FlowBean(
 upFlow
 , 
 downFlow
 ,
 mobileNumber 
 );
 
                                    
 context
 .write(
 new 
 Text(
 mobileNumber
 ), 
 flowBean
 );
 
                                }
 
                }
 
                
 
                 
 public 
 static 
 class 
 FlowSumReducer 
 extends 
 Reducer<Text, FlowBean,Text, FlowBean>{
 
                                
 
                                FlowBean 
 v 
 = 
 new 
 FlowBean();
 
                                 
 @Override
 
                                 
 protected 
 void 
 reduce(Text 
 k
 , Iterable<FlowBean> 
 values
 ,Context 
 context 
 )
 
                                                                 
 throws 
 IOException, InterruptedException {
 
                                                
 
                                                 
 long 
 sumUpFlow 
 = 0;
 
                                                 
 long 
 sumDownFlow 
 = 0;
 
                                                
 
                                                 
 for
 (FlowBean 
 value 
 :
 values
 ){
 
                                                                 
 sumUpFlow 
 += 
 value 
 .getUpFlow();
 
                                                                 
 sumDownFlow 
 += 
 value 
 .getDownFlow();
 
                                                }
 
                                                 
 v
 .setUpFlow(
 sumUpFlow 
 );
 
                                                 
 v
 .setDownFlow(
 sumDownFlow 
 );
 
                                                 
 context
 .write(
 k 
 , 
 v
 );
 
                                }
 
                }
 
                
 
                
 
                 
 public 
 static 
 void 
 main(String[] 
 args
 ) {
 

 
                                Configuration 
 conf 
 = 
 new 
 Configuration();
 
                                 
 try 
 {
 
                                                Job 
 job 
 = Job.getInstance( 
 conf
 );
 
                                                 
 job
 .setJarByClass(FlowSumRunner.
 class
 );
 
                                                 
 job
 .setJar(
 "flowSumJob.jar" 
 );
 

 
                                                 
 job
 .setMapperClass(FlowSumMapper.
 class
 );
 
                                                 
 job
 .setReducerClass(FlowSumReducer.
 class
 );
 

 
                                                 
 job
 .setMapOutputKeyClass(Text.
 class
 );
 
                                                 
 job
 .setMapOutputValueClass(FlowBean.
 class
 );
 

 
                                                 
 job
 .setOutputKeyClass(Text.
 class
 );
 
                                                 
 job
 .setOutputKeyClass(FlowBean.
 class
 );
 

 
                                                Path 
 inputPath 
 = 
 new 
 Path(
 HDFS_PATH 
 + 
 "/flow/input"
 );
 
                                                Path 
 outputDir 
 = 
 new 
 Path(
 HDFS_PATH 
 + 
 "/flow/output"
 );
 
                                
 
                                                FileInputFormat. setInputPaths(
 job
 , 
 inputPath
 );
 
                                                
 
                                                FileOutputFormat. setOutputPath(
 job
 , 
 outputDir
 );
 

 
                                                FileSystem 
 fs 
 = FileSystem.get( 
 new 
 URI(
 HDFS_PATH
 ), 
 conf
 );
 
                                                 
 if 
 (
 fs 
 .exists(
 outputDir
 )) {
 
                                                                 
 fs
 .delete(
 outputDir 
 , 
 true
 );
 
                                                }
 
                                                System. exit(
 job
 .waitForCompletion( 
 true
 ) ? 0 : 1);
 
                                } 
 catch 
 (Exception 
 e 
 ) {
 
                                                 
 e
 .printStackTrace();
 
                                }
 
                }
 
}

3、查看统计结果



[hadoop@cloud01 ~]$ hadoop fs -cat /flow/output/part-r-00000



具体执行结果略



标签:mobileNumber,downFlow,upFlow,hadoop,3005,import,序列化,public
From: https://blog.51cto.com/u_14361901/6294504

相关文章

  • 【hadoop】 4001-Partitioner编程
    MapReduce重要组件——Partitioner组件(1)Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理;(2)你可以自定义key的一个分发股则,如数据文件包含不同的省份,而输出的要求是每个省份输出一个文件;(3)提供了一个默认的HashPartitioner......
  • 18、什么是 java 序列化?什么情况下需要序列化?
    序列化就是一种用来处理对象流的机制。将对象的内容流化,将流化后的对象传输于网络之间。序列化是通过实现serializable接口,该接口没有需要实现的方法,implementSerializable只是为了标注该对象是可被序列化的,使用一个输出流(FileOutputStream)来构造一个ObjectOutputStream对象,接......
  • Request类的源码分析和序列化与反序列化
    Request类的源码分析和序列化与反序列化Request类源码分析#源码分析: self._request=request#将老的request传给_requestdef__getattr__(self,attr):try:returngetattr(self._request,attr)#在老的request中找attr找到了就返回这个方法......
  • drf——Request源码分析、序列化组件、序列化类的使用(字段类和参数)、反序列化校验和
    1.Request类源码分析#APIView+Response写个接口#总结: 1.新的request有个data属性,以后只要是在请求body体中的数据,无论什么编码格式,无论什么请求方式2.取文件还是从:request.FILES3.取其他属性,跟之前完全一样request.method.... 原理:新的request重写了__g......
  • Hadoop-3.3.5单机版安装步骤
    1.下载JDK和Hadoop[略]2.解压[略]3.创建hadoop数据存储的目录mkdir-p/home/hadoop/tmp/home/hadoop/hdfs/data/home/hadoop/hdfs/name4.配置JAVA环境和HADOOP_HOMEvim/etc/profile添加如下内容JAVA_HOME=/home/fanqi/jdk1.8.0_202HADOOP_HOME=/home/hadoop/hadoop-......
  • hadoop distcp 参数详解
    distcp是一个用于数据复制的工具,它可以将数据从一个Hadoop集群复制到另一个Hadoop集群。Usage:hadoopdistcp[OPTIONS]<srcurl><desturl>OPTIONS:-p[rbugpcax]Preservestatus(rbugpcax)r:replicationnumber......
  • hadoop中distcp的mapreduce任务中的task0详解及优化
    task0详解distcp是Hadoop中一个用于数据复制的工具,可用于大规模数据复制场景。在distcp执行过程中,会运行多个MapReduce任务,其中第一个任务通常被称为"task0"或"maintask"。task0主要负责以下操作:**1.解析命令行参数并生成distcp配置。**2.预处理数据源列表,对......
  • Restful规范,序列化和反序列化,drf介绍,drf之APIView源码分析
    Restful规范:-RESTful是一种定义API接口的设计风格,AIP接口的编写规范,,尤其适用于前后端分离的应用模式中-这种风格的理念认为后端开发任务就是提供数据的,对外提供的是数据资源的访问接口,所以在定义接口时,客户端访问的URL路径就表示这种要操作的数据资源-我们可以使用......
  • Hadoop入门
    2.Hadoop入门1.分布式和集群分布式:多台服务器相互配合完成一件工作(工作内部,各台服务器所完成的子任务不同)集群:多台服务器联合起来独立完成流水线式工作举例:洗衣店洗衣服如果分为四步:放入洗衣机、晾晒衣服、熨衣服、送给客户,每步都分别由不同种类的员工来做,那么这四个员工......
  • lombok (java 驼峰规范导致的 JSON 序列化问题)
    1、问题描述有一个接收类,出于某种原因(调用第三方接口)会使用首字母大写的情况@DatapublicclassHelloModel{ privateStrigATest; privateStrigBTest;}当我使用这个类接收一个JSON格式的数据,转换为对应的这个HelloModel类时,会出现ATest和BTest都为null的情......