首页 > 数据库 >实时即未来,车联网项目之将数据落地到文件系统和数据库【三】

实时即未来,车联网项目之将数据落地到文件系统和数据库【三】

时间:2022-10-03 11:31:29浏览次数:58  
标签:src tableName 数据库 写入 文件系统 实时 maynor HBase 数据


文章目录

实时读取流数据的步骤

实时即未来,车联网项目之将数据落地到文件系统和数据库【三】_hive

原始数据实时ETL任务分析 Hive

将HDFS数据映射到Hive表

  • 需要指定的HDFS的目录
  • 实时即未来,车联网项目之将数据落地到文件系统和数据库【三】_数据库_02

  • 回忆如何映射HDFS数据到Hive表中

① 创建表 create external table maynor_src (…) row formate delimited field terminate by ‘\t’ partitioned by(dt string) location ‘hdfs://node01…/maynor_src’;

② 使用数据库

③ 添加文件夹到指定分区

alter table maynor_src add partition(dt=‘20210922’) location ‘hdfs://node01:8020/apps/warehouse/ods.db/maynor_src/20210922’

#!/bin/bash

dt=`date -d '1 days ago' +'%Y%m%d'`
tableName=$1

ssh node03 `/export/server/hive/bin/hive -e "use maynor_ods;alter table ${tableName} add partition(dt=${dt}) location 'hdfs://node01:8020/apps/warehouse/ods.db/${tableName}/${dt}"`
  • 如何实现从HDFS中正确或错误的数据映射到Hive表中


  • 如何自动化HDFS数据到Hive表中
# 使用shell 脚本
alter table maynor_src add partition (dt="20210922") location "/apps/hive/warehouse/ods.db/maynor_src/20210922";
  • 如何执行 t+1 离线任务,设置调度的两种方式


  • ① crontab
    ​ linux 自带调度
    ② 调度平台
    azkaban airflow dolphinscheduler oozie 自研

自定义Sink数据写入Hive表(了解)

  • 实现步骤
package cn.maynor.streaming.sink;

import cn.maynor.streaming.entity.maynorDataObj;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
* Author maynor
* Date 2021/9/22 10:02
* Desc 将每条车辆的数据直接写入到 Hive 中
*/
public class SaveErrorDataHiveSink extends RichSinkFunction<maynorDataObj> {
//定义 logger
private static final Logger logger = LoggerFactory.getLogger(SaveErrorDataHiveSink.class);
//2.创建有参构造方法,参数包括数据库名和表名
//定义变量
private String dbName;
private String tableName;
//定义连接对象和statement对象
private Connection conn = null;
private Statement statement = null;
//构造方法
public SaveErrorDataHiveSink(String _dbName,String _tableName){
this.dbName = _dbName;
this.tableName = _tableName;
}

//3.重写open方法进行Hive连接的初始化
@Override
public void open(Configuration parameters) throws Exception {
//3.1 将JDBC驱动 org.apache.hive.jdbc.HiveDriver 加载进来
//获取全局参数
ParameterTool parameterTool = (ParameterTool) getRuntimeContext()
.getExecutionConfig()
.getGlobalJobParameters();
//获取当前上下文中 hive 的驱动
Class.forName(parameterTool.getRequired("hive.driver"));
//3.2 设置JDBC连接Hive的连接器,端口为10000
conn = DriverManager.getConnection(
parameterTool.getRequired("hive.url"),
parameterTool.getRequired("hive.user"),
parameterTool.get("hive.password")
);
//3.3 创建Statement
statement = conn.createStatement();
//3.4 定义 schemaAndTableExists 实现库不存在创建库,表不存在创建表
Boolean flag = schemaAndTableExists(dbName,tableName,statement);
if(flag){
logger.info("当前数据库和表初始化成功!");
}else{
logger.warn("请检查数据库和表!");
}
}

//5.重写cloese方法 关闭连接
@Override
public void close() throws Exception {
if(!statement.isClosed())statement.close();
if(!conn.isClosed())conn.close();
}

//4.重写invoke将每条数据
@Override
public void invoke(maynorDataObj value, Context context) throws Exception {
//4.1 编写SQL将数据插入到表中
// insert into maynor_error values('11111');
StringBuffer buffer = new StringBuffer();
buffer.append("INSERT INTO "+tableName);
buffer.append(" VALUES('");
buffer.append(value.getErrorData()+"'");
//4.2 执行statement.executeUpdate 将数据直接落地到Hive表
statement.executeUpdate(buffer.toString());
}

//6.定义 schemaAndTableExists 方法 create database if not exists库或表, execute,选择数据库

/**
* 初始化数据库和数据表,如果初始化成功返回 true,否则 false
* @param dbName
* @param tableName
* @param statement
* @return
*/
private Boolean schemaAndTableExists(String dbName, String tableName, Statement statement) {
//数据库是否存在
Boolean flag = true;
try{
//初始化数据库
String createDBSQL="create database if not exists "+dbName;
boolean executeDB = statement.execute(createDBSQL);
if(executeDB){
logger.info("当前数据库创建成功");
flag = true;
}else{
logger.info("当前数据库已经存在");
flag = true;
}
//初始化数据表
String createTableSQL = "use "+tableName+";create table if not exists "+tableName+" (json string) partition by dt" +
" row formatted delimited field terminate by '\t' location '/apps/hive/warehouse/ods.db/maynor_error'";
boolean executeTable = statement.execute(createTableSQL);
if(executeTable){
logger.info("当前数据库表创建成功");
flag = true;
}else{
logger.info("当前数据表已经存在");
flag = true;
}
}catch (Exception ex){
logger.warn("初始化失败!");
flag = false;
}
return flag;
}
}

原始数据实时ETL落地到HBase

  • 写入hbase的步骤和准备
  1. 写入的表名
  2. hbase的rowkey
  3. 写入的列簇 columnFamily
  4. 列名和列值

HBase的rowkey设计原则

① rowkey 的长度原则 , 16个字节

② rowkey 的散列原则 ,尽量保证离散

③ rowkey 的唯一原则 , rowkey不要一样

HBase的rowkey设计方法

① 加盐 —— 随机数

② Hash散列

③ 翻转字符串

正常数据落地到HBase

  • 开启 HBase 集群
# 首先开启 hdfs ,zookeeper
  • 进入到 HBase命令行
hbase shell
  • 创建HBase表 - maynor_src ,列簇为 cf
# 查看hbase所有表
list
# 查看namespace(数据库)
list_namespace
# 创建数据表
hbase(main):005:0> create 'maynor_src','cf'
# 查看表中的数据
scan 'maynor_src'
  • 开发步骤
//1.创建 SrcDataToHBaseSink类继承 RichSinkFunction<maynorDataObj>
//2.创建一个有参数-表名的构造方法
//3.重写open方法
//3.1 从上下文获取到全局的参数
//3.2 设置hbase的配置,Zookeeper Quorum集群和端口和TableInputFormat的输入表
//3.3 通过连接工厂创建连接
//3.4 通过连接获取表对象
//4.重写close方法
//4.1 关闭hbase 表和连接资源
//5. 重写 invoke 方法,将读取的数据写入到 hbase
//5.1 setDataSourcePut输入参数value,返回put对象
//6. 实现 setDataSourcePut 方法
//6.1 如何设计rowkey VIN+时间戳翻转
//6.2 定义列簇的名称
//6.3 通过 rowkey 实例化 put
//6.4 将所有的字段添加到put的字段中

原始数据实时 ETL 任务 HBase 调优

数据写入HBase优化 - 客户端优化

  • 为什么需要优化呢?

防止出现每条数据都读写 HBase 数据库,造成集群宕机和数据丢失。

  • 批量写入需要使用的缓存对象 - BufferedMutator 写数据的原理
    将数据按批次写入到 BufferedMutator 对象中,按时间或者按大小写入。
  • 代码逻辑优化
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
params.writeBufferSize(10 * 1024 * 1024L);
params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);

//5.1 setDataSourcePut输入参数value,返回put对象
try {
Put put = setDataSourcePut(value);
mutator.mutate(put);
//5.2 指定时间内的数据强制刷写到hbase
mutator.flush();
}catch (Exception ex){
logger.error("写入到hbase失败:"+ex.getMessage());
}
  • 在主流程中将数据写入到 maynor_src

数据写入HBase预分区

  • 预分区的概念
  • 创建预分区的语法

数据写入HBase预写日志

  • 预写日志的作用
  • memstore在HBase读写作用

数据写入HBase使用压缩和编码

  • 编码压缩其实是对列数据的压缩
  • 编码压缩的优势


  • 编码类型


  • 创建一个 fast_diff 编码的 maynor_src 表
alter 'maynor_src', { NAME => 'cf', DATA_BLOCKs_ENCODING => 'FAST_DIFF' }
  • 压缩算法


  • 创建一个 gz 或 snappy 压缩的 maynor_src_gz 表
create 'maynor_src',{NAME => 'cf',COMPRESSION => 'gz'}create 'maynor_src_snappy', { NAME => 'cf', COMPRESSION => 'SNAPPY' }
  • 查看数据量大小


标签:src,tableName,数据库,写入,文件系统,实时,maynor,HBase,数据
From: https://blog.51cto.com/u_15277063/5729992

相关文章

  • PG--数据库统计信息
    analyze自动收集统计信息自动收集统计信息是依赖AUTOVACUUM定时触发analyze触发vacuumanalyze的条件表上(insert,update,delte记录)>=autovacuum_analyze_scale_facto......
  • SpringBoot--解决@Transactional与@CacheEvict联合使用导致的缓存与数据库的一致性问
    ​简介说明    本文介绍@Transactional与@CacheEvict联合使用导致的缓存与数据库的一致性问题的原因及解决方案。注解的作用        @Transactional:给当前......
  • 【Swoole系列6.4】Hyperf操作数据库
    Hyperf操作数据库对于一个MVC框架来说,连接操作各种数据库是最基本的功能。而对于Swoole,其实我们前面并没有讲很多关于数据库方面的东西,毕竟这不是我们这个系列的主要内......
  • SQLMap入门——查询当前用户下的所有数据库
    确定网站存在注入后,用于查询当前用户下的所有数据库pythonsqlmap.py-uhttp://localhost/sqli-labs-master/Less-1/?id=1--dbs  ......
  • SQLMap入门——获取数据库的所有用户
    列出数据库中的所有用户在当前用户有权读取包含所有用户的表的权限时,使用该命令列出所有管理用户pythonsqlmap.py-uhttp://localhost/sqli-labs-master/Less-1/?id=1......
  • SQLMap入门——获取数据库用户的密码
    列出数据库用户的密码如果当前用户有读取包含用户密码的权限,SQLMap会先列举出用户,然后列出Hash,并尝试破解pythonsqlmap.py-uhttp://localhost/sqli-labs-master/Less-......
  • SQLMap入门——获取数据库中的表名
    查询完数据库后,查询指定数据库中所有的表名pythonsqlmap.py-uhttp://localhost/sqli-labs-master/Less-1/?id=1-Dxssplatform--tables注意:-D大小写  --table......
  • oracle数据库运行内存PGA+SGA分配
    调整内存大小用dba身份进入oracle,(sqlplussys/密码assysdba):--显示内存分配情况showparametersga;--修改占用内存的大小altersystemsetsga_max_size=200mscop......
  • EXT2文件系统
    目录数据结构和遍历支持Linux所有文件操作的EXT2文件系统通过虚拟磁盘mount_root构建基本文件系统三个级别时间戳转换为任意可读的日期格式数据结构和遍历Block#0是......
  • Spring整合Mybatis连接MySQL数据库配置 异常 情景再现
    Spring整合Mybatis连接MySQL数据库配置情景再现注意点一:数据库Table表字段和Entity实体类属性名不匹配解决:方法一:开启下划线转大驼峰命名mybatis:configurati......