vendor/shopware/core/Framework/MessageQueue/ScheduledTask/Scheduler/TaskScheduler.php line 61

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\ScheduledTask\Scheduler;
  3. use Shopware\Core\Defaults;
  4. use Shopware\Core\Framework\Context;
  5. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  6. use Shopware\Core\Framework\DataAbstractionLayer\Search\Aggregation\Metric\MinAggregation;
  7. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\AggregationResult;
  8. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\Metric\MinResult;
  9. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  10. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
  11. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NotFilter;
  12. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\RangeFilter;
  13. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTask;
  14. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskDefinition;
  15. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskEntity;
  16. use Symfony\Component\Messenger\MessageBusInterface;
  /**
   * Queues scheduled tasks that are due and exposes timing information about them.
   *
   * Due tasks are looked up in the scheduled task repository, marked as queued and
   * then dispatched on the message bus. Two aggregation helpers report the earliest
   * upcoming execution time and the smallest run interval.
   */
  class TaskScheduler
  {
      /**
       * @var EntityRepositoryInterface
       */
      private $scheduledTaskRepository;

      /**
       * @var MessageBusInterface
       */
      private $bus;

      /**
       * @param EntityRepositoryInterface $scheduledTaskRepository Repository used to search, aggregate and update tasks.
       * @param MessageBusInterface       $bus                     Bus on which the task messages are dispatched.
       */
      public function __construct(
          EntityRepositoryInterface $scheduledTaskRepository,
          MessageBusInterface $bus
      ) {
          $this->scheduledTaskRepository = $scheduledTaskRepository;
          $this->bus = $bus;
      }

      /**
       * Marks every due task as queued and dispatches it on the message bus.
       *
       * A task is due when its status is STATUS_SCHEDULED and its nextExecutionTime
       * lies before the current time (see buildCriteriaForAllScheduledTask()).
       *
       * @throws \RuntimeException When a task's class does not extend ScheduledTask. The status
       *                           update for that task has already been written at that point.
       */
      public function queueScheduledTasks(): void
      {
          $criteria = $this->buildCriteriaForAllScheduledTask();
          $context = Context::createDefaultContext();
          $tasks = $this->scheduledTaskRepository->search($criteria, $context)->getEntities();

          if (\count($tasks) === 0) {
              return;
          }

          // Tasks **must not** be queued before their state in the database has been updated. Otherwise,
          // a worker could have already fetched the task and set its state to running before it gets set to
          // queued, thus breaking the task.
          /** @var ScheduledTaskEntity $task */
          foreach ($tasks as $task) {
              $this->scheduledTaskRepository->update([
                  [
                      'id' => $task->getId(),
                      'status' => ScheduledTaskDefinition::STATUS_QUEUED,
                  ],
              ], $context);

              $this->queueTask($task);
          }
      }

      /**
       * Returns the smallest nextExecutionTime among tasks with status STATUS_SCHEDULED.
       *
       * @return \DateTimeInterface|null Null when the aggregation is not a MinResult or yields no minimum.
       */
      public function getNextExecutionTime(): ?\DateTimeInterface
      {
          $criteria = $this->buildCriteriaForNextScheduledTask();

          /** @var AggregationResult $aggregation */
          $aggregation = $this->scheduledTaskRepository
              ->aggregate($criteria, Context::createDefaultContext())
              ->get('nextExecutionTime');

          // The instanceof guard is the real type check; no "@var MinResult" hint is placed
          // in front of it because that would contradict the guard for static analysis.
          if (!$aggregation instanceof MinResult) {
              return null;
          }

          $min = $aggregation->getMin();
          if ($min === null) {
              return null;
          }

          return new \DateTime((string) $min);
      }

      /**
       * Returns the smallest runInterval among all tasks whose status is not STATUS_INACTIVE.
       *
       * @return int|null Null when the aggregation is not a MinResult or yields no minimum.
       */
      public function getMinRunInterval(): ?int
      {
          $criteria = $this->buildCriteriaForMinRunInterval();

          $aggregation = $this->scheduledTaskRepository
              ->aggregate($criteria, Context::createDefaultContext())
              ->get('runInterval');

          if (!$aggregation instanceof MinResult) {
              return null;
          }

          $min = $aggregation->getMin();
          if ($min === null) {
              return null;
          }

          return (int) $min;
      }

      /**
       * Builds the criteria matching tasks that are due: nextExecutionTime before "now"
       * (formatted with Defaults::STORAGE_DATE_TIME_FORMAT) and status STATUS_SCHEDULED.
       */
      private function buildCriteriaForAllScheduledTask(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new RangeFilter(
                  'nextExecutionTime',
                  [
                      RangeFilter::LT => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
                  ]
              ),
              new EqualsFilter('status', ScheduledTaskDefinition::STATUS_SCHEDULED)
          );

          return $criteria;
      }

      /**
       * Instantiates the message class stored on the task entity, assigns the task id
       * and dispatches the message on the bus.
       *
       * @throws \RuntimeException When the stored class name is not ScheduledTask or a subclass of it.
       */
      private function queueTask(ScheduledTaskEntity $taskEntity): void
      {
          $taskClass = $taskEntity->getScheduledTaskClass();

          // Third argument "true" lets is_a() accept a class name string instead of an object.
          if (!\is_a($taskClass, ScheduledTask::class, true)) {
              throw new \RuntimeException(sprintf(
                  'Tried to schedule "%s", but class does not extend ScheduledTask',
                  $taskClass
              ));
          }

          $task = new $taskClass();
          $task->setTaskId($taskEntity->getId());

          $this->bus->dispatch($task);
      }

      /**
       * Builds the criteria for the minimum nextExecutionTime over tasks with status
       * STATUS_SCHEDULED; the aggregation is registered under the name "nextExecutionTime".
       */
      private function buildCriteriaForNextScheduledTask(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new EqualsFilter('status', ScheduledTaskDefinition::STATUS_SCHEDULED)
          )
          ->addAggregation(new MinAggregation('nextExecutionTime', 'nextExecutionTime'));

          return $criteria;
      }

      /**
       * Builds the criteria for the minimum runInterval over tasks whose status is not
       * STATUS_INACTIVE; the aggregation is registered under the name "runInterval".
       */
      private function buildCriteriaForMinRunInterval(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new NotFilter(NotFilter::CONNECTION_AND, [
                  new EqualsFilter('status', ScheduledTaskDefinition::STATUS_INACTIVE),
              ])
          )
          ->addAggregation(new MinAggregation('runInterval', 'runInterval'));

          return $criteria;
      }
  }