XXL-JOB > Task start and stop

如何知道 Task 的状态是否停止?

v2.4.1

从数据库的表结构可知:

xxl_job_info.trigger_status 代表任务的状态,0 是停止 1 是运行。

查看如下接口的实现:

/jobinfo/pageList
/jobinfo/start
/jobinfo/stop

注意到如下配置:

XxlJobAdminConfig#afterPropertiesSet
XxlJobScheduler#init

查看后发现 xxl-job-admin 在项目启动配置加载完成后,启动了一堆线程。

public void init() throws Exception {
    // init i18n
    initI18n();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin registry monitor run
    JobRegistryHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run ( depend on JobTriggerPoolHelper )
    JobCompleteHelper.getInstance().start();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule  ( depend on JobTriggerPoolHelper )
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}

其中 JobScheduleHelper 会以大约 5000 毫秒的间隔,

xxl_job_info 表中配置的 trigger_status = 1 的任务加载出来,

如果当前时间戳大于 trigger_next_time 就将该任务加入 trigger 队列中准备执行。

也就是说,

如果想要批量暂停 task,只需要将 xxl_job_info.trigger_status 设为 1。

v1.8.2

查看如下接口的实现:

/jobinfo/pageList
/jobinfo/resume
/jobinfo/pause

注意到:

XxlJobDynamicScheduler#init

查看发现,job-admin 注册该 bean 时,仅启动动态扫描注册器和异常报警的线程,和 2.4.1 的架构大不相同。

public void init() throws Exception {
    // admin registry monitor run
    JobRegistryMonitorHelper.getInstance().start();

    // admin monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin-server(spring-mvc)
    NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
    NetComServerFactory.setAccessToken(accessToken);

    // valid
    Assert.notNull(scheduler, "quartz scheduler is null");
    logger.info(">>>>>>>>> init xxl-job admin success.");
}

任务活动周期的管理全部交由 quartzScheduler 管理。

<bean id="quartzScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="autoStartup" value="true" />            <!--自动启动 -->
    <property name="startupDelay" value="20" />             <!--延时启动,应用启动成功后在启动 -->
    <property name="overwriteExistingJobs" value="true" />  <!--覆盖DB中JOB:true、以数据库中已经存在的为准:false -->
    <property name="applicationContextSchedulerContextKey"  value="applicationContextKey" />
    <property name="configLocation" value="classpath:quartz.properties"/>
</bean>

<bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
    <!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) -->
    <property name="scheduler" ref="quartzScheduler"/>
    <property name="accessToken" value="${xxl.job.accessToken}" />
</bean>

也就是完全依赖 quartz 框架的任务调度管理。

quartz

查看 quartz 的实现,注意到表 xxl_job_qrtz_triggers.trigger_state;

暂停 task 是不是控制这个值就可以了呢?

顺着 /jobinfo/pageList 继续看:

StdScheduler#getTriggerState
QuartzScheduler#getTriggerState
JobStoreSupport#getTriggerState
StdJDBCDelegate#selectTriggerState

发现确实是通过查询数据库的值来确定任务状态。

以及它在 quartz 中的转换关系:

String ts = getDelegate().selectTriggerState(conn, key);
if (ts == null) {
    return TriggerState.NONE;
}
if (ts.equals(STATE_DELETED)) {
    return TriggerState.NONE;
}
if (ts.equals(STATE_COMPLETE)) {
    return TriggerState.COMPLETE;
}
if (ts.equals(STATE_PAUSED)) {
    return TriggerState.PAUSED;
}
if (ts.equals(STATE_PAUSED_BLOCKED)) {
    return TriggerState.PAUSED;
}
if (ts.equals(STATE_ERROR)) {
    return TriggerState.ERROR;
}
if (ts.equals(STATE_BLOCKED)) {
    return TriggerState.BLOCKED;
}
return TriggerState.NORMAL; 

那么暂停一个任务只需要更新这个值就可以吗?

直接试验一下,

修改字段 trigger_state,将 WAITINGPAUSED,任务停止了。

但是重新启动不等同于把 PAUSED 再恢复 WAITING。

查看 /jobinfo/pause 的实现

StdScheduler#pauseTrigger
QuartzScheduler#pauseTrigger
JobStoreSupport#pauseTrigger

在第三步中,更新了数据库的字段值。

// JobStoreSupport#pauseTrigger
String oldState = getDelegate().selectTriggerState(conn,
        triggerKey);
if (oldState.equals(STATE_WAITING)
        || oldState.equals(STATE_ACQUIRED)) {
    getDelegate().updateTriggerState(conn, triggerKey,
            STATE_PAUSED);
} else if (oldState.equals(STATE_BLOCKED)) {
    getDelegate().updateTriggerState(conn, triggerKey,
            STATE_PAUSED_BLOCKED);
} 

第二步中,除了调用更新值的方法,还通知了两个线程。

// QuartzScheduler#pauseTrigger
public void pauseTrigger(TriggerKey triggerKey) throws SchedulerException {
    validateState();

    resources.getJobStore().pauseTrigger(triggerKey);
    notifySchedulerThread(0L);
    notifySchedulerListenersPausedTrigger(triggerKey);
}

查看 notifySchedulerListenersPausedTrigger,这个方法,啥也没做。

// SchedulerListener#triggerPaused
// SchedulerListenerSupport#triggerPaused

public void triggerPaused(TriggerKey triggerKey) {
}

查看 notifySchedulerThread,注意到 QuartzSchedulerThread

QuartzScheduler#notifySchedulerThread
SchedulerSignalerImpl#signalSchedulingChange
QuartzSchedulerThread#signalSchedulingChange

QuartzSchedulerThread 是一个触发任务执行的线程。

// QuartzSchedulerThread#signalSchedulingChange
/**
 * <p>
 * Signals the main processing loop that a change in scheduling has been
 * made - in order to interrupt any sleeping that may be occuring while
 * waiting for the fire time to arrive.
 * </p>
 *
 * @param candidateNewNextFireTime the time (in millis) when the newly scheduled trigger
 * will fire.  If this method is being called do to some other even (rather
 * than scheduling a trigger), the caller should pass zero (0).
 */
public void signalSchedulingChange(long candidateNewNextFireTime) {
    synchronized(sigLock) {
        signaled = true;
        signaledNextFireTime = candidateNewNextFireTime;
        sigLock.notifyAll();
    }
}

signaledNextFireTime 设为 0,代表该触发器不再触发。

试想一个运行中的任务,此时它的下次执行时间是明天 1 点钟。

现在我们把 trigger_state 设为 PAUSED,却没有管任何正在运行的 QuartzSchedulerThread

那么下次执行的时候,该触发器会执行吗?

直接查看 QuartzSchedulerThread.run 的实现。

QuartzSchedulerThread#run
QuartzSchedulerThread#isCandidateNewTimeEarlierWithinReason
JobStoreSupport#triggerFired

注意到 triggerFired,查看实现。

protected TriggerFiredBundle triggerFired(Connection conn,
        OperableTrigger trigger)
    throws JobPersistenceException {
    ... ...

    // Make sure trigger wasn't deleted, paused, or completed...
    try { // if trigger was deleted, state will be STATE_DELETED
        String state = getDelegate().selectTriggerState(conn,
                trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: "
                + e.getMessage(), e);
    }
    ... ...
}

到这里可以安心了,执行前会根据数据库中 trigger_state 值拦截触发器。

所以 v1.8.2 可以通过修改 xxl_job_qrtz_triggers.trigger_state 的值批量暂停 task。