首页 > 数据库 >【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版

时间:2023-12-28 13:36:43浏览次数:26  
标签:String 示例 flink sink org apache import 完整版




文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、Flink sink介绍
  • 三、sink 到文件、console示例
  • 1、console输出
  • 2、sink到文件
  • 1)、sink txt文件到hdfs上
  • 2)、sink csv文件到本地
  • 3)、sink text文件到hdfs上(writeUsingOutputFormat)
  • 四、sink到socket示例(writeToSocket)
  • 五、Jdbc/mysql示例
  • 1、maven依赖
  • 2、实现
  • 1)、user bean
  • 2)、内部匿名类实现
  • 3)、lambda实现
  • 4)、普通继承RichSinkFunction实现
  • 5)、完整代码
  • 3、验证
  • 六、sink到redis示例
  • 1、API介绍
  • 2、maven依赖
  • 3、实现
  • 1)、定义redisMapper
  • 2)、使用redisMapper将数据写入redis
  • 4、验证
  • 1)、启动nc
  • 2)、启动redis
  • 3)、启动应用程序
  • 4)、在nc中输入字符串
  • 5)、查看应用程序的控制台输出
  • 6)、通过redis客户端查看alanchanTestinghash表中单词统计数量
  • 七、sink到ClickHouse示例
  • 1、介绍
  • 2、maven依赖
  • 3、创建clickhouse表
  • 4、验证clickhouse web页面是否正常
  • 5、实现
  • 1)、user bean
  • 2)、sink实现
  • 6、验证
  • 1)、nc 输入
  • 2)、启动应用程序
  • 3)、观察应用程序控制台输出
  • 4)、查看clickhouse表中的数据
  • 八、sink到kafka示例
  • 1、环境或版本说明
  • 2、flink sink到kafka示例
  • 1)、介绍
  • 2)、1.13.6版本示例
  • 1、maven依赖
  • 2、实现
  • 3、验证步骤
  • 3)、1.17.0版本示例
  • 1、maven依赖
  • 2、实现
  • 3、验证步骤
  • 九、分布式缓存(Distributed Cache)示例
  • 1、介绍
  • 2、maven依赖
  • 3、实现
  • 4、验证
  • 1)、验证步骤
  • 2)、验证
  • 十、广播变量Broadcast Variables示例
  • 1、介绍
  • 2、广播变量示例
  • 3、验证
  • 4、Broadcast State 与 Broadcast Variable 区别



将常用的Flink sink到几种实现方式以例子的形式进行展示说明,sink的示例包含内置(flie、console和socket)、jdbc、kafka、redis、clickhouse、分布式缓存和广播变量。

本文除了maven依赖外,没有其他依赖。

本文依赖的运行环境有redis、mysql、clickhouse、kafka以及netcat。


一、maven依赖

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>3.1.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>3.1.4</version>
	</dependency>
</dependencies>

二、Flink sink介绍

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。

Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(…) / CsvOutputFormat - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
  • addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

注意,DataStream 的 write*() 方法主要用于调试目的。它们不参与 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

为了将流可靠地、精准一次地传输到文件系统中,请使用 FileSink。此外,通过 .addSink(…) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。

三、sink 到文件、console示例

1、console输出

该种方法比较简单,直接调用print方法即可。

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");
		System.setProperty("HADOOP_USER_NAME", "alanchan");

		// transformation

		// sink
		ds.print();
		ds.print("输出标识");
		//		i like flink
		//		i am alanchan
		//		输出标识> i like flink
		//		输出标识> i am alanchan
		
		// execute
		env.execute();

	}

}

2、sink到文件

1)、sink txt文件到hdfs上

sink到本地是一样的,不再赘述。

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");
		System.setProperty("HADOOP_USER_NAME", "alanchan");

		// transformation

		// sink
		ds.print();
		ds.print("输出标识");
//		i like flink
//		i am alanchan
//		输出标识> i like flink
//		输出标识> i am alanchan
		
		// 并行度与写出的文件个数有关,一个并行度写一个文件,多个并行度写多个文件
		//如果并行度是1的话,该路径是一个文件名称;
		//如果并行度大于1的话,该路径是一个文件夹,文件夹下的文件名是并行度的数字,并行度为2,该文件夹下会创建名称为1和2的两个文件
		//运行下面的示例时,hdfs上不能存在该文件,也即下面两行代码运行时需要注释一行
		ds.writeAsText("hdfs://server2:8020///flinktest/sinktest/words").setParallelism(2);		
		ds.writeAsText("hdfs://server2:8020///flinktest/sinktest/words").setParallelism(1);
		
		// execute
		env.execute();

	}

}
  • 并行度为1的运行结果如下
  • 并行度为2的运行结果如下

2)、sink csv文件到本地

该方法中只能使用tuple格式组织数据

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {
	public static final String DEFAULT_LINE_DELIMITER = "\n";
	public static final String DEFAULT_FIELD_DELIMITER = ",";

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<Tuple2<Integer, String>> ds = env.fromElements(
						Tuple2.of(1, "i am alanchan"), 
						Tuple2.of(2, "i like flink"), 
						Tuple2.of(3, "i like hadoop too"),
						Tuple2.of(4, "你呢?")
					);

		// transformation

		// sink
		String file = "D:\\workspace\\flink1.17-java\\testdatadir\\csvfile.csv";

		// 使用默认的其他三个参数
//		ds.writeAsCsv(file);
//		ds.writeAsCsv(file, WriteMode.OVERWRITE);
		ds.writeAsCsv(file, WriteMode.OVERWRITE, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER);

		// execute
		env.execute();
	}

}
  • 输出文件

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_flink

3)、sink text文件到hdfs上(writeUsingOutputFormat)

输出格式Flink 1.17版本实现的如下

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_kafka_02

FileOutputFormat系统已经实现了几种格式,如下

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_kafka_03

完整代码如下

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestSinkFileDemo {

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");
		System.setProperty("HADOOP_USER_NAME", "alanchan");

		// transformation

		// sink
		TextOutputFormat format = new TextOutputFormat(new Path("hdfs://server1:8020///flinktest/sinktest/foramts"), "UTF-8");
//		CsvOutputFormat format_csv = new CsvOutputFormat(new Path());
		
		format.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
		format.setWriteMode(WriteMode.OVERWRITE);
		ds.writeUsingOutputFormat(format);

		// execute
		env.execute();
	}

}
  • 程序运行后,hdfs文件输出结果

四、sink到socket示例(writeToSocket)

本示例比较简单,没有太多需要说明的地方。

下图是系统内置实现的序列化类,根据自己的需要选择。

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_flink hive_04


完整代码实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class TestSinkSocketDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds = env.fromElements("i am alanchan", "i like flink");

		// transformation

		// sink
		// ip、端口、序列化
		ds.writeToSocket("192.168.10.42", 9999, new SimpleStringSchema());

		// execute
		env.execute();

	}

}
  • 验证

1、先开通9999端口,本示例是通过nc -lk 9999 来开启的

2、启动应用程序

3、观察9999端口的输出

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_flink sql_05

五、Jdbc/mysql示例

  • addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

1、maven依赖

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.38</version>
	<!--<version>8.0.20</version> -->
</dependency>

2、实现

1)、user bean

package org.datastreamapi.sink.custom.jdbc.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private String pwd;
	private String email;
	private int age;
	private double balance;
}

下面三种实现方式均可,具体取决于自己的应用场景。

2)、内部匿名类实现

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;

....

	static void sinkToMysql() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "[email protected]", 19, 800));

		// transformation

		// sink
//		public static <T> SinkFunction<T> sink(String sql, JdbcStatementBuilder<T> statementBuilder, JdbcConnectionOptions connectionOptions) {
//				return sink(sql, statementBuilder, JdbcExecutionOptions.defaults(), connectionOptions);
//		}
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 1、采用匿名类的方式写
		userDS.addSink(JdbcSink.sink(sql, new JdbcStatementBuilder<User>() {

			@Override
			public void accept(PreparedStatement ps, User value) throws SQLException {
				ps.setString(1, value.getName());
				ps.setString(2, value.getPwd());
				ps.setString(3, value.getEmail());
				ps.setInt(4, value.getAge());
				ps.setDouble(5, value.getBalance());
			}
			// (String url, String driverName, String username, String password
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

3)、lambda实现

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;

....

static void sinkToMysqlByLambda() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "[email protected]", 29, 1800));

		// transformation

		// sink
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 2、采用lambda方式
		userDS.addSink(JdbcSink.sink(sql, (ps, value) -> {
			ps.setString(1, value.getName());
			ps.setString(2, value.getPwd());
			ps.setString(3, value.getEmail());
			ps.setInt(4, value.getAge());
			ps.setDouble(5, value.getBalance());
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

4)、普通继承RichSinkFunction实现

package org.datastreamapi.sink.custom.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.datastreamapi.sink.custom.jdbc.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomSinkToMysql extends RichSinkFunction<User> {
	private Connection conn = null;
	private PreparedStatement ps = null;

	@Override
	public void open(Configuration parameters) throws Exception {
		conn = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "123456");
//		private int id;
//		private String name;
//		private String pwd;
//		private String email;
//		private int age;
//		private double balance;
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		ps = conn.prepareStatement(sql);
	}

	@Override
	public void invoke(User value, Context context) throws Exception {
		// 设置?占位符参数值
		ps.setString(1, value.getName());
		ps.setString(2, value.getPwd());
		ps.setString(3, value.getEmail());
		ps.setInt(4, value.getAge());
		ps.setDouble(5, value.getBalance());
		// 执行sql
		ps.executeUpdate();
	}

	@Override
	public void close() throws Exception {
		if (conn != null)
			conn.close();
		if (ps != null)
			ps.close();
	}

}

5)、完整代码

package org.datastreamapi.sink.custom.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomSinkToMysqlDemo {
	static void sinkToMysql() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "[email protected]", 19, 800));

		// transformation

		// sink
//		public static <T> SinkFunction<T> sink(String sql, JdbcStatementBuilder<T> statementBuilder, JdbcConnectionOptions connectionOptions) {
//				return sink(sql, statementBuilder, JdbcExecutionOptions.defaults(), connectionOptions);
//		}
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 1、采用匿名类的方式写
		userDS.addSink(JdbcSink.sink(sql, new JdbcStatementBuilder<User>() {

			@Override
			public void accept(PreparedStatement ps, User value) throws SQLException {
				ps.setString(1, value.getName());
				ps.setString(2, value.getPwd());
				ps.setString(3, value.getEmail());
				ps.setInt(4, value.getAge());
				ps.setDouble(5, value.getBalance());
			}
			// (String url, String driverName, String username, String password
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

	static void sinkToMysqlByLambda() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "[email protected]", 29, 1800));

		// transformation

		// sink
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 2、采用lambda方式
		userDS.addSink(JdbcSink.sink(sql, (ps, value) -> {
			ps.setString(1, value.getName());
			ps.setString(2, value.getPwd());
			ps.setString(3, value.getEmail());
			ps.setInt(4, value.getAge());
			ps.setDouble(5, value.getBalance());
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

	static void sinkToMysql2() throws Exception {
		// 0.env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// 1.source
		DataStream<User> studentDS = env.fromElements(new User(1, "alanchan", "sink mysql", "[email protected]", 39, 3800));
		// 2.transformation

		// 3.sink
		studentDS.addSink(new CustomSinkToMysql());

		// 4.execute
		env.execute();

	}

	public static void main(String[] args) throws Exception {
//		sinkToMysqlByLambda(); //验证结果中id=5001是插入的数据
//		sinkToMysql();//验证结果中id=5002是插入的数据
		sinkToMysql2();//验证结果中id=5003是插入的数据
	}

}

3、验证

创建好相应的库、表,然后运行程序,观察表内数据变化情况。

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_clickhouse_06

六、sink到redis示例

1、API介绍

flink 提供了专门操作redis 的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。
更多信息可以参考地址:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

RedisSink 核心类是RedisMapper 是一个接口,使用时要编写自己的redis 操作类实现这个接口中的三个方法,如下所示:

  • 1.getCommandDescription() ,设置使用的redis 数据结构类型和key的名称,通过RedisCommand 设置数据结构类型
  • 2.String getKeyFromData(T data),设置value 中的键值对key的值
  • 3.String getValueFromData(T data),设置value 中的键值对value的值

2、maven依赖

<dependency>
	<groupId>org.apache.bahir</groupId>
	<artifactId>flink-connector-redis_2.12</artifactId>
	<version>1.1.0</version>
	<exclusions>
		<exclusion>
			<artifactId>flink-streaming-java_2.12</artifactId>
			<groupId>org.apache.flink</groupId>
		</exclusion>
		<exclusion>
			<artifactId>flink-runtime_2.12</artifactId>
			<groupId>org.apache.flink</groupId>
		</exclusion>
		<exclusion>
			<artifactId>flink-core</artifactId>
			<groupId>org.apache.flink</groupId>
		</exclusion>
		<exclusion>
			<artifactId>flink-java</artifactId>
			<groupId>org.apache.flink</groupId>
		</exclusion>
	</exclusions>
</dependency>

3、实现

本示例实现的功能如下:
1、nc输入字符串,并进行逗号分隔
2、应用程序针对分隔的字符串按照单词统计数量
3、写入redis中

1)、定义redisMapper

package org.datastreamapi.sink.custom.redis;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * @author alanchan
 *
 */
public class CustomSinkToRedis implements RedisMapper<Tuple2<String, Integer>> {

	@Override
	public RedisCommandDescription getCommandDescription() {
		// Hash(单词,数量),其数据结构key:String("alanchanTesting")
		return new RedisCommandDescription(RedisCommand.HSET, "alanchanTesting");
	}

	@Override
	public String getKeyFromData(Tuple2<String, Integer> data) {
		return data.f0;
	}

	@Override
	public String getValueFromData(Tuple2<String, Integer> data) {
		return data.f1.toString();
	}

}

2)、使用redisMapper将数据写入redis

package org.datastreamapi.sink.custom.redis;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestCustomSinkToRedisDemo {

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// Source
		// nc -lk 9999
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		// 以逗号进行输入字符分隔,统计数量
		SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(Tuple2.of(word, 1));
				}
			}
		}).keyBy(t -> t.f0).sum(1);

		// sink
		result.print();
		// redis地址
		FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.10.41").build();
		RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<Tuple2<String, Integer>>(conf, new CustomSinkToRedis());
		result.addSink(redisSink);

		// execute
		env.execute();
	}

}

4、验证

验证步骤如下:

1)、启动nc

nc -lk 9999

2)、启动redis

[root@server1 src]# redis-server
24095:C 10 Jul 05:42:18.432 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 3.0.5 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 24095
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               

24095:M 10 Jul 05:42:18.434 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
24095:M 10 Jul 05:42:18.434 # Server started, Redis version 3.0.5
24095:M 10 Jul 05:42:18.434 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
24095:M 10 Jul 05:42:18.434 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
24095:M 10 Jul 05:42:18.445 * DB loaded from disk: 0.011 seconds
24095:M 10 Jul 05:42:18.445 * The server is now ready to accept connections on port 6379

3)、启动应用程序

4)、在nc中输入字符串

[alanchan@server2 src]$ nc -lk 9999
i am alanchan
i,am,alanchan
^[[A^H
i ,like,flink
i,like,redis

5)、查看应用程序的控制台输出

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_flink sql_07

6)、通过redis客户端查看alanchanTestinghash表中单词统计数量

[root@server1 src]# redis-cli
。。。。。。
127.0.0.1:6379> HGETALL alanchanTesting
1) "i am alanchan"
2) "1"
127.0.0.1:6379> HGET alanchanTesting 1
(nil)
127.0.0.1:6379> HGET alanchanTesting 'i am alanchan'
"1"
127.0.0.1:6379> hget alanchanTesting 'i'
"1"
127.0.0.1:6379> hget alanchanTesting 'i'
"2"
127.0.0.1:6379> hget alanchanTesting 'i '
"1"
127.0.0.1:6379> hget alanchanTesting 'am'
"1"
127.0.0.1:6379> hget alanchanTesting 'like'
"2"
127.0.0.1:6379>

七、sink到ClickHouse示例

1、介绍

2、maven依赖

<dependency>
  <groupId>ru.ivi.opensource</groupId>
  <artifactId>flink-clickhouse-sink</artifactId>
  <version>1.3.1</version>
</dependency>

3、创建clickhouse表

-- 1、创建数据库 tutorial
--略
-- 2、创建表
CREATE TABLE t_flink_sink_clickhouse (    
id UInt16 COMMENT '员工id',    
name String COMMENT '员工姓名',     
age UInt8 COMMENT '员工年龄' ) 
ENGINE = MergeTree 
ORDER BY id;

4、验证clickhouse web页面是否正常

http://192.168.10.42:8123/【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_kafka_08

5、实现

1)、user bean

import lombok.Data;

@Data
public class User {
	private int id;
	private String name;
	private int age;

	public User(int id, String name, int age) {
		this.id = id;
		this.name = name;
		this.age = age;
	}

	// Java Bean 必须实现的方法,信息通过字符串进行拼接
	public static String convertToCsv(User user) {
		StringBuilder builder = new StringBuilder();
		builder.append("(");

		// add user.id
		builder.append(user.id);
		builder.append(", ");

		// add user.name
		builder.append("'");
		builder.append(String.valueOf(user.name));
		builder.append("', ");

		// add user.age
		builder.append(user.age);

		builder.append(" )");
		return builder.toString();
	}
}

2)、sink实现

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.clickhouse.ClickHouseSink;
import org.clickhouse.model.ClickHouseClusterSettings;
import org.clickhouse.model.ClickHouseSinkConst;

/**
 * @author alanchan
 *
 */
public class TestFinkSinkClickhouse {
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		// source
		// nc
		DataStream<String> inputStream = env.socketTextStream("192.168.10.42", 9999);

		// Transform
		SingleOutputStreamOperator<String> dataStream = inputStream.map(new MapFunction<String, String>() {
			@Override
			public String map(String data) throws Exception {
				String[] split = data.split(",");
				User user = new User(Integer.parseInt(split[0]), split[1], Integer.parseInt(split[2]));
				return User.convertToCsv(user);
			}
		});

		// create props for sink
		Map<String, String> globalParameters = new HashMap<>();
		// clickhouse 的服务地址,该链接访问返回ok
		globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://192.168.10.42:8123/");
		// common
		globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
		globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "/usr/local/bigdata/testdata/clickhouse_failpath");
		globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
		globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
		globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "10");
		globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");

		// set global paramaters
		ParameterTool parameters = ParameterTool.fromMap(globalParameters);
		env.getConfig().setGlobalJobParameters(parameters);

//		env.setParallelism(1);
		Properties props = new Properties();
		// 数据库tutorial和表名称t_flink_sink_clickhouse
		// 需要先创建数据库和表
		// CREATE TABLE t_flink_sink_clickhouse (id UInt16 COMMENT '员工id',name String
		// COMMENT '员工姓名',age UInt8 COMMENT '员工年龄' ) ENGINE = MergeTree ORDER BY id;
		props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "tutorial.t_flink_sink_clickhouse");
		props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
		ClickHouseSink sink = new ClickHouseSink(props);
		dataStream.addSink(sink);
		dataStream.print();

		env.execute();

	}
}

6、验证

1)、nc 输入

[root@server2 etc]# nc -lk 9999
1,alanchan,19
2,alan,20
3,chan,21

2)、启动应用程序

3)、观察应用程序控制台输出

4)、查看clickhouse表中的数据

server2 :) select * from t_flink_sink_clickhouse;

SELECT *
FROM t_flink_sink_clickhouse

Query id: aea358e8-8d9d-4caa-98b1-54903356a7d0

┌─id─┬─name─┬─age─┐
│  2 │ alan │  20 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│  3 │ chan │  21 │
└────┴──────┴─────┘
┌─id─┬─name─────┬─age─┐
│  1 │ alanchan │  19 │
└────┴──────────┴─────┘

3 rows in set. Elapsed: 0.003 sec.

八、sink到kafka示例

1、环境或版本说明

1、该示例需要有kafka的运行环境,kafka的部署与使用参考文章:
1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试

2、Flink关于kafka的使用在不同的版本中有不同的实现,最直观的的变化是由FlinkKafkaConsumer换成了KafkaSource,同理sink也有相应的由FlinkKafkaProducer换成了KafkaSink。

3、由于使用kafka涉及的内容较多,请参考文章:
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版

4、本文会提供关于kafka 作为sink的2个版本,即1.13.6和1.17的版本。

5、以下属性在构建 KafkaSink 时是必须指定的:

  • Bootstrap servers, setBootstrapServers(String)
  • 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)

2、flink sink到kafka示例

1)、介绍

Flink 提供了 Apache Kafka 连接器使用精确一次(Exactly-once)的语义在 Kafka topic 中读取和写入数据。

FlinkKafkaProducer 已被弃用并将在 Flink 1.17 中移除,请改用 KafkaSink。

KafkaSink 可将数据流写入一个或多个 Kafka topic。

Kafka sink 提供了构建类来创建 KafkaSink 的实例。

以下两个示例展示了如何将字符串数据按照至少一次(at lease once)的语义保证写入 Kafka topic。

2)、1.13.6版本示例

1、maven依赖
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.12</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
2、实现
import java.util.Properties;
import java.util.Random;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestKafkaSinkDemo {

	public static void test1() throws Exception {
		// 1、env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2、source-主题:alan_source
		// 准备kafka连接参数
		Properties propSource = new Properties();
		propSource.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 集群地址
		propSource.setProperty("group.id", "flink_kafka");
		propSource.setProperty("auto.offset.reset", "latest");
		propSource.setProperty("flink.partition-discovery.interval-millis", "5000");
		propSource.setProperty("enable.auto.commit", "true");
		// 自动提交的时间间隔
		propSource.setProperty("auto.commit.interval.ms", "2000");

		FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_source", new SimpleStringSchema(), propSource);

		// 使用kafkaSource
		DataStream<String> kafkaDS = env.addSource(kafkaSource);

		// 3、transformation-统计单词个数
		SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			private Random ran = new Random();

			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(Tuple2.of(word, 1));
				}
			}
		}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
			@Override
			public String map(Tuple2<String, Integer> value) throws Exception {
				System.out.println("输出:" + value.f0 + "->" + value.f1);
				return value.f0 + "->" + value.f1;
			}
		});

		// 4、sink-主题alan_sink
		Properties propSink = new Properties();
		propSink.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
		propSink.setProperty("transaction.timeout.ms", "5000");

		FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("alan_sink", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propSink,
				FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

		result.addSink(kafkaSink);

		// 5、execute
		env.execute();
	}

	public static void main(String[] args) throws Exception {
		test1();
	}

}
3、验证步骤
  • 1、创建kafka 主题 alan_source 和 alan_sink
  • 2、驱动程序,观察运行控制台
  • 3、通过命令往alan_source 写入数据,同时消费 alan_sink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_source
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>

## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_sink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
hello->2
alan->3
alach->2
hello->3
123->1
  • 4、应用程序控制台输出

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_clickhouse_09

3)、1.17.0版本示例

1、maven依赖
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka</artifactId>
   <version>1.17.1</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-base</artifactId>
   <version>1.17.1</version>
</dependency>
2、实现
import java.util.Properties;
import java.util.Random;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestKafkaSinkDemo {

	public static void test2() throws Exception {
		// 1、env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		// 2、 source
		KafkaSource<String> source = KafkaSource.<String>builder()
																				.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
																				.setTopics("alan_nsource")
																				.setGroupId("flink_kafka")
																				.setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema())
																				.build();

		DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
		
		
		// 3、 transformation
		DataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(Tuple2.of(word, 1));
				}
			}
		}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
			@Override
			public String map(Tuple2<String, Integer> value) throws Exception {
				System.out.println("输出:" + value.f0 + "->" + value.f1);
				return value.f0 + "->" + value.f1;
			}
		});
		
		// 4、 sink
		KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
		        .setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
		        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
		            .setTopic("alan_nsink")
		            .setValueSerializationSchema(new SimpleStringSchema())
		            .build()
		        )
		        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
		        .build();
		
		result.sinkTo(kafkaSink);
		
		// 5、execute
		env.execute();
		
	}

	public static void main(String[] args) throws Exception {
//		test1();
		test2();
	}

}
3、验证步骤
  • 1、创建kafka 主题 alan_nsource 和 alan_nsink
  • 2、驱动程序,观察运行控制台
  • 3、通过命令往alan_nsource 写入数据,同时消费 alan_nsink 主题的数据
## kafka生产数据
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>


## kafka消费数据
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1
  • 4、应用程序控制台输出

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_flink sql_10

九、分布式缓存(Distributed Cache)示例

1、介绍

Flink提供了一个类似于Hadoop的分布式缓存,以使用户函数的并行实例可以在本地访问文件。此功能可用于共享包含静态外部数据(如字典或机器学习回归模型)的文件。

关于hadoop分布式缓存参考:19、Join操作map side join 和 reduce side join

缓存的工作方式如下:

  • 程序在其ExecutionEnvironment中以特定名称将本地或远程文件系统(如HDFS或S3)的文件或目录注册为缓存文件。
  • 当程序执行时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。
  • 用户函数可以查找指定名称下的文件或目录,并从工作者的本地文件系统访问它。

官方示例代码

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);

// define your program and execute
...
DataSet<String> input = ...;
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

访问用户函数(此处为MapFunction)中的缓存文件。函数必须扩展RichFunction类,因为它需要访问RuntimeContext。

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {

      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}

2、maven依赖

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>3.1.4</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>3.1.4</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>3.1.4</version>
</dependency>

3、实现

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import akka.japi.tuple.Tuple4;

/**
 * @author alanchan
 *
 */
public class DistributedCacheSink {

	public static void main(String[] args) throws Exception {
		// env
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// Source
		// 注册分布式缓存文件
		env.registerCachedFile("hdfs://server2:8020//flinktest/words/goodsDistributedCacheFile", "goodsDistributedCacheFile");

		// order数据集(id,name,goodsid)
		DataSource<Tuple3<Integer, String, Integer>> ordersDS = env
				.fromCollection(Arrays.asList(Tuple3.of(1, "alanchanchn", 1), Tuple3.of(2, "alanchan", 4), Tuple3.of(3, "alan", 123)));

		// Transformation
		// 将ordersDS(id,name,goodsid)中的数据和分布式缓存中goodsDistributedCacheFile的数据(goodsid,goodsname)关联,得到这样格式的数据: (id,name,goodsid,goodsname)
		MapOperator<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, Integer, String>> result = ordersDS

				// public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction
				// implements MapFunction<IN, OUT> {
				// @Override
				// public abstract OUT map(IN value) throws Exception;
				// }

				.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, Integer, String>>() {

					// 获取缓存数据,并存储,具体以实际应用为准
					Map<Integer, String> goodsMap = new HashMap<>();

					//读取缓存数据,并放入本地数据结构中
					@Override
					public void open(Configuration parameters) throws Exception {
						// 加载分布式缓存文件
						File file = getRuntimeContext().getDistributedCache().getFile("goodsDistributedCacheFile");
						List<String> goodsList = FileUtils.readLines(file);
						for (String str : goodsList) {
							String[] arr = str.split(",");
							goodsMap.put(Integer.parseInt(arr[0]), arr[1]);
						}
					}

					//关联数据,并输出需要的数据结构
					@Override
					public Tuple4<Integer, String, Integer, String> map(Tuple3<Integer, String, Integer> value) throws Exception {
						// 使用分布式缓存文件中的数据
						// 返回(id,name,goodsid,goodsname)
						return new Tuple4(value.f0, value.f1, value.f2, goodsMap.get(value.f2));
					}
				});

		// Sink
		result.print();

	}

}

4、验证

1)、验证步骤

1、准备分布式文件及其内容,并上传至hdfs中
2、运行程序,查看输出

2)、验证

1、缓存文件内容

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_flink hive_11


2、上传至hdfs

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_flink_12


3、运行程序,查看结果

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_kafka_13

十、广播变量Broadcast Variables示例

1、介绍

可以将数据广播到TaskManager上就可以供TaskManager中的SubTask/task去使用,数据存储到内存中。这样可以减少大量的shuffle操作,而不需要多次传递给集群节点。比如在数据join阶段,可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降。

广播变量允许您使数据集可用于操作的所有并行实例,以及操作的常规输入。这对于辅助数据集或依赖数据的参数化非常有用。然后,操作员可以将数据集作为集合进行访问。

  • 广播:通过withBroadcastSet(DataSet,String)按名称注册广播集,以及
  • Access:可通过目标运算符处的getRuntimeContext().getBroadcastVariable(String)进行访问。

图示广播的工作方式

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_clickhouse_14


官方示例

// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcast DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

在注册和访问广播数据集时,请确保名称(上一示例中的broadcastSetName)匹配。

由于广播变量的内容保存在每个节点的内存中,因此不应变得太大。对于标量值等更简单的事情,您可以简单地将参数作为函数闭包的一部分,或者使用withParameters(…)方法传入配置。

2、广播变量示例

本示例实现上一个缓存示例一样的内容,不过是使用广播实现的。
该示例比较简单,实现逻辑与分布式缓存基本上一样。

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

/**
 * @author alanchan
 *
 */
public class TestBroadcastVariablesDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Source
		// student数据集(学号,姓名)
		DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(Arrays.asList(Tuple2.of(1, "alan"), Tuple2.of(2, "alanchan"), Tuple2.of(3, "alanchanchn")));

		// score数据集(学号,学科,成绩)
		DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
				Arrays.asList(Tuple3.of(1, "chinese", 50), Tuple3.of(1, "math", 90), Tuple3.of(1, "english", 90), Tuple3.of(2, "math", 70), Tuple3.of(3, "art", 86)));

		// Transformation
		// 将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)
		// 然后使用scoreDS(学号,学科,成绩)和广播数据studentDS(学号,姓名)进行关联,得到这样格式的数据:(学号,姓名,学科,成绩)
		MapOperator<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>> result = scoreDS
				.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>>() {

					Map<Integer, String> studentsMap = new HashMap<>();

					@Override
					public void open(Configuration parameters) throws Exception {
						// 获取广播数据
						List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentsInfo");
						for (Tuple2<Integer, String> tuple : studentList) {
							studentsMap.put(tuple.f0, tuple.f1);
						}

					}

					@Override
					public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
						// 使用广播数据
						Integer stuId = value.f0;
						String stuName = studentsMap.getOrDefault(stuId, "");

						return new Tuple4(stuId, stuName, value.f1, value.f2);
					}
				}).withBroadcastSet(studentDS, "studentsInfo");
		
		// 4.Sink
		result.print();

	}

}

3、验证

启动程序,运行程序,控制台输出如下:

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版_clickhouse_15

4、Broadcast State 与 Broadcast Variable 区别

关于Broadcast State的介绍,请参考文章:53、Flink 的Broadcast State 模式介绍及示例

Broadcast State 和 Broadcast Variable 都是 Flink 中用于广播数据的机制,但它们之间有一些区别:

  • Broadcast State 是 KeyedStateBackend 的一个实现,它允许将状态数据广播到所有并行任务中。每个并行任务都可以访问相同的状态数据,从而实现状态的共享。Broadcast State 主要用于处理键控的状态,即状态与某个键相关联。
  • Broadcast Variable 是一种简单的广播机制,它可以将任意类型的数据广播到所有并行任务中。每个并行任务都可以访问相同的广播变量值。Broadcast Variable 主要用于处理非键控的数据,即不需要与特定键关联的数据。

总结一下,Broadcast State 和 Broadcast Variable 的主要区别在于:

  • Broadcast State 用于广播键控的状态数据,而 Broadcast Variable 用于广播非键控的数据。
  • Broadcast State 需要与 KeyedStream 一起使用,而 Broadcast Variable 可以与任何类型的 DataStream 一起使用。

以上,将常用的Flink sink到几种实现方式以例子的形式进行展示说明,sink的示例包含内置(flie、console和socket)、jdbc、kafka、redis、clickhouse、分布式缓存和广播变量。

标签:String,示例,flink,sink,org,apache,import,完整版
From: https://blog.51cto.com/alanchan2win/9013164

相关文章

  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、分布式缓存(DistributedCache)示例1、介绍2、maven依赖3、实现4、验证1)、验证步骤2)、验证本文介绍了flink关于分布式缓存的使用示例,比较简单。本文除了maven依赖外,没有其他依赖。本示例需要hadoop环境可用。一、maven依赖为避免篇幅过长,所......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、sink到ClickHouse示例1、介绍2、maven依赖3、创建clickhouse表4、验证clickhouseweb页面是否正常5、实现1)、userbean2)、sink实现6、验证1)、nc输入2)、启动应用程序3)、观察应用程序控制台输出4)、查看clickhouse表中的数据本文介绍了nc作......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、环境或版本说明三、flinksink到kafka示例1、介绍2、1.13.6版本示例1)、maven依赖2)、实现3)、验证步骤3、1.17.0版本示例1)、maven依赖2)、实现3)、验证步骤本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到......
  • Flink实验
     题目:实验八姓名 日期12.8实验环境:(1)Ubuntu18.04(或Ubuntu16.04)。(2)IntelliJIDEA。(3)Flink1.9.1。 实验内容与完成情况:(1)使用IntelliJIDEA工具开发WordCount程序在Linux系统中安装IntelliJIDEA,然后使用IntelliJIDEA工具开发WordCount程序,并打包......
  • 【SpringBoot快速入门】(4)SpringBoot项目案例代码示例
    目录1创建工程3配置文件4静态资源之前我们已经学习的Spring、SpringMVC、Mabatis、Maven,详细讲解了Spring、SpringMVC、Mabatis整合SSM的方案和案例,上一节我们学习了SpringBoot的开发步骤、工程构建方法以及工程的快速启动,从这一节开始,我们开始学习SpringBoot配置文件。接下来......
  • 【SpringBoot快速入门】(3)SpringBoot整合junit和MyBatis 详细代码示例与讲解
    目录1.SpringBoot整合junit1.1环境准备1.2编写测试类2.SpringBoot整合mybatis2.1回顾Spring整合Mybatis2.2SpringBoot整合mybatis2.2.1创建模块2.2.2定义实体类2.2.3定义dao接口2.2.4定义测试类2.2.5编写配置2.2.6测试2.2.7使用Druid数据源之前我们已经学习的Spring、......
  • 项目应用多级缓存示例
    前不久做的一个项目,需要在前端实时展示硬件设备的数据。设备很多,并且每个设备的数据也很多,总之就是数据很多。同时,设备的刷新频率很快,需要每2秒读取一遍数据。问题来了,我们如何读取数据,并且在前端展示?我的想法是利用多级缓存:1)首先是有个数据采集程序,不停地采集设备的数据。采集到......
  • Linux中date命令使用示例
    一、.Linux中的date命令date"+%Y-%m-%d"输出当前日期,格式为“年-月-日”,例如:2023-06-01date"+%Y年%m月%d日%H:%M:%S"输出当前日期喝时间,格式为“年月日时:分:秒”,例如:2023年12月28日04:28:11date"+%b"输出当前月份的英文缩写,例如:Jundate"+%B"输出当前月份的英文全称,例......
  • spring MVC 后端 接收 前端 批量添加的数据(简单示例)
    <%@pagecontentType="text/html;charset=UTF-8"language="java"%><html><head>  <title>Title</title></head><body><scriptsrc="${pageScope.request.ContextPath}/js/jquery-3.3.1.min.js&qu......
  • Composite 组合模式简介与 C# 示例【结构型3】【设计模式来了_8】
    Composite组合模式简介与C#示例【结构型3】【设计模式来了_8】 阅读目录〇、简介1、什么是组合设计模式?2、优缺点和适用场景一、简单的代码示例二、根据示例代码看结构三、相关模式回到顶部〇、简介1、什么是组合设计模式?一句话解释:  针对树形结构......