首页 > 编程语言 >【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

时间:2024-01-13 14:33:19浏览次数:39  
标签:join 示例 flink new api org apache import



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、时态表的join
  • 1、统计需求对应的SQL
  • 2、Without connnector 实现代码
  • 3、With connnector 实现代码
  • 1)、bean定义
  • 2)、序列化定义
  • 3)、实现



本文通过两个示例介绍了时态表TemporalTableFunction的join操作。


本文除了maven依赖外,没有其他依赖。

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、时态表的join

假设有一张订单表Orders和一张汇率表Rates,那么订单来自于不同的地区,所以支付的币种各不一样,那么假设需要统计每个订单在下单时候Yen币种对应的金额。

【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)_flink table

1、统计需求对应的SQL

SELECT o.currency, o.amount, r.rate
  o.amount * r.rate AS yen_amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

2、Without connnector 实现代码

就是使用静态数据实现,其验证结果在代码中的注释部分。

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestTemporalTableFunctionDemo {
    // 维表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Rate {
        private String currency;
        private Integer rate;
        private Long rate_time;
    }

    // 事实表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Long total;
        private String currency;
        private Long order_time;
    }

    final static List<Rate> rateList = Arrays.asList(
            new Rate("US Dollar", 102, 1L),
            new Rate("Euro", 114, 1L),
            new Rate("Yen", 1, 1L),
            new Rate("Euro", 116, 5L),
            new Rate("Euro", 119, 7L)

    );

    final static List<Order> orderList = Arrays.asList(
            new Order(2L, "Euro", 2L),
            new Order(1L, "US Dollar", 3L),
            new Order(50L, "Yen", 4L),
            new Order(3L, "Euro", 5L)

    );

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order 实时流 事实表
        DataStream<Order> orderDs = env.fromCollection(orderList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((order, rTimeStamp) -> order.getOrder_time()));

        // rate 实时流 维度表
        DataStream<Rate> rateDs = env.fromCollection(rateList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Rate>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((rate, rTimeStamp) -> rate.getRate_time()));

        // 转变为Table
        Table orderTable = tenv.fromDataStream(orderDs, $("total"), $("currency"), $("order_time").rowtime());
        Table rateTable = tenv.fromDataStream(rateDs, $("currency"), $("rate"), $("rate_time").rowtime());

        tenv.createTemporaryView("alan_orderTable", orderTable);
        tenv.createTemporaryView("alan_rateTable", rateTable);

        // 定义一个TemporalTableFunction
        TemporalTableFunction rateDim = rateTable.createTemporalTableFunction($("rate_time"), $("currency"));
        // 注册表函数
        // tenv.registerFunction("alan_rateDim", rateDim);
        tenv.createTemporarySystemFunction("alan_rateDim", rateDim);

        String sql = "select o.*,r.rate from alan_orderTable as o,Lateral table (alan_rateDim(o.order_time)) r where r.currency = o.currency ";
        
        // 关联查询
        Table result = tenv.sqlQuery(sql);

        // 打印输出
        DataStream resultDs = tenv.toAppendStream(result, Row.class);

        resultDs.print();
        // rate 流数据(维度表)
        // rateList

        // order 流数据
        // orderList

        // 控制台输出
        // 2> +I[2, Euro, 1970-01-01T00:00:00.002, 114]
        // 5> +I[50, Yen, 1970-01-01T00:00:00.004, 1]
        // 16> +I[1, US Dollar, 1970-01-01T00:00:00.003, 102]
        // 2> +I[3, Euro, 1970-01-01T00:00:00.005, 116]

        env.execute();
    }

}

3、With connnector 实现代码

本处使用的是kafka作为数据源来实现。其验证结果在代码中的注释部分。

1)、bean定义

package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
@Data
public  class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}
package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
@Data
public  class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}

2)、序列化定义

package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class CityInfoSchema implements DeserializationSchema<CityInfo> {
 
    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
        return data;
    }
 
    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        return false;
    }
 
    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {
        });
    }
    
}
package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

 /*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class UserInfoSchema implements DeserializationSchema<UserInfo> {
 
    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
        return data;
    }
 
    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        return false;
    }
 
    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {
        });
    }

    
}

3)、实现

package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestJoinDimByKafkaEventTimeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka的ip和要消费的topic,//Kafka设置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
        props.setProperty("group.id", "kafkatest");

        // 读取用户信息Kafka
        FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(),props);
        userConsumer.setStartFromEarliest();

        userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                        .<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((user, rTimeStamp) -> user.getTs()) // 该句如果不加,则是默认为kafka的事件时间
        );
                
        // 读取城市维度信息Kafka
        FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();

        cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                        .<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((city, rTimeStamp) -> city.getTs()) // 该句如果不加,则是默认为kafka的事件时间
        );

        
        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"),$("ts").rowtime());

        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // 定义一个TemporalTableFunction
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));
        // 注册表函数
        // tableEnv.registerFunction("dimCity", dimCity);
        tableEnv.createTemporarySystemFunction("dimCity", dimCity);

        Table u = tableEnv.sqlQuery("select * from userTable");
        // u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user流接收到:");

        Table c = tableEnv.sqlQuery("select * from cityTable");
        // c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city流接收到:");

        // 关联查询
        Table result = tableEnv
                .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
                        "from userTable as u " +
                        ", Lateral table  (dimCity(u.ts)) d " +
                        "where u.cityId=d.cityId");

        // 打印输出
        DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\t关联输出:");
        // 用户信息格式:
        // {"userName":"user1","cityId":1,"ts":0}
        // {"userName":"user1","cityId":1,"ts":1}
        // {"userName":"user1","cityId":1,"ts":4}
        // {"userName":"user1","cityId":1,"ts":5}
        // {"userName":"user1","cityId":1,"ts":7}
        // {"userName":"user1","cityId":1,"ts":9}
        // {"userName":"user1","cityId":1,"ts":11}
        // kafka-console-producer.sh --broker-list server1:9092 --topic user
        // 城市维度格式:
        // {"cityId":1,"cityName":"nanjing","ts":15}
        // {"cityId":1,"cityName":"beijing","ts":1}
        // {"cityId":1,"cityName":"shanghai","ts":5}
        // {"cityId":1,"cityName":"shanghai","ts":7}
        // {"cityId":1,"cityName":"wuhan","ts":10}
        // kafka-console-producer.sh --broker-list server1:9092 --topic city

        // 输出
        // city流接收到::6> +I[1, beijing, 1970-01-01T00:00:00.001]
        // user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.004]
        // city流接收到::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
        // user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.005]
        // city流接收到::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
        // user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.007]
        // city流接收到::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
        // user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.009]
        // user流接收到::6> +I[user1, 1, 1970-01-01T00:00:00.011]
        //         关联输出::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
        //         关联输出::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
        //         关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
        //         关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
        //         关联输出::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]
        
        env.execute("joinDemo");
    }

}

以上,本文通过两个示例介绍了时态表TemporalTableFunction的join操作。


标签:join,示例,flink,new,api,org,apache,import
From: https://blog.51cto.com/alanchan2win/9232410

相关文章