Flink DataStream Sink(四)
文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/
环境信息
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Execution environment for batch processing.
val env = ExecutionEnvironment.getExecutionEnvironment

// Execution environment for stream processing.
val streamenv = StreamExecutionEnvironment.getExecutionEnvironment

// Sample stream holding a single comma-separated record; the sinks below consume it.
val data = streamenv.fromCollection(Array("1,2"))
文件
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.core.fs.Path
// Row-format StreamingFileSink targeting /data/flink/, encoding elements with SimpleStringEncoder.
val outputDir = new Path("/data/flink/")
val lineEncoder = new SimpleStringEncoder[String]()
val fileSink = StreamingFileSink.forRowFormat(outputDir, lineEncoder).build()
data.addSink(fileSink)
Kafka 连接器
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import java.util.Properties
// Kafka producer settings; only the broker address is configured here.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "114.116.44.117:9092")

// Producer sink that writes each String element to the "events" topic using SimpleStringSchema.
val flink2kafka = new FlinkKafkaProducer[String](
  "events",
  new SimpleStringSchema(),
  properties
)
data.addSink(flink2kafka)
Redis 操作
import org.apache.flink.streaming.connectors.redis.RedisSink//
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper
// Jedis pool configuration for the Redis sink; only the host is set explicitly.
val jedisPoolBuilder = new FlinkJedisPoolConfig.Builder()
val config = jedisPoolBuilder.setHost("114.116.44.117").build()
/**
 * Redis mapper for String elements: every element is sent with the SET command,
 * and the element itself serves as both the key and the value.
 */
class MyRedisMapper extends RedisMapper[String] {

  /** Command description used for every element: SET, constructed with the extra argument "key". */
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET, "key")

  /** Returns the element unchanged as the Redis key. */
  override def getKeyFromData(string: String): String = string

  /** Returns the element unchanged as the Redis value. */
  override def getValueFromData(string: String): String = string
}
// Attach a Redis sink built from the pool config and the mapper defined above.
val redisSink = new RedisSink[String](config, new MyRedisMapper())
data.addSink(redisSink)
JDBC 方式一
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
/**
 * Sink that writes each comma-separated "name,score" record into the `student`
 * table through a plain JDBC connection.
 *
 * The connection and prepared statement are created in `open` and released in `close`.
 */
class Myjdbc extends RichSinkFunction[String] {
  // Both fields are assigned in open(); either may still be null if open() failed part-way,
  // which close() has to tolerate.
  var conn: Connection = _
  var insert: PreparedStatement = _

  /**
   * Opens the JDBC connection and prepares the insert statement reused for every record.
   *
   * @param parameters configuration passed in by the runtime (not read here)
   */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://114.116.44.117:3306/test", "root", "123456")
    insert = conn.prepareStatement("insert into student(name,score) values(?,?)")
  }

  /**
   * Inserts one record: field 0 is bound to `name`, field 1 to `score`.
   *
   * @param value   comma-separated record; must contain at least two fields
   * @param context sink context (unused)
   */
  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    // Split once instead of once per bound parameter.
    val fields = value.split(",")
    insert.setString(1, fields(0))
    insert.setString(2, fields(1))
    insert.execute()
  }

  /**
   * Releases the statement and the connection.
   *
   * Null-safe so that a failed open() does not cause a NullPointerException here, and
   * the connection is closed even when closing the statement throws.
   */
  override def close(): Unit = {
    try Option(insert).foreach(_.close())
    finally Option(conn).foreach(_.close())
  }
}
// Attach the hand-written JDBC sink to the sample stream.
val plainJdbcSink = new Myjdbc()
data.addSink(plainJdbcSink)
JDBC 方式二
import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
// Second JDBC approach: Flink's JdbcSink, configured with batching and retries.
data.addSink(
  JdbcSink.sink(
    "insert into student(name,score) values(?,?)",
    new JdbcStatementBuilder[String] {
      // Bind the record's fields in SQL column order: field 0 -> name, field 1 -> score.
      // This matches the field order used by Myjdbc.invoke; the previous code had the
      // two indices swapped, so name and score were written into each other's column.
      override def accept(t: PreparedStatement, record: String): Unit = {
        val fields = record.split(",")
        t.setString(1, fields(0))
        t.setString(2, fields(1))
      }
    },
    JdbcExecutionOptions.builder()
      .withBatchSize(1000)
      .withBatchIntervalMs(200)
      .withMaxRetries(5)
      .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:mysql://114.116.44.117:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8")
      .withDriverName("com.mysql.jdbc.Driver")
      .withUsername("root")
      .withPassword("123456")
      .build()
  )
)
// Trigger execution of the streaming job, passing "source test" as the job name.
streamenv.execute("source test")
POM
<!-- Shared build properties; flink.version is referenced by the Flink dependencies. -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <scala.version>2.11.8</scala.version>
  <java.version>8</java.version>
  <flink.version>1.13.0</flink.version>
</properties>
<!-- CEP library (Scala 2.11 build). -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- Table API planners and the Scala bridge. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- Parquet/Avro support. -->
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.10.0</version>
</dependency>
<!-- Keep flink-parquet on the same release as the other Flink artifacts; it was pinned
     to 1.9.0 while the rest of the build uses ${flink.version}. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- Core Flink Scala APIs and the client. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- Redis client, with its slf4j-api dependency excluded. -->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>3.2.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- Bahir Redis connector, with its flink-streaming-java_2.11 dependency excluded. -->
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- JDBC and Kafka connectors. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- Keep flink-connector-base on the same release as the other Flink artifacts; it was
     pinned to 1.13.6 while the rest of the build uses ${flink.version}. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>${flink.version}</version>
</dependency>
标签:DataStream,flink,Flink,streaming,Sink,import,apache,org,2.11
From: https://www.cnblogs.com/wuxiaolong4/p/16779357.html