想不通StaleObjectStateException
我有一个困难时期试图找出我不断看到的原因:想不通StaleObjectStateException
`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)`
我有一个使用Quartz调度消防工作服务,在我的背景下,这些作业被称为Flows
,每个流程可能由几个Tasks
组成,流程和任务为Executables
,关于其实际Executions
的信息存储为FlowExecutions
和TaskExecutions
。该服务使用FlowService
启动流程。
UPD:有一个石英工作,“ExecutorJob”负责射击我的流量/任务。当它被触发时,它使用FlowService来启动它所应该做的任何事情。所以我想知道是否有可能石英线程不会在每次使用服务时创建新的休眠会话,这是问题的原因。我没有改变FlowService的范围,所以它是一个单例,GORM如何管理它使用的会话?
UPD2:尝试使用ExecutorJob上的persistenceContextInterceptor来确保每次使用该服务都使用新会话,但它没有解决问题。添加了ExecutorJob的简化代码。
我无法在本地重现问题,但它在生产中经常发生,更具体地说,当有大量流程启动时。 我试过同步execute
方法的任务和流程,但它没有奏效,我会尝试使用悲观锁现在,但我的猜测是,它不会解决问题,因为检查应用程序日志它似乎在那里不是两个线程更新同一行。以下我试图展示一个模拟项目结构的简化版本。
// ------------------
// DOMAIN CLASSES
// ------------------
abstract class Executable {
static hasMany = [flowTasks: FlowTask]
static transients = ['executions']
List<Execution> getExecutions() {
this.id ? Execution.findAllByExecutable(this) : []
}
void addToExecutions(Execution execution) {
execution.executable = this
execution.save()
}
abstract List<Execution> execute(Map params)
}
class Flow extends Executable {
SortedSet<FlowTask> tasks
static hasMany = [tasks: FlowTask]
private static final Object lockExecute = new Object()
private static final Object lockExecuteTask = new Object()
List<FlowExecution> execute(Map params) {
synchronized (lockExecute) {
List<Map> multiParams = multiplyParams(params)
multiParams.collect { Map param ->
FlowExecution flowExecution = new FlowExecution()
addToExecutions(flowExecution)
flowExecution.save()
this.attach()
save()
executeTasks(firstTasks(param), flowExecution, param)
}
}
}
List<Map> multiplyParams(Map params) {
// creates a list of params for the executions that must be started
[params]
}
Set<FlowTask> firstTasks(Map params) {
// finds the first tasks to be executed for the flow
tasks.findAdll { true }
}
private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) {
synchronized (lockExecuteTask) {
tasks.each { FlowTask flowTask ->
try {
List<Execution> executions = flowTask.execute(params)
executions.each { Execution execution ->
flowExecution.addToExecutions(execution)
}
flowExecution.attach()
} catch {
// log error executing task
throw e
}
}
this.attach()
try {
save(flush: true)
} catch (HibernateOptimisticLockingFailureException e) {
// log error saving flow
throw e
}
flowExecution
}
}
}
class Task extends Executable {
private static final Object lockExecute = new Object()
private static final Object lockGetExecution = new Object()
TaskExecution execute(TaskExecution execution) {
taskService.start(execution)
execution
}
List<TaskExecution> execute(Map params) {
synchronized (lockExecute) {
List<Map> multiExecParams = multiplyParams(params)
multiExecParams.collect { Map param ->
TaskExecution execution = getExecution(param)
execute(execution)
}
}
}
TaskExecution getExecution(Map params) {
synchronized (lockGetExecution) {
TaskExecution execution = new TaskExecution(executable: this)
execution.setParameters(params)
addToExecutions(execution)
execution.attach()
execution.flowExecution?.attach()
this.attach()
try {
save(flush: true)
} catch (HibernateOptimisticLockingFailureException e) {
// log error saving task
throw e
}
execution
}
}
List<Map> multiplyParams(Map params) {
// creates a list of params for the tasks that must be started
[params]
}
}
class FlowTask {
static belongsTo = [flow: Flow, executable: Executable]
List<Execution> execute(Map params) {
executable.execute(params)
}
}
abstract class Execution {
Map parameterData = [:]
static belongsTo = [executable: Executable, flowExecution: FlowExecution]
static transients = ['parameters', 'taskExecutions']
void setParameters(Map params) {
params.each { key, value ->
parameterData[key] = JsonParser.toJson(value)
}
}
}
class TaskExecution extends Execution {
}
class FlowExecution extends Execution {
List<Execution> executions
static transients = ['executions']
FlowExecution() {
executions = []
}
Set<TaskExecution> getTaskExecutions() {
executions?.collect { Execution execution ->
return execution.taskExecution
}?.flatten()?.toSet()
}
void addToExecutions(Execution execution){
executions.add(execution)
execution.flowExecution = this
execution.save()
}
def onLoad() {
try {
executions = this.id ? Execution.findAllByFlowExecution(this) : []
} catch (Exception e){
log.error(e)
[]
}
}
}
// -----------------
// SERVICE CLASSES
// -----------------
class FlowService {
Map start(long flowId, Map params) {
Flow flow = Flow.lock(flowId)
startFlow(flow, params)
}
private Map startFlow(Flow flow, Map params) {
List<RunningFlow> runningFlows = flow.execute(params)
[data: [success: true], status: HTTP_OK]
}
}
//--------------------------------------
// Quartz job
//--------------------------------------
class ExecutorJob implements InterruptableJob {
def grailsApplication = Holders.getGrailsApplication()
static triggers = {}
private Thread thread
void execute(JobExecutionContext context) throws JobExecutionException {
thread = Thread.currentThread()
synchronized (LockContainer.taskLock) {
Map params = context.mergedJobDataMap
def persistenceInterceptor = persistenceInterceptorInstance
try {
persistenceInterceptor.init()
Long executableId = params.executableId as Long
def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance
service.start(executableId, params)
} catch (Exception e) {
// log error
} finally {
persistenceInterceptor.flush()
persistenceInterceptor.destroy()
}
}
}
PersistenceContextInterceptor getPersistenceInterceptorInstance() {
grailsApplication.mainContext.getBean('persistenceInterceptor')
}
FluxoService getFlowServiceInstance() {
grailsApplication.mainContext.getBean('flowService')
}
TarefaService getTaskServiceInstance() {
grailsApplication.mainContext.getBean('taskService')
}
@Override
void interrupt() throws UnableToInterruptJobException {
thread?.interrupt()
}
}
任何人都知道的东西,可以帮助?
那么,很难理解发生了什么问题。但是,我猜这个错误是在会话中有一个对象已经被其他事务保存或更新时引发的。同样,当hibernate试图保存这个对象时,它给出了行被更新错误的另一个事务错误。
我想你可以在保存你的对象之前尝试刷新,看看它是怎么回事。
http://grails.github.io/grails-doc/2.3.4/ref/Domain%20Classes/refresh.html
def b = Book.get(1)
…
b.refresh()
哪里是检索流程实例,并调用Flow.execute()的代码? –
这是FlowService的启动方法。 –
优秀。这是我需要看到的代码。 –