
MapReduce-day2


Pre-aggregation

Between the map-side merge and the reduce-side fetch there is an optional pre-aggregation step (a combiner, or a map join).

Goal of pre-aggregation: cut down the amount of data the reducers have to fetch, which reduces network traffic and speeds up the job.

You cannot rely on how many times the combiner function will be called: it depends on how many map tasks there are, and Hadoop may invoke it zero, one, or several times per task, so the job must produce the same result in every case.

A combiner therefore only works for operations whose partial results can safely be merged again (sums, counts, max/min); it is not suitable for averages, roots, powers and the like.
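As a minimal, hypothetical sketch (the CombinerWiring helper below is not part of the original code; it assumes the WordCount job and MyReducer class shown in the next section), registering a combiner is a single extra call on the Job, and it is only valid here because partial word counts can safely be summed again:

package com.shujia;

import org.apache.hadoop.mapreduce.Job;

//Hypothetical helper, shown only to illustrate where the combiner plugs in
public class CombinerWiring {
    static void addCombiner(Job job) {
        //word counts are plain sums, so the reducer class can double as the combiner;
        //Hadoop may run it zero, one or several times per map task (per spill/merge),
        //and the final result must be the same in every case
        job.setCombinerClass(WordCount.MyReducer.class);

        //counter-example: a plain average is NOT combiner-safe:
        //  avg(avg(1,2), 3) = avg(1.5, 3) = 2.25, but avg(1,2,3) = 2
        //emit (sum, count) pairs instead and divide only in the reducer
    }
}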

 

Implementing MapReduce in IDEA

WordCount

package com.shujia;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {
    //the map input key type is fixed as LongWritable (the byte offset of the line within the file, starting at 0); the value is one line of data, as Text
    static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        //context is the Hadoop task context; it is used to emit key-value pairs out of the map
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            //map-side processing logic
            //split each line of data into words
            //convert the Hadoop Text value into a Java String
            String row=value.toString();
            String[] words = row.split(" ");
            //iterate over the words
            for (String word : words) {
                //wrap the String in a Text
                Text key2 = new Text(word);
                //emit (word, 1) to the shuffle via the context
                context.write(key2,new LongWritable(1L));
            }
        }
    }

    static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            //iterate over the values and sum them
            long sum = 0L;
            for (LongWritable value : values) {
                long l=value.get();
                sum=sum+l;
            }
            context.write(key, new LongWritable(sum));

        }
    }

    public static void main(String[] args) throws Exception{
        //load the Hadoop configuration
        Configuration conf=new Configuration();
        //create the job
        Job job = Job.getInstance(conf);
        //optional: give the job a name that shows up in the YARN UI
        job.setJobName("word count");
        //optional: set the number of reduce tasks (default is 1)
        job.setNumReduceTasks(1);
        //set the class used to locate the job jar
        job.setJarByClass(WordCount.class);

        //set the mapper and reducer classes for the job
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        //set the map-stage output key/value types
        //the Hadoop counterpart of a Java String is Text
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //set the final (reducer) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //set the HDFS input and output paths: addInputPath can be called several times to add multiple input directories, while setOutputPath takes a single output directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //note: this is the output directory, and it must not exist before the job runs
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //submit the MR job and wait for it to finish
        job.waitForCompletion(true);
    }
}
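To make the path comment above concrete, here is a small hedged sketch (the PathSetup helper and the directory names are made up for illustration): the input side can accumulate several directories, while the output side is exactly one directory that must not exist before the job starts.

package com.shujia;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//Hypothetical helper: illustrates the input/output path comment in WordCount.main
public class PathSetup {
    static void configurePaths(Job job) throws Exception {
        //addInputPath can be called repeatedly to add more input directories
        FileInputFormat.addInputPath(job, new Path("/data/words/part1"));
        FileInputFormat.addInputPath(job, new Path("/data/words/part2"));
        //equivalent varargs form:
        //FileInputFormat.setInputPaths(job, new Path("/data/words/part1"), new Path("/data/words/part2"));

        //setOutputPath takes a single directory, and it must not exist yet
        FileOutputFormat.setOutputPath(job, new Path("/output/wordcount"));
    }
}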

HarryPotter

Main class

package com.shujia.HarryPotter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HarryPotterDemo {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HarryPotterDemo.class);

        job.setMapperClass(HarryMapper.class);
        job.setReducerClass(HarryReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.waitForCompletion(true);
    }
}

 

Map class

package com.shujia.HarryPotter;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class HarryMapper extends Mapper<LongWritable,Text, Text,LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //clean each line of input
        String info=value.toString();
        //replace commas, periods and apostrophes with spaces; inside a character class '|' is a literal, so the original pattern [,|.|'] would also have replaced '|' characters
        String s = info.replaceAll("[,.']", " ");
        String s3=s.toLowerCase();
        String[] s1 = s3.split(" ");
        for (String s2 : s1) {
            context.write(new Text(s2),new LongWritable(1L));
        }

    }
}
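To see what the cleaning step actually produces, here is a tiny standalone check (the CleanDemo class and the sample sentence are hypothetical, not from the original post). One side effect worth noting: because punctuation becomes a space next to an existing space, splitting on a single space leaves some empty tokens, so the mapper as written also counts the empty string unless it splits on "\\s+" or skips blanks.

package com.shujia.HarryPotter;

//Hypothetical standalone check of the line-cleaning logic used in HarryMapper
public class CleanDemo {
    public static void main(String[] args) {
        String info = "Harry's owl, Hedwig. Ron's rat, Scabbers.";
        //replace , . ' with spaces, then lower-case, exactly as the mapper does
        String cleaned = info.replaceAll("[,.']", " ").toLowerCase();
        //note the double spaces where punctuation preceded a space;
        //cleaned.split(" ") would therefore contain empty strings
        System.out.println(cleaned);
    }
}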

 

Reduce class

package com.shujia.HarryPotter;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class HarryReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

        long sum=0L;
        for (LongWritable value : values) {
            sum+=value.get();
        }
        context.write(key,new LongWritable(sum));
    }
}

 

 

The IK tokenizer (ikanalyzer)

Add the dependency in the parent project

<!-- https://mvnrepository.com/artifact/com.janeluo/ikanalyzer -->
            <dependency>
                <groupId>com.janeluo</groupId>
                <artifactId>ikanalyzer</artifactId>
                <version>2012_u6</version>
            </dependency>

Add the dependency in the child module

        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
        </dependency>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <!-- assembly descriptor: produce a jar-with-dependencies (the suffix appears in the packaged jar name) -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!-- bind the plugin to the package phase so that mvn package builds the assembly; without this you would have to run mvn assembly:single manually -->
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

 

Code

package com.shujia.ik;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.StringReader;

/*
A small standalone test of the IK tokenizer
 */
public class IKTest {
    public static void main(String[] args) throws Exception{
        BufferedReader br=new BufferedReader(new FileReader("D:\\soft\\projects\\bigdata19-project\\bigdata19-mapreduce\\data\\dldl.txt"));
        String line = br.readLine();
        //wrap the first line of text in a Reader so the IK segmenter can consume it (only this one line is segmented here)
        StringReader stringReader = new StringReader(line);

        //create the segmenter (the second argument enables smart mode) and segment the text
        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);

        Lexeme lexeme=null;

        while((lexeme=ikSegmenter.next())!=null){
            String s=lexeme.getLexemeText();//get the segmented term
            System.out.println(s);
        }
    }
}
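The boolean passed to the IKSegmenter constructor selects the segmentation mode: true enables smart (coarse-grained) segmentation, false produces fine-grained output with more candidate terms. A quick comparison sketch (the IKModeDemo class and the sample sentence are made up for illustration; the exact terms printed depend on the bundled dictionary):

package com.shujia.ik;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.StringReader;

//Hypothetical demo comparing the two segmentation modes on the same string
public class IKModeDemo {
    public static void main(String[] args) throws Exception {
        String text = "中华人民共和国万岁";
        for (boolean useSmart : new boolean[]{true, false}) {
            System.out.println("useSmart = " + useSmart);
            IKSegmenter seg = new IKSegmenter(new StringReader(text), useSmart);
            Lexeme lexeme;
            while ((lexeme = seg.next()) != null) {
                System.out.print(lexeme.getLexemeText() + " | ");
            }
            System.out.println();
        }
    }
}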

 

Romance of the Three Kingdoms (三国演义) example:

package com.shujia.ik;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;
/*
Count how many times 曹操 (Cao Cao), 董卓 (Dong Zhuo) and 刘备 (Liu Bei) appear in the text
 */
class SgyyMapper extends Mapper<LongWritable,Text, Text,LongWritable>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringReader stringReader = new StringReader(line);
        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
        Lexeme lexeme=null;
        while((lexeme=ikSegmenter.next())!=null){
            String ciyu=lexeme.getLexemeText();
            if("曹操".equals(ciyu)||"董卓".equals(ciyu)||"刘备".equals(ciyu)){
                context.write(new Text(ciyu),new LongWritable(1L));
            }
        }
    }
}

class SgyyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum=0L;
        for (LongWritable value : values) {
            sum+=value.get();
        }
        context.write(key,new LongWritable(sum));
    }
}

public class SgyyDeno {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SgyyDeno.class);
        job.setMapperClass(SgyyMapper.class);
        job.setReducerClass(SgyyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.waitForCompletion(true);
    }
}
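Tying this job back to the pre-aggregation section at the top: SgyyReducer only sums LongWritable counts, so it could also be registered as a combiner to shrink the shuffle. A sketch of the extra wiring (the SgyyCombinerWiring helper is hypothetical; in practice this is just one more line in SgyyDeno.main right after setReducerClass):

package com.shujia.ik;

import org.apache.hadoop.mapreduce.Job;

//Hypothetical helper; in the real driver this is a single extra line
class SgyyCombinerWiring {
    static void wire(Job job) {
        //partial per-map-task sums are summed again in the reducer,
        //so the result is identical whether the combiner runs or not
        job.setCombinerClass(SgyyReducer.class);
    }
}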

 

From: https://www.cnblogs.com/wqy1027/p/16632868.html
