首页 > 其他分享 >hadoop - hadoop2.6 分布式 - 简单实例学习 - 统计某年的最高温度和按年份将温度从高到底排序

hadoop - hadoop2.6 分布式 - 简单实例学习 - 统计某年的最高温度和按年份将温度从高到底排序

时间:2023-02-22 12:06:41浏览次数:49  
标签:01 某年 KeyPari hadoop hot import public 温度


1.背景

     哎,学习hadoop不容易啊,各种bug,摸不着头脑,时而管用,时而不知道namenode怎么停止了,确实郁闷!还好,坚持下去了!好了,不说了,开始简单示例 :

  1.1  数据格式 :

         日期 -空格 - 时间- tab键-温度


1949-10-01 14:23:01 34°C
1949-05-23 21:23:01 16°C
1950-11-21 23:23:01 21°C
1950-08-01 12:23:01 53°C
1951-02-02 23:23:01 11°C
1951-08-01 07:23:01 23°C
1952-10-01 15:23:01 21°C
1952-08-01 12:23:01 23°C
1953-02-01 02:23:01 31°C
1953-08-01 09:22:01 32°C


  1.2 需求

      (1)计算每年的最高温度

      (2)将每年的温度,按照从高到底的顺序排序

  1.3 分析

        思路:
     (1)年份从低到高排序,同时每一年温度降序排序;
     (2)安装年份分组,每一年对应一个reduce任务;

        目的:

        自定义分组,分区,排序,和 key ;

        mapper输出 : key 为封装对象,将年份和温度进行分装 后 通过 对象为key进行排序:int year , int hot ;

2.实现

   2.1 key 实现 :继承 WritableComparable 

       


package demo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class KeyPari implements WritableComparable<KeyPari>{

private int year;
private int hot;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getHot() {
return hot;
}
public void setHot(int hot) {
this.hot = hot;
}

@Override
public void readFields(DataInput in) throws IOException {

this.year=in.readInt();
this.hot=in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {

out.writeInt(year);
out.writeInt(hot);
}

@Override
public int compareTo(KeyPari o) {
int res = Integer.compare(year, o.getYear());
if(res!=0){
return res;
}
return Integer.compare(hot, o.getHot());
}


@Override
public String toString() {
return year+"\t"+hot;
}

@Override
public int hashCode() {
return new Integer(year+hot).hashCode();
}



}



   2.2 实现排序 :继承 WritableComparator


package demo;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//排序
public class SortHot extends WritableComparator{

public SortHot() {
super(KeyPari.class,true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {

//排序规则
KeyPari k1=(KeyPari) a;
KeyPari k2=(KeyPari) b;

int res = Integer.compare(k1.getYear(),k2.getYear());
if(res!=0){
return res;
}

return -Integer.compare(k1.getHot(),k2.getHot());
}

}



  2.3 自定义分区 :继承 Partitioner


package demo;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FristPartition extends Partitioner<KeyPari,Text>{

//自定义分区
@Override
public int getPartition(KeyPari key, Text value, int num) {
return (key.getYear()*127)%num;
}

}


  2.4 自定义分组 :继承 WritableComparator


package demo;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//分组

public class GroupHot extends WritableComparator{

public GroupHot() {
super(KeyPari.class,true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
//排序规则
KeyPari k1=(KeyPari) a;
KeyPari k2=(KeyPari) b;
return Integer.compare(k1.getYear(),k2.getYear());
}

}



  2.5  测试类 - RunJob

          注意两个目录:inputpath 和 outputpath ,后面需要使用 


package demo;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//运行类
public class RunJob {


private static SimpleDateFormat SDF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//map
static class HotMapper extends Mapper<LongWritable, Text, KeyPari, Text> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line=value.toString();
String[] ss = line.split("\t");
if(ss.length==2){
//等于2处理
try {
Date date = SDF.parse(ss[0]);

Calendar c=Calendar.getInstance();
c.setTime(date);
int year=c.get(1);
String hot=ss[1].substring(0,ss[1].lastIndexOf("°C"));
KeyPari keyPari=new KeyPari();
keyPari.setYear(year);
keyPari.setHot(Integer.parseInt(hot));

context.write(keyPari, value);

} catch (Exception e) {
e.printStackTrace();
}
}

}
}


//reduce
static class HotReduce extends Reducer<KeyPari,Text,KeyPari,Text>{

@Override
protected void reduce(KeyPari kp, Iterable<Text> values,
Reducer<KeyPari, Text, KeyPari, Text>.Context context)
throws IOException, InterruptedException {
for(Text t:values){
context.write(kp, t);
}
}

}


@SuppressWarnings("deprecation")
public static void main(String[] args) {

Configuration cfg = new Configuration();

try {
Job job = new Job(cfg);
job.setJobName("hot");
job.setJarByClass(RunJob.class);
job.setMapperClass(HotMapper.class);
job.setReducerClass(HotReduce.class);
job.setOutputKeyClass(KeyPari.class);
job.setOutputValueClass(Text.class);

job.setNumReduceTasks(5);
job.setPartitionerClass(FristPartition.class);
job.setSortComparatorClass(SortHot.class);
job.setGroupingComparatorClass(GroupHot.class);


FileInputFormat.addInputPath(job,new Path("/usr/input/hot/"));
FileOutputFormat.setOutputPath(job,new Path("/usr/output/hot"));

System.exit(job.waitForCompletion(true)?0:1);
} catch (Exception e) {
e.printStackTrace();
}

}



}



 2.6 测试 - 一定要测试,看能不能解析,否则,错哪里你都不知道


package demo;

import java.io.FileNotFoundException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class Test {

private static SimpleDateFormat SDF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) throws FileNotFoundException {

String line="1949-10-01 14:23:01 34°C";
String[] ss = line.split("\t");
if(ss.length==2){
try {
Date date = SDF.parse(ss[0]);

Calendar c=Calendar.getInstance();
c.setTime(date);
int year=c.get(1);
String hot=ss[1].substring(0,ss[1].lastIndexOf("°C"));
KeyPari keyPari=new KeyPari();
keyPari.setYear(year);
keyPari.setHot(Integer.parseInt(hot));

System.out.println(keyPari);

} catch (Exception e) {
e.printStackTrace();
}
}



}

}


  2.7 源码下载

   

3.运行测试

   3.1 运行前需要删除 output目录及其子目录和内容

       

root@note1:~# hdfs dfs -rm /usr/output/hot/*


 

root@note1:~# hdfs dfs -rmdir /usr/output/hot



root@note1:~# hdfs dfs -rmdir /usr/output


    删除目录 :-rmdir  , 删除文件:-rm

  3.2 上传文件

      

root@note1:~# hdfs dfs -put data /usr/input/hot/


  3.3 运行

 

root@note1:~# hadoop jar hot.jar demo.RunJob


  3.4 进度

    (1)在 ​​http://note1:8088/cluster​​ 可以看见执行进度

hadoop - hadoop2.6 分布式 - 简单实例学习 - 统计某年的最高温度和按年份将温度从高到底排序_分布式

         当然我的是执行完毕了!!

   (2)终端查看

hadoop - hadoop2.6 分布式 - 简单实例学习 - 统计某年的最高温度和按年份将温度从高到底排序_分组_02


 3.5 查看前10条记录


root@note1:~# hdfs dfs -cat /usr/output/hot/part* | head -n10


 3.6 查看某一年的最高温度


root@note1:~# hdfs dfs -cat /usr/output/hot/part-r-00004 | head -n1



 

hadoop - hadoop2.6 分布式 - 简单实例学习 - 统计某年的最高温度和按年份将温度从高到底排序_自定义分区_03


4.总结

    启动的时候,目前还是很纠结,无缘无故的出错,不错这次挺顺利的!



标签:01,某年,KeyPari,hadoop,hot,import,public,温度
From: https://blog.51cto.com/u_15976424/6078829

相关文章