代码demo
/**
 * Demo entry point: creates a {@code FlinkManager}, attaches a Kafka consumer
 * for {@code CUSTOM_OUTPUT_TOPIC} as the source, rebalances the stream into
 * {@code CustomSink}, and executes the job.
 *
 * <p>Any exception thrown by {@code manager.execute()} is logged and not
 * rethrown, so this method returns normally even when execution fails.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    FlinkManager manager = new FlinkManager(FLINK_IBU_BI_UBT_CUSTOM_HOTEL_FAVORITE, IS_LOCAL);
    manager.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    // Recommended: pick a value that evenly divides the Kafka partition count.
    manager.setParallelism(30);
    FlinkKafkaConsumer011 consumer = manager.buildHermesKafkaConsumer011(CUSTOM_OUTPUT_TOPIC, Object.class);
    // Start reading from the latest offsets.
    consumer.setStartFromLatest();
    manager.addSource(consumer).rebalance().addSink(new CustomSink());
    try {
        manager.execute();
    } catch (Exception e) {
        // Pass the exception as the throwable argument (instead of concatenating it
        // into the message) so the logger records the full stack trace and cause chain.
        // NOTE(review): assumes LOG exposes error(String, Throwable), as SLF4J/Log4j do.
        LOG.error("error ,", e);
    }
}
任务执行
mvn打包
// 生产环境打包命令(跳过单元测试)
mvn clean package -Pprod -DskipTests=True
标签:框架,error,实时,CUSTOM,manager,计算,FlinkManager,new,consumer
From: https://www.cnblogs.com/alidata/p/18509835