文章目录
- Flink 系列文章
- 一、maven依赖
- 二、Row-based操作
- 1、本示例的公共代码
- 1、Map
- 2、FlatMap
- 3、Aggregate
- 4、Group Window Aggregate
- 5、FlatAggregate
本文介绍了 Table API 中基于行(Row-based)的 map、flatMap、aggregate、group window aggregate 和 flatAggregate 操作,并通过示例进行演示。
本文除了maven依赖外,没有其他依赖。
本文更详细的内容可参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
一、maven依赖
本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。
二、Row-based操作
1、本示例的公共代码
本部分代码是本示例的公共代码,下面的具体操作示例均以一个方法进行展示,所需要引入的import均在公共代码部分中。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.api.Expressions.row;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Demo of the row-based Table API operations: map, flatMap, aggregate,
 * group window aggregate and flatAggregate.
 *
 * @author alanchan
 */
public class TestTableAPIOperationWithRowbasedDemo {

    /**
     * Input row type for the examples. Getters, setters and both constructors are generated
     * by Lombok; the field names are referenced by name in fromDataStream(...).
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private long id;
        private String name;
        private int balance;
        /** Timestamp in epoch milliseconds, e.g. 1698742358391L. */
        private Long rowtime;
    }

    /** Shared sample input used by the examples. */
    static final List<User> userList = Arrays.asList(
            new User(1L, "alan", 18, 1698742358391L),
            new User(2L, "alan", 19, 1698742359396L),
            new User(3L, "alan", 25, 1698742360407L),
            new User(4L, "alanchan", 28, 1698742361409L),
            new User(5L, "alanchan", 29, 1698742362424L));

    /** Scalar function mapping a string to the two-field row (value, "pre-" + value). */
    public static class MyMapFunction extends ScalarFunction {
        public Row eval(String a) {
            String prefixed = "pre-" + a;
            return Row.of(a, prefixed);
        }

        /** Declares the result type ROW<STRING, STRING> for the legacy type system. */
        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }

    /**
     * Entry point; runs one example.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // To run another example, call testMap(), testFlatMap(), testAggregate()
        // or testGroupWindowAggregate() here instead.
        testFlatAggregate();
    }
}
1、Map
/**
 * Scalar function that turns a string into the two-field row (value, "pre-" + value).
 * Because the result is a composite type, Table.map flattens it into two columns.
 */
public static class MyMapFunction extends ScalarFunction {

    public Row eval(String a) {
        String prefixed = "pre-" + a;
        return Row.of(a, prefixed);
    }

    /** Declares the result type ROW<STRING, STRING> for the legacy type system. */
    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.ROW(Types.STRING, Types.STRING);
    }
}
/**
 * Runs a map operation with a user-defined (or built-in) scalar function.
 * When the function's output is a composite type, the output is flattened into columns.
 *
 * @throws Exception if the Flink job fails
 */
static void testMap() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    ScalarFunction mapper = new MyMapFunction();
    tenv.registerFunction("func", mapper);

    // Alternative input: a DataStream<String> of names such as "alan", "alanchan",
    // "alanchanchn", converted with fromDataStream(users, $("name")).
    Table userTable = tenv.fromDataStream(
            env.fromCollection(userList), $("id"), $("name"), $("balance"), $("rowtime"));

    Table mapped = userTable.map(call("func", $("name")));
    tenv.toRetractStream(mapped, Row.class).print();
    // Sample output:
    // 2> (true,+I[alan, pre-alan])
    // 4> (true,+I[alan, pre-alan])
    // 6> (true,+I[alanchan, pre-alanchan])
    // 5> (true,+I[alanchan, pre-alanchan])
    // 3> (true,+I[alan, pre-alan])
    env.execute();
}
2、FlatMap
/**
 * Table function that splits a string on '#' and emits one row (segment, segmentLength)
 * per segment. Strings without '#' and NULL inputs produce no rows.
 */
public static class MyFlatMapFunction extends TableFunction<Row> {

    public void eval(String str) {
        // Guard against SQL NULL: str.contains(...) would otherwise throw a NullPointerException.
        if (str == null || !str.contains("#")) {
            return;
        }
        // String.split drops trailing empty strings, so "alan_chan_chn#" yields one segment.
        for (String segment : str.split("#")) {
            collect(Row.of(segment, segment.length()));
        }
    }

    /** Declares the result type ROW<STRING, INT> for the legacy type system. */
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.INT);
    }
}
/**
 * Runs a flatMap operation with a table function; each input row may produce zero or more
 * output rows.
 *
 * @author alanchan
 * @throws Exception if the Flink job fails
 */
static void testFlatMap() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    // Parameterized instead of the raw TableFunction type to avoid an unchecked call.
    TableFunction<Row> func = new MyFlatMapFunction();
    tenv.registerFunction("func", func);

    DataStream<String> users = env.fromCollection(
            Arrays.asList("alan#alanchan#alanchanchn", "alan_chan_chn#", "alan-chan-chn"));
    Table usersTable = tenv.fromDataStream(users, $("name"));

    Table result = usersTable.flatMap(call("func", $("name")));
    DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
    resultDS.print();
    // Sample output ("alan-chan-chn" contains no '#', so it emits nothing):
    // 13> (true,+I[alan_chan_chn, 13])
    // 10> (true,+I[alan, 4])
    // 12> (true,+I[alanchanchn, 11])
    // 11> (true,+I[alanchan, 8])
    env.execute();
}
3、Aggregate
/** Mutable accumulator holding a running minimum and maximum. */
public static class MyMinMaxAcc {
    /** Running minimum; starts at Java's int default of 0. */
    public int min;
    /** Running maximum; starts at Java's int default of 0. */
    public int max;
}
/**
 * Aggregate function computing (min, max) of an INT column as a two-field row.
 *
 * <p>The accumulator starts from min = Integer.MAX_VALUE and max = Integer.MIN_VALUE, so the
 * first accumulated value becomes both the min and the max. The previous version started
 * both fields at 0, which made the reported min always <= 0 for positive inputs.
 */
public static class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {

    public void accumulate(MyMinMaxAcc acc, int value) {
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
    }

    @Override
    public MyMinMaxAcc createAccumulator() {
        MyMinMaxAcc acc = new MyMinMaxAcc();
        resetAccumulator(acc);
        return acc;
    }

    /** Puts the accumulator back into the "no values seen" state. */
    public void resetAccumulator(MyMinMaxAcc acc) {
        acc.min = Integer.MAX_VALUE;
        acc.max = Integer.MIN_VALUE;
    }

    /**
     * Returns (min, max). If no value has been accumulated (min > max only in the initial
     * state), returns (NULL, NULL) instead of the sentinel values.
     */
    @Override
    public Row getValue(MyMinMaxAcc acc) {
        if (acc.min > acc.max) {
            return Row.of(null, null);
        }
        return Row.of(acc.min, acc.max);
    }

    /** Declares the result type ROW<INT, INT> (flattened to f0 = min, f1 = max). */
    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(Types.INT, Types.INT);
    }
}
/**
 * Runs an aggregate operation with a user-defined aggregate function. The aggregate must be
 * closed with a select clause, and that select clause does not support aggregate functions.
 * Because the aggregate's output is a composite type (a Row), it is flattened into the
 * columns f0 and f1.
 *
 * @throws Exception if the Flink job fails
 */
static void testAggregate() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    // Parameterized instead of the raw AggregateFunction type to avoid an unchecked call.
    AggregateFunction<Row, MyMinMaxAcc> myAggFunc = new MyMinMax();
    tenv.registerFunction("myAggFunc", myAggFunc);

    Table ordersTable = tenv.fromValues(
            DataTypes.ROW(
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("balance", DataTypes.INT())),
            Arrays.asList(
                    row("alan", 16987423),
                    row("alan", 16396),
                    row("alanchan", 1690407),
                    row("alanchanchn", 16409),
                    row("alanchan", 162424),
                    row("alan", 164)));
    Table usersTable = ordersTable.select($("name"), $("balance"));

    Table result = usersTable
            .groupBy($("name"))
            .aggregate(call("myAggFunc", $("balance")))
            .select($("name"), $("f0"), $("f1"));
    DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
    resultDS.print();
    // Sample output (f0 = min, f1 = max). It was captured while the MyMinMax accumulator
    // started from min = 0, which is why f0 shows 0 rather than each group's real minimum:
    // 2> (true,+I[alan, 0, 16987423])
    // 16> (true,+I[alanchan, 0, 1690407])
    // 16> (true,+I[alanchanchn, 0, 16409])
    env.execute();
}
4、Group Window Aggregate
/**
 * Groups and aggregates a table over a group window plus, optionally, one or more grouping
 * keys. The aggregate must be closed with a select clause, and that select clause supports
 * neither "*" nor aggregate functions.
 *
 * @throws Exception if the Flink job fails
 */
static void testGroupWindowAggregate() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    // Parameterized instead of the raw AggregateFunction type to avoid an unchecked call.
    AggregateFunction<Row, MyMinMaxAcc> myAggFunc = new MyMinMax();
    tenv.registerFunction("myAggFunc", myAggFunc);

    // Local name chosen so it does not shadow the class-level userList field.
    // NOTE(review): the user with id 5 appears twice; presumably deliberate, so confirm.
    List<User> windowUsers = Arrays.asList(
            new User(1L, "alan", 18, 1698742358391L),
            new User(2L, "alan", 19, 1698742359396L),
            new User(3L, "alan", 25, 1698742360407L),
            new User(4L, "alanchan", 28, 1698742361409L),
            new User(5L, "alanchan", 29, 1698742362424L),
            new User(5L, "alanchan", 29, 1698742362424L));

    // Event time is taken from User.rowtime, tolerating up to 1 second of out-of-orderness.
    DataStream<User> users = env.fromCollection(windowUsers)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                            .withTimestampAssigner((user, recordTimestamp) -> user.getRowtime()));
    Table usersTable = tenv.fromDataStream(
            users, $("id"), $("name"), $("balance"), $("rowtime").rowtime());

    Table result = usersTable
            .window(Tumble.over(lit(5).minutes())
                    .on($("rowtime"))
                    .as("w")) // define a 5-minute tumbling window
            .groupBy($("name"), $("w")) // group by key and window
            .aggregate(call("myAggFunc", $("balance")))
            // aggregate results plus the window's start and end
            .select($("name"), $("f0"), $("f1"), $("w").start(), $("w").end());
    DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
    resultDS.print();
    // Sample output. It was captured while the MyMinMax accumulator started from min = 0,
    // which is why f0 shows 0 rather than the group minimum:
    // 2> (true,+I[alan, 0, 25, 2023-10-31T08:50, 2023-10-31T08:55])
    // 16> (true,+I[alanchan, 0, 29, 2023-10-31T08:50, 2023-10-31T08:55])
    env.execute();
}
5、FlatAggregate
/**
 * Accumulator for the Top2 table aggregate: the largest and second-largest values seen.
 * Both fields are null until initialized by the creator of the accumulator.
 */
public static class Top2Accum {
    /** Largest value seen so far. */
    public Integer first;
    /** Second-largest value seen so far. */
    public Integer second;
}
/**
 * User-defined table aggregate function that emits up to two rows per group: the largest
 * value with rank 1 and the second-largest with rank 2.
 *
 * <p>Integer.MIN_VALUE is the "no value yet" sentinel, so an actual input equal to
 * Integer.MIN_VALUE is never emitted. NULL inputs are ignored.
 */
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }

    public void accumulate(Top2Accum acc, Integer v) {
        // Skip NULL inputs; unboxing null in the comparisons below would throw an NPE.
        if (v == null) {
            return;
        }
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    /**
     * Folds other accumulators into {@code acc}. Sentinel (MIN_VALUE) entries never pass the
     * strict ">" checks in accumulate, so they leave {@code acc} unchanged.
     */
    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    /** Emits (value, rank) for each slot that holds a real value. */
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}
/**
 * Similar to a GroupBy aggregation: rows are grouped on the grouping keys and a table
 * aggregate operator aggregates each group. Unlike an AggregateFunction, a
 * TableAggregateFunction may return zero or more records per group. The flatAggregate must
 * be closed with a select clause, and that select clause does not support aggregate
 * functions.
 *
 * @throws Exception if the Flink job fails
 */
static void testFlatAggregate() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    // Parallelism 1: the sample output below therefore carries no "N>" subtask prefix.
    env.setParallelism(1);
    tenv.registerFunction("top2", new Top2());

    Table balances = tenv.fromValues(
            DataTypes.ROW(
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("balance", DataTypes.INT())),
            Arrays.asList(
                    row("alan", 16987423),
                    row("alan", 16396),
                    row("alanchan", 1690407),
                    row("alanchanchn", 16409),
                    row("alanchan", 162424),
                    row("alan", 164)));

    Table topTwoPerName = balances
            .groupBy($("name"))
            .flatAggregate(call("top2", $("balance")))
            .select($("name"), $("f0").as("balance"), $("f1").as("rank"));

    tenv.toRetractStream(topTwoPerName, Row.class).print();
    // Sample output (a retract stream: previous top-2 rows are withdrawn before new ones):
    // (true,+I[alan, 16987423, 1])
    // (false,-D[alan, 16987423, 1])
    // (true,+I[alan, 16987423, 1])
    // (true,+I[alan, 16396, 2])
    // (true,+I[alanchan, 1690407, 1])
    // (true,+I[alanchanchn, 16409, 1])
    // (false,-D[alanchan, 1690407, 1])
    // (true,+I[alanchan, 1690407, 1])
    // (true,+I[alanchan, 162424, 2])
    // (false,-D[alan, 16987423, 1])
    // (false,-D[alan, 16396, 2])
    // (true,+I[alan, 16987423, 1])
    // (true,+I[alan, 16396, 2])
    env.execute();
}
以上,本文介绍了 Table API 中基于行(Row-based)的 map、flatMap、aggregate、group window aggregate 和 flatAggregate 操作,并通过示例进行了演示。
标签:acc,map,示例,alan,tenv,Table,aggregate,import,public From: https://blog.51cto.com/alanchan2win/9232404