首页 > 其他分享 >flink-综合练习

flink-综合练习

时间:2023-03-03 15:36:40浏览次数:51  
标签:练习 flink Long api org apache import 综合

案例需求:

假设用户需要每个1秒钟需要统计4秒钟 窗口中数据的量,然后对统计的结果值进行checkpoint处理
数据规划
使用自定义算子每秒钟产生大约10000条数据
产生的数据为一个四元组(Long,String,String,Interger)-- (id,name,info,count)
数据经统计后,统计结果打印到终端输出
打印输出的结果为Long类型的数据

开发自定义数据源:

代码实现:

// ** 开发自定义数据源
// 1、自定义样例类
case class Msg(id:Long, name:String,info:String,cout:Int)

// 2、自定义数据源,继承RichSourceFunction
class MySourceFunction extends RichSourceFunction[Msg]{
  var isRunning = true

  // 3、实现run方法,每秒向流中注入10000个样例类
  override def run(ctx: SourceFunction.SourceContext[Msg]): Unit = {
    while (isRunning){
      for(i<-0 until 10000){
        //收集数据
        ctx.collect(Msg(1L, "name_"+i, "test_info", 1))
      }
      // 休眠 1s
      TimeUnit.SECONDS.sleep(1)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

开发自定义的状态

代码实现:

// ** 开发自定义状态 **

//1、继承Serializable ListCheckpointed
class UDFState extends Serializable{
  private var count = 0L
  //2、为总数count提供set和get方法
  def setState(s:Long) = count = s

  def getState:Long = count
}

开发自定义Window和检查点

代码实现:

//1、继承WindowFunction
//3、继承ListCheckpointed
class MyWindowAndCheckpoint extends WindowFunction[Msg,Long,Tuple,TimeWindow] with ListCheckpointed[UDFState]{
  // 求和总数
  var total = 0L

  //2、重写apply方法,对窗口数据进行总数累加
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Msg], out: Collector[Long]): Unit = {
    var count = 0L
    for(msg<-input){
      count = count + 1
    }
    total = total + count
    out.collect(count)
  }

  // 自定义快照
  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
    val udfList = new util.ArrayList[UDFState]()

    // 创建UDFState对象
    var udfState = new UDFState
    udfState.setState(total)
    udfList.add(udfState)

    // 返回数据
    udfList
  }

  // 恢复快照
  override def restoreState(state: util.List[UDFState]): Unit = {
    val udfState:UDFState = state.get(0)

    // 取出监测点的值 赋值给total即可
    total = udfState.getState
  }
}

开发主业务

代码实现

def main(args: Array[String]): Unit = {
  // 1、流处理环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 2、开启checkpoint,间隔时间为6s
  env.enableCheckpointing(6000)
  // 3、设置checkpoint位置
  env.setStateBackend(new FsStateBackend("file:///E:/itcast_zz_test/maven_flink/flink-base/src/dev_checkpoint"))
  // 4、添加数据源
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  // 5、添加数据源
  import org.apache.flink.api.scala._
  val sourceDataStream:DataStream[Msg] = env.addSource(new MySourceFunction)

  //6、添加水印支持
  val watermarkDataStream = sourceDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Msg]() {
    override def getCurrentWatermark: Watermark = {
      new Watermark(System.currentTimeMillis())
    }

    // 抽取当前时间
    override def extractTimestamp(element: Msg, previousElementTimestamp: Long): Long = {
      System.currentTimeMillis()
    }
  })
  //7、keyby分组
  val keyedStream: KeyedStream[Msg, Tuple] = watermarkDataStream.keyBy(0)
  //8、设置滑动窗口,窗口时间为4s,滑动事件为1s
  val windowedSteam:WindowedStream[Msg, Tuple, TimeWindow] = keyedStream.timeWindow(Time.seconds(4), Time.seconds(1))
  //9、指定自定义窗口
  val result:DataStream[Long] = windowedSteam.apply(new MyWindowAndCheckpoint)
  //10、打印结果
  result.print()

  //11、执行任务
  env.execute()
}

引用的包

package com.wanghao

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.util
import java.util.concurrent.TimeUnit

验证效果

标签:练习,flink,Long,api,org,apache,import,综合
From: https://www.cnblogs.com/qingtianyu2015/p/17175771.html

相关文章

  • 编译flink源码卡在Running 'npm ci --cache-max=0 --no-save'
    编译flink源码时卡在了Running'npmci--cache-max=0--no-save'in......解决方案:将flink-runtime-web/pom.xml文件中的  修改为:<arguments>install-g-regis......
  • SYUCT2022综合训练1
    2022级综合训练本次训练三道题目都为codeforces上原题。1.BinaryDecimal题解:多尝试几组数据不难看出题目所求为,所有位数上最大的一个值.#include<bits/stdc++.......
  • 登录页面练习
    主页面1<?xmlversion="1.0"encoding="utf-8"?>2<LinearLayoutxmlns:android="http://schemas.android.com/apk/res/android"3android:layout_width="m......
  • 课堂练习——计算最长单词链
      packageletteron;importjava.io.*;importjava.util.*;publicclasstext1{publicstaticvoidmain(String[]args)throwsFileNotFoundException......
  • 课堂练习01题目:计算最长英语单词链
    大家经常玩成语接龙游戏,我们试一试英语的接龙吧:一个文本文件中有N个不同的英语单词,我们能否写一个程序,快速找出最长的能首尾相连的英语单词链,每个单词最多只能用一次。最......
  • 浅述综合管廊消防安全及预防对策
    陈盼安科瑞电气股份有限公司上海嘉定 201801摘要:本文简要介绍了城市综合管廊特点及火灾原因,分别从防火分隔、通风排烟、自动报警以及灭火设施等方面,对电缆隧道消防安全......
  • 靶机练习6: BSS(Cute 1.0.2)
    靶机地址https://www.vulnhub.com/entry/bbs-cute-102,567/信息收集进行全端口扫描,确认目标开放端口和服务nmap-n-v-sS--max-retries=0-p-172.16.33.9对开放......
  • 一个独立报表显示工单,进出站,扫码上料,工单下达,完工下线所有情况便于综合异常排查
                              工单头                                             计划产线计......
  • 又更新了 20 道 TS 练习题,你能答对几道?
    中秋假期已经结束了,由于疫情原因阿宝哥在家里待了三天,期间精选了 20 道新的TS练习题,目前已经有 30 道题了。很多小伙伴已提交了他们心目中的答案,撸过TS的小伙伴赶......
  • OSCP练习靶机
    搞不到OSCP官方泄露的lab,只好搞一些有质量的、相似的靶机来练习首先感谢大哥的分享(34条消息)和oscp相似的靶机-hackthebox&vulnhub(OSCP-LIKEHACKTHEBOX&VULNHUB)......