package com.example; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcInputFormat; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.apache.flink.streaming.api.datastream.DataStream; import java.sql.PreparedStatement; import java.sql.SQLException; /** * first flink */ public class FlinkMySQLExampleMain { public static void main( String[] args ) { // 创建执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 MySQL 读取数据 RowTypeInfo rowTypeInfo = new RowTypeInfo( TypeInformation.of(String.class), // user_name TypeInformation.of(Long.class), // id TypeInformation.of(Long.class), // age TypeInformation.of(Long.class) // level ); JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://192.168.0.211:3306/flink-test") .setQuery("SELECT `user_name`,`id`,`age`,`level` FROM flink_user") .setRowTypeInfo(rowTypeInfo) .setUsername("root") .setPassword("test123456") .finish(); DataStream<Row> sourceStream = env.createInput(jdbcInputFormat); // 对数据进行简单处理(例如,转换名称为大写) DataStream<FlinkExchangeDto> processedStream = sourceStream.map(value -> { FlinkExchangeDto dto = new FlinkExchangeDto(); // 确保类型正确,检查是否有那个字段 if (value.getField(0) != null) { dto.setUserName(value.getField(0).toString().toUpperCase()); } else { // 或者设置默认值 dto.setUserName("no user name"); } if (value.getField(1) != null) { Long id = (Long)value.getField(1); dto.setUserId(id); } else { // 或者设置默认值 dto.setUserId(0L); } if (value.getField(2) != null&&value.getField(3) != null) { Long age = (Long) value.getField(2); Long level = (Long) value.getField(3); dto.setActivityType("FromUser-age:"+age.toString()+",level:"+level.toString()); } else { // 或者设置默认值 dto.setActivityType("FromUser"); } return dto; }); // 将数据写回 MySQL processedStream.addSink(JdbcSink.sink( "INSERT INTO activity_user (user_id,user_name,activity_type) VALUES ( ?,?,?) ", new JdbcStatementBuilder<FlinkExchangeDto>() { @Override public void accept(PreparedStatement ps, FlinkExchangeDto t) throws SQLException { ps.setLong(1, t.getUserId()); ps.setString(2, t.getUserName()); ps.setString(3, t.getActivityType()); } }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://192.168.0.211:3306/flink-test") .withDriverName("com.mysql.cj.jdbc.Driver") .withUsername("root") .withPassword("test123456") .build() )); // 执行 Flink 作业 try { env.execute("Flink MySQL Example123"); } catch (Exception e) { e.printStackTrace(); } } public static class FlinkExchangeDto { /** * 执行的用户 ID */ private Long userId; private String userName; /** * 执行的活动类型 */ private String activityType; public Long getUserId() { return userId; } public void setUserId(Long userId) { this.userId = userId; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getActivityType() { return activityType; } public void setActivityType(String activityType) { this.activityType = activityType; } } }
数据表结构如下
CREATE TABLE `flink_user` ( `id` int UNSIGNED NOT NULL AUTO_INCREMENT, `user_name` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `age` int NOT NULL, `level` int NOT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
当 MySQL 表中的字段被定义为 int 时,使用Integer
类型
当 MySQL 表中的字段被定义为 int UNSIGNED 时,使用 Long
类型
修改代码
package com.example; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcInputFormat; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.apache.flink.streaming.api.datastream.DataStream; import java.sql.PreparedStatement; import java.sql.SQLException; /** * first flink */ public class FlinkMySQLExampleMain { public static void main( String[] args ) { // 创建执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 MySQL 读取数据 RowTypeInfo rowTypeInfo = new RowTypeInfo( TypeInformation.of(String.class), // user_name TypeInformation.of(Long.class), // id TypeInformation.of(Integer.class), // age TypeInformation.of(Integer.class) // level ); JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://192.168.0.211:3306/flink-test") .setQuery("SELECT `user_name`,`id`,`age`,`level` FROM flink_user") .setRowTypeInfo(rowTypeInfo) .setUsername("root") .setPassword("test123456") .finish(); DataStream<Row> sourceStream = env.createInput(jdbcInputFormat); // 对数据进行简单处理(例如,转换名称为大写) DataStream<FlinkExchangeDto> processedStream = sourceStream.map(value -> { FlinkExchangeDto dto = new FlinkExchangeDto(); // 确保类型正确,检查是否有那个字段 if (value.getField(0) != null) { dto.setUserName(value.getField(0).toString().toUpperCase()); } else { // 或者设置默认值 dto.setUserName("no user name"); } if (value.getField(1) != null) { Long id = (Long)value.getField(1); dto.setUserId(id); } else { // 或者设置默认值 dto.setUserId(0L); } if (value.getField(2) != null&&value.getField(3) != null) { Integer age = (Integer) value.getField(2); Integer level = (Integer) value.getField(3); dto.setActivityType("FromUser-age:"+age.toString()+",level:"+level.toString()); } else { // 或者设置默认值 dto.setActivityType("FromUser"); } return dto; }); // 将数据写回 MySQL processedStream.addSink(JdbcSink.sink( "INSERT INTO activity_user (user_id,user_name,activity_type) VALUES ( ?,?,?) ", new JdbcStatementBuilder<FlinkExchangeDto>() { @Override public void accept(PreparedStatement ps, FlinkExchangeDto t) throws SQLException { ps.setLong(1, t.getUserId()); ps.setString(2, t.getUserName()); ps.setString(3, t.getActivityType()); } }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://192.168.0.211:3306/flink-test") .withDriverName("com.mysql.cj.jdbc.Driver") .withUsername("root") .withPassword("test123456") .build() )); // 执行 Flink 作业 try { env.execute("Flink MySQL Example123"); } catch (Exception e) { e.printStackTrace(); } } public static class FlinkExchangeDto { /** * 执行的用户 ID */ private Long userId; private String userName; /** * 执行的活动类型 */ private String activityType; public Long getUserId() { return userId; } public void setUserId(Long userId) { this.userId = userId; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getActivityType() { return activityType; } public void setActivityType(String activityType) { this.activityType = activityType; } } }
标签:lang,java,flink,Long,public,org,apache,import,class From: https://www.cnblogs.com/baby123/p/18594269