首页 > 其他分享 >生产实习Day4 ---- 电商日志数据分析(问题1--统计页面浏览量(每行记录就是一次浏览))

生产实习Day4 ---- 电商日志数据分析(问题1--统计页面浏览量(每行记录就是一次浏览))

时间:2024-06-10 12:04:58浏览次数:13  
标签:浏览量 -- hadoop job org apache import 电商 class

文章目录

项目需求

根据电商日志文件,分析:

  1. 统计页面浏览量(每行记录就是一次浏览)
  2. 统计各个省份的浏览量 (需要解析IP)
  3. 日志的ETL操作(ETL:数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程)

整体架构流程

在这里插入图片描述
WebLogPVMapper.java ------- 为每个处理的行输出一个固定的键值对。
WebLogPVReducer.java ------- 对Map阶段输出的每个键(在这个例子中是省份)对应的一系列值(访问量)进行累加,然后输出每个键对应的总访问量。
WebLogPVMapReduce.java ------- 处理Web日志数据,计算页面浏览量(Page Views, PV)。它通过MapReduce的方式,将日志数据映射(Map)到不同的页面上,并在Reduce阶段进行汇总。

数据集

数据集
链接:https://pan.baidu.com/s/1AtFZqf7pfQk_Tlh-HiFX4w?pwd=r5r8
提取码:r5r8
在这里插入图片描述

  • 日志字段说明:
      第二个字段:url
      第十四个字段:IP
      第十八个字段:time
  • 字段解析
      IP—>国家、省份、城市
      url—>页面ID

实验步骤

  1. 先在虚拟机上创建一个文件夹存放输入数据集文件

创建一个输入数据的文件夹

mkdir input
  1. 用xtfp将数据集传到虚拟机上
    在这里插入图片描述
    在这里插入图片描述
  2. 启动Hadoop集群
    启动HDFS
    在这里插入图片描述

启动yarn
在这里插入图片描述
在这里插入图片描述

  1. 将数据传到HDFS上

在HDFS上创建存放的文件夹

hdfs  dfs -mkdir  -p /trackinfo/input

在这里插入图片描述

hdfs dfs -put input.txt /trackinfo/input

在这里插入图片描述
在这里插入图片描述

  1. 运行代码
    在这里插入图片描述

在这里插入图片描述

  1. 查看结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

代码

WebLogPVMapper.java
package com.weblogpv;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WebLogPVMapper extends Mapper <LongWritable, Text, Text, IntWritable>>{
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       //分割每一行内容,
       context.write(new Text("line"), new IntWritable(1));
   }
}
WebLogPvReducer.java
package com.weblogpv;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
public class WebLogPvReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
   private IntWritable outputValue = new IntWritable(  );
   @Override
   protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException, IOException {
       //key :省份;  value:<1,1,1,1>
       int sum = 0;
       for (IntWritable value:values) {
           sum+= value.get();
       }
       outputValue.set( sum );
       context.write( key,outputValue );
   }
}
WebLogPVMapReduce.java
package com.weblogpv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;


public class WebLogPVMapReduce extends Configured implements Tool {
   @Override
   public int run(String[] args) throws Exception {

       //2、创建job
       Job job = Job.getInstance( this.getConf(), "WebLogUVMapReduce" );
       //设置job运行的主类
       job.setJarByClass( WebLogPVMapReduce.class);

       //设置Job
       //a、input
       job.setInputFormatClass(TextInputFormat.class);
       Path inputPath = new Path("hdfs://hadoop102:8020/trackinfo/input/trackinfo.txt");
       TextInputFormat.setInputPaths( job, inputPath);

       //b、map
       job.setMapperClass( WebLogPVMapper.class );
       job.setMapOutputKeyClass( Text.class );
       job.setMapOutputValueClass( IntWritable.class );

       //c.partitioner
       job.setNumReduceTasks(1);

       //d、reduce
       job.setReducerClass( WebLogPvReducer.class);
       job.setOutputKeyClass( Text.class  );
       job.setOutputValueClass( IntWritable.class );

       //e、output
       job.setOutputFormatClass(TextOutputFormat.class);
       Path outputPath = new Path("hdfs://hadoop102:8020/trackinfo/output");

       //如果输出目录存在,先删除
       FileSystem hdfs = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration());
       if(hdfs.exists(outputPath)){
           hdfs.delete( outputPath,true );
       }
       TextOutputFormat.setOutputPath( job,outputPath );

       //第四步,提交job
       boolean isSuccess = job.waitForCompletion( true );

       return isSuccess?0:1 ;
   }


   public static void main(String[] args) {
       Configuration configuration = new Configuration();
       ///public static int run(Configuration conf, Tool tool, String[] args)
       try {
           int  status =  ToolRunner.run( configuration,new WebLogPVMapReduce(),args );
           System.exit( status );
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

代码细节

WebLogPVMapper.java详细解释
package com.weblogpv;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WebLogPVMapper extends Mapper <LongWritable, Text, Text, IntWritable>>{
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       //分割每一行内容,
       context.write(new Text("line"), new IntWritable(1));
   }
}

这个Mapper的目的是处理日志数据,特别是网页日志数据。

  1. 类定义
public class WebLogPVMapper extends Mapper<LongWritable, Text, Text, IntWritable>

这行代码定义了一个名为WebLogPVMapper的公共类,它扩展了Mapper类。这个类的泛型参数<LongWritable, Text, Text, IntWritable>指定了Mapper的输入和输出类型。这里,Mapper接收LongWritableText类型的输入,并输出TextIntWritable类型的键值对。

  1. map方法
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   //分割每一行内容,
   context.write(new Text("line"), new IntWritable(1));
}

这个方法是Mapper的核心。它被Hadoop框架调用来处理每个输入项。在这个例子中,map方法接收两个参数:keyvaluekey是一个LongWritable对象,通常表示数据的偏移量;value是一个Text对象,表示实际的数据行。

在这个特定的实现中,map方法并没有实际处理value中的数据。它只是简单地将字符串"line"作为键,数字1作为值写入上下文(context)。这意味着对于每个输入行,这个Mapper都会输出一个键值对,键是"line",值是1。

WebLogPvReducer.java详细解释
package com.weblogpv;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
public class WebLogPvReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
   private IntWritable outputValue = new IntWritable(  );
   @Override
   protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException, IOException {
       //key :省份;  value:<1,1,1,1>
       int sum = 0;
       for (IntWritable value:values) {
           sum+= value.get();
       }
       outputValue.set( sum );
       context.write( key,outputValue );
   }
}

用于处理Map阶段输出的数据,并生成最终的输出结果。

  1. 类定义:
public class WebLogPvReducer extends Reducer<Text,IntWritable,Text,IntWritable> 

这行代码定义了一个名为WebLogPvReducer的类,它继承自Hadoop的Reducer类。这个Reducer处理的输入键值对类型分别是TextIntWritable,输出键值对类型也是TextIntWritable

  1. 成员变量
private IntWritable outputValue = new IntWritable(); 

这行代码声明了一个IntWritable类型的成员变量outputValue,用于存储每个键对应的累加结果。

  1. 重写reduce方法:
protected void reduce(Text key, Iterable<IntWritable> values, Context context) 

这是Reducer的核心方法,用于处理Map阶段的输出。
int sum = 0; 初始化一个整数sum,用于累加值。
for (IntWritable value : values) { sum += value.get(); } 这个循环遍历所有与当前键(key)关联的值,并将它们累加起来。
outputValue.set(sum); 将累加后的总和设置到outputValue中。
context.write(key, outputValue); 将处理后的键值对写入到输出中。

WebLogPVMapReduce.java详细解释
package com.weblogpv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;


public class WebLogPVMapReduce extends Configured implements Tool {
   @Override
   public int run(String[] args) throws Exception {

       //2、创建job
       Job job = Job.getInstance( this.getConf(), "WebLogUVMapReduce" );
       //设置job运行的主类
       job.setJarByClass( WebLogPVMapReduce.class);

       //设置Job
       //a、input
       job.setInputFormatClass(TextInputFormat.class);
       Path inputPath = new Path("hdfs://hadoop102:8020/trackinfo/input/trackinfo.txt");
       TextInputFormat.setInputPaths( job, inputPath);

       //b、map
       job.setMapperClass( WebLogPVMapper.class );
       job.setMapOutputKeyClass( Text.class );
       job.setMapOutputValueClass( IntWritable.class );

       //c.partitioner
       job.setNumReduceTasks(1);

       //d、reduce
       job.setReducerClass( WebLogPvReducer.class);
       job.setOutputKeyClass( Text.class  );
       job.setOutputValueClass( IntWritable.class );

       //e、output
       job.setOutputFormatClass(TextOutputFormat.class);
       Path outputPath = new Path("hdfs://hadoop102:8020/trackinfo/output");

       //如果输出目录存在,先删除
       FileSystem hdfs = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration());
       if(hdfs.exists(outputPath)){
           hdfs.delete( outputPath,true );
       }
       TextOutputFormat.setOutputPath( job,outputPath );

       //第四步,提交job
       boolean isSuccess = job.waitForCompletion( true );

       return isSuccess?0:1 ;
   }


   public static void main(String[] args) {
       Configuration configuration = new Configuration();
       ///public static int run(Configuration conf, Tool tool, String[] args)
       try {
           int  status =  ToolRunner.run( configuration,new WebLogPVMapReduce(),args );
           System.exit( status );
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

处理Web日志数据,计算页面浏览量(Page Views, PV)。它通过MapReduce的方式,将日志数据映射(Map)到不同的页面上,并在Reduce阶段进行汇总。

  1. 定义WebLogPVMapReduce类:
public class WebLogPVMapReduce extends Configured implements Tool

这个类继承自Configured并实现了Tool接口,这是编写Hadoop MapReduce程序的标准做法。

  1. 重写run方法:
public int run(String[] args) throws Exception {

       //2、创建job
       Job job = Job.getInstance( this.getConf(), "WebLogUVMapReduce" );
       //设置job运行的主类
       job.setJarByClass( WebLogPVMapReduce.class);

       //设置Job
       //a、input
       job.setInputFormatClass(TextInputFormat.class);
       Path inputPath = new Path("hdfs://hadoop102:8020/trackinfo/input/trackinfo.txt");
       TextInputFormat.setInputPaths( job, inputPath);

       //b、map
       job.setMapperClass( WebLogPVMapper.class );
       job.setMapOutputKeyClass( Text.class );
       job.setMapOutputValueClass( IntWritable.class );

       //c.partitioner
       job.setNumReduceTasks(1);

       //d、reduce
       job.setReducerClass( WebLogPvReducer.class);
       job.setOutputKeyClass( Text.class  );
       job.setOutputValueClass( IntWritable.class );

       //e、output
       job.setOutputFormatClass(TextOutputFormat.class);
       Path outputPath = new Path("hdfs://hadoop102:8020/trackinfo/output");

       //如果输出目录存在,先删除
       FileSystem hdfs = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration());
       if(hdfs.exists(outputPath)){
           hdfs.delete( outputPath,true );
       }
       TextOutputFormat.setOutputPath( job,outputPath );

       //第四步,提交job
       boolean isSuccess = job.waitForCompletion( true );

       return isSuccess?0:1 ;
   }

run方法是MapReduce程序的入口点。它配置并启动MapReduce作业。
创建和配置作业:
创建一个Job实例,并设置作业的名称为"WebLogUVMapReduce"。
设置作业的jar文件和主类。
配置输入格式为TextInputFormat,并指定输入路径。
设置Mapper类为WebLogPVMapper,并定义Map阶段的输出键值类型。
设置Reducer类为WebLogPvReducer,并定义Reduce阶段的输出键值类型。
配置输出格式为TextOutputFormat,并指定输出路径。

  1. 处理输出目录:

在开始作业之前,检查输出目录是否存在,如果存在,则删除该目录。

  1. 提交作业:

调用job.waitForCompletion(true)来提交作业,并等待其完成。

  1. 主方法:
public static void main(String[] args) {
       Configuration configuration = new Configuration();
       ///public static int run(Configuration conf, Tool tool, String[] args)
       try {
           int  status =  ToolRunner.run( configuration,new WebLogPVMapReduce(),args );
           System.exit( status );
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

在main方法中,创建一个Configuration对象,并使用ToolRunner.run来启动MapReduce作业。

代码已上传至Gitee
https://gitee.com/lijiarui-1/test/tree/master/Test_project2

标签:浏览量,--,hadoop,job,org,apache,import,电商,class
From: https://blog.csdn.net/Instinct__121/article/details/139567689

相关文章

  • 【Redis】Redis实现高性能的原因
    Redis作为一个单线程的数据库,能够达到高性能的关键在于其设计上的几个方面。以下是Redis快速的几个主要原因:1.内存存储Redis是一个内存数据库,所有数据都存储在内存中。内存的访问速度远远快于磁盘,所以这使得读写操作非常快速。2.简单的数据结构Redis提供了一些基......
  • 【Redis】Redis的数据过期策略有哪些
    Redis提供了多种数据过期策略,用于管理存储在其中的数据的生命周期。数据过期策略决定了何时以及如何删除过期的数据。主要的策略有以下几种:1.定时删除(TimedDeletion)在设置键的过期时间时,Redis会创建一个定时器,当过期时间到达时自动删除该键。这种方法的优点是删除操作......
  • C++多态详解:静态多态与动态多态的实现
    C++中的多态是面向对象编程的重要特性,允许相同的接口调用不同的实现。多态性可以分为两类:静态多态和动态多态。1.静态多态(编译时多态)(1)函数重载(FunctionOverloading):函数重载是一种静态多态,允许同一个函数名在同一作用域内具有不同的参数列表。这些不同的版本在编译时......
  • 深入理解 C++ 动态内存管理:new vs malloc
    概述new/delete 是C++的关键字,需要编译器支持。malloc/free 是库函数,需要头文件支持。使用 new 申请内存分配时无需指定内存块大小,编译器会自动计算。而 malloc 需要明确指定所需内存的大小。new 会返回对象类型的指针,类型安全。而 malloc 返回 void*,需要进行强制......
  • 【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 机场航班调度程序(100分) - 三语言A
    ......
  • CorelDRAW2024注册码激活码分享,设计师的首选神器!
    【CorelDRAWGraphicsSuite2024】是一款集图形设计、照片编辑和矢量动画于一体的全面图形套件。这款软件因其用户友好的界面、强大的功能集以及支持多种文件格式而广受专业人士和业余爱好者的欢迎。它提供了创新的设计工具,如高级向量插图、页面布局、照片编辑等,旨在提升设计效......
  • Leetcode-007
    题目7.整数反转难度:中等给你一个32位的有符号整数x,返回将x中的数字部分反转后的结果。如果反转后整数超过32位的有符号整数的范围[−231,231−1],就返回0。假设环境不允许存储64位整数(有符号或无符号)。示例1:输入:x=123输出:321示例2:输入:x=-123输......
  • 文件IO——用read与write实现图片拷贝
    1#include<stdio.h>2#include<sys/types.h>3#include<sys/stat.h>4#include<fcntl.h>5#include<unistd.h>6#include<string.h>7intmain(intargc,constchar*argv[])8{910intpath=......
  • 常微分方程
    虽然这部分在笔记本上只有短短三页,但总是记不清公式,所以写下来,随时参考规定\(\int{p(x)\mathrm{d}x}\)不含\(C\)一阶微分方程一、变量分离方程\[\frac{\mathrmdy}{\mathrmdx}=\frac{X(x)}{Y(y)}\]解:移项积分\(\int{Y(y)}\mathrm{d}y=\int{X(x)}\mathrm{d}x+C\)二、......
  • python-数字黑洞
    [题目描述]给定一个三位数,要求各位不能相同。例如,352是符合要求的,112是不符合要求的。将这个三位数的三个数字重新排列,得到的最大的数,减去得到的最小的数,形成一个新的三位数。对这个新的三位数可以重复上述过程。神奇的是,最终一定会得到495!试试看,重新排列352,得到的最大数为......