
Implementing two-itemset association rules on Hadoop

Time: 2023-06-06 13:33:13
Tags: java hadoop new association org apache import two-itemset


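I have recently been reading the association-rule source code in Mahout, and it has been quite a headache. I had planned to write a series analyzing that code, but further in it started to get confusing, probably because it is fairly complex. So I decided to first implement the simplest case: two-itemset association rules.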

The idea of the algorithm again follows the figure from my previous post:

[Figure: the two-itemset association rule algorithm]

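The implementation is split into five steps:

  1. Count how many times each item occurs in the raw input;
  2. Generate the frequency list file, sorted by count from largest to smallest (items whose count does not exceed the threshold are excluded);
  3. Sort each transaction of the raw input according to the frequency list file, and prune the items that are not in it;
  4. Generate the two-itemset rules;
  5. Count how many times each two-itemset rule occurs, and delete the rules whose count does not exceed the threshold;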
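To make the data flow of the five steps concrete before the Hadoop code, here is a minimal single-JVM sketch that runs all five steps on a small in-memory list of transactions. It is an illustration under stated assumptions, not the author's MapReduce code: the class `TwoItemsetDemo`, the method `run`, and the parameter names `minSupport`/`minCount` are hypothetical, and both thresholds use the same strict "greater than" test that the jobs below use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory sketch of the five steps (not the author's Hadoop code).
class TwoItemsetDemo {

    // Returns lessFrequentItem -> (moreFrequentItem -> pair count), keeping pairs with count > minCount.
    static Map<String, Map<String, Integer>> run(List<List<String>> transactions,
                                                 int minSupport, int minCount) {
        // Step 1: count item occurrences (wordcount).
        Map<String, Integer> itemCounts = new HashMap<>();
        for (List<String> t : transactions)
            for (String item : t)
                itemCounts.merge(item, 1, Integer::sum);

        // Step 2: frequency list, count descending then name ascending, only counts > minSupport.
        List<String> fList = new ArrayList<>();
        for (Map.Entry<String, Integer> e : itemCounts.entrySet())
            if (e.getValue() > minSupport) fList.add(e.getKey());
        fList.sort((x, y) -> {
            int c = Integer.compare(itemCounts.get(y), itemCounts.get(x));
            return c != 0 ? c : x.compareTo(y);
        });

        Map<String, Map<String, Integer>> pairCounts = new TreeMap<>();
        for (List<String> t : transactions) {
            // Step 3: keep only frequent items, in frequency-list order.
            Set<String> present = new HashSet<>(t);
            List<String> sorted = new ArrayList<>();
            for (String item : fList)
                if (present.contains(item)) sorted.add(item);

            // Step 4 and the counting half of step 5: pair each item with every
            // item ranked before it, and count the pair.
            for (int i = 1; i < sorted.size(); i++)
                for (int j = 0; j < i; j++)
                    pairCounts.computeIfAbsent(sorted.get(i), k -> new TreeMap<>())
                              .merge(sorted.get(j), 1, Integer::sum);
        }

        // Filtering half of step 5: drop pairs whose count does not exceed minCount.
        for (Map<String, Integer> partners : pairCounts.values())
            partners.values().removeIf(c -> c <= minCount);
        pairCounts.values().removeIf(Map::isEmpty);
        return pairCounts;
    }

    public static void main(String[] args) {
        List<List<String>> tx = Arrays.asList(
                Arrays.asList("a", "b", "c"), Arrays.asList("a", "b"),
                Arrays.asList("a", "c"), Arrays.asList("a", "b", "c"), Arrays.asList("d"));
        System.out.println(run(tx, 1, 1)); // {b={a=3}, c={a=3, b=2}}
    }
}
```

Each pair is stored under its less frequent item, which is the same keying the MapReduce version ends up with, so every unordered pair is counted exactly once.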

Part one implements steps 1 and 2. The code is as follows:

GetFList.java:

 

package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// the specific comparator: orders "item<splitter>count" strings by count
// descending, then by item name ascending (also used by OutRules)
class MyComparator implements Comparator<String>{
    private String splitter=",";
    public MyComparator(String splitter){
        this.splitter=splitter;
    }
    @Override
    public int compare(String o1, String o2) {
        String[] str1=o1.split(splitter);
        String[] str2=o2.split(splitter);
        int num1=Integer.parseInt(str1[1]);
        int num2=Integer.parseInt(str2[1]);
        if(num1>num2){
            return -1;
        }else if(num1<num2){
            return 1;
        }else{
            return str1[0].compareTo(str2[0]);
        }
    }
}

public class GetFList {
    /**
     *  the program is based on the picture
     */
    // Mapper: emit (item, 1) for every item of a transaction (wordcount)
    public static class MapperGF extends Mapper<LongWritable,Text,Text,IntWritable>{
        private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
        private final IntWritable newvalue=new IntWritable(1);
        @Override
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            String[] items=splitter.split(value.toString());
            for(String item:items){
                context.write(new Text(item), newvalue);
            }
        }
    }
    // Reducer (also used as the combiner): sum the counts of each item
    public static class ReducerGF extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        public void reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException{
            int temp=0;
            for(IntWritable v:value){
                temp+=v.get();
            }
            context.write(key, new IntWritable(temp));
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if(args.length!=3){
            System.out.println("Usage: <input><output><min_support>");
            System.exit(1);
        }
        String input=args[0];
        String output=args[1];
        int minSupport=0;
        try {
            minSupport=Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            minSupport=3;   // fall back to a default threshold
        }
        Configuration conf=new Configuration();
        String temp=args[1]+"_temp";
        Job job=new Job(conf,"the get flist job");
        job.setJarByClass(GetFList.class);
        job.setMapperClass(MapperGF.class);
        job.setCombinerClass(ReducerGF.class);
        job.setReducerClass(ReducerGF.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(temp));
        boolean succeed=job.waitForCompletion(true);
        if(succeed){
            // read the temp output and write the data to the final output
            // (assumes a single reducer, i.e. a single part-r-00000 file)
            List<String> list=readFList(temp+"/part-r-00000",minSupport);
            System.out.println("the frequency list has been generated ... ");
            // generate the frequency file
            generateFList(list,output);
            System.out.println("the frequency file has been generated ... ");
        }else{
            System.out.println("the job failed");
            System.exit(1);
        }
    }
    // read the temp output ("item\tcount" lines) and return the frequency list,
    // sorted by count descending; only items whose count exceeds minSupport are kept
    public static List<String> readFList(String input,int minSupport) throws IOException{
        // read the hdfs file
        Configuration conf=new Configuration();
        Path path=new Path(input);
        FileSystem fs=FileSystem.get(path.toUri(),conf);
        FSDataInputStream in1=fs.open(path);
        PriorityQueue<String> queue=new PriorityQueue<String>(15,new MyComparator("\t"));
        InputStreamReader isr1=new InputStreamReader(in1);
        BufferedReader br=new BufferedReader(isr1);
        String line;
        while((line=br.readLine())!=null){
            int num=0;
            try {
                num=Integer.parseInt(line.split("\t")[1]);
            } catch (NumberFormatException e) {
                num=0;
            }
            if(num>minSupport){
                queue.add(line);
            }
        }
        br.close();   // also closes isr1 and in1
        // poll() returns elements in comparator order; iterating the queue would not
        List<String> list=new ArrayList<String>();
        while(!queue.isEmpty()){
            list.add(queue.poll());
        }
        return list;
    }
    // generate the frequency file: one "item\tcount" line per frequent item
    public static void generateFList(List<String> list,String output) throws IOException{
        Configuration conf=new Configuration();
        Path path=new Path(output);
        FileSystem fs=FileSystem.get(path.toUri(),conf);
        FSDataOutputStream writer=fs.create(path);
        Iterator<String> i=list.iterator();
        while(i.hasNext()){
            // note: this also writes a newline after the last line
            writer.write((i.next()+"\n").getBytes("UTF-8"));
        }
        writer.close();
    }
}


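Step 1 is really just the simplest wordcount program; step 2 involves reading and writing HDFS files. To sort the entries when generating the frequency list file, a PriorityQueue is used, together with a custom comparator class that defines the ordering (count descending, ties broken by item name).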
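A minimal in-memory sketch of step 2 may help show why the queue is drained with `poll()`. It assumes the input is a list of `item\tcount` lines, the same format as the wordcount output that `readFList` reads; the class `FListDemo` and the method `buildFList` are hypothetical names, and the comparator reproduces the ordering of `MyComparator("\t")`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of step 2 on in-memory "item\tcount" lines.
class FListDemo {
    // Same ordering as MyComparator("\t"): count descending, then item name ascending.
    static final Comparator<String> BY_COUNT_DESC = (a, b) -> {
        String[] x = a.split("\t");
        String[] y = b.split("\t");
        int c = Integer.compare(Integer.parseInt(y[1]), Integer.parseInt(x[1]));
        return c != 0 ? c : x[0].compareTo(y[0]);
    };

    static List<String> buildFList(List<String> lines, int minSupport) {
        PriorityQueue<String> queue = new PriorityQueue<>(15, BY_COUNT_DESC);
        for (String line : lines) {
            int num;
            try {
                num = Integer.parseInt(line.split("\t")[1]);
            } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
                num = 0; // malformed lines are treated as infrequent
            }
            if (num > minSupport) queue.add(line); // strict '>' as in readFList
        }
        // Iterating a PriorityQueue gives no particular order; repeated poll() does.
        List<String> fList = new ArrayList<>();
        while (!queue.isEmpty()) fList.add(queue.poll());
        return fList;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a\t5", "b\t2", "c\t7", "d\t5", "e\t1");
        System.out.println(buildFList(lines, 1));
    }
}
```

With a threshold of 1, `e` is dropped and the remaining lines come out as `c`, `a`, `d`, `b`; `a` and `d` tie on count 5 and are ordered by name.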

Part two implements step 3. The code is as follows:

SortAndCut.java:



package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortAndCut {
    /**
     *  sort and cut the items
     */
    public static class M extends Mapper<LongWritable,Text,NullWritable,Text>{
        // the frequency list, kept in descending-frequency (insertion) order
        private LinkedHashSet<String> list=new LinkedHashSet<String>();
        private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");

        @Override
        public void setup(Context context) throws IOException{
            String input=context.getConfiguration().get("FLIST");
            FileSystem fs=FileSystem.get(URI.create(input),context.getConfiguration());
            Path path=new Path(input);
            FSDataInputStream in1=fs.open(path);
            InputStreamReader isr1=new InputStreamReader(in1);
            BufferedReader br=new BufferedReader(isr1);
            String line;
            while((line=br.readLine())!=null){
                String[] str=line.split("\t");
                if(str.length>0){
                    list.add(str[0]);   // keep the item, drop its count
                }
            }
            br.close();
        }
        // map: re-emit the transaction's items in frequency-list order, dropping infrequent ones
        @Override
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            String[] items=splitter.split(value.toString());
            Set<String> set=new HashSet<String>();
            for(String s:items){
                set.add(s);
            }
            Iterator<String> iter=list.iterator();
            StringBuilder sb=new StringBuilder();
            int num=0;
            while(iter.hasNext()){
                String item=iter.next();
                if(set.contains(item)){
                    sb.append(item).append(",");
                    num++;
                }
            }
            if(num>0){
                context.write(NullWritable.get(), new Text(sb.toString()));
            }
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if(args.length!=3){
            System.out.println("Usage: <input><output><fListPath>");
            System.exit(1);
        }
        String input=args[0];
        String output=args[1];
        String fListPath=args[2];
        Configuration conf=new Configuration();
        conf.set("FLIST", fListPath);
        Job job=new Job(conf,"the sort and cut  the items  job");
        job.setJarByClass(SortAndCut.class);
        job.setMapperClass(M.class);
        job.setNumReduceTasks(0);   // map-only job
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean succeed=job.waitForCompletion(true);
        if(succeed){
            System.out.println(job.getJobName()+" succeed ... ");
        }
    }
}


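In this stage the Mapper's setup() reads the frequency file into a LinkedHashSet, which preserves the original insertion order. map() then walks this LinkedHashSet for each transaction and outputs only the items that occur in that transaction, so the result is the transaction re-sorted by frequency with the infrequent items removed.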
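The per-transaction logic of this mapper can be sketched without Hadoop. This is a minimal illustration assuming the frequency list is already loaded into a `LinkedHashSet` in descending-frequency order; `SortAndCutDemo` and `sortAndCut` are hypothetical names, and returning `null` stands in for the job writing nothing.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical in-memory sketch of SortAndCut's map() for one transaction.
class SortAndCutDemo {
    // Same separator pattern as the job: spaces, commas, '|' or tabs.
    private static final Pattern SPLITTER = Pattern.compile("[ ]*[ ,|\t]");

    // Returns the kept items joined by ",", with the trailing comma the job emits,
    // or null when no item of the transaction is frequent.
    static String sortAndCut(LinkedHashSet<String> fList, String transaction) {
        Set<String> present = new HashSet<>(Arrays.asList(SPLITTER.split(transaction)));
        StringBuilder sb = new StringBuilder();
        for (String item : fList) {          // LinkedHashSet iterates in insertion order
            if (present.contains(item)) sb.append(item).append(',');
        }
        return sb.length() > 0 ? sb.toString() : null;
    }

    public static void main(String[] args) {
        LinkedHashSet<String> fList = new LinkedHashSet<>(Arrays.asList("c", "a", "d", "b"));
        System.out.println(sortAndCut(fList, "b a e c")); // c,a,b,
    }
}
```

The item `e` is not in the frequency list, so it is cut; the rest follow the list's order rather than the transaction's.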

Part three implements steps 4 and 5. The code is as follows:

OutRules.java



package org.fansy.date1108.fpgrowth.twodimension;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Stack;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OutRules {

    // Mapper: for a sorted line "i1,i2,...,in," emit (in, "i1,...,in-1,"), (in-1, "i1,...,in-2,"), ...
    public static class M extends Mapper<LongWritable,Text,Text,Text>{
        @Override
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            String str=value.toString();
            String[] s=str.split(",");
            if(s.length<=1){
                return;
            }
            Stack<String> stack=new Stack<String>();
            for(int i=0;i<s.length;i++){
                stack.push(s[i]);
            }
            int num=str.length();
            while(stack.size()>1){
                String item=stack.pop();
                // cut off the popped item plus one comma; using the item's length
                // (instead of a fixed 2) keeps multi-character item IDs intact
                num=num-item.length()-1;
                context.write(new Text(item),new Text(str.substring(0,num)));
            }
        }
    }
    // Reducer
    public static class R extends Reducer<Text,Text,Text,Text>{
        // despite its name, MIN is a minimum pair count, not a confidence ratio
        private int minConfidence=0;
        @Override
        public void setup(Context context){
            String str=context.getConfiguration().get("MIN");
            try {
                minConfidence=Integer.parseInt(str);
            } catch (NumberFormatException e) {
                minConfidence=3;   // fall back to a default threshold
            }
        }
        @Override
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            // count how often each item co-occurs with the key
            HashMap<String,Integer> hm=new HashMap<String,Integer>();
            for(Text v:values){
                String[] str=v.toString().split(",");
                for(int i=0;i<str.length;i++){
                    if(hm.containsKey(str[i])){
                        int temp=hm.get(str[i]);
                        hm.put(str[i], temp+1);
                    }else{
                        hm.put(str[i], 1);
                    }
                }
            }
            //  end of for
            // keep pairs above the threshold, sorted by count descending
            TreeSet<String> sss=new TreeSet<String>(new MyComparator(" "));
            Iterator<Entry<String,Integer>> iter=hm.entrySet().iterator();
            while(iter.hasNext()){
                Entry<String,Integer> k=iter.next();
                if(k.getValue()>minConfidence&&!key.toString().equals(k.getKey())){
                    sss.add(k.getKey()+" "+k.getValue());
                }
            }
            Iterator<String> iters=sss.iterator();
            StringBuilder sb=new StringBuilder();
            while(iters.hasNext()){
                sb.append(iters.next()).append("|");
            }
            if(sb.length()>0){   // skip keys whose pairs were all below the threshold
                context.write(key, new Text(":\t"+sb.toString()));
            }
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if(args.length!=3){
            System.out.println("Usage: <input><output><min_confidence>");
            System.exit(1);
        }
        String input=args[0];
        String output=args[1];
        String minConfidence=args[2];
        Configuration conf=new Configuration();
        conf.set("MIN", minConfidence);
        Job job=new Job(conf,"the out rules   job");
        job.setJarByClass(OutRules.class);
        job.setMapperClass(M.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(R.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean succeed=job.waitForCompletion(true);
        if(succeed){
            System.out.println(job.getJobName()+" succeed ... ");
        }
    }
}


The map phase uses a Stack and string operations to do something like the following:


 

input:p,x,z,y,a,b
output:
b:p,x,z,y,a
a:p,x,z,y
y:p,x,z
z:p,x
x:p
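The input/output pairs above can be reproduced by a small stand-alone sketch of the map logic. It is an assumption-labeled illustration, not the job itself: `PrefixDemo` and `prefixes` are hypothetical names, an `ArrayDeque` plays the role of the `Stack`, and each cut is computed from the popped item's length so that multi-character item IDs such as `1327` are not truncated.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of OutRules' map() on one SortAndCut output line.
class PrefixDemo {
    // line: e.g. "p,x,z,y,a,b," (SortAndCut leaves a trailing comma).
    // Returns "item:prefix" strings in the order the mapper would emit them.
    static List<String> prefixes(String line) {
        List<String> out = new ArrayList<>();
        String s = line.endsWith(",") ? line.substring(0, line.length() - 1) : line;
        String[] items = s.split(",");
        if (items.length <= 1) return out;      // a single item forms no pair
        Deque<String> stack = new ArrayDeque<>();
        for (String item : items) stack.push(item);
        int end = s.length();
        while (stack.size() > 1) {
            String last = stack.pop();
            end -= last.length() + 1;           // remove ",last" from the end
            out.add(last + ":" + s.substring(0, end));
        }
        return out;
    }

    public static void main(String[] args) {
        prefixes("p,x,z,y,a,b,").forEach(System.out::println);
    }
}
```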


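The reduce phase simply counts how many times each item occurs together with the key, using a HashMap. Since the output is nicer when it is sorted by count from largest to smallest, a TreeSet is used as well.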
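For one key, the counting and sorting can be sketched in plain Java as below. This is a minimal illustration assuming the values are the comma-separated prefixes emitted by the mapper; `ReduceDemo`, `summarize` and `minCount` are hypothetical names, and a `TreeSet` of map entries with an explicit comparator stands in for the `TreeSet` of `"item count"` strings ordered by `MyComparator(" ")`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch of OutRules' reduce() for a single key.
class ReduceDemo {
    static String summarize(String key, List<String> prefixes, int minCount) {
        // HashMap: co-occurring item -> number of prefixes it appears in.
        Map<String, Integer> counts = new HashMap<>();
        for (String p : prefixes)
            for (String item : p.split(","))
                if (!item.isEmpty()) counts.merge(item, 1, Integer::sum);

        // TreeSet ordered by count descending, then item name ascending.
        TreeSet<Map.Entry<String, Integer>> sorted = new TreeSet<Map.Entry<String, Integer>>((a, b) -> {
            int c = Integer.compare(b.getValue(), a.getValue());
            return c != 0 ? c : a.getKey().compareTo(b.getKey());
        });
        for (Map.Entry<String, Integer> e : counts.entrySet())
            if (e.getValue() > minCount && !e.getKey().equals(key)) sorted.add(e);

        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : sorted)
            sb.append(e.getKey()).append(' ').append(e.getValue()).append('|');
        // TextOutputFormat separates key and value with a tab by default.
        return key + "\t:\t" + sb;
    }

    public static void main(String[] args) {
        System.out.println(summarize("c", Arrays.asList("a,b,", "a,", "a,b,", "d,"), 1));
    }
}
```

Here `a` co-occurs with `c` three times and `b` twice, while `d` (once) falls below the threshold and is dropped.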

The formats of all the output files above are produced by plain string concatenation, so you can change them to suit your own needs.

For example, my output looks like this:



 


0   :   39 125|48 99|32 44|41 37|38 26|310 17|5 16|65 14|1 13|89 13|1144 12|225 12|60 11|604 11|
1327 10|237 10|101 9|147 9|270 9|533 9|9 9|107 8|11 8|117 8|170 8|271 8|334 8|549 8|62 8|812 8|10 7|
1067 7|12925 7|23 7|255 7|279 7|548 7|783 7|14098 6|2 6|208 6|22 6|36 6|413 6|789 6|824 6|961 6|110 5|
120 5|12933 5|201 5|2238 5|2440 5|2476 5|251 5|286 5|2879 5|3 5|4105 5|415 5|438 5|467 5|475 5|479 5|49 5|
592 5|675 5|715 5|740 5|791 5|830 5|921 5|9555 5|976 5|979 5|1001 4|1012 4|1027 4|1055 4|1146 4|12 4|13334 4|
136 4|1393 4|16 4|1600 4|165 4|167 4|1819 4|1976 4|2051 4|2168 4|2215 4|2284 4|2353 4|2524 4|261 4|267 4|269 4|
27 4|2958 4|297 4|3307 4|338 4|420 4|4336 4|4340 4|488 4|4945 4|5405 4|58 4|589 4|75 4|766 4|795 4|809 4|880 4|8978 4|916 4|94 4|956 4|


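The item before the colon is the key item. After the colon come the items it co-occurs with, each followed by the pair count: "39 125" means the pair <0,39> occurs 125 times, "48 99" means <0,48> occurs 99 times, and so on.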
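If the rules are consumed by another program, a line in this format can be turned back into pairs with a few string operations. The helper below is a hypothetical sketch (`OutputParser` and `parse` are not part of the original code), assuming one `key\t:\titem count|item count|...` record per line.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that parses one OutRules output line.
class OutputParser {
    // Returns "<key,item>=count" strings in the order they appear.
    static List<String> parse(String line) {
        List<String> pairs = new ArrayList<>();
        int colon = line.indexOf(':');
        if (colon < 0) return pairs;                 // not a rule line
        String key = line.substring(0, colon).trim();
        for (String entry : line.substring(colon + 1).split("\\|")) {
            String[] parts = entry.trim().split("\\s+");
            if (parts.length != 2) continue;         // skips blank trailing chunks
            pairs.add("<" + key + "," + parts[0] + ">=" + parts[1]);
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(parse("0\t:\t39 125|48 99|32 44|"));
    }
}
```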

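In summary: Mahout's source code really is hard going, so you should first become very familiar with the algorithm; reading the source after that should be somewhat easier.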


From: https://blog.51cto.com/u_2650279/6424141

  • postman接口关联
    在使用postman做接口测试时,有时候后面的接口需要获取前面接口的某一个返回值做为请求参数,这时就可以使用关联。如从A接口提取出a字段的值,供B接口的b字段使用。一个接口的返回报文如下:{"retCode":"0","retMsg":"请求成功","rspData":{"status":"1",......