Flink DataStream Sink (Part 4)

Date: 2022-10-11 15:37:28

Documentation: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/

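Environment setup
    import org.apache.flink.api.scala.ExecutionEnvironment
    // Batch execution environment (not used by the sinks below)
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    // Streaming execution environment
    val streamenv = StreamExecutionEnvironment.getExecutionEnvironment
    // Brings the implicit TypeInformation required by fromCollection into scope
    import org.apache.flink.api.scala.createTypeInformation
    // Sample stream holding a single "name,score" record
    val data = streamenv.fromCollection(Array("1,2"))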
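File sink
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.core.fs.Path
    // Row-format sink: each record is written as one line under /data/flink/.
    // In streaming mode, part files are only finalized on successful checkpoints,
    // so checkpointing should be enabled on streamenv for long-running jobs.
    data.addSink(
      StreamingFileSink
        .forRowFormat(new Path("/data/flink/"), new SimpleStringEncoder[String])
        .build())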
Kafka connector
    import org.apache.flink.streaming.api.scala.createTypeInformation
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    // Only needed when a delivery guarantee is passed to the producer constructor
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
    import java.util.Properties
    // Kafka producer settings; only the broker address is required here
    val properties = new Properties()
    properties.put("bootstrap.servers", "114.116.44.117:9092")
    // Write every record as a UTF-8 string to the "events" topic
    val flink2kafka = new FlinkKafkaProducer[String]("events", new SimpleStringSchema(), properties)
    data.addSink(flink2kafka)
Redis sink
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper
    // Connection settings for a single Redis instance
    val config = new FlinkJedisPoolConfig.Builder().setHost("114.116.44.117").build()
    class MyRedisMapper extends RedisMapper[String] {
      // SET writes key -> value. The second argument (additional key) is only
      // used by the hash and sorted-set commands, so it has no effect here.
      override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.SET, "key")
      }
      // Key and value are both the whole record: "1,2" is stored under key "1,2"
      override def getKeyFromData(string: String): String = string
      override def getValueFromData(string: String): String = string
    }
    data.addSink(new RedisSink[String](config, new MyRedisMapper()))
JDBC, approach 1: custom RichSinkFunction
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import java.sql.{Connection, DriverManager, PreparedStatement}
    import org.apache.flink.configuration.Configuration
    class Myjdbc extends RichSinkFunction[String] {
      var conn: Connection = _
      var insert: PreparedStatement = _
      // Called once per parallel instance: open the connection and prepare the statement
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://114.116.44.117:3306/test", "root", "123456")
        insert = conn.prepareStatement("insert into student(name,score) values(?,?)")
      }
      // Called for every record, e.g. "1,2" -> name = "1", score = "2"
      override def invoke(value: String, context: SinkFunction.Context): Unit = {
        val fields = value.split(",")
        insert.setString(1, fields(0))
        insert.setString(2, fields(1))
        insert.executeUpdate()
      }
      // Release JDBC resources; either may still be null if open() failed
      override def close(): Unit = {
        if (insert != null) insert.close()
        if (conn != null) conn.close()
      }
    }
    data.addSink(new Myjdbc())
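
The invoke method above assumes that every record contains exactly two comma-separated fields; a record such as "1" would raise an ArrayIndexOutOfBoundsException and fail the job. A minimal sketch of a more defensive parser follows. The object RecordParsing and its parse method are hypothetical helpers introduced for illustration, not part of the original code or of Flink.

```scala
// Hypothetical helper (not from the original post): validates "name,score" records.
object RecordParsing {
  // Returns Some((name, score)) for exactly two non-empty fields, otherwise None.
  // The limit -1 keeps trailing empty fields, so "a," is seen as two fields.
  def parse(line: String): Option[(String, String)] =
    line.split(",", -1).map(_.trim) match {
      case Array(name, score) if name.nonEmpty && score.nonEmpty => Some((name, score))
      case _ => None
    }
}
```

Inside invoke, wrapping the two setString calls and executeUpdate in RecordParsing.parse(value).foreach { case (name, score) => ... } would skip malformed records instead of failing on them. Whether to skip, log, or fail is a design choice that depends on the job.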
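JDBC, approach 2: JdbcSink connector
    import java.sql.PreparedStatement
    import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
    data.addSink(
        JdbcSink.sink(
            "insert into student(name,score) values(?,?)",
            // Fills the statement parameters from one "name,score" record,
            // using the same field order as approach 1
            new JdbcStatementBuilder[String] {
                override def accept(statement: PreparedStatement, record: String): Unit = {
                  val fields = record.split(",")
                  statement.setString(1, fields(0))
                  statement.setString(2, fields(1))
                }
            },
            // Buffer up to 1000 rows or 200 ms before writing; retry up to 5 times
            JdbcExecutionOptions.builder().withBatchSize(1000)
              .withBatchIntervalMs(200).withMaxRetries(5).build(),
            // com.mysql.jdbc.Driver is the Connector/J 5.x class name (8.x uses
            // com.mysql.cj.jdbc.Driver); the MySQL driver jar must be on the
            // classpath and is not listed in the POM below
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withUrl("jdbc:mysql://114.116.44.117:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8")
              .withDriverName("com.mysql.jdbc.Driver")
              .withUsername("root").withPassword("123456").build()
        ))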

    streamenv.execute("source test")
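
The JdbcSink example sets withBatchSize(1000) and withBatchIntervalMs(200), which means rows are buffered and written when either threshold is reached. The sketch below is a simplified model of that "size or time" rule and is not Flink's implementation. The class BatchBuffer and its methods are hypothetical, and for simplicity the interval is only checked when a record arrives, whereas a real sink would typically flush from a timer as well.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative model only (hypothetical names, not Flink code):
// flush when the buffer reaches batchSize or when intervalMs has passed.
final class BatchBuffer[T](batchSize: Int, intervalMs: Long, flush: Seq[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]
  private var lastFlushMs = 0L

  // Adds one record; nowMs is supplied by the caller to keep the model deterministic
  def add(record: T, nowMs: Long): Unit = {
    buffer += record
    if (buffer.size >= batchSize || nowMs - lastFlushMs >= intervalMs) flushNow(nowMs)
  }

  // Writes out whatever is buffered and restarts the interval
  def flushNow(nowMs: Long): Unit = {
    if (buffer.nonEmpty) {
      flush(buffer.toList)
      buffer.clear()
    }
    lastFlushMs = nowMs
  }
}
```

With a small batch size and a short interval, rows reach the database sooner at the cost of more round trips; larger values trade latency for throughput.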
POM
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
        <java.version>8</java.version>
        <flink.version>1.13.0</flink.version>
    </properties>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

From: https://www.cnblogs.com/wuxiaolong4/p/16779357.html
