1. Counter 计数器
1.1 概述
在执行 MapReduce 程序的时候,控制台输出信息中通常有下面所示片段内容:
File System Counters
FILE: Number of bytes read=136988
FILE: Number of bytes written=589973
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=3245
Map output records=3245
Map output bytes=81674
Map output materialized bytes=88170
Input split bytes=136
Combine input records=0
Spilled Records=3245
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=17
Total committed heap usage (bytes)=1029177344
File Input Format Counters
Bytes Read=136795
INFO - Counters: 30
File System Counters
FILE: Number of bytes read=450348
FILE: Number of bytes written=1269337
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=3245
Map output records=3245
Map output bytes=81674
Map output materialized bytes=88170
Input split bytes=136
Combine input records=0
Combine output records=0
Reduce input groups=55
Reduce shuffle bytes=88170
Reduce input records=3245
Reduce output records=55
Spilled Records=6490
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=17
Total committed heap usage (bytes)=2058354688
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=136795
File Output Format Counters
Bytes Written=1221
可以发现,输出信息中的核心词是 Counters,中文叫做“计数器”。在进行 MapReduce 运算过程中,许多时候,用户希望了解程序的运行情况。Hadoop 内置的计数器功能收集作业的主要统计信息,可以帮助用户理解程序的运行情况,辅助用户诊断故障。
这些记录了该程序运行过程的的一些信息的计数,如 Map input records=3245
,表示 Map 有 3245 条记录。可以看出来这些内置计数器可以被分为若干个组,即对于大多数的计数器来说,Hadoop 使用的组件分为若干类。
1.2 内置计数器
Hadoop 为每个 MapReduce 作业维护一些内置的计数器,这些计数器报告各种指标,例如和 MapReduce 程序执行中每个阶段输入输出的数据量相关的计数器,可以帮助用户进行判断程序逻辑是否生效、正确。
Hadoop 内置计数器根据功能进行分组。每个组包括若干个不同的计数器,分别是:MapReduce 任务计数器(Map-Reduce Framework)、文件系统计数器(File System Counters)、作业计数器(Job Counters)、输入文件任务计数器(File Input Format Counters)、输出文件计数器(File Output Format Counters)。
需要注意的是,内置的计数器都是 MapReduce 程序中全局的计数器,跟 MapReduce 分布式运算没有关系,不是所谓的每个局部的统计信息。
(1)Map-Reduce Framework Counters
计数器名称 | 说明 |
---|---|
MAP_INPUT_RECORDS | 所有 mapper 已处理的输入记录数 |
MAP_OUTPUT_RECORDS | 所有 mapper 产生的输出记录数 |
MAP_OUTPUT_BYTES | 所有 mapper 产生的未经压缩的输出数据的字节数 |
MAP_OUTPUT_MATERIALIZED_BYTES | 所有 mapper 输出后确实写到磁盘上字节数 |
COMBINE_INPUT_RECORDS | 所有 combiner(如果有)已处理的输入记录数 |
COMBINE_OUTPUT_RECORDS | 所有 combiner(如果有)已产生的输出记录数 |
REDUCE_INPUT_GROUPS | 所有 reducer 已处理分组的个数 |
REDUCE_INPUT_RECORDS | 所有 reducer 已经处理的输入记录的个数。每当某个 reducer 的迭代器读一个值时,该计数器的值增加。 |
REDUCE_OUTPUT_RECORDS | 所有 reducer 输出记录数 |
REDUCE_SHUFFLE_BYTES | shuffle 时复制到 reducer 的字节数 |
SPILLED_RECORDS | 所有 map 和 reduce 任务溢出到磁盘的记录数 |
CPU_MILLISECONDS | 一个任务的总 CPU 时间,以毫秒为单位,可由 /proc/cpuinfo 获取。 |
PHYSICAL_MEMORY_BYTES | 一个任务所用的物理内存,以字节数为单位,可由 /proc/meminfo 获取。 |
VIRTUAL_MEMORY_BYTES | 一个任务所用虚拟内存的字节数,由 /proc/meminfo 获取。 |
(2)File System Counters Counters
计数器名称 | 说明 |
---|---|
BYTES_READ | 程序从文件系统中读取的字节数 |
BYTES_WRITTEN | 程序往文件系统中写入的字节数 |
READ_OPS | 文件系统中进行的读操作的数量(例如 open 操作、filestatus 操作) |
LARGE_READ_OPS | 文件系统中进行的大规模读操作的数量 |
WRITE_OPS | 文件系统中进行的写操作的数量(例如 create 操作、append 操作) |
(3)Job Counters
计数器名称 | 说明 |
---|---|
Launched map tasks | 启动的 map 任务数,包括以“推测执行”方式启动的任务。 |
Launched reduce tasks | 启动的 reduce 任务数,包括以“推测执行”方式启动的任务。 |
Data-local map tasks | 与输人数据在同一节点上的 map 任务数 |
Total time spent by all maps in occupied slots (ms) | 所有 map 任务在占用的插槽中花费的总时间(毫秒) |
Total time spent by all reduces in occupied slots (ms) | 所有 reduce 任务在占用的插槽中花费的总时间(毫秒) |
Total time spent by all map tasks (ms) | 所有 MapTask 花费的时间 |
Total time spent by all reduce tasks (ms) | 所有 ReduceTask 花费的时间 |
(4)File Input/Output Format Counters
计数器名称 | 说明 |
---|---|
读取的字节数(BYTES_READ) | 由 map 任务通过 FilelnputFormat 读取的字节数 |
写的字节数(BYTES_WRITTEN) | 由 map 任务(针对仅含 map 的作业)或者 reduce 任务通过 FileOutputFormat 写的字节数 |
1.3 自定义计数器
虽然 Hadoop 内置的计数器比较全面,给作业运行过程的监控带了方便,但是对于一些业务中的特定要求(统计过程中对某种情况发生进行计数统计)MapReduce 还是提供了用户编写自定义计数器的方法。最重要的是,计数器是全局的统计,避免了用户自己维护全局变量的不利性。
自定义计数器的使用分为 2 步:
-
首先通过 context.getCounter() 方法获取一个全局计数器,创建的时候需要指定计数器所属的组名和计数器的名字;
/** * Get the Counter for the given groupName and counterName. * @param counterName counter name * @return the Counter for the given groupName and counterName */ public Counter getCounter(String groupName, String counterName);
-
然后在程序中需要使用计数器的地方,调用 counter 提供的方法即可,如 +1 操作。
2. DB 操作
2.1 概述
通常企业会使用关系型数据来存储业务相关的数据,但随着数据的规模越来越大,尤其是像 MySQL 这种,在单表超过 5 千万条记录时,尽管对表使用了特定的存储引擎和索引优化,但依然不可避免的存在性能下降问题。
此时,我们可以通过使用 MapReduce 从 MySQL 中定期迁移使用频率较低的历史数据到 HDFS 中,一方面可以降低对 MySQL 的存储和计算负载,另一方面,通过分布式计算引擎可以更加高效的处理过去的历史数据。
对于 MapReduce 框架来说,使用 Inputformat 进行数据读取操作,读取的数据首先由 Mapper 处理,然后交给 Reducer 处理,最终使用 OutputFormat 进行数据的输出操作。默认情况下,输入输出的组件实现都是针对文本数据处理的,分别是 TextInputFormat、TextOutputFormat。
为了方便 MapReduce 直接访问关系型数据库,Hadoop 提供了 DBInputFormat 和 DBOutputFormat 两个类。其中 DBInputFormat 负责从数据库中读取数据,而 DBOutputFormat 负责把数据最终写入数据库中。
- DBInputFormat 类用于从 SQL 表读取数据。DBInputFormat 底层一行一行读取表中的数据,返回键值对。其中 key 是 LongWritable 类型,表中数据的记录行号,从 0 开始;value 是 DBWritable 类型,表示该行数据对应的对象类型。
- DBOutputFormat 将 reduce 输出发送到 SQL 表。DBOutputFormat 接收键值对,其中 key 必须具有扩展 DBWritable 的类型。
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface DBWritable
/* Objects that are read from/written to a database should implement DBWritable. DBWritable, is similar to Writable except that the write(PreparedStatement) method takes a PreparedStatement, and readFields(ResultSet) takes a ResultSet. */
/* Implementations are responsible for writing the fields of the object to PreparedStatement, and reading the fields of the object from the ResultSet. */
// Example:
// If we have the following table in the database :
CREATE TABLE MyTable (
counter INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
);
// then we can read/write the tuples from/to the table with :
public class MyWritable implements Writable, DBWritable {
private int counter;
private long timestamp;
// Writable#write() implementation
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
// Writable#readFields() implementation
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}
public void write(PreparedStatement statement) throws SQLException {
statement.setInt(1, counter);
statement.setLong(2, timestamp);
}
public void readFields(ResultSet resultSet) throws SQLException {
counter = resultSet.getInt(1);
timestamp = resultSet.getLong(2);
}
}
2.2 读取 DB
在 mysql 创建表 itheima_goods 并加载数据到表中。要求使用 MapReduce 程序将表中的数据导出存放在指定的文件下。
因为涉及到 Java 操作 mysql,因此需要在 pom.xml 中额外添加 mysql-jdbc 驱动。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.32</version>
</dependency>
a. GoodsBean
定义 GoodsBean 的实体类,用于封装插入表中的数据(对象属性跟表的字段一一对应即可)。并且需要实现序列化机制 Writable。此外,从数据库读取/写入数据库的对象应实现 DBWritable。DBWritable 与 Writable 相似,区别在于 write(PreparedStatement)
方法采用 PreparedStatement,而 readFields(ResultSet)
采用 ResultSet。
public class GoodsBean implements Writable, DBWritable {
/**
* 商品ID
*/
private long goodsId;
/**
* 商品编号
*/
private String goodsSn;
/**
* 商品名称
*/
private String goodsName;
/**
* 市场价
*/
private double marketPrice;
/**
* 门店价
*/
private double shopPrice;
/**
* 总销售量
*/
private long saleNum;
// ... 省略 get/set ...
public void fillFields(long goodsId, String goodsSn, String goodsName, double marketPrice, double shopPrice, long saleNum) {
this.goodsId = goodsId;
this.goodsSn = goodsSn;
this.goodsName = goodsName;
this.marketPrice = marketPrice;
this.shopPrice = shopPrice;
this.saleNum = saleNum;
}
@Override
public String toString() {
return goodsId + "\t" + goodsSn + "\t" + goodsName + "\t" + marketPrice + "\t" + shopPrice + "\t" + saleNum;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(goodsId);
out.writeUTF(goodsSn);
out.writeUTF(goodsName);
out.writeDouble(marketPrice);
out.writeDouble(shopPrice);
out.writeLong(saleNum);
}
@Override
public void readFields(DataInput in) throws IOException {
goodsId = in.readLong();
goodsSn = in.readUTF();
goodsName = in.readUTF();
marketPrice = in.readDouble();
shopPrice = in.readDouble();
saleNum = in.readLong();
}
@Override
public void write(PreparedStatement statement) throws SQLException {
statement.setLong(1, goodsId);
statement.setString(2, goodsSn);
statement.setString(3, goodsName);
statement.setDouble(4, marketPrice);
statement.setDouble(5, shopPrice);
statement.setLong(6, saleNum);
}
@Override
public void readFields(ResultSet resultSet) throws SQLException {
goodsId = resultSet.getLong(1);
goodsSn = resultSet.getString(2);
goodsName = resultSet.getString(3);
marketPrice = resultSet.getDouble(4);
shopPrice = resultSet.getDouble(5);
saleNum = resultSet.getLong(5);
}
}
b. ReadDBMapper
public class ReadDBMapper extends Mapper<LongWritable, GoodsBean, LongWritable, Text> {
Text outValue = new Text();
@Override
protected void map(LongWritable key, GoodsBean value, Mapper<LongWritable, GoodsBean, LongWritable, Text>.Context context) throws IOException, InterruptedException {
outValue.set(value.toString());
context.write(key, outValue);
}
}
c. ReadDBDriver
public class ReadDBDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration config = new Configuration();
DBConfiguration.configureDB(config,
"com.mysql.cj.jdbc.Driver",
"jdbc:mysql://192.168.6.160:3306/stock_db",
"root",
"123456");
Job job = Job.getInstance(config, ReadDBDriver.class.getSimpleName());
job.setJarByClass(ReadDBDriver.class);
job.setMapperClass(ReadDBMapper.class);
// MR 程序可以没有 reduce 阶段
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path("/Users/xxx/Downloads/mysqlout"));
job.setInputFormatClass(DBInputFormat.class);
DBInputFormat.setInput(job, GoodsBean.class,
"SELECT goodsId,goodsSn,goodsName,marketPrice,shopPrice,saleNum FROM itheima_goods",
"SELECT count(goodsId) FROM itheima_goods");
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new ReadDBDriver(), args);
System.exit(status);
}
}
2.3 写入 DB
a. WriteDBMapper
public class WriteDBMapper extends Mapper<LongWritable, Text, NullWritable, GoodsBean> {
private NullWritable outK = NullWritable.get();
private GoodsBean outV = new GoodsBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, GoodsBean>.Context context) throws IOException, InterruptedException {
// 获取两个计数器,用于统计合法/非法数据条数
Counter sc = context.getCounter("mr2sql", "parseSuccessCounter");
Counter fc = context.getCounter("mr2sql", "parseFailedCounter");
String[] fields = value.toString().split("\\s+");
if (fields.length > 6) {
// 合法数据 —— 提取字段封装成对象
outV.fillFields(Long.parseLong(fields[1]), fields[2], fields[3], Double.parseDouble(fields[4]),
Double.parseDouble(fields[5]), Long.parseLong(fields[6]));
context.write(outK, outV);
sc.increment(1);
} else {
// 非法数据 —— 计入统计
fc.increment(1);
}
}
}
b. WriteDBReducer
在使用 DBOutputFormat 时,要求程序最终输出的 key 必须是继承自 DBWritable 的类型,value 则没有具体要求。
public class WriteDBReducer extends Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable> {
@Override
protected void reduce(NullWritable key, Iterable<GoodsBean> values, Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable>.Context context) throws IOException, InterruptedException {
for (GoodsBean value : values) {
context.write(value, key);
}
}
}
c. WriteDBDriver
public class WriteDBDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration config = new Configuration();
DBConfiguration.configureDB(config,
"com.mysql.cj.jdbc.Driver",
"jdbc:mysql://192.168.6.160:3306/stock_db?useUnicode=true&characterEncoding=utf-8",
"root",
"123456");
Job job = Job.getInstance(config, WriteDBDriver.class.getSimpleName());
job.setJarByClass(WriteDBDriver.class);
job.setMapperClass(WriteDBMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(GoodsBean.class);
job.setReducerClass(WriteDBReducer.class);
FileInputFormat.setInputPaths(job, new Path("/Users/xxx/Downloads/mysqlout"));
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, "itheima_goods_bak", "goodsId", "goodsSn", "goodsName", "marketPrice", "shopPrice", "saleNum");
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new WriteDBDriver(), args);
System.exit(status);
}
}
3. Join 关联
3.1 概述
在实际的数据库应用中,我们经常需要从多个数据表中读取数据,这时我们就可以使用 SQL 语句中的连接(JOIN),在两个或多个数据表中查询数据。
在使用 MapReduce 框架进行数据处理的过程中,也会涉及到从多个数据集读取数据,进行 join 关联的操作,只不过此时需要使用 Java 代码并根据 MapReduce 的编程规范进行业务的实现。
但是由于 MapReduce 的分布式设计理念的特殊性,因此对于 MapReduce 实现 join 操作具备了一定的特殊性。特殊主要体现在:究竟在 MapReduce 中的什么阶段进行数据集的关联操作,map 阶段还是 reduce 阶段?之间的区别又是什么?
整个 MapReduce 的 join 分为两类:Map-side Join、Reduce-side Join。
3.2 Reduce-side Join
a. 说明
顾名思义,在 reduce 阶段执行 join 关联操作。这也是最容易想到和实现的 join 方式。因为通过 shuffle 过程就可以将相关的数据分到相同的分组中,这将为后面的 join 操作提供了便捷。
基本上,Reduce Side Join 大致步骤如下:
- Mapper 分别读取不同的数据集;
- Mapper 的输出中,通常以 join 的字段作为输出的 key;
- 不同数据集的数据经过 shuffle,key 一样的会被分到同一分组处理;
- 在 reducer 中根据业务需求把数据进行关联整合汇总,最终输出。
【弊端】reduce 端 join 最大的问题是整个 join 的工作是在 reduce 阶段完成的,但是通常情况下 MapReduce 中 reduce 的并行度是极小的(默认是 1 个),这就使得所有的数据都挤压到 reduce 阶段处理,压力颇大。虽然可以设置 reduce 的并行度,但是又会导致最终结果被分散到多个不同文件中。并且在数据从 Mapper 到 Reducer 的过程中,shuffle 阶段十分繁琐,数据集大时成本极高。
b. 案例
有两份结构化的文件:itheima_goods(商品信息)、itheima_order_goods(订单信息)。要求使用 MapReduce 统计出每笔订单中对应的具体的商品名称信息。
# itheima_goods
goodsId(商品id) | goodsSn(商品编号) | goodsName(商品名称)
# itheima_order_goods
orderId(订单ID) | goodsId(商品ID) | payPrice(实际支付价格)
思路分析:
- 使用 Mapper 处理订单数据和商品数据,输出的时候以 goodsId 作为 key。相同 goodsId 的商品和订单项会到同一个 reducer 的同一个分组,在分组中进行订单和商品信息的关联合并。
- 在 MapReduce 程序中可以通过 context 获取到当前处理的切片所属的文件名称。根据文件名来判断当前处理的是订单数据还是商品数据,以此来进行不同逻辑的输出。
- join 处理完之后,最后可以再通过 MapReduce 程序排序功能,将属于同一笔订单的所有商品信息汇聚在一起。
ReduceJoinMapper
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
public static final String ORDER_PREFIX = "order#";
public static final String GOODS_PREFIX = "goods#";
Text outK = new Text();
Text outV = new Text();
String fileName = null;
StringBuilder builder = new StringBuilder(128);
@Override
protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 获取当前处理的切片所属文件名称
FileSplit split = (FileSplit) context.getInputSplit();
fileName = split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 清空 builder
builder.setLength(0);
// 切割处理读取的一行数据
String[] split = value.toString().split("\\|");
// 判断当前处理的文件
if (fileName.contains("order")) {
outK.set(split[1]);
builder.append(split[0]).append("\t").append(split[2]);
outV.set(builder.insert(0, ORDER_PREFIX).toString());
} else {
outK.set(split[0]);
builder.append(split[1]).append("\t").append(split[2]);
outV.set(builder.insert(0, GOODS_PREFIX).toString());
}
// outK:goodsId
context.write(outK, outV);
}
}
ReduceJoinReducer
public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {
private List<String> goodsList = new ArrayList<>();
private List<String> orderList = new ArrayList<>();
Text outV = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 按 goodsId 分组,一次 reduce 一个商品所关联的所有订单
for (Text value : values) {
String valueStr = value.toString();
if (valueStr.startsWith(ReduceJoinMapper.GOODS_PREFIX)) {
goodsList.add(valueStr.split("#")[1]);
} else {
orderList.add(valueStr.split("#")[1]);
}
}
int orderSize = orderList.size();
int goodsSize = goodsList.size();
for (int i = 0; i < orderSize; i++) {
for (int j = 0; j < goodsSize; j++) {
outV.set(orderList.get(i) + "\t" + goodsList.get(j));
context.write(key, outV);
}
}
// 复用
orderList.clear();
goodsList.clear();
}
}
ReduceJoinDriver
public class ReduceJoinDriver {
public static void main(String[] args) throws Exception {
// 1. 获取Job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 关联本Driver类
job.setJarByClass(ReduceJoinDriver.class);
// 3. 关联Mapper和Reducer
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(ReduceJoinReducer.class);
// 4. 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5. 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 6. 设置程序的输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7. 提交Job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
ReducerJoinSortDemo:对上面 Job 的输出再进行排序
public class ReducerJoinSortDemo {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 获取Job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 关联本Driver类
job.setJarByClass(ReducerJoinSortDemo.class);
// 3. 关联Mapper和Reducer
job.setMapperClass(ReduceJoinSortMapper.class);
job.setReducerClass(ReduceJoinSortReducer.class);
// 4. 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5. 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 6. 设置程序的输入输出路径(输入路径是上一个MR的输出)
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7. 提交Job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
public static class ReduceJoinSortMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text outK = new Text();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,
Text, Text>.Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
outK.set(fields[1]);
outV.set(fields[1] + "\t" + fields[0] + "\t" + fields[3] + "\t" + fields[4] + "\t" + fields[2]);
context.write(outK, outV);
}
}
public static class ReduceJoinSortReducer extends Reducer<Text, Text, Text, NullWritable> {
private NullWritable outV = NullWritable.get();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text,
Text, NullWritable>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, outV);
}
}
}
}
3.3 分布式缓存
DistributedCache 是 Hadoop 框架提供的一种机制,可以将 Job 指定的文件,在 Job 执行前,先行分发到 Task 执行的机器上,并有相关机制对 cache 文件进行管理(所以必须在 YARN 模式下使用)。
DistributedCache 能够缓存应用程序所需的文件 (包括文本、档案文件、jar 文件等)。
MapReduce 框架在作业所有 Task 执行之前会把指定的分布式缓存文件拷贝到各个 Task 运行的节点上。
使用步骤:
-
添加缓存文件,可以使用 MapReduce 的 API 添加需要缓存的文件/目录(可以在路径末尾阶段追加
# + '别名'
,在 map 阶段可以使用该别名)。/* 缓存压缩包文件到 task 运行节点的工作目录 */ job.addCacheArchive(URI uri); /* 缓存普通文件到 task 运行节点的工作目录 */ job.addCacheFile(URI uri); //////////////////////////////////////////////// String cache = "hdfs://x.x.x.x:8020/cache/file"; cache += "#myfile"; job.addCacheFile(new Path(cache).toUri(), conf);
-
MapReduce 程序中读取缓存文件:在 Mapper 或 Reducer 的 setup() 中,用 BufferedReader 获取分布式缓存中的文件内容。BufferedReader 是带缓冲区的字符流,能够减少访问磁盘的次数,提高文件读取性能;并且可以一次性读取一行字符。
protected void setup(Context context) throw IOException,InterruptedException { // 读取缓存文件中的内容(也可直接根据别名读取) BufferReader br = new BufferedReader(new InputStreamReader(new FileInputStream("file"), "utf-8")); // ... }
3.4 Map-side Join
a. 说明
Map-side Join 就是在 map 阶段执行 join 关联操作,并且程序也没有了 reduce 阶段,避免了 shuffle 时候的繁琐。实现的关键是使用 MapReduce 的分布式缓存。
尤其是涉及到一大一小数据集的处理场景时,map 端的 join 将会发挥出得天独厚的优势。
Map-side Join 的大致思路如下:
- 首先分析 join 处理的数据集,使用分布式缓存技术将小的数据集进行分布式缓存;
- MapReduce 框架在执行的时候会自动将缓存的数据分发到各个 MapTask 运行的机器上;
- 程序只运行 Mapper,在 Mapper 初始化的时候从分布式缓存中读取小数据集数据,然后和自己读取的大数据集进行 join 关联,输出最终的结果;
- 整个 join 的过程没有 shuffle,没有 Reducer。
【优势】减少 shuffle 时候的数据传输成本,并且 Mapper 的并行度可以根据输入数据量自动调整,充分发挥分布式计算的优势。
b. 案例
需求同上。
思路分析:
- Map-side Join 是指在 Mapper 任务中加载特定数据集,此案例中把商品数据进行分布式缓存,使用 Mapper 读取订单数据和缓存的商品数据进行连接。
- 通常为了方便使用,会在 Mapper 的初始化方法 setup() 中读取分布式缓存文件加载的程序的内存中,便于后续 Mapper 处理数据。
- 因为在 Mapper 阶段已经完成了数据的关联操作,因此程序不需要进行 reduce。需要在 Job 中将 ReduceTask 的个数设置为 0,也就是 Mapper 的输出就是程序最终的输出。
MapJoinMapper
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> goodsMap = new HashMap<>();
Text outK = new Text();
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 读取分布式缓存文件(关于路径,直接指定文件名称即可)
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("itheima_goods.txt"), "utf-8"));
String line = null;
while ((line = br.readLine()) != null) {
String[] fields = line.split("\\|");
// 把读取的缓存内容添加到map中
goodsMap.put(fields[0], fields[1] + "\t" + fields[2]);
}
IOUtils.closeStream(br);
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\\|");
String goodsInfo = goodsMap.get(fields[1]);
outK.set(value + "\t" + goodsInfo);
context.write(outK, NullWritable.get());
}
}
MapJoinDriver
public class MapJoinDriver {
public static void main(String[] args) throws Exception {
// 1. 获取Job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 关联本Driver类
job.setJarByClass(MapJoinDriver.class);
// 3. 关联Mapper和Reducer
job.setMapperClass(MapJoinMapper.class);
job.setNumReduceTasks(0);
// 4. 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 6. 添加分布式缓存文件
job.addCacheFile(new URI("/data/join/cache/itheima_goods.txt"));
// 7. 设置输入/输出目录
FileInputFormat.setInputPaths(job, new Path("/data/join/input"));
FileOutputFormat.setOutputPath(job, new Path("/data/join/output"));
// 8. 提交Job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
分布式缓存的使用必须使用 MapReduce 的 YARN 模式运行!
将 jar 包上传集群执行命令:hadoop jar xxx.jar
,注意保证 YARN 集群提前启动成功~
4. MR 工作流
使用 Hadoop 里面的 MapReduce 来处理海量数据是非常简单方便的,但有时候我们的应用程序,往往需要多个 MR 作业来计算结果。
比如说一个最简单的使用 MR 提取海量搜索日志的 TopN 的问题,注意,这里面其实涉及了两个 MR 作业,第一个是词频统计,第两个是排序求 TopN,这显然是需要两个 MapReduce 作业来完成的。其他的比如一些数据挖掘类的作业,常常需要迭代组合好几个作业才能完成,这类作业类似于 DAG 类的任务,各个作业之间是具有先后,或相互依赖的关系,比如说,这一个作业的输入,依赖上一个作业的输出等等。
具有依赖式的作业提交后,Hadoop 会根据依赖的关系,先后执行的 Job 任务,每个任务的运行都是独立的。
在 Hadoop 里实际上提供了 JobControl 类来组合一个具有依赖关系的作业,在新版的 API 里,又新增了 ControlledJob 类,细化了任务的分配,通过这两个类,我们就可以轻松的完成类似 DAG 作业的模式,这样我们就可以通过一个提交来完成原来需要提交 2 次的任务,大大简化了任务的繁琐度。
- JobControl
- 工作流 Job 控制器,一次可以提交、管理多个 Job;
- JobControl 类实现了线程 Runnable 接口,需要实例化一个线程来让它启动。
- ControlledJob
- 可以将普通作业包装成受控作业。并且支持设置依赖关系;
- Hadoop 会根据依赖的关系,先后执行 Job 任务,每个任务的运行都是独立的。
【案例】针对 MR Reduce-side Join 方式处理订单和商品数据之间的关联,需要进行两步程序处理,首先把两个数据集进行 join 操作,然后针对 join 的结果进行排序,保证同一笔订单的商品数据聚集在一起。
两个程序带有依赖关系,可以使用工作流进行任务的设定,依赖的绑定,一起提交执行。
public class MRJobFlow {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
// 1. 将普通作业包装成受控作业
ControlledJob controlledJob1 = new ControlledJob(conf);
controlledJob1.setJob(getJob1(conf));
ControlledJob controlledJob2 = new ControlledJob(conf);
controlledJob2.setJob(getJob2(conf));
// 2. 设置作业间的依赖关系
controlledJob2.addDependingJob(controlledJob1);
// 3. 创建主控制器,控制job1、job2一起提交
JobControl mainCtrl = new JobControl("myctrl");
mainCtrl.addJob(controlledJob1);
mainCtrl.addJob(controlledJob2);
// 4. 另起线程执行 JobControl
Thread t = new Thread(mainCtrl);
t.start();
// 5. 监测工作流完成情况
while (true) {
if (mainCtrl.allFinished()) {
System.out.println("SUCCESS=>" + mainCtrl.getSuccessfulJobList());
mainCtrl.stop();
break;
}
}
}
private static Job getJob1(Configuration conf) throws IOException {
// 1. 创建Job对象
Job job = Job.getInstance(conf);
// 2. 关联本Driver类
job.setJarByClass(ReduceJoinDriver.class);
// 3. 关联Mapper和Reducer
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(ReduceJoinReducer.class);
// 4. 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5. 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 6. 设置程序的输入输出路径
FileInputFormat.setInputPaths(job, new Path("/.../reduce-join/input"));
FileOutputFormat.setOutputPath(job, new Path("/.../reduce-join/output"));
return job;
}
private static Job getJob2(Configuration conf) throws IOException {
// 1. 创建Job对象
Job job = Job.getInstance(conf);
// 2. 关联本Driver类
job.setJarByClass(ReducerJoinSort.class);
// 3. 关联Mapper和Reducer
job.setMapperClass(ReducerJoinSort.ReduceJoinSortMapper.class);
job.setReducerClass(ReducerJoinSort.ReduceJoinSortReducer.class);
// 4. 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5. 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 6. 设置程序的输入输出路径
FileInputFormat.setInputPaths(job, new Path("/.../reduce-join/output"));
FileOutputFormat.setOutputPath(job, new Path("/.../reduce-join/sort"));
return job;
}
}
标签:11,Text,MapReduce,public,job,Job,new,class
From: https://www.cnblogs.com/liujiaqi1101/p/17523541.html