打包Mapreduce代码以及自定义类型
打包wordCount类
使用 Maven 的 assembly 插件打包（执行 `assembly:assembly` 目标；新版本的 maven-assembly-plugin 推荐使用 `assembly:single`）
执行后会在 target 目录下生成如下打包文件，选择其中的 mapreduce_test-1.0-SNAPSHOT-jar-with-dependencies.jar（这是包含所有依赖的 jar 包），并将其上传到虚拟机
在启动 MapReduce 之前，先在 yarn-site.xml 中配置内存参数，防止任务因内存不足而失败
启动mapreduce
hadoop jar /opt/jar/mapreduce_test-1.0-SNAPSHOT-jar-with-dependencies.jar mapreduce.wordcount.WordCountDriver "/hdfs_api" "/output"
使用自定义的类型进行mapreduce计算
如果要对如下数据计算每个人的购物总金额，则应当自定义一个实现了 Writable 接口的类
1,小明,男,iphone14,5999,1
2,小华,男,飞天茅台,2338,2
3,小红,女,兰蔻小黑瓶精华,1080,1
4,小魏,未知,米家走步机,1499,1
5,小华,男,长城红酒,158,10
6,小红,女,珀莱雅面膜,79,2
7,小华,男,珠江啤酒,11,3
8,小明,男,Apple Watch 8,2999,1
UserSale.java
package mapreduce.model;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * One purchase record, used as a MapReduce value type and serialized with Hadoop's
 * {@link Writable} protocol.
 *
 * <p>Rules for a custom Writable bean:
 * <ol>
 *   <li>implement the {@link Writable} interface;</li>
 *   <li>provide a no-argument constructor;</li>
 *   <li>override {@link #write(DataOutput)} to serialize the fields;</li>
 *   <li>override {@link #readFields(DataInput)} to deserialize them, reading the fields in exactly
 *       the same order that {@code write} wrote them;</li>
 *   <li>override {@link #toString()} to control how the value is rendered as text.</li>
 * </ol>
 */
public class UserSale implements Writable {
    // sale id
    private int saleId;
    // buyer name
    private String userName;
    // buyer gender
    private String sex;
    // product name
    private String goodsName;
    // unit price
    private int price;
    // quantity purchased
    private int saleCount;
    // total amount; see setTotalPrice() / setTotalPrice(int)
    private int totalPrice;

    /** No-argument constructor, required for a Hadoop Writable bean. */
    public UserSale() {
    }

    /**
     * Returns the total price preceded by a single space. Only {@code totalPrice} is rendered;
     * the other fields are intentionally omitted.
     */
    @Override
    public String toString() {
        return " " + totalPrice;
    }

    /**
     * Serializes all fields. The order here must match {@link #readFields(DataInput)}.
     * String fields that are {@code null} are written as empty strings, because
     * {@link DataOutput#writeUTF(String)} throws on {@code null}; they deserialize as "".
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(saleId);
        dataOutput.writeUTF(nullToEmpty(userName));
        dataOutput.writeUTF(nullToEmpty(sex));
        dataOutput.writeUTF(nullToEmpty(goodsName));
        dataOutput.writeInt(price);
        dataOutput.writeInt(saleCount);
        dataOutput.writeInt(totalPrice);
    }

    /** Deserializes all fields in exactly the order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.saleId = dataInput.readInt();
        this.userName = dataInput.readUTF();
        this.sex = dataInput.readUTF();
        this.goodsName = dataInput.readUTF();
        this.price = dataInput.readInt();
        this.saleCount = dataInput.readInt();
        this.totalPrice = dataInput.readInt();
    }

    /** Maps {@code null} to "" so the value can be passed to {@code writeUTF}. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }

    public int getSaleId() {
        return saleId;
    }

    public void setSaleId(int saleId) {
        this.saleId = saleId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public int getSaleCount() {
        return saleCount;
    }

    public void setSaleCount(int saleCount) {
        this.saleCount = saleCount;
    }

    public int getTotalPrice() {
        return totalPrice;
    }

    /** Sets the total amount directly, e.g. to an aggregated sum. */
    public void setTotalPrice(int totalPrice) {
        this.totalPrice = totalPrice;
    }

    /**
     * Computes {@code totalPrice = price * saleCount} from the current fields.
     *
     * @throws ArithmeticException if the product overflows an {@code int}; failing loudly is
     *     preferable to silently emitting a wrapped, wrong total
     */
    public void setTotalPrice() {
        this.totalPrice = Math.multiplyExact(this.price, this.saleCount);
    }
}
UserSaleMapper.java
package mapreduce.UserSale;
import mapreduce.model.UserSale;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Parses one CSV sales line of the form {@code id,name,sex,goods,price,count} into a
 * {@link UserSale} and emits it keyed by the buyer's name. All purchases of one user therefore
 * reach the same reduce call.
 *
 * <p>Blank lines are skipped. Lines that do not have exactly six fields, or whose numeric
 * columns do not parse, are counted under the {@code UserSale/MALFORMED_RECORDS} counter and
 * skipped instead of failing the task. Goods names are assumed to contain no commas.
 */
public class UserSaleMapper extends Mapper<LongWritable, Text, Text, UserSale> {
    // number of comma-separated columns in a well-formed record
    private static final int FIELD_COUNT = 6;

    // Output value object, reused across map() calls instead of allocating one per record.
    private UserSale valueOut = new UserSale();
    // Output key object (buyer name), reused the same way.
    private Text keyOut = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserSale>.Context context) throws IOException, InterruptedException {
        // trim() also strips a trailing '\r' from files with Windows (CRLF) line endings,
        // which would otherwise make Integer.parseInt fail on the last column.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        // limit -1 keeps trailing empty fields so truncated records are detected below
        String[] fields = line.split(",", -1);
        if (fields.length != FIELD_COUNT) {
            context.getCounter("UserSale", "MALFORMED_RECORDS").increment(1);
            return;
        }

        int saleId;
        int price;
        int saleCount;
        try {
            saleId = Integer.parseInt(fields[0].trim());
            price = Integer.parseInt(fields[4].trim());
            saleCount = Integer.parseInt(fields[5].trim());
        } catch (NumberFormatException e) {
            context.getCounter("UserSale", "MALFORMED_RECORDS").increment(1);
            return;
        }

        String userName = fields[1].trim();
        valueOut.setSaleId(saleId);
        valueOut.setUserName(userName);
        valueOut.setSex(fields[2].trim());
        valueOut.setGoodsName(fields[3].trim());
        valueOut.setPrice(price);
        valueOut.setSaleCount(saleCount);
        // total = unit price * quantity
        valueOut.setTotalPrice();

        keyOut.set(userName);
        context.write(keyOut, valueOut);
    }
}
UserSaleReducer.java
package mapreduce.UserSale;
import mapreduce.model.UserSale;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Sums {@code totalPrice} over all purchases of one buyer and emits a single {@link UserSale}
 * per buyer, carrying that sum. Because {@code UserSale.toString()} renders only
 * {@code totalPrice}, each text output line is the buyer name followed by the total amount.
 */
public class UserSaleReducer extends Reducer<Text, UserSale, Text, UserSale> {
    // Output value object, reused across reduce() calls instead of allocating one per key.
    private final UserSale valueOut = new UserSale();

    @Override
    protected void reduce(Text key, Iterable<UserSale> values, Reducer<Text, UserSale, Text, UserSale>.Context context) throws IOException, InterruptedException {
        // Accumulate in a long so overflow is detected rather than silently wrapped.
        long sum = 0;
        for (UserSale sale : values) {
            // Read the field immediately; the iterated object may be reused between elements.
            sum += sale.getTotalPrice();
        }

        valueOut.setUserName(key.toString());
        // Non-aggregated string fields are set to "" (not left null) so the value can still be
        // serialized with writeUTF, e.g. if this class is also used as a combiner.
        valueOut.setSex("");
        valueOut.setGoodsName("");
        valueOut.setTotalPrice(Math.toIntExact(sum));
        context.write(key, valueOut);
    }
}
UserSaleDriver.java
package mapreduce.model;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 自定义bean类实现Hadoop的序列化和反序列化
*
* 1.bean类实训writable接口
* 2.bean类必须提供空参构造方法
* 3.重写序列化方法write()
* 4.重写反序列化方法 反序列化的属性顺序必须和序列化顺序一致
* 5.重写toSting方法
*/
public class UserSale implements Writable {
//销售id
private int saleId;
//用户名称
private String userName;
//用户性别
private String sex;
//商品名称
private String goodsName;
//商品单价
private int price;
//购买数量
private int saleCount;
//购买总价
private int totalPrice;
//空参构造方法
public UserSale() {
}
//重写toString方法
@Override
public String toString() {
return " "+totalPrice;
}
//重写序列化方法
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(saleId);
dataOutput.writeUTF(userName);
dataOutput.writeUTF(sex);
dataOutput.writeUTF(goodsName);
dataOutput.writeInt(price);
dataOutput.writeInt(saleCount);
dataOutput.writeInt(totalPrice);
}
//
@Override
public void readFields(DataInput dataInput) throws IOException {
this.saleId=dataInput.readInt();
this.userName=dataInput.readUTF();
this.sex=dataInput.readUTF();
this.goodsName=dataInput.readUTF();
this.price=dataInput.readInt();
this.saleCount=dataInput.readInt();
this.totalPrice=dataInput.readInt();
}
public int getSaleId() {
return saleId;
}
public void setSaleId(int saleId) {
this.saleId = saleId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public String getGoodsName() {
return goodsName;
}
public void setGoodsName(String goodsName) {
this.goodsName = goodsName;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
public int getSaleCount() {
return saleCount;
}
public void setSaleCount(int saleCount) {
this.saleCount = saleCount;
}
public int getTotalPrice() {
return totalPrice;
}
public void setTotalPrice(int totalPrice) {
this.totalPrice = totalPrice;
}
public void setTotalPrice() {
this.totalPrice = this.price * this.saleCount;
}
}
按照上文的方式打包 jar 包并上传到虚拟机后运行
计算成功