乐观锁重试机制代码实现
有乐观锁,那当然也是有悲观锁的
悲观锁和乐观锁的原理和应用场景
悲观锁(Pessimistic Lock)
顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁(Optimistic Lock)
顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁
应用场景
悲观锁:
比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量。
乐观锁:
比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。
总结:两种所各有优缺点,读取频繁使用乐观锁,写入频繁使用悲观锁。
代码实现原理
这里采用版本号的方式
简单例子表结构
1 2 3 4 5 6 7 8 9 10 11 12
| CREATE TABLE `usr_user_account` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `user_id` bigint(20) NOT NULL COMMENT '用户ID', `balance` decimal(50,10) DEFAULT '0.0000000000' COMMENT '余额', `version` bigint(20) DEFAULT '0' COMMENT '乐观锁版本号', `create_by` bigint(20) DEFAULT NULL COMMENT ' 创建人', `create_time` datetime NOT NULL COMMENT ' 创建时间', `modify_by` bigint(20) DEFAULT NULL, `modify_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='用户账户表';
|
比如现在表中有两条数据
现在 user_id = 1001
给 user_id=1002
转账 50
元
转账,无非就是一加一减
,我就不啰嗦了,先看完整代码实现 如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Service @Transactional public class UserAccountService implements IUserAccountService{
@Autowired private UserAccountMapper userAccountMapper;
@Transactional @IsTryAgain public Result transfter(TransferDTO dto) throws Exception { updateAccount(dto.getFromUserId(),dto.getAmount(), Consts.ACCOUNT_OUT); updateAccount(dto.getToUserId(),dto.getAmount(), Consts.ACCOUNT_IN); return Result.ok(); }
public void updateAccount( Long userId, BigDecimal amount,int state) throws Exception{ UserAccount userAccount = userAccountMapper.getUserAccountByUserId(userId); if(userAccount == null){ throw new ApiException(ApiResultEnum.ACCOUNT_NOT_FOUND); } BigDecimal afterBalance = BigDecimal.ZERO; if(state == Consts.ACCOUNT_OUT){ if(userAccount.getBalance().compareTo(amount)<0){ throw new ApiException(ApiResultEnum.ACCOUNT_NOT_SUFFICIENT); } afterBalance = userAccount.getBalance().subtract(amount); }else if(state == Consts.ACCOUNT_IN){ afterBalance = userAccount.getBalance().add(amount); } userAccount.setBalance(afterBalance); userAccount.setModifyBy(userId); userAccount.setModifyTime(LocalDateTime.now()); if(!userAccountMapper.updateAccount(userAccount)){ throw new TryAgainException(ApiResultEnum.ERROR_TRY_AGAIN); } } }
|
代码不算多
只需要看 updateAccount
方法中的 userAccountMapper.updateAccount
这个方法,这个方法也就是 乐观锁 实现的原理, mapper 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| <update id="updateAccount" parameterType="top.lrshuai.optimisticlock.usr.entity.UserAccount"> update usr_user_account set version = version+1 <if test="userId != null"> ,user_id = #{userId} </if> <if test="balance != null"> ,balance = #{balance} </if> <if test="createBy != null"> ,create_by = #{createBy} </if> <if test="createTime != null"> ,create_time = #{createTime} </if> <if test="modifyBy != null"> ,modify_by = #{modifyBy} </if> <if test="modifyTime != null"> ,modify_time = #{modifyTime} </if> where version=#{version} <if test="id != null"> and id=#{id} </if> <if test="userId != null"> and user_id=#{userId} </if>
</update>
|
重点看 version 的变化
第一点: 更新条件 要匹配 version=上一次数据的version
第二点: 就是 更新成功的时候,把 version + 1
注意:当数据表有一个写锁时,其它进程的读写操作都需等待读锁释放后才会执行。所以保证了version
的正确性
乐观锁实现就是这么简单
如果说 更新失败了,那我们就抛异常,这样用户体验不是非常好,所以就有了重试机制
1、重试原理,利用到AOP 切片,然后通过 @Around 进行方法增强。
为了方便,创建一个自定义注解,在需要重试的方法上添加 注解即可
@IsTryAgain
重试注解
1 2 3 4 5 6
|
@Retention(RetentionPolicy.RUNTIME) public @interface IsTryAgain { }
|
TryAgainException
重试异常
1 2 3 4 5 6 7
| public class TryAgainException extends ApiException {
public TryAgainException(ApiResultEnum apiResultEnum) { super(apiResultEnum); }
}
|
TryAgainAspect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Aspect @Configuration public class TryAgainAspect implements Ordered {
private static final int DEFAULT_MAX_RETRIES = 3;
private int maxRetries = DEFAULT_MAX_RETRIES; private int order = 1;
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
public int getOrder() { return this.order; }
@Pointcut("@annotation(IsTryAgain)") public void retryOnOptFailure() { }
@Around("retryOnOptFailure()") @Transactional(rollbackFor = Exception.class) public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable { int numAttempts = 0; do { numAttempts++; try { return pjp.proceed(); } catch (TryAgainException ex) { if (numAttempts > maxRetries) {
throw new ApiException(ApiResultEnum.ERROR_TRY_AGAIN_FAILED); }else{ System.out.println("=====正在重试====="+numAttempts+"次"); } } } while (numAttempts <= this.maxRetries);
return null; } }
|
现在,反回去看上面的UserAccountService
或许就是知道什么意思了,和平常写的业务代码一样,唯一多的只是一个 @IsTryAgain
注解而已
重试可以,但是不能无限重试吧,所以也是会有重试失败的时候,如果你给它重试次数越多,失败就越少,我这里为了演示只有3次机会。
测试类,测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @RunWith(SpringRunner.class) @SpringBootTest public class ThreadTest {
@Autowired private IUserAccountService userAccountService;
@Test public void test() throws Exception {
TransferDTO dto = new TransferDTO(); dto.setFromUserId(1001l); dto.setToUserId(1002l); dto.setAmount(BigDecimal.ONE);
int clientTotal = 100; int threadTotal = 20; int count = 0; ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); userAccountService.transfter(dto); semaphore.release(); } catch (Exception e) { e.printStackTrace();
} countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); } }
|
结果如下:
100 个请求,20 个并发,好多个都是重试成功了,少部分失败了。