首页 > 其他分享 >【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

时间:2024-01-13 14:34:00浏览次数:33  
标签:acc map 示例 alan tenv Table aggregate import public




文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、Row-based操作
  • 1、本示例的公共代码
  • 1、Map
  • 2、FlatMap
  • 3、Aggregate
  • 4、Group Window Aggregate
  • 5、FlatAggregate



本文介绍了 Table API 中基于行(Row-based)的 map、flatMap、aggregate、group window aggregate 和 flatAggregate 操作,并分别给出了示例代码及其运行结果。


本文除了maven依赖外,没有其他依赖。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、Row-based操作

1、本示例的公共代码

本部分代码是本示例的公共代码,下面的具体操作示例均以一个方法进行展示,所需要引入的import均在公共代码部分中。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.api.Expressions.row;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Shared scaffolding for the row-based Table API examples (map, flatMap,
 * aggregate, group window aggregate and flatAggregate).
 *
 * <p>The {@code testXxx} methods called from {@link #main} are not part of this
 * listing; they are expected to be added to this class.
 *
 * @author alanchan
 */
public class TestTableAPIOperationWithRowbasedDemo {

	/** Sample record; constructors and accessors are generated by Lombok. */
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private int balance;
		// The sample values look like epoch milliseconds -- TODO confirm.
		private Long rowtime;
	}

	/** Five sample users: three named "alan" and two named "alanchan". */
	static final List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L),
			new User(2L, "alan", 19, 1698742359396L),
			new User(3L, "alan", 25, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L),
			new User(5L, "alanchan", 29, 1698742362424L));

	/**
	 * Scalar function that turns one string into a two-field row: the input
	 * itself and the input prefixed with {@code "pre-"}.
	 */
	public static class MyMapFunction extends ScalarFunction {

		public Row eval(String a) {
			final String prefixed = "pre-" + a;
			return Row.of(a, prefixed);
		}

		/** Declares the result as a row of two strings. */
		@Override
		public TypeInformation<?> getResultType(Class<?>[] signature) {
			final TypeInformation<Row> twoStrings = Types.ROW(Types.STRING, Types.STRING);
			return twoStrings;
		}
	}

	/**
	 * Entry point: runs one example per invocation. Uncomment the example to
	 * run and comment out the others.
	 *
	 * @param args unused
	 * @throws Exception if the selected example fails
	 */
	public static void main(String[] args) throws Exception {
//		testMap();
//		testFlatMap();
//		testAggregate();
//		testGroupWindowAggregate();
		testFlatAggregate();
	}

}

1、Map

public static class MyMapFunction extends ScalarFunction {

		/**
		 * Maps one string to a two-field row: the input itself and the input
		 * prefixed with {@code "pre-"}.
		 */
		public Row eval(String a) {
			final String prefixed = "pre-" + a;
			return Row.of(a, prefixed);
		}

		/** Declares the result as a row of two strings. */
		@Override
		public TypeInformation<?> getResultType(Class<?>[] signature) {
			final TypeInformation<Row> twoStrings = Types.ROW(Types.STRING, Types.STRING);
			return twoStrings;
		}
	}

	/**
	 * Performs a map operation with a user-defined (or built-in) scalar function.
	 * If the function's output type is a composite type, the output is flattened
	 * into separate columns.
	 *
	 * @throws Exception if the job fails to execute
	 */
	static void testMap() throws Exception {
		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

		// Register the scalar function under the name used by call(...) below.
		ScalarFunction prefixer = new MyMapFunction();
		tableEnv.registerFunction("func", prefixer);

		// Alternative input from the original article: a DataStream<String> of names
		// ("alan", "alanchan", "alanchanchn") exposed as a single "name" column.
		DataStream<User> source = streamEnv.fromCollection(userList);
		Table input = tableEnv.fromDataStream(source, $("id"), $("name"), $("balance"), $("rowtime"));

		// Each input row becomes one (name, "pre-" + name) row.
		Table mapped = input.map(call("func", $("name")));

		tableEnv.toRetractStream(mapped, Row.class).print();
		// Output recorded by the article's author (subtask prefix and order vary per run):
//		2> (true,+I[alan, pre-alan])
//		4> (true,+I[alan, pre-alan])
//		6> (true,+I[alanchan, pre-alanchan])
//		5> (true,+I[alanchan, pre-alanchan])
//		3> (true,+I[alan, pre-alan])

		streamEnv.execute();
	}

2、FlatMap

public static class MyFlatMapFunction extends TableFunction<Row> {

		/**
		 * Splits the input on {@code '#'} and emits one (token, token length) row
		 * per token. Inputs without a {@code '#'} emit nothing.
		 *
		 * @param str the value to split; {@code null} is ignored instead of
		 *            failing with a NullPointerException
		 */
		public void eval(String str) {
			if (str == null || !str.contains("#")) {
				return;
			}
			// String.split drops trailing empty tokens, so "a#" yields only "a".
			for (String token : str.split("#")) {
				collect(Row.of(token, token.length()));
			}
		}

		/** Declares the result as a row of (string, int). */
		@Override
		public TypeInformation<Row> getResultType() {
			return Types.ROW(Types.STRING, Types.INT);
		}
	}

	/**
	 * Performs a flatMap operation with a table function: every input row may
	 * produce zero, one or several output rows.
	 *
	 * @throws Exception if the job fails to execute
	 */
	static void testFlatMap() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// Parameterized type instead of the raw TableFunction, so the
		// registration is checked by the compiler.
		TableFunction<Row> func = new MyFlatMapFunction();
		tenv.registerFunction("func", func);

		// Three inputs: several '#' separators, a single trailing '#', and none.
		DataStream<String> users = env.fromCollection(
				Arrays.asList("alan#alanchan#alanchanchn", "alan_chan_chn#", "alan-chan-chn"));
		Table usersTable = tenv.fromDataStream(users, $("name"));

		Table result = usersTable.flatMap(call("func", $("name")));

		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
		// Output recorded by the article's author (subtask prefix and order vary per run).
		// "alan-chan-chn" contains no '#', so it produces no row.
//		13> (true,+I[alan_chan_chn, 13])
//		10> (true,+I[alan, 4])
//		12> (true,+I[alanchanchn, 11])
//		11> (true,+I[alanchan, 8])

		env.execute();
	}

3、Aggregate

/**
	 * Accumulator for {@link MyMinMax}.
	 *
	 * <p>The fields start at the opposite extremes of the int range so that the
	 * first accumulated value always replaces both of them. Starting at 0 would
	 * make the reported minimum 0 for all-positive input and the reported
	 * maximum 0 for all-negative input.
	 */
	public static class MyMinMaxAcc {
		public int min = Integer.MAX_VALUE;
		public int max = Integer.MIN_VALUE;
	}

	/**
	 * Aggregate function that returns the minimum and maximum of an int column
	 * as a two-field row {@code (min, max)}.
	 *
	 * <p>If no value has been accumulated, the row contains the sentinels
	 * {@code (Integer.MAX_VALUE, Integer.MIN_VALUE)}.
	 */
	public static class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {

		/** Folds one value into the running minimum and maximum. */
		public void accumulate(MyMinMaxAcc acc, int value) {
			acc.min = Math.min(acc.min, value);
			acc.max = Math.max(acc.max, value);
		}

		@Override
		public MyMinMaxAcc createAccumulator() {
			return new MyMinMaxAcc();
		}

		/** Restores the accumulator to the same state as a freshly created one. */
		public void resetAccumulator(MyMinMaxAcc acc) {
			acc.min = Integer.MAX_VALUE;
			acc.max = Integer.MIN_VALUE;
		}

		@Override
		public Row getValue(MyMinMaxAcc acc) {
			return Row.of(acc.min, acc.max);
		}

		/** Declares the result as a row of (int, int). */
		@Override
		public TypeInformation<Row> getResultType() {
			return new RowTypeInfo(Types.INT, Types.INT);
		}
	}

	/**
	 * Performs an aggregate operation with an aggregate function. The aggregate
	 * must be closed with a select clause, and that select clause does not
	 * support aggregate functions. If the function's output type is a composite
	 * type, the output is flattened into separate columns (f0, f1 here).
	 *
	 * @throws Exception if the job fails to execute
	 */
	static void testAggregate() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// Parameterized type instead of the raw AggregateFunction, so the
		// registration is checked by the compiler.
		AggregateFunction<Row, MyMinMaxAcc> myAggFunc = new MyMinMax();
		tenv.registerFunction("myAggFunc", myAggFunc);

		Table ordersTable = tenv.fromValues(
				DataTypes.ROW(
						DataTypes.FIELD("name", DataTypes.STRING()),
						DataTypes.FIELD("balance", DataTypes.INT())
						),
				Arrays.asList(
						row("alan", 16987423),
						row("alan", 16396),
						row("alanchan", 1690407),
						row("alanchanchn", 16409),
						row("alanchan", 162424),
						row("alan", 164)
						));

		Table usersTable = ordersTable.select($("name"), $("balance"));

		// One (min, max) pair per name; f0 and f1 are the flattened row fields.
		Table result = usersTable
				.groupBy($("name"))
				.aggregate(call("myAggFunc", $("balance")))
				.select($("name"), $("f0"), $("f1"));

		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
		// Output recorded by the article's author (subtask prefix and order vary per run):
//		2> (true,+I[alan, 0, 16987423])
//		16> (true,+I[alanchan, 0, 1690407])
//		16> (true,+I[alanchanchn, 0, 16409])

		env.execute();
	}

4、Group Window Aggregate

/**
	 * Groups and aggregates a table on a group window and, optionally, one or
	 * more grouping keys. The aggregate must be closed with a select clause, and
	 * that select clause supports neither "*" nor aggregate functions.
	 *
	 * @throws Exception if the job fails to execute
	 */
	static void testGroupWindowAggregate() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// Parameterized type instead of the raw AggregateFunction, so the
		// registration is checked by the compiler.
		AggregateFunction<Row, MyMinMaxAcc> myAggFunc = new MyMinMax();
		tenv.registerFunction("myAggFunc", myAggFunc);

		// Example-local data, named so it does not shadow the class-level userList.
		// The last record is deliberately listed twice, as in the original example.
		List<User> windowUsers = Arrays.asList(
				new User(1L, "alan", 18, 1698742358391L),
				new User(2L, "alan", 19, 1698742359396L),
				new User(3L, "alan", 25, 1698742360407L),
				new User(4L, "alanchan", 28, 1698742361409L),
				new User(5L, "alanchan", 29, 1698742362424L),
				new User(5L, "alanchan", 29, 1698742362424L)
				);

		// Event time is taken from User.rowtime; watermarks tolerate 1 second of
		// out-of-order data.
		DataStream<User> users = env.fromCollection(windowUsers)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						);

		// Expose "rowtime" as the table's event-time attribute.
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"), $("rowtime").rowtime());

		Table result = usersTable
				.window(Tumble.over(lit(5).minutes())
						.on($("rowtime"))
						.as("w")) // define a 5-minute tumbling window named "w"
				.groupBy($("name"), $("w")) // group by key and window
				.aggregate(call("myAggFunc", $("balance")))
				.select($("name"), $("f0"), $("f1"), $("w").start(), $("w").end()); // aggregate result plus window bounds

		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
		resultDS.print();
		// Output recorded by the article's author (subtask prefix and order vary per run):
//		2> (true,+I[alan, 0, 25, 2023-10-31T08:50, 2023-10-31T08:55])
//		16> (true,+I[alanchan, 0, 29, 2023-10-31T08:50, 2023-10-31T08:55])

		env.execute();
	}

5、FlatAggregate

/**
	 * Accumulator for {@link Top2}: the largest and second-largest values seen
	 * so far. {@code Integer.MIN_VALUE} marks a slot that has not been filled.
	 */
	public static class Top2Accum {
	    public Integer first;
	    public Integer second;
	}

	/**
	 * User-defined table aggregate function that emits the two largest values
	 * of a group together with their rank (1 or 2).
	 *
	 * <p>Because {@code Integer.MIN_VALUE} is the "empty slot" marker, an input
	 * equal to {@code Integer.MIN_VALUE} is never emitted.
	 */
	public static  class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

	    @Override
	    public Top2Accum createAccumulator() {
	        Top2Accum acc = new Top2Accum();
	        acc.first = Integer.MIN_VALUE;
	        acc.second = Integer.MIN_VALUE;
	        return acc;
	    }

	    /**
	     * Folds one value into the accumulator.
	     *
	     * @param acc the accumulator to update
	     * @param v   the value to consider; {@code null} is skipped, because
	     *            unboxing it in the comparisons below would throw a
	     *            NullPointerException
	     */
	    public void accumulate(Top2Accum acc, Integer v) {
	        if (v == null) {
	            return;
	        }
	        if (v > acc.first) {
	            // New maximum: the previous maximum becomes the runner-up.
	            acc.second = acc.first;
	            acc.first = v;
	        } else if (v > acc.second) {
	            acc.second = v;
	        }
	    }

	    /** Merges other accumulators into {@code acc} by re-accumulating their two slots. */
	    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
	        for (Top2Accum otherAcc : iterable) {
	            accumulate(acc, otherAcc.first);
	            accumulate(acc, otherAcc.second);
	        }
	    }

	    /** Emits (value, rank) for every filled slot; unfilled slots are skipped. */
	    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
	        if (acc.first != Integer.MIN_VALUE) {
	            out.collect(Tuple2.of(acc.first, 1));
	        }
	        if (acc.second != Integer.MIN_VALUE) {
	            out.collect(Tuple2.of(acc.second, 2));
	        }
	    }
	}
	
	/**
	 * Performs a flatAggregate operation. It resembles a GroupBy aggregation:
	 * rows are grouped on the grouping keys and aggregated per group by the
	 * table aggregate operator that follows. Unlike an AggregateFunction, a
	 * TableAggregateFunction may return zero or more records per group. The
	 * flatAggregate must be closed with a select clause, and that select clause
	 * does not support aggregate functions.
	 *
	 * @throws Exception if the job fails to execute
	 */
	static void testFlatAggregate() throws Exception {
		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
		streamEnv.setParallelism(1);
		tableEnv.registerFunction("top2", new Top2());

		// In-memory (name, balance) rows used as the input table.
		Table balances = tableEnv.fromValues(
				DataTypes.ROW(
						DataTypes.FIELD("name", DataTypes.STRING()),
						DataTypes.FIELD("balance", DataTypes.INT())),
				Arrays.asList(
						row("alan", 16987423),
						row("alan", 16396),
						row("alanchan", 1690407),
						row("alanchanchn", 16409),
						row("alanchan", 162424),
						row("alan", 164)));

		// Per name: the two largest balances with their rank (f0 = value, f1 = rank).
		Table top2PerName = balances
				.groupBy($("name"))
				.flatAggregate(call("top2", $("balance")))
				.select($("name"), $("f0").as("balance"), $("f1").as("rank"));

		tableEnv.toRetractStream(top2PerName, Row.class).print();
		// Output recorded by the article's author:
//		(true,+I[alan, 16987423, 1])
//		(false,-D[alan, 16987423, 1])
//		(true,+I[alan, 16987423, 1])
//		(true,+I[alan, 16396, 2])
//		(true,+I[alanchan, 1690407, 1])
//		(true,+I[alanchanchn, 16409, 1])
//		(false,-D[alanchan, 1690407, 1])
//		(true,+I[alanchan, 1690407, 1])
//		(true,+I[alanchan, 162424, 2])
//		(false,-D[alan, 16987423, 1])
//		(false,-D[alan, 16396, 2])
//		(true,+I[alan, 16987423, 1])
//		(true,+I[alan, 16396, 2])

		streamEnv.execute();
	}

以上,本文介绍了 Table API 中基于行(Row-based)的 map、flatMap、aggregate、group window aggregate 和 flatAggregate 操作,并分别给出了示例代码及其运行结果。


标签:acc,map,示例,alan,tenv,Table,aggregate,import,public
From: https://blog.51cto.com/alanchan2win/9232404

相关文章