首页 > 其他分享 >离线数据处理1

离线数据处理1

时间:2023-12-21 11:14:55浏览次数:34  
标签:info 离线 user time 数据处理 spark operate 字段名

离线数据处理-数据抽取&数据清洗&指标计算.1 2023/12/20学习笔记

1.基础SQL-1

1.1 基础命令

1.1.1 SQL基本操作-数据库数据表基本操作

#创建数据库
create database DatebaseName;
#查询所有的数据库
show database DatabaseName;
#删除数据库
drop database DatabaseName;
#切换数据库
use databaseName;
#查看当前使用的数据库
select database();
#创建数据表
create table tablename(字段名 value,字段名 value......);
#查看数据表
show tables;
#查看表结构
desc tablename;
#删除数据表
drop table tablename;

1.1.2 SQL基本操作-修改数据表

#修改表名称
alter table tablename rename to tablename1;
#修改字段名
alter table tablename change name sname 字段名;
#修改字段数据类型
alter table tablename modify sname 字段名;
#增加字段
alter table tablename add 字段名 字段类型;
#删除字段
drop table tablename drop 字段名;

1.1.3 SQL基本操作-主键约束

主键约束即primary key用于唯一的标识表中的每一行。被标识为主键的数据在表中是唯一的且其值不能为空。这点类似于我们每个人都有一个身份证号,并且这个身份证号是唯一的。
基本语法:
字段名 数据类型 primary key;

#Part.1
create table tablename(字段名 数据类型 primary key...)
#Part.2
create table tablename(字段名 数据类型,字段名 数据类型...primary key)

1.1.4 SQL基本操作-非空约束

非空约束即 NOT NULL指的是字段的值不能为空
基本语法:
字段名 数据类型 NOT NULL;

create table tablename(字段名 数据类型 NOT NULL)

1.1.5 SQL基本操作-默认值约束

默认值约束即DEFAULT用于给数据表中的字段指定默认值,即当在表中插入一条新记录时若未给该字段赋值,那么,数据库系统会自动为这个字段插入默认值
基本语法:
字段名 数据类型 DEFAULT 默认值;

create table tablename(字段名 数据类型,字段名 数据类型 default '...')

1.1.6 SQL基本操作-唯一性约束

唯一性约束即UNIQUE用于保证数据表中字段的唯一性,即表中字段的值不能重复出现
基本语法:
字段名 数据类型 UNIQUE;

create table tablename(字段名 数据类型,字段名 数据类型 unique)

1.1.7 SQL基本操作-外键约束

基本语法:
CONSTRAINT 外键名 FOREIGN KEY (从表外键字段) REFERENCES 主表 (主键字段)
ALTER TABLE 从表名 ADD CONSTRAINT 外键名 FOREIGN KEY (从表外键字段) REFERENCES 主表 (主键字段);

create table tablename1(字段名1 数据类型,字段名2 数据类型)
create table tablename2(字段名3 数据类型,字段名4 数据类型)
alter table tablename2 add constraint fk_字段名3_字段名4 foreign key(字段名3) references tablename3(字段名1);

1.1.8 SQL基础操作-删除外键

alter table 从表名 drop foreign key 外键名

1.1.9 SQL基础操作-插入数据

#为表中字段插入数据
insert into tablename (字段1,字段2...) values (值1,值2...);
#同时插入多条记录仪
insert into tablename (字段1,字段2...) values (值1,值2...),(值1,值2...),...;

2.离线数据处理-数据抽取

2.1 数据抽取-GZ033

2.1.1 数据抽取-题目

编写Scala代码,使用Spark将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中

1.抽取shtd_store库中user_info的增量数据进入Hive的ods库中表user_info。根据ods.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.user_info命令

2.1.2 数据抽取-代码

package ods
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import java.util.Properties

object task6 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .appName("hive example")
      .master("local")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("hive.metastore.uris", "thrift://address1:9083")
      .config("spark.sql.warehouse.dir", "hdfs://address1:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .enableHiveSupport()
      .getOrCreate()

    val URL = "jdbc:mysql://address2:3306/ds_pub?useSSL=false"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "yourpassword")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")

        spark.read.jdbc(url,"user_info",properties).createOrReplaceTempView("user_info")

    val max = spark.sql(
      """
        |select greatest(create_time,operate_time) as max_time
        |from user_info
        |""".stripMargin).collect()(0)(0).toString

    println(max)

    val df1 = spark.sql(
      """
        |select * , '20231220' as etl_date
        |from user_info
        |where greatest(create_time,operate_time) > "2020-04-26 18:57:55"
        |""".stripMargin)

    println("df1--------------"+df1.count())

    df1.show()

    spark.stop()
  }
}

3.离线数据处理-数据清洗

3.1 数据清洗-GZ033

3.1.1 数据清洗-题目

编写Scala代码,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。(若dwd库中部分表没有数据,正常抽取即可)

1.抽取ods库中user_info表中昨天的分区(子任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令

3.1.2 数据清洗-代码

package DataCleaning

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

object task1_1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_HOME","root")
      Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .config("hive.metastore.uris", "thrift://ADDRESS1:9083")
      .config("spark.sql.warehouse.dir", "hdfs://ADDRESS1:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .config("hive.metastore.uris", "thrift://ADDRESS1:9083")
      .config("spark.sql.warehouse.dir", "hdfs://ADDRESS1:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .enableHiveSupport()
      .getOrCreate()

//    val URL = "jdbc:mysql://ADDRESS2:3306/ds_pub?useSSL=false"
    val URL = "jdbc:mysql://localhost/dwd?useSSL=false"
    val properties = new Properties()
      properties.setProperty("user","root")
      properties.setProperty("password","yourpassword")
      properties.setProperty("driver","com.mysql.jdbc.Driver")

    spark.read.jdbc(URL,"user_info",properties).createOrReplaceTempView("user_info")
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val df1 = spark.sql(
      """
        |select *,
        |   case
        |   When operate_time is null then
        |       create_time
        |   else
        |       operate_time
        |   end as op
        |from user_info
        |""".stripMargin
    )
      .drop("operate_time")
      .withColumnRenamed("op","operate_time")
      .withColumn("etl_date",lit("20201111"))
      .withColumn("insert_user",lit("user1"))
      .withColumn("insert_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
      .withColumn("modify_user",lit("user1"))
      .withColumn("modify_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
    println("df1---------dim-customer_inf最新分区现有的数据的数量"+df1.count())
    df1.show()

    spark.read.jdbc(URL,"user_info_new",properties).createOrReplaceTempView("user_info_new")
    val df2 = spark.sql(
      """
        |select *,
        |   case
        |   when operate_time is null
        |       then
        |       create_time
        |   else
        |     operate_time
        |   end as op
        |from user_info_new
        |""".stripMargin)
//      .show()
      .drop("operate_time")
      .withColumnRenamed("op","operate_time")
      .withColumn("etl_date",lit("20201111"))
      .withColumn("insert_user",lit("user1"))
      .withColumn("insert_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
      .withColumn("modify_user",lit("user1"))
      .withColumn("modify_time",lit(sdf.format(new Date(new Date().getTime-60*1000))))
    println("df2---------dim-customer-inf最新分区现有的数据的数量"+df2.count())
    df2.show()

    df1.createOrReplaceTempView("dim_customer_info")

    df2.createOrReplaceTempView("customer_info")

    //合并df1和df2
    val df3 = spark.sql(
      """
        |select * from customer_info
        |union all
        |select * from dim_customer_info
        |""".stripMargin)
    println("df3--------union_user_inf0合并后的分区现有的数据的数量"+df3.count())
    df3.show()

    df3.createOrReplaceTempView("union_user_info")

    //从union的视图中以id来查询operate_time的最大值
    val df4 = spark.sql(
      """
        |select id,
        |   Max(operate_time) operate_time
        |from
        |   union_user_info
        |group by id
        |""".stripMargin)
//      .show()
    println("df4---------dict_user_info根据df3分组id得到的操作时间表"+df4.count())
    df4.show()

    df4.createOrReplaceTempView("dict_user_info")

    //根据operate_time取最新的一条
    //从union_user_info和dict_user_info中进行筛选
    //and union_user_info.operate_time=dict_user_info.operate_time指的是operate_time也必须匹配
    //df5将包含union_user_info中那些在dict_user_info中也匹配id和operate_time的记录的所有类
    val df5 = spark.sql(
      """
        |select union_user_info.*
        |from union_user_info,dict_user_info
        |where union_user_info.id==dict_user_info.id
        |and union_user_info.operate_time==dict_user_info.operate_time
        |""".stripMargin)
//      .show()
    println("df5----------"+df5.count())
    df5.show()

    df5.createOrReplaceTempView("distinct_user_info")

//    val df6 = spark.sql(
//      """
//        |select id,
//        | min(insert_time) dit
//        |from
//        | union_user_info
//        |group by id
//        |""".stripMargin)
////      .show()
//    println("df6---------"+df6.count())
//    df6.show()
//
//    df6.createOrReplaceTempView("dict2_user_info")
  }

}

3.1.3 数据抽取-个人总结

在做数据清洗的时候根据题意得知题意需要我们做的是抽取(子任务1)中做出的user_info表与我们新抽取出的表根据id字段去合并数据到一张新的表(再根据operate_time排序取出最新的那一条),题目中给出条件当operate_time为null值就用create_time填充,同时添加4个新的字段...

4.离线数据抽取-指标计算

4.1 指标计算-GZ033

4.1.1 指标计算-题目

编写Scala代码,使用Spark计算相关指标。
注:在指标计算中,不考虑订单信息表中order_status字段的值,将所有订单视为有效订单。计算订单金额或订单总金额时只使用final_total_amount字段。需注意dwd所有的维表取最新的分区。

1.根据dwd层表统计每个省份、每个地区、每个月下单的数量和下单的总金额,存入MySQL数据库shtd_result的provinceeverymonth表中(表结构如下),然后在Linux的MySQL命令行中根据订单总数、订单总金额、省份表主键均为降序排序,查询出前5条,

字段 类型 中文含义
provinceid int 省份表主键
provincename text 省份名称
regionid int 地区表主键
regionname text 地区名称
totalconsumption double 订单总金额
totalorder int 订单总数
year int
month int

4.1.2 指标计算-代码

package DataCleaning
import org.apache.hadoop.hdfs.server.namenode.SafeMode
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties

object task {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","root")
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder()
      .master("local")
      .config("hive.metastore.uris", "thrift://youraddress:9083")
      .config("spark.sql.warehouse.dir", "hdfs://192.168.45.13:8020/user/hive/warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    val url = "jdbc:mysql://mysqladeddress:3306/ds_pub?useSSL=false"
    val url_save = "jdbc:mysql://youraddress:3306/shtd_result?useSSL=false"
    val url1_save = "jdbc:mysql://localhost:3306/dwd?useSSL=false"

    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","yourpassword")
    properties.setProperty("driver","com.mysql.jdbc.Driver")

    val properties1 = new Properties()
    properties1.setProperty("user", "root")
    properties1.setProperty("password", "123456")
    properties1.setProperty("driver", "com.mysql.jdbc.Driver")

    val properties2 = new Properties()
    properties2.setProperty("user", "root")
    properties2.setProperty("password", "yourpassword")
    properties2.setProperty("driver", "com.mysql.jdbc.Driver")

    spark.read.jdbc(url,"user_info",properties).createOrReplaceTempView("user_info")
    spark.read.jdbc(url,"base_province",properties).createOrReplaceTempView("base_province")
    spark.read.jdbc(url,"base_region",properties).createOrReplaceTempView("base_region")
    spark.read.jdbc(url,"order_info",properties).createOrReplaceTempView("order_info")
    spark.read.jdbc(url,"order_detail",properties).createOrReplaceTempView("order_detail")
    spark.read.jdbc(url,"sku_info",properties).createOrReplaceTempView("sku_info")

    val df1 = spark.sql(
      """
        |SELECT
        |	base_province.id AS provinceid,
        |	base_province.name AS provincename,
        |	base_region.id AS regionid,
        |	base_region.region_name AS regionname,
        |	sum( final_total_amount ) AS totalconsumption,
        |	count(*) AS totalorder,
        |	YEAR ( create_time ) year,
        |	MONTH ( create_time ) month
        |FROM
        |	order_info,base_province,base_region
        |where base_province.id = order_info.province_id and base_region.id = base_province.region_id
        |GROUP BY
        |	provinceid,
        |	provincename,
        |	regionid,
        |	regionname,
        |	YEAR,
        |	MONTH
        |""".stripMargin)

    df1.show()
    df1.write.jdbc(url_save,"provinceeverymonth",properties1)
    spark.stop()
  }
}

4.1.3 指标计算做题思路-个人总结

根据题意,题目要求统计每个省份每个地区以及每个月下单的数量以及下单的总金额,我们可以根据它题目中给出的字段名去每个表中查找中间哪个是互相匹配的,由此做出连接,base_province中的id字段与order_info里的id字段匹配的,base_region中的id与base_province中的region_id字段匹配的,然后再根据题目需求,group by每个省份,每个地区,每个月以及下单的总金额
在做sql语句的时候一定要经常调试!

标签:info,离线,user,time,数据处理,spark,operate,字段名
From: https://www.cnblogs.com/Reirablog21/p/17917936.html

相关文章

  • python进行二进制数据处理的方法
    方法一:使用struct模块,特点轻量化,简单易用。缺点就是可读性不是太好,使用小数据临时使用一下,对于大量的数据解析,写起来比较繁琐,显得有点力不从心。importstructdata=b'\x92\xaa\xbb\xcc\x11\x22\x33\x44'a,b,c,d,e=struct.unpack(">BBBBI",data)print("a=0x%xb=0x%xc=......
  • MJ数据处理:读取txt版
    读取文件夹内的txt名称,并根据该名称将其批量修改importosimportreUNWANTED_UNITS=["undefined","皮皮","zly324"]IMAGE_EXTENSIONS=[".jpg",".jpeg",".png",".gif",".bmp",".tiff"]......
  • vscode server的离线安装
    由于开发环境只有Windows桌面端与Linux服务器,且Linux服务器是内网环境下运行,无法直接访问外网,想在Windows上用vscode直接调示Linux服务上的代码,所以选择了vscode+remotessh插件的方案,但问题在于,如何在Linux是离线的情况下部置vscodeserver1安装SSH工具Windows端的vscode与L......
  • sealos 离线部署 k8s 高可用集群
    sealos简介sealos特性与优势:通过内核ipvs对apiserver进行负载均衡,并且带apiserver健康检测,并不依赖haproxy和keepalived。支持离线安装,工具与资源包(二进制程序配置文件镜像yaml文件等)分离,这样不同版本替换不同离线包即可证书延期使用简单支持自定义配置内核负......
  • PG数据库的离线rpm包下载
    PG数据库的离线rpm包下载背景周末时间研究数据库的版本.发现PostgreSQL数据库的版本号已经变成了一年一个大版本.兼容起来其实成本很高.想着能够在能够上网的机器上面弄好多套数据库.便于备份和下载下载方式参照官方文档进行学习,改完下载和使用.注意的是,我这边......
  • 列队中对询问离线排序后如何建立树状数组
    假设\(m=5\)(注意值存储前\(m-1\)个人)注意我们并没有在方框里面填上具体编号,因为从下文就可以知道这是无关紧要的假设我们删除了第二个人绿色方框是新进来的一个人,红色斜杠表示被删除掉的(但是在代码中我们不会真正的删除这一个位置)那么如果要删除这行中的第二个人,等价于删除......
  • 服务器raid5两块硬盘离线数据恢复
    IBMV5000的故障如下:一块硬盘红灯闪亮,机器还在正常运行,但没有多久,系统就不能正常运行,这时才发现另一块硬盘的红灯也在闪亮。 磁盘阵列数据恢复过程:1.启动服务器,自检至阵列时按Ctrl+M进入NetRaid管理程序。查看阵列信息,发现硬盘下线,运用修改配置将一硬盘强行设置成上线。重新启......
  • vscode全离线环境下远程连接慢、扩展未启用的一种原因
    简单写写网络环境堡垒机-VMware远程->开发虚拟机(Windows,离线)-SSH->编译服务器(Ubuntu,离线)问题现象按照网络教程在编译服务器上离线部署了vscodeserver,配置好ssh公钥,在Windows开发虚拟机上使用vscode的RemoteSSH扩展连接到编译服务器,出现以下问题长时间处于“正在打开远程”......
  • maven推送离线jar包
    一、修改maven的settings.xml文件<servers><server><id>maven-releases</id><username>admin</username><password>admin</password></server></servers>二、生成脚本packagecom.......
  • 对象的数据处理方法,要对对象属性进行数组操作(list数组中每一项与column数组中的valu
       //需要对对象属性进行数组操作时,使用Object.entries()方法    varlist=['V11046_052','V11046_051','V11046_50','V11046_0511'];    varcolumn=[{'观测时间':'D_DATETIME'},{'小时内极大风速出现时间':'V......