首页 > 数据库 >FlinkSQL的DataStream和Table互转的Demo

FlinkSQL的DataStream和Table互转的Demo

时间:2022-10-09 23:44:44浏览次数:57  
标签:DataStream String fields FlinkSQL 09 00 互转 Table

1.构建UserLog对象

@Data
@Builder   //创建对象
@NoArgsConstructor   // 无参构造函数
@AllArgsConstructor //  有参构造函数
public class UserLog {
    private String username;
    private String url;
    private String ctime;
}

2.主程序

public class DataStreamATable {
	public static void main(String[] args) throws Exception {
		// 1.获取流的执行环境
		StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
		// 2.创建表的执行环境
		StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
		// 3.读取数据
		DataStream<UserLog> userLog = senv
				.fromElements("zhangsan,/new,2022-10-09 23:00:00", "lisi,/car,2022-10-09 23:00:00",
						"wangwu,/front,2022-10-09 23:00:00", "zhangsan,/front,2022-10-09 23:00:00").map(event -> {
					String[] fields = event.split(",");
					return UserLog.builder()//
							.username(fields[0])//
							.url(fields[1])//
							.ctime(fields[2]).build();

				});
		// 4.流转换为动态表
		Table table = tenv.fromDataStream(userLog);
		// 5.执行Table API查询
		Table resultTable = table.where($("username").isEqual("zhangsan"))
				.select($("username"), $("url"),$("ctime"));
		// 5.将Table转换为DataStream
		DataStream<UserLog> clickLogDS = tenv.toDataStream(resultTable, UserLog.class);
		// 6.将结果打印
		clickLogDS.print();
		// 7.执行Flink的程序
		senv.execute("DataStreamATable");
	}
}

标签:DataStream,String,fields,FlinkSQL,09,00,互转,Table
From: https://www.cnblogs.com/glblog/p/16774130.html

相关文章

  • FlinkSQL基础概念
    1.spark和flink的区别Flink中,批处理是流处理的一个特例spark刚好相反,是微小的批次,准实时不能说实时处理。 2.Fink的版本Flink1.12之前的版本,并没有实现流批统一Flin......
  • (编程语言界的丐帮 C#).NET Framework 读取Excel到DataTable
    (编程语言界的丐帮C#).NETFramework读取Excel到DataTable生成DataTable到Excel,支持2007.xlsx,2003 .xls。 nuget引用 NPOI。ExcelHelper:usingNPOI.HPSF;usi......
  • @Zabbix报表系统ZbxTable
    文章目录​​1.ZbxTable概述​​​​2.ZbxTable功能介绍​​​​3.ZbxTable系统架构​​​​4.ZbxTable组件介绍​​​​5.ZbxTable线上体验​​​​6.ZbxTable版本兼容性​......
  • C# Datatable换为模型
    1///<summary>2///Datatable转为模型3///</summary>4///<typeparamname="T"></typeparam>5///<paramname="dt"></param>6///<returns></returns......
  • QT——QTableWidget样式设计
    链接1链接2(1)创建一个新的表格控件 QTableWidget*table=newQTableWidget();(2)设置列数  table->setColumnCount(3);//设置3列注意:如果内容超过3列,则无法显示......
  • 时间类型和字符串类型相互转换——SimpleDateFormat中parse和format的用法
    SimpleDateFormatsimpleDateFormat=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss");simpleDateFormat.parse("xxxxxxx");//字符串->时间simpleDateFormat.format(new......
  • ThreadLocal、InheritThreadLocal、TransmittableThreadLocal
    一、ThreadLocal多线程是Java实现多任务的基础,​​Thread​​​对象代表一个线程,我们可以在代码中调用​​Thread.currentThread()​​获取当前线程。例如,打印日志时,可以同......
  • C# EF 模型转DataTable
    1///<summary>2///EF模型转换为Datatable3///</summary>4///<typeparamname="T"></typeparam>5///<paramname="list"></param>6///<returns></r......
  • 网络字节序与主机字节序的相互转换
    1.前言知识  字节顺序是指(在计算机中)多于一个字节的数据类型在内存中的存放顺序。例如一个32位整数由4个字节组成,内存中存储这4个字节可以采取两种方法:小端字节序(littl......
  • 网络字节序与主机字节序的相互转换
    在Linux网络编程中,经常碰到网络字节序与主机字节序的相互转换。说到网络字节序与主机字节序需要清晰了解以下几个概念。字节序,顾名思义,指字节在内存中存储的顺序。比如一个......