首页 > 其他分享 >大数据-基础操作代码(笔记)

大数据-基础操作代码(笔记)

时间:2022-10-09 19:56:39浏览次数:62  
标签:hdfs log -- 代码 ods 笔记 a1 command 操作

大数据-相关基础代码

Kafka

  • 创建Topic
# Create the test topic with 3 partitions and 3 replicas (needs at least 3 live brokers).
# The topic is named kafka-test so it matches the producer/consumer commands;
# --partitions is the canonical option name.
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-test --replication-factor 3 --partitions 3
  • 查看Topic列表
# List all topics (long options take a double dash).
bin/kafka-topics.sh --zookeeper localhost:2181 --list
  • 生产者向Topic发送消息
# Console producer: send messages from stdin to topic kafka-test via the broker on localhost:9092.
bin/kafka-console-producer.sh --topic kafka-test --broker-list localhost:9092
  • 消费者消费Topic
# Console consumer: read topic kafka-test from the earliest offset.
# kafka-console-consumer.sh no longer accepts --zookeeper (removed in Kafka 2.0);
# it must connect to a broker with --bootstrap-server.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning

Flume

  • Flume采集
启动:
# Start the agent in the background, logging to /flume/log1.txt.
# --name must equal the property prefix used in the config (a1), and the config
# file name must contain "file-flume-kafka" so the stop command's grep matches it.
"nohup /flume/bin/flume-ng agent --conf-file /flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /flume/log1.txt 2>&1 &"
停止:
# Stop the agent: list processes, keep those whose command line contains
# "file-flume-kafka", drop the grep itself, take the PID (column 2 of ps -ef)
# and send each one SIGKILL (no graceful shutdown).
# NOTE(review): \$2 is escaped, presumably because this string is embedded in a
# double-quoted command (e.g. passed to ssh) — confirm against the calling script.
"ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' |xargs -n1 kill -9"
  • file-flume-kafka.conf
# Name the components of agent a1
a1.sources = r1
a1.channels = c1

# Source: tail files matching /opt/module/applog/log/app.* (TAILDIR);
# read offsets are persisted in positionFile.
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
# Custom interceptor (ETLInterceptor, which drops non-JSON lines).
# The value must be the bare Builder class name: properties files have no
# inline comments, so any trailing annotation becomes part of the class name.
a1.sources.r1.interceptors.i1.type = com.maple.flume.interceptor.ETLInterceptor$Builder

# Channel: Kafka channel writing into topic topic_log (no sink is configured;
# events reach Kafka through the channel itself).
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
# Store the raw event body in Kafka rather than a serialized Flume event.
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source to the channel
a1.sources.r1.channels = c1

ETLInterceptor自定义拦截器

/**
 * Helpers for checking whether raw log lines are JSON (backed by fastjson's {@code JSON.parse}).
 */
public class JSONUtils {

    /**
     * Returns whether {@code log} parses as a (non-null) JSON value.
     *
     * <p>Null, empty and whitespace-only input is rejected explicitly: fastjson's
     * {@code JSON.parse} returns {@code null} for blank text instead of throwing,
     * so without this check blank lines would be reported as valid JSON and passed
     * on to downstream consumers that expect a JSON object.
     *
     * @param log one raw log line; may be {@code null}
     * @return {@code true} if the text parses to a non-null JSON value,
     *         {@code false} if it is null, blank or malformed
     */
    public static boolean isJSONValidate(String log){
        if (log == null || log.trim().isEmpty()) {
            return false;
        }
        try {
            return JSON.parse(log) != null;
        } catch (JSONException e) {
            // malformed JSON
            return false;
        }
    }
}



/**
 * Flume interceptor that filters out events whose body is not valid JSON.
 *
 * <p>Each body is decoded as UTF-8 and checked with {@link JSONUtils#isJSONValidate};
 * valid events pass through unchanged, invalid ones are removed from the batch.
 */
public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // no state to set up
    }

    /**
     * @param event incoming event
     * @return {@code event} itself if its UTF-8 body is valid JSON, otherwise {@code null}
     */
    @Override
    public Event intercept(Event event) {
        String text = new String(event.getBody(), StandardCharsets.UTF_8);
        return JSONUtils.isJSONValidate(text) ? event : null;
    }

    /**
     * Removes, in place, every event rejected by {@link #intercept(Event)}.
     *
     * @param list batch of events; modified in place
     * @return the same list, containing only the accepted events
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Iterator<Event> it = list.iterator(); it.hasNext(); ) {
            Event candidate = it.next();
            if (intercept(candidate) == null) {
                it.remove();
            }
        }
        return list;
    }

    /** Factory referenced from the Flume config as {@code com.maple.flume.interceptor.ETLInterceptor$Builder}. */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
            // no configurable properties
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}

kafka-flume-hdfs.conf

# Name the components of agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1: consume topic_log from Kafka in batches of up to 5000 events / 2000 ms
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
# Custom interceptor (TimeStampInterceptor): copies the log's "ts" field into the
# "timestamp" header that the %Y-%m-%d escape in hdfs.path is resolved from.
# The value must be the bare Builder class name: properties files have no
# inline comments, so any trailing annotation becomes part of the class name.
a1.sources.r1.interceptors.i1.type = com.maple.flume.interceptor.TimeStampInterceptor$Builder

## channel1: durable file channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/


## sink1: one HDFS directory per day
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

# Limit small files: roll every 10 s or at 128 MB (134217728 bytes);
# rollCount = 0 disables rolling by event count.
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## Output format: LZO-compressed stream (lzop codec), not plain text.
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## Wiring: source -> channel -> sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

TimeStampInterceptor自定义拦截器

/**
 * Flume interceptor that copies the {@code ts} field of a JSON log body into the
 * {@code timestamp} event header, so time escapes such as {@code %Y-%m-%d} in the
 * HDFS sink path use the log's own time instead of arrival time.
 *
 * <p>Events whose body is not a JSON object, or whose {@code ts} is missing or not
 * an integer, are dropped (same filtering style as ETLInterceptor). Passing them on
 * would either throw here and fail the whole batch, or set a null/invalid
 * {@code timestamp} header that the HDFS sink cannot turn into a path.
 * NOTE(review): assumes {@code ts} is epoch milliseconds — confirm with the log producer.
 */
public class TimeStampInterceptor implements Interceptor {

    // Reused output buffer for the batch variant; cleared on every call.
    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {

    }

    /**
     * @param event incoming event whose body is expected to be a UTF-8 JSON object
     * @return the same event with header {@code timestamp} set to its {@code ts} value,
     *         or {@code null} if no usable {@code ts} could be read
     */
    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject;
        try {
            jsonObject = JSONObject.parseObject(log);
        } catch (RuntimeException e) {
            // fastjson reports malformed / non-object input with unchecked exceptions
            return null;
        }
        if (jsonObject == null) {
            // blank body
            return null;
        }

        String ts = jsonObject.getString("ts");
        if (ts == null) {
            return null;
        }
        try {
            // the header must be a number for the sink's time escapes
            Long.parseLong(ts);
        } catch (NumberFormatException e) {
            return null;
        }

        headers.put("timestamp", ts);
        return event;
    }

    /**
     * Applies {@link #intercept(Event)} to every event and keeps only the accepted ones.
     *
     * @param list incoming batch
     * @return the internal buffer holding the accepted events (overwritten on the next call)
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            Event kept = intercept(event);
            if (kept != null) {
                events.add(kept);
            }
        }

        return events;
    }

    @Override
    public void close() {

    }

    /** Factory referenced from the Flume config as {@code com.maple.flume.interceptor.TimeStampInterceptor$Builder}. */
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Sqoop

导入

# Import the result of query $2 into /origin_data/$APP/db/$1/$do_date as
# tab-separated, LZO-compressed text, with NULLs written as \N.
# $2 must be a SELECT ... FROM ... without its own WHERE clause: the command
# appends "where $CONDITIONS" itself. --delete-target-dir removes the target
# directory first, so re-running for the same date overwrites it.
# NOTE(review): $APP and $do_date are presumably set by the enclosing script.
# NOTE(review): --password on the command line is visible in `ps`; consider
# --password-file or -P.
sqoop/bin/sqoop import \
--connect jdbc:mysql://master:3306/db \
--username root \
--password root \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 where \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'

# Build LZO index files for the imported directory so the data can be split (run after the import).
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date

导出

# Export HDFS directory /warehouse/$hive_db_name/ads/$1 into MySQL table $1.
# Input: tab-separated text with \N as NULL.
# Write mode: update rows matched on column(s) $2 and insert the rest (allowinsert).
# NOTE(review): with MySQL this relies on a primary/unique key on the target
# table — verify the schema.
/sqoop/bin/sqoop export \
--connect "jdbc:mysql://hadoop102:3306/${mysql_db_name}?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 000000 \
--table $1 \
--export-dir /warehouse/$hive_db_name/ads/$1 \
--input-fields-terminated-by "\t" \
--input-null-string '\\N' \
--input-null-non-string '\\N' \
--update-key $2 \
--update-mode allowinsert \
--num-mappers 1

Hive

  • ods层
-- ODS table for raw logs: LZO-compressed text, partitioned by dt.
-- Template: replace `字段` 数据类型 COMMENT '注释' (column name, data type, comment)
-- with the real column list; identifiers take backticks, not single quotes.
DROP TABLE IF EXISTS ods_log;
CREATE EXTERNAL TABLE ods_log (`字段` 数据类型 COMMENT '注释')
PARTITIONED BY (`dt` STRING)
STORED AS
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_log';
  • dwd层
-- DWD start-log table.
-- Template: replace `字段` 数据类型 COMMENT '注释' (column name, data type, comment)
-- with the real column list; identifiers take backticks, not single quotes.
DROP TABLE IF EXISTS dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log (`字段` 数据类型 COMMENT '注释')
PARTITIONED BY (`dt` STRING) -- partitioned by date
STORED AS PARQUET -- columnar Parquet storage
LOCATION '/warehouse/gmall/dwd/dwd_start_log' -- location on HDFS
TBLPROPERTIES('parquet.compression'='lzo') -- LZO compression
;
  • dws层
-- DWS daily visitor-action summary: Parquet + LZO, partitioned by dt.
-- Template: replace `字段` 数据类型 COMMENT '注释' (column name, data type, comment)
-- with the real column list; identifiers take backticks, not single quotes.
DROP TABLE IF EXISTS dws_visitor_action_daycount;
CREATE EXTERNAL TABLE dws_visitor_action_daycount (`字段` 数据类型 COMMENT '注释')
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dws/dws_visitor_action_daycount'
TBLPROPERTIES ("parquet.compression"="lzo");
  • dwt层
-- DWT visitor topic table: Parquet + LZO, partitioned by dt.
-- Template: replace `字段` 数据类型 COMMENT '注释' (column name, data type, comment)
-- with the real column list; identifiers take backticks, not single quotes.
DROP TABLE IF EXISTS dwt_visitor_topic;
CREATE EXTERNAL TABLE dwt_visitor_topic (`字段` 数据类型 COMMENT '注释')
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwt/dwt_visitor_topic'
TBLPROPERTIES ("parquet.compression"="lzo");
  • dim层
-- DIM table for SKU info: Parquet + LZO, partitioned by dt.
-- Template: replace `字段` 数据类型 COMMENT '注释' (column name, data type, comment)
-- with the real column list; identifiers take backticks, not single quotes.
DROP TABLE IF EXISTS dim_sku_info;
CREATE EXTERNAL TABLE dim_sku_info (`字段` 数据类型 COMMENT '注释')
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dim/dim_sku_info'
TBLPROPERTIES ("parquet.compression"="lzo");
  • ads层
-- ADS visit statistics: plain tab-delimited text (no partitions), the format the
-- Sqoop export reads (--input-fields-terminated-by "\t").
-- The directory name must equal the table name: the export reads
-- /warehouse/$hive_db_name/ads/$1 with $1 = table name, so a hyphenated
-- directory such as ads-visit_stats would not be found.
-- Template: replace `字段` 数据类型 COMMENT '注释' (column name, data type, comment)
-- with the real column list; identifiers take backticks, not single quotes.
DROP TABLE IF EXISTS ads_visit_stats;
CREATE EXTERNAL TABLE ads_visit_stats (`字段` 数据类型 COMMENT '注释')
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/warehouse/gmall/ads/ads_visit_stats';

插入数据

DWD层插入数据需要设置

-- Allow INSERT ... PARTITION (...) with all partition columns dynamic.
set hive.exec.dynamic.partition.mode=nonstrict;

-- Use HiveInputFormat instead of the default CombineHiveInputFormat.
-- NOTE(review): presumably so LZO .index files under the ODS paths are not read
-- as data — confirm.
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

插入基本命令可用

-- Template fragment only, not a runnable statement. Complete it as:
-- insert overwrite table <table> [partition (dt = '...')] select ... ;
insert overwrite table 

Azkaban

# Azkaban Flow 2.0 DAG: each node runs one script; dependsOn defines edges.
# NOTE(review): ${dt} is presumably the run date supplied as a flow parameter — confirm.
nodes:
  # ---- ingestion ----
  - name: mysql_to_hdfs
    type: command
    config:
      command: /home/hadoop/tools/bin/mysql_to_hdfs.sh all ${dt}

  - name: hdfs_to_ods_log
    type: command
    config:
      command: /home/hadoop/tools/bin/hdfs_to_ods_log.sh ${dt}

  - name: hdfs_to_ods_db
    type: command
    dependsOn: [mysql_to_hdfs]
    config:
      command: /home/hadoop/tools/bin/hdfs_to_ods_db.sh all ${dt}

  # ---- ODS -> DIM / DWD ----
  - name: ods_to_dim_db
    type: command
    dependsOn: [hdfs_to_ods_db]
    config:
      command: /home/hadoop/tools/bin/ods_to_dim_db.sh all ${dt}

  - name: ods_to_dwd_log
    type: command
    dependsOn: [hdfs_to_ods_log]
    config:
      command: /home/hadoop/tools/bin/ods_to_dwd_log.sh all ${dt}

  - name: ods_to_dwd_db
    type: command
    dependsOn: [hdfs_to_ods_db]
    config:
      command: /home/hadoop/tools/bin/ods_to_dwd_db.sh all ${dt}

  # ---- summary layers ----
  - name: dwd_to_dws
    type: command
    dependsOn: [ods_to_dim_db, ods_to_dwd_log, ods_to_dwd_db]
    config:
      command: /home/hadoop/tools/bin/dwd_to_dws.sh all ${dt}

  - name: dws_to_dwt
    type: command
    dependsOn: [dwd_to_dws]
    config:
      command: /home/hadoop/tools/bin/dws_to_dwt.sh all ${dt}

  - name: dwt_to_ads
    type: command
    dependsOn: [dws_to_dwt]
    config:
      command: /home/hadoop/tools/bin/dwt_to_ads.sh all ${dt}

  # ---- export ----
  - name: hdfs_to_mysql
    type: command
    dependsOn: [dwt_to_ads]
    config:
      command: /home/hadoop/tools/bin/hdfs_to_mysql.sh all

应用方面

访客主题
    访客统计
    路径分析
用户主题
	用户统计
    用户变动统计
    用户行为漏斗分析
    用户留存率
商品主题
	商品统计
    品牌复购率 
订单主题
	订单统计
	各地区订单统计
优惠券主题
	优惠券统计
活动主题
	活动统计

标签:hdfs,log,--,代码,ods,笔记,a1,command,操作
From: https://www.cnblogs.com/maple9360/p/16773438.html

相关文章

  • MySQL学习笔记
    个人理解可能存在偏差,仅为参考文档;一切以官方文档为准~。数据库什么是数据库按照一定数据结构来组织和存储数据的仓库数据库应用场景:数据量庞大繁多。什么是数据表一......
  • EasyPoi操作Excel
    目录 一、前言 二、使用 1、导入以下依赖 2、导出示例 2.1.使用定义好的模板 2.2.直接将List>数据导出为excel示例(无需模板)......
  • Mybatis——plus 代码生成器
    MybatisPlus 给我们提供了更加强大的代码生成器  ## 代码生成器的简单的对比  MybatisPlus 给我们提供的代码生成器,不仅仅可以生成dao层,还可以生成 Service层,Cont......
  • python文件操作
    python文件操作文件的概念文件就像是系统呈现在我们面前的能够使用硬盘的快捷方式,文件可以是文本文档、图片、程序等等。文件通常具有点+三个字母的文件扩展名,用于指示文......
  • 文件操作
    文件操作1.文件的概念 就是操作系统暴入给用户操作硬盘的快捷方式比如双击一个文件就是冲硬盘将数据加载到内存ctrl+s保存文件就是将内存的数据刷到硬盘保存......
  • gorm中的关联操作详解
    一对一belongto属于:可以理解为舔狗认为自己属于女神,而女神都不知道舔狗的存在typeGirlstruct{ Idint Namestring } typeDogstruct{ Idint Na......
  • Jmeter使用beanshell加密,调用AES代码,生成jar包
    工作中需要对接口进行AES加密,找开发要来了加密的代码(如下),记录下具体的使用方法:新建一个AESUtil包,在里面新建一个类(建议类的名字也为AESUtil)。把下面的代码复制进去,注意,......
  • kotlin协程代码测试
    importkotlinx.coroutines.*importorg.junit.jupiter.api.Testimportjava.util.concurrent.ExecutorServiceimportjava.util.concurrent.Executorsimportkotlin.......
  • python中文件操作相关基础知识
    python中文件操作相关基础知识文件的概念1,文件的概念?文件就是操作系统暴露给用户操作硬盘的快捷方式,当我们双击一个文件进行打开时,其实就是把硬盘中的数据加载......
  • 代码随想录day14 | 144.二叉树的前序遍历 94.二叉树的中序遍历 145.二叉树的后续遍历
    144.二叉树的前序遍历题目|文章1.递归思路1.确定参数和返回值2.确定终止条件3.确定单层递归的逻辑实现点击查看代码/***Definitionforabinarytreenode.......