首页 > 其他分享 >Hadoop序列化

Hadoop序列化

时间:2023-02-19 11:24:13浏览次数:41  
标签:downFlow upFlow Hadoop long import 序列化 public

序列化定义

  把内存中的数据持久化(把内存中的对象转换为字节码文件存储带磁盘上)和网络传输。

反序列化定义

  反序列化就是把接收到的字节序列(或其它协议传输的数据)或持久化的磁盘数据转换为内存对象。

为什么进行序列化操作?

  一般内存对象断电时就会消失,而且只能由本地进程去使用,序列化就可以存储内存对象并将对象发送到远程计算机。

Java序列化和Hadoop序列化对比

  Java的Serizable是重量级的序列化框架吗,一个对象序列化后会有很多额外的校验信息,不便在网络中传输。

  hadoop序列化更加紧凑(合理使用存储空间)快速(读写数据的额外开销少)互操作(支持多种语言的交互)

hadoop自定义数据序列化

  一般常用数据序列化的类型不能满足开发需要,比如自己定义一个bean类,如何对自定义的数据类型序列化呢?

  (1)实现Writable接口

  (2)重写序列化和反序列化方法。

  (3)反序列化时要反射调用空参构造函数,要有空参构造

  (4)反序列化和序列化的顺序要一致

  (5)要把结果显示到文件中需要重写toString(),可用"\t"分开方便后续使用。

  (6)如果想把bean作为key传输需要实现Comparable(MapReduce中的shuffule要求对key必须能排序)。

序列化案例

 统计一个手机号的上行流量、下行流量和总流量

  (1)输入数据

  

(2)输入数据格式:

id  手机号  ip  域名  上行流量  下行流量  网络状态码 

(3)输出数据格式

手机号  上行流量  下行流量  总流量

(4)根据输入、输出格式分析可知

  map阶段输出(reduce输入)和reduce输出的key是手机号,而map阶段输出的value要包括上行流量、下行流量、总流量,所以设计封装bean作为value以及作为reduce的输入value,然后将bean以给出的格式输出到文件中(重写toString()方法),完成操作。

(5)代码编写

  FlowBean

package com.rsh.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sunFlow) {
        this.sumFlow = sunFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public FlowBean() {
    }


    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

FlowMapper

package com.rsh.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sunFlow) {
        this.sumFlow = sunFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public FlowBean() {
    }


    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

FlowReducer

package com.rsh.mapreduce.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {

    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        long totalUp = 0;
        long totalDown = 0;

        //遍历相同号码的上行、下行流量、总流量进行累加
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        //封装outKV
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        //写出outK outV
        context.write(key,outV);
    }
}

FlowDriver

package com.rsh.mapreduce.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //关联本driver类
        job.setJarByClass(FlowDriver.class);

        //关联Mapper、Reducer类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //设置Map的outKV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //设置程序最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //设置程序的输入输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\\hadoopMR\\MRInput\\flow.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\hadoopMR\\MROutput4"));

        //提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

 

 

  

 

标签:downFlow,upFlow,Hadoop,long,import,序列化,public
From: https://www.cnblogs.com/20203923rensaihang/p/17134193.html

相关文章

  • Django Rest Frame work 如何使用serializers序列化函数新手教程
    DjangoRestFramework如何使用serializers序列化   DjangoRestFramework提供了serializers模块,用于序列化和反序列化模型实例以及原生数据类型......
  • Django Rest Frame work 如何使用serializers序列化
    DjangoRestFramework如何使用serializers序列化   DjangoRestFramework提供了serializers模块,用于序列化和反序列化模型实例以及Python原生数......
  • serializers序列化函数简单入门
    1.创建Django项目和应用程序首先,我们需要创建一个Django项目和一个Django应用程序。如果你已经有了Django项目和应用程序,请跳过这一步。$django-adminstartprojectm......
  • Hadoop-HDFS-shell命令
      第2章HDFS的Shell操作(开发重点)2.1基本语法hadoopfs具体命令 OR hdfsdfs具体命令两个是完全相同的。2.2命令大全[[email protected]]$......
  • Hadoop开启Yarn的日志监控功能
    1.开启JobManager日志(1)编辑NameNode配置文件${hadoop_home}/etc/hadoop/yarn-site.xml和mapred-site.xml编辑yarn-site.xml<!--SitespecificYARNconfigurationproperti......
  • hadoop组件面试常见问题
    1、谈谈对HDFS的理解?HDFS这种存储适合哪些场景?HDFS即HadoopDistributedFileSystem,Hadoop分布式文件系统。它为的是解决海量数据的存储与分析的问题,它本身是源于Goole在......
  • Hadoop
    HadoopHDFS、Yarn、MapReduceHadoop集群环境搭建完全分布式环境,伪分布式将其中的多台服务器改为一台,并将配置文件中的相关内容更改即可1、安装Linux系统模型机关闭......
  • Hadoop数据存储及管理
    一、分布式文件存储面临的挑战1.海量数据存储问题采用多台服务器,支持横向扩展2.海量数据问题查询便捷问题使用元数据记录文件和机器的位置信息3.大文件传输效率慢......
  • .Net Core WebAPI 序列化时忽略空值字段
    特定实体publicclassAPIResponseModel{///<summary>///返回标识,200:成功,其它:失败///</summary>publicstringCo......
  • 8.drf-序列化器
    在序列化类中,如果想使用request,则可以通过self.context['request']获取序列化器的主要由两大功能-对请求的数据进行校验(底层调用的是Django的Form和ModelForm)-对数......