首页 > 其他分享 >大数据学习之Flink(三)

大数据学习之Flink(三)

时间:2022-11-30 20:34:23浏览次数:52  
标签:String val Flink 学习 result env 数据 def

Flink可以从各种来源获取数据,构建DataStream进行转换处理,source就是我们整个处理程序的输入端

从kafka中读取数据

bject KafKaSourceClass {
  def main(args: Array[String]): Unit = {

    /**
     * 构建Flink环境
     */
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val pro = new Properties()
    pro.setProperty("enable.auto.commit","true")
    //构建kafka的环境
    val source = KafkaSource
      .builder[String]
      //broker的地址
      .setBootstrapServers("master:9092,node1:9092,node2:9092")//设置kafka的集群地址
      .setProperties(pro)
      //设置topic的名称
      .setTopics("topic1")
      .setGroupId("my-group")//设置消费者组ID
      /**
       * earliest 从最早的位置消费
       * committedOffsets() 从消费组提交的位点开始消费,不指定位点的重置策略
       * committedOffsets(OffsetResetStrategy.EARLIEST) 从消费组提交的位点开始消费 如果提交的微店不存在,使用最早的位点
       * timestamp() 从时间戳大于等于指定是时间的数据开始消费
       * latest(从末尾开始消费)
       */
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
//      .setStartingOffsets(OffsetsInitializer.earliest)
//      .setStartingOffsets(OffsetsInitializer.timestamp())
//      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())//加载数据的格式
      .build

    val kafkaDS: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "Kafka Source")

    kafkaDS.print()

    env.execute()


  }
}

从mysql中读取数据

object SourceMySqlClass {

  def main(args: Array[String]): Unit = {
    //自定义数据源 读取mysql中的数据

    /**
     * 构建Flink环境
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)//并行度

    //自定义数据源
    env.addSource(new MySource())
      .print()


    env.execute()

  }

}

class MySource extends SourceFunction[String]{
  //连接mysql println("连接一次")

  /**
   * 程序运行时只会被执行一次,用于加载数据
   * @param ctx 用于上游向下游发送数据
   */


  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {

    val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false&characterEncoding=utf-8", "root", "123456")

    val state: PreparedStatement = conn.prepareStatement("select * from student")

    val result: ResultSet = state.executeQuery()

    while (result.next()){
      val id: String = result.getString("id")
      val name: String = result.getString("name")
      val age: Int = result.getInt("age")
      val gender: String = result.getString("gender")
      val clazz: String = result.getString("clazz")

      ctx.collect(s"${id}\t${name}\t${age}\t${gender}\t${clazz}")
    }
  }

  override def cancel(): Unit = {

  }


}

从目录中获取数据

object SourceTestClass {
  def main(args: Array[String]): Unit = {

    /**
     * 构建Flink环境
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //指定并行度
    env.setParallelism(2)

    /**
     * 控制Flink的处理模式
     * BATCH:只能用于有界流 类似于spark的批处理模式 只会输出最终的一个结果
     * STREAMING:既能作用在无界流上也能作用域有界流上 都是进行逐项处理
     * AUTO:自动判断
     */
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    //从文件夹中获取文件 有界流
    val wordDS: DataStream[String] = env.readTextFile("FlinkProject/data/words.txt")

    //统计单词的数量
    val wordCountDS: DataStream[(String, Int)] = wordDS.flatMap(_.split(","))
      .map(word => (word, 1))
      .keyBy(kv => kv._1)
      .sum(1)

    wordCountDS.print()


//    //从集合中获取数据
//    env.fromCollection(List(1,2,3,4,5,6))
//      .print()
    //启动Flink
    env.execute()



  }
}

 

标签:String,val,Flink,学习,result,env,数据,def
From: https://www.cnblogs.com/lkd0910/p/16939321.html

相关文章