车辆碰撞程序
总体概述
简单介绍
车辆在发生对应的异常情况时,产生事件数据,实时程序消费通过OGG 推送至Kafka的数据进行事件判断,并将消息通过Rabbit推送至业务端。
- 碰撞告警:车辆发生碰撞事件并有气囊、车胎、翻滚等状态。
- 低等级碰撞告警:车辆发生轻微碰撞,产生低等级碰撞计数
- 故障告警:车辆发生故障事件
涉及组件
组件名称 | 说明 |
---|---|
Kafka | 消息中间件 |
Hbase | 数据库 |
RabbitMq | 消息中间件 |
flink 1.12.1 | 大数据计算框架 |
程序设计
程序需要根据监测碰撞事件发生,并获取到事件发生的位置,根据车辆号查询到对应的车架号,将结果实时发送给下游。
* SQL 逻辑
-- 碰撞事件位置表 VEHICLE_LOCATION
-- 碰撞事件主表 INTERACTION
SQL 逻辑:
SELECT
cast(t1.interaction_id as varchar(47)) CONTCT_ID,
cast(t2.vehicle_vin as varchar(17)) VIN_ID,
t3.location,
cast(t1.aacn_data as varchar(1500)) RAW_DATA,
t1.created_on as CREATE_TIMSTM
from interaction t1
inner join vehicle t2 > Part II
on t1.vehicle_id = t2.vehicle_id > Vehicle中查询
left join vehicle_location t3
on t1.interaction_id = t3.owner_id
where t1.interaction_type = 'crash'
and t1.created_on >= to_date('20191226', 'yyyymmdd')
and t1.created_on < to_date('20191227', 'yyyymmdd')
1.位置与事件信息流Join
/**
* Join elements of this [[KeyedStream]] with elements of another [[KeyedStream]] over
* a time interval that can be specified with [[IntervalJoin.between]].
*
* @param otherStream The other keyed stream to join this keyed stream with
* @tparam OTHER Type parameter of elements in the other stream
* @return An instance of [[IntervalJoin]] with this keyed stream and the other keyed stream
*/
acrInterStream
.intervalJoin(acrLocStream)
.between(Time.minutes(-10), Time.minutes(10))
.process(new AcrJoinProcessFunction())// Join, (vehicleId, ObjectNode)
2. 根据车辆号查询车架号
对应关系为Hbase表,通过异步算子使用Hbase Api Get出对应值。
AsyncDataStream
.unorderedWait(
acrJoinStream,
new AsyncAcrGetVinFromHbaseFunction(HbaseConfig),
3000,
TimeUnit.SECONDS,
20) //异步查询Hbase 获取vin号
标签:stream,碰撞,程序,t1,车辆,vehicle,id
From: https://www.cnblogs.com/lvdoiu/p/17478984.html