1. 构建 UserLog 对象
@Data @Builder //创建对象 @NoArgsConstructor // 无参构造函数 @AllArgsConstructor // 有参构造函数 public class UserLog { private String username; private String url; private String ctime; }
2. 主程序
public class DataStreamATable { public static void main(String[] args) throws Exception { // 1.获取流的执行环境 StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.创建表的执行环境 StreamTableEnvironment tenv = StreamTableEnvironment.create(senv); // 3.读取数据 DataStream<UserLog> userLog = senv .fromElements("zhangsan,/new,2022-10-09 23:00:00", "lisi,/car,2022-10-09 23:00:00", "wangwu,/front,2022-10-09 23:00:00", "zhangsan,/front,2022-10-09 23:00:00").map(event -> { String[] fields = event.split(","); return UserLog.builder()// .username(fields[0])// .url(fields[1])// .ctime(fields[2]).build(); }); // 4.流转换为动态表 Table table = tenv.fromDataStream(userLog); // 5.执行Table API查询 Table resultTable = table.where($("username").isEqual("zhangsan")) .select($("username"), $("url"),$("ctime")); // 5.将Table转换为DataStream DataStream<UserLog> clickLogDS = tenv.toDataStream(resultTable, UserLog.class); // 6.将结果打印 clickLogDS.print(); // 7.执行Flink的程序 senv.execute("DataStreamATable"); } }标签:DataStream,String,fields,FlinkSQL,09,00,互转,Table From: https://www.cnblogs.com/glblog/p/16774130.html