1.实体类
@Data @Builder //创建对象 @NoArgsConstructor // 无参构造函数 @AllArgsConstructor // 有参构造函数 public class OrderSink { private int id; private int itemid; private int orderid; private float price; private String area; private String name; private String time; }
2.自定义的Oracle的Source流
public class OracleSource extends RichSourceFunction<OrderSink> { private Connection connection = null; private PreparedStatement ps = null; @Override public void open(Configuration parameters){ System.out.println("begin open method"); try{ super.open(parameters); Class.forName("oracle.jdbc.driver.OracleDriver"); connection = DriverManager.getConnection("jdbc:oracle:thin:@ip:port:schemal", "username", "password");//获取连接 ps = connection.prepareStatement("select * from ORDER_SINK"); }catch (Exception e){ e.printStackTrace(); } System.out.println("finish open method"); } @Override public void run(SourceContext<OrderSink> sourceContext){ System.out.println("enter run method"); ResultSet res = null; try { res = ps.executeQuery(); while(res.next()){ OrderSink user = new OrderSink(); OrderSink.builder()// .name(res.getNString("name")) .build(); sourceContext.collect(user); } } catch (SQLException e) { e.printStackTrace(); } System.out.println("finish run method"); } @Override public void cancel() { try { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } catch (Exception e) { e.printStackTrace(); } } }
3.主程序调用
public class CustomOracleSource { public static void main(String[] args) throws Exception { // 1.流执行环境 StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.添加Oracle自定义的Source流 DataStreamSource<OrderSink> source = fsEnv.addSource(new OracleSource()); source.print(); // 3.执行打印 fsEnv.execute(); } }
标签:ps,自定义,private,Source,connection,Oracle,null,public From: https://www.cnblogs.com/glblog/p/16776262.html