SpringBoot Mybatis 乐观锁重试机制代码实现

乐观锁重试机制代码实现

有乐观锁,那当然也是有悲观锁的

悲观锁和乐观锁的原理和应用场景

悲观锁(Pessimistic Lock)

顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock)

顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁

应用场景

悲观锁:

比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量。

乐观锁:

比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。

总结:两种所各有优缺点,读取频繁使用乐观锁,写入频繁使用悲观锁。

代码实现原理

这里采用版本号的方式

简单例子表结构

CREATE TABLE `usr_user_account` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(20) NOT NULL COMMENT '用户ID',
  `balance` decimal(50,10) DEFAULT '0.0000000000' COMMENT '余额',
  `version` bigint(20) DEFAULT '0' COMMENT '乐观锁版本号',
  `create_by` bigint(20) DEFAULT NULL COMMENT ' 创建人',
  `create_time` datetime NOT NULL COMMENT ' 创建时间',
  `modify_by` bigint(20) DEFAULT NULL,
  `modify_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='用户账户表';

比如现在表中有两条数据
SpringBoot Mybatis 乐观锁重试机制代码实现

现在 user_id = 1001user_id=1002 转账 50
转账,无非就是一加一减,我就不啰嗦了,先看完整代码实现 如下

@Service
@Transactional
public class UserAccountService implements IUserAccountService{

    @Autowired
    private UserAccountMapper userAccountMapper;

    /**
     * 转账
     * @param dto
     * @return
     * @throws Exception
     */
    @Transactional
    @IsTryAgain
    public Result transfter(TransferDTO dto) throws Exception {
        updateAccount(dto.getFromUserId(),dto.getAmount(), Consts.ACCOUNT_OUT);
        updateAccount(dto.getToUserId(),dto.getAmount(), Consts.ACCOUNT_IN);
        return Result.ok();
    }

    /**
     * 更新账户余额
     * @param userId 账户用户ID
     * @param amount    操作金额数量
     * @param state     转入--1 ,转出 -- 0
     * @throws Exception
     */
    public void updateAccount( Long userId, BigDecimal amount,int state) throws Exception{
        UserAccount userAccount = userAccountMapper.getUserAccountByUserId(userId);
        if(userAccount == null){
            throw new ApiException(ApiResultEnum.ACCOUNT_NOT_FOUND);
        }
        BigDecimal afterBalance = BigDecimal.ZERO;
        if(state == Consts.ACCOUNT_OUT){
            if(userAccount.getBalance().compareTo(amount)<0){
                throw new ApiException(ApiResultEnum.ACCOUNT_NOT_SUFFICIENT);
            }
            afterBalance = userAccount.getBalance().subtract(amount);
        }else if(state == Consts.ACCOUNT_IN){
            afterBalance = userAccount.getBalance().add(amount);
        }
        userAccount.setBalance(afterBalance);
        userAccount.setModifyBy(userId);
        userAccount.setModifyTime(LocalDateTime.now());
        if(!userAccountMapper.updateAccount(userAccount)){
            //如果更新失败就抛出去,重试
            throw new TryAgainException(ApiResultEnum.ERROR_TRY_AGAIN);
        }
    }
}

代码不算多

只需要看 updateAccount 方法中的 userAccountMapper.updateAccount 这个方法,这个方法也就是 乐观锁 实现的原理, mapper 如下:

<update id="updateAccount" parameterType="top.lrshuai.optimisticlock.usr.entity.UserAccount">
        update usr_user_account set
        version = version+1
        <if test="userId != null">
            ,user_id = #{userId}
        </if>
        <if test="balance != null">
            ,balance = #{balance}
        </if>
        <if test="createBy != null">
            ,create_by = #{createBy}
        </if>
        <if test="createTime != null">
            ,create_time = #{createTime}
        </if>
        <if test="modifyBy != null">
            ,modify_by = #{modifyBy}
        </if>
        <if test="modifyTime != null">
           ,modify_time = #{modifyTime}
        </if>
        where
         version=#{version}
        <if test="id != null">
           and  id=#{id}
        </if>
        <if test="userId != null">
          and user_id=#{userId}
        </if>

    </update>

重点看 version 的变化

第一点: 更新条件 要匹配 version=上一次数据的version
第二点: 就是 更新成功的时候,把 version + 1
注意:当数据表有一个写锁时,其它进程的读写操作都需等待读锁释放后才会执行。所以保证了version 的正确性

乐观锁实现就是这么简单

如果说 更新失败了,那我们就抛异常,这样用户体验不是非常好,所以就有了重试机制

1、重试原理,利用到AOP 切片,然后通过 @Around 进行方法增强。

为了方便,创建一个自定义注解,在需要重试的方法上添加 注解即可

@IsTryAgain

重试注解

/**
 * 重试注解
 */
@Retention(RetentionPolicy.RUNTIME)
public @interface IsTryAgain {
}
TryAgainException

重试异常

public class TryAgainException extends ApiException {

    public TryAgainException(ApiResultEnum apiResultEnum) {
        super(apiResultEnum);
    }

}
TryAgainAspect
@Aspect
@Configuration
public class TryAgainAspect {

	/**
	 * 默认重试几次
	 */
	private static final int    DEFAULT_MAX_RETRIES = 3;

	private int                 maxRetries          = DEFAULT_MAX_RETRIES;
	private int                 order               = 1;

	public void setMaxRetries(int maxRetries) {
		this.maxRetries = maxRetries;
	}

	public int getOrder() {
		return this.order;
	}

	@Pointcut("@annotation(IsTryAgain)")
	public void retryOnOptFailure() {
		// pointcut mark
	}

	@Around("retryOnOptFailure()")
	@Transactional(rollbackFor = Exception.class)
	public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable {
		int numAttempts = 0;
		do {
			numAttempts++;
			try {
				//再次执行业务代码
				return pjp.proceed();
			} catch (TryAgainException ex) {
				if (numAttempts > maxRetries) {
					//log failure information, and throw exception
//					如果大于 默认的重试机制 次数,我们这回就真正的抛出去了
					throw new ApiException(ApiResultEnum.ERROR_TRY_AGAIN_FAILED);
				}else{
					//如果 没达到最大的重试次数,将再次执行
					System.out.println("=====正在重试====="+numAttempts+"次");
				}
			}
		} while (numAttempts <= this.maxRetries);

		return null;
	}
}

现在,反回去看上面的UserAccountService 或许就是知道什么意思了,和平常写的业务代码一样,唯一多的只是一个 @IsTryAgain 注解而已

重试可以,但是不能无限重试吧,所以也是会有重试失败的时候,如果你给它重试次数越多,失败就越少,我这里为了演示只有3次机会。

测试类,测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class ThreadTest {

    @Autowired
    private IUserAccountService userAccountService;

    /**
     * 并发测试重试机制
     *
     * @throws Exception
     */
    @Test
    public void test() throws Exception {

        TransferDTO dto = new TransferDTO();
        dto.setFromUserId(1001l);
        dto.setToUserId(1002l);
        dto.setAmount(BigDecimal.ONE);

        int clientTotal = 100;
        // 同时并发执行的线程数
        int threadTotal = 20;
        int count = 0;
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量,此处用于控制并发的线程数
        final Semaphore semaphore = new Semaphore(threadTotal);
        //闭锁,可实现计数器递减
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    //执行此方法用于获取执行许可,当总计未释放的许可数不超过200时,
                    //允许通行,否则线程阻塞等待,直到获取到许可。
                    semaphore.acquire();
                    userAccountService.transfter(dto);
                    //释放许可
                    semaphore.release();
                } catch (Exception e) {
                    //log.error("exception", e);
                    e.printStackTrace();

                }
                //闭锁减一
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行
        executorService.shutdown();
    }
}

结果如下:
SpringBoot Mybatis 乐观锁重试机制代码实现

100 个请求,20 个并发,好多个都是重试成功了,少部分失败了。

完整源代码:Github地址:https://github.com/rstyro/optimistic-lock

完整源代码:Gitee地址:https://gitee.com/rstyro/optimistic-lock