首页 > 其他分享 >实现mapreduce多文件自定义输出

实现mapreduce多文件自定义输出

时间:2023-09-20 12:03:12浏览次数:46  
标签:输出 自定义 hadoop mapreduce job key org apache import


 

普通maprduce中通常是有map和reduce两个阶段,在不做设置的情况下,计算结果会以part-000*输出成多个文件,并且输出的文件数量和reduce数量一样,文件内容格式也不能随心所欲。这样不利于后续结果处理。

如果只是想做到输出结果的文件名可控,实现自己的LogNameMultipleTextOutputFormat类,设置jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);就可以了,但是这种方式只限于使用旧版本的hadoop api.如果想采用新版本的api接口或者自定义输出内容的格式等等更多的需求,那么就要自己动手重写一些hadoop api了。

    首先需要构造一个自己的MultipleOutputFormat类实现FileOutputFormat类(注意是org.apache.hadoop.mapreduce.lib.output包的FileOutputFormat)

 

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;


/**
 * This abstract class extends the FileOutputFormat, allowing to write the
 * output data to different output files. There are three basic use cases for
 * this class. 
 * Created on 2012-07-08
 * @author zhoulongliu
 * @param <K>
 * @param <V>
 */
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends
        FileOutputFormat<K, V> {


   //接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名
    private MultiRecordWriter writer = null;


    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }


    /**
     * get task output path
     * @param conf
     * @return
     * @throws IOException
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }


    /**
     * 通过key, value, conf来确定输出文件名(含扩展名) Generate the file output file name based
     * on the given key and the leaf file name. The default behavior is that the
     * file name does not depend on the key.
     * 
     * @param key the key of the output data
     * @param name the leaf file name
     * @param conf the configure object
     * @return generated file name
     */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);


   /**
    * 实现记录写入器RecordWriter类
    * (内部类)
    * @author zhoulongliu
    *
    */
    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** RecordWriter的缓存 */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** 输出目录 */
        private Path workPath = null;


        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }


        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }


        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // 得到输出文件名
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
           //如果recordWriters里没有文件名,那么就建立。否则就直接写值。
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }


        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException,
                InterruptedException {
            Configuration conf = job.getConfiguration();
           //查看是否使用解码器  
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                //这里我使用的自定义的OutputFormat 
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
                        keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                //这里我使用的自定义的OutputFormat 
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }


}

 接着你还需要自定义一个LineRecordWriter实现记录写入器RecordWriter类,自定义输出格式。

 

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * 
 * 重新构造实现记录写入器RecordWriter类
 * Created on 2012-07-08
 * @author zhoulongliu
 * @param <K>
 * @param <V>
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final String utf8 = "UTF-8";//定义字符编码格式
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);//定义换行符
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

     //实现构造方法,出入输出流对象和分隔符
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }
   
    /**
     * 将mapreduce的key,value以自定义格式写入到输出流中
     */
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }

}

 接着,你实现刚刚重写MultipleOutputFormat类中的generateFileNameForKeyValue方法自定义返回需要输出文件的名称,我这里是以key值中以逗号分割取第一个字段的值作为输出文件名,这样第一个字段值相同的会输出到一个文件中并以其值作为文件名。

 

public static class VVLogNameMultipleTextOutputFormat extends MultipleOutputFormat<Text, NullWritable> {
        
        @Override
        protected String generateFileNameForKeyValue(Text key, NullWritable value, Configuration conf) { 
            String sp[] = key.toString().split(",");
            String filename = sp[1];
            try {
                Long.parseLong(sp[1]);
            } catch (NumberFormatException e) {
                filename = "000000000000";
            }
            return filename;
        }


    }

  最后就是在job调用时设置了

Configuration conf = getConf();
        Job job = new Job(conf);
        job.setNumReduceTasks(12);
        ......
        job.setMapperClass(VVEtlMapper.class); 
        job.setReducerClass(EtlReducer.class);
        job.setOutputFormatClass(VVLogNameMultipleTextOutputFormat.class);//设置自定义的多文件输出类
       FileInputFormat.setInputPaths(job,new Path(args[0]));
       FileOutputFormat.setOutputPath(job,new Path(args[1]));
       FileOutputFormat.setCompressOutput(job, true);//设置输出结果采用压缩 
       FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class); //设置输出结果采用lzo压缩

   ok,这样你就完成了支持新的hadoop api自定义的多文件输出mapreduce编写。

 

 

标签:输出,自定义,hadoop,mapreduce,job,key,org,apache,import
From: https://blog.51cto.com/u_16255870/7535851

相关文章

  • 软件测试|Python中如何控制输出小数点位数
    简介在数据处理、科学计算和金融分析等领域,经常需要对浮点数的输出进行格式化,以控制小数点后的位数。Python提供了多种方法来实现这个目标。在本文中,我们将深入探讨几种指定输出小数点位数的方法,帮助我们在不同场景下选择合适的方式。使用字符串格式化Python的字符串格式化功能非常......
  • el-table中自定义悬浮提示结构,添加复制功能
    效果展示:代码:代码copyText(text){navigator.clipboard.writeText(text).then(()=>{this.$message.success("文本复制成功");}).catch(()=>{this.$message.error("文本复制失败");......
  • 表格的自定义排序 编辑 拖拽 缩放
    终于能闲下来做点自己想做的事情了.. 简单表格排序  可以双击编辑自定义编辑后的规则 可拖动列进行列替换 可推动边框进行列宽度的缩放  ie6下中文不自动换行 非ie下字母和数字也不自动换行确实让人恼火 chrome浏览器下点击运行好像问题很大 拿到本地测试会比较好<!......
  • 第05章-自定义函数和JSON数据解析
    目录5.1实现自定义UDF25.2实现自定义UDTF35.3实现自定义UDAF45.4解析JSON数据65.4.1解析OBJECT数据65.4.2解析ARRAY数据75.4.3禁止使用get_json_object函数8第05章自定义函数和JSON数据解析自定义函数简介有一些sql很难处理的逻辑,我们可以使用自定义函数去处理。比......
  • uniapp项目实践总结(十八)自定义多列瀑布流组件
    导语:有时候展示图片等内容,会遇到图片高度不一致的情况,这时候就不能使用等高双列或多列展示了,这时候会用到瀑布流的页面布局,下面就一起探讨一下瀑布流的实现方法。目录准备工作原理分析实战演练案例展示准备工作在pages/index文件夹下面新建一个waterfall.vue的组件;按......
  • 在C#中如何自定义配置上周和本周起始日来查询业务数据?
    作者:西瓜程序猿主页传送门:https://blog.51cto.com/kimiliucn前言在做某个报表管理功能时,有一个需求:需要根据自定义配置的[周起始日]来统计上周、本周的订单数据。在C#中并没有封装的方法根据我们需要来直接获取上一周某天到某天、本周某天到某天,所以需要我们自己封装方法来实现(我们......
  • 在C#中如何自定义配置上周和本周起始日来查询业务数据?
    作者:西瓜程序猿主页传送门:https://www.cnblogs.com/kimiliucn前言在做某个报表管理功能时,有一个需求:需要根据自定义配置的[周起始日]来统计上周、本周的订单数据。在C#中并没有封装的方法根据我们需要来直接获取上一周某天到某天、本周某天到某天,所以需要我们自己封装方法......
  • 11-NO-GUI模式运行测试以及输出测试结果
    CommandLine运行Locust性能测试一、使用headless参数,直接运行测试实际压测场景,使用linux服务器作为压测机一般是没有可视化桌面的,因此需要用到--headless参数来来运行locust测试,如下locust-fyourlocustfile.py--headless在命令行中输入后回车,locust就会自动开始执行yourl......
  • 【JavaScript保姆级教程】输出函数和初识变量
    @TOC前言JavaScript是一种强大的脚本语言,广泛应用于网页开发和应用程序编写。本文将全面介绍JavaScript中输出内容的方法,包括使用document.write()函数、调试工具如console.log()和对话框函数如alert(),以及变量的声明和赋值。此外,我们还将探索输入提示框prompt()函数的使用方法。深......
  • 【JavaScript保姆级教程】输出函数和初识变量
    @TOC前言JavaScript是一种强大的脚本语言,广泛应用于网页开发和应用程序编写。本文将全面介绍JavaScript中输出内容的方法,包括使用document.write()函数、调试工具如console.log()和对话框函数如alert(),以及变量的声明和赋值。此外,我们还将探索输入提示框prompt()函数的使用方法。深......