微服务分布式事务TCC核心实现篇

开课吧樵夫2021-12-22 16:49

  微服务架构的技术体系在社区中越来越成熟。在最初的系统架构建设中,或者当现有架构达到瓶颈需要进化时,许多架构师许多架构师和运维工程师会考虑是否需要建立微服务架构体系。虽然很多文章说微服务架构复杂,会带来很多分布式问题,但只要我们理解并找到解决方案,就会有一种消除云的感觉。

微服务分布式事务TCC核心实现篇

  Seata TCC 模型实现架构图

微服务分布式事务TCC核心实现篇

  将上图的TCC事务模型架构核心代码 抒写如下(以下代码按照图上结构对应):

  TM核心架构代码

  TMClient 客户端的实现

public final class TmRpcClient extends AbstractRpcRemotingClient {

    /**
     * 获取事务管理客户端实例
     */
    public static TmRpcClient getInstance(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = getInstance();
        //设置应用编号
        tmRpcClient.setApplicationId(applicationId);
        //设置TC集群键
        tmRpcClient.setTransactionServiceGroup(transactionServiceGroup);
        return tmRpcClient;
    }

    /**
     * 获取事务管理客户端实例
     */
    public static TmRpcClient getInstance() {
        if (null == instance) {
            synchronized (TmRpcClient.class) {
                if (null == instance) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()),
                        RejectedPolicies.runsOldestTaskPolicy());
                    instance = new TmRpcClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }
}

  TransactionManager 事物管理器契约

public interface TransactionManager {

    /**
     *  开始一个新的全局事务
     */
    String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException;

    /**
     * 提交全局事务
     */
    GlobalStatus commit(String xid) throws TransactionException;

    /**
     *  回滚全局事务
     */
    GlobalStatus rollback(String xid) throws TransactionException;

    /**
     *  获取指定全局事务状态
     */
    GlobalStatus getStatus(String xid) throws TransactionException;

    /**
     *  全局事务报告
     */
    GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException;
}

  DefaultTransactionManager 事物管理器实现

  伪代码 ,提供核心实现点

public class DefaultTransactionManager implements TransactionManager {

    //开始全局事物
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        //构建全局事务请求
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        //同步发送全局事务开启
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }
  
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            //通过客户端向TC发送协议请求
            return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
}

  GlobalTransaction 全局事务契约

public interface GlobalTransaction {

    /**
     * 开始全局事务
     */
    void begin() throws TransactionException;

    /**
     * 开始全局事务,指定超时时间
     */
    void begin(int timeout) throws TransactionException;

    /**
     * 开始全局事务,指定事务名
     */
    void begin(int timeout, String name) throws TransactionException;

    /**
     * 全局事务提交
     */
    void commit() throws TransactionException;

    /**
     * 全局事务回滚
     */
    void rollback() throws TransactionException;

    /**
     * 获取全局事务状态
     */
    GlobalStatus getStatus() throws TransactionException;

    /**
     * 全局事务编号
     */
    String getXid();

    /**
     * 报告全局事务状态
     */
    void globalReport(GlobalStatus globalStatus) throws TransactionException;

}

  DefaultGlobalTransaction 全局事务实现

public class DefaultGlobalTransaction implements GlobalTransaction {

    //开始全局事务
    public void begin() throws TransactionException {
        begin(DEFAULT_GLOBAL_TX_TIMEOUT);
    }

    //开始全局事务
    public void begin(int timeout) throws TransactionException {
        begin(timeout, DEFAULT_GLOBAL_TX_NAME);
    }

    //开始全局事务
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            check();
            return;
        }
        if (xid != null) {
            throw new IllegalStateException();
        }
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        //通过事务管理器开始全局事务
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        //全局上下文绑定全局事务编号
        RootContext.bind(xid);

    }


    //提交全局事务
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            return;
        }
        if (xid == null) {
            throw new IllegalStateException();
        }
        int retry = COMMIT_RETRY_COUNT;
        try {
            //最大失败重试次数
            while (retry > 0) {
                try {
                    //提交事务
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
    }
  ............... 其他方法基本差不多,可以查看源代码
}

  GlobalTransactionalInterceptor 全局事务方法拦截器

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
    
    //@GlobalTransaction 注解方法拦截
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        //获取目标方法所在的类对象
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
            : null;
        //获取方法对象
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        //获取具体的方法
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        //获取GlobalTransactional注解
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        //获取全局锁GlobalLock注解
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        //已经开启全局事务 且 方法上面标记有全局事务注解
        if (!disable && globalTransactionalAnnotation != null) {
            //进行全局事务调用流程
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (!disable && globalLockAnnotation != null) { //开启全局事务 且全局锁注解不为空
            //进行全局锁流程
            return handleGlobalLock(methodInvocation);
        } else {
            //普通方法调用
            return methodInvocation.proceed();
        }
    }

  RM核心架构代码

  RMClient 客户端核心代码

public class RMClient {

    /**
     * 资源管理客户端初始化
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        //创建资源管理客户端(单利)
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        //设置资源管理
        rmRpcClient.setResourceManager(DefaultResourceManager.get());
        //设置客户端消息监听器
        rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get(), rmRpcClient));
        //资源客户端初始化
        rmRpcClient.init();
    }

}

  ResourceManagerInbound 资源管理消息接收契约

public interface ResourceManagerInbound {

    /**
     * 接收TC发送的本地事务提交
     */
    BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;

    /**
     * 接收TC发送的本地事务回滚
     */
    BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;
}

  ResourceManager 资源管理契约

public interface ResourceManager extends ResourceManagerInbound, ResourceManagerOutbound {

    /**
     * 注册资源
     */
    void registerResource(Resource resource);

    /**
     * 删除资源
     */
    void unregisterResource(Resource resource);

    /**
     * 获取所有资源
     */
    Map<String, Resource> getManagedResources();

    /**
     * 获取事务模型类型
     */
    BranchType getBranchType();
}

  TCCResourceManager TCC事务管理器

public class TCCResourceManager extends AbstractResourceManager {
//分支提交
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
                                     
        //获取TCC资源实体
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        //获取目标对象
        Object targetTCCBean = tccResource.getTargetBean();
        //获取提交方法
        Method commitMethod = tccResource.getCommitMethod();
 
        try {
            boolean result = false;
            //获取 BusinessActionContext 对象
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            // 提交方法调用
            Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
            //获取提交方法结果
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            }
            //返回二阶段提交结果值
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            //失败抛异常
            throw new FrameworkException(t, msg);
        }
    }
    
    //TCC 分支类型
    public BranchType getBranchType() {
        return BranchType.TCC;
    }
}

  TC核心架构代码

  RpcServer 事务协调者(服务端)

public class RpcServer extends AbstractRpcRemotingServer {
 
    
    /**
     * 同步发送响应
     */
    @Override
    public void sendResponse(RpcMessage request, Channel channel, Object msg) {
        Channel clientChannel = channel;
        //非心跳包
        if (!(msg instanceof HeartbeatMessage)) {
            //获取客户端通道
            clientChannel = ChannelManager.getSameClientChannel(channel);
        }
        if (clientChannel != null) {
            //客户端通道不为空则发送响应结果
            super.defaultSendResponse(request, clientChannel, msg);
        } else {
            throw new RuntimeException("channel is error. channel:" + clientChannel);
        }
    }

    /**
     * 同步发送请求且携带响应体
     */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message,
                                  long timeout) throws TimeoutException {
        Channel clientChannel = ChannelManager.getChannel(resourceId, clientId);
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected. dbkey:" + resourceId
                + ",clientId:" + clientId);

        }
        return sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /**
     * 同步发送请求且携带响应对象
     */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message) throws TimeoutException {
        return sendSyncRequest(clientChannel, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /**
     * 同步发送请求且携带响应对象
     */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message, long timeout) throws TimeoutException {
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected");

        }
        return sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /**
     * 异步发送请求且携带响应对象
     */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message)
        throws TimeoutException {
        return sendSyncRequest(resourceId, clientId, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /**
     * 异步发送请求且携带响应对象
     */
    @Override
    public Object sendASyncRequest(Channel channel, Object message) throws TimeoutException {
        return sendAsyncRequestWithoutResponse(channel, message);
    }
}

  TransactionMessageHandler 事务协调者接收消息处理契约

public interface TransactionMessageHandler {

    /**
     * 接收请求消息
     */
    AbstractResultMessage onRequest(AbstractMessage request, RpcContext context);

    /**
     * 接收响应消息
     */
    void onResponse(AbstractResultMessage response, RpcContext context);

}

  以上就是小编为大家整理发布的“微服务分布式事务TCC核心实现篇”一文,更多相关内容尽在开课吧广场Java教程频道。

微服务分布式事务TCC核心实现篇

免责声明:本站所提供的内容均来源于网友提供或网络搜集,由本站编辑整理,仅供个人研究、交流学习使用。如涉及版权问题,请联系本站管理员予以更改或删除。
有用
分享