如何使用悲观锁定修复乐观锁定竞争条件
概括
在我以前的文章中 ,我解释了使用显式乐观锁定的好处。 然后我们发现,在很短的时间范围内,并发交易仍可以在我们当前交易被提交之前立即提交产品价格更改。
- 爱丽丝拿产品
- 然后,她决定订购
- 获得产品乐观锁
- 订单已插入当前交易数据库会话中
- 产品版本由Hibernate显式乐观锁定例程检查
- 价格引擎设法提交产品价格更改
- 承诺进行Alice交易而未意识到产品价格刚刚改变
复制问题
因此,我们需要一种在乐观锁支票和订单交易提交之间注入产品价格变化的方法。
在分析了Hibernate源代码之后,我们发现SessionImpl.beforeTransactionCompletion()方法正在内部actionQueue阶段处理程序(检查显式乐观锁定实体版本)之后紧接着调用当前配置的Interceptor.beforeTransactionCompletion()回调:
public void beforeTransactionCompletion(TransactionImplementor hibernateTransaction) { LOG.trace( "before transaction completion" ); actionQueue.beforeTransactionCompletion(); try { interceptor.beforeTransactionCompletion( hibernateTransaction ); } catch (Throwable t) { LOG.exceptionInBeforeTransactionCompletionInterceptor( t ); } }
有了这些信息,我们可以设置一个测试来复制我们的比赛条件:
private AtomicBoolean ready = new AtomicBoolean(); private final CountDownLatch endLatch = new CountDownLatch(1); @Override protected Interceptor interceptor() { return new EmptyInterceptor() { @Override public void beforeTransactionCompletion(Transaction tx) { if(ready.get()) { LOGGER.info("Overwrite product price asynchronously"); executeNoWait(new Callable<Void>() { @Override public Void call() throws Exception { Session _session = getSessionFactory().openSession(); _session.doWork(new Work() { @Override public void execute(Connection connection) throws SQLException { try(PreparedStatement ps = connection.prepareStatement("UPDATE product set price = 14.49 WHERE id = 1")) { ps.executeUpdate(); } } }); _session.close(); endLatch.countDown(); return null; } }); try { LOGGER.info("Wait 500 ms for lock to be acquired!"); Thread.sleep(500); } catch (InterruptedException e) { throw new IllegalStateException(e); } } } }; } @Test public void testExplicitOptimisticLocking() throws InterruptedException { try { doInTransaction(new TransactionCallable<Void>() { @Override public Void execute(Session session) { try { final Product product = (Product) session.get(Product.class, 1L, new LockOptions(LockMode.OPTIMISTIC)); OrderLine orderLine = new OrderLine(product); session.persist(orderLine); lockUpgrade(session, product); ready.set(true); } catch (Exception e) { throw new IllegalStateException(e); } return null; } }); } catch (OptimisticEntityLockException expected) { LOGGER.info("Failure: ", expected); } endLatch.await(); } protected void lockUpgrade(Session session, Product product) {}
运行它时,测试将生成以下输出:
#Alice selects a Product DEBUG [main]: Query:{[select abstractlo0_.id as id1_1_0_, abstractlo0_.description as descript2_1_0_, abstractlo0_.price as price3_1_0_, abstractlo0_.version as version4_1_0_ from product abstractlo0_ where abstractlo0_.id=?][1]} #Alice inserts an OrderLine DEBUG [main]: Query:{[insert into order_line (id, product_id, unitPrice, version) values (default, ?, ?, ?)][1,12.99,0]} #Alice transaction verifies the Product version DEBUG [main]: Query:{[select version from product where id =?][1]} #The price engine thread is started INFO [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Overwrite product price asynchronously #Alice thread sleeps for 500ms to give the price engine a chance to execute its transaction INFO [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Wait 500 ms for lock to be acquired! #The price engine changes the Product price DEBUG [pool-1-thread-1]: Query:{[UPDATE product set price = 14.49 WHERE id = 1][]} #Alice transaction is committed without realizing the Product price change DEBUG [main]: o.h.e.t.i.j.JdbcTransaction - committed JDBC Connection
因此,比赛条件是真实的。 由您决定当前的应用程序是否需要更强的数据完整性要求,但是根据经验,安全性胜过遗憾。
解决问题
要解决此问题,我们只需要在结束事务处理方法之前添加一个悲观的锁定请求即可。
@Override protected void lockUpgrade(Session session, Product product) { session.buildLockRequest(new LockOptions(LockMode.PESSIMISTIC_READ)).lock(product); }
显式共享锁将防止对我们之前乐观地锁定的实体进行并发写入。 使用此方法,在释放此锁之前(在提交或回滚当前事务之后),没有其他并发事务可以更改产品。
有了新的悲观锁定请求,先前的测试将产生以下输出:
#Alice selects a Product DEBUG [main]: Query:{[select abstractlo0_.id as id1_1_0_, abstractlo0_.description as descript2_1_0_, abstractlo0_.price as price3_1_0_, abstractlo0_.version as version4_1_0_ from product abstractlo0_ where abstractlo0_.id=?][1]} #Alice inserts an OrderLine DEBUG [main]: Query:{[insert into order_line (id, product_id, unitPrice, version) values (default, ?, ?, ?)][1,12.99,0]} #Alice applies an explicit physical lock on the Product entity DEBUG [main]: Query:{[select id from product where id =? and version =? for update][1,0]} #Alice transaction verifies the Product version DEBUG [main]: Query:{[select version from product where id =?][1]} #The price engine thread is started INFO [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Overwrite product price asynchronously #Alice thread sleeps for 500ms to give the price engine a chance to execute its transaction INFO [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Wait 500 ms for lock to be acquired! #The price engine cannot proceed because of the Product entity was locked exclusively, so Alice transaction is committed against the ordered Product price DEBUG [main]: o.h.e.t.i.j.JdbcTransaction - committed JDBC Connection #The physical lock is released and the price engine can change the Product price DEBUG [pool-1-thread-1]: Query:{[UPDATE product set price = 14.49 WHERE id = 1][]}
即使我们要求使用PESSIMISTIC_READ锁,HSQLDB也只能执行FOR UPDATE排他锁,等效于显式的PESSIMISTIC_WRITE锁模式。
结论
如果您想知道为什么我们在当前事务中同时使用乐观锁定和悲观锁定,则必须记住, 乐观锁定是用于多请求对话的唯一可行的并发控制机制。
在我们的示例中,第一个请求使用只读事务加载了Product实体。 产品实体具有关联的版本,并且在写时事务期间将乐观地锁定此读时实体快照。
悲观锁仅在写时事务期间有用,以防止在检查产品实体版本后发生任何并发更新。 因此,逻辑锁和物理锁都可以协同工作以确保订单价格数据的完整性。
当我撰写此博客文章时, Java冠军 Markus Eisele 接受了有关Hibernate Master Class计划的采访 。 在采访中,我试图解释当前的帖子示例,同时强调了解参考文档之外的工具的真正重要性。
- 代码可在GitHub上获得 。