SpringBoot整合分布式事务,JTA+Atomikos实现多数据源

开课吧开课吧锤锤2021-07-21 16:13

在整合MySQL,JDBCTemplate这篇中的结尾,我们成功的向两个数据库中分别添加了一条数据。但是我们思考一下,如果PetsServiceImpl.java中的savePets()中如果发生异常了会怎么样呢?  

@Override
@Transactional
public void savePets(Pets pets) {
    petsDAO.save(pets, familyJdbcTemplate);
    petsDAO.save(pets, family2JdbcTemplate);

    // 异常,分母不能为零
    int num = 1/0;
}

我们添加了@Transactional注解,按照正常的思路来说两个数据库中都不应该有数据加入:  

SpringBoot整合分布式事务

数据库Family确实没有数据加入:  

SpringBoot整合分布式事务

但是Family2却有数据加入:  

SpringBoot整合分布式事务

这是不符合逻辑的,因为数据库事物不能跨链接,数据源更不能跨库。如果出现了上述操作那这个事务就变成了分布式事务,需要一个统一协调的管理器。所以我们要解决这个问题。  

JTA实现跨库分布式事务  

XA规范:是一个两阶段提交协议(同时对数据进行处理),被很多数据库和中间件支持。  

两阶段提交协议(2PC):将整个事务分成准备阶段和提交阶段两个阶段。2指是两个阶段,P(Prepare),C(Commit)。  

正常情况下:  

准备阶段:事务管理器(TM)给Family和Family2发送Prepare消息,Family和Family2在本地执行事务,写本地的Undo/Redo日志,但是此时事务没有提交。  

提交阶段:事务管理器(TM)收到Family和Family2超时或执行失败的消息时,会对Family和Family2发送回滚消息。如果事务管理器没有收到关于失败的消息,就会发送提交消息。完成回滚或是提交之后需要释放事务处理过程中使用的锁资源。  

成功情况:  

SpringBoot整合分布式事务

失败情况:  

SpringBoot整合分布式事务

两阶段提交是在数据库层面实现的,下面我们来结合Family和Family2出现的问题进行解析。  

SpringBoot整合分布式事务

现在我们的应用程序持有Family和Family2两个数据库,应用程序通过事务管理器同时通知Family,Family2添加数据。事务管理器收到执行的回复,两个数据库有一方失败,就向另一方发起回滚事务。回滚完毕,资源锁释放。如果都是成功,此时向所有数据库发起提交事务,提交完毕,资源锁释放。  

XA规范缺点:  

同步阻塞,数据库锁定资源时间太长,全局锁,并发低,不适合长事务场景,还需要本地数据库支持XA规范。  

JTA规范:可以理解为XA规范的Java版本。  

Atomikos:分布式事务管理器,JTA/XA的具体实现。(支持分布式事务,单独引入了事物管理器导致性能开销大,不适合用于高并发场景)  

分布式事务就是跨数据库对数据进行操作的事务,要达到的目的是要么都成功,要么都失败。  

首先我们在pom.xml文件中引入atomikos依赖包:  

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

之后我们需要对application.yml进行修改:  

server:
  port: 8888

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

  datasource:
    familydb:
      uniqueResourceName: family
      xaDataSourceClassName: com.mysql.cj.jdbc.MysqlXADataSource
      xaProperties:
        url: jdbc:mysql://localhost:3306/Family?serverTimezone=GMT%2b8&characterEncoding=utf-8
        username: root
        password: 123456
      exclusiveConnectionMode: true
      minPoolSize: 5
      maxPoolSize: 15
      testQuery: SELECT 1 from dual
    family2db:
      uniqueResourceName: family2
      xaDataSourceClassName: com.mysql.cj.jdbc.MysqlXADataSource
      xaProperties:
        url: jdbc:mysql://localhost:3306/Family2?serverTimezone=GMT%2b8&characterEncoding=utf-8
        username: root
        password: 123456
      exclusiveConnectionMode: true
      minPoolSize: 5
      maxPoolSize: 15
      testQuery: SELECT 1 from dual

现在再来改写一下DataSourceConfig.java:  

@Configuration
public class DataSourceConfig {
    // JTA数据源family
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "familydb")
    public DataSource familyDataSource() {
        // 返回AtomikosDataSourceBean,配置属性也都是注入到这个类里面
        return new AtomikosDataSourceBean();
    }

    // JTA数据源family2
    @Bean
    @ConfigurationProperties(prefix = "family2db")
    public DataSource family2DataSource()  {
        return new AtomikosDataSourceBean();
    }

    // familyJdbcTemplate使用familyDataSource数据源
    @Bean
    public JdbcTemplate familyJdbcTemplate(
            @Qualifier("familyDataSource") DataSource familyDataSource) {
        return new JdbcTemplate(familyDataSource);
    }

    // family2JdbcTemplate使用family2DataSource数据源
    @Bean
    public JdbcTemplate family2JdbcTemplate(
            @Qualifier("family2DataSource") DataSource family2DataSource) {
        return new JdbcTemplate(family2DataSource);
    }
}

将上述代码改写完成后,在config文件下创建TransactionManagerConfig.java:  

package com.javafamily.familydemo.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;


import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

@Configuration
public class TransactionManagerConfig {

    @Bean
    public UserTransaction userTransaction() throws SystemException {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
    public TransactionManager atomikosTransactionManager() throws Throwable {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() throws Throwable {
        UserTransaction userTransaction = userTransaction();

        JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
        return manager;
    }
}

以上的代码属于事务管理器配置,事务管理器负责协调多个JTA数据源实现事务机制,这是固定的写法,不需要纠结。  

完成了以上的配置之后,我们再次验证分母为零的异常问题:  

SpringBoot整合分布式事务

报出异常后,查看分别查看两个数据库:  

SpringBoot整合分布式事务

SpringBoot整合分布式事务

两个数据库由于异常都没有数据被插入,问题解决!  

以上就是开课吧广场小编为大家整理发布的“SpringBoot整合分布式事务,JTA+Atomikos实现多数据源”一文,更多Java教程相关内容尽在开课吧Java教程频道。

免责声明:本站所提供的内容均来源于网友提供或网络搜集,由本站编辑整理,仅供个人研究、交流学习使用。如涉及版权问题,请联系本站管理员予以更改或删除。
有用
分享