首页 > 其他分享 >MapReduce实战之日志清洗案例

MapReduce实战之日志清洗案例

时间:2022-11-11 11:05:21浏览次数:37  
标签:String hadoop MapReduce public org apache import 清洗 日志


简单解析版

1)需求:

去除日志中字段长度小于等于11的日志。

2)输入数据

      数据有点大

3)实现代码:

(1)编写LogMapper

package com.atguigu.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

publicclass LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

Text k = new Text();

@Override
protectedvoid map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

// 1 获取1行数据
String line = value.toString();

// 2 解析日志
boolean result = parseLog(line,context);

// 3 日志不合法退出
if (!result) {
return;
}

// 4 设置key
k.set(line);

// 5 写出数据
context.write(k, NullWritable.get());
}

// 2 解析日志
privateboolean parseLog(String line, Context context) {
// 1 截取
String[] fields = line.split(" ");

// 2 日志长度大于11的为合法
if (fields.length > 11) {
// 系统计数器
context.getCounter("map", "true").increment(1);
returntrue;
}else {
context.getCounter("map", "false").increment(1);
returnfalse;
}
}
}

(2)编写LogDriver

package com.atguigu.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclass LogDriver {

publicstaticvoid main(String[] args) throws Exception {

args = new String[] { "e:/input/inputlog", "e:/output1" };

// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 加载jar包
job.setJarByClass(LogDriver.class);

// 3 关联map
job.setMapperClass(LogMapper.class);

// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 设置reducetask个数为0
job.setNumReduceTasks(0);

// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交
job.waitForCompletion(true);
}
}

复杂解析版

1)需求:

对web访问日志中的各字段识别切分

去除日志中不合法的记录

根据统计需求,生成各类访问请求过滤数据

2)输入数据

    数据有点大

3)实现代码:

(1)定义一个bean,用来记录日志数据中的各数据字段

package com.atguigu.mapreduce.log;

public class LogBean {
private String remote_addr;// 记录客户端的ip地址
private String remote_user;// 记录客户端用户名称,忽略属性"-"
private String time_local;// 记录访问时间与时区
private String request;// 记录请求的url与http协议
private String status;// 记录请求状态;成功是200
private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
private String http_referer;// 用来记录从那个页面链接访问过来的
private String http_user_agent;// 记录客户浏览器的相关信息

private boolean valid = true;// 判断数据是否合法

public String getRemote_addr() {
return remote_addr;
}

public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}

public String getRemote_user() {
return remote_user;
}

public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}

public String getTime_local() {
return time_local;
}

public void setTime_local(String time_local) {
this.time_local = time_local;
}

public String getRequest() {
return request;
}

public void setRequest(String request) {
this.request = request;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public String getBody_bytes_sent() {
return body_bytes_sent;
}

public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}

public String getHttp_referer() {
return http_referer;
}

public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}

public String getHttp_user_agent() {
return http_user_agent;
}

public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}

public boolean isValid() {
return valid;
}

public void setValid(boolean valid) {
this.valid = valid;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.remote_addr);
sb.append("\001").append(this.remote_user);
sb.append("\001").append(this.time_local);
sb.append("\001").append(this.request);
sb.append("\001").append(this.status);
sb.append("\001").append(this.body_bytes_sent);
sb.append("\001").append(this.http_referer);
sb.append("\001").append(this.http_user_agent);

return sb.toString();
}
}

(2)编写LogMapper程序

package com.atguigu.mapreduce.log;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text k = new Text();

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 获取1行
String line = value.toString();

// 2 解析日志是否合法
LogBean bean = pressLog(line);

if (!bean.isValid()) {
return;
}

k.set(bean.toString());

// 3 输出
context.write(k, NullWritable.get());
}

// 解析日志
private LogBean pressLog(String line) {
LogBean logBean = new LogBean();

// 1 截取
String[] fields = line.split(" ");

if (fields.length > 11) {
// 2封装数据
logBean.setRemote_addr(fields[0]);
logBean.setRemote_user(fields[1]);
logBean.setTime_local(fields[3].substring(1));
logBean.setRequest(fields[6]);
logBean.setStatus(fields[8]);
logBean.setBody_bytes_sent(fields[9]);
logBean.setHttp_referer(fields[10]);

if (fields.length > 12) {
logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
}else {
logBean.setHttp_user_agent(fields[11]);
}

// 大于400,HTTP错误
if (Integer.parseInt(logBean.getStatus()) >= 400) {
logBean.setValid(false);
}
}else {
logBean.setValid(false);
}

return logBean;
}
}

(3)编写LogDriver程序

package com.atguigu.mapreduce.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclass LogDriver {
publicstaticvoid main(String[] args) throws Exception {
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 加载jar包
job.setJarByClass(LogDriver.class);

// 3 关联map
job.setMapperClass(LogMapper.class);

// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交
job.waitForCompletion(true);
}
}

标签:String,hadoop,MapReduce,public,org,apache,import,清洗,日志
From: https://blog.51cto.com/u_12654321/5843250

相关文章

  • MapReduce实战之 MapReduce中多表合并案例
     MapReduce中多表合并案例1)需求:订单数据表t_order:idpidamount1001011100202210030331001   01   11002   02   21003   03   31004   01 ......
  • MapReduce实战之倒排索引案例(多job串联)
    0)需求:有大量的文本(文档、网页),需要建立搜索索引输出数据:a:atguigupingpingatguigussatguigussb:atguigupingpingatguigupingpingpingpingssc:atguigussatguigup......
  • MapReduce实战之压缩/解压缩案例
    1数据流的压缩和解压缩CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。要想对正在被写入一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStr......
  • .net 温故知新:【9】.NET日志记录 ILogger使用和原理
    日志日志作为我们程序记录的“黑匣子”不论什么系统都应该使用到的,比如我们经常使用的log4net就是第三方日志记录提供程序。.NET支持使用各种内置和第三方日志记录提供程......
  • serilog 动态更新日志级别
    使用这个库,更新配置文件,就可以动态更新日志输出级别。newLoggerConfiguration().ReadFrom.Configuration(hostingContext.Configuration)这个Configuration定义在这儿......
  • 如何清洗DDOS呢?
      流量清洗服务是提供给租用IDC服务的政企客户,针对其发起的DOS/DDOS的监控、告警和防护的一种网络安全服务。  流量清洗针对互联网络上有着高密度依赖性的商业客户和......
  • SpringBoot自定义日志Starter(二十五)
    即使有一天,我放弃了自己的身体,也请你,不要放弃我,我亲爱的灵魂.上一章简单介绍了SpringBoot自定义Starter(二十四),如果没有看过,​​请观看上一章​​一.AOP实现日志功能......
  • logback-spring.xml文件详解,日志优化
    依赖<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.3</version></dependency><dependency><groupId......
  • Java输出SSL握手日志和查看cacerts路径
    在JAVA启动时添加下面的VM参数就可以启动握手日志了!!!-Djavax.net.debug=all另外,在debug日志中,有一个trustStoreis关键字,根据这个可以找到使用的是哪个truststor......
  • SQL数据分析,数据清洗
    获取数据后,对数据的清洗工作必不可少,常用的数据清洗方法主要有缺失值填充、数值替换、数据类型转换、数据分列、重复值处理等,清洗的数据结果直接影响最后数据分析的结果,一个......