0%

乐观锁重试机制代码实现

乐观锁重试机制代码实现

有乐观锁,那当然也是有悲观锁的

悲观锁和乐观锁的原理和应用场景

悲观锁(Pessimistic Lock)

顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock)

顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁

应用场景

悲观锁:

比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量。

乐观锁:

比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。

总结:两种所各有优缺点,读取频繁使用乐观锁,写入频繁使用悲观锁。

代码实现原理

这里采用版本号的方式

简单例子表结构

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE `usr_user_account` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`balance` decimal(50,10) DEFAULT '0.0000000000' COMMENT '余额',
`version` bigint(20) DEFAULT '0' COMMENT '乐观锁版本号',
`create_by` bigint(20) DEFAULT NULL COMMENT ' 创建人',
`create_time` datetime NOT NULL COMMENT ' 创建时间',
`modify_by` bigint(20) DEFAULT NULL,
`modify_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='用户账户表';

比如现在表中有两条数据
数据图

现在 user_id = 1001user_id=1002 转账 50
转账,无非就是一加一减,我就不啰嗦了,先看完整代码实现 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Service
@Transactional
public class UserAccountService implements IUserAccountService{

@Autowired
private UserAccountMapper userAccountMapper;

/**
* 转账
* @param dto
* @return
* @throws Exception
*/
@Transactional
@IsTryAgain
public Result transfter(TransferDTO dto) throws Exception {
updateAccount(dto.getFromUserId(),dto.getAmount(), Consts.ACCOUNT_OUT);
updateAccount(dto.getToUserId(),dto.getAmount(), Consts.ACCOUNT_IN);
return Result.ok();
}

/**
* 更新账户余额
* @param userId 账户用户ID
* @param amount 操作金额数量
* @param state 转入--1 ,转出 -- 0
* @throws Exception
*/
public void updateAccount( Long userId, BigDecimal amount,int state) throws Exception{
UserAccount userAccount = userAccountMapper.getUserAccountByUserId(userId);
if(userAccount == null){
throw new ApiException(ApiResultEnum.ACCOUNT_NOT_FOUND);
}
BigDecimal afterBalance = BigDecimal.ZERO;
if(state == Consts.ACCOUNT_OUT){
if(userAccount.getBalance().compareTo(amount)<0){
throw new ApiException(ApiResultEnum.ACCOUNT_NOT_SUFFICIENT);
}
afterBalance = userAccount.getBalance().subtract(amount);
}else if(state == Consts.ACCOUNT_IN){
afterBalance = userAccount.getBalance().add(amount);
}
userAccount.setBalance(afterBalance);
userAccount.setModifyBy(userId);
userAccount.setModifyTime(LocalDateTime.now());
if(!userAccountMapper.updateAccount(userAccount)){
//如果更新失败就抛出去,重试
throw new TryAgainException(ApiResultEnum.ERROR_TRY_AGAIN);
}
}
}

代码不算多

只需要看 updateAccount 方法中的 userAccountMapper.updateAccount 这个方法,这个方法也就是 乐观锁 实现的原理, mapper 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<update id="updateAccount" parameterType="top.lrshuai.optimisticlock.usr.entity.UserAccount">
update usr_user_account set
version = version+1
<if test="userId != null">
,user_id = #{userId}
</if>
<if test="balance != null">
,balance = #{balance}
</if>
<if test="createBy != null">
,create_by = #{createBy}
</if>
<if test="createTime != null">
,create_time = #{createTime}
</if>
<if test="modifyBy != null">
,modify_by = #{modifyBy}
</if>
<if test="modifyTime != null">
,modify_time = #{modifyTime}
</if>
where
version=#{version}
<if test="id != null">
and id=#{id}
</if>
<if test="userId != null">
and user_id=#{userId}
</if>

</update>

重点看 version 的变化

第一点: 更新条件 要匹配 version=上一次数据的version
第二点: 就是 更新成功的时候,把 version + 1
注意:当数据表有一个写锁时,其它进程的读写操作都需等待读锁释放后才会执行。所以保证了version 的正确性

乐观锁实现就是这么简单

如果说 更新失败了,那我们就抛异常,这样用户体验不是非常好,所以就有了重试机制

1、重试原理,利用到AOP 切片,然后通过 @Around 进行方法增强。

为了方便,创建一个自定义注解,在需要重试的方法上添加 注解即可

@IsTryAgain

重试注解

1
2
3
4
5
6
/**
* 重试注解
*/
@Retention(RetentionPolicy.RUNTIME)
public @interface IsTryAgain {
}
TryAgainException

重试异常

1
2
3
4
5
6
7
public class TryAgainException extends ApiException {

public TryAgainException(ApiResultEnum apiResultEnum) {
super(apiResultEnum);
}

}
TryAgainAspect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Aspect
@Configuration
public class TryAgainAspect implements Ordered {

/**
* 默认重试几次
*/
private static final int DEFAULT_MAX_RETRIES = 3;

private int maxRetries = DEFAULT_MAX_RETRIES;
private int order = 1;

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

public int getOrder() {
return this.order;
}

@Pointcut("@annotation(IsTryAgain)")
public void retryOnOptFailure() {
// pointcut mark
}

@Around("retryOnOptFailure()")
@Transactional(rollbackFor = Exception.class)
public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable {
int numAttempts = 0;
do {
numAttempts++;
try {
//再次执行业务代码
return pjp.proceed();
} catch (TryAgainException ex) {
if (numAttempts > maxRetries) {
//log failure information, and throw exception
// 如果大于 默认的重试机制 次数,我们这回就真正的抛出去了
throw new ApiException(ApiResultEnum.ERROR_TRY_AGAIN_FAILED);
}else{
//如果 没达到最大的重试次数,将再次执行
System.out.println("=====正在重试====="+numAttempts+"次");
}
}
} while (numAttempts <= this.maxRetries);

return null;
}
}

现在,反回去看上面的UserAccountService 或许就是知道什么意思了,和平常写的业务代码一样,唯一多的只是一个 @IsTryAgain 注解而已

重试可以,但是不能无限重试吧,所以也是会有重试失败的时候,如果你给它重试次数越多,失败就越少,我这里为了演示只有3次机会。

测试类,测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@RunWith(SpringRunner.class)
@SpringBootTest
public class ThreadTest {

@Autowired
private IUserAccountService userAccountService;

/**
* 并发测试重试机制
*
* @throws Exception
*/
@Test
public void test() throws Exception {

TransferDTO dto = new TransferDTO();
dto.setFromUserId(1001l);
dto.setToUserId(1002l);
dto.setAmount(BigDecimal.ONE);

int clientTotal = 100;
// 同时并发执行的线程数
int threadTotal = 20;
int count = 0;
ExecutorService executorService = Executors.newCachedThreadPool();
//信号量,此处用于控制并发的线程数
final Semaphore semaphore = new Semaphore(threadTotal);
//闭锁,可实现计数器递减
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
//执行此方法用于获取执行许可,当总计未释放的许可数不超过200时,
//允许通行,否则线程阻塞等待,直到获取到许可。
semaphore.acquire();
userAccountService.transfter(dto);
//释放许可
semaphore.release();
} catch (Exception e) {
//log.error("exception", e);
e.printStackTrace();

}
//闭锁减一
countDownLatch.countDown();
});
}
countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行
executorService.shutdown();
}
}

结果如下:
控制台输出

100 个请求,20 个并发,好多个都是重试成功了,少部分失败了。

完整源代码:Github地址:https://github.com/rstyro/optimistic-lock

完整源代码:Gitee地址:https://gitee.com/rstyro/optimistic-lock

您的打赏,是我创作的动力!不给钱?那我只能靠想象力充饥了。