首页 > 数据库 >Flink自定义Oracle的Source的Demo

Flink自定义Oracle的Source的Demo

时间:2022-10-10 16:59:08 浏览次数:62
标签:ps 自定义 private Source connection Oracle null public

1.实体类

/**
 * Plain data holder for one order row; it is the element type emitted by
 * {@code OracleSource} in this demo.
 *
 * <p>The annotations below look like Lombok's (the imports are not shown in
 * this snippet), in which case accessors, a builder and both constructors are
 * generated at compile time.
 */
@Data
@Builder   // generates a builder for creating instances
@NoArgsConstructor   // no-argument constructor
@AllArgsConstructor //  all-arguments constructor
public class OrderSink {

    private int id;
    private int itemid;
    private int orderid;
    private float price;
    private String area;
    private String name;
    private String time;
}

2.自定义 Oracle 的 Source 流

 

/**
 * Flink source that runs a single {@code select * from ORDER_SINK} query against
 * Oracle over JDBC and emits one {@link OrderSink} per returned row.
 *
 * <p>Lifecycle: {@link #open} creates the connection and prepared statement,
 * {@link #run} executes the query and emits the rows, {@link #cancel} only asks
 * the emit loop to stop, and {@link #close} releases the JDBC resources.
 */
public class OracleSource extends RichSourceFunction<OrderSink> {
    // JDBC handles are created in open(); they are not serializable, so they are
    // marked transient to keep them out of the serialized function instance.
    private transient Connection connection = null;
    private transient PreparedStatement ps = null;
    // Flipped by cancel(), which Flink invokes from a different thread than run();
    // volatile makes the change visible to the emit loop.
    private volatile boolean running = true;

    /**
     * Loads the Oracle JDBC driver, opens the connection and prepares the query.
     *
     * @param parameters configuration passed through to the parent {@code open}
     * @throws IllegalStateException if the driver cannot be loaded or the
     *         connection/statement cannot be created; failing here is preferable
     *         to continuing with a null statement and crashing later in {@link #run}
     */
    @Override
    public void open(Configuration parameters){

        System.out.println("begin open method");
        try{
            super.open(parameters);
            Class.forName("oracle.jdbc.driver.OracleDriver");
            // Placeholder URL and credentials: replace before running.
            connection = DriverManager.getConnection("jdbc:oracle:thin:@ip:port:schemal", "username", "password");
            ps = connection.prepareStatement("select * from ORDER_SINK");
        }catch (Exception e){
            // Do not swallow: a source that cannot connect must fail the job.
            throw new IllegalStateException("Failed to open Oracle source", e);
        }
        System.out.println("finish open method");
    }

    /**
     * Executes the prepared query and emits one record per row until the result
     * set is exhausted or {@link #cancel} has been called.
     *
     * @param sourceContext context used to emit records downstream
     * @throws IllegalStateException if the query or row access fails
     */
    @Override
    public void run(SourceContext<OrderSink> sourceContext){
        System.out.println("enter run method");
        // try-with-resources guarantees the ResultSet is closed on every path.
        try (ResultSet res = ps.executeQuery()) {
            while (running && res.next()) {
                // Emit the object produced by the builder (the previous code emitted
                // an unrelated empty instance and discarded the built one).
                // NOTE(review): only "name" is mapped, as in the original demo; map the
                // remaining OrderSink fields once the ORDER_SINK column names are confirmed.
                OrderSink user = OrderSink.builder()
                        .name(res.getNString("name"))
                        .build();
                sourceContext.collect(user);
            }
        } catch (SQLException e) {
            // Surface the failure instead of ending the source as if it had succeeded.
            throw new IllegalStateException("Failed to read rows from ORDER_SINK", e);
        }
        System.out.println("finish run method");
    }

    /**
     * Requests the emit loop in {@link #run} to stop. Resources are intentionally
     * not closed here, because {@code run} may still be using them on another
     * thread; they are released in {@link #close}.
     */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Releases the JDBC resources: statement first, then connection. Each step
     * runs even if the previous one throws.
     *
     * @throws Exception if closing a resource or the parent {@code close} fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
                ps = null;
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                    connection = null;
                }
            } finally {
                super.close();
            }
        }
    }
}

 

3.主程序调用

/**
 * Demo entry point: wires {@code OracleSource} into a Flink streaming job and
 * prints every emitted record.
 */
public class CustomOracleSource {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 2: register the custom Oracle source and attach a print sink to it.
        env.addSource(new OracleSource()).print();

        // Step 3: submit the job; nothing runs until execute() is called.
        env.execute();
    }
}

 

标签:ps,自定义,private,Source,connection,Oracle,null,public
From: https://www.cnblogs.com/glblog/p/16776262.html

相关文章