首页 > 其他分享 >mapreduce的多种格式文件输出-自定义OutputFormat

mapreduce的多种格式文件输出-自定义OutputFormat

时间:2024-05-31 18:00:29浏览次数:23  
标签:OutputFormat return String 自定义 格式文件 job context throws out

/**
 * @description: mapreduce多种格式的文件输出方式
 */
public class MultipleTypeOutputFormat<K, V> extends FileOutputFormat<K, V> {
    private static final String ORCEXTENSION = ".orc";
    private static final String CSVEXTENSION = ".csv";
    public static final String SKIP_TEMP_DIRECTORY = "orc.mapreduce.output.skip-temporary-directory";

    public MultipleTypeOutputFormat() {
    }

    /**
     * 具体数据写出对象
     *
     * @param job the information about the current task.
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // 根据需要,你可以在这里添加逻辑以决定使用SequenceFileOutputFormat还是TextOutputFormat
        //根据基准路径和输出文件截出标识
        String outputNameStr = job.getConfiguration().get(BASE_OUTPUT_NAME);
        String dirOutputStr = job.getConfiguration().get(OUTDIR);
        if (outputNameStr.contains(":")) {
            outputNameStr = outputNameStr.split(":")[1];
        }
        if (dirOutputStr.contains(":")) {
            dirOutputStr = dirOutputStr.split(":")[1];
        }
        //输出格式标识
        String flag = "";
        if (outputNameStr.startsWith(dirOutputStr)) {
            String pathStr = outputNameStr.substring(dirOutputStr.length() + 1, outputNameStr.length());
            if (pathStr.contains("/")) {
                flag = pathStr.split("/")[0];
            } else if (pathStr.contains(File.separator)) {
                flag = pathStr.split(File.separator)[0];
            }
        }
        //从这个方法里面可以获取一个configuration
        Configuration configuration = job.getConfiguration();
        //根据标识输出相应的数据
        switch (flag) {
            case "cleandata"://清洗明细结果ORC格式
            case "basetotaldata"://清洗基准值ORC格式
            case "calcdata"://清洗计算出异常记录结果ORC格式
                //文件的输出路径
                Path file = this.getDefaultWorkFile(job, ORCEXTENSION);
                TypeDescription schema = TypeDescription.fromString("struct<did:string,dno:bigint,dtm:bigint,kind:int,typ:bigint,val:string>");
                OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(new Configuration());
                //该类型新版本新增的
                CompressionKind zlib = CompressionKind.ZSTD;
                Writer writer = OrcFile.createWriter(file, writerOptions.setSchema(schema).compress(zlib));
                OrcMapreduceRecordWriter orcMapreduceRecordWriter = new OrcMapreduceRecordWriter(writer);
                return orcMapreduceRecordWriter;
            case "infodata"://清洗场景识别明细数据ORC格式
                //文件的输出路径
                file = this.getDefaultWorkFile(job, ORCEXTENSION);
                schema = TypeDescription.fromString("struct<did:string,dno:string,dtm:bigint,kind:int,typ:bigint,val:string>");
                writerOptions = OrcFile.writerOptions(new Configuration());
                zlib = CompressionKind.ZSTD;
                writer = OrcFile.createWriter(file, writerOptions.setSchema(schema).compress(zlib));
                orcMapreduceRecordWriter = new OrcMapreduceRecordWriter(writer);
                return orcMapreduceRecordWriter;
            case "cleancsvdata"://清洗结果CSV格式
                file = this.getDefaultWorkFile(job, CSVEXTENSION);
                Configuration conf = job.getConfiguration();
                String keyValueSeparator = conf.get(TextOutputFormat.SEPERATOR, "\t");
                FileSystem fs = file.getFileSystem(conf);
                FSDataOutputStream fileOut = fs.create(file, false);
                return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        }

        return null;
    }

    /**
     * 输出job的工作路径
     *
     * @param context   the task context
     * @param extension an extension to add to the filename
     * @return
     * @throws IOException
     */
    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        if (context.getConfiguration().getBoolean(SKIP_TEMP_DIRECTORY, false)) {
            return new Path(getOutputPath(context), getUniqueFile(context, getOutputName(context), extension));
        } else {
            //自定义 map 输出和 reduce 输出
            String fileNameprefix = context.getConfiguration().get("fileNameprefix");
            if (StringUtils.isNotBlank(fileNameprefix)) {
                String outputPath = context.getConfiguration().get("outputPath");
                String fileName = getMUniqueFile(context, fileNameprefix, extension);
                return new Path(outputPath, fileName);
            } else {
                //默认方式
                FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
                return new Path(committer.getWorkPath(), getUniqueFile(context, getOutputName(context), extension));
            }
        }
    }

    /**
     * 自定义 模拟源码对文件名进行编写
     *
     * @param context
     * @param name
     * @param extension
     * @return
     */
    public synchronized static String getMUniqueFile(TaskAttemptContext context, String name, String extension) {
        TaskID taskId = context.getTaskAttemptID().getTaskID();
        int partition = taskId.getId();
        StringBuilder result = new StringBuilder();
        result.append(name);
        result.append('-');
        result.append(NumberFormat.getInstance().format(partition));
        result.append(extension);
        return result.toString();
    }

    /**
     * 输出job的提交对象
     *
     * @param context the task context
     * @return
     * @throws IOException
     */
    @Override
    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {

        return super.getOutputCommitter(context);
    }

    protected static class LineRecordWriter<K, V>
            extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;

        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        /**
         * 按行输出
         * @param out
         * @param keyValueSeparator
         */
        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        /**
         * Write the object to the byte stream, handling Text as a special
         * case.
         *
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value)
                throws IOException {

            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }
}

注意,不同类型的标识取的的比较low,很难通用,下次注意,另外在reduce的输出是的<K,V>指定为

 //指定reduce输出
job.setOutputKeyClass(NullWritable.class);//red输出的key
job.setOutputValueClass(Writable.class);//red输出的value

 

标签:OutputFormat,return,String,自定义,格式文件,job,context,throws,out
From: https://www.cnblogs.com/zyanrong/p/18225044

相关文章

  • 微信小程序下载预览PDF(可自定义文件名称)
    wx.showLoading({title:'加载中',mask:true,})constfileName='测试.pdf'constnewPath=`${wx.env.USER_DATA_PATH}/${fileName}.pdf`;wx.downloadFile({url:�......
  • Nginx企业级负载均衡:技术详解系列(15)—— 一篇文章教你如何自定义错误日志
    你好,我是赵兴晨,97年文科程序员。在今天的文章中,我将带你深入了解Nginx的一个强大功能——自定义错误日志。无论是对于运维人员还是开发者,掌握这一技能都是提升工作效率、优化系统监控的关键。主要是能装13。图片自定义错误日志在Nginx中,自定义错误日志的设置可以让你更......
  • Echarts 实现自定义曲线的弧度
    文章目录问题分析问题分析在ECharts中,可以通过控制数据点的位置来调整曲线的弧度。具体来说,可以通过设置数据项的控制点来调整曲线的形状。ECharts中的折线图和曲线图都是通过控制点来绘制曲线的,可以通过设置数据项的控制点来调整曲线的弧度。以下是一......
  • springboot基本使用十一(自定义全局异常处理器)
    例如:我们都知道在java中被除数不能为0,为0就会报byzero错误@RestControllerpublicclassTestController{@GetMapping("/ex")publicIntegerex(){inta=10/0;returna;}}打印结果:如何将这个异常进行处理?创建全局异常处理类......
  • 云CAD(在线编辑DWG的API)实现自定义实体的详细方法
    前言自定义实体在CAD二次开发中使用的频率较高,本章节主要阐述网页CAD中使用自定义实体的方法,mxcad 可以根据用户的具体需求来创建和管理自定义实体,可以通过从自定义实体类McDbCustomEntity() 中继承实体的名称、属性、方法,也可结合自身需求对自定义实体类中的属性或方法进行重......
  • 【源码】Spring Data JPA原理解析之Repository自定义方法命名规则执行原理(二)
     SpringDataJPA系列1、SpringBoot集成JPA及基本使用2、SpringDataJPACriteria查询、部分字段查询3、SpringDataJPA数据批量插入、批量更新真的用对了吗4、SpringDataJPA的一对一、LazyInitializationException异常、一对多、多对多操作5、SpringDataJPA自定义......
  • JavaDS-学习数据结构之如果从零开始手搓顺序表,顺带学习自定义异常怎么用!
    前言笔者开始学习数据结构了,虽然笔者已经会用了,不管是C++中的stl亦或是Java中的集合,为了算法比赛多少都突击过,但只知其然而不知其所以然,还是会限制发展的,因此,笔者写下这篇博客.内容是手搓一个顺序表.顺带加一点异常的使用,大伙看个乐子就好了.有错误直接私信喷我就......
  • QT事件触发顺序探讨:处理自定义事件与系统事件的冲突
    1.课题背景在项目开发过程中用到了纯按键的QT交互,我们通过自定义以下全局键盘事件类进行交互的实现:classKEYPRESSFILTER_EXPORTKeyPressFilter:publicQObject{Q_OBJECTpublic:staticKeyPressFilter*instance(){if(m_instance==nullptr){......
  • C#自定义事件的写法
    C#事件基于委托例1:只用于学习,理解事件底层原理,不推荐这么写;例2:系统用的就是该方式,例如按钮的Click事件;例3:最简略的写法,但是需要客户代码转换EventArgs;1、事件声明完整格式范例: 1//自定义事件参数,默认以EventArgs结尾,需要继承EventArgs类2publicclassMyEventArgs:......
  • 【源码】Spring Data JPA原理解析之Repository自定义方法命名规则执行原理(一)
     SpringDataJPA系列1、SpringBoot集成JPA及基本使用2、SpringDataJPACriteria查询、部分字段查询3、SpringDataJPA数据批量插入、批量更新真的用对了吗4、SpringDataJPA的一对一、LazyInitializationException异常、一对多、多对多操作5、SpringDataJPA自定......