直播平台源码,FlinkSQL实现行转列
1、使用 UNNEST 解析
select
name,course,score
from ods_kafka_student_scores
CROSS JOIN UNNEST(`list`) AS t (course,score);
select
name,course,score
from ods_kafka_student_scores, UNNEST(`list`) AS t (course,score);
select
name,course,score
from ods_kafka_student_scores
LEFT JOIN UNNEST(`list`) AS t (course,score) on true;
2、使用自定义 UDTF 解析
UDTF(自定义表值函数),自定义表值函数。
将 0 个、1 个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是 1 个值。返回的行可以由 1 个或多个列组成。调用一次函数输出多行或多列数据。
必须继承 TableFunction 基类,并实现一个或者多个名为 eval 的方法。
在使用 UDTF 时,需要带上 LATERAL TABLE两个关键字.
@FunctionHint(output = @DataTypeHint("ROW<course STRING,score INT>"))
public class ParserJsonArrayTest extends TableFunction<Row> {
private static final Logger LOG = Logger.getLogger(ParserJsonArrayTest.class);
public void eval(String value) {
try {
JSONArray arrays = JSONArray.parseArray(value);
Iterator<Object> iterator = arrays.iterator();
while (iterator.hasNext()) {
JSONObject jsonObject = (JSONObject) iterator.next();
String course = jsonObject.getString("course");
Integer score = jsonObject.getInteger("score");
collect(Row.of(course,score));
}
} catch (Exception e) {
LOG.error("Parser json failed :" + e.getMessage());
}
}
}
自定义 UDTF 解析的时候,就不需要把 list 字段定义成 ARRAY 类型了,直接定义成 STRING 类型就可以了,并且这种方式会更加的灵活,比如还需要过滤数据或者更复杂的一些操作时都可以在 UDTF 里面完成.
以上就是 直播平台源码,FlinkSQL实现行转列,更多内容欢迎关注之后的文章
标签:score,自定义,course,FlinkSQL,UDTF,转列,源码,iterator From: https://www.cnblogs.com/yunbaomengnan/p/17748933.html