Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听

大概内容

  • scheduler.scheduleJob(jobDetail, trigger)
  • scheduler.start()
scheduler.scheduleJob() Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
文章图片
Scheduler使用 StdScheduler 【Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听】StdScheduler的方法基本上都代理给QuartzScheduler类来处理。
public class StdScheduler implements Scheduler { private QuartzScheduler sched; public StdScheduler(QuartzScheduler sched) { this.sched = sched; }public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { return sched.scheduleJob(jobDetail, trigger); }public void start() throws SchedulerException { sched.start(); }/** * 只有这个方法没有委托给QuartzScheduler * 除了getClass(),其他方法在QuartzScheduler都可以拿到 */ public SchedulerMetaData getMetaData() { return new SchedulerMetaData(getSchedulerName(), getSchedulerInstanceId(), getClass(), false, isStarted(), isInStandbyMode(), isShutdown(), sched.runningSince(), sched.numJobsExecuted(), sched.getJobStoreClass(), sched.supportsPersistence(), sched.isClustered(), sched.getThreadPoolClass(), sched.getThreadPoolSize(), sched.getVersion()); }// 其他代码}

QuartzScheduler Quartz的小心脏,org.quartz.Scheduler接口的间接实现。
public class QuartzScheduler implements RemotableQuartzScheduler {// QuartzSchedulerResources对象是通过构造器放进去的 public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException { this.resources = resources; if (resources.getJobStore() instanceof JobListener) { addInternalJobListener((JobListener)resources.getJobStore()); }this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); schedThreadExecutor.execute(this.schedThread); if (idleWaitTime > 0) { this.schedThread.setIdleWaitTime(idleWaitTime); }jobMgr = new ExecutingJobsManager(); addInternalJobListener(jobMgr); errLogger = new ErrorLogger(); addInternalSchedulerListener(errLogger); signaler = new SchedulerSignalerImpl(this, this.schedThread); getLog().info("Quartz Scheduler v." + getVersion() + " created."); }public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { validateState(); if (jobDetail == null) { throw new SchedulerException("JobDetail cannot be null"); } if (trigger == null) { throw new SchedulerException("Trigger cannot be null"); } if (jobDetail.getKey() == null) { throw new SchedulerException("Job's key cannot be null"); } if (jobDetail.getJobClass() == null) { throw new SchedulerException("Job's class cannot be null"); } // TriggerBuilder.build()会生成一个OperableTrigger实例。 OperableTrigger trig = (OperableTrigger)trigger; if (trigger.getJobKey() == null) { trig.setJobKey(jobDetail.getKey()); } else if (!trigger.getJobKey().equals(jobDetail.getKey())) { throw new SchedulerException( "Trigger does not reference given job!"); }trig.validate(); Calendar cal = null; if (trigger.getCalendarName() != null) { cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName()); } // TODO: 解析各种类型的Trigger Date ft = trig.computeFirstFireTime(cal); if (ft == null) { throw new SchedulerException( "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire."); } // 关键代码就是下面这一行 resources.getJobStore().storeJobAndTrigger(jobDetail, trig); notifySchedulerListenersJobAdded(jobDetail); notifySchedulerThread(trigger.getNextFireTime().getTime()); notifySchedulerListenersSchduled(trigger); return ft; }// 其他代码}

RAMJobStore 介绍一下RAMJobStore的属性
  • HashMap对象:也是通过空间来换取查询时间的策略,把JobDetail和Trigger的信息放进这些HashMap对象中,方便程序可以根据key或者group来匹配相关的JobDetail和Trigger。
  • TreeSet timeTriggers:利用TreeSet排重和有序的特性,timeTriggers.first()方法总能返回最先要处理的Trigger。
class RAMJobStore { #HashMap jobsByKey #HashMap triggersByKey #HashMap> jobsByGroup #HashMap> triggersByGroup #TreeSet timeTriggers #HashMap calendarsByName #Map> triggersByJob #Object lock #HashSet pausedTriggerGroups #HashSet pausedJobGroups #HashSet blockedJobs #long misfireThreshold #SchedulerSignaler signaler -Logger log -{static}AtomicLong ftrCtr }

下面是RAMJobStore.storeJob()代码解析,storeTrigger()方法的逻辑类似。
/** * 内部类JobWrapper是一个包括jobKey和jobDetail的类。 * 克隆一个新的JobDetail来创建一个JobWrapper,然后维护到jobsByKey和jobsByGroup属性中。 * 维护HashMap系列对象的时候,通过lock的synchronized代码块来做线程同步 */ public void storeJob(JobDetail newJob,boolean replaceExisting) throws ObjectAlreadyExistsException { JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); boolean repl = false; synchronized (lock) { if (jobsByKey.get(jw.key) != null) { if (!replaceExisting) { throw new ObjectAlreadyExistsException(newJob); } repl = true; }if (!repl) { // get job group HashMap grpMap = jobsByGroup.get(newJob.getKey().getGroup()); if (grpMap == null) { grpMap = new HashMap(100); jobsByGroup.put(newJob.getKey().getGroup(), grpMap); } // add to jobs by group grpMap.put(newJob.getKey(), jw); // add to jobs by FQN map jobsByKey.put(jw.key, jw); } else { // update job detail JobWrapper orig = jobsByKey.get(jw.key); orig.jobDetail = jw.jobDetail; // already cloned } } }

scheduler.start() QuartzScheduler 案例用的是RAMJobStore,其中的schedulerStarted()和schedulerResumed()是空方法,没有代码立即。对于JobStoreSupport,这两个方法是有很多逻辑的,后面的篇章再做解析。
public class QuartzScheduler implements RemotableQuartzScheduler {public void start() throws SchedulerException { if (shuttingDown|| closed) { throw new SchedulerException( "The Scheduler cannot be restarted after shutdown() has been called."); }// QTZ-212 : calling new schedulerStarting() method on the listeners // right after entering start() notifySchedulerListenersStarting(); if (initialStart == null) {//初始化标识为null,进行初始化操作 initialStart = new Date(); // RAMJobStore 啥都不做 // JobStoreSupport 判断是否集群,恢复Job等 this.resources.getJobStore().schedulerStarted(); startPlugins(); } else { resources.getJobStore().schedulerResumed(); // 如果已经初始化过,则恢复jobStore }schedThread.togglePause(false); // 唤醒所有等待的线程getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started."); notifySchedulerListenersStarted(); }// 其他代码}

QuartzSchedulerThread
public class QuartzSchedulerThread extends Thread {/** * pause为true,发出让主循环暂停的信号,以便线程在下一个可处理的时刻暂停 * pause为false,唤醒sigLock对象的所有等待队列的线程 */ void togglePause(boolean pause) { synchronized (sigLock) { paused = pause; if (paused) { signalSchedulingChange(0); } else { sigLock.notifyAll(); } } }// 其他代码}

Listener事件监听 Listener事件监听是观察者模式的一个应用。
QuartzScheduler的scheduleJob()start()方法都有notifyXXX代码逻辑,这些就是JobDetail、Trigger和Scheduler事件监听的代码逻辑。
在《Scheduler的初始化》篇章里面,初始化一个Scheduler,里面有"根据PropertiesParser创建Listeners"的步骤,Listeners就包括JobListener和TriggerListener的List对象。
SchedulerListener不支持配置在quartz.properties里面,初始化Scheduler的过程中没有这一块的代码逻辑。如果要添加一个观察者,那么可以通过StdScheduler.getListenerManager()获取ListenerManager实例,通过它可以拿到所有观察者的引用。
类图
Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
文章图片
Quartz Listener类图 角色说明
类名 角色
QuartzScheduler Subject
SchedulerListener Observer
JobListener Observer
TriggerListener Observer
代码示例
Subject通知Observer,都是遍历Observer列表,触发相应的通知,实现事件监听的效果。
这里特别说明一下,获取Listeners集合的时候,是通过新建一个不可改变的集合对象来实现。如果是为了避免多线程的读写问题,这和CopyOnWriteList写时复制的做法相反,而且这里读的场景大于写的场景。况且,ListenerManagerImpl的add()方法都做了代码块的synchronized。新建一个不可改变的集合来返回,这么做的目的没有想明白。
public void notifySchedulerListenersJobAdded(JobDetail jobDetail) { // build a list of all scheduler listeners that are to be notified... List schedListeners = buildSchedulerListenerList(); // notify all scheduler listeners for(SchedulerListener sl: schedListeners) { try { sl.jobAdded(jobDetail); } catch (Exception e) { getLog().error( "Error while notifying SchedulerListener of JobAdded.", e); } } }

ListenerManagerImpl
public class ListenerManagerImpl implements ListenerManager { // 其他代码 public List getSchedulerListeners() { synchronized (schedulerListeners) { return java.util.Collections.unmodifiableList(new ArrayList(schedulerListeners)); } } }

系列文章
  • Quartz 源码解析(一) —— 基本介绍
  • Quartz 源码解析(二) —— Scheduler的初始化
  • Quartz 源码解析(三) —— JobDetail、Trigger和它们的Builder
  • Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
  • Quartz 源码解析(五) —— QuartzSchedulerThread
  • Quartz 源码解析(六) —— 解析Cron表达式

    推荐阅读