首页 > 编程语言 >Flink Caused by: java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.

Flink Caused by: java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.

时间:2024-12-09 14:32:37浏览次数:8  
标签:lang java flink Long public org apache import class

 

package com.example;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * first flink
 */
public class FlinkMySQLExampleMain
{
    public static void main( String[] args )
    {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从 MySQL 读取数据
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                TypeInformation.of(String.class), // user_name
                TypeInformation.of(Long.class),  // id
                TypeInformation.of(Long.class),  // age
                TypeInformation.of(Long.class)  // level
        );
        JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.0.211:3306/flink-test")
                .setQuery("SELECT `user_name`,`id`,`age`,`level` FROM flink_user")
                .setRowTypeInfo(rowTypeInfo)
                .setUsername("root")
                .setPassword("test123456")
                .finish();
        DataStream<Row> sourceStream = env.createInput(jdbcInputFormat);
        // 对数据进行简单处理(例如,转换名称为大写)
        DataStream<FlinkExchangeDto> processedStream = sourceStream.map(value -> {
            FlinkExchangeDto dto = new FlinkExchangeDto();
            // 确保类型正确,检查是否有那个字段
            if (value.getField(0) != null) {
                dto.setUserName(value.getField(0).toString().toUpperCase());
            } else {
                // 或者设置默认值
                dto.setUserName("no user name");
            }
            if (value.getField(1) != null) {
                Long id = (Long)value.getField(1);
                dto.setUserId(id);
            } else {
                // 或者设置默认值
                dto.setUserId(0L);
            }

            if (value.getField(2) != null&&value.getField(3) != null) {
                Long age = (Long) value.getField(2);
                Long level = (Long) value.getField(3);
                dto.setActivityType("FromUser-age:"+age.toString()+",level:"+level.toString());
            } else {
                // 或者设置默认值
                dto.setActivityType("FromUser");
            }
            return dto;
        });
        // 将数据写回 MySQL
        processedStream.addSink(JdbcSink.sink(
                "INSERT INTO activity_user (user_id,user_name,activity_type) VALUES ( ?,?,?) ",
                new JdbcStatementBuilder<FlinkExchangeDto>() {
                    @Override
                    public void accept(PreparedStatement ps, FlinkExchangeDto t) throws SQLException {
                        ps.setLong(1, t.getUserId());
                        ps.setString(2, t.getUserName());
                        ps.setString(3, t.getActivityType());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.0.211:3306/flink-test")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("test123456")
                        .build()
        ));

        // 执行 Flink 作业
        try {
            env.execute("Flink MySQL Example123");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class FlinkExchangeDto {

        /**
         * 执行的用户 ID
         */
        private Long userId;

        private String userName;

        /**
         * 执行的活动类型
         */
        private String activityType;

        public Long getUserId() {
            return userId;
        }

        public void setUserId(Long userId) {
            this.userId = userId;
        }
        public String getUserName() {
            return userName;
        }
        public void setUserName(String userName) {
            this.userName = userName;
        }
        public String getActivityType() {
            return activityType;
        }

        public void setActivityType(String activityType) {
            this.activityType = activityType;
        }
    }
}

数据表结构如下

CREATE TABLE `flink_user`  (
  `id` int UNSIGNED NOT NULL AUTO_INCREMENT,
  `user_name` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  `age` int NOT NULL,
  `level` int NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

 当 MySQL 表中的字段被定义为  int 时,使用Integer 类型

 当 MySQL 表中的字段被定义为 int UNSIGNED 时,使用 Long 类型

 修改代码

package com.example;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * first flink
 */
public class FlinkMySQLExampleMain
{
    public static void main( String[] args )
    {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从 MySQL 读取数据
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                TypeInformation.of(String.class), // user_name
                TypeInformation.of(Long.class),  // id
                TypeInformation.of(Integer.class),  // age
                TypeInformation.of(Integer.class)  // level
        );
        JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.0.211:3306/flink-test")
                .setQuery("SELECT `user_name`,`id`,`age`,`level` FROM flink_user")
                .setRowTypeInfo(rowTypeInfo)
                .setUsername("root")
                .setPassword("test123456")
                .finish();
        DataStream<Row> sourceStream = env.createInput(jdbcInputFormat);
        // 对数据进行简单处理(例如,转换名称为大写)
        DataStream<FlinkExchangeDto> processedStream = sourceStream.map(value -> {
            FlinkExchangeDto dto = new FlinkExchangeDto();
            // 确保类型正确,检查是否有那个字段
            if (value.getField(0) != null) {
                dto.setUserName(value.getField(0).toString().toUpperCase());
            } else {
                // 或者设置默认值
                dto.setUserName("no user name");
            }
            if (value.getField(1) != null) {
                Long id = (Long)value.getField(1);
                dto.setUserId(id);
            } else {
                // 或者设置默认值
                dto.setUserId(0L);
            }

            if (value.getField(2) != null&&value.getField(3) != null) {
                Integer age = (Integer) value.getField(2);
                Integer level = (Integer) value.getField(3);
                dto.setActivityType("FromUser-age:"+age.toString()+",level:"+level.toString());
            } else {
                // 或者设置默认值
                dto.setActivityType("FromUser");
            }
            return dto;
        });
        // 将数据写回 MySQL
        processedStream.addSink(JdbcSink.sink(
                "INSERT INTO activity_user (user_id,user_name,activity_type) VALUES ( ?,?,?) ",
                new JdbcStatementBuilder<FlinkExchangeDto>() {
                    @Override
                    public void accept(PreparedStatement ps, FlinkExchangeDto t) throws SQLException {
                        ps.setLong(1, t.getUserId());
                        ps.setString(2, t.getUserName());
                        ps.setString(3, t.getActivityType());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.0.211:3306/flink-test")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("test123456")
                        .build()
        ));

        // 执行 Flink 作业
        try {
            env.execute("Flink MySQL Example123");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class FlinkExchangeDto {

        /**
         * 执行的用户 ID
         */
        private Long userId;

        private String userName;

        /**
         * 执行的活动类型
         */
        private String activityType;

        public Long getUserId() {
            return userId;
        }

        public void setUserId(Long userId) {
            this.userId = userId;
        }
        public String getUserName() {
            return userName;
        }
        public void setUserName(String userName) {
            this.userName = userName;
        }
        public String getActivityType() {
            return activityType;
        }

        public void setActivityType(String activityType) {
            this.activityType = activityType;
        }
    }
}

 

标签:lang,java,flink,Long,public,org,apache,import,class
From: https://www.cnblogs.com/baby123/p/18594269

相关文章

  • javascript-Array
    1.序1.javascript数组索引是32位,自0开始。2.数组动态扩张和收缩。3.数组都有length属性,非离散数组Length表示数组中元素个数。离散数组,length大于最大元素的下标。4.数组属性继承于Array.prototype。大多数方法是泛型的。可以在数组和类数组对象中工作。7.1.1Arrayliterals......
  • web前端大作业:旅游网页主题网站设计——武汉旅游网页设计(11页)HTML+CSS+JavaScript
    ......
  • canal基于自定义注解使用【java】
    1、引入pom文件<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>2、自定义注解【BinLogs】importjav......
  • java中的注解使用
    说明java中的注解(Annotation)是用于为代码添加元数据的信息,编译器可以通过注解进行不同的处理。注解本身并不直接影响程序的运行。常见内置注解@Override标记重写父类方法@Deprecated标记类、方法、字段等不推荐使用,可能会在未来的版本中删除。@SuppressWarnings抑制编译器......
  • Java Playwright 浏览器最大化
    Playwright是一个用于自动化Web应用测试的现代工具,支持多种语言(包括Java)及多个浏览器(如Chromium、Firefox和WebKit)。它提供了一致的API来控制浏览器行为,其中包括窗口操作,如最大化。本文将详细介绍如何在JavaPlaywright中实现浏览器窗口的最大化,并提供详细的代码示例。......
  • java review
    一、多态向上转型FUfu=newZi();可以调用子类方法,不能调用子类特有方法(成员方法)成员变量,看等号左边的是谁,调用谁里面的成员变量二、内部类1.什么时候使用内部类:​ 当一个事务的内部,还有一个部分需要定义完整的结构去描述,而这个内部的完整结构又只为外部事物提供......
  • java基础--多线程
    进程与线程进程(Process):每个独立运行的程序都对应一个进程。进程是资源分配的最小单位,占用独立的内存空间和系统资源线程(Thread):CPU调度和分派的基本单位,程序执行过程中的最小单元例如:迅雷是个进程,里面的多个下载任务属于线程二者区别进程是资源分配的基本单位,线程......
  • 如何处理 JavaScript 中的事件委托?
    目录事件委托简介为什么要使用事件委托事件委托的原理事件委托的实际应用4.1示例1:动态生成的列表项点击事件4.2示例2:表单验证事件委托的优缺点常见问题及优化1.事件委托简介事件委托是指将一个事件处理程序绑定到父元素上,而不是直接绑定到每个子元素上。通过事件......
  • 基于java ssm篮球网上商城系统(源码+文档+运行视频+讲解视频)
     文章目录系列文章目录目的前言一、详细视频演示二、项目部分实现截图三、技术栈后端框架SSM前端框架vueSSM框架详细介绍系统测试四、代码参考源码获取目的摘要: 本文论述基于JavaSSM框架构建的篮球网上商城系统。该系统在满足篮球爱好者购物需求和推动篮球运动发......
  • 基于java ssm家用电器上门回收系统回收分配订单(源码+文档+运行视频+讲解视频)
     文章目录系列文章目录目的前言一、详细视频演示二、项目部分实现截图三、技术栈后端框架SSM前端框架vueSSM框架详细介绍系统测试四、代码参考源码获取目的摘要: 本文论述基于JavaSSM框架构建的家用电器上门回收系统。该系统在推动资源循环利用和环保事业中发挥着......