vendor/shopware/core/Framework/MessageQueue/Api/ConsumeMessagesController.php line 125

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Api;
  3. use OpenApi\Annotations as OA;
  4. use Shopware\Core\Framework\MessageQueue\Subscriber\CountHandledMessagesListener;
  5. use Shopware\Core\Framework\MessageQueue\Subscriber\EarlyReturnMessagesListener;
  6. use Shopware\Core\Framework\Routing\Annotation\RouteScope;
  7. use Shopware\Core\Framework\Routing\Annotation\Since;
  8. use Shopware\Core\Framework\Util\MemorySizeCalculator;
  9. use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
  10. use Symfony\Component\DependencyInjection\ServiceLocator;
  11. use Symfony\Component\EventDispatcher\EventDispatcher;
  12. use Symfony\Component\HttpFoundation\JsonResponse;
  13. use Symfony\Component\HttpFoundation\Request;
  14. use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
  15. use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
  16. use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
  17. use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
  18. use Symfony\Component\Messenger\MessageBusInterface;
  19. use Symfony\Component\Messenger\Worker;
  20. use Symfony\Component\Routing\Annotation\Route;
  21. /**
  22.  * @Route(defaults={"_routeScope"={"api"}})
  23.  */
  24. class ConsumeMessagesController extends AbstractController
  25. {
  26.     /**
  27.      * @var ServiceLocator
  28.      */
  29.     private $receiverLocator;
  30.     /**
  31.      * @var MessageBusInterface
  32.      */
  33.     private $bus;
  34.     /**
  35.      * @var int
  36.      */
  37.     private $pollInterval;
  38.     /**
  39.      * @var StopWorkerOnRestartSignalListener
  40.      */
  41.     private $stopWorkerOnRestartSignalListener;
  42.     /**
  43.      * @var StopWorkerOnSigtermSignalListener
  44.      */
  45.     private $stopWorkerOnSigtermSignalListener;
  46.     /**
  47.      * @var DispatchPcntlSignalListener
  48.      */
  49.     private $dispatchPcntlSignalListener;
  50.     /**
  51.      * @var EarlyReturnMessagesListener
  52.      */
  53.     private $earlyReturnListener;
  54.     private string $defaultTransportName;
  55.     private string $memoryLimit;
  56.     public function __construct(
  57.         ServiceLocator $receiverLocator,
  58.         MessageBusInterface $bus,
  59.         int $pollInterval,
  60.         StopWorkerOnRestartSignalListener $stopWorkerOnRestartSignalListener,
  61.         StopWorkerOnSigtermSignalListener $stopWorkerOnSigtermSignalListener,
  62.         DispatchPcntlSignalListener $dispatchPcntlSignalListener,
  63.         EarlyReturnMessagesListener $earlyReturnListener,
  64.         string $defaultTransportName,
  65.         string $memoryLimit
  66.     ) {
  67.         $this->receiverLocator $receiverLocator;
  68.         $this->bus $bus;
  69.         $this->pollInterval $pollInterval;
  70.         $this->stopWorkerOnRestartSignalListener $stopWorkerOnRestartSignalListener;
  71.         $this->stopWorkerOnSigtermSignalListener $stopWorkerOnSigtermSignalListener;
  72.         $this->dispatchPcntlSignalListener $dispatchPcntlSignalListener;
  73.         $this->earlyReturnListener $earlyReturnListener;
  74.         $this->defaultTransportName $defaultTransportName;
  75.         $this->memoryLimit $memoryLimit;
  76.     }
  77.     /**
  78.      * @Since("6.0.0.0")
  79.      * @OA\Post(
  80.      *     path="/_action/message-queue/consume",
  81.      *     summary="Consume messages from the message queue.",
  82.      *     description="This route can be used to consume messenges from the message queue. It is intended to be used if
  83. no cronjob is configured to consume messages regulary.",
  84.      *     operationId="consumeMessages",
  85.      *     tags={"Admin API", "System Operations"},
  86.      *     @OA\RequestBody(
  87.      *         required=true,
  88.      *         @OA\JsonContent(
  89.      *             required={"receiver"},
  90.      *             @OA\Property(
  91.      *                 property="receiver",
  92.      *                 description="The name of the transport in the messenger that should be processed.
  93. See the [Symfony Messenger documentation](https://symfony.com/doc/current/messenger.html) for more information",
  94.      *                 type="string"
  95.      *             )
  96.      *         )
  97.      *     ),
  98.      *     @OA\Response(
  99.      *         response="200",
  100.      *         description="Returns information about handled messages",
  101.      *         @OA\JsonContent(
  102.      *               @OA\Property(
  103.      *                  property="handledMessages",
  104.      *                  description="The number of messages processed.",
  105.      *                  type="integer"
  106.      *              )
  107.      *         )
  108.      *     )
  109.      * )
  110.      * @Route("/api/_action/message-queue/consume", name="api.action.message-queue.consume", methods={"POST"})
  111.      */
  112.     public function consumeMessages(Request $request): JsonResponse
  113.     {
  114.         $receiverName $request->get('receiver');
  115.         if (!$receiverName || !$this->receiverLocator->has($receiverName)) {
  116.             throw new \RuntimeException('No receiver name provided.');
  117.         }
  118.         $receiver $this->receiverLocator->get($receiverName);
  119.         $workerDispatcher = new EventDispatcher();
  120.         $listener = new CountHandledMessagesListener($this->pollInterval);
  121.         $workerDispatcher->addSubscriber($listener);
  122.         $workerDispatcher->addSubscriber($this->stopWorkerOnRestartSignalListener);
  123.         $workerDispatcher->addSubscriber($this->stopWorkerOnSigtermSignalListener);
  124.         $workerDispatcher->addSubscriber($this->dispatchPcntlSignalListener);
  125.         $workerDispatcher->addSubscriber($this->earlyReturnListener);
  126.         if ($this->memoryLimit !== '-1') {
  127.             $workerDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener(
  128.                 MemorySizeCalculator::convertToBytes($this->memoryLimit)
  129.             ));
  130.         }
  131.         $worker = new Worker([$this->defaultTransportName => $receiver], $this->bus$workerDispatcher);
  132.         $worker->run(['sleep' => 50]);
  133.         return $this->json(['handledMessages' => $listener->getHandledMessages()]);
  134.     }
  135. }