Seditio Source
Root |
./othercms/xenForo 2.2.8/src/XF/Job/Manager.php
<?php

namespace XF\Job;

use function
count, intval, strlen;

class
Manager
{
   
/**
     * @var \XF\App
     */
   
protected $app;

   
/**
     * @var \XF\Db\AbstractAdapter
     */
   
protected $db;

    protected
$allowManual;

    protected
$forceManual = false;

    protected
$uniqueEnqueued = [];

    protected
$autoEnqueuedList = [];
    protected
$autoBlockingList = [];
    protected
$manualEnqueuedList = [];

    protected
$autoBlockingMessage = null;

    protected
$shutdownRegistered = false;
    protected
$runningJob;

    public function
__construct(\XF\App $app, $allowManual = true, $forceManual = false)
    {
       
$this->app = $app;
       
$this->db = $app->db();
       
$this->allowManual = $allowManual || $forceManual;
       
$this->forceManual = $forceManual;
    }

    public function
setAllowManual($allowManual)
    {
       
$this->allowManual = $allowManual;
    }

    public function
setForceManual($forceManual)
    {
       
$this->forceManual = $forceManual;
        if (
$forceManual)
        {
           
$this->setAllowManual(true);
        }
    }

    public function
canRunJobs(): bool
   
{
        return (\
XF::$versionId == $this->app->options()->currentVersionId || !$this->app->config('checkVersion'));
    }

   
/**
     * @param bool $manual
     * @param float $maxRunTime
     *
     * @return null|JobResult
     */
   
public function runQueue($manual, $maxRunTime)
    {
        if (
$maxRunTime < 2)
        {
           
$maxRunTime = 2;
        }

       
$runnable = $this->getRunnable($manual);
       
$startTime = microtime(true);
       
$result = null;

        foreach (
$runnable AS $job)
        {
           
$remaining = $maxRunTime - (microtime(true) - $startTime);
            if (
$remaining < 1)
            {
                break;
            }

           
$result = $this->runJobEntry($job, $remaining);
        }

        return
$result;
    }

   
/**
     * @param array $ids
     * @param $maxRunTime
     * @return array
     */
   
public function runByIds(array $ids, $maxRunTime)
    {
        if (
$maxRunTime < 2)
        {
           
$maxRunTime = 2;
        }

       
$startTime = microtime(true);
       
$result = null;

        foreach (
$ids AS $k => $id)
        {
           
$remaining = $maxRunTime - (microtime(true) - $startTime);
            if (
$remaining < 1)
            {
                break;
            }

           
$job = $this->getJob($id);
            if (
$job && $job['trigger_date'] <= time())
            {
               
$result = $this->runJobEntry($job, $remaining);
            }
            else
            {
               
$result = null;
            }

            if (!
$result || $result->completed)
            {
                unset(
$ids[$k]);
            }
        }

        return [
           
'result' => $result,
           
'remaining' => $ids
       
];
    }

   
/**
     * @param string $key
     * @param float $maxRunTime
     *
     * @return null|JobResult
     */
   
public function runUnique($key, $maxRunTime)
    {
        if (
$maxRunTime < 2)
        {
           
$maxRunTime = 2;
        }

       
$job = $this->getUniqueJob($key);
        if (
$job)
        {
            return
$this->runJobEntry($job, $maxRunTime);
        }
        else
        {
            return
null;
        }
    }

    public function
runById($id, $maxRunTime)
    {
        if (
$maxRunTime < 2)
        {
           
$maxRunTime = 2;
        }

       
$job = $this->getJob($id);
        if (
$job)
        {
            return
$this->runJobEntry($job, $maxRunTime);
        }
        else
        {
            return
null;
        }
    }

    public function
queuePending($manual)
    {
        return
count($this->getRunnable($manual)) > 0;
    }

   
/**
     * @param array $job
     * @param int $maxRunTime
     *
     * @return JobResult
     */
   
public function runJobEntry(array $job, $maxRunTime)
    {
       
$affected = $this->db->update('xf_job', [
           
'trigger_date' => time() + 15*60,
           
'last_run_date' => time()
        ],
'job_id = ? AND trigger_date = ?', [$job['job_id'], $job['trigger_date']]);
        if (!
$affected)
        {
           
// job has already been taken, treat it as complete
           
return new JobResult(true, $job['job_id']);
        }

       
$result = $this->runJobInternal($job, $maxRunTime);
        if (
$result->completed)
        {
           
$this->db->delete('xf_job', 'job_id = ?', $job['job_id']);

            unset(
               
$this->manualEnqueuedList[$job['job_id']],
               
$this->autoEnqueuedList[$job['job_id']],
               
$this->autoBlockingList[$job['job_id']]
            );

            if (
$job['unique_key'])
            {
                unset(
$this->uniqueEnqueued[$job['unique_key']]);
            }
        }
        else
        {
           
$update = [
               
'execute_data' => serialize($result->data),
               
'trigger_date' => $result->continueDate ? intval($result->continueDate) : $job['trigger_date'],
               
'last_run_date' => time()
            ];
           
$this->db->update('xf_job', $update, 'job_id = ?', $job['job_id']);
        }

        if (!
$job['manual_execute'])
        {
           
$this->scheduleRunTimeUpdate();
        }

        return
$result;
    }

    public function
getJobRunner(array $job)
    {
        return
$this->app->job($job['execute_class'], $job['job_id'], unserialize($job['execute_data']));
    }

    protected function
runJobInternal(array $job, $maxRunTime)
    {
       
$runner = $this->getJobRunner($job);
        if (!
$runner)
        {
           
$this->app->logException(new \Exception("Could not get runner for job $job[execute_class] (unique: $job[unique_key]). Skipping."));

            return new
JobResult(true, $job['job_id']);
        }

        if (!
$this->shutdownRegistered)
        {
           
register_shutdown_function([$this, 'handleShutdown']);
        }

       
$this->runningJob = $job;

        try
        {
           
$result = $runner->run($maxRunTime);
           
$this->runningJob = null;
        }
        catch (\
Exception $e)
        {
           
$this->runningJob = null;

           
$this->db->rollbackAll();

            if (
$job['manual_execute'] || $this->app->config()['development']['throwJobErrors'])
            {
               
$this->db->update('xf_job', [
                   
'trigger_date' => $job['trigger_date'],
                   
'last_run_date' => time()
                ],
'job_id = ?', $job['job_id']);

                throw
$e;
            }
            else
            {
               
$this->app->logException($e, false, "Job $job[execute_class]: ");
               
$result = new JobResult(true, $job['job_id'], [], "$job[execute_class] threw exception. See error log.");
            }
        }

        if (!(
$result instanceof \XF\Job\JobResult))
        {
            throw new \
LogicException("Jobs must return JobResult objects");
        }

        \
XF::triggerRunOnce();

        return
$result;
    }

    public function
handleShutdown()
    {
        if (!
$this->runningJob)
        {
            return;
        }

       
$job = $this->runningJob;

        try
        {
           
// job is being run manually, there's no error which implies a call to exit, or forced re-enqueue
           
if ($job['manual_execute'] || !error_get_last() || $this->app->config()['development']['throwJobErrors'])
            {
               
$this->db->rollbackAll();

               
$this->db->update('xf_job', [
                   
'trigger_date' => $job['trigger_date'],
                   
'last_run_date' => time()
                ],
'job_id = ?', $job['job_id']);

               
$this->updateNextRunTime();
            }
        }
        catch (\
Exception $e) {}
    }

    public function
cancelJob(array $job)
    {
       
$rows = $this->db->delete('xf_job', 'job_id = ?', $job['job_id']);
        if (
$rows)
        {
           
$this->scheduleRunTimeUpdate();
        }
    }

    public function
cancelUniqueJob($uniqueId)
    {
       
$job = $this->getUniqueJob($uniqueId);
        if (
$job)
        {
           
$this->cancelJob($job);
            return
true;
        }
        else
        {
            return
false;
        }
    }

    public function
getRunnable($manual)
    {
        return
$this->db->fetchAll("
            SELECT *
            FROM xf_job
            WHERE trigger_date <= ?
                AND manual_execute = ?
            ORDER BY trigger_date
            LIMIT 1000
        "
, [\XF::$time, $manual ? 1 : 0]);
    }

    public function
getFirstRunnable($manual)
    {
        return
$this->db->fetchRow("
            SELECT *
            FROM xf_job
            WHERE trigger_date <= ?
                AND manual_execute = ?
            ORDER BY trigger_date
            LIMIT 1
        "
, [\XF::$time, $manual ? 1 : 0]);
    }

    public function
hasStoppedJobs(): bool
   
{
       
$pending = $this->queuePending(false);

        if (!
$pending)
        {
            return
false;
        }

       
$jobRunTime = $this->app['job.runTime'];
        if (!
$jobRunTime)
        {
            return
false;
        }

        if (
$jobRunTime + 3600 <= \XF::$time)
        {
           
// scheduled run time exceeded by an hour so jobs appear to be stuck
           
return true;
        }
        else
        {
            return
false;
        }
    }

    public function
hasStoppedManualJobs()
    {
       
$match = $this->db->fetchRow("
            SELECT job_id
            FROM xf_job
            WHERE trigger_date <= ?
                AND (last_run_date <= ? OR last_run_date IS NULL)
                AND manual_execute = 1
            LIMIT 1
        "
, [\XF::$time - 15, \XF::$time - 180]);

        return
$match ? true : false;
    }

    public function
getJob($id)
    {
        return
$this->db->fetchRow("
            SELECT *
            FROM xf_job
            WHERE job_id = ?
        "
, $id);
    }

    public function
getUniqueJob($key)
    {
        return
$this->db->fetchRow("
            SELECT *
            FROM xf_job
            WHERE unique_key = ?
        "
, $key);
    }

    public function
getFirstAutomaticTime()
    {
        return
$this->db->fetchOne("
            SELECT MIN(trigger_date)
            FROM xf_job
            WHERE manual_execute = 0
        "
);
    }

    public function
updateNextRunTime()
    {
       
$runTime = $this->getFirstAutomaticTime();
       
$this->app->registry()->set('autoJobRun', $runTime);

        return
$runTime;
    }

    public function
setNextAutoRunTime($time)
    {
       
$this->app->registry()->set('autoJobRun', $time);
    }

    public function
scheduleRunTimeUpdate()
    {
        \
XF::runOnce('autoJobRun', function()
        {
           
$this->updateNextRunTime();
        });
    }

    public function
enqueue($jobClass, array $params = [], $manual = false)
    {
        return
$this->_enqueue(null, $jobClass, $params, $manual, null);
    }

    public function
enqueueAutoBlocking($jobClass, array $params = [])
    {
        return
$this->_enqueue(null, $jobClass, $params, false, null, true);
    }

    public function
enqueueUnique($uniqueId, $jobClass, array $params = [], $manual = true)
    {
        return
$this->_enqueue($uniqueId, $jobClass, $params, $manual, null);
    }

    public function
enqueueLater($uniqueId, $runTime, $jobClass, array $params = [], $manual = false)
    {
        return
$this->_enqueue($uniqueId, $jobClass, $params, $manual, $runTime);
    }

   
/**
     * @param string|null $uniqueId
     * @param string $jobClass
     * @param array $params
     * @param bool $manual
     * @param int|null $runTime
     * @param bool $blocking If auto, this job can be set as blocking which will change the UI for the triggerer
     *
     * @return int|null ID of the enqueued job (or null if an error happened)
     */
   
protected function _enqueue($uniqueId, $jobClass, array $params, $manual, $runTime, $blocking = false)
    {
        if (
$uniqueId)
        {
            if (
strlen($uniqueId) > 50)
            {
               
$uniqueId = md5($uniqueId);
            }

            if (isset(
$this->uniqueEnqueued[$uniqueId]))
            {
                return
$this->uniqueEnqueued[$uniqueId];
            }
        }
        else
        {
           
$uniqueId = null;
        }

        if (
$this->forceManual)
        {
           
$manual = true;
        }
        else if (!
$this->allowManual)
        {
           
$manual = false;
        }

        if (!
$runTime)
        {
           
$runTime = \XF::$time;
        }

       
$db = $this->db;
       
$affected = $db->insert('xf_job', [
           
'execute_class' => $jobClass,
           
'execute_data' => serialize($params),
           
'unique_key' => $uniqueId,
           
'manual_execute' => $manual ? 1 : 0,
           
'trigger_date' => $runTime
       
], false, '
            execute_class = VALUES(execute_class),
            execute_data = VALUES(execute_data),
            manual_execute = VALUES(manual_execute),
            trigger_date = VALUES(trigger_date),
            last_run_date = NULL
        '
);

        if (
$affected == 1)
        {
           
$id = $db->lastInsertId();
        }
        else
        {
           
// this is an update
           
$id = $db->fetchOne("
                SELECT job_id
                FROM xf_job
                WHERE unique_key = ?
            "
, $uniqueId);
            if (!
$id)
            {
                return
null;
            }
        }

        if (
$uniqueId)
        {
           
$this->uniqueEnqueued[$uniqueId] = $id;
        }

        if (
$manual)
        {
           
$this->manualEnqueuedList[$id] = $id;
        }
        else
        {
            if (
$blocking)
            {
               
$this->autoBlockingList[$id] = $id;
            }
           
$this->autoEnqueuedList[$id] = $id;

           
$this->scheduleRunTimeUpdate();
        }

        return
$id;
    }

    public function
hasManualEnqueued()
    {
        return
count($this->manualEnqueuedList) > 0;
    }

    public function
getManualEnqueued()
    {
        return
$this->manualEnqueuedList;
    }

    public function
hasAutoEnqueued()
    {
        return
count($this->autoEnqueuedList) > 0;
    }

    public function
getAutoEnqueued()
    {
        return
$this->autoEnqueuedList;
    }

    public function
hasAutoBlocking()
    {
        return
count($this->autoBlockingList) > 0;
    }

    public function
getAutoBlocking()
    {
        return
$this->autoBlockingList;
    }

    public function
setAutoBlockingMessage($message)
    {
       
$this->autoBlockingMessage = $message;
    }

    public function
getAutoBlockingMessage()
    {
        return
$this->autoBlockingMessage;
    }
}