首页 > 其他分享 >mapreduce 去重的问题怎么解决

mapreduce 去重的问题怎么解决

时间:2023-08-09 21:33:57浏览次数:30  
标签:怎么 Text reduce mapreduce hadoop job 解决 import class

mapreduce 去重的问题怎么解决?


 john 89


 tom 100


 mary 100


 mary 200


 tom 20


———–


我刚学mapreduce,正在练习,上面这个我计算了很久也不对,就是对第一列去重,去重后应该是3


如果用mapreduce计算成功后,part-00000 的文件内容 是:


请问下,这个mapreduce怎么写啊?



map按第一列为key,value无所谓         
reduce class中初始化一个计数器         
每个reduce方法中计数器每次加一         
reduce 的cleanup方法中commit计数器就可以了

map 知道怎么写了,那reduce的具体怎么写啊?
直接一个Map,在Map里面定义一个全局的HashSet,map方法里面把key加入进去,cleanup方法里面把结果写入就行了。
直接一个Map,在Map里面定义一个全局的HashSet,map方法里面把key加入进去,cleanup方法里面把结果写入就行了。         


只用map不可能解决这个问题 

如果在不同的map中都用同一个key,怎么解决? 


必须用reduce去group后的key才能得到去重效果
直接一个Map,在Map里面定义一个全局的HashSet,map方法里面把key加入进去,cleanup方法里面把结果写入就行了。                    
只用map不可能解决这个问题          
如果在不同的map中都用同一个key,怎么解决?          

必须用reduce去group后的key才能得到去重效果         

嗯,对,没想那么仔细,谢谢指正。
public static class Map extends Mapper<LongWritable, Text, Text, Text> { 

                public void map(LongWritable key, Text value, Context context) 

                                throws IOException, InterruptedException { 

                        String line = value.toString(); 

                        try { 

                                String[] lineSplit = line.split("\t"); 

                                context.write(new Text(lineSplit[0]), new Text("")); 

                            } catch (java.lang.ArrayIndexOutOfBoundsException e) { 

                                context.getCounter(Counter.LINESKIP).increment(1); 

                                return; 

                        } 

                } 

        } 


        public static class Reduce extends Reducer<Text, Text, Text, Text> { 

                private Set<String> count = new HashSet<String>(); 


                public void reduce(Text key, Iterable<Text> values, Context context) 

                                throws IOException, InterruptedException { 

                      for(Text value:values){ 

                             count.add(value.toString()); 

                     } 

                        context.write(key, new Text("")); 

                } 

        } 

-------------------------  这个问题纠结我2周了,这个方面的学习资料太少了,我的map和reduce是这样写的,但是数据量大一些,就会内存溢出,我想我这个思路是错误的 

        你说的  “必须用reduce去group后的key才能得到去重效果 ”,这个 map和reduce是具体怎么写的啊?
public static class Map extends Mapper<LongWritable, Text, Text, Text> {          

                public void map(LongWritable key, Text value, Context context)          

                                throws IOException, InterruptedException {          

                        String line = value.toString();          

                        try {          

                                String[] lineSplit = line.split("\t");          

                                context.write(new Text(lineSplit[0]), new Text(""));          

                             context.write(new Text("uniq") ,new Text(lineSplit[0]) );          


                            } catch (java.lang.ArrayIndexOutOfBoundsException e) {          

                                context.getCounter(Counter.LINESKIP).increment(1);          

                                return;          

                        }          

                }          

        }          


        public static class Reduce extends Reducer<Text, Text, Text, Text> {          

                private Set<String> count = new HashSet<String>();          


                public void reduce(Text key, Iterable<Text> values, Context context)          

                                throws IOException, InterruptedException {          

                      for(Text value:values){          

                             count.add(value.toString());          

                     }          

                        context.write("uniq", new Text(count.size()+""));          

                }          

        }          

-------------------------  这个问题纠结我2周了,这个方面的学习资料太少了,我的map和reduce是这样写的,但是数据量大一些,就会内存溢出,我想我这个思路是错误的          

        你说的  “必须用reduce去group后的key才能得到去重效果 ”,这个 map和reduce是具体怎么写的啊?         
    -------------刚才写的mapreduce错了,以这个为准
map按第一列为key,value无所谓          
reduce class中初始化一个计数器          
每个reduce方法中计数器每次加一          
reduce 的cleanup方法中commit计数器就可以了         

  谢谢了,请教下,你说的这个map我知道怎么写了,但是这个reduce怎么写啊?
reduce阶段只用一个计数器就行了
import                  java.io.IOException;                

                                  

                 import                  org.apache.hadoop.conf.Configuration;                

                 import                  org.apache.hadoop.fs.Path;                

                 import                  org.apache.hadoop.io.LongWritable;                

                 import                  org.apache.hadoop.io.NullWritable;                

                 import                  org.apache.hadoop.io.Text;                

                 import                  org.apache.hadoop.mapreduce.Job;                

                 import                  org.apache.hadoop.mapreduce.Mapper;                

                 import                  org.apache.hadoop.mapreduce.Reducer;                

                 import                  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;                

                 import                  org.apache.hadoop.mapreduce.lib.input.TextInputFormat;                

                 import                  org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;                

                                  

                 public                  class                  wzl189_distinct {                

                                  public                  static                  class                  MyMapper                  extends                

                                  Mapper<Object, Text, Text, NullWritable> {                

                                  

                                  Text outKey =                  new                  Text();                

                                  

                                  @Override                

                                  public                  void                  map(Object key, Text value, Context context)                

                                  throws                  IOException, InterruptedException {                

                                  

                                  String tmp[] = value.toString().split(                 " "                 );                

                                  if                  (tmp.length !=                  2                 )                

                                  return                 ;                

                                  outKey.set(tmp[                 0                 ]);                

                                  context.write(outKey, NullWritable.get());                

                                  

                                  }                

                                  }                

                                  

                                  public                  static                  class                  MyReducer                  extends                

                                  Reducer<Text, NullWritable, LongWritable, NullWritable> {                

                                  

                                  long                  myCount = 0l;                

                                  

                                  @Override                

                                  public                  void                  reduce(Text key, Iterable<NullWritable> values,                

                                  Context context)                  throws                  IOException, InterruptedException {                

                                  ++myCount;                

                                  }                

                                  

                                  @Override                

                                  public                  void                  cleanup(Context context)                  throws                  IOException,                

                                  InterruptedException {                

                                  context.write(                 new                  LongWritable(myCount), NullWritable.get());                

                                  };                

                                  }                

                                  

                                  public                  static                  void                  main(String[] args)                  throws                  Exception {                

                                  Configuration conf =                  new                  Configuration();                

                                  if                  (args.length !=                  2                 ) {                

                                  System.err.println(                 "Usage: <in> <out>"                 );                

                                  System.exit(                 2                 );                

                                  }                

                                  

                                  conf.set(                 "mapred.child.java.opts"                 ,                  "-Xmx350m -Xmx1024m"                 );                

                                  

                                  @SuppressWarnings                 (                 "deprecation"                 )                

                                  Job job =                  new                  Job(conf,                  "wzl189_distinct"                 );                

                                  job.setNumReduceTasks(                 1                 );                

                                  job.setInputFormatClass(TextInputFormat.                 class                 );                

                                  job.setJarByClass(wzl189_distinct.                 class                 );                

                                  job.setMapperClass(MyMapper.                 class                 );                

                                  

                                  job.setMapOutputKeyClass(Text.                 class                 );                

                                  job.setMapOutputValueClass(NullWritable.                 class                 );                

                                  

                                  job.setReducerClass(MyReducer.                 class                 );                

                                  job.setOutputKeyClass(Text.                 class                 );                

                                  job.setOutputValueClass(NullWritable.                 class                 );                

                                  

                                  FileInputFormat.addInputPath(job,                  new                  Path(args[                 0                 ]));                

                                  FileOutputFormat.setOutputPath(job,                  new                  Path(args[                 1                 ]));                

                                  System.exit(job.waitForCompletion(                 true                 ) ?                  0                  :                  1                 );                

                                  }                

                 }
import                   java.io.IOException;                 

                                    

                  import                   org.apache.hadoop.conf.Configuration;                 

                  import                   org.apache.hadoop.fs.Path;                 

                  import                   org.apache.hadoop.io.LongWritable;                 

                  import                   org.apache.hadoop.io.NullWritable;                 

                  import                   org.apache.hadoop.io.Text;                 

                  import                   org.apache.hadoop.mapreduce.Job;                 

                  import                   org.apache.hadoop.mapreduce.Mapper;                 

                  import                   org.apache.hadoop.mapreduce.Reducer;                 

                  import                   org.apache.hadoop.mapreduce.lib.input.FileInputFormat;                 

                  import                   org.apache.hadoop.mapreduce.lib.input.TextInputFormat;                 

                  import                   org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;                 

                                    

                  public                   class                   wzl189_distinct {                 

                                        public                   static                   class                   MyMapper                   extends                 

                                                Mapper<Object, Text, Text, NullWritable> {                 

                                    

                                            Text outKey =                   new                   Text();                 

                                    

                                            @Override                 

                                            public                   void                   map(Object key, Text value, Context context)                 

                                                    throws                   IOException, InterruptedException {                 

                                    

                                                String tmp[] = value.toString().split(                  " "                  );                 

                                                if                   (tmp.length !=                   2                  )                 

                                                    return                  ;                 

                                                outKey.set(tmp[                  0                  ]);                 

                                                context.write(outKey, NullWritable.get());                 

                                    

                                            }                 

                                        }                 

                                    

                                        public                   static                   class                   MyReducer                   extends                 

                                                Reducer<Text, NullWritable, LongWritable, NullWritable> {                 

                                    

                                            long                   myCount = 0l;                 

                                    

                                            @Override                 

                                            public                   void                   reduce(Text key, Iterable<NullWritable> values,                 

                                                    Context context)                   throws                   IOException, InterruptedException {                 

                                                ++myCount;                 

                                            }                 

                                    

                                            @Override                 

                                            public                   void                   cleanup(Context context)                   throws                   IOException,                 

                                                    InterruptedException {                 

                                                context.write(                  new                   LongWritable(myCount), NullWritable.get());                 

                                            };                 

                                        }                 

                                    

                                        public                   static                   void                   main(String[] args)                   throws                   Exception {                 

                                            Configuration conf =                   new                   Configuration();                 

                                            if                   (args.length !=                   2                  ) {                 

                                                System.err.println(                  "Usage: <in> <out>"                  );                 

                                                System.exit(                  2                  );                 

                                            }                 

                                    

                                            conf.set(                  "mapred.child.java.opts"                  ,                   "-Xmx350m -Xmx1024m"                  );                 

                                    

                                            @SuppressWarnings                  (                  "deprecation"                  )                 

                                            Job job =                   new                   Job(conf,                   "wzl189_distinct"                  );                 

                                            job.setNumReduceTasks(                  1                  );                 

                                            job.setInputFormatClass(TextInputFormat.                  class                  );                 

                                            job.setJarByClass(wzl189_distinct.                  class                  );                 

                                            job.setMapperClass(MyMapper.                  class                  );                 

                                    

                                            job.setMapOutputKeyClass(Text.                  class                  );                 

                                            job.setMapOutputValueClass(NullWritable.                  class                  );                 

                                    

                                            job.setReducerClass(MyReducer.                  class                  );                 

                                            job.setOutputKeyClass(Text.                  class                  );                 

                                            job.setOutputValueClass(NullWritable.                  class                  );                 

                                    

                                            FileInputFormat.addInputPath(job,                   new                   Path(args[                  0                  ]));                 

                                            FileOutputFormat.setOutputPath(job,                   new                   Path(args[                  1                  ]));                 

                                            System.exit(job.waitForCompletion(                  true                  ) ?                   0                   :                   1                  );                 

                                        }                 

                  }                 





reduce阶段只用一个计数器就行了         

太感谢了,你了解这么多啊,我都搞了2周,没有结果,想再请教最后一个问题: 

假如 第一列是姓名,第二列是班级(先不管我这个需求是否合理) 

 john 100 

 john 100 

 mary 100 

 mary 200 

 tom  200 


想统计处如下结果,就是按班级人数去重 

100 2 

200 2 


这个mapreduce怎么写啊?  望高手最后再解答下,万分感谢了。
map 输出key 用 班级 + 分隔符 + 姓名         
重写 grouping 实现二次排序,如果reduce num > 1 还需要重写 partition         
reduce略作修改,增个姓名变量 ,比较当前姓名是否和前一个姓名是否一致,如果不一致 计数器+=1         

代码就不贴了,LZ多思考一下,这种简单的MR不难解决


标签:怎么,Text,reduce,mapreduce,hadoop,job,解决,import,class
From: https://blog.51cto.com/u_16115638/7025766

相关文章

  • 解决Avalonia 11.X版本的中文字体问题
    网上搜索的方法使用接口“IFontManagerImpl”这个方法目前只能用于Avalonia10.X版本,因为11版本后官方把这个接口的成员都设置成了非plubic,所以之前的版本解决办法用不上了,经过搜索github的官方那边的问题集锦,要解决解决Avalonia11.X版本的中文字体问题有2个思路:1.在程序里面嵌......
  • ERROR: JAVA_HOME /root/software/jdk1.8.0_262 does not exist.问题的解决
    jdk出了点儿问题,就打算直接卸载重新安装一下预先下载好jdk的压缩包备用1、在usr目录下新建java目录mkdir/usr/java然后进入到新建的java目录下:cd/usr/java2、将已经下载好的jdk的压缩包上传到java目录下3、解压jdk压缩包tar-zxvfjdk-8u161-linux-x64.tar.gz解......
  • R5 7530U参数 R5 7530U性能怎么样 锐龙R57530U相当于什么水平
    R57530U采用Zen3架构为6核12线程,3MB二级缓存,16MB三级缓存,与R56600U一致。R57530U性能怎么样这些点很重要看过你就懂了 http://www.adiannao.cn/dy ......
  • Oracle 安装 Failed to Create oracle Oracle Home User 解决方案
    WindowsServer2016安装Oracle12报错:FailedtoCreateoracleOracleHomeUser的解决方案:1、打开域安全策略(secpol.msc)-安全设置-账户策略-密码策略-密码必须符合复杂性要求。定义这个策略设置为:已禁用。 2、最后cmd运行刷新组策略命令为:gpupdate/force 3、重新......
  • 关闭任务计划程序前您必须关闭所有会话框的解决方法
    关闭任务计划程序前您必须关闭所有会话框的解决方法问题描述创建计划任务后关闭窗口时弹出来的,把所有窗口都关了,还是一样弹出提示。解决方案提示关闭任务计划程序前,您必须关闭所有会话框,是设置错误造成的,解决方法如下:方案一 可以用任务管理器,在应用程序中选择该程序,然后在......
  • 大连人工智能计算平台——华为昇腾AI平台——高性能计算HPC的pytorch源码编译报错——
     如题:pytorch源码编译报错——USE_CUDA=OFF  在编译pytorch源码的时候发现错误,虽然编译环境中已经安装好CUDA和cudnn,环境变量也都设置好,但是编译好的pytorch包wheel总是在运行torch.cuda.is_available()显示false,于是从编译源码的过程中进行重新检查,发现在编译的过程中提......
  • 使用golang解决LeetCode热题Hot100(1-10)
    使用golang解决LeetCode热题Hot1001.两数之和https://leetcode.cn/problems/two-sum/题目给定一个整数数组nums和一个整数目标值target,请你在该数组中找出和为目标值target的那两个整数,并返回它们的数组下标。你可以假设每种输入只会对应一个答案。但是,数组中同一个......
  • R9 7940HS参数 锐龙R97940HS性能怎么样 相当于什么水平
    ​​R97940HS采用了4nm工艺,采用8核Zen4CPU,并且搭载最新的锐龙AI引擎,CPU频率可达5.2GHz,拥有40MB缓存,核显为12CURDNA3,核显频率高达3GHz,TDP为35-45W。R97940HS性能怎么样这些点很重要看过你就懂了 http://www.adiannao.cn/dy ......
  • allocator 不是模板 解决方案
    场景严重性 代码 说明 项目 文件 行 禁止显示状态错误(活动) E0864 allocator不是模板 cvos_srtmp_service C:\ProgramFiles(x86)\MicrosoftVisualStudio\2019\Professional\VC\Tools\MSVC\14.16.27023\include\regex 1175 错误(活动) E0864 associated_allocator不是模板 ......
  • MySQL数据表的损坏及容灾解决方案
    引言在互联网应用中,MySQL是最常用的关系型数据库之一。然而,数据表的损坏可能会导致数据丢失或无法正常访问,给业务运营带来严重影响。本文将讨论MySQL数据表容易损坏的情况,并提供相应的容灾解决方案。数据表容易损坏的情况MySQL数据表在以下情况下容易发生损坏:硬件故障:例如磁盘......