首页 > 其他分享 >Hadoop序列化之MapReduce案例

Hadoop序列化之MapReduce案例

时间:2022-11-18 15:00:23浏览次数:45  
标签:hadoop Hadoop MapReduce org apache import 序列化 public

Hadoop序列化

序列化概述

序列化就是把内存中的对象、转换成字节系列(或者其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

1、JAVA序列化和hadoop序列化

java序列化:java序列化时一个重量级序列化框架,一个对象对序列化后,会附带很多额外的信息(包括校验信息,Header、继承体系),不便于在网络中高效传输。

Hadoop序列化特点

  • 紧凑:高效使用存储空间
  • 快速:读写数据的额外空间小
  • 可拓展:随着通信协议的升级而升级
  • 互操作:支持多语言的交互

hadoop的序列化 Writable

1)自定的类需要实现Writable接口

2)提高无参数的构造器(反序列化时通过反射的方式调用无参数构造器来创建对象)

3)重写write方法实现序列化的过程(自定义序列化的内容)

4)重写readFields方法实现反序列化过程

5)如果自定义的类需要作为输出的key或者value来使用的化,一般建议重写ToString方法,因为hadoop会默认调用对象的toString方法进行输出

Haoop序列化实例

在MapReduce之wordcount实例中已经对源码进行分析,这里不对源码继续分析

1、新建一个类进行封装手机号

FlowBean.java

package Wtritable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
* 用于封装一个手机号的 上行流量 下行流量 总流量
* 因为flowBean要作为value在mr过程中传入,且会落盘,因此需要支持序列化和反序列化
*
* */
public class FlowBean implements Writable {
    private Long upFlow;// 设置上行流量
    private Long downFlow;//设置下行流量
    private  Long sumFlow;// 设置总流量

    public  FlowBean(){}
    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public void  setSumFlow(){
        this.sumFlow=this.getUpFlow()+this.getDownFlow();
    }

    // 序列化方法
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    // 反序列化方法
    /* 注意:反序列融化读取数据孙粗需要与反序列化写入数据的顺序一致
    * */
    public void readFields(DataInput dataInput) throws IOException {
    this.upFlow= dataInput.readLong();
    this.downFlow=dataInput.readLong();
    this.sumFlow=dataInput.readLong();
    }
    @Override
    public String toString() {
        return this.upFlow+"\t"+this.downFlow+'\t'+this.sumFlow;
    }
}

2、重写Mapper类

FlowMapper.java

package Wtritable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
    // 定义写出的key
    private  Text outk=new Text();
    // 定义写出的value
    private  FlowBean outv=new FlowBean();

    @Override
    protected void map(LongWritable key, Text vale,Context context) throws IOException, InterruptedException {
        //1、将读取的一行数据转回String
        // 数据格式  12345567788 234 342324 200
        String line=vale.toString();
        //2、切割数据
        String[] flowMsg=line.split("\t");
        //3、封装数据
        outk.set(flowMsg[1]);
        // 4、封装value
        outv.setUpFlow(Long.parseLong(flowMsg[flowMsg.length-3]));
        outv.setDownFlow(Long.parseLong(flowMsg[flowMsg.length-2]));
        outv.setSumFlow();
        //5、写出
        context.write(outk,outv);
    }
}

3、重写Reudcer类

FlowReudecr.java

package Wtritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text,FlowBean, Text,FlowBean> {
    //写出的value
    FlowBean outv=new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        long totalUpFlow=0;
        long totalDownFlow=0;
        long totalSumFlow=0;
        //1、迭代values,将相同的key和values汇总到一起
        for(FlowBean bean:values){
            totalUpFlow+=bean.getUpFlow();
            totalDownFlow+=bean.getDownFlow();
            totalSumFlow+=bean.getSumFlow();
        }
        //2、封装value
        outv.setUpFlow(totalUpFlow);
        outv.setDownFlow(totalDownFlow);
        outv.setSumFlow();

        //3、写出
        context.write(key,outv);
    }
}

4、执行代码

FlowDriver.java内容

package Wtritable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1、创建配置环境
        Configuration conf=new Configuration();
        // 2、创建Job环境
        Job job=Job.getInstance(conf);
        //3、关联驱动类
        job.setJarByClass(FlowDriver.class);
        //4、关联Mapper和Reducer类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //5、设置map输出的k和v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 6、设置最终输出的k和v的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //7、设置输入和输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\\踩着上帝的小丑\\大数据测试数据\\a"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\踩着上帝的小丑\\大数据测试数据\\b")); //自己的路径
        // 8、提交数据
        job.waitForCompletion(true);
    }
}

标签:hadoop,Hadoop,MapReduce,org,apache,import,序列化,public
From: https://www.cnblogs.com/zt123456/p/16903245.html

相关文章