一、背景

定时任务查询出失效的数据,之后调用方法重新更新失效的数据,其中更新失效数据不保证一定会更新为非失效数据。失效数据量最多有两万条,并且更新方法中有两个查询和一个更新方法,方法整体耗时在一百毫秒左右。为了避免查询出的数据过多,对查询方法进行分页处理,并且将查询结果进行并行处理以加快处理速度。

二、问题

查询方法使用分页,从第一页开始处理,之后依次处理第二页、第三页直到查询出的结果集为空为止。处理操作会更新数据,成功更新后的数据再使用查询方法的条件进行查询是查询不出来的。这里就需要保证分页查询方法在查询每一页时的基础数据集是相同的,不能因为更新方法将数据从无效更新为有效而导致基础数据集减少。比如,总共 100 条数据,每页 10 条,第一页成功处理了 10 条,此时 100 条中还剩 90 条需要处理,之后查询第二页,不能让查询出的第二页数据为 90 条中的第二页,要保证是 100 条中的第二页,否则将会跳过一页的数据。

涉及到的知识点:

  • 事务隔离级别,使用 PostgreSQL 数据库,默认隔离级别为读已提交。
    • 读已提交。一个事务只能看到在它开始之前已经提交的数据,不能读取其他事务尚未提交的数据,避免了脏读。但是,如果同一个事务在不同时间点对同一数据进行多次读取,可能会因为其他事务的提交而看到不同的结果,这被称为不可重复读。此外,由于其他事务可能插入新的行,导致第一次查询的结果集与第二次查询不一致,这称为幻读。
    • 可重复读。一个事务在整个事务期间看到的数据是一致的,即它能够重复读取同一数据而不受其他事务的影响。PostgreSQL 在此级别时,不可重复读和幻读都是不会发生的。
  • 事务传播机制。
    • REQUIRED,如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。
    • REQUIRES_NEW,新建事务,如果当前存在事务,把当前事务挂起。
    • NESTED,如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与 REQUIRED 类似的操作(创建新事务)。
    • 关于事务传播行为的测试,参考 Spring 中事务传播机制 | z2huo

从上面事务隔离级别和事务传播机制中可以看出,问题为不可重复读问题。

三、代码示例

1、DAO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Repository  
@RequiredArgsConstructor
public class UserDAO {

private final UserMapper userMapper;

public List<UserDO> listInvalidUser(Integer pageNo, Integer pageSize) {
PageHelper.startPage(pageNo, pageSize);

UserDOExample userDOExample = new UserDOExample();
UserDOExample.Criteria criteria = userDOExample.createCriteria();
criteria.andValidFlagEqualTo("0");
userDOExample.setOrderByClause("user_code, id");
List<UserDO> userDOS = userMapper.selectByExample(userDOExample);
return userDOS;
}

public void updateUserValidFlag(String userCode, Long userId) {
UserDO userDO = new UserDO().setValidFlag("1");
UserDOExample userDOExample = new UserDOExample();
UserDOExample.Criteria criteria = userDOExample.createCriteria();
criteria.andIdEqualTo(userId);
criteria.andUserCodeEqualTo(userCode);
userMapper.updateByExampleSelective(userDO, userDOExample);
}
}

DAO 提供两个方法,listInvalidUser 方法为分页查询失效用户数据,用户数据根据用户代码和 ID 进行排序。updateUserValidFlag 方法根据用户代码和用户 ID 将用户有效状态更新为有效。

2、Service

只是示例,不考虑业务。

2.1 查询类

这个类会查询失效数据,并调用另一个类中的更新方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
@Service
@RequiredArgsConstructor
public class UserService {

private final UserDAO userDAO;

private final UserService2 userService2;

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
public void processInvalidUserData() {
for (int pageSize = 2, pageNo = 1; ; pageNo++) {
List<UserDO> userDOList = userDAO.listInvalidUser(pageNo, pageSize);
if (CollectionUtils.isEmpty(userDOList)) {
break;
}
userDOList.parallelStream().forEach(userDO -> {
try {
userService2.updateUserValidFlag(userDO.getUserCode(), userDO.getId());
} catch (Exception e) {
log.error("process data error, userCode is {}, userId is {}",
userDO.getUserCode(), userDO.getId());
}
});
}
}
}

processInvalidUserData 方法需要添加事务注解,并指定传播行为为 REQUIRED,隔离级别为可重复读。这样就能保证事务中分页读取失效用户数据时不会发生幻读问题。

updateUserValidFlag 方法的调用捕获异常,防止方法发生异常导致循环终止。

使用并行流来对失效用户结果集并行处理,加快处理速度。

关于 @Transactional 注解参考 Spring 事务注解 @Transactional | z2huo

2.2 更新类

1
2
3
4
5
6
7
8
9
10
11
12
@Service  
@RequiredArgsConstructor
public class UserService2 {

private final UserDAO userDAO;

@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
public void updateUserValidFlag(String userCode, Long userId) {
userDAO.updateUserValidFlag(userCode, userId);
}

}

updateUserValidFlag 方法事务的传播行为使用 REQUIRES_NEW,每一次更新操作都使用新事务,不会因为方法之间的调用导致事务发生回滚,除非方法本身抛出异常。

该方法事务的隔离级别可以不为可重复读,使用默认的读已提交即可。

四、其他

1、事务隔离级别验证

PostgreSQL 中可以通过 start transaction ... commit 命令包围 SQL 来手动创建和提交事务,begin ... commit 也可。并且在创建事务的时候,可以指定事务的隔离级别。

1.1 读已提交隔离级别

在 dataGrip 中打开两个查询,第一个查询中执行如下的 SQL:

1
2
3
4
5
6
7
8
9
10
11
start transaction isolation level READ COMMITTED  

select * from z2huo_user
WHERE valid_flag = '0'
order by user_code, id;

select * from z2huo_user
WHERE valid_flag = '0'
order by user_code, id;

commit;

第二个查询中执行如下的 SQL:

1
2
3
4
5
start transaction isolation level READ COMMITTED

update z2huo_user set valid_flag = '1' where id in (10529173407633408, 10529646898384896);

commit;

两个查询中的 SQL 交替执行,汇总如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- query 1
start transaction isolation level READ COMMITTED
select * from z2huo_user
WHERE valid_flag = '0'
order by user_code, id;

-- query 2
start transaction isolation level READ COMMITTED
update z2huo_user set valid_flag = '1' where id in (10529173407633408, 10529646898384896);
commit;

-- query 1
select * from z2huo_user
WHERE valid_flag = '0'
order by user_code, id;

commit;

也就是在第一个查询的事务的两个查询语句中间加上第二个查询的更新操作。现象为:查询一的第二个查询语句与第一个查询语句的查询结果不一致,发生幻读现象。

1.2 可重复读隔离级别

两个事务切换为可重复读隔离级别,或者只有查询一中的事务切换为可重复读隔离级别时,可以解决幻读的问题。汇总 SQL 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
start transaction isolation level REPEATABLE READ
select * from z2huo_user
WHERE valid_flag = '0'
order by user_code, id;

-- query 2
start transaction isolation level READ COMMITTED
update z2huo_user set valid_flag = '1' where id in (10529173407633408, 10529646898384896);
commit;

-- query 1
select * from z2huo_user
WHERE valid_flag = '0'
order by user_code, id;

commit;

修改隔离级别为可重复读后,查询一中的两个查询语句查询结果就一致了。

相关链接

Spring 中事务传播机制 | z2huo

Spring 事务注解 @Transactional | z2huo

[[Propagation 传播机制]]

[[@Transactional]]

OB tags

#Spring #事务