首页 > 其他分享 >SpringBoot整合ShardingJdbc分表

SpringBoot整合ShardingJdbc分表

时间:2024-09-19 18:12:41浏览次数:8  
标签:tables null return SpringBoot tableRule 分表 ShardingJdbc public String

项目中处理接收设备上报日志需求,上报数据量大,因此对数据进行按日期分表处理。

使用技术:ShardingJdbc + rabbitMq + jpa + 多线程处理

引入所需jar :

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>

        <!--shardingJDBC-->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
            <version>4.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

在application.yml 中配置数据库分表:

spring:
  application:
    name: data-system
  profiles:
    active: local
  # 关闭驼峰命名
  jpa:
    hibernate:
      naming:
        physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl

  # sharding jdbc配置
  shardingsphere:
    datasource:
      names: ds0
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql:
        username: 
        password: 
    # 配置表的分布,表的策略
    sharding:
      tables:
        ali_data:
          actual-data-nodes: ds0.ali_data
          key-generator:
            # 指定表 主键id 生成策略为 SNOWFLAKE
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              # 分片字段
              sharding-column: create_time
              # 精确算法实现类路径
              precise-algorithm-class-name: com.chunmi.data.group.shardingjdbc.PreciseAlgorithmCustomer
        data_source:
          actual-data-nodes: ds0.data_source
          key-generator:
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              sharding-column: create_time
              precise-algorithm-class-name: com.chunmi.data.group.shardingjdbc.PreciseAlgorithmCustomer


    # 打开ShardingSphere-sql输出日志---调试时方便查看具体哪张表
    props:
      sql:
        show: true

分片算法:

@Component
public class PreciseAlgorithmCustomer implements PreciseShardingAlgorithm<Date> {
    private static ShardingAlgorithmReload shardingAlgorithmReload;

    @Autowired
    public void setShardingAlgorithmReload(ShardingAlgorithmReload shardingAlgorithmReload) {
        PreciseAlgorithmCustomer.shardingAlgorithmReload = shardingAlgorithmReload;
    }


    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue) {
        String suffix = ShardingDateUtil.getYearMonthDay(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(preciseShardingValue.getValue()));
        String preciseTable = preciseShardingValue.getLogicTableName() + "_" + suffix;
        if (collection.contains(preciseTable)) {
            return preciseTable;
        } else {
            String table = shardingAlgorithmReload.tryCreateShardingTable(preciseShardingValue.getLogicTableName(), suffix);
            if (StringUtils.isNotBlank(table)) {
                return table;
            } else {
                throw new IllegalArgumentException("未找到匹配的数据表");
            }
        }
    }
}

新建表以及重载:

@Slf4j
@Component
public class ShardingAlgorithmReload {

    @Resource
    private ShardingDataSource shardingDataSource;

    private ShardingRuntimeContext runtimeContext;

    /**
     * 重载表缓存
     */
    public void tableNameCacheReloadAll() {
        ShardingRuntimeContext runtimeContext = getRuntimeContext();

        List<TableRule> tableRuleList = (List<TableRule>) runtimeContext.getRule().getTableRules();
        for (TableRule tableRule : tableRuleList) {
            String nodeName = tableRule.getActualDatasourceNames().stream().findFirst().get();
            Set<String> tablesInDBSet = queryTables(tableRule.getLogicTable());
            refreshTableRule(tableRule, nodeName, tablesInDBSet);
        }
    }

    protected void refreshTableRule(TableRule tableRule, String nodeName, Set<String> tablesInDBSet) {
        // sharding缓存的表名
        Set<String> tableSets = getActualTables(tableRule);
        // 刷新
        if (!tableContrast(tableSets, tablesInDBSet)) {
            List<String> tableList = new ArrayList<>(tablesInDBSet);
            setDatasourceToTablesMap(tableRule, nodeName, tableList);
        }

    }

    private boolean tableContrast(Set<String> actualTableSets, Set<String> tablesInDBSet) {
        if (actualTableSets == null || tablesInDBSet == null) {
            return false;
        }
        if (actualTableSets.size() != tablesInDBSet.size()) {
            return false;
        }
        return actualTableSets.containsAll(tablesInDBSet);
    }

    protected void refreshShardingAlgorithm(TableRule tableRule, String nodeName) {
        // 获取分库分表时真正使用的表名
        Map<String, Set<String>> datasourceToTablesMap = getDatasourceToTablesMap(tableRule);
        Set<String> tables = datasourceToTablesMap.get(nodeName);
        ShardingStrategy shardingStrategy = tableRule.getTableShardingStrategy();
        if (shardingStrategy instanceof ComplexShardingStrategy) {
            ShardingAlgorithm algorithm = getObjectField(shardingStrategy, "shardingAlgorithm");
            setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);
        } else if (shardingStrategy instanceof HintShardingStrategy) {
            ShardingAlgorithm algorithm = getObjectField(shardingStrategy, "shardingAlgorithm");
            setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);
        } else if (shardingStrategy instanceof StandardShardingStrategy) {
            ShardingAlgorithm preciseAlgorithm = getObjectField(shardingStrategy, "preciseShardingAlgorithm");
            setValueToBaseAlgorithm(tableRule, preciseAlgorithm, nodeName, tables);
            ShardingAlgorithm rangeAlgorithm = getObjectField(shardingStrategy, "rangeShardingAlgorithm");
            setValueToBaseAlgorithm(tableRule, rangeAlgorithm, nodeName, tables);
        }
    }

    private void setValueToBaseAlgorithm(TableRule tableRule, ShardingAlgorithm algorithm, String nodeName, Set<String> tables) {

        if (algorithm != null && algorithm instanceof BaseShardingAlgorithm) {
            BaseShardingAlgorithm baseShardingAlgorithm = (BaseShardingAlgorithm) algorithm;
            baseShardingAlgorithm.setLogicTable(tableRule.getLogicTable());
            baseShardingAlgorithm.setTables(tables);
            baseShardingAlgorithm.setTableRule(tableRule);
            baseShardingAlgorithm.setNodeName(nodeName);
        }
    }

    protected ShardingRuntimeContext getRuntimeContext() {
        try {
            if (runtimeContext == null) {
                Method getRuntimeContextMethod = shardingDataSource.getClass().getDeclaredMethod("getRuntimeContext");
                getRuntimeContextMethod.setAccessible(true);
                runtimeContext = (ShardingRuntimeContext) getRuntimeContextMethod.invoke(shardingDataSource, null);
            }
        } catch (Exception e) {
            log.error("发生异常:" + e);
        }
        return runtimeContext;
    }

    protected Set<String> getActualTables(TableRule tableRule) {
        Set<String> tables = getObjectField(tableRule, "actualTables");
        return tables == null ? new LinkedHashSet<>() : tables;
    }

    protected void setDatasourceToTablesMap(TableRule tableRule, String nodeName, List<String> newTableList) {
        synchronized (tableRule) {
            Map<String, Set<String>> datasourceToTablesMap = getDatasourceToTablesMap(tableRule);
            Set<String> tables = datasourceToTablesMap.get(nodeName);
            Collections.sort(newTableList);
            tables.clear();
            tables.addAll(newTableList);
        }
    }

    protected Map<String, Set<String>> getDatasourceToTablesMap(TableRule tableRule) {
        Map<String, Set<String>> tablesMap = getObjectField(tableRule, "datasourceToTablesMap");
        return tablesMap == null ? new HashMap<>(0) : tablesMap;
    }

    protected static <T> T getObjectField(Object object, String fieldName) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return (T) field.get(object);
        } catch (Exception e) {
            log.error("发生异常:{}", e);
        }
        return null;
    }

    protected Set<String> queryTables(String tableName) {
        Connection conn = null;
        Statement statement = null;
        ResultSet rs = null;
        Set<String> tables = null;
        try {
            conn = shardingDataSource.getConnection();
            statement = conn.createStatement();
            rs = statement.executeQuery("select table_name from information_schema.tables where table_schema ='ali_sourcedata' and table_name like '" + tableName + "%'");
            tables = new LinkedHashSet<>();
            while (rs.next()) {
                tables.add(rs.getString(1));
            }
        } catch (SQLException e) {
            log.error("获取数据库连接失败!", e);
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (statement != null) {
                    statement.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                log.error("关闭数据连接失败", e);
            }
        }
        return tables;
    }

    protected void createTable(String tableName, String suffix) {
        String tableAllName = tableName + "_" + suffix;
        String sql = null;
        if (Constant.FIELD_TABLE_DATA.equals(tableName)) {
            sql = "CREATE TABLE `" + tableAllName +
                    "` (`id` bigint NOT NULL AUTO_INCREMENT,`deviceType` varchar(500) NOT NULL,`identifier` varchar(255) DEFAULT NULL,`method` varchar(255) DEFAULT NULL,`productKey` varchar(50) DEFAULT NULL,`deviceName` varchar(50) DEFAULT NULL," +
                    "`time` bigint DEFAULT NULL,`value` varchar(500) DEFAULT NULL,`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`),KEY `idx_time_did_model` (`time`,`deviceName`,`productKey`)," +
                    "KEY `idx_did` (`deviceName`),KEY `idx_model` (`productKey`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;";
        } else if (Constant.FIELD_TABLE_DATA_SOURCE.equals(tableName)) {
            sql = "CREATE TABLE `" + tableAllName +
                    "` (`id` bigint NOT NULL AUTO_INCREMENT,`productKey` varchar(50) DEFAULT NULL COMMENT '产品model',`deviceName` varchar(50) DEFAULT NULL COMMENT '产品did',`source_data_json` json DEFAULT NULL COMMENT '源数据'," +
                    "`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`), KEY `idx_deviceName` (`deviceName`),KEY `idx_productKey` (`productKey`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;";
        }
        Connection conn = null;
        Statement statement = null;
        try {
            conn = shardingDataSource.getConnection();
            statement = conn.createStatement();
            statement.executeUpdate(sql);
        } catch (SQLException e) {
            log.error("获取数据库连接失败!", e);
        } finally {
            try {
                if (statement != null) {
                    statement.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                log.error("关闭数据连接失败", e);
            }
        }
    }

    public String tryCreateShardingTable(String tableName, String suffix) {
        String resTable = tableName + "_" + suffix;
        //建表
        createTable(tableName, suffix);
        //重载
        tableNameCacheReloadAll();
        return resTable;
    }

}

工具类:

public class ShardingDateUtil {

    public static final String DATE_FORMAT_DEFAULT = "yyyy-MM-dd HH:mm:ss";
    public static final String DATE_FORMAT_NUMBER = "yyyyMMddHHmmss";
    public static final String YEAR_MONTH_DAY_NUMBER = "yyyyMMdd";
    public static final String YEAR_MONTH_NUMBER = "yyyyMM";
    public static final String DATE_FORMAT_DAY_PATTERN = "yyyy-MM-dd";
    public static final String YEAR_MONTH_DAY_EN_SECOND = "yyyy/MM/dd HH:mm:ss";
    public static final String YEAR_MONTH_DAY_CN_SECOND = "yyyy年MM月dd日 HH时mm分ss秒";
    public static final String YEAR_MONTH_DAY_CN = "yyyy年MM月dd日";
    public static final String MONTH_DAY = "MM-dd";

    public static String getYearMonth(Long date) {
        if (date == null) {
            return null;
        }
        return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(date);
    }

    public static String getYearMonthDay(String date) {
        if (date == null) {
            return null;
        }
        String format = DATE_FORMAT_DEFAULT;
        Date parse = new Date();
        try {
            parse = new SimpleDateFormat(format).parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return new SimpleDateFormat(YEAR_MONTH_DAY_NUMBER).format(parse);
    }

    public static String getYearMonth(String date, String format) {
        if (date == null) {
            return null;
        }
        if (StringUtils.isBlank(format)) {
            format = DATE_FORMAT_DEFAULT;
        }
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);
        return simpleDateFormat.format(date);
    }
}

初始化表:

import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Order(value = 1)
@Component
public class ShardingTablesLoadRunner implements CommandLineRunner {
    @Resource
    private ShardingAlgorithmReload shardingAlgorithmReload;
    @Override
    public void run(String... args) throws Exception {
        shardingAlgorithmReload.tableNameCacheReloadAll();
    }
}

添加多线程处理:

@EnableAsync
@Configuration
public class TheadPoolConfig {

    @Bean("CommonThreadPoolExecutor")
    public Executor syncExecutor() {

        // 获取可用处理器的Java虚拟机的数量
        int sum = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数 -> " + sum);

        // 实例化自定义线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置线程池中的核心线程数(最小线程数)
        executor.setCorePoolSize(5);
        // 设置线程池中的最大线程数
        executor.setMaxPoolSize(10);
        // 设置线程池中任务队列的容量
        executor.setQueueCapacity(25);
        // 设置线程池中空闲线程的存活时间
        executor.setKeepAliveSeconds(60);
        // 设置线程池中线程的名称前缀
        executor.setThreadNamePrefix("async-");
        // 设置线程池关闭时等待所有任务完成的时间。
        executor.setAwaitTerminationSeconds(60);
        // 设置线程池中任务队列已满时的拒绝策略,当线程池中的任务队列已满,而且线程池中的线程已经达到了最大线程数时,新的任务就无法被执行。这时就需要设置拒绝策略来处理这种情况。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 设置线程池在关闭时是否等待所有任务完成
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // 初始化线程池的配置
        executor.initialize();

        return executor;
    }
}

接口处理mq消息:

  @Resource
    private AliDataService aliDataService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(
                    value = "${queue.data-group}",
                    durable = "true", exclusive = "false",
                    autoDelete = "false",
                    arguments = {@Argument(name = "x-message-ttl", value = "3600000", type = "java.lang.Long")}
            ),
            exchange = @Exchange(name = "${com.chunmi.mq.feiyan.exchange}", type = "topic")))
    @RabbitHandler
    @Async(value = "CommonThreadPoolExecutor")
    public void consumer(String jsonStr) {
        log.info("物联网生活平台设备上报的消息:{}", jsonStr);
        JSONObject jsonObject = JSONObject.parseObject(jsonStr);

        // 处理全部设备事件
        this.processDataSource(jsonObject);

        // 处理设备事件 device_event
        this.processData(jsonObject);
    }

 

标签:tables,null,return,SpringBoot,tableRule,分表,ShardingJdbc,public,String
From: https://www.cnblogs.com/wlong-blog/p/18421103

相关文章

  • 基于SpringBoot+Vue+MySQL的智能物流管理系统
    系统展示系统背景  随着信息技术的飞速发展和电子商务的蓬勃兴起,智能物流管理系统的需求日益迫切。传统的物流管理方式已难以满足高效、精准、实时的管理需求。因此,基于SpringBoot、Vue和MySQL的智能物流管理系统应运而生。该系统旨在通过现代化的技术手段,实现物......
  • springboot大学生科创项目在线管理系统的设计与实现
    大家好,我是永钊,一个混迹在java圈的码农,今天要和大家聊的是一款基于springboot的大学生科创项目在线管理系统,项目源码请联系永钊,目前有各类成品毕设javawebsshssmspringboot等等项目框架,源码丰富。专业团队,咨询就送开题报告,活动限时免费,有需要的朋友可以来留言咨询。本......
  • 基于SpringBoot的旅游信息管理系统+LW示例参考
    系列文章目录1.基于SSM的洗衣房管理系统+原生微信小程序+LW参考示例2.基于SpringBoot的宠物摄影网站管理系统+LW参考示例3.基于SpringBoot+Vue的企业人事管理系统+LW参考示例4.基于SSM的高校实验室管理系统+LW参考示例5.基于SpringBoot的二手数码回收系统+原生微信小......
  • 基于SpringBoot+Vue的医疗服务管理系统+原生微信小程序+LW示例参考
    系列文章目录1.基于SSM的洗衣房管理系统+原生微信小程序+LW参考示例2.基于SpringBoot的宠物摄影网站管理系统+LW参考示例3.基于SpringBoot+Vue的企业人事管理系统+LW参考示例4.基于SSM的高校实验室管理系统+LW参考示例5.基于SpringBoot的二手数码回收系统+原生微信小......
  • springboot+vue音乐网站【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着互联网的飞速发展,音乐已成为人们日常生活中不可或缺的一部分。传统音乐获取方式如购买CD、磁带等逐渐淡出市场,取而代之的是更为便捷、多样的在线音乐服务。当前市场上虽已存在众多音乐网站,但它们在用户体验、歌曲分类的精细化、歌......
  • springboot+vue音乐网站【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展,数字音乐已成为人们日常生活中不可或缺的一部分。音乐网站作为数字音乐传播的重要平台,不仅为用户提供了便捷的音乐获取渠道,还通过丰富的功能和个性化的服务,极大地丰富了人们的音乐体验。然而,当前市场上的音乐......
  • springboot+vue音乐网站【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着互联网的飞速发展,数字化娱乐已成为现代人生活中不可或缺的一部分,其中音乐作为跨越文化和国界的艺术形式,更是受到了广大网民的热烈追捧。传统的音乐获取方式如购买实体唱片或通过电视广播已难以满足用户对音乐资源多元化、即时化、......
  • springboot+vue音乐推荐系统【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着数字化时代的到来,音乐产业经历了前所未有的变革,音乐资源的海量化与获取途径的多样化使得用户在享受音乐盛宴的同时,也面临着信息过载的困扰。如何在浩如烟海的音乐库中快速找到符合个人喜好的音乐作品,成为了一个亟待解决的问题。音......
  • SpringBoot 整合 Activiti 实现工作流(项目代码分享)
    前言activiti工作流引擎项目,企业erp、oa、hr、crm等企事业办公系统轻松落地,一套完整并且实际运用在多套项目中的案例,满足日常业务流程审批需求。一、项目形式springboot+vue+activiti集成了activiti在线编辑器,流行的前后端分离部署开发模式,快速开发平台,可插拔工作流服务。工......
  • springboot 博客交流平台-计算机毕业设计源码56406
    摘要博客交流平台作为一种重要的网络平台,为用户提供了展示自我、分享经验和与他人互动的空间。在国内外,研究者们关注博客交流平台的各个方面,并取得了显著的进展。研究内容主要包括用户体验和界面设计、社交化和互动性、多媒体内容支持、移动设备适配和跨平台体验、数据分析......