vendor/shopware/core/Framework/MessageQueue/Monitoring/MonitoringBusDecorator.php line 37

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Monitoring;
  3. use Shopware\Core\Framework\Increment\Exception\IncrementGatewayNotFoundException;
  4. use Shopware\Core\Framework\Increment\IncrementGatewayRegistry;
  5. use Symfony\Component\Messenger\Envelope;
  6. use Symfony\Component\Messenger\MessageBusInterface;
  7. use Symfony\Component\Messenger\Stamp\ReceivedStamp;
  8. use Symfony\Component\Messenger\Stamp\SentStamp;
  /**
   * Message bus decorator that keeps per-message-class counters in the
   * "message_queue_stats" cluster of the message queue increment gateway.
   *
   * Every message is forwarded to the decorated bus. Afterwards the counter for
   * the message class is incremented when the resulting envelope carries a
   * SentStamp for the default transport, and decremented when it carries a
   * ReceivedStamp for the default transport.
   */
  class MonitoringBusDecorator implements MessageBusInterface
  {
      private MessageBusInterface $innerBus;

      private string $defaultTransportName;

      private IncrementGatewayRegistry $gatewayRegistry;

      /**
       * @param MessageBusInterface      $inner                the decorated bus that actually dispatches messages
       * @param string                   $defaultTransportName transport name/alias the stamps are compared against
       * @param IncrementGatewayRegistry $gatewayRegistry      registry used to look up the message queue increment gateway
       */
      public function __construct(
          MessageBusInterface $inner,
          string $defaultTransportName,
          IncrementGatewayRegistry $gatewayRegistry
      ) {
          $this->innerBus = $inner;
          $this->defaultTransportName = $defaultTransportName;
          $this->gatewayRegistry = $gatewayRegistry;
      }

      /**
       * Dispatches the given message to the inner bus and updates the queue counters.
       *
       * @param object|Envelope $message
       *
       * @return Envelope the envelope returned by the inner bus
       */
      public function dispatch($message, array $stamps = []): Envelope
      {
          $message = $this->innerBus->dispatch(Envelope::wrap($message, $stamps), $stamps);

          // The stamps are inspected on the envelope returned by the inner bus,
          // not on the input.
          if ($this->wasSentToDefaultTransport($message)) {
              $this->incrementMessageQueueSize($message);
          }

          if ($this->wasReceivedByDefaultTransport($message)) {
              $this->decrementMessageQueueSize($message);
          }

          return $message;
      }

      /**
       * Increments the "message_queue_stats" counter keyed by the class of the wrapped message.
       *
       * Does nothing when the message queue gateway cannot be resolved.
       */
      private function incrementMessageQueueSize(Envelope $message): void
      {
          try {
              $gateway = $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL);
          } catch (IncrementGatewayNotFoundException $exception) {
              // In case message_queue pool is disabled
              return;
          }

          $gateway->increment('message_queue_stats', \get_class($message->getMessage()));
      }

      /**
       * Decrements the "message_queue_stats" counter keyed by the class of the wrapped message.
       *
       * Does nothing when the message queue gateway cannot be resolved.
       */
      private function decrementMessageQueueSize(Envelope $message): void
      {
          try {
              $gateway = $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL);
          } catch (IncrementGatewayNotFoundException $exception) {
              // In case message_queue pool is disabled
              return;
          }

          $gateway->decrement('message_queue_stats', \get_class($message->getMessage()));
      }

      /**
       * Tells whether the envelope carries a SentStamp whose sender alias equals
       * the configured default transport name.
       */
      private function wasSentToDefaultTransport(Envelope $message): bool
      {
          foreach ($message->all(SentStamp::class) as $stamp) {
              if ($stamp instanceof SentStamp && $stamp->getSenderAlias() === $this->defaultTransportName) {
                  return true;
              }
          }

          return false;
      }

      /**
       * Tells whether the envelope carries a ReceivedStamp whose transport name
       * equals the configured default transport name.
       */
      private function wasReceivedByDefaultTransport(Envelope $message): bool
      {
          foreach ($message->all(ReceivedStamp::class) as $stamp) {
              if ($stamp instanceof ReceivedStamp && $stamp->getTransportName() === $this->defaultTransportName) {
                  return true;
              }
          }

          return false;
      }
  }