文章目录
项目需求
根据电商日志文件,分析:
- 统计页面浏览量(每行记录就是一次浏览)
- 统计各个省份的浏览量 (需要解析IP)
- 日志的ETL操作(ETL:数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程)
整体架构流程
WebLogPVMapper.java ------- 为每个处理的行输出一个固定的键值对。
WebLogPVReducer.java ------- 对Map阶段输出的每个键(在这个例子中是省份)对应的一系列值(访问量)进行累加,然后输出每个键对应的总访问量。
WebLogPVMapReduce.java ------- 处理Web日志数据,计算页面浏览量(Page Views, PV)。它通过MapReduce的方式,将日志数据映射(Map)到不同的页面上,并在Reduce阶段进行汇总。
数据集
数据集
链接:https://pan.baidu.com/s/1AtFZqf7pfQk_Tlh-HiFX4w?pwd=r5r8
提取码:r5r8
- 日志字段说明:
第二个字段:url
第十四个字段:IP
第十八个字段:time - 字段解析
IP—>国家、省份、城市
url—>页面ID
实验步骤
- 先在虚拟机上创建一个文件夹存放输入数据集文件
创建一个输入数据的文件夹
mkdir input
- 用xtfp将数据集传到虚拟机上
- 启动Hadoop集群
启动HDFS
启动yarn
- 将数据传到HDFS上
在HDFS上创建存放的文件夹
hdfs dfs -mkdir -p /trackinfo/input
hdfs dfs -put input.txt /trackinfo/input
- 运行代码
- 查看结果
代码
WebLogPVMapper.java
package com.weblogpv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WebLogPVMapper extends Mapper <LongWritable, Text, Text, IntWritable>>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //分割每一行内容, context.write(new Text("line"), new IntWritable(1)); } }
WebLogPvReducer.java
package com.weblogpv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WebLogPvReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable outputValue = new IntWritable( ); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException, IOException { //key :省份; value:<1,1,1,1> int sum = 0; for (IntWritable value:values) { sum+= value.get(); } outputValue.set( sum ); context.write( key,outputValue ); } }
WebLogPVMapReduce.java
package com.weblogpv; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.net.URI; public class WebLogPVMapReduce extends Configured implements Tool { @Override public int run(String[] args) throws Exception { //2、创建job Job job = Job.getInstance( this.getConf(), "WebLogUVMapReduce" ); //设置job运行的主类 job.setJarByClass( WebLogPVMapReduce.class); //设置Job //a、input job.setInputFormatClass(TextInputFormat.class); Path inputPath = new Path("hdfs://hadoop102:8020/trackinfo/input/trackinfo.txt"); TextInputFormat.setInputPaths( job, inputPath); //b、map job.setMapperClass( WebLogPVMapper.class ); job.setMapOutputKeyClass( Text.class ); job.setMapOutputValueClass( IntWritable.class ); //c.partitioner job.setNumReduceTasks(1); //d、reduce job.setReducerClass( WebLogPvReducer.class); job.setOutputKeyClass( Text.class ); job.setOutputValueClass( IntWritable.class ); //e、output job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path("hdfs://hadoop102:8020/trackinfo/output"); //如果输出目录存在,先删除 FileSystem hdfs = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration()); if(hdfs.exists(outputPath)){ hdfs.delete( outputPath,true ); } TextOutputFormat.setOutputPath( job,outputPath ); //第四步,提交job boolean isSuccess = job.waitForCompletion( true ); return isSuccess?0:1 ; } public static void main(String[] args) { Configuration configuration = new Configuration(); ///public static int run(Configuration conf, Tool tool, String[] args) try { int status = ToolRunner.run( configuration,new WebLogPVMapReduce(),args ); System.exit( status ); } catch (Exception e) { e.printStackTrace(); } } }
代码细节
WebLogPVMapper.java详细解释
package com.weblogpv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WebLogPVMapper extends Mapper <LongWritable, Text, Text, IntWritable>>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //分割每一行内容, context.write(new Text("line"), new IntWritable(1)); } }
这个
Mapper
的目的是处理日志数据,特别是网页日志数据。
- 类定义
public class WebLogPVMapper extends Mapper<LongWritable, Text, Text, IntWritable>
这行代码定义了一个名为
WebLogPVMapper
的公共类,它扩展了Mapper类。这个类的泛型参数<LongWritable, Text, Text, IntWritable>
指定了Mapper的输入和输出类型。这里,Mapper接收LongWritable
和Text
类型的输入,并输出Text
和IntWritable
类型的键值对。
- map方法
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //分割每一行内容, context.write(new Text("line"), new IntWritable(1)); }
这个方法是Mapper的核心。它被Hadoop框架调用来处理每个输入项。在这个例子中,map方法接收两个参数:
key
和value
。key
是一个LongWritable
对象,通常表示数据的偏移量;value是一个Text
对象,表示实际的数据行。在这个特定的实现中,map方法并没有实际处理value中的数据。它只是简单地将
字符串"line"
作为键,数字1作为值写入上下文(context)。这意味着对于每个输入行,这个Mapper都会输出一个键值对,键是"line",值是1。
WebLogPvReducer.java详细解释
package com.weblogpv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WebLogPvReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable outputValue = new IntWritable( ); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException, IOException { //key :省份; value:<1,1,1,1> int sum = 0; for (IntWritable value:values) { sum+= value.get(); } outputValue.set( sum ); context.write( key,outputValue ); } }
用于处理Map阶段输出的数据,并生成最终的输出结果。
- 类定义:
public class WebLogPvReducer extends Reducer<Text,IntWritable,Text,IntWritable>
这行代码定义了一个名为WebLogPvReducer的类,它继承自Hadoop的Reducer类。这个Reducer处理的输入键值对类型分别是
Text
和IntWritable
,输出键值对类型也是Text
和IntWritable
。
- 成员变量
private IntWritable outputValue = new IntWritable();
这行代码声明了一个IntWritable类型的成员变量
outputValue
,用于存储每个键对应的累加结果。
- 重写reduce方法:
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
这是Reducer的核心方法,用于处理Map阶段的输出。
int sum = 0; 初始化一个整数sum,用于累加值。
for (IntWritable value : values) { sum += value.get(); } 这个循环遍历所有与当前键(key)关联的值,并将它们累加起来。
outputValue.set(sum); 将累加后的总和设置到outputValue中。
context.write(key, outputValue); 将处理后的键值对写入到输出中。
WebLogPVMapReduce.java详细解释
package com.weblogpv; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.net.URI; public class WebLogPVMapReduce extends Configured implements Tool { @Override public int run(String[] args) throws Exception { //2、创建job Job job = Job.getInstance( this.getConf(), "WebLogUVMapReduce" ); //设置job运行的主类 job.setJarByClass( WebLogPVMapReduce.class); //设置Job //a、input job.setInputFormatClass(TextInputFormat.class); Path inputPath = new Path("hdfs://hadoop102:8020/trackinfo/input/trackinfo.txt"); TextInputFormat.setInputPaths( job, inputPath); //b、map job.setMapperClass( WebLogPVMapper.class ); job.setMapOutputKeyClass( Text.class ); job.setMapOutputValueClass( IntWritable.class ); //c.partitioner job.setNumReduceTasks(1); //d、reduce job.setReducerClass( WebLogPvReducer.class); job.setOutputKeyClass( Text.class ); job.setOutputValueClass( IntWritable.class ); //e、output job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path("hdfs://hadoop102:8020/trackinfo/output"); //如果输出目录存在,先删除 FileSystem hdfs = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration()); if(hdfs.exists(outputPath)){ hdfs.delete( outputPath,true ); } TextOutputFormat.setOutputPath( job,outputPath ); //第四步,提交job boolean isSuccess = job.waitForCompletion( true ); return isSuccess?0:1 ; } public static void main(String[] args) { Configuration configuration = new Configuration(); ///public static int run(Configuration conf, Tool tool, String[] args) try { int status = ToolRunner.run( configuration,new WebLogPVMapReduce(),args ); System.exit( status ); } catch (Exception e) { e.printStackTrace(); } } }
处理Web日志数据,计算页面浏览量(Page Views, PV)。它通过MapReduce的方式,将日志数据映射(Map)到不同的页面上,并在Reduce阶段进行汇总。
- 定义WebLogPVMapReduce类:
public class WebLogPVMapReduce extends Configured implements Tool
这个类继承自Configured并实现了Tool接口,这是编写Hadoop MapReduce程序的标准做法。
- 重写run方法:
public int run(String[] args) throws Exception { //2、创建job Job job = Job.getInstance( this.getConf(), "WebLogUVMapReduce" ); //设置job运行的主类 job.setJarByClass( WebLogPVMapReduce.class); //设置Job //a、input job.setInputFormatClass(TextInputFormat.class); Path inputPath = new Path("hdfs://hadoop102:8020/trackinfo/input/trackinfo.txt"); TextInputFormat.setInputPaths( job, inputPath); //b、map job.setMapperClass( WebLogPVMapper.class ); job.setMapOutputKeyClass( Text.class ); job.setMapOutputValueClass( IntWritable.class ); //c.partitioner job.setNumReduceTasks(1); //d、reduce job.setReducerClass( WebLogPvReducer.class); job.setOutputKeyClass( Text.class ); job.setOutputValueClass( IntWritable.class ); //e、output job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path("hdfs://hadoop102:8020/trackinfo/output"); //如果输出目录存在,先删除 FileSystem hdfs = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration()); if(hdfs.exists(outputPath)){ hdfs.delete( outputPath,true ); } TextOutputFormat.setOutputPath( job,outputPath ); //第四步,提交job boolean isSuccess = job.waitForCompletion( true ); return isSuccess?0:1 ; }
run
方法是MapReduce程序的入口点。它配置并启动MapReduce作业。
创建和配置作业:
创建一个Job实例,并设置作业的名称为"WebLogUVMapReduce"。
设置作业的jar文件和主类。
配置输入格式为TextInputFormat
,并指定输入路径。
设置Mapper类为WebLogPVMapper
,并定义Map阶段的输出键值类型。
设置Reducer类为WebLogPvReducer
,并定义Reduce阶段的输出键值类型。
配置输出格式为TextOutputFormat
,并指定输出路径。
- 处理输出目录:
在开始作业之前,检查输出目录是否存在,如果存在,则删除该目录。
- 提交作业:
调用job.waitForCompletion(true)来提交作业,并等待其完成。
- 主方法:
public static void main(String[] args) { Configuration configuration = new Configuration(); ///public static int run(Configuration conf, Tool tool, String[] args) try { int status = ToolRunner.run( configuration,new WebLogPVMapReduce(),args ); System.exit( status ); } catch (Exception e) { e.printStackTrace(); } }
在main方法中,创建一个
Configuration
对象,并使用ToolRunner.run
来启动MapReduce作业。
标签:浏览量,--,hadoop,job,org,apache,import,电商,class From: https://blog.csdn.net/Instinct__121/article/details/139567689代码已上传至Gitee
https://gitee.com/lijiarui-1/test/tree/master/Test_project2