1. Background
Learning Hadoop has not been easy: all kinds of bugs that are hard to make sense of. Things work one moment, and the next the NameNode has stopped for no obvious reason. Frustrating! Still, I stuck with it. Enough complaining; on to a simple example:
1.1 Data format
date <space> time <tab> temperature
1949-10-01 14:23:01 34°C
1949-05-23 21:23:01 16°C
1950-11-21 23:23:01 21°C
1950-08-01 12:23:01 53°C
1951-02-02 23:23:01 11°C
1951-08-01 07:23:01 23°C
1952-10-01 15:23:01 21°C
1952-08-01 12:23:01 23°C
1953-02-01 02:23:01 31°C
1953-08-01 09:22:01 32°C
1.2 Requirements
(1) Compute the maximum temperature for each year.
(2) Sort each year's temperatures in descending order.
1.3 Analysis
Approach:
(1) Sort by year in ascending order, and within each year sort temperatures in descending order.
(2) Group by year, with one reduce task per year.
What this requires:
A custom key, plus custom grouping, partitioning, and sorting.
Mapper output: the key is a composite object wrapping the year and the temperature (int year, int hot); sorting is then driven by this key object.
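For the sample data above, the target per-year ordering looks like this (the first line of each year is its maximum, which directly answers requirement (1)):
1949 34
1949 16
1950 53
1950 21
1951 23
1951 11
1952 23
1952 21
1953 32
1953 31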
2. Implementation
2.1 The key: implement WritableComparable
package demo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Composite key wrapping year and temperature, so both fields are
// available to the sort, partition, and grouping logic.
public class KeyPari implements WritableComparable<KeyPari> {
    private int year;
    private int hot;

    public int getYear() {
        return year;
    }
    public void setYear(int year) {
        this.year = year;
    }
    public int getHot() {
        return hot;
    }
    public void setHot(int hot) {
        this.hot = hot;
    }

    // Deserialize in the same field order as write().
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.hot = in.readInt();
    }

    // Serialize for the shuffle between map and reduce.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(hot);
    }

    // Natural ordering: year ascending, then temperature ascending.
    // The job overrides this with SortHot (see 2.2).
    @Override
    public int compareTo(KeyPari o) {
        int res = Integer.compare(year, o.getYear());
        if (res != 0) {
            return res;
        }
        return Integer.compare(hot, o.getHot());
    }

    @Override
    public String toString() {
        return year + "\t" + hot;
    }

    @Override
    public int hashCode() {
        // Mix the two fields instead of boxing their sum.
        return year * 31 + hot;
    }
}
2.2 The sort comparator: extend WritableComparator
package demo;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator: year ascending, temperature descending within a year.
public class SortHot extends WritableComparator {
    public SortHot() {
        // true: instantiate keys so compare() receives deserialized objects.
        super(KeyPari.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPari k1 = (KeyPari) a;
        KeyPari k2 = (KeyPari) b;
        int res = Integer.compare(k1.getYear(), k2.getYear());
        if (res != 0) {
            return res;
        }
        // Negated: higher temperatures sort first.
        return -Integer.compare(k1.getHot(), k2.getHot());
    }
}
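To convince yourself of the ordering, you can call the comparator directly. A throwaway sketch (SortHotDemo and its makeKey helper are mine, for illustration only):
package demo;

// Minimal sanity check for SortHot; not part of the job itself.
public class SortHotDemo {
    private static KeyPari makeKey(int year, int hot) {
        KeyPari k = new KeyPari();
        k.setYear(year);
        k.setHot(hot);
        return k;
    }

    public static void main(String[] args) {
        SortHot sorter = new SortHot();
        // Negative: 1949 sorts before 1950 (year ascending).
        System.out.println(sorter.compare(makeKey(1949, 16), makeKey(1950, 53)));
        // Negative: within 1950, 53°C sorts before 21°C (temperature descending).
        System.out.println(sorter.compare(makeKey(1950, 53), makeKey(1950, 21)));
    }
}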
2.3 Custom partitioner: extend Partitioner
package demo;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Custom partitioner: every record of a given year goes to the same reducer.
public class FristPartition extends Partitioner<KeyPari, Text> {
    @Override
    public int getPartition(KeyPari key, Text value, int num) {
        // num is the number of reduce tasks.
        return (key.getYear() * 127) % num;
    }
}
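With num = 5 reduce tasks, the five sample years happen to land in five distinct partitions. A throwaway sketch to verify the arithmetic (PartitionDemo is illustrative, not part of the job):
// Throwaway check: which partition does each sample year map to?
public class PartitionDemo {
    public static void main(String[] args) {
        for (int year = 1949; year <= 1953; year++) {
            System.out.println(year + " -> partition " + ((year * 127) % 5));
        }
        // Prints: 1949 -> 3, 1950 -> 0, 1951 -> 2, 1952 -> 4, 1953 -> 1
    }
}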
2.4 Grouping comparator: extend WritableComparator
package demo;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator: compare by year only, so all records of one year
// form a single reduce group regardless of temperature.
public class GroupHot extends WritableComparator {
    public GroupHot() {
        super(KeyPari.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPari k1 = (KeyPari) a;
        KeyPari k2 = (KeyPari) b;
        return Integer.compare(k1.getYear(), k2.getYear());
    }
}
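The effect: keys that share a year compare as equal, so all of that year's records are fed to a single reduce() call, arriving in the order SortHot established. A throwaway check (GroupHotDemo is illustrative only):
package demo;

// Throwaway check: GroupHot treats all keys of one year as equal.
public class GroupHotDemo {
    public static void main(String[] args) {
        KeyPari a = new KeyPari();
        a.setYear(1949);
        a.setHot(34);
        KeyPari b = new KeyPari();
        b.setYear(1949);
        b.setHot(16);
        // Prints 0: (1949,34) and (1949,16) fall into one reduce group.
        System.out.println(new GroupHot().compare(a, b));
    }
}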
2.5 The driver class - RunJob
Note the two paths, the input path and the output path; they are used again in section 3.
package demo;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver class
public class RunJob {
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Mapper: parse each line into a (year, temperature) composite key.
    static class HotMapper extends Mapper<LongWritable, Text, KeyPari, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] ss = line.split("\t");
            // Only process well-formed lines: "date time<TAB>temperature".
            if (ss.length == 2) {
                try {
                    Date date = SDF.parse(ss[0]);
                    Calendar c = Calendar.getInstance();
                    c.setTime(date);
                    int year = c.get(Calendar.YEAR);
                    String hot = ss[1].substring(0, ss[1].lastIndexOf("°C"));
                    KeyPari keyPari = new KeyPari();
                    keyPari.setYear(year);
                    keyPari.setHot(Integer.parseInt(hot));
                    context.write(keyPari, value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Reducer: keys arrive sorted (year ascending, temperature descending), so
    // the first record of each group is that year's maximum; write everything out.
    static class HotReduce extends Reducer<KeyPari, Text, KeyPari, Text> {
        @Override
        protected void reduce(KeyPari kp, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text t : values) {
                context.write(kp, t);
            }
        }
    }

    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        try {
            Job job = Job.getInstance(cfg);
            job.setJobName("hot");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(HotMapper.class);
            job.setReducerClass(HotReduce.class);
            job.setOutputKeyClass(KeyPari.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(5);                       // one reducer per year
            job.setPartitionerClass(FristPartition.class);  // route by year
            job.setSortComparatorClass(SortHot.class);      // year asc, hot desc
            job.setGroupingComparatorClass(GroupHot.class); // group by year only
            FileInputFormat.addInputPath(job, new Path("/usr/input/hot/"));
            FileOutputFormat.setOutputPath(job, new Path("/usr/output/hot"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.6 Test - always test the parsing first; otherwise you won't know where a failure comes from
package demo;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

// Standalone check of the mapper's parsing logic.
public class Test {
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        // Note the explicit \t: the real data is tab-separated, and a plain
        // space here would make split("\t") return one field and print nothing.
        String line = "1949-10-01 14:23:01\t34°C";
        String[] ss = line.split("\t");
        if (ss.length == 2) {
            try {
                Date date = SDF.parse(ss[0]);
                Calendar c = Calendar.getInstance();
                c.setTime(date);
                int year = c.get(Calendar.YEAR);
                String hot = ss[1].substring(0, ss[1].lastIndexOf("°C"));
                KeyPari keyPari = new KeyPari();
                keyPari.setYear(year);
                keyPari.setHot(Integer.parseInt(hot));
                System.out.println(keyPari);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
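If the parsing succeeds, this prints 1949 and 34 separated by a tab (see KeyPari.toString()). If the separator in the input were a space instead of a tab, nothing would print at all, which is exactly the kind of silent failure worth catching here.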
2.7 Source code download
3. Running the job
3.1 Before running, delete the output directory, its subdirectories, and their contents
root@note1:~# hdfs dfs -rm /usr/output/hot/*
root@note1:~# hdfs dfs -rmdir /usr/output/hot
root@note1:~# hdfs dfs -rmdir /usr/output
To delete a directory: -rmdir; to delete files: -rm.
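Tip: hdfs dfs -rm -r /usr/output does all three steps at once, since -r deletes recursively.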
3.2 Upload the data file
root@note1:~# hdfs dfs -put data /usr/input/hot/
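To confirm the upload, list the input directory:
root@note1:~# hdfs dfs -ls /usr/input/hot/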
3.3 Run
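If hot.jar has not been built yet, a minimal packaging sketch (assuming the compiled classes are under a bin/ directory; adjust to your own build layout):
root@note1:~# jar cvf hot.jar -C bin .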
root@note1:~# hadoop jar hot.jar demo.RunJob
3.4 Progress
(1) The job's progress can be watched at http://note1:8088/cluster (the YARN ResourceManager web UI).
Mine, of course, had already finished!
(2) Checking from the terminal
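For example, the YARN CLI lists applications and their states:
root@note1:~# yarn application -list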
3.5 View the first 10 output records
root@note1:~# hdfs dfs -cat /usr/output/hot/part* | head -n10
3.6 View the maximum temperature of a given year
root@note1:~# hdfs dfs -cat /usr/output/hot/part-r-00004 | head -n1
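Given the partition arithmetic in 2.3, part-r-00004 holds year 1952, and because temperatures are sorted in descending order within the partition, its first line is the 1952 maximum (23°C in the sample data).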
4. Summary
Getting things started is still a struggle at times, with errors appearing out of nowhere, but this run went quite smoothly!