首页 > 其他分享 >Flink/Spark中ETL的简单模版

Flink/Spark中ETL的简单模版

时间:2022-11-18 16:00:37浏览次数:58  
标签:String val Flink id ls Spark data line ETL


我们往往可以忽略外界的干扰因素,避免焦虑,专心做自己想做的事情,反正焦虑又解决不了问题

引言

使用flink或者spark的时候,写好固定的模版很重要,对于一下etl的实时任务,只需要执行一个map和fliter就解决了,简单抽象了一下,用的时候穿穿参数。

父类

abstract class BashLogLineParse extends Serializable{
var line: String= ""
def logParse():String
def logFilter(mes:String):Boolean
}

etl

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import com.alibaba.fastjson.JSON
import com.oneniceapp.base.BashLogLineParse
import org.apache.commons.lang3.time.FastDateFormat

class StageDataOperationPushLog extends BashLogLineParse with Serializable{

private val YYYYMMDDHHMM_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

override def logParse(): String = {
val line = this.line
val log = JSON.parseObject(line)
val datastr = log.getString("raw")

val data = JSON.parseObject(datastr)
val payloadstr = data.getString("payload")
val payload = JSON.parseObject(payloadstr)
val notice_id = payload.getString("notice_id")
val user_id = data.getString("receiver")
val raw_time = data.getLong("raw_time")
val partition_date = new SimpleDateFormat("yyyy-MM-dd").format(new Date(raw_time*1000))

data.remove("receiver")
data.put("user_id", user_id)
data.put("notice_id", notice_id)
data.put("partition_date", partition_date)

if(notice_id==null)
""
else
data.toJSONString
}

override def logFilter(mes:String):Boolean ={
if(!mes.equals(""))
true
else
false
}

}

反射方法

public class CreateObject {
public static Object createInstance(String className){

try {
Class clz = Class.forName(className);
Object obj = clz.newInstance();
return obj;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return null;
}
}

使用

StageDataOperationPushLog的全路径

mapDate = transction.map(line=>{
val ls = CreateObject
.createInstance(conf.classFile)
.asInstanceOf[BashLogLineParse]

ls.line = line.message
(line.topic, line.partition, line.offset, ls.logParse())
})
.filter(line=>{
val ls = CreateObject
.createInstance(conf.classFile)
.asInstanceOf[BashLogLineParse]
ls.logFilter(line._4)
})


标签:String,val,Flink,id,ls,Spark,data,line,ETL
From: https://blog.51cto.com/u_15879559/5868561

相关文章

  • Apache Flink架构及其工作原理
    ApacheFlink架构及其工作原理1、定义:Apacheflink是一个实时计算框架和分布式处理引擎,用于再无边界和有边界数据流上进行有状态的计算,Flink能在所有的集群环境中运行,......
  • 在macbook m1上调试flink1.14.3
    前置条件1:首先先用homebrew安装一下flink1.14.3版本,安装完成后,/usr/local/Celler/apache-flink/1.14.3是主路径。可以看看有没有类似的文件夹来确定有没有安装上。前置条......
  • 初识Flink简单介绍
    Flink是实时计算框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。有界流和无界流都是基于Datastream这个Flink的编程模型。Flink自己管理内存机制,批......
  • SparkSQL 核心编程
    DataFrame创建DataFrame从Spark数据源进行创建➢查看Spark支持创建文件的数据源格式scala>spark.read.csvformatjdbcjsonloadoptionoptionsorcparqu......
  • Flink 按键分区状态基本介绍
    在实际应用中,我们一般都需要将数据按照某个key进行分区,然后再进行计算处理;所以最为常见的状态类型就是KeyedState。之前介绍到keyBy之后的聚合、窗口计算,算子所持有的状态......
  • spark (四) RDD概念
    目录1.RDD基本概念1.1弹性1.2分布式1.3数据集1.4数据抽象1.5不可变1.6可分区、并行计算2.WordCount为例,看RDD特性3.RDD的五大属性3.1分区列表3.2计算逻辑compu......
  • spark (一) 入门 & 安装
    目录基本概念spark核心模块sparkcore(核心)sparksql(结构化数据操作)sparkstreaming(流式数据操作)部署模式local(本地模式)standalone(集群模式)onyarn(集群模式)......
  • 快速体验 Flink Table Store进阶篇
    在本地安装单机版本,能够实现快速体验FlinkTableStore的目的,本文以Flink1.15.2、flink-table-store-dist-0.2.1、flink-shaded-hadoop-2-uber-2.8.3-10.0和Kafka3.......
  • 【Spark】java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hiv
    2/11/1419:02:23ERROR[main]SparkUncaughtExceptionHandler:UncaughtexceptioninthreadThread[main,5,main]java.lang.NoSuchMethodException:org.apache.hado......
  • Spark与Iceberg整合写操作-INSERT INTO,MERGE INTO,INSERT OVERWRITE,DELETE FROM,UPDATE,s
    1.8.7Spark与Iceberg整合写操作1.8.7.1INSERTINTO"insertinto"是向Iceberg表中插入数据,有两种语法形式:"INSERTINTOtblVALUES(1,"zs",18),(2,"ls",19)"、"INSERT......