数仓项目总结
一、数据采集
数据从哪里来的?一般在实际开发中,是业务开发端在业务系统程序中,植入一些收集事件数据的SDK(工具代码),进行各种事件数据的收集,埋点数据可以植入到业务系统的前端程序或者后端程序中。我们作为大数据开发,只需要提出数据埋点需求,对具体实现技术仅作基本了解即可。
收集与分析的数据包括:页面数据、事件数据、曝光数据、启动数据和错误数据。
页面数据:页面数据主要记录一个页面的用户访问情况,包括访问时间、停留时间、页面路径等信息
事件数据:事件数据主要记录应用内一个具体操作行为,包括操作类型、操作对象、操作对象描述等信息
曝光数据:主要记录页面所曝光的内容,包括曝光对象,曝光类型等信息
启动数据:启动数据记录应用的启动信息。
错误数据:错误数据记录应用使用过程中的错误信息,包括错误编号及错误信息。
- 埋点方式
前端埋点,本质上就是为页面各种元素(按钮,链接,输入框……)绑定js事件;
后端埋点,则主要是交易类行为(提交订单,支付,取消订单等)的采集;
- 行为数据Mock
考虑到学习使用,本次Mock数据是编写java程序随机生成的,在虚拟机中执行java代码生成批量的日志文件数据,然后通过flume工具,把数据拉到hdfs中存储,在拉取的时候,设置自定义拦截器,把数据生成的时间戳设置到头部信息中,然后在flume中config配置文件配置使用头部信息中的时间戳生成hdfs目录名称。
- flume拦截器代码
导入依赖文件
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.41</version>
</dependency>
编写代码
package com.zwf.flume;
import com.alibaba.fastjson2.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author MrZeng
* @version 1.0
* @date 2023-11-22 21:53
*/
public class MockDataInterceptor implements Interceptor {
//过滤后Event集合
private List<Event> eventList=null;
@Override
public void initialize() {
//初始化
eventList=new ArrayList<>();
}
/**
* 从source中获取源数据参数
* @param list 源数据转化成List<Event>
* @return
*/
@Override
public List<Event> intercept(List<Event> list) {
eventList.clear();
for (Event event:list){
if(event!=null) {
//调用下面的方法
eventList.add(intercept(event));
}
}
return eventList;
}
@Override
public Event intercept(Event event) {
//获取event请求头信息
Map<String, String> headers = event.getHeaders();
//获取event请求体信息
String body = new String(event.getBody());
//获取body中的timestamp属性值
String timeStamp = JSONObject.parseObject(body).getString("timeStamp");
headers.put("timestamp",timeStamp);
return event;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
//通过该静态内部类来创建自定义对象供flume使用,实现Interceptor.Builder接口,并实
// 现其抽象方法
@Override
public Interceptor build() {
return new MockDataInterceptor();
}
/**
*
* @param context 通过该对象可以获取flume配置自定义拦截器的参数并打印出来
*/
@Override
public void configure(Context context) {
}
}
}
- flume config文件(使用
TAILDIR
断点续传方式)
##定义a1的三个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
##定义Source的类垿
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/flumePosition/datax_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/datax/data.log
a1.sources.r1.headers.f1.headerKey1 = f1
a1.sources.r1.fileHeader = true
## 定义拦截器
a1.sources.r1.interceptors=i6
a1.sources.r1.interceptors.i6.type=com.yjxxt.interceptor.DataxFilterInterceptor$Builder
##定义Channel的信息 采用内存传输 速度快 但是容易丢失数据!
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
##定义Sink的信息
a1.sinks.k1.type = hdfs
#使用头部信息中的timestamp自定格式化年月日信息 设置hdfs存在文化的目录
a1.sinks.k1.hdfs.path=hdfs://hdfs-zwf/flume/datax/%Y%m%d/%H%M%S
##hdfs有多少条消息时新建文件,0不基于消息个数
a1.sinks.k1.hdfs.rollCount=0
##hdfs创建多长时间新建文件0不基于时间 60s钟滚动一次
a1.sinks.k1.hdfs.rollInterval=60
##hdfs多大时新建文件,0不基于文件大尿 这里基于100KB文件滚动一次
a1.sinks.k1.hdfs.rollSize=102400
##当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
a1.sinks.k1.hdfs.idleTimeout=3
a1.sinks.k1.hdfs.fileType=DataStream
##使用事件的header中的timestamp属性对应的值作为分区的判定标准
a1.sinks.k1.hdfs.useLocalTimeStamp=false
##每五分钟生成一个目录
##是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响t的其他所有时间表达式
a1.sinks.k1.hdfs.round=true
##时间上进行“舍弃”的值; 1小时生成一个
a1.sinks.k1.hdfs.roundValue=1
##时间上进行”舍弃”的单位,包含:second,minute,hour
a1.sinks.k1.hdfs.roundUnit=hour
##组装source channel sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
使用命令
#把conf文件放进flume根目录下的options目录下,把java程序打成jar包,上传到flume的lib目录下!
#执行命令进行拉取
flume-ng agent -n a1 -c conf -f ${FLUME_HOME}/options/*.conf -Dflume.root.logger=INFO,CONSOLE
- 业务数据
业务数据一般存在mysql、oracle等数据库中,由于数据量巨大,如果只是在普通的数据库中处理数据会非常慢,需要使用我们的大数据处理数据库(hadoop、hive、spark),一般业务数据都是处理好格式的数据,相比行为数据,经过一定的处理,在从业务数据拉取到大数据处理组件中(hdfs)中,我们需要使用DataX/sqoop/ Kettle工具进行拉取,我们本次使用Datax。
- 操作主要步骤
在拉取数据前,要先在hive上创建好数据库及数据表,可以直接在拉取时进行映射,或者先拉取到hdfs上,再加载到数据表中!
编写配置文件(注意数据类型的配置,具体看datax文档 以下是模板)
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"empno",
"ename",
"job",
"mgr",
"hiredate",
"sal",
"comm",
"deptno"
],
"splitPK":"empno",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.147.120:3306/scott?
characterEncoding=utf8"],
"table": ["emp"]
}
],
"password": "Root@123456.",
"username": "root",
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name":"empno",
"type":"INT"
},
{
"name":"ename",
"type":"STRING"
},
{
"name":"job",
"type":"STRING"
},
{
"name":"mgr",
"type":"INT"
},
{
"name":"hiredate",
"type":"DATE"
},
{
"name":"sal",
"type":"DOUBLE"
},
{
"name":"comm",
"type":"DOUBLE"
},
{
"name":"deptno",
"type":"INT"
},
],
"defaultFS": "hdfs://node1:8020",
"fieldDelimiter": ",",
"fileName": "tb_emp",
"fileType": "text",
"path": "/datax/warehouse/emp",
"writeMode": "append",
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
- 执行命令
#把配置文件放进datax的job中,执行以下命令
python ./datax.python ./job/*.json
- hive建表
drop table ods.ods_app_event_log;
create external table if not exists ods.ods_app_event_log
(
account string,
appId string,
appVersion string,
carrier string,
deviceId string,
deviceType string,
eventId string,
ip string,
latitude double,
longitude double,
netType string,
osName string,
osVersion string,
properties map<string,string>, -- 事件属性
releaseChannel string, -- 下载渠道
resolution string, -- 分辨率
sessionId string, -- 会话id
`timeStamp` bigint -- 事件时间
)
partitioned by (dt string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
stored as textfile
location '/zwf/app/ods/ods_app_event_log';
-- 导入flume拉取数据
load data inpath 'hdfs://hdfs-zwf/flume/datax/20230301' into table ods.ods_app_event_log partition(dt='20230301');
- 实际开发使用的架构
二、需求说明
ETL数据处理就是对数据进行抽取、清洗转换、加载到数据仓库层的整个过程。
- 清洗过滤
去除json数据体中的废弃字段(前端开发人员在埋点设计方案变更后遗留的无用字段)
过滤掉json格式不正确的
过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)的记录!
过滤掉日志中不符合时间段的记录
对于web端日志,过滤爬虫请求数据
- 数据口径统一
Boolean字段,在数据中有使用1/0/-1标识的,也有使用true/false表示的,统一为Y/N/U
字符串类型字段,在数据中有空串 ,有null值 ,统一为null值
日期格式统一, 2020/9/2 2020-9-2 2020-09-02 20200902 都统一成 yyyy-MM-dd
小数类型,统一成decimal
字符串,统一成string
时间戳,统一成bigint
- Session分割
对于web端日志,按天session分割,不需处理。
对于app/小程序,使用了会话保持策略,导致App/小程序长时间在后台运行,恢复到前台依然是同一个APP,不符合session分析定义,需要按事件间隔切割(业内通用:30分钟),对于切割的Session赋予随机的UUID。
- 地理位置转换
将日志中的GPS纬度坐标解析成省市县信息
将日志中的IP地址解析成省市县信息
app和微信小程序可以收集到用户行为数据中的GPS定位,可以通过GPS定位获取坐标信息,而web日志无法获取GPS信息,但是可以收集用户的IP地址,Gps可以准确表达地理位置,而Ip地址表达精确度较低。
- 全局用户标识
每一个用户标识一个全局唯一ID,给匿名访问记录,绑定到一个id上。
选取合适的用户标识对于提高用户行为分析的准确性有非常大的影响。
例如:设备标识:一部设备上出现不同的账户,会认为是同一个人。同一个账号在不同设备会认为是多个人。
账户标识:一个设备出现不同账户,会认为是多个人操作。不同设备一个账户操作,会认为是一个人操作。
- 标识新老访客
新用户标识为1,老用户标识为0
- 保存结果
将数据输出为parquet/ORC格式,压缩编码为snappy。
parquet格式的框架兼容性更好,而ORC的读写压缩性能要比parquet强。
三、需求分析
- 清洗过滤
- 废弃字段直接过滤。
- 不符合json格式字符串直接过滤。
- 缺少关键字段判断首先确认关键字的数据类型,本次判断四个字段deviceid、eventid、sessionid均为String类型,如果缺少则为null,properties为map类型,可以判断集合元素个数判断空值。
- 转换时间存储格式。
- 过滤爬虫请求数据,通过User-Agent标识来分析。
- GPS坐标系转换
- GEOHASH编码介绍
Geohash
编码是一种地理位置编码技术,它可将一个gps
坐标(含经、纬度)点,转化为一个 字符串;通过编码后得到的字符串,表达的是:包含被编码
gps
坐标点的一个矩形范围;原理:把[-90,90]使用二分法划分成[-90,0)和[0,90]左右两个区间,经/纬度落到左区间为0,落在右区间为1,然后把落在区间上的经纬度再进行二分划分成[-90,-45)和[-45,0]或者[0,45)和[45,90],落到左区间为0,落到右区间为1,依次类推越来越逼近经纬度标记点,会产生一串二进制数值。
- 经纬组码 [121.713614,31.203117]
偶数位
放经度,奇数位
放纬度,把2串编码组合生成新串:11100 11001 11100 01001 01001 01000最终转换为Base64编码:wtw998 Base32编码长度为6,精度在610米左右
- Base64编码表
- 当geohash base32编码长度为8时,精度在19米左右,而当编码长度为9时,精度在2米左右, 编码长度需要根据数据情况进行选择。
- GEOHASH编码工具包
<dependency>
<groupId>ch.hsr</groupId>
<artifactId>geohash</artifactId>
<version>1.3.0</version>
</dependency>
- 使用方法
String geohashcode = GeoHash.withCharacterPrecision(45.667, 160.876547,6).toBase32();
目的-->经纬度使用hash值加密。
- 使用代码
//创建SparkSql对象
val spark = SparkSessionUtil.getSparkSession("DIM层_区划字典表"/*, YjxxtConstants.SPARK_MASTER_LOCAL*/)
//配置参数 数据库账户和密码
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "Root@123456.")
//创建DataFrame表
val dataFrame = spark.read.jdbc("jdbc:mysql://192.168.147.120:3306/yjx-shop?useUnicode=true&characterEncoding=utf-8",
"t_md_areas",properties)
//创建临时视图
dataFrame.createTempView("v_md_areas")
//自定义gps函数 UDF函数
val gps2geo = (lat: Double, lng: Double) => {
/**
* 第一个参数是经度 第二个参数是维度 第三个参数是base32编码的长度
* 不同的长度 地点精度不一样 长度越长 地址精度越范围越小 越精确!
*/
GeoHash.withCharacterPrecision(lat, lng, 5).toBase32
}
spark.udf.register("gps2geo", gps2geo)
//执行sql语句 执行sql 第一种
spark.sql(
"""
|insert overwrite table dim.dim_area_dict
|SELECT
|gps2geo(l4.BD09_LAT,l4.BD09_LNG),
|l1.AREANAME province,
|l2.AREANAME city,
|l3.AREANAME region
|FROM v_md_areas l4
|JOIN v_md_areas l3 ON l4.LEVEL = 4 AND l4.PARENTID = l3.ID
|JOIN v_md_areas l2 ON l3.PARENTID = l2.ID
|JOIN v_md_areas l1 ON l2.PARENTID = l1.ID
|""".stripMargin
)
spark.close()
- 全局用户管理
全局用户管理分为登录用户、匿名用户
登录用户:一个账号多设备登录、多个账号设备登录
匿名用户:随机生成一个包含UUID的Cookie存储10年的设备ID
解决方案:使用设备ID标识、设备ID绑定登录用户、N设备ID关联一个登录ID 、N设备ID关联N登录ID (设备ID与用户ID绑定并算出得分,昨天登录今天登录得分不变、昨天登录今天不登录得分比例缩减,昨天未登录今天登录赋值分数,若出现设备id未绑定记录追加到最高得分上)
- IP2region编程工具包
<dependency>
<groupId>org.lionsoul</groupId>
<artifactId>ip2region</artifactId>
<version>1.7.2</version>
</dependency>
- APi调用方式
// 初始化配置参数
val config = new DbConfig
// 构造搜索器,dbFile是ip地址库字典文件所在路径 ip2region.db文件要上传到hdfs上的相应路径上。
val searcher = new DbSearcher(config,
"src/main/resources/data/ip2region.db")
// 使用搜索器,调用查找算法获取地理位置信息
val block = searcher.memorySearch("39.99.177.94")
println(block)
- 代码使用
ip2region.db案例数据
1.29.24.0|1.29.39.255|中国|0|内蒙古|赤峰市|联通
1.29.40.0|1.29.59.255|中国|0|内蒙古|通辽市|联通
1.29.60.0|1.29.67.255|中国|0|内蒙古|呼伦贝尔市|联通
1.29.68.0|1.29.107.255|中国|0|内蒙古|赤峰市|联通
//读取ip2region库文件--需要将文件提前上传到HDFS
val configuration = new Configuration()
val fs = FileSystem.get(configuration)
val path = new Path("/zwf/dict/ip2region.db")
val inputStream = fs.open(path)
val len = fs.getFileStatus(path).getLen
val array = new Array[Byte](len.toInt)
IOUtils.readFully(inputStream, array)
IOUtils.closeQuietly(inputStream)
val ipBroadcast = spark.sparkContext.broadcast(array)
//splitedSession切分好的session的数据,mapPartitions每个分区内单独进行map(4个并行度,就4个单独执行
splitedSession.mapPartitions(iter => {
//读取广播数据中的(Geo)
val areaDict = geoBroadcast.value
//读取广播中的数据(ip2region)并获取ip查询对象
val ip2regionBytes = ipBroadcast.value
val config = new DbConfig()
val searcher = new DbSearcher(config, ip2regionBytes)
//通过IP地址查询
val block = searcher.memorySearch(bean.ip)
// IP|省|市|网络|
val strings = block.getRegion().split("\\|")
if (strings.size >= 5 && StringUtils.isNotBlank(strings(2)) && StringUtils.isNotBlank(strings(3))) {
bean.province = strings(2)
bean.city = strings(4)
bean.region = ""
- 数据的增量和全量
为了适应数据缓慢变化维,每天需要更新数据,但是更新数据的同时,可能会出现一些历史数据,与全量数据有状态上的数据关联,导致数据在拉取的同时会出现大量重复数据,导致数据冗余严重。
解决方案:使用拉链表
所谓的拉链表就是:表中增加三个字段:有效时间、终止时间、状态
把左表(全量表)有关联的数据的终止时间改为今天时间,然后再把右表(增量数据)进行数据合并变成拉链表,存放到数仓的ODS层中 。
- SQL案例
-- 拉链表练习
select
l.oid,
l.amount,
l.status,
l.start_dt,
coalesce(date_sub(a.dt,1),l.end_dt) as end_dt
from zip_db.lalian l
left join zip_db.add a on l.oid=a.oid
where l.dt='2023-12-06'
union all
select
oid,
amount,
status,
dt start_dt,
'9999-12-31' end_dt
from zip_db.add
where dt='2023-12-06';
- ETL全过程代码编写
依赖环境
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.12.17</scala.version>
<!-- Spark 版本控制 -->
<spark.version>3.3.2</spark.version>
<!-- Hive 版本控制 -->
<hive.version>3.1.2</hive.version>
</properties>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.13.4</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>ch.hsr</groupId>
<artifactId>geohash</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.lionsoul</groupId>
<artifactId>ip2region</artifactId>
<version>1.7.2</version>
</dependency>
ETL数据处理代码
package com.yjxxt.etl.com.zwf.etl
import ch.hsr.geohash.GeoHash
import com.yjxxt.etl.pojo.AppEventLog
import com.yjxxt.etl.util.{DateUtil, SparkSessionUtil, YjxxtConstants}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}
import redis.clients.jedis.Jedis
import java.text.SimpleDateFormat
import java.util.UUID
/**
* @author MrZeng
* @date 2023-12-04 10:21
* @version 1.0
* 代码注意的点:1、使用同一个sparkSession
* 2、查询数据插入新表时,注意字段名的排序顺序
* 3、注意数据类型匹配
*/
object job03AppEventLogOds2Dwd {
//数据清洗过滤
//过滤空值
def dataClear(spark:SparkSession,start_date:String,end_date:String): DataFrame = {
val format = new SimpleDateFormat("yyyyMMdd")
// 时间范围 [startTime,endTime)
val start_ts = format.parse(s"$start_date").getTime
val end_ts = format.parse(s"$end_date").getTime
/* val start_ts=LocalDate.parse(start_date,DateTimeFormatter.ofPattern("yyyyMMdd")).atStartOfDay(ZoneOffset.ofHours(8)).toInstant.toEpochMilli
val end_ts=LocalDate.parse(end_date,DateTimeFormatter.ofPattern("yyyyMMdd")).atStartOfDay(ZoneOffset.ofHours(8)).toInstant.toEpochMilli*/
//读取特定分区的数据
val curLog = spark.read.table("ods.ods_app_event_log").where(s"dt=$start_date")
//导入隐式转换
import org.apache.spark.sql.functions._
//自定义UDF函数判断值是否为空 一个参数一个返回值
val isNotBlank=udf((s:String)=>{
StringUtils.isNotBlank(s)
})
//过滤掉日志中缺少关键字段的记录(deviceid/properties/eventid/sessionid任何一个都不行)的记录
//导入隐式转换 properties字段是map集合
import spark.implicits._
curLog.where(isNotBlank($"deviceid") and isNotBlank(col("eventid")) and isNotBlank(col("sessionid")))
.where("size(properties) <> 0").where(s"timestamp>=$start_ts and timestamp<$end_ts")
}
//数据格式标准化
def dataStandard(spark:SparkSession,dataClear:DataFrame):Dataset[AppEventLog] = {
//导入隐式转换
import spark.implicits._
//创建临时视图
dataClear.createOrReplaceTempView("dataClear")
//填充 filledAccount:账户为null进行填充 把同一个设备出现 last
// splitSessionId:每隔30分钟切割一次session
// isNew:是否是新用户
// guid:递增用户id值
// province:省 city:市 region:地区
val sqlStr=
"""
|select
| if(account='',null,account) as account,
| appId,
| appVersion,
| carrier,
| deviceId,
| eventId,
| ip,
| latitude,
| longitude,
| netType,
| osName,
| osVersion,
| properties,
| resolution,
| sessionId,
| timestamp,
| null as splitSessionId,
| null as filledAccount,
| null as province,
| null as city,
| null as region,
| -1 as guid,
| 0 as isnew
| from dataClear
|""".stripMargin
val standard = spark.sql(sqlStr)
standard.as[AppEventLog]
}
//sessionId间隔切割 每30分钟切割一次sessionId
def splitSessionId(spark:SparkSession,dataStandard:Dataset[AppEventLog])={
//导入隐式转换
import spark.implicits._
//按照sessionId进行分组 (sessionId,Array[AppEventLog])
dataStandard.rdd.groupBy(bean=>bean.splitSessionId).flatMap(tp=>{
//转换为迭代器对象集合
val item=tp._2
//按照时间从小到大排序转换为列表集合
val list=item.toList.sortBy(bean=>bean.timestamp)
//生成临时UUID
var uuid: String = UUID.randomUUID().toString
//遍历列表对象 List[AppEventLog] 如果同一个设备同一个会话每个登录的用户生成sessionId之间相差时间不足30分钟
//共用第一个splitSessionId
for(i<- 0 until list.size){
//为第一个AppEventLog对象添加一个切割sessionId值
list(i).splitSessionId=uuid
//不是list列表中最后一个appEventLog对象 且 两次登录时间相差30分钟 就赋值一个新的SplitSessionId
if(i<list.size-1 && (list(i+1).timestamp-list(i).timestamp)>30*60*1000){
//同一个设备同一个会话 登录时间相差30分钟,为当前对象设置一个新的splitSessionId
uuid=UUID.randomUUID().toString
}
}
list
}).toDS()
}
//为每个gesHash查找是否存在符合在国内 在国内就使用原来的省市县(地区)
//如果在国外就用IP查找地区 填写省市
//对地区维度表进行操作
def areaWithGes(spark: SparkSession, splitSession: Dataset[AppEventLog]): Dataset[AppEventLog] = {
//加载地区维度表 过滤掉GesHash为空的对象
val gesDim = spark.read.table("dim.dim_area_dict").where("geohash is not null and geohash !=''")
//把查询好的每一个对象映射成 (gesHash,(省、市、地区)) 使用偏函数进行映射成Map (gesHash,(省、市、地区))集合
import spark.implicits._
val gesMapTuple = gesDim.rdd.map({
case Row(geohash: String, province: String, city: String, region: String) => (geohash, (province, city, region))
}).collectAsMap()
//使用广播变量
val gesBoardCast: Broadcast[collection.Map[String, (String, String, String)]] = spark.sparkContext.broadcast(gesMapTuple)
//读取Ip2Region.db文件 先上传到Hdfs上特定位置
//配置hdfs配置信息
val conf = new Configuration()
//获取文件系统
val fs: FileSystem = FileSystem.get(conf)
//设置hdfs上的路径 先把ip2region.db上传到hdfs上
val path = new Path("/zwf/dict/ip2region.db")
val input: FSDataInputStream = fs.open(path)
//获取文件字节数组长度
val len: Long = fs.getFileStatus(path).getLen
//设置字节数组
val array = new Array[Byte](len.toInt)
//读取所有的流转换到字节缓存区中
IOUtils.readFully(input, array)
//关闭输入流
IOUtils.closeQuietly(input)
//把字节数组流放入广播变量中
val iP2RegionCast: Broadcast[Array[Byte]] = spark.sparkContext.broadcast(array)
//把session切割点对象分区处理 每个分区各自处理
splitSession.mapPartitions(item => {
//建立Ip2regin查询
//获取广播变量中的地区维度表信息
val regions: collection.Map[String, (String, String, String)] = gesBoardCast.value
//获取广播变量中的ip2region对象
val ip2regionByte: Array[Byte] = iP2RegionCast.value
//构建ip2region查询引擎
val config = new DbConfig()
val searcher = new DbSearcher(config, ip2regionByte)
//配置环境
item.map(x => {
//根据表中每条记录的经度和纬度获取gesHash
val lat: Double = x.latitude
val lon: Double = x.longitude
//使用geoHash包获取geoHash值
val geohash: String = GeoHash.geoHashStringWithCharacterPrecision(lat, lon, 5)
//如果是国内的经纬度 表中的经纬度转换成的hash值 广播变量中map集合是否存在
//存在就在国内
var flag = false
if (regions.contains(geohash)) {
flag = true
//获取广播中的省市县(地区)如果为空返回空字符串
val regionsArea: (String, String, String) = regions.getOrElse(geohash, ("", "", ""))
x.province = regionsArea._1
x.city = regionsArea._2
x.region = regionsArea._3
}
//如果在国外 通过Ip地址搜索位置
if (!flag) {
val block: DataBlock = searcher.memorySearch(x.ip)
//[id|Ip|province|city|网络类型|town]
val strings: Array[String] = block.getRegion.split("\\|")
if (strings.length >= 5 && StringUtils.isNotBlank(strings(2)) && StringUtils.isNotBlank(strings(3))) {
//省市赋值
x.province = strings(2)
x.city = strings(3)
x.region = ""
}
}
x
})
})
}
//标识新老用户 老用户0 新用户1
/**
*
* @param session spark连接信息
* @param areaed 过滤后的数据
* @param last_login 最后登录的时间戳
* @param lastMaxGuid 最大的用户id 存入广播变量中
* @return
*/
def markRedisIsNew(spark: SparkSession, areaed: Dataset[AppEventLog], last_login: Long, lastMaxGuid: Long): Dataset[AppEventLog] = {
//创建临时视图
areaed.createOrReplaceTempView("v_isNewLog")
//导入隐式转换
import spark.implicits._
//编写sql合并填充accountid
val sqlstr =
s"""
|select
| t2.account ,
| t2.appId ,
| t2.appVersion ,
| t2.carrier ,
| t2.deviceId ,
| t2.eventId ,
| t2.ip ,
| t2.latitude ,
| t2.longitude ,
| t2.netType ,
| t2.osName ,
| t2.osVersion ,
| t2.properties ,
| t2.resolution ,
| t2.sessionId ,
| t2.`timestamp` ,
| t2.splitSessionId ,
| coalesce(t2.account,t3.account) filledAccount,
| t2.province ,
| t2.city ,
| t2.region ,
| t2.guid ,
| t2.isNew
|from v_isNewLog t2
|left join
| (select t1.deviceid,
| t1.account
|from (select
| deviceId,
| account,
| row_number() over (partition by deviceId order by score desc,last_login desc ) rk
|from dws.dws_device_account2_score
|where last_login=$last_login and account is not null ) as t1
|where rk=1 ) t3
|on t2.deviceid=t3.deviceid
|""".stripMargin
//把查询的记录转化为DataSet对象
val ael: Dataset[AppEventLog] = spark.sql(sqlstr).as[AppEventLog]
//分区处理对象
ael.mapPartitions(item => {
//获取redis连接
val jedis = new Jedis(YjxxtConstants.REDIS_SERVER_HOSTNAME, YjxxtConstants.REDIS_SERVER_PORT)
jedis.auth("root@123456")
//声明Guid
var guidStr: String = null
item.map(rs => {
try {
//先从redis中获取以填充的account的账户为key获取guid值
guidStr = jedis.get(rs.filledAccount)
} catch {
case exception: Exception => exception.printStackTrace()
}
//如果获取的填充账户不为空 且 guid也不为空 就回填guid值
//老用户
if (rs.filledAccount != null && guidStr != null) {
rs.guid = guidStr.toLong
} else {
//如果是新设备 可以从redis中以设备ID获取guid值
val guids: String = jedis.get(rs.deviceId)
//回填guid
if (guids != null) {
rs.guid = guids.toLong
//如果回填账号不为空 说明昨天是匿名登录
if (rs.filledAccount != null) {
jedis.del(rs.deviceId)
jedis.set(rs.filledAccount, guids)
} else {
}
} else {
//如果accountId和guids都查不到 说明是新用户 设置自增变量
val newGuid = jedis.incr("guid_cnt")
//将结果插入到redis
val key = if (rs.filledAccount == null) rs.deviceId else rs.filledAccount
jedis.set(key, newGuid + "")
//表示是新用户
if (rs.guid > lastMaxGuid) rs.isnew = 1
}
}
rs
})
})
}
//把经过ETL处理好的数据 插入dwd表中
def saveDataTotable(spark: SparkSession, datas: Dataset[AppEventLog], start_date: String): DataFrame = {
datas.createOrReplaceTempView("V_data_Log")
import spark.implicits._
spark.sql(
s"""
| insert overwrite table dwd.dwd_app_event_detail partition(dt=$start_date)
| select
| account ,
| appId ,
| appVersion ,
| carrier ,
| deviceId ,
| eventId ,
| ip ,
| latitude ,
| longitude ,
| netType ,
| osName ,
| osVersion ,
| properties ,
| resolution ,
| sessionId ,
| `timestamp` ,
| splitSessionId ,
| filledAccount ,
| province ,
| city ,
| region ,
| guid ,
| isNew
| from
| V_data_Log
|""".stripMargin
)
}
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSessionUtil.getSparkSession("test_dwd")
spark.sql("add jar hdfs://hdfs-zwf/spark/hive-hcatalog-core-3.1.2.jar")
spark.sql("add jar hdfs://hdfs-zwf/spark/json-serde-1.3.7-jar-with-dependencies.jar")
val start_date: String = DateUtil.getCertainDay(2023, 3, 8)
val end_date:String = DateUtil.getCertainDay(2023, 3, 9)
val ts: Long = DateUtil.getCertainDayTimestamp(2023, 3, 8)
//数据清洗
val clear: DataFrame = dataClear(spark, start_date, end_date)
//数据标准化
val aet: Dataset[AppEventLog] = dataStandard(spark, clear)
//sessionId之间的切割
val spsession: Dataset[AppEventLog] = splitSessionId(spark, aet)
//地理位置转化 geohash转省市县
val area: Dataset[AppEventLog] = areaWithGes(spark, spsession)
//设置广播变量 初始值
var lastMaxGuid=spark.sparkContext.broadcast(0L).value
//连接redis设置
val jedis= new Jedis(YjxxtConstants.REDIS_SERVER_HOSTNAME,YjxxtConstants.REDIS_SERVER_PORT)
jedis.auth("root@123456")
//获取计数自增变量
val str: String = jedis.get("guid_cnt")
if(str!=null){
lastMaxGuid=str.toLong
}
//获取guid值 标记新老用户
val total: Dataset[AppEventLog] = markRedisIsNew(spark, area, ts, lastMaxGuid)
//保存数据
saveDataTotable(spark,total,start_date)
}
四、主题计算
- 流量主题
核心度量:UV(unique visitor)数、PV(page visitor)数、IP数(不精确)、用户访问次数、跳出次数、用户访问时长(visitor_date)。
设计维度:时间维度;访问新老属性;访客手机型号;访客操作系统类型、版本;访客的app下载渠道;访客的app版本;访客的vip级别、访客的活跃等级(1级2级3级)、访客的消费等级(1级2级3级)、访客的消费偏好、入口页面(同一个会话下最早出现pageview事件的pageId)、退出页面(同一个会话下最晚出现pageview事件的pageId)、被访问的页面、被访问的频道、被访问的商品(商品名称、品类、价格区间、品牌、档次、风格 )。
根据需求设计维度与核心度量随机搭配变成不同的事实表!
create database ads;
drop table if exists ads.ads_province_tfc_1d;
CREATE external TABLE ads.ads_province_tfc_1d(
province string comment '省(直辖市、自治区)'
,pv bigint comment '访问页面次数'
,uv bigint comment '访问页面用户数'
,ses_cnt bigint comment '会话次数'
,ip_cnt bigint comment '访问ip数'
,jpt_ses_cnt bigint comment '退出页面的会话次数'
,acc_tml bigint comment '页面访问总时长'
)
PARTITIONED BY(dt string)
STORED AS parquet
location '/zwf/app/ads/ads_province_tfc_1d';
insert overwrite table ads.ads_province_tfc_1d
select
province,
count(1) pv,
count(distinct guid) uv,
count(distinct splitSessionId) ses_cnt,
count(distinct ip) ip_cnt,
count(distinct if(is_jumpout='Y',splitSessionId,null)) jpt_ses_cnt,
sum(page_acc_tml) acc_html,
'20230301' dt
from dws.dws_app_tfc_topic
where dt='20230301'
group by province;
-----------------------------------------------------------------------------------------
drop table if exists dws.dws_app_session_agr;
CREATE external TABLE dws.dws_app_session_agr(
guid bigint comment '用户唯一标识',
splitSessionId string comment 'sessionId切割Id',
enter_page_id string comment '进入页面id',
exit_page_id string comment '退出页面id',
start_time bigint comment '开始时间',
end_time bigint comment '结束时间',
is_jumpout string comment '是否跳出页面',
pv_cnt bigint comment '页面访问数')
partitioned by (dt string)
stored as parquet
location '/zwf/app/dws/dws_app_session_agr';
insert overwrite table dws.dws_app_session_agr partition (dt='20230301')
select
guid,
splitSessionId,
split(min(if(eventId='pageView',concat_ws('_',`timestamp`,eventId,properties['pageId']),9)),'_')[2] as enter_page_id,
split(max(if(eventId='pageView',concat_ws('_',`timestamp`,eventId,properties['pageId']),0)),'_')[2] as exit_page_id,
min(`timestamp`) as start_time,
max(`timestamp`) as end_time,
if(count(if(eventId='pageView',1,null))<2,'Y','N') as is_jumpout,
count(if(eventId='pageView',1,null)) as pv_cnt
from dwd.dwd_app_event_detail
where dt='20230301'
group by guid,splitSessionId;
-----------------------------------------------------------------------------------------
-- dws_app_session_agr退化维度到dwd_app_event_detail表中,变成一个大宽表!
drop table if exists dws.dws_app_tfc_topic;
CREATE external TABLE dws.dws_app_tfc_topic(
account string
,appid string
,appversion string
,carrier string
,deviceid string
,eventid string
,ip string
,latitude double
,longitude double
,nettype string
,osname string
,osversion string
,properties map<string,string>
,resolution string
,`timestamp` bigint
,filledAccount string
,province string
,city string
,region string
,guid bigint
,isnew int
,page_acc_tml bigint
,splitSessionId string
,enter_page_id string
,exit_page_id string
,start_time bigint
,end_time bigint
,is_jumpout string
,pv_cnt bigint
)
PARTITIONED BY(dt string)
STORED AS parquet
location '/zwf/app/dws/dws_app_tfc_topic';
with t1 as(
select
account,
appId,
appVersion,
carrier,
deviceId,
eventId,
ip,
latitude,
longitude,
netType,
osName,
osVersion,
properties,
properties,
resolution,
`timestamp`,
filledAccount,
province,
city,
region,
guid,
isNew,
splitSessionId
from dwd.dwd_app_event_detail
where dt='20230301' and eventId='pageView'
),t2 as (
select
splitSessionId,
enter_page_id,
exit_page_id,
start_time,
end_time,
is_jumpout,
pv_cnt
from dws.dws_app_session_agr
where dt='20230301'
)
insert overwrite table dws.dws_app_tfc_topic partition(dt='20230301')
select
account,
appId,
appVersion,
carrier,
deviceId,
eventId,
ip,
latitude,
longitude,
netType,
osName,
osVersion,
properties,
resolution,
`timestamp`,
filledAccount,
province,
city,
region,
guid,
isNew,
lead(t1.`timestamp`,1,t2.end_time) over (partition by t1.splitSessionId order by t1.`timestamp`)-t1.`timestamp` as page_acc_tml,
t1.splitSessionId,
enter_page_id,
exit_page_id,
start_time,
end_time,
is_jumpout,
pv_cnt
from t1
left join t2 on t1.splitSessionId=t2.splitSessionId;
- 多维度cube表
hive中有一些高阶函数,可以简化对多维表的简化开发。
with cube:字段排列组合分组聚合。例如:group by a,b,c with cube=> (a) (b) (c) (ab) (ac) (bc) (abc) ( ) 可以计数2^3种维度分析
grouping sets:自定义设置字段维度分组聚合,例如:group by a,b,c grouping sets( (a), (a,c), (c), () )
with rollup:设置向上维度分组聚合。例如:group by a,b,c with rollup=>(a,b,c) (a,b) (a) ( )
- cube相关概念
一个维度所有可能的取值的个数,叫做这个维度基数(cardinality)。
像省市区,几百个信息,低基维。
像用户id,手机号等等,千万甚至上亿,高基维
再高,超高基维,会造成Cube膨胀,再进行维度组和产生数据量比原始数据量大出数倍。Cube构建的优化显得至关重要。
维度上卷:细粒度维度属性分析转化为相对粗粒度的维度属性分析 eg:市-省
维度下卷:粗粒度维度属性分析转化为相对细粒度的维度属性分析 eg:省-市
- 自定义设置字段维度分组聚合(案例)
--多维流量分析cube建表
drop table if exists ads.ads_app_tfc_cube;
create external table ads.ads_app_tfc_cube(
appid string
,appversion string
,carrier string
,nettype string
,osname string
,province string
,city string
,region string
,isnew int
,enter_page_id string
,exit_page_id string
,is_jumpout string
,pv_cnt bigint
,uv_cnt bigint
,ses_cnt bigint
,acc_tml bigint
,avg_ses_tml bigint
,ip_cnt bigint
,jpt_ses_cnt bigint
)
partitioned by (dt string)
stored as parquet
location '/zwf/app/ads/ads_app_tfc_cube'
tblproperties(
'parquet.compress'='snappy'
);
with tmp as(
select
nvl(account ,'UNKOWN')as account
,nvl(appid ,'UNKOWN')as appid
,nvl(appversion ,'UNKOWN')as appversion
,nvl(carrier ,'UNKOWN')as carrier
,nvl(deviceid ,'UNKOWN')as deviceid
,nvl(eventid ,'UNKOWN')as eventid
,nvl(ip ,'UNKOWN')as ip
,nvl(latitude ,'UNKOWN')as latitude
,nvl(longitude ,'UNKOWN')as longitude
,nvl(nettype ,'UNKOWN')as nettype
,nvl(osname ,'UNKOWN')as osname
,nvl(osversion ,'UNKOWN')as osversion
,nvl(resolution ,'UNKOWN')as resolution
,nvl(`timestamp` ,'UNKOWN')as `timestamp`
,nvl(filledaccount ,'UNKOWN')as filledaccount
,nvl(province ,'UNKOWN')as province
,nvl(city ,'UNKOWN')as city
,nvl(region ,'UNKOWN')as region
,nvl(guid ,'UNKOWN')as guid
,cast(nvl(isnew ,'UNKOWN') as int)as isnew
,nvl(page_acc_tml ,'UNKOWN')as page_acc_tml
,nvl(splitsessionid,'UNKOWN')as splitsessionid
,nvl(enter_page_id ,'UNKOWN')as enter_page_id
,nvl(exit_page_id ,'UNKOWN')as exit_page_id
,nvl(start_time ,'UNKOWN')as start_time
,nvl(end_time ,'UNKOWN')as end_time
,nvl(is_jumpout ,'UNKOWN')as is_jumpout
,nvl(pv_cnt ,'UNKOWN')as pv_cnt
from
dws.dws_app_tfc_topic
where dt='20230301'
)
insert into table ads.ads_app_tfc_cube partition(dt='20230301')
select
nvl(appid,'UNKNOWN') appid
,nvl(appversion,'UNKNOWN') appversion
,nvl(carrier,'UNKNOWN') carrier
,nvl(nettype,'UNKNOWN') nettype
,nvl(osname,'UNKNOWN') osname
,nvl(province,'UNKNOWN') province
,nvl(city,'UNKNOWN') city
,nvl(region,'UNKNOWN') region
,nvl(isnew,-1) isnew
,nvl(enter_page_id,'UNKNOWN') enter_page_id
,nvl(exit_page_id,'UNKNOWN') exit_page_id
,nvl(is_jumpout,'UNKNOWN') is_jumpout
,nvl(count(1),0) pv_cnt
,nvl(count(distinct guid),0) as uv_cnt
,nvl(count(distinct splitsessionid),0) as ses_cnt
,nvl(sum(page_acc_tml),0) as acc_tml
,nvl(sum(page_acc_tml)/count(distinct splitsessionid),0) as avg_ses_tml
,nvl(count(distinct ip),0) as ip_cnt
,nvl(count(distinct if(is_jumpout='Y',splitsessionid,null)),0) as jpt_ses_cnt
from
tmp
group by
appid
,appversion
,carrier
,nettype
,osname
,province
,city
,region
,isnew
,enter_page_id
,exit_page_id
,is_jumpout
grouping sets(
(appid)
,(appid,appversion)
,(osname)
,(carrier,nettype)
,(province)
,(province,city)
,(province,city,region)
,(isnew)
,(is_jumpout)
,(province,isnew)
,(enter_page_id)
,(exit_page_id)
,()
);
- BitMap工具(RoaringBitmap)
使用bitmap,一个Byte是8个bit,每个bit的下标从0开始递增,存位置时我们使用下标表示存储的数值,下标存在bit中存放1,下标不存在bit中存放0。目的是节省内存空间!
- API调用(使用咆哮bitmap)
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.9.25</version>
</dependency>
- java代码
public class RoaringBitMapTest {
/**
* 将值添加到容器中(将值设置为“true”),无论它是否已出现。
* Java缺少本机无符号整数,但x参数被认为是无符号的。
* 在位图中,数字按照Integer.compareUnsigned进行排序。我们订购的数字是0,1,…,2147483647,-2147483648,-2147473647,…,-1。
* add(final int x)
* 参数: x–整数值
*/
@Test
public void add() {
RoaringBitmap rbm = new RoaringBitmap();
rbm.add(1);
rbm.add(4);
rbm.add(100000000);
System.out.println(rbm);
}
/**
* 将值添加到容器中(将值设置为“true”),无论它是否已出现。
* checkedAdd(final int x)
* 参数: x–整数值
* return: 如果添加的int尚未包含在位图中,则为true。否则为False
*/
@Test
public void checkAndadd() {
RoaringBitmap rbm = RoaringBitmap.bitmapOf(1, 2, 3, 6, 1000, 100000000);
boolean checkedAdd1 = rbm.checkedAdd(3);
boolean checkedAdd2 = rbm.checkedAdd(4);
// 查询该位图中存储的第几个值,从小到大排序
System.out.println(checkedAdd1);
System.out.println(checkedAdd2);
System.out.println(rbm);
// false
// true
// {1,2,3,4,6,1000,100000000}
}
/**
* 检查是否包含该值,这相当于检查是否设置了相应的位(在BitSet类中获取)
* 参数: x–整数值
* return: 是否包括整数值。
*/
@Test
public void contains() {
RoaringBitmap rbm = RoaringBitmap.bitmapOf(1, 2, 3, 6, 1000, 100000000);
boolean contains1 = rbm.contains(3);
boolean contains2 = rbm.contains(4);
// 查询该位图中存储的第几个值,从小到大排序
System.out.println(contains1);
System.out.println(contains2);
}
/**
* 查询该位图中存储的第几个值,从小到大排序
* select(int j)方法中,
* 参数:j代表的是位图值的索引
* return:根据索引查到的值
*/
@Test
public void select() {
RoaringBitmap rbm = RoaringBitmap.bitmapOf(1, 3, 2, 1000, 100000000);
// 查询该位图中存储的第几个值,从小到大排序
int valueOfIndex0 = rbm.select(0);
int valueOfIndex1 = rbm.select(1);
int valueOfIndex2 = rbm.select(2);
int valueOfIndex3 = rbm.select(3);
System.out.println(valueOfIndex0);
System.out.println(valueOfIndex1);
System.out.println(valueOfIndex2);
System.out.println(valueOfIndex3);
// 1
// 2
// 3
// 1000
}
/**
* 统计排名
* rank(int x) ——> rankLong(int x)
* Rank返回小于或等于x的整数数(Rank(无穷大)将是GetCardinality())。如果提供最小值作为参数,此函数将返回1。如果提供小于最小值的值,则返回0。
*/
@Test
public void rank() {
RoaringBitmap rbm = RoaringBitmap.bitmapOf(1, 2, 3, 6, 1000, 100000000);
// 查询该位图中存储的第几个值,从小到大排序
System.out.println(rbm.rank(0));
System.out.println(rbm.rank(1));
System.out.println(rbm.rank(2));
System.out.println(rbm.rank(3));
System.out.println(rbm.rank(4));
System.out.println(rbm.rank(5));
System.out.println(rbm.rank(6));
System.out.println(rbm.rank(7));
System.out.println(rbm.rank(2000));
System.out.println(rbm.rank(3000));
System.out.println(rbm.rank(6000));
System.out.println(rbm.rank(111111111));
System.out.println(rbm.rank(1000000));
}
/**
* 检查范围是否与位图相交。
* intersects(long minimum, long supremum)
* 参数: minimum–范围的包含无符号下界 supermum–范围的唯一无符号上界
* 注意:[上闭,下开)
*/
@Test
public void intersects() {
RoaringBitmap rbm = RoaringBitmap.bitmapOf(1, 2, 3, 1000, 100000000);
// 查询该位图中存储的第几个值,从小到大排序
Boolean boolean1 = rbm.intersects(4L, 999L);
Boolean boolean2 = rbm.intersects(4L, 1000L);
Boolean boolean3 = rbm.intersects(4L, 1001L);
Boolean boolean4 = rbm.intersects(3L, 999L);
Boolean boolean5 = rbm.intersects(2000L, 100000000L);
Boolean boolean6 = rbm.intersects(100000000L, 100000001L);
System.out.println(boolean1);
System.out.println(boolean2);
System.out.println(boolean3);
System.out.println(boolean4);
System.out.println(boolean5);
System.out.println(boolean6);
// false
// false
// true
// true
// false
// true
}
/**
* 按位OR(联合)操作。参数中的位图不会被修改,只要提供的位图保持不变,此操作是线程安全的。
* 如果您有2个以上的位图,请考虑使用FastAggregation类。
* 参数: x1–第一个位图 x2–其他位图
* return: 操作结果
*
* @see FastAggregation#or(RoaringBitmap...)
* @see FastAggregation#horizontal_or(RoaringBitmap...)
*/
@Test
public void or() {
RoaringBitmap rbm1 = RoaringBitmap.bitmapOf(1, 2, 3, 1000, 100000000);
RoaringBitmap rbm2 = RoaringBitmap.bitmapOf(1, 2, 3, 4, 2000, 5000);
RoaringBitmap or = RoaringBitmap.or(rbm1, rbm2);
System.out.println(or);
}
/**
* 按位AND(交叉)运算。参数中的位图不会被修改,只要提供的位图保持不变,此操作是线程安全的。
* 如果您有2个以上的位图,请考虑使用FastAggregation类。
* 参数: x1–第一个位图 x2–其他位图
* return:操作结果
*
* @see FastAggregation#and(RoaringBitmap...)
*/
@Test
public void and() {
RoaringBitmap rbm1 = RoaringBitmap.bitmapOf(1, 2, 3, 1000, 100000000);
RoaringBitmap rbm2 = RoaringBitmap.bitmapOf(1, 2, 3, 4, 1000, 5000);
RoaringBitmap and = RoaringBitmap.and(rbm1, rbm2);
System.out.println(and);
}
}
- bitmap方式计算用户活跃情况
基本思想,使用32位(4Byte)长度空间大小存放每天活跃状态,活跃状态为1,不活跃状态为0.分别用0和1标记登录状态!
- 案例sql
-- 需求:归档增量表和全量表
-- 求当前日期31天以前的时间(包括当前日期) 二进制从右往左 下标是0到30位
-- 当前日期放在下标为30位 30天前的日期放在0位 加上当前日期是31天 所以二进制从右往左31位 下标是0到30位
create table test.user_active_bitmap
as
with tmp as (
-- 求2022-12-22到2021-01-21(包括当天: 2021-01-21)共31天内的活跃日期 最终由二进制转为十进制 比如2021-01-21=2^30
-- 2021-01-21在bitmap中第30位 2021-01-20在bitmap中第29位...以此类推到2022-12-22在bitmap中第0位
select guid,
sum(pow(2,datediff(dt,date_sub('2021-01-21',30)))) as bitmap
from test.active_user_day
where dt between date_sub('2021-01-21',30) and '2021-01-21'
group by guid
)
select
nvl(tmp.guid,dau.guid) guid,
case
when tmp.guid is not null and dau.guid is not null then tmp.bitmap/2+pow(2,30)
when tmp.guid is not null and dau.guid is null then tmp.bitmap/2
when tmp.guid is null and dau.guid is not null then pow(2,30)
end as bitmap_int
from(
-- 由于2021-01-22是往后的日期 因为需要在最高位进行操作!
-- 如果2021-01-22活跃 结果是bitmap/2+2^30 2021-01-22在第30位 2021-01-21在第29位.
-- 因为bitmap是所有活跃日期二进制数相加,而2021-01-21从第30位向右移动到29位 应该除以2再加上2021-01-22二进制数.
-- 如果2021-01-22不活跃 最高位第30位为0 因此bitmap/2 只需要向右移动一位取二进制数.
-- 如果只有2021-01-22活跃 bitmap只有第30位为1,其余全部为0 因为bitmap值为2^30.
select * from test.active_user_day where dt='2021-01-22'
)dau
full join tmp on dau.guid=tmp.guid;
select * from test.user_active_bitmap;
-- 每个用户的bitmap值匹配特定时间范围,连续登录活跃用户!
-- 需求计算在2021-01-18到2021-01-20(包括当天总共3天)范围内连续登录活跃的用户信息 21号日期-18号日期
-- 二进制运算下标是从0开始到30 总共31位且是从左往右
select
guid,
bitmap_int
from test.user_active_bitmap
where cast(pow(2,datediff('2021-01-21',date_sub('2021-01-22',30)))-pow(2,datediff('2021-01-18',date_sub('2021-01-22',30))) as int)|cast(bitmap_int as int)=bitmap_int;
-- 指定日期范围内每个人的活跃天数 例如:2021-01-19到2021-01-22活跃天数 (4天活跃用户数)
-- substring(用户所有活跃日期转二进制字符串,起始日期下标,截取起始日期往后的天数)
-- 比如2021-01-19到2021-01-22活跃天数 (4天内用户活跃情况) subString(用户所有活跃日期转二进制字符串,2021-01-19在二进制中的下标,4)
-- 字符串0000000000000000000000000010110共31位 下标从1开始到31 从左往右 共31位
select
guid,
length(replace(substring(reverse(lpad(bin(bitmap_int),31,'0')),datediff('2021-01-19',date_sub('2021-01-22',30))+1,1),0,'')) as days
from test.user_active_bitmap;
-- 每个用户最大连续登录天数 比如:g003 最大3天连续登录 g002最大5天连续登录...
select
guid,
max(length(s)) as max_act_cnt
from test.user_active_bitmap
lateral view explode(split(bin(bitmap_int),'[0]+')) o as s
group by guid;
- 用户主题
- 活跃用户统计(日、月、周) 活跃定义:当日登录在线即为活跃 日活:当日活跃的用户数 周活:当周活跃的用户数 月活:当月活跃的用户数 计算频率:每日更新.
- 当月连续活跃用户统计 需求数据:计算日期、月份、连续5天用户数、连续7天、连续14天、连续20天、连续30天 计算频率:每日更新.
- 新增用户数(日、月、周) 新用户:当日被标注为 isNew 的用户,新用户标签有效时间为当天,第二天则视为老用户 计算频率:每日更新.
- 新用户留存统计 新用户留存:需要灵活统计半年内各个时间范围内的留存用户 .
- 流失用户计算 流失用户:最近7天未活跃的用户 流失定义翻看文档第一章 《需求分析与设计》3.1.7 计算频率:每日更新.
- 用户连续活跃区间记录(类似拉链表 方案一)
核心思想: 记录着每个人的每天活跃状态,但是又不用每天都存储一条记录!
- sql案例(连续登录用户数量)
-- user_login_tbl表字段备注
-- guid用户id dt当天日期
-- 需求:在12月1日和12月7日中,连续登录5天用户,显示用户id,起始时间、结束时间、连续登录天数
-- 连续登录5天的用户
select
guid,
min(dt) as start_time,
max(dt) as end_time,
count(1) as days
from
(select
guid,
dt,
date_sub(t1.dt,rs) temp_date
from(
select
guid, dt,
row_number() over (partition by guid order by dt) rs
from
dws.user_login_tbl) t1
) t2
group by guid,temp_date
having count(temp_date)>=5;
- 日活表与全量表进行数据归档(案例二)
核心思想:把日活用户记录添加到全量表中,分三种情况,1、全量表有记录 日活表有记录,全量表失效时间不是9999-12-31,直接把日活表记录插入全量表中,也就是生效时间使用日活记录生效时间,失效时间是9999-12-31。2、全量表记录存在,并且失效时间是9999-12-31,日活表没有记录,要把全量表记录截断,也就是全量表生效时间为历史表生效日期,失效时间变成当天日期的前一天。3、全量表没有记录,日活表有记录,生效时间直接使用日活记录表时间,失效时间使用9999-12-31。其他情况都是使用全量表的记录时间。
-- dws_app_user_ctnu_rng表字段备注
--guid bigint, 用户全局id
--first_acc_dt string, 第一次访问时间
--rng_start_dt string, 开始上线时间
--rng_end_dt string 下线时间
-- dwd_app_user_dau表字段备注
-- guid bigint, 用户全局id
-- isnew int, 是否新用户
-- first_acc_dt string 第一次访问时间
with history as(
select
guid,
-- 用户最早的第一次访问时间
min(first_acc_dt) as first_acc_dt,
-- 用户最早的生效时间
min(rng_start_dt) as rng_start_dt,
-- 用户最晚的失效时间
max(rng_end_dt) as rng_end_dt
from
dws.dws_app_user_ctnu_rng
group by guid
),day_active as (
select
guid,
dt
from dwd.dwd_app_user_dau
where dt='2023-03-02'
)
select
coalesce(h.guid,a.guid) as guid,
coalesce(h.first_acc_dt,a.dt) as first_acc_dt,
case
-- 历史表记录有数据并过期 增量表有数据 直接插入增量表记录 生效时间是当日时间
when h.guid is not null and a.guid is not null and h.rng_end_dt<>'9999-12-31' then a.dt
-- 历史表记录有数据并且还有效 增量表为空 生效时间还是历史表生效时间
when h.guid is not null and a.guid is null and h.rng_end_dt='9999-12-31' then h.rng_start_dt
-- 历史表记录为空 增量表记录不为空 生效时间就是当天时间
when h.guid is null and a.guid is not null then a.dt
-- 其他情况就是历史表生效时间
else h.rng_start_dt
end as rng_start_dt,
case
-- 历史表记录有数据并过期 增量表有数据 直接插入增量表记录 失效时间是9999-12-31
when h.guid is not null and a.guid is not null and h.rng_end_dt<>'9999-12-31' then '9999-12-31'
-- 历史表记录有数据并且还有效 增量表为空 失效日期是当前时间前一天!
when h.guid is not null and a.guid is null and h.rng_end_dt='9999-12-31' then date_format(date_sub(`current_date`(),1),'yyyy-MM-dd')
-- 历史表记录为空 增量表记录不为空 失效时间就是9999-12-31
when h.guid is null and a.guid is not null then '9999-12-31'
-- 其他情况就是历史表失效时间
else h.rng_end_dt
end as rng_end_dt
from history h
-- 通过用户全局id字段关联
full join day_active a on h.guid=a.guid;
-- 方案二
-- 日活表与历史表进行拉链合并
-- 在历史表中把截止昨天为止 失效用户提取出来
with temp1 as (
-- 用户失效记录
select
guid,first_acc_dt, rng_start_dt, rng_end_dt, dt
from dws.dws_app_user_ctnu_rng
where dt=date_sub(`current_date`(),1) and rng_end_dt!='9999-12-31'
), temp2 as (
select
nvl(t1.guid,t2.guid) guid,
nvl(t2.first_acc_dt,t1.first_acc_dt) first_acc_dt,
nvl(t1.rng_start_dt,'2023-12-09') rng_start_dt,
-- 昨天失效
if(t1.guid is not null and t2.guid is null,'2023-12-08','9999-12-31') rng_end_dt,
'2023-12-09' dt
from
(
-- 截止昨天为止 用户在线记录
select
guid,first_acc_dt,rng_start_dt,rng_end_dt
from dws.dws_app_user_ctnu_rng
where dt=date_sub(`current_date`(),1) and rng_end_dt='9999-12-31'
)t1
full join (
-- 用户日增记录
select
guid,first_acc_dt,dt
from dwd.dwd_app_user_dau
where dt='2023-12-09'
) t2
)
insert overwrite table dws.dws_app_user_ctnu_rng partition (dt='2023-12-09')
select
guid,first_acc_dt, rng_start_dt, rng_end_dt
from temp2
union all
select
guid,first_acc_dt, rng_start_dt, rng_end_dt
from temp1;
- 用户留存率
一般是起始时间的几天留存天数或者留存率。
计算日期向前几天就是几天留存率!
- 用户留存人数sql
-- 当天为止前一个月内,一个月的用户留存率
select
t1.dt dt,
t1.first_acc_dt first_acc_dt,
-- 用户存活数量
t1.ret_days ret_days,
t2.new_usr_count new_usr_count,
-- 用户在线数/一个月内注册的用户
t1.ret_days/t2.new_usr_count retention_ratio
from (select dt,
first_acc_dt,
ret_days,
ret_usr_cnt
from dws.dws_user_ret_cnt
where dt = '2023-12-09'
) t1
left join (
select
-- 在一个月内注册的用户中
first_acc_dt,
count(guid) new_usr_count
from dwd.dwd_app_user_register
where first_acc_dt>=date_sub('2023-12-09',30) and first_acc_dt<='2023-12-09'
group by first_acc_dt
) t2
on t1.first_acc_dt=t2.first_acc_dt;
-- 超过7天用户未上线属于流失用户
select
'2023-12-08' dt,
count(1) wastage_count
from(
select
max(rng_end_dt) last_login_user
from dws.dws_app_user_ctnu_rng
where dt='2023-12-08'
group by guid) t
where datediff('2023-12-08',last_login_user)>7;
- 漏斗模型
用户进行一系列操作,在这个过程中,每到一个步骤人数相应减少,到最后一个步骤人数最少!
比如,分析师定义了一个业务路径:秒购 步骤1: 点击了秒杀运营位广告
步骤2: 点击了广告落地页中的促销商品
步骤3: 订阅了这个商品的秒杀通知、
漏斗转化率就是每步转化率是这步操作的人数/上一步人数+下面所有步骤人数。
- 分析浏览商品===> 加入购物车 ===> 提交订单每步的转化率
drop table if exists dws.dws_app_ld_compstep;
create external table dws.dws_app_ld_compstep
(
guid BIGINT COMMENT '用户唯一ID',
funnel_model STRING COMMENT '漏斗模型名称',
eventid STRING COMMENT '最终事件名称',
max_step INT COMMENT '最大步骤'
)PARTITIONED BY (DT STRING)
STORED AS parquet
LOCATION '/zwf/app/dws/dws_app_ld_compstep'
TBLPROPERTIES ('parquet.compress' = 'snappy');
-- regexp_extract 默认返回正则匹配第一个() 还可以在第二个参数进行指定匹配值 不能超过()个数 如果第三个参数是0 则返回全部字符串
select regexp_extract('1702125375_pageView_addCart_submitOrder','.*?(pageView).*?(addCart).*?(submitOrder).*?',2) as dt;
--处理数据
select * from dwd.dwd_app_event_detail;
with temp as (
-- 把时间和eventId进行拼接 并按时间戳进行从小到大排序
select
guid,
concat_ws(',',sort_array(collect_list(concat_ws('_',`timestamp`,eventId)))) as event_seq
from dwd.dwd_app_event_detail
where dt='20230301'
group by guid
)
insert overwrite table dws.dws_app_ld_compstep partition (dt='2023-03-01')
select
guid,
'下单漏斗模型' funnel_model,
eventId,
max_step
from (select guid,
case
when regexp_extract(event_seq, '.*?(pageView).*?(addCart).*?(submitOrder).*?', 3) = 'submitOrder'
then 'submitOrder'
when regexp_extract(event_seq, '.*?(pageView).*?(addCart).*?', 2) = 'addCart' then 'addCart'
when regexp_extract(event_seq, '.*?(pageView).*?', 1) = 'pageView' then 'pageView'
else ''
end as eventId,
case
when regexp_extract(event_seq, '.*?(pageView).*?(addCart).*?(submitOrder).*?', 3) = 'submitOrder'
then 3
when regexp_extract(event_seq, '.*?(pageView).*?(addCart).*?', 2) = 'addCart' then 2
when regexp_extract(event_seq, '.*?(pageView).*?', 1) = 'pageView' then 1
else 0
end as max_step
from temp) t
where max_step>0;
-----------------------------------------------------------------------------------------
drop table ads.ads_app_ld_conversion_rate;
create external table ads.ads_app_ld_conversion_rate
(
funnel_model STRING COMMENT '漏斗模型名称',
step INT COMMENT '步骤',
rate DOUBLE COMMENT '转化率',
step_cnt BIGINT COMMENT '步骤人数'
)PARTITIONED BY (DT STRING)
STORED AS parquet
LOCATION '/zwf/app/ads/ads_app_ld_conversion_rate';
-- 先求出每步进行的用户数量
with temp as(
select
funnel_model,
max_step step,
count(max_step) step_cnt
from dws.dws_app_ld_compstep
where dt='2023-03-01'
group by funnel_model,max_step
)
-- insert overwrite table ads.ads_app_ld_conversion_rate partition (dt='2023-03-01')
-- 第一步到第二步转化率 第二步用户人数/第一步用户人数+第二步+第三步
-- 第二步到第三步转化率 第三步用户人数/第二步+第三步用户人数
select
t.funnel_model,
step,
total_cnt/lag(total_cnt,1,t.total_cnt) over (partition by funnel_model order by step) as rate,
total_cnt
from (select step,
sum(step_cnt) over (partition by funnel_model order by step rows between current row and unbounded following) total_cnt,
funnel_model
from temp
) t;
- 事件归因分析
所谓事件归因就是,事件发生前几个步骤对事件发生影响的比例多大!
首次触点归因:待归因事件中,最早发生的事,被认为是导致业务结果的唯一因素
末次触点归因:待归因事件中,最近发生的事,被认为是导致业务结果的唯一因素
线性归因:待归因事件中,每一个事件都被认为对业务结果产生了影响,影响力平均分摊
位置归因:定义一个规则,比如最早、最晚事件占40%影响力,中间事件平摊影响力 时间衰减归因:越晚发生的待归因事件,对业务结果的影响力越大
- 案例:分析获取优惠券线性归因分析.
drop table if exists dws.dws_event_attribute_day;
create external table dws.dws_event_attribute_day
(
model STRING COMMENT '分析模型'
,strategy STRING COMMENT '归因分析类型'
,guid BIGINT COMMENT '用户唯一ID'
,dest_event STRING COMMENT '目标事件'
,attr_event STRING COMMENT '待归因事件'
,weight DOUBLE COMMENT '权重'
)PARTITIONED BY (dt STRING)
STORED AS parquet
location '/zwf/app/dws/dws_event_attribute_day'
TBLPROPERTIES ('parquet.compress' = 'snappy');
package com.zwf.calc
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.collection.mutable
/**
* @author MrZeng
* @date 2023-12-10 13:02
* @version 1.0
*/
//归因分析 代码
object AttributeAnlysis {
def main(args: Array[String]): Unit = {
//线性归因
//加载spark环境
val spark: SparkSession = SparkSession.builder().master("local[*]").config("spark.sql.shuffle.partitions", "1")
.appName("归因分析").enableHiveSupport().getOrCreate()
//导入隐式转换
import spark.implicits._
//执行sql
val SQLStr=
"""
|select
| guid,
| sort_array(collect_list(concat_ws('_',`timestamp`,eventId))) event
|from dwd.dwd_app_event_detail
|where eventId in ('fetchCoupon','adShow','goodsView','addCart') and dt='20230301'
|group by guid;
|""".stripMargin
val total: DataFrame = spark.sql(SQLStr)
// total.show(10,false)
val value: RDD[(Long, String, String)] = total.rdd.flatMap(row => {
//获取第一个字段
val guid: Long = row.getLong(0)
//获取第二个字段 WrappedArray数组类型
val array: mutable.WrappedArray[String] = row.getAs[mutable.WrappedArray[String]]("event")
//处理每个元素 并变成字符串
val s=array.map(s => s.split("_")(1)).mkString(",")
//["adShow,goodsView,addCart","adShow goodsView addCart","adShow goodsView addCart"]
val eventIds: Array[String] =s.split("fetchCoupon").filter(x => StringUtils.isNotBlank(x.replaceAll(",", "")))
eventIds.map(str => (guid, "fetchCoupon", str))
})
value.toDF().show(10,false)
//线性归因 先转为DataFrame数据类型
val df: DataFrame = linearAnalysis(value).toDF("model", "strategy", "guid", "dest_event", "attr_event", "weight")
df.createOrReplaceTempView("temp")
df.show(10,false)
spark.sql(
"""
|insert overwrite table dws.dws_event_attribute_day partition(dt='2023-12-10')
|select
| model,strategy,guid,dest_event,attr_event,weight
|from
| temp
|""".stripMargin
)
spark.close()
}
//线性分析
def linearAnalysis(total:RDD[(Long, String, String)]): RDD[(String,String,Long,String,String,Double)] = {
total.flatMap(x=> {
val guid = x._1
val dest_event= x._2
val attr_event =x._3
val event_attr: Array[String] = attr_event.split(",").filter(x => StringUtils.isNotBlank(x))
val weight=100.0/event_attr.length
event_attr.map(t=>("优惠券获取归因分析", "线性归因",guid,dest_event,t,weight))
}
)
}
}
- 用户画像分析
标签:总结,数仓,val,项目,--,app,dt,guid,spark From: https://www.cnblogs.com/smallzengstudy/p/17897759.html用户画像就是给用户行为产生的数据,经过处理后,直观展示ads层数据,进行标签,比如常用标签:IT男、吸金男等。
用户画像有什么用
- 用于人群属性统计分析
- 用于人群划分,精准营销(针对不同的人群实施不同的营销策略、行为)
- 用于个性化推荐
- 用于精准广告推送
用户画像的技术实现
直接在hive数仓中,通过计算sql任务即可实现,算完后的结果依然存在hive中
用户画像的数据如何提供给“别人”使用
看统计分析报表,直接在“可视化平台上”对接结果数据进行展现即可
在线分析,直接在“可视化平台上”对接presto引擎进行即时计算即可
个性化推荐、精准营销、精准广告推送,画像标签结果数据导出到 hbase或elastic search,提供给 相应的功能系统对接读取即可!
一般要所有需要计算的指标全部算出来存入hbase,供用户根据展示的数据进行打标签!!!