Flink CDC: MySQL to Kafka
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkCDCDemo {

    // Kafka topic name and Flink checkpoint directory
    private static final String KAFKA_TOPIC = "test";
    private static final String CHECKPOINT_DIR = "file:///tmp/flink-cdc";

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment. Checkpointing must be enabled
        // so the CDC source can track binlog offsets reliably.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_DIR);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // Define the Flink CDC source table that reads change data from MySQL.
        // The mysql-cdc connector takes 'hostname'/'username'/'database-name'/
        // 'table-name' style options; Kafka properties and a 'format' option do
        // not belong in this WITH clause. 'server-id' must be unique among all
        // clients connecting to this MySQL server.
        tEnv.executeSql(
                "CREATE TABLE mysql_source (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'password',\n" +
                "  'server-id' = '5400',\n" +
                "  'database-name' = 'test_db',\n" +
                "  'table-name' = 'test_table',\n" +
                "  'scan.startup.mode' = 'initial'\n" +
                ")");
        // Define the Kafka sink table that the change data is written to. The
        // DataStream FlinkKafkaProducer built here previously was never attached
        // to the pipeline; a SQL sink table replaces it. Because the CDC source
        // emits a changelog (inserts, updates and deletes), the upsert-kafka
        // connector is used, which requires a primary key.
        tEnv.executeSql(String.format(
                "CREATE TABLE kafka_sink (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'upsert-kafka',\n" +
                "  'topic' = '%s',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'key.format' = 'json',\n" +
                "  'value.format' = 'json'\n" +
                ")", KAFKA_TOPIC));
        // Write the change data read by the source into the Kafka topic.
        // executeSql submits this INSERT as its own job, so no env.execute()
        // call is needed (and one would fail, since the pipeline defines no
        // DataStream operators).
        tEnv.executeSql(
                "INSERT INTO kafka_sink\n" +
                "SELECT id, name\n" +
                "FROM mysql_source").await();
    }
}
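The job above assumes MySQL has the binlog enabled in ROW format and that the source table already exists. A minimal sketch of the assumed schema (the database and table names match the DDL above; the column types are assumptions):

-- Hypothetical source table for the CDC job above
CREATE DATABASE IF NOT EXISTS test_db;
CREATE TABLE test_db.test_table (
  id   INT NOT NULL,
  name VARCHAR(255),
  PRIMARY KEY (id)
);

With the flink-sql-connector-mysql-cdc and flink-connector-kafka jars on the classpath, inserts and updates on test_db.test_table should then show up as JSON records on the test topic, e.g. via kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning.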
From Kafka to MySQL
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;
public class FlinkKafkaToMySQLDemo {

    private static final String KAFKA_TOPIC = "test";
    private static final String MYSQL_JDBC_URL = "jdbc:mysql://localhost:3306/test";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "password";

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Configure checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
        // Build the MySQL sink using the standard flink-connector-jdbc JdbcSink
        // with a batched upsert (INSERT ... ON DUPLICATE KEY UPDATE) statement;
        // there is no dedicated MySQL sink connector in Apache Flink itself.
        SinkFunction<Row> mysqlSink = JdbcSink.sink(
                "INSERT INTO test_table (id, name, update_time) VALUES (?, ?, ?) " +
                        "ON DUPLICATE KEY UPDATE name = VALUES(name), update_time = VALUES(update_time)",
                (PreparedStatement stmt, Row row) -> {
                    stmt.setInt(1, (Integer) row.getField(0));
                    stmt.setString(2, (String) row.getField(1));
                    stmt.setTimestamp(3, (Timestamp) row.getField(2));
                },
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(1000) // flush buffered rows at least once per second
                        .withBatchSize(100)
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(MYSQL_JDBC_URL)
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(MYSQL_USERNAME)
                        .withPassword(MYSQL_PASSWORD)
                        .build());
        // Create a Kafka consumer that reads from the given topic
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test-consumer-group");
        props.setProperty("auto.offset.reset", "earliest");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                KAFKA_TOPIC,
                new SimpleStringSchema(),
                props);
        // Convert the Kafka records into Rows (assuming each record is "id,value" CSV).
        // The explicit .returns(...) is required because Row field types cannot be
        // extracted from a lambda.
        DataStream<Row> kafkaStream = env.addSource(kafkaConsumer)
                .map(value -> {
                    String[] fields = value.split(",");
                    return Row.of(Integer.parseInt(fields[0]), fields[1],
                            new Timestamp(System.currentTimeMillis()));
                })
                .returns(Types.ROW(Types.INT, Types.STRING, Types.SQL_TIMESTAMP));
        // Write to MySQL
        kafkaStream.addSink(mysqlSink).name("MySQL Upsert Sink");

        // Launch the Flink job
        env.execute("Flink Kafka to MySQL Demo");
    }
}
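The ON DUPLICATE KEY UPDATE upsert in the sink only works if the target table has a primary (or unique) key on id. A minimal sketch of the assumed target table (the column types are assumptions):

-- Hypothetical target table for the upsert sink above
CREATE TABLE test.test_table (
  id          INT NOT NULL,
  name        VARCHAR(255),
  update_time TIMESTAMP NULL,
  PRIMARY KEY (id)
);

This job additionally needs flink-connector-jdbc and the MySQL driver (mysql-connector-java) on the classpath.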
From: https://www.cnblogs.com/hbym/p/17304157.html