我们往往可以忽略外界的干扰因素,避免焦虑,专心做自己想做的事情,反正焦虑又解决不了问题
引言
使用 Flink 或者 Spark 的时候,写好固定的模板很重要。对于一些 ETL 实时任务,只需要执行一个 map 和一个 filter 就能解决,所以这里做了一个简单的抽象,用的时候只需要传入参数即可。
父类
/**
 * Base contract for per-line log processors.
 *
 * A caller assigns one raw log line to [[line]], calls [[logParse]] to turn it
 * into an output string, and calls [[logFilter]] to decide whether an output
 * string should be kept.
 */
abstract class BashLogLineParse extends Serializable {

  /** Raw log line to process; starts out as the empty string and is set by the caller. */
  var line: String = ""

  /**
   * Parses the current value of [[line]].
   *
   * @return the parsed representation of the line as a string
   */
  def logParse(): String

  /**
   * Predicate applied to a message string.
   *
   * @param mes the message to test
   * @return the implementation's verdict for `mes`
   */
  def logFilter(mes: String): Boolean
}
etl
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import com.alibaba.fastjson.JSON
import com.oneniceapp.base.BashLogLineParse
import org.apache.commons.lang3.time.FastDateFormat
/**
 * Log-line processor for operation push logs.
 *
 * The incoming line is a JSON object whose `raw` field holds a second JSON
 * document. That inner document is enriched and re-serialised:
 *  - `receiver` is renamed to `user_id`,
 *  - `notice_id` is lifted out of the nested `payload` JSON,
 *  - `partition_date` (`yyyy-MM-dd`) is derived from `raw_time`.
 *
 * Any record that cannot be processed yields the empty string, which
 * [[logFilter]] rejects.
 */
class StageDataOperationPushLog extends BashLogLineParse with Serializable {
  import scala.util.Try

  // FastDateFormat is thread-safe and getInstance returns cached instances, so
  // holding it in a field avoids building a SimpleDateFormat for every record.
  private val PartitionDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")

  /**
   * Parses `text` as a JSON object.
   *
   * @return `None` when `text` is null, blank, or not valid JSON
   */
  private def parseObject(text: String): Option[com.alibaba.fastjson.JSONObject] =
    Try(Option(JSON.parseObject(text))).toOption.flatten

  /**
   * Parses and enriches the current [[line]].
   *
   * @return the enriched inner document as a JSON string, or `""` when the
   *         line is malformed or lacks `raw`, `payload`, `notice_id` or a
   *         usable `raw_time`
   */
  override def logParse(): String = {
    val enriched = for {
      log      <- parseObject(this.line)
      data     <- parseObject(log.getString("raw"))
      payload  <- parseObject(data.getString("payload"))
      noticeId <- Option(payload.getString("notice_id"))
      // getLong returns a boxed Long (null when absent) and throws on a
      // non-numeric value; both cases become None instead of an exception.
      rawTime  <- Try(Option(data.getLong("raw_time"))).toOption.flatten
    } yield {
      val userId = data.getString("receiver")
      // raw_time is scaled by 1000 before being used as epoch milliseconds
      // (presumably it is in epoch seconds; confirm against the producer).
      val partitionDate = PartitionDateFormat.format(new Date(rawTime.longValue * 1000L))
      data.remove("receiver")
      data.put("user_id", userId)
      data.put("notice_id", noticeId)
      data.put("partition_date", partitionDate)
      data.toJSONString
    }
    enriched.getOrElse("")
  }

  /**
   * Tells whether a parsed message should be kept.
   *
   * @param mes a string produced by [[logParse]]
   * @return `false` for null or the empty string, `true` otherwise
   */
  override def logFilter(mes: String): Boolean =
    mes != null && !mes.isEmpty
}
反射方法
/**
 * Reflection helper that creates objects from a class name.
 */
public class CreateObject {

    /**
     * Instantiates the named class through its no-argument constructor.
     *
     * @param className fully-qualified name of the class to load
     * @return a new instance of the class, or {@code null} when the class
     *         cannot be loaded or instantiated (the failure is reported on
     *         standard error)
     */
    public static Object createInstance(String className) {
        try {
            Class<?> clz = Class.forName(className);
            // Class.newInstance() is deprecated since Java 9 because it rethrows
            // constructor exceptions unchecked; go through the Constructor instead.
            return clz.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            // Keep the best-effort contract (null on failure) but say which class failed.
            System.err.println("CreateObject: failed to instantiate class " + className);
            e.printStackTrace();
            return null;
        }
    }
}
使用
其中 conf.classFile 配置为 StageDataOperationPushLog 的全限定类名(全路径),程序通过反射创建对应的解析对象:
// Parse every record with the processor class named by conf.classFile, then
// keep only the results that the same processor class accepts.
mapDate = transction
  .map { record =>
    // A fresh processor is created reflectively for each record.
    val parser = CreateObject.createInstance(conf.classFile).asInstanceOf[BashLogLineParse]
    parser.line = record.message
    (record.topic, record.partition, record.offset, parser.logParse())
  }
  .filter { parsedRecord =>
    // The 4th tuple element is the string returned by logParse().
    val checker = CreateObject.createInstance(conf.classFile).asInstanceOf[BashLogLineParse]
    checker.logFilter(parsedRecord._4)
  }