首页 > 编程语言 >Java解决单机环境下多数据源的事务问题

Java解决单机环境下多数据源的事务问题

时间:2022-11-07 20:01:13浏览次数:54  
标签:Java 数据源 public return connection SQLException Override 下多 throws

springboot单机环境下的@Transictional可以保证事务,但多数据源的情况就无法使用了,这里简单实现一下多数据源的情况下如何保证事务。

一,事务实现方案

利用 ThreadLocal 将事务方法 内用到的 connection 缓存起来,当业务执行完毕,再统一 commit 或者 rollback;

二,自定义开启事务注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface MultiDSTransaction {
}

三,yml配置多数据源

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    datasource1:
      url: jdbc:mysql://localhost:3306/datasource1?serverTimezone=UTC&useUnicode=true
      username: root
      password: 1234
      driver-class-name: com.mysql.cj.jdbc.Driver
    datasource2:
      type: com.alibaba.druid.pool.DruidDataSource
      url: jdbc:mysql://localhost:3306/datasource2?serverTimezone=UTC&useUnicode=true
      username: root
      password: 1234
      driver-class-name: com.mysql.cj.jdbc.Driver

四,多数据源配置类

/**
 * 注入两个数据源
 */
@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.datasource1")
    public DataSource dataSource1(){
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.datasource2")
    public DataSource dataSource2(){
        return DruidDataSourceBuilder.create().build();
    }

五,重写Connection连接

package com.example.demo.config;

import com.example.demo.config.TransactionContext;

import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

public class CustomConnection implements Connection {
    // 真实的连接
    private Connection connection;

    public CustomConnection(Connection connection) {
        this.connection = connection;
    }
    @Override
    public void commit() throws SQLException {
        // 如果没开启多数据源事务,则走 commit
        if (!TransactionContext.isOpenTran()) {
            connection.commit();
        }
    }

    @Override
    public void rollback() throws SQLException {
        connection.rollback();
    }

    public void commitMultiDbTran() throws SQLException {
        // 如果开启多数据源,则走 这里的 commit
        connection.commit();
    }
    @Override
    public void close() throws SQLException {
        // mybatis 执行完业务后,会触发 close() 操作,如果 connection 被提前 close 了,业务就会出错
        if (!TransactionContext.isOpenTran()) {
            connection.close();
        }
    }
    public void closeMultiDbTran() throws SQLException {
        // 如果开启多数据源事务,则走 这里的 close
        connection.close();
    }

    @Override
    public Statement createStatement() throws SQLException {
        return connection.createStatement();
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return connection.prepareStatement(sql);
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return connection.prepareCall(sql);
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return connection.nativeSQL(sql);
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        connection.setAutoCommit(autoCommit);
    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return connection.getAutoCommit();
    }

    @Override
    public boolean isClosed() throws SQLException {
        return connection.isClosed();
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return connection.getMetaData();
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
        connection.setReadOnly(readOnly);
    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return connection.isReadOnly();
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {
        connection.setCatalog(catalog);
    }

    @Override
    public String getCatalog() throws SQLException {
        return connection.getCatalog();
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {
        connection.setTransactionIsolation(level);
    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return connection.getTransactionIsolation();
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return connection.getWarnings();
    }

    @Override
    public void clearWarnings() throws SQLException {
        connection.clearWarnings();
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.createStatement(resultSetType,resultSetConcurrency);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.prepareStatement(sql,resultSetType,resultSetConcurrency);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.prepareCall(sql,resultSetType,resultSetConcurrency);
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return connection.getTypeMap();
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        connection.setTypeMap(map);
    }

    @Override
    public void setHoldability(int holdability) throws SQLException {
        connection.setHoldability(holdability);
    }

    @Override
    public int getHoldability() throws SQLException {
        return connection.getHoldability();
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return connection.setSavepoint();
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return connection.setSavepoint(name);
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
        connection.rollback(savepoint);
    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        connection.releaseSavepoint(savepoint);
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.createStatement(resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.prepareStatement(sql,resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.prepareCall(sql,resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return connection.prepareStatement(sql,autoGeneratedKeys);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return connection.prepareStatement(sql,columnIndexes);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return connection.prepareStatement(sql,columnNames);
    }

    @Override
    public Clob createClob() throws SQLException {
        return connection.createClob();
    }

    @Override
    public Blob createBlob() throws SQLException {
        return connection.createBlob();
    }

    @Override
    public NClob createNClob() throws SQLException {
        return connection.createNClob();
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return connection.createSQLXML();
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return connection.isValid(timeout);
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        connection.setClientInfo(name,value);
    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        connection.setClientInfo(properties);
    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return connection.getClientInfo(name);
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return connection.getClientInfo();
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return connection.createArrayOf(typeName,elements);
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return connection.createStruct(typeName,attributes);
    }

    @Override
    public void setSchema(String schema) throws SQLException {
        connection.setSchema(schema);
    }

    @Override
    public String getSchema() throws SQLException {
        return connection.getSchema();
    }

    @Override
    public void abort(Executor executor) throws SQLException {
        connection.abort(executor);
    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
        connection.setNetworkTimeout(executor,milliseconds);
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return connection.getNetworkTimeout();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return connection.unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return connection.isWrapperFor(iface);
    }
}

六,事务管理Context

package com.example.demo.config;

public class TransactionContext {
    private static final ThreadLocal<Boolean> TRAN_SWITCH_CONTEXT = new ThreadLocal<>();
    static {
        // 默认事务处于关闭状态
        TRAN_SWITCH_CONTEXT.set(false);
    }
    // 开启事务
    public static void openTran() {
        TRAN_SWITCH_CONTEXT.set(true);
    }
    // 关闭事务
    public static void closeTran() {
        TRAN_SWITCH_CONTEXT.set(false);
    }
    // 判断是否开启事务
    public static Boolean isOpenTran() {
        return TRAN_SWITCH_CONTEXT.get();
    }
}

七,自定义数据源

在自定义数据源中注入上边那两个多数据源,维持多数据源执行事务期间用到的连接列表,在自定义数据源中添加事务相关业务,既在获取 连接的地方将 Connection 缓存到 ThreadLocal 中

使用了@Primary注解后,作用是将该bean设置为主要注入bean,当注入相同类型的datasource的bean时就不会注入DataSourceConfig配置类中注入的两个bean了,只会注入这个,mybatis在使用DataSourceUtil.getDataSource的时候获取的是这个自定义数据源,执行的是此自定义数据源的getConnection方法。

package com.example.demo.config;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/**
 * 管理多个数据源
 */
@Component
@Primary //将该bean设置为主要注入bean,当注入相同类型的datasource的bean时就不会注入DataSourceConfig配置类中注入的两个bean了,只会注入这个
public class DynamicDataSource implements DataSource, InitializingBean {

    /**
     * 多数据源 执行 事务期间用到的连接
     */
    public static final ThreadLocal<List<CustomConnection>> MULTI_TRAN_CONNECTION = new ThreadLocal<>();

    //当前使用的数据源标识
    public static ThreadLocal<String> name = new ThreadLocal<>();
    //两个数据源
    @Autowired
    private DataSource dataSource1;
    @Autowired
    private DataSource dataSource2;

    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = null;
        if(name.get().equals("one")){
            connection = dataSource1.getConnection();
        }else if(name.get().equals("two")){
            connection = dataSource2.getConnection();
        }
        CustomConnection customConnection = new CustomConnection(connection);
        if (TransactionContext.isOpenTran()) {
            customConnection.setAutoCommit(false);
            List<CustomConnection> customConnections = MULTI_TRAN_CONNECTION.get();
            if(customConnections == null){
                customConnections = new ArrayList<>();
            }
            customConnections.add(customConnection);
            MULTI_TRAN_CONNECTION.set(customConnections);
        }
        return customConnection;
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return null;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return null;
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {

    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {

    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return 0;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return null;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //初始化,默认设置当前连接要获取的数据源是datasource1
        name.set("one");
    }
}

八,写切面

利用 AOP 进行方法拦截,对使用了 多数据源 事务注解的方法,执行事务业务

package com.example.demo.aspect;

import com.example.demo.config.CustomConnection;
import com.example.demo.config.DynamicDataSource;
import com.example.demo.config.TransactionContext;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.context.annotation.Configuration;

@Aspect
@Configuration
public class MultiDSTransactionConfig {
    @Pointcut("@annotation(com.example.demo.annotation.MultiDSTransaction)")
    public void transactPoint() {}

    @Around("transactPoint()")
    public Object multiTranAop(ProceedingJoinPoint joinPoint) throws Throwable {
        // 开启事务
        TransactionContext.openTran();
        try {
            // 执行业务
            Object proceed = joinPoint.proceed();
            // 提交事务
            for (CustomConnection connection : DynamicDataSource.MULTI_TRAN_CONNECTION.get()) {
                connection.commitMultiDbTran();
                connection.closeMultiDbTran();
            }
            return proceed;
        } catch (Throwable t) {
            for (CustomConnection connection : DynamicDataSource.MULTI_TRAN_CONNECTION.get()) {
                // 事务回滚
                connection.rollback();
                connection.closeMultiDbTran();
            }
            throw t;
        } finally {
            // 清空 事务 连接,关闭当前事务
            DynamicDataSource.MULTI_TRAN_CONNECTION.get().clear();
            TransactionContext.closeTran();
        }
    }
}

九,测试

测试代码如下,当报错的之后,事务同时回滚,数据没插入成功,当未出现报错,数据则都插入成功

@Mapper
public interface TestMapper {

    /**
     * 测试datasource1
     * @return
     */
    @Insert(value = "insert into test1(name) values ('呵呵');")
    int test1();

    /**
     * 测试datasource2
     * @return
     */
    @Insert(value = "insert into test2(name) values ('哈哈');")
    int test2();
}
@RestController
public class TestController {

    @Autowired
    private TestMapper testMapper;

    @RequestMapping("test")
    @MultiDSTransaction
    public void test(){
        //选择datasource1数据源
        DynamicDataSource.name.set("one");
        int i = testMapper.test1();

        //模拟报错
        int k = 1/0;

        //选择datasource2数据源
        DynamicDataSource.name.remove();
        DynamicDataSource.name.set("two");
        int j = testMapper.test2();

        System.out.println("over");
    }
}

标签:Java,数据源,public,return,connection,SQLException,Override,下多,throws
From: https://www.cnblogs.com/fantongxue/p/16867228.html

相关文章

  • Java网络编程
    软件结构C/S结构:即Client/Server结构,指客户端和服务器结构。常见的有QQ、迅雷等。B/S结构:即Browser/Server结构,指浏览端和服务器结构。常见的有谷歌浏览器、火狐浏览器等......
  • JAVA 模板设计模式
    今天来介绍下一个我觉得蛮不错的设计模式(比较容易应用于业务场景),它就是---模板设计模式。OK,我们直接看代码:模板模式,那当然我们需要建一个模板先,建一个抽象类,VehicleControlM......
  • JAVA base64 工具类
    importjava.io.UnsupportedEncodingException;importjava.util.Iterator;importjava.util.Map;importjava.util.Set;importjava.util.SortedMap;publicclassBase64Ut......
  • JAVA 接口签名sign生成 工具类
    importorg.springframework.util.StringUtils;importjava.util.Map;importjava.util.Random;importjava.util.TreeMap;/***@Author:JCccc*@CreateTime:2018-10-30......
  • Java-SSRF
    漏洞分析原理:服务端提供了从其他服务器应用获取数据的功能且没有对目标地址做过滤与限制。大部分的web服务器架构中,web服务器自身都可以访问互联网和服务器所在的内网......
  • JavaScript之数组高阶API—reduce()
    一文搞懂JavaScript数组中最难的数组API——reduce()前面我们讲了数组的一些基本方法,今天给大家讲一下数组的reduce(),它是数组里面非常重要也是比较难的函数,那么这篇文章......
  • JAVA MD5加密工具类
     importjava.io.UnsupportedEncodingException;importjava.security.MessageDigest;importjava.security.NoSuchAlgorithmException;/***@Author:JCccc*@CreateTim......
  • JAVA 责任链设计模式
    这次介绍责任链模式,采用最普遍的请假例子来编码实现。先给列出个模拟的需求,一个人请假,调用一个接口,传入的参数是他请假的天数。然后,请假的天数---->如果小于2天,由直属领导......
  • JAVA回调函数简单讲解 CallBack
    回调,其实就是有个回应的那种感觉。那么,接下来,我们就用消息推送的场景,简简单单地讲解下回调函数的使用。直接看代码,先创建一个回调接口,MessageCallBack/***@Author:JCccc......
  • Java-xss
    XSS代码分析在php里面会使用echo对用户输入的参数进行直接输出,导致了xss漏洞的产生。而在Java里面会将接收到的未经过滤的参数共享到request域中,在jsp的页面里面使用EL表......