首页 > 系统相关 >Java 使用flink读写kafka中的数据(windows下)

Java 使用flink读写kafka中的数据(windows下)

时间:2022-09-03 15:11:05浏览次数:81  
标签:Java windows flink kafka -- env new server

一、启动服务(网上查)

1、启动zookeeper

2、启动kafka

3、启动flink

二、写producer

public void kafkaProducer(List<ResultBean> opcValue) throws Exception {

        // 1、准备flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(3);

        // 2、封装实时数据
        ArrayList<String> opcValueList = new ArrayList<>();
        for(ResultBean resultBean : opcValue){
            if(resultBean.isSuccess()){
                opcValueList.add(resultBean.getResult());
                System.out.println("==============实时数据的值=========="+resultBean.getResult());
            }
        }
        DataStreamSource<String> stream = env.fromCollection(opcValueList);

        // 3、创建生产者,连接kafka并发送数据
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("opc6", new SimpleStringSchema(), properties);
        stream.addSink(kafkaProducer);
        System.out.println("----------producer-----------");
        env.execute();
    }

三、写consumer

public static void kafkaConsumer() throws Exception {

        // 1、准备flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //并行度为1,表示不分区
        //env.setParallelism(1);

        // 2、创建消费者,读取kafka中的数据
        Properties props = new Properties();
        //kafka的地址,消费组名
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"category");

        //Flink设置kafka的offset,从最新的开始
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "opc6", //主题
                new org.apache.flink.api.common.serialization.SimpleStringSchema(),
                props
        );
        kafkaConsumer.setStartFromLatest();
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStreamSource<String> source = env.addSource(kafkaConsumer);
        //source.m
        SingleOutputStreamOperator<String> resDS = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                        System.out.println("解析kafka数据");
                logger.info("=================== kafka value {}",value);
                StringBuilder fieldsBuilder = new StringBuilder();
                // 解析JSON数据
                //JSONObject obj = JSON.parseObject(value);
                //JSONObject jsonOb = JSONArray.parseObject(obj.getString("payload"));
                //JSONObject jsonObj = JSONArray.parseObject(jsonOb.getString("after"));

                //logger.info("=================== sink invoke value {}",jsonObj.toString());
                //return jsonObj.toString();
                return null;
            }
        });
        // 打印读取出的数据
        source.print();
        env.execute("=============test==============");

    }

四、分别启动consumer和producer即可

其他配置信息:

https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-src.tgz


kafka修改config下的server.properties中的路径改为自己的
log.dirs=F:\\tool-data\\kafka_2.12\\kafka-logs

启动kafka 进入windows需要进入bin\winndows下的.bat
kafka-server-start.bat F:\tool-data\kafka_2.12\config\server.properties

 

/**
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
https://blog.csdn.net/m0_38052384/article/details/104041658
*/


命令创建topic
https://blog.csdn.net/weixin_41076658/article/details/126330406

kafka-topics.bat --create --topic kafka_topic1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

# 可以通过配置使kafka自动创建topic,但生产环境不会这样使用,这里就不作讲解
# kafka-topics.bat 操作topic的脚本
# --create 代表本次操作动作是创建 kafka_topic1 定义的topic名称
# --bootstrap-server 代表kafka的broker地址,多个地址用逗号隔开,这里是单机且本机,使用localhost:9092
# --partitions 1 代表为名称是kafka_topic1的topic创建一个分区
# --replication-factor 1 代表为已创建的分区创建一个备份分区

查看所有主题
kafka-topics.bat --list --bootstrap-server localhost:9092

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myDemo --from-beginning

flink 启动
https://www.cnblogs.com/M-MAKI/p/16145552.html
ui
http://localhost:8081/


查看端口
kafka端口
netstat -aon|findstr "9092"

zk端口
netstat -aon|findstr "2181"

flink端口
netstat -aon|findstr "8081"

标签:Java,windows,flink,kafka,--,env,new,server
From: https://www.cnblogs.com/caesar-the-great/p/16652636.html

相关文章

  • [javascript] 自调用函数
    自调用函数(IIFE)作用1.隐藏实现2.利用局部作用域,避免污染全局命名空间3.用它编写js模块(function(){vara=3;console.log(a+3);})();......
  • java环境变量配置
    安装好java和jdk之后,对环境进行配置。  ================  上面的administrator是用户变量,下面为系统变量,上面或者下面都可以配置,这里选择配置系统变量。  先......
  • 并发的核心:CAS 是什么?Java8是如何优化 CAS 的?
    大家可能都听说说Java中的并发包,如果想要读懂Java中的并发包,其核心就是要先读懂CAS机制,因为CAS可以说是并发包的底层实现原理。今天就带大家读懂CAS是......
  • 比较:java多线程 sleep()和wait()
    相信看这篇文章的朋友都已经知道进程和线程的区别,也都知道了为什么要使用多线程了。这两个方法主要来源是,sleep用于线程控制,而wait用于线程间的通信,与wait配套的......
  • 并发的核心:CAS 是什么?Java8是如何优化 CAS 的?_2
    大家可能都听说说Java中的并发包,如果想要读懂Java中的并发包,其核心就是要先读懂CAS机制,因为CAS可以说是并发包的底层实现原理。今天就带大家读懂CAS是......
  • 干货 - 写好Java代码的30个技巧,看完终生受用
    成为一个优秀的Java程序员,有着良好的代码编写习惯是必不可少的。下面就让我们来看看代码编写的30条建议吧。(1)类名首字母应该大写。字段、方法以及对象(句柄)的首......
  • 记录 javascript canvas ImageData 解析
    数组的内容:data.length:w*h*4r:0-255g:0-255b:0-255a:0-255Math.round(255*a) 数组遍历:constdata=context.getI......
  • JavaScript 原始值的比较和对象的比较
    原始值有null、undefined、布尔值、数字、字符串。所有的原始值,只要编码值相同,则被认为相等:letpri1=123;letpri2=123;pri1===pri2;//=>true相反,对象的比......
  • java实现单链表源码
    packageMyLink.MySingleLink;importjava.util.Objects;/***单链表结点类**/publicclassNode{/***数据域**/privateObjectdate;/**......
  • java实现双向链表
    package数据结构.链表.双链表;publicclassDoubleNode{privateDoubleNodeprecursor;privateintdate;privateDoubleNodenext;publicDoubleNode(i......