Flink can ingest data from a variety of sources and build a DataStream from it for transformation; the source is the input end of the whole processing program.
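Before the real sources below, here is a minimal sketch of the idea (not from the original post; the object name is made up, and it assumes the flink-streaming-scala dependency is on the classpath): the simplest source is a handful of in-memory values.

import org.apache.flink.streaming.api.scala._

// hypothetical example object, for illustration only
object SourceFromElements {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // fromElements builds a bounded DataStream from in-memory values
    env.fromElements("flink", "kafka", "mysql")
      .print()
    env.execute()
  }
}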
Reading data from Kafka
import java.util.Properties

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.clients.consumer.OffsetResetStrategy

object KafkaSourceClass {
  def main(args: Array[String]): Unit = {
    // build the Flink environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val pro = new Properties()
    pro.setProperty("enable.auto.commit", "true")

    // build the Kafka source
    val source = KafkaSource
      .builder[String]
      // Kafka cluster (broker) addresses
      .setBootstrapServers("master:9092,node1:9092,node2:9092")
      .setProperties(pro)
      // topic name
      .setTopics("topic1")
      // consumer group ID
      .setGroupId("my-group")
      /**
       * earliest: consume from the earliest offset
       * committedOffsets(): consume from the offsets committed by the consumer group, with no offset-reset strategy
       * committedOffsets(OffsetResetStrategy.EARLIEST): consume from the committed offsets; if no committed offset exists, fall back to the earliest offset
       * timestamp(ms): consume from the first record whose timestamp is greater than or equal to the given timestamp
       * latest: consume from the end of the topic
       */
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
      // .setStartingOffsets(OffsetsInitializer.earliest())
      // .setStartingOffsets(OffsetsInitializer.timestamp(1667700000000L)) // epoch milliseconds
      // .setStartingOffsets(OffsetsInitializer.latest())
      // deserialization format of the record value
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build

    val kafkaDS: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "Kafka Source")
    kafkaDS.print()
    env.execute()
  }
}
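Once created, the Kafka stream behaves like any other DataStream. As a minimal sketch (not in the original post, and assuming each message is a comma-separated list of words), the following lines could replace kafkaDS.print() in the example above, inside main and before env.execute(), to run a word count over the topic:

// continues the example above; assumes comma-separated words per message
val countsDS: DataStream[(String, Int)] = kafkaDS
  .flatMap(_.split(","))
  .map(word => (word, 1))
  .keyBy(_._1)
  .sum(1)
countsDS.print()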
Reading data from MySQL
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object SourceMySqlClass {
  def main(args: Array[String]): Unit = {
    // custom source: read data from MySQL
    // build the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2) // parallelism
    // register the custom source
    env.addSource(new MySource())
      .print()
    env.execute()
  }
}

class MySource extends SourceFunction[String] {
  /**
   * run is executed only once when the job starts; it is used to load the data
   * @param ctx used to emit records downstream
   */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // connect to MySQL (the connection is opened once)
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false&characterEncoding=utf-8", "root", "123456")
    val state: PreparedStatement = conn.prepareStatement("select * from student")
    val result: ResultSet = state.executeQuery()
    while (result.next()) {
      val id: String = result.getString("id")
      val name: String = result.getString("name")
      val age: Int = result.getInt("age")
      val gender: String = result.getString("gender")
      val clazz: String = result.getString("clazz")
      ctx.collect(s"${id}\t${name}\t${age}\t${gender}\t${clazz}")
    }
    // release JDBC resources once all rows have been emitted
    result.close()
    state.close()
    conn.close()
  }

  // called when the job is cancelled; nothing to clean up here
  override def cancel(): Unit = {
  }
}
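The JDBC connection could equally be managed through Flink's rich-function lifecycle. Below is a minimal sketch (not from the original post; it assumes the same student table and credentials) using RichSourceFunction, so the connection is created in open() before run() starts and is always released in close():

import java.sql.{Connection, DriverManager}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class MyRichSource extends RichSourceFunction[String] {
  private var conn: Connection = _

  // open() runs once before run(), a natural place to create the connection
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false&characterEncoding=utf-8", "root", "123456")
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val state = conn.prepareStatement("select * from student")
    val result = state.executeQuery()
    while (result.next()) {
      ctx.collect(result.getString("id") + "\t" + result.getString("name"))
    }
    result.close()
    state.close()
  }

  // close() runs at shutdown, so the connection is released even on failure
  override def close(): Unit = {
    if (conn != null) conn.close()
  }

  override def cancel(): Unit = {}
}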
Reading data from a directory
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

object SourceTestClass {
  def main(args: Array[String]): Unit = {
    // build the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    /**
     * Controls Flink's processing mode:
     * BATCH: can only be used on bounded streams, similar to Spark's batch mode; only the final result is emitted
     * STREAMING: works on both unbounded and bounded streams; records are processed one by one
     * AUTO: chooses the mode automatically
     */
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    // read files from a directory -- a bounded stream
    val wordDS: DataStream[String] = env.readTextFile("FlinkProject/data/words.txt")
    // count the words
    val wordCountDS: DataStream[(String, Int)] = wordDS
      .flatMap(_.split(","))
      .map(word => (word, 1))
      .keyBy(kv => kv._1)
      .sum(1)
    wordCountDS.print()
    // // read data from a collection
    // env.fromCollection(List(1, 2, 3, 4, 5, 6))
    //   .print()
    // start the Flink job
    env.execute()
  }
}
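For quickly testing an unbounded stream, a socket source is another common option. A minimal sketch (not from the original post; the object name, host, and port are made up, and it assumes `nc -lk 8888` is running on the master host):

import org.apache.flink.streaming.api.scala._

// hypothetical example object, for illustration only
object SourceSocketClass {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // unbounded stream: reads lines from a TCP socket (start `nc -lk 8888` on master first)
    val socketDS: DataStream[String] = env.socketTextStream("master", 8888)
    socketDS.print()
    env.execute()
  }
}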