
MapReduce in Practice: Compression and Decompression Examples



1 Compressing and Decompressing Data Streams

CompressionCodec provides two methods that make it easy to compress or decompress data. To compress data being written to an output stream, call createOutputStream(OutputStream out) to obtain a CompressionOutputStream, which writes the data to the underlying stream in compressed form. Conversely, to decompress data coming from an input stream, call createInputStream(InputStream in) to obtain a CompressionInputStream, which reads uncompressed data from the underlying stream.
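As a minimal sketch of how the two calls pair up, here is a hypothetical RoundTrip class that compresses and restores an in-memory buffer with GzipCodec (the class name and buffer setup are illustrative; the full, reflective file-based version follows below):

package com.atguigu.mapreduce.compress;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class RoundTrip {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // ReflectionUtils also injects the Configuration into the codec
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);

        // Compress: the CompressionOutputStream writes gzip bytes to the wrapped stream
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        CompressionOutputStream cos = codec.createOutputStream(compressed);
        cos.write("hello codec".getBytes("UTF-8"));
        cos.close();

        // Decompress: the CompressionInputStream yields the original bytes
        CompressionInputStream cis =
                codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()));
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        IOUtils.copyBytes(cis, restored, 4096, true); // true: close both streams

        System.out.println(restored.toString("UTF-8")); // prints: hello codec
    }
}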

Let's test the following compression codecs:

DEFLATE  org.apache.hadoop.io.compress.DefaultCodec
gzip     org.apache.hadoop.io.compress.GzipCodec
bzip2    org.apache.hadoop.io.compress.BZip2Codec

package com.atguigu.mapreduce.compress;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

    public static void main(String[] args) throws Exception {
        compress("e:/hello.txt", "org.apache.hadoop.io.compress.BZip2Codec");
        // decompress("e:/hello.txt.bz2");
    }

    // Compress a file with the codec named by `method`
    private static void compress(String filename, String method) throws Exception {

        // 1 Open the input stream
        FileInputStream fis = new FileInputStream(new File(filename));

        // Instantiate the codec reflectively from its class name
        Class<?> codecClass = Class.forName(method);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());

        // 2 Open the output stream, appending the codec's default extension
        FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
        CompressionOutputStream cos = codec.createOutputStream(fos);

        // 3 Copy the stream
        IOUtils.copyBytes(fis, cos, 1024 * 1024 * 5, false);

        // 4 Close resources
        fis.close();
        cos.close();
        fos.close();
    }

    // Decompress a file whose codec is inferred from its extension
    private static void decompress(String filename) throws FileNotFoundException, IOException {

        // 0 Check whether this file can be decompressed
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodec(new Path(filename));

        if (codec == null) {
            System.out.println("cannot find codec for file " + filename);
            return;
        }

        // 1 Open the input stream
        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));

        // 2 Open the output stream
        FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));

        // 3 Copy the stream
        IOUtils.copyBytes(cis, fos, 1024 * 1024 * 5, false);

        // 4 Close resources
        cis.close();
        fos.close();
    }
}
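Running compress(...) as shown produces e:/hello.txt.bz2 next to the input file; the suffix comes from getDefaultExtension(), which returns .deflate, .gz and .bz2 for DefaultCodec, GzipCodec and BZip2Codec respectively. Uncommenting decompress(...) then writes e:/hello.txt.bz2.decoded. Note that decompress() relies on CompressionCodecFactory recognizing the file extension, so it only handles files whose suffix maps to a registered codec.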

2 Compressing the Map Output

Even when a MapReduce job's input and output files are uncompressed, you can still compress the intermediate output of the map tasks. That output is written to disk and shipped over the network to the reduce nodes, so compressing it can yield a substantial performance gain. Enabling it takes just two configuration properties; the driver below shows how to set them.

1) Compression formats supported out of the box by the Hadoop source include BZip2Codec and DefaultCodec.

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();

        // Enable compression of the map output
        configuration.setBoolean("mapreduce.map.output.compress", true);
        // Set the codec used for the map output
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        // Exit with 0 on success, 1 on failure
        System.exit(result ? 0 : 1);
    }
}
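One detail worth noting: the two configuration.set* calls must come before Job.getInstance(configuration), because the Job takes its own copy of the Configuration at that point; properties set afterwards are not seen by the job. If the driver were wired through ToolRunner, the same two properties could instead be supplied at submission time, e.g. -Dmapreduce.map.output.compress=true, rather than hard-coded.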

2) The Mapper is unchanged.

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();
        // 2 Split it into words
        String[] words = line.split(" ");
        // 3 Emit (word, 1) for each word
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
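As a side note, a common refinement (a hypothetical variant, not part of the original example) is to reuse one Text and one IntWritable instead of allocating new objects per record; this is safe because context.write() serializes the key and value immediately:

    // Reusable Writable instances, set per record
    private final Text k = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            k.set(word);
            context.write(k, one); // write() serializes right away
        }
    }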

3) The Reducer is unchanged.

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        int count = 0;
        // 1 Sum the counts
        for (IntWritable value : values) {
            count += value.get();
        }

        // 2 Emit the total
        context.write(key, new IntWritable(count));
    }
}
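Since the summation is associative and commutative, the same class could also be registered as a combiner (job.setCombinerClass(WordCountReducer.class) in the driver) to pre-aggregate map output before it is compressed and shuffled, shrinking the intermediate data even further.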

3 Compressing the Reduce Output

This builds on the WordCount example.

1) Modify the driver.

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Enable compression of the job (reduce) output
        FileOutputFormat.setCompressOutput(job, true);

        // Choose the compression codec
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

        boolean result = job.waitForCompletion(true);

        // Exit with 0 on success, 1 on failure
        System.exit(result ? 0 : 1);
    }
}
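With BZip2Codec enabled, the job's result files carry the codec's extension (e.g. part-r-00000.bz2). They can still be inspected with hadoop fs -text, which, like the decompress example above, picks the codec based on the file extension.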

