QuartzSchedulerThread的执行分析.docx_第1页
QuartzSchedulerThread的执行分析.docx_第2页
QuartzSchedulerThread的执行分析.docx_第3页
QuartzSchedulerThread的执行分析.docx_第4页
QuartzSchedulerThread的执行分析.docx_第5页
已阅读5页,还剩19页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();只有availThreadCount大于0时才会进行真正的调度,负责将轮询等待线程的释放。所以我们来看看可用线程数充足的情况下的执行过程。获取触发器获取触发器的代码见代码清单1。其中调用了JobStore的acquireNextTriggers方法来获取触发器。代码清单1 List triggers = null; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize(), qsRsrcs.getBatchTimeWindow(); lastAcquireFailed = false; if (log.isDebugEnabled() log.debug(batch acquisition of + (triggers = null ? 0 : triggers.size() + triggers); catch (JobPersistenceException jpe) /省略异常信息 catch (RuntimeException e) /省略异常信息 以JobStore的实现类LocalDataSourceJobStore来具体看看acquireNextTriggers方法的执行内容。LocalDataSourceJobStore继承了父类JobStoreSupport的acquireNextTriggers方法(见代码清单2),此方法用于从数据源获取触发器。代码清单2 public List acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow) throws JobPersistenceException String lockName; if(isAcquireTriggersWithinLock() | maxCount 1) lockName = LOCK_TRIGGER_ACCESS; else lockName = null; return executeInNonManagedTXLock(lockName, new TransactionCallbackList() public List execute(Connection conn) throws JobPersistenceException return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow); , new TransactionValidatorList() public Boolean validate(Connection conn, List result) throws JobPersistenceException try List acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId(); Set fireInstanceIds = new HashSet(); for (FiredTriggerRecord ft : acquired) fireInstanceIds.add(ft.getFireInstanceId(); for (OperableTrigger tr : result) if (fireInstanceIds.contains(tr.getFireInstanceId() return true; return false; catch (SQLException e) throw new JobPersistenceException(error validating trigger acquisition, e); ); JobStoreSupport的acquireNextTriggers方法,主要调用了executeInNonManagedTXLock方法(见代码清单3),其执行逻辑如下:1. 获取数据库连接;2. 回调txCallback(即代码清单2中的TransactionCallback的匿名类)的execute方法,因此调用了acquireNextTrigger方法获取触发器;3. 调用commitConnection方法提交第2步中的所有sql;4. 返回获取的触发器集合;代码清单3 protected T executeInNonManagedTXLock( String lockName, TransactionCallback txCallback, final TransactionValidator txValidator) throws JobPersistenceException boolean transOwner = false; Connection conn = null; try if (lockName != null) / If we arent using db locks, then delay getting DB connection / until after acquiring the lock since it isnt needed. if (getLockHandler().requiresConnection() conn = getNonManagedTXConnection(); transOwner = getLockHandler().obtainLock(conn, lockName); if (conn = null) conn = getNonManagedTXConnection(); final T result = txCallback.execute(conn); try commitConnection(conn); catch (JobPersistenceException e) rollbackConnection(conn); if (txValidator = null | !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback() Override public Boolean execute(Connection conn) throws JobPersistenceException return txValidator.validate(conn, result); ) throw e; Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null & sigTime = 0) signalSchedulingChangeImmediately(sigTime); return result; catch (JobPersistenceException e) rollbackConnection(conn); throw e; catch (RuntimeException e) rollbackConnection(conn); throw new JobPersistenceException(Unexpected runtime exception: + e.getMessage(), e); finally try releaseLock(lockName, transOwner); finally cleanupConnection(conn); acquireNextTrigger方法用于获取触发器,它的执行步骤如下:首先,查询状态为WAITING的触发器(见代码清单4),以StdJDBCDelegate为例,其selectTriggerToAcquire方法(使用JDBC的API,留给读者自己去看)实际就是执行sql查询触发器,执行的sql为:SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME = ?) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC。TRIGGER_STATE的条件是WAITING,NEXT_FIRE_TIME小于最迟的触发时间,并且要大于最早的触发时间。注意:本文所有sql中的0为QRTZ_,1为schedulerFactoryBean。代码清单4 List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);其次,遍历集合keys中的TriggerKey,在循环中执行以下步骤:1. 从表QRTZ_TRIGGERS中查询触发器,代码为:OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);retrieveTrigger方法内部实际执行了Delegate的selectTrigger方法。以StdJDBCDelegate为例,其selectTrigger方法根据TriggerKey查询触发器,执行的sql为:SELECT * FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?2. 根据OperableTrigger持有的JobKey查询表QRTZ_JOB_DETAILS中对应的作业信息,并将作业添加到集合acquiredJobKeysForNoConcurrentExec中,代码如下:3. JobKey jobKey = nextTrigger.getJobKey();4. JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper();5. if (job.isConcurrentExectionDisallowed() 6. if (acquiredJobKeysForNoConcurrentExec.contains(jobKey) 7. continue; / next trigger8. else 9. acquiredJobKeysForNoConcurrentExec.add(jobKey);10. 以StdJDBCDelegate为例,其selectJobDetail方法执行的sql为:SELECT * FROM 0JOB_DETAILS WHERE SCHED_NAME = 1 AND JOB_NAME = ? AND JOB_GROUP = ?11. 根据已获得的TriggerKey,将此触发器在表QRTZ_TRIGGERS中的状态从WAITING更新为ACQUIRED,代码如下: int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);以StdJDBCDelegate为例,其updateTriggerStateFromOtherState方法执行的sql为:UPDATE 0TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?12. 给OperableTrigger设置实例ID,然后将已触发的触发器插入表QRTZ_FIRED_TRIGGERS,代码如下:13. nextTrigger.setFireInstanceId(getFiredTriggerRecordId(); getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);以StdJDBCDelegate为例,其insertFiredTrigger方法执行的sql为:INSERT INTO 0FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES(1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)14. 将OperableTrigger添加到acquiredTriggers,代码如下: acquiredTriggers.add(nextTrigger);最后,返回获得的所有触发器集合acquiredTriggers;触发触发器在获取触发器后,下下来就是要触发这些触发器(见代码清单5),可以看到调用了JobStore的triggersFired方法。代码清单5 / set triggers to executing List bndles = new ArrayList(); boolean goAhead = true; synchronized(sigLock) goAhead = !halted.get(); if(goAhead) try List res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; catch (SchedulerException se) /省略异常信息 以JobStore的实现类LocalDataSourceJobStore来具体看看triggersFired方法的执行内容。LocalDataSourceJobStore继承了父类JobStoreSupport的triggersFired方法(见代码清单6),此方法用于触发触发器。代码清单6 public List triggersFired(final List triggers) throws JobPersistenceException return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS, new TransactionCallbackList() public List execute(Connection conn) throws JobPersistenceException List results = new ArrayList(); TriggerFiredResult result; for (OperableTrigger trigger : triggers) try TriggerFiredBundle bundle = triggerFired(conn, trigger); result = new TriggerFiredResult(bundle); catch (JobPersistenceException jpe) result = new TriggerFiredResult(jpe); catch(RuntimeException re) result = new TriggerFiredResult(re); results.add(result); return results; , new TransactionValidatorList() Override public Boolean validate(Connection conn, List result) throws JobPersistenceException try List acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId(); Set executingTriggers = new HashSet(); for (FiredTriggerRecord ft : acquired) if (STATE_EXECUTING.equals(ft.getFireInstanceState() executingTriggers.add(ft.getFireInstanceId(); for (TriggerFiredResult tr : result) if (tr.getTriggerFiredBundle() != null & executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId() return true; return false; catch (SQLException e) throw new JobPersistenceException(error validating trigger acquisition, e); ); 可以看到triggersFired方法也调用了executeInNonManagedTXLock方法,我们根据前面的分析,知道最终实际会回调新的TransactionCallback匿名类的execute方法,可以看到其主要执行逻辑无非循环triggers列表,并且调用triggerFired方法获取TriggerFiredBundle。triggerFired的执行步骤如下:1. 获取表QRTZ_TRIGGERS中的触发器状态,代码如下:2. try / if trigger was deleted, state will be STATE_DELETED3. String state = getDelegate().selectTriggerState(conn,4. trigger.getKey();5. if (!state.equals(STATE_ACQUIRED) 6. return null;7. 8. catch (SQLException e) 9. throw new JobPersistenceException(Couldnt select trigger state: 10. + e.getMessage(), e); 以StdJDBCDelegate为例,其selectTriggerState方法的执行sql为:SELECT TRIGGER_STATE FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?11. 查询触发器对应的作业,代码如下:12. try 13. job = retrieveJob(conn, trigger.getJobKey();14. if (job = null) return null; 15. catch (JobPersistenceException jpe) 16. try 17. getLog().error(Error retrieving job, setting trigger state to ERROR., jpe);18. getDelegate().updateTriggerState(conn, trigger.getKey(),19. STATE_ERROR);20. catch (SQLException sqle) 21. getLog().error(Unable to set trigger state to ERROR., sqle);22. 23. throw jpe; 这里的retrieveJob方法实际也调用了Delegate的selectJobDetail方法,不再赘述。24. 更新表QRTZ_FIRED_TRIGGERS中此触发器被触发的状态为STATE_EXECUTING,代码如下:25. try 26. getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);27. catch (SQLException e) 28. throw new JobPersistenceException(Couldnt insert fired trigger: 29. + e.getMessage(), e); 这里实际执行的sql为UPDATE 0FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = 1 AND ENTRY_ID = ?30. 更新触发器被触发的状态,代码如下: trigger.triggered(cal);31. 如果捕获到DisallowConcurrentExecution,则将处于STATE_WAITING、STATE_ACQUIRED、STATE_PAUSED状态的触发器的状态修改为STATE_BLOCKED,代码如下:32. if (job.isConcurrentExectionDisallowed() 33. state = STATE_BLOCKED;34. force = false;35. try 36. getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),37. STATE_BLOCKED, STATE_WAITING);38. getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),39. STATE_BLOCKED, STATE_ACQUIRED);40. getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),41. STATE_PAUSED_BLOCKED, STATE_PAUSED);42. catch (SQLException e) 43. throw new JobPersistenceException(44. Couldnt update states of blocked triggers: 45. + e.getMessage(), e);46. 47. 插入新的触发器,代码如下: storeTrigger(conn, trigger, job, true, state, force, false);storeTrigger方法首先调用triggerExists用于判断当前触发器是否存在: boolean existingTrigger = triggerExists(conn, newTrigger.getKey();以StdJDBCDelegate为例,其triggerExists方法中执行的sql为:SELECT TRIGGER_NAME FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?。然后根据existingTrigger的值插入或者更新表QRTZ_TRIGGERS中触发器的下次触发时间,代码如下: if (existingTrigger) getDelegate().updateTrigger(conn, newTrigger, state, job); else getDelegate().insertTrigger(conn, newTrigger, state, job); 以StdJDBCDelegate为例,其执行的sql为:INSERT INTO 0TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES(1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)48. 返回TriggerFiredBundle对象,代码如下:49. return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()50. .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime();51.创建作业运行的shell脚本之后QuartzSchedulerThread会遍历每个TriggerFiredBundle,然后创建作业运行的shell脚本,见代码清单7.代码清单7 for (int i = 0; i bndles.size(); i+) TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) getLog().error(RuntimeException while firing trigger + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i); continue; / its possible to get null if the triggers was paused, / blocked, or other similar occurrences that prevent it being / fired at this time. or if the scheduler was shutdown (halted) if (bndle = null) qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i); continue;

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论