1. Start the services (look them up online; a sketch of the Windows commands follows this list)
1) Start ZooKeeper
2) Start Kafka
3) Start Flink
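A minimal sketch of those startup commands on Windows, assuming Kafka is unpacked under F:\tool-data\kafka_2.12 (its scripts live in bin\windows) and Flink 1.7 is installed separately; adjust the paths to your own layout:
zookeeper-server-start.bat F:\tool-data\kafka_2.12\config\zookeeper.properties
kafka-server-start.bat F:\tool-data\kafka_2.12\config\server.properties
and, from the Flink bin directory:
start-cluster.bat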
2. Write the producer
public void kafkaProducer(List<ResultBean> opcValue) throws Exception {
    // 1. Set up the Flink environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //env.setParallelism(3);

    // 2. Collect the real-time values
    ArrayList<String> opcValueList = new ArrayList<>();
    for (ResultBean resultBean : opcValue) {
        if (resultBean.isSuccess()) {
            opcValueList.add(resultBean.getResult());
            System.out.println("==============real-time value==========" + resultBean.getResult());
        }
    }
    DataStreamSource<String> stream = env.fromCollection(opcValueList);

    // 3. Create the producer, connect to Kafka and send the data
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    FlinkKafkaProducer<String> kafkaProducer =
            new FlinkKafkaProducer<>("opc6", new SimpleStringSchema(), properties);
    stream.addSink(kafkaProducer);
    System.out.println("----------producer-----------");
    env.execute();
}
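The ResultBean used by kafkaProducer is not part of the snippet above; a minimal hypothetical version exposing just the two accessors the producer needs could look like this:
public class ResultBean {
    private boolean success;   // whether the OPC read succeeded
    private String result;     // the value that gets forwarded to Kafka

    public boolean isSuccess() { return success; }
    public String getResult() { return result; }
    public void setSuccess(boolean success) { this.success = success; }
    public void setResult(String result) { this.result = result; }
}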
3. Write the consumer
public static void kafkaConsumer() throws Exception {
    // 1. Set up the Flink environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Parallelism 1 means a single partition (no parallel sub-tasks)
    //env.setParallelism(1);

    // 2. Create the consumer and read the data from Kafka
    Properties props = new Properties();
    // Kafka address and consumer group name
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "category");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            "opc6",                       // topic
            new SimpleStringSchema(),
            props
    );
    // Start from the latest offset and commit offsets on checkpoints
    kafkaConsumer.setStartFromLatest();
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
    DataStreamSource<String> source = env.addSource(kafkaConsumer);

    SingleOutputStreamOperator<String> resDS = source.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            System.out.println("parsing kafka record");
            logger.info("=================== kafka value {}", value);
            // Parse the JSON payload here if needed (see the sketch after this method)
            //JSONObject obj = JSON.parseObject(value);
            //JSONObject jsonOb = JSON.parseObject(obj.getString("payload"));
            //JSONObject jsonObj = JSON.parseObject(jsonOb.getString("after"));
            //logger.info("=================== sink invoke value {}", jsonObj.toString());
            //return jsonObj.toString();
            return value;
        }
    });

    // Print the consumed records
    resDS.print();
    env.execute("=============test==============");
}
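The commented-out lines in map() hint at a fastjson (com.alibaba.fastjson) parse of a payload/after envelope. A sketch of that body, assuming the records really are shaped {"payload":{"after":{...}}} (this shape is an assumption, not something the snippet confirms):
SingleOutputStreamOperator<String> resDS = source.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        // Parse the outer record, then drill down into payload.after
        JSONObject obj = JSON.parseObject(value);
        JSONObject payload = obj.getJSONObject("payload");   // assumed envelope field
        JSONObject after = payload.getJSONObject("after");   // assumed nested field
        logger.info("=================== parsed value {}", after.toJSONString());
        return after.toJSONString();
    }
});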
4. Start the consumer and the producer separately.
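A minimal sketch of how the two methods might be wired up, assuming they live in a class named OpcKafkaJob (the class name and the place that supplies the ResultBean list are assumptions):
public class OpcKafkaJob {
    public static void main(String[] args) throws Exception {
        // The consumer job blocks in env.execute() and keeps reading topic opc6
        kafkaConsumer();
    }
    // kafkaProducer(List<ResultBean>) is invoked separately, wherever the OPC
    // values are collected, passing the freshly read ResultBean list.
}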
Other configuration notes:
Flink source download:
https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-src.tgz
Kafka: in server.properties under the config directory, change the log path to your own:
log.dirs=F:\\tool-data\\kafka_2.12\\kafka-logs
To start Kafka on Windows, run the .bat script under bin\windows:
kafka-server-start.bat F:\tool-data\kafka_2.12\config\server.properties
Note: if you hit
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
it usually means the Flink core/streaming dependencies (in the version matching your Kafka connector) are missing from the runtime classpath; see
https://blog.csdn.net/m0_38052384/article/details/104041658
Create a topic from the command line
https://blog.csdn.net/weixin_41076658/article/details/126330406
kafka-topics.bat --create --topic kafka_topic1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# Kafka can be configured to auto-create topics, but that is not how production clusters are run, so it is not covered here
# kafka-topics.bat         the script for operating on topics
# --create                 this run creates a topic; kafka_topic1 is the topic name being defined
# --bootstrap-server       the Kafka broker address(es), comma-separated; here a single local broker, localhost:9092
# --partitions 1           create one partition for the topic kafka_topic1
# --replication-factor 1   keep a single replica per partition (i.e. no extra backup copies)
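For the producer/consumer code in this post the topic is opc6, so the matching create command would be, for example:
kafka-topics.bat --create --topic opc6 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1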
List all topics
kafka-topics.bat --list --bootstrap-server localhost:9092
Consume a topic from the console
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myDemo --from-beginning
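To check what the Flink producer wrote, the same console consumer can be pointed at the opc6 topic, e.g.:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic opc6 --from-beginning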
Start Flink
https://www.cnblogs.com/M-MAKI/p/16145552.html
Web UI
http://localhost:8081/
Check the ports
Kafka port
netstat -aon|findstr "9092"
ZooKeeper port
netstat -aon|findstr "2181"
Flink port
netstat -aon|findstr "8081"