首页 > 数据库 >springboot+Flink(读取kafka并存入Mysql)20220921

springboot+Flink(读取kafka并存入Mysql)20220921

时间:2022-09-21 19:44:23浏览次数:81  
标签:20220921 Flink springboot flink kafka import apache org public

1、MySQL 数据库 test:需要先建好 student 表(字段 id、name、password、age,与下文 SinkToMySQL 中的 insert 语句对应)

 

2、kafka创建主题student

 

 

3、pom.xml

<!-- Build-wide settings: Java language level, source/report encodings, and a Spring Boot version property. -->
<properties>
  <!-- Java 8 language level. -->
  <java.version>1.8</java.version>
  <!-- Read sources and write reports as UTF-8 regardless of the platform default. -->
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <!-- NOTE(review): no dependency shown in this fragment references ${spring-boot.version};
       presumably it is consumed by a parent/BOM section that is not part of this snippet. -->
  <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>

<!-- Flink 1.10.0 (Scala 2.11 builds) plus the JSON, connection-pool and MySQL driver libraries used by the example. -->
<dependencies>
  <!-- Flink core, client and streaming APIs: all Flink modules must share one version. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.10.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
  </dependency>
  <!-- In Flink 1.10 the JDBC connector is published as flink-jdbc; the flink-connector-jdbc
       artifact name only exists from Flink 1.11 onwards, so it cannot be resolved at 1.10.0. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.10.0</version>
  </dependency>
  <!-- Universal Kafka connector providing FlinkKafkaConsumer. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.10.0</version>
  </dependency>
  <!-- Was 1.9.2: aligned with the other Flink modules to avoid mixing runtime versions on the classpath. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.11</artifactId>
    <version>1.10.0</version>
  </dependency>


  <!-- 1.2.83 is the fastjson release that fixes the autoType deserialization
       vulnerability (CVE-2022-25845) present in 1.2.80 and earlier. -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
  </dependency>
  <!-- Gson backs the GsonUtil helper. -->
  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.9.0</version>
  </dependency>
  <!-- Commons DBCP2 provides the BasicDataSource used by SinkToMySQL. -->
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-dbcp2</artifactId>
    <version>2.1.1</version>
  </dependency>
  <!-- Connector/J 5.1.x: matches the com.mysql.jdbc.Driver class name used by SinkToMySQL. -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
  </dependency>
  <dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.2</version>
  </dependency>
</dependencies>

 

4、实体Student

/**
 * Mutable bean describing one student row ({@code id}, {@code name}, {@code password}, {@code age}).
 *
 * <p>The class follows the JavaBean shape (public no-arg constructor plus a getter/setter pair per
 * field) so that frameworks which instantiate it reflectively, such as Flink's POJO type
 * extraction, can handle it without falling back to generic serialization.
 */
public class Student {
  private Integer id;
  private String name;
  private String password;
  private Integer age;

  /** Creates an empty student; all fields are {@code null} until set. */
  public Student() {
  }

  /**
   * Creates a fully populated student.
   *
   * @param id       student identifier
   * @param name     student name
   * @param password student password
   * @param age      student age
   */
  public Student(Integer id, String name, String password, Integer age) {
    this.id = id;
    this.name = name;
    this.password = password;
    this.age = age;
  }

  /** @return the student identifier, possibly {@code null} */
  public Integer getId() {
    return id;
  }

  /** @param id the student identifier */
  public void setId(Integer id) {
    this.id = id;
  }

  /** @return the student name, possibly {@code null} */
  public String getName() {
    return name;
  }

  /** @param name the student name */
  public void setName(String name) {
    this.name = name;
  }

  /** @return the student password, possibly {@code null} */
  public String getPassword() {
    return password;
  }

  /** @param password the student password */
  public void setPassword(String password) {
    this.password = password;
  }

  /** @return the student age, possibly {@code null} */
  public Integer getAge() {
    return age;
  }

  /** @param age the student age */
  public void setAge(Integer age) {
    this.age = age;
  }

  /**
   * Returns a readable description for logs and console output.
   *
   * <p>The password is deliberately left out so that printing a student never leaks it.
   */
  @Override
  public String toString() {
    return "Student{id=" + id + ", name='" + name + "', age=" + age + "}";
  }
}

 

5、GsonUtil工具类
  import com.google.gson.Gson;
  import com.google.gson.GsonBuilder;
  import java.lang.reflect.Type;
  import java.nio.charset.Charset;


  /**
   * Static convenience wrappers around two shared {@link Gson} instances: one created with
   * {@code new Gson()} and one built with HTML escaping disabled.
   */
  public class GsonUtil {
    /** Instance created with {@code new Gson()}; used by every method except the unescaped one. */
    private static final Gson DEFAULT_GSON = new Gson();

    /** Instance built via {@link GsonBuilder#disableHtmlEscaping()}. */
    private static final Gson UNESCAPED_GSON =
        new GsonBuilder().disableHtmlEscaping().create();

    /**
     * Parses a JSON string into an instance of the given class.
     *
     * @param value JSON text
     * @param type  target class
     * @param <T>   target type
     * @return whatever {@link Gson#fromJson(String, Class)} yields for the input
     */
    public static <T> T fromJson(String value, Class<T> type) {
      T parsed = DEFAULT_GSON.fromJson(value, type);
      return parsed;
    }

    /**
     * Parses a JSON string into an instance of the given (possibly generic) type.
     *
     * @param value JSON text
     * @param type  target type
     * @param <T>   target type
     * @return whatever {@link Gson#fromJson(String, Type)} yields for the input
     */
    public static <T> T fromJson(String value, Type type) {
      T parsed = DEFAULT_GSON.fromJson(value, type);
      return parsed;
    }

    /**
     * Serializes an object to JSON with the default instance.
     *
     * @param value object to serialize
     * @return JSON text
     */
    public static String toJson(Object value) {
      String json = DEFAULT_GSON.toJson(value);
      return json;
    }

    /**
     * Serializes an object to JSON with the instance that has HTML escaping disabled.
     *
     * @param value object to serialize
     * @return JSON text
     */
    public static String toJsonDisableHtmlEscaping(Object value) {
      String json = UNESCAPED_GSON.toJson(value);
      return json;
    }

  }

 

6、kafka生产者producer

     注:kafka先批量生成student的json数据

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * Test-data generator: publishes {@link Student} objects as JSON strings to a Kafka topic.
 */
public class KafkaUtil {
  /** Kafka bootstrap server address. */
  public static final String broker_list = "localhost:9094";
  /** Target topic; must be the same topic the Flink job consumes. */
  public static final String topic = "student";

  /**
   * Sends students 51..100 to {@link #topic}, one JSON message every 10 seconds.
   *
   * <p>The producer is opened in a try-with-resources block so it is closed (and its buffered
   * records released) even when the loop is interrupted or a send fails.
   *
   * @throws InterruptedException if the thread is interrupted while pausing between messages
   */
  public static void writeToKafka() throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", broker_list);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 51; i <= 100; i++) {
        Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
        // Serialize once and reuse the text for both the record and the console line.
        String json = GsonUtil.toJson(student);
        // No partition and no key: the producer's partitioner picks the partition.
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);
        producer.send(record);
        System.out.println("发送数据: " + json);
        Thread.sleep(10 * 1000); // one message per 10 s, i.e. 6 messages per minute
      }
      producer.flush();
    }
  }

  /**
   * Entry point: runs {@link #writeToKafka()}.
   *
   * @param args unused
   * @throws InterruptedException if the sending loop is interrupted
   */
  public static void main(String[] args) throws InterruptedException {
    writeToKafka();
  }
}

 

 

 

 7、SinkToMySQL 工具类  

        注:student数据写入Mysql的student表

 

  import org.apache.commons.dbcp2.BasicDataSource;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.util.List;

 

  public class SinkToMySQL extends RichSinkFunction<List<Student>> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

 

    /**
    * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
    *
    * @param parameters
    * @throws Exception
    */
    @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    dataSource = new BasicDataSource();
    connection = getConnection(dataSource);
    String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
    ps = this.connection.prepareStatement(sql);
  }

 

  @Override
  public void close() throws Exception {
    super.close();
    //关闭连接和释放资源
    if (connection != null) {
      connection.close();
    }
    if (ps != null) {
      ps.close();
    }
  }

 

  /**
    * 每条数据的插入都要调用一次 invoke() 方法
  *
  * @param value
  * @param context
  * @throws Exception
  */
  @Override
  public void invoke(List<Student> value, Context context) throws Exception {
    //遍历数据集合
    for (Student student : value) {
      ps.setInt(1, student.getId());
      ps.setString(2, student.getName());
      ps.setString(3, student.getPassword());
      ps.setInt(4, student.getAge());
      ps.addBatch();
    }
    int[] count = ps.executeBatch();//批量后执行
    System.out.println("成功了插入了" + count.length + "行数据");
  }

 


  private static Connection getConnection(BasicDataSource dataSource) {
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    //注意,替换成自己本地的 mysql 数据库地址和用户名、密码
    dataSource.setUrl("jdbc:mysql://localhost:3306/test");
    dataSource.setUsername("root");
    dataSource.setPassword("root");
    //设置连接池的一些参数
    dataSource.setInitialSize(10);
    dataSource.setMaxTotal(50);
    dataSource.setMinIdle(2);

 

    Connection con = null;
    try {
      con = dataSource.getConnection();
      System.out.println("创建连接池:" + con);
    } catch (Exception e) {
      System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
    }
      return con;
  }
}

 

      

 

8、Main kafka消费者

     注:读取kafka并存入Mysql

 

  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  import org.apache.flink.util.Collector;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Properties;


  /**
   * Flink job: reads student JSON from the Kafka topic {@code student}, groups the records into
   * one-minute windows and writes every non-empty window to MySQL through {@link SinkToMySQL}.
   */
  public class Main {
    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception{
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // Only settings the universal FlinkKafkaConsumer actually uses are kept here: it talks to
      // the brokers directly (no ZooKeeper address) and installs its own byte-array
      // deserializers, so key/value deserializer entries would be ignored.
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9094"); // kafka
      props.put("group.id", "test");
      // NOTE(review): the start position is chosen by setStartFromEarliest() below; this entry
      // presumably only acts as Kafka's fallback for out-of-range offsets. Confirm it is wanted.
      props.put("auto.offset.reset", "latest");

      // Read the Kafka topic "student" as plain strings, starting from the earliest offset.
      FlinkKafkaConsumer<String> consumer =
          new FlinkKafkaConsumer<String>("student", new SimpleStringSchema(), props);
      consumer.setStartFromEarliest();

      SingleOutputStreamOperator<Student> studentoso = env
          .addSource(consumer)
          .setParallelism(1)
          .map(string -> GsonUtil.fromJson(string, Student.class));
      studentoso.print();

      studentoso.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
          // Copy with the standard library rather than Flink's shaded Guava, which is an
          // internal dependency whose package name is tied to a specific Guava version.
          List<Student> students = new ArrayList<>();
          for (Student student : values) {
            students.add(student);
          }
          if (!students.isEmpty()) {
            System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());
            out.collect(students);
          }
        }
      }).addSink(new SinkToMySQL()); // each emitted list is written to MySQL

      env.execute("flink learning connectors kafka");
    }
  }

 

 

 

 

 

 9、其他问题

  运行 Main(kafka消费者)时一直提示:The group coordinator is not available

解决办法:把 zk 里的 /brokers/topics 清除一下,然后重启 kafka

 

标签:20220921,Flink,springboot,flink,kafka,import,apache,org,public
From: https://www.cnblogs.com/smallfa/p/16716917.html

相关文章

  • flinksql实时数仓开发
    pom文件<groupId>com.ssi</groupId><artifactId>datalake</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><name>DataLake</nam......
  • StarRocks flink 同步工具 smt 使用
    注:本文主要内容来自于官网MySQL实时同步至StarRocks功能简介StarRocks提供FlinkCDCconnector、flink-connector-starrocks和StarRocks-migrate-tools(简称smt),实......
  • 计算机毕业设计 SpringBoot+Vue校园疫情防控系统 校园疫情管理系统 社区疫情防控系统J
    ......
  • Flink系列--Flink catalog
    Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临......
  • HO引擎近况20220921
    这俩月一直在忙公司的项目,好多事都要我来处理我打算在公司再买一台显示器组三显,但是公司的显卡太破了接口不够,我只好又买了一张显卡自从之前XGP被回收之后我也没有再......
  • springcloud或者springboot项目生产如何发版
    我们要先从注册中心将服务下线为了通用性,不管任何注册中心都能使用统一的逻辑,我们在项目提供下面的Controller:@RestControllerpublicclassServerDeRegisterControl......
  • Maven-20220921第七组薛雯匀
    Maven:项目构建工具,主流整个项目架构,source,resource,test,testresource依赖:导入的jar包。对项目进行打包。apache基金会作为一个java程序员,有必要连接一下apache的官网命......
  • springboot逆向工程
    通过逆向工程少写很多代码generatorConfig.xml<?xmlversion="1.0"encoding="UTF-8"?><!DOCTYPEgeneratorConfigurationPUBLIC"-//mybatis.org//DTDMyBati......
  • Flink-状态一致性(如何保证exactly-once、flink+kafka端到端保证exactly-once)
    当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结......
  • Flink-checkpoint配置及重启策略
    Flink-checkpoint配置及重启策略valenv=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//---------------checkpoint配置-......