
Daily Summary

2024-03-04

Today:

1. Configure pom.xml

<repositories>
    <repository>
        <id>aliyunmaven</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.13.8</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
</dependencies>

Install the Scala plugin in IntelliJ IDEA:

File -> Settings -> Plugins

2. Data cleaning
Using the DataFrame abstraction of Spark SQL, the cleaned log data can be stored in MySQL. The transformation of the log data through the different representations can be understood as:

RDD[String] -> RDD[Array[String]] -> RDD[Row] -> DataFrame -> written to MySQL

Before cleaning, you need to know the layout of the web log. In this dataset the fields of a record are separated by "\t" (the Tab character). A typical log record, broken out field by field, looks like this:

event_time = 2018-09-04T20:27:31+08:00
url = http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451540&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F58.0.3029.110+Safari%2F537.36+SE+2.X+MetaSr+1.0&actionEnd=1536150451668&actionName=startEval&actionTest=0&actionType=3&actionValue=272090&clientType=001_kgc&examType=001&ifEquipment=web&isFromContinue=false&skillIdCount=0&skillLevel=0&testType=jineng&userSID=B842B843AE317425D53D0C567A903EF7.exam-tomcat-node3.exam-tomcat-node3&userUID=272090&userUIP=1.180.18.157
method = GET
status = 200
sip = 192.168.168.64
user_uip = -
action_prepend = -
action_client = Apache-HttpClient/4.1.2 (java 1.5)
1) Convert RDD[String] to RDD[Row], keeping only records that contain exactly 8 fields

val linesRDD = sc.textFile("C:/Users/Lenovo/Desktop/Working/Python/data/test.log")
import spark.implicits._

val line1 = linesRDD.map(x => x.split("\t"))
//line1.foreach(println)
val rdd = line1
  .filter(x => x.length == 8)
  .map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim, x(4).trim, x(5).trim, x(6).trim, x(7).trim))
//rdd.foreach(println)
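If you want to see how many malformed lines the 8-field filter discards, a quick optional sanity check (my own addition, not in the original) could look like this; note that each count() triggers a separate Spark job:

// Optional sanity check: how many raw lines survive the 8-field filter?
val totalLines = line1.count()
val keptLines  = line1.filter(_.length == 8).count()
println(s"kept $keptLines of $totalLines lines")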
2) Convert RDD[Row] to a DataFrame by attaching an initial schema

// Define the mapping between the RDD rows and the table columns
val schema = StructType(Array(
  StructField("event_time", StringType),
  StructField("url", StringType),
  StructField("method", StringType),
  StructField("status", StringType),
  StructField("sip", StringType),
  StructField("user_uip", StringType),
  StructField("action_prepend", StringType),
  StructField("action_client", StringType)
))
val orgDF = spark.createDataFrame(rdd, schema)
// orgDF.show(5)
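As an aside, the same all-string schema can also be declared from a DDL string; this is just an equivalent, more compact alternative to the StructType(Array(...)) form above:

import org.apache.spark.sql.types.StructType

// Equivalent schema built from a DDL string (every column typed as STRING)
val schemaFromDDL = StructType.fromDDL(
  "event_time STRING, url STRING, method STRING, status STRING, " +
  "sip STRING, user_uip STRING, action_prepend STRING, action_client STRING")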
3) Split the url field on "&" and "="

// Deduplicate, drop records whose status code is not 200, and drop records with an empty event_time
// distinct() compares entire rows when deduplicating; dropDuplicates() can deduplicate on the specified columns only.
val ds1 = orgDF.dropDuplicates("event_time", "url")
  .filter(x => x(3) == "200")
  .filter(x => StringUtils.isNotEmpty(x(0).toString))
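A minimal toy illustration of the distinct / dropDuplicates difference mentioned in the comment above (the toy data and column names are my own; it relies on the spark.implicits._ import from earlier):

// distinct() compares whole rows; dropDuplicates("key") compares only the named column.
val toy = Seq(("a", 1), ("a", 1), ("a", 2)).toDF("key", "value")
toy.distinct().count()            // 2 -- ("a", 1) once and ("a", 2) once
toy.dropDuplicates("key").count() // 1 -- one (arbitrary) row is kept per key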

//将url按照"&"和"="切割
// Split the url on "&" and "=" to pull out the following query parameters:
// userUID, userSID, userUIP, actionClient, actionBegin, actionEnd, actionType,
// actionPrepend, actionTest, ifEquipment, actionName, progress, id

// Build an internal lookup of the query parameters as a Map
val dfDetail = ds1.map(row => {
  val urlArray = row.getAs[String]("url").split("\\?")
  var map = Map("params" -> "null")
  if (urlArray.length == 2) {
    map = urlArray(1).split("&")
      .map(x => x.split("="))
      .filter(_.length == 2)
      .map(x => (x(0), x(1)))
      .toMap
  }
  (
    // map holds the url query parameters, row holds the original DataFrame columns
    row.getAs[String]("event_time"),
    row.getAs[String]("user_uip"),
    row.getAs[String]("method"),
    row.getAs[String]("status"),
    row.getAs[String]("sip"),
    map.getOrElse("actionBegin", ""),
    map.getOrElse("actionEnd", ""),
    map.getOrElse("userUID", ""),
    map.getOrElse("userSID", ""),
    map.getOrElse("userUIP", ""),
    map.getOrElse("actionClient", ""),
    map.getOrElse("actionType", ""),
    map.getOrElse("actionPrepend", ""),
    map.getOrElse("actionTest", ""),
    map.getOrElse("ifEquipment", ""),
    map.getOrElse("actionName", ""),
    map.getOrElse("progress", ""),
    map.getOrElse("id", "")
  )
}).toDF()
// dfDetail.show(5)
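To see what this url parsing actually produces, here is the same split logic applied to a bare query string in plain Scala (the string is shortened from the sample log above; the trailing "broken" token shows why the length == 2 filter is there):

// Plain-Scala demo of the query-string parsing used in dfDetail
val query = "actionBegin=1536150451540&actionName=startEval&userUID=272090&broken"
val params = query.split("&")
  .map(_.split("="))
  .filter(_.length == 2)
  .map(kv => (kv(0), kv(1)))
  .toMap

params.getOrElse("actionName", "") // "startEval"
params.getOrElse("progress", "")   // ""  (key not present, so the default is returned)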

4) Rebuild the column headers, flatten all of the extracted fields into one DataFrame, and write it to the database

val detailRDD = dfDetail.rdd
val detailSchema = StructType(Array(
  StructField("event_time", StringType),
  StructField("user_uip", StringType),
  StructField("method", StringType),
  StructField("status", StringType),
  StructField("sip", StringType),
  StructField("actionBegin", StringType),
  StructField("actionEnd", StringType),
  StructField("userUID", StringType),
  StructField("userSID", StringType),
  StructField("userUIP", StringType),
  StructField("actionClient", StringType),
  StructField("actionType", StringType),
  StructField("actionPrepend", StringType),
  StructField("actionTest", StringType),
  StructField("ifEquipment", StringType),
  StructField("actionName", StringType),
  StructField("progress", StringType),
  StructField("id", StringType)
))

val detailDF = spark.createDataFrame(detailRDD, detailSchema)

// mode("overwrite") replaces the existing table; mode("append") adds to it
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "******")
prop.put("driver", "com.mysql.cj.jdbc.Driver") // Connector/J 8.x driver class (com.mysql.jdbc.Driver is deprecated)
val url = "jdbc:mysql://localhost:3306/python_db"
println("Starting to write to the database")
detailDF.write.mode("overwrite").jdbc(url, "logDetail", prop)
println("Finished writing to the database")

 

3. Next-day user retention analysis
Find m, the number of new (registered) users on day n.
Find k, the number of those day-n registrants who sign in again on day n+1.
Retention rate = k / m * 100%. For example, if 200 users register on day n and 50 of them sign in on day n+1, the next-day retention is 50 / 200 = 25%.
1) Build the registration and sign-in data sets

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "******")
prop.put("driver", "com.mysql.cj.jdbc.Driver") // Connector/J 8.x driver class
val url = "jdbc:mysql://localhost:3306/python_db"
val dataFrame = spark.read.jdbc(url, "logdetail", prop)

// All registration records (regUID, register_time, registration action)
val registerDF = dataFrame
  .filter(dataFrame("actionName") === ("Registered"))
  .select("userUID", "event_time", "actionName")
  .withColumnRenamed("event_time", "register_time")
  .withColumnRenamed("userUID", "regUID")
// registerDF.show(5)
// The raw timestamp looks like 2018-09-04T20:27:31+08:00; only the first 10 characters (yyyy-MM-dd) are needed
val registDF2 = registerDF
  .select(registerDF("regUID"), registerDF("register_time")
    .substr(1, 10).as("register_date"), registerDF("actionName"))
  .distinct()
// registDF2.show(5)


// All sign-in records (signUID, signing_time, sign-in action)
val signinDF = dataFrame.filter(dataFrame("actionName") === ("Signin"))
  .select("userUID", "event_time", "actionName")
  .withColumnRenamed("event_time", "signing_time")
  .withColumnRenamed("userUID", "signUID")
// signinDF.show(5)
val signiDF2 = signinDF
  .select(signinDF("signUID"), signinDF("signing_time")
    .substr(1, 10).as("signing_date"), signinDF("actionName"))
  .distinct()
// signiDF2.show(5)

2) Compute k (day-n registrants who sign in on day n+1) and m (new users registered on day n)

// Inner join on the user ID so each registration is paired with that user's sign-ins
val joinDF = registDF2
  .join(signiDF2, signiDF2("signUID") === registDF2("regUID"), joinType = "inner")
// joinDF.show(5)

// Use Spark's built-in datediff to count, per register_date, the users who sign in exactly one day after registering (k)
val frame = joinDF
  .filter(datediff(joinDF("signing_date"), joinDF("register_date")) === 1)
  .groupBy(joinDF("register_date")).count()
  .withColumnRenamed("count", "signcount")
// frame.show(5)

// Count the newly registered users per register_date (m)
val frame1 = registDF2
  .groupBy(registDF2("register_date")).count()
  .withColumnRenamed("count", "regcount")
// frame1.show(5)

3) Retention rate = k / m * 100%

// Put m and k side by side in one table
val frame2 = frame
  .join(frame1, "register_date")
frame2.show()

// Add a retention_rate column (k / m) to get the day-n next-day retention
frame2.withColumn("retention_rate", frame2("signcount") / frame2("regcount"))
  .show()
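One caveat with the inner join above: register dates on which no user came back the next day drop out of the result entirely. A sketch of a variant that keeps those dates and reports a percentage (the outer join, zero-filling, and rounding are my own choices, reusing frame and frame1 from above):

import org.apache.spark.sql.functions.{col, round}

// Keep every register_date, treat missing next-day sign-ins as 0,
// and express retention as a percentage rounded to two decimal places.
val retentionDF = frame1
  .join(frame, Seq("register_date"), "left_outer")
  .na.fill(0, Seq("signcount"))
  .withColumn("retention_rate", round(col("signcount") / col("regcount") * 100, 2))
retentionDF.show()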

4. Full source code:
DataClear.scala

package spark

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import java.util.Properties

object DataClear {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("DataClear").getOrCreate()
    val sc = spark.sparkContext
    val linesRDD = sc.textFile("C:/Users/Lenovo/Desktop/Working/Python/data/test.log")
    import spark.implicits._
    val line1 = linesRDD.map(x => x.split("\t"))
    // line1.foreach(println)
    val rdd = line1
      .filter(x => x.length == 8)
      .map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim, x(4).trim, x(5).trim, x(6).trim, x(7).trim))
    // rdd.foreach(println)

    // Define the mapping between the RDD rows and the table columns
    val schema = StructType(Array(
      StructField("event_time", StringType),
      StructField("url", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))
    val orgDF = spark.createDataFrame(rdd, schema)
    // orgDF.show(5)

    // Deduplicate, drop records whose status code is not 200, and drop records with an empty event_time
    // distinct() compares entire rows; dropDuplicates() deduplicates on the specified columns only.
    val ds1 = orgDF.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))

    // Split the url on "&" and "=" to pull out the query parameters:
    // userUID, userSID, userUIP, actionClient, actionBegin, actionEnd, actionType,
    // actionPrepend, actionTest, ifEquipment, actionName, progress, id

    val dfDetail = ds1.map(row => {
      val urlArray = row.getAs[String]("url").split("\\?")
      var map = Map("params" -> "null")
      if (urlArray.length == 2) {
        map = urlArray(1).split("&")
          .map(x => x.split("="))
          .filter(_.length == 2)
          .map(x => (x(0), x(1)))
          .toMap
      }
      (
        row.getAs[String]("event_time"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        map.getOrElse("actionBegin", ""),
        map.getOrElse("actionEnd", ""),
        map.getOrElse("userUID", ""),
        map.getOrElse("userSID", ""),
        map.getOrElse("userUIP", ""),
        map.getOrElse("actionClient", ""),
        map.getOrElse("actionType", ""),
        map.getOrElse("actionPrepend", ""),
        map.getOrElse("actionTest", ""),
        map.getOrElse("ifEquipment", ""),
        map.getOrElse("actionName", ""),
        map.getOrElse("progress", ""),
        map.getOrElse("id", "")
      )
    }).toDF()
    // dfDetail.show(5)

    val detailRDD = dfDetail.rdd
    val detailSchema = StructType(Array(
      StructField("event_time", StringType),
      StructField("user_uip", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("actionBegin", StringType),
      StructField("actionEnd", StringType),
      StructField("userUID", StringType),
      StructField("userSID", StringType),
      StructField("userUIP", StringType),
      StructField("actionClient", StringType),
      StructField("actionType", StringType),
      StructField("actionPrepend", StringType),
      StructField("actionTest", StringType),
      StructField("ifEquipment", StringType),
      StructField("actionName", StringType),
      StructField("progress", StringType),
      StructField("id", StringType)
    ))

    val detailDF = spark.createDataFrame(detailRDD, detailSchema)
    detailDF.show(10)

    // mode("overwrite") replaces the existing table; mode("append") adds to it
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "******")
    prop.put("driver", "com.mysql.cj.jdbc.Driver") // Connector/J 8.x driver class
    val url = "jdbc:mysql://localhost:3306/python_db"
    println("Starting to write to the database")
    detailDF.write.mode("overwrite").jdbc(url, "logDetail", prop)
    println("Finished writing to the database")

  }
}

UserAnalysis.scala

package spark

import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{datediff, unix_timestamp}

object UserAnalysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("userAnalysis").master("local").getOrCreate()
    val sc = spark.sparkContext
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "******")
    prop.put("driver", "com.mysql.cj.jdbc.Driver") // Connector/J 8.x driver class
    val url = "jdbc:mysql://localhost:3306/python_db"
    val dataFrame = spark.read.jdbc(url, "logdetail", prop)
    dataFrame.show(10)

    // All registration records (regUID, register_time, registration action)
    val registerDF = dataFrame.filter(dataFrame("actionName") === ("Registered"))
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "register_time")
      .withColumnRenamed("userUID", "regUID")
    // registerDF.show(5)
    // The raw timestamp looks like 2018-09-04T20:27:31+08:00; only the first 10 characters (yyyy-MM-dd) are needed
    val registDF2 = registerDF
      .select(registerDF("regUID"), registerDF("register_time")
        .substr(1, 10).as("register_date"), registerDF("actionName"))
      .distinct()
    // registDF2.show(5)


    // All sign-in records (signUID, signing_time, sign-in action)
    val signinDF = dataFrame.filter(dataFrame("actionName") === ("Signin"))
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "signing_time")
      .withColumnRenamed("userUID", "signUID")
    // signinDF.show(5)
    val signiDF2 = signinDF
      .select(signinDF("signUID"), signinDF("signing_time")
        .substr(1, 10).as("signing_date"), signinDF("actionName"))
      .distinct()
    // signiDF2.show(5)

    // Inner join on the user ID so each registration is paired with that user's sign-ins
    val joinDF = registDF2
      .join(signiDF2, signiDF2("signUID") === registDF2("regUID"), joinType = "inner")
    // joinDF.show(5)

    // Use Spark's built-in datediff to count, per register_date, the users who sign in exactly one day after registering (k)
    val frame = joinDF
      .filter(datediff(joinDF("signing_date"), joinDF("register_date")) === 1)
      .groupBy(joinDF("register_date")).count()
      .withColumnRenamed("count", "signcount")
    // frame.show(5)

    // Count the newly registered users per register_date (m)
    val frame1 = registDF2
      .groupBy(registDF2("register_date")).count()
      .withColumnRenamed("count", "regcount")
    // frame1.show(5)

    // Put m and k side by side in one table
    val frame2 = frame
      .join(frame1, "register_date")
    // frame2.show()

    // Add a retention_rate column (k / m) to get the day-n next-day retention
    frame2.withColumn("retention_rate", frame2("signcount") / frame2("regcount"))
      .show()

    sc.stop()
  }
}

————————————————

Copyright notice: This is an original article by the blogger, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.

Original link: https://blog.csdn.net/Legosnow/article/details/124169161

  • 前端技术开发助教工作总结 —— 2023~2024第一学期
    一、助教工作的具体职责和任务(1)与老师配合的方面理论课批改云班课作业并提醒同学们修改解答同学们在实操中遇到的问题定期收集课程评价并协助老师进行调整和改进整理和分享与前端开发技术相关的学习资料实践课协助老师在实践课中解决同学们的困难统计分组、辅助老师评......