vendor/shopware/core/Framework/MessageQueue/Middleware/RetryMiddleware.php line 41

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Middleware;
  3. use Shopware\Core\Framework\Context;
  4. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  6. use Shopware\Core\Framework\MessageQueue\DeadMessage\DeadMessageEntity;
  7. use Shopware\Core\Framework\MessageQueue\Exception\MessageFailedException;
  8. use Shopware\Core\Framework\MessageQueue\Message\RetryMessage;
  9. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTask;
  10. use Shopware\Core\Framework\MessageQueue\Stamp\DecryptedStamp;
  11. use Shopware\Core\Framework\Uuid\Uuid;
  12. use Shopware\Core\Framework\Webhook\Event\RetryWebhookMessageFailedEvent;
  13. use Shopware\Core\Framework\Webhook\Message\WebhookEventMessage;
  14. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  15. use Symfony\Component\Messenger\Envelope;
  16. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  17. use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
  18. use Symfony\Component\Messenger\Middleware\StackInterface;
  19. class RetryMiddleware implements MiddlewareInterface
  20. {
  21.     private EntityRepositoryInterface $deadMessageRepository;
  22.     private Context $context;
  23.     private EventDispatcherInterface $eventDispatcher;
  24.     public function __construct(EntityRepositoryInterface $deadMessageRepositoryEventDispatcherInterface $eventDispatcher)
  25.     {
  26.         $this->deadMessageRepository $deadMessageRepository;
  27.         $this->context Context::createDefaultContext();
  28.         $this->eventDispatcher $eventDispatcher;
  29.     }
  30.     public function handle(Envelope $envelopeStackInterface $stack): Envelope
  31.     {
  32.         try {
  33.             return $stack->next()->handle($envelope$stack);
  34.         } catch (HandlerFailedException $e) {
  35.             $deadMessage $this->getExistingDeadMessage($envelope);
  36.             $unhandledExceptions = [];
  37.             foreach ($e->getNestedExceptions() as $nestedException) {
  38.                 if (!($nestedException instanceof MessageFailedException)) {
  39.                     $unhandledExceptions[] = $nestedException;
  40.                     continue;
  41.                 }
  42.                 if ($deadMessage) {
  43.                     $this->handleExistingDeadMessage($deadMessage$nestedException);
  44.                     $this->handleRetryWebhookMessageFailed($deadMessage);
  45.                 } else {
  46.                     $this->createDeadMessageFromEnvelope($envelope$nestedException);
  47.                 }
  48.             }
  49.             if (\count($unhandledExceptions) > 0) {
  50.                 throw new HandlerFailedException($envelope$unhandledExceptions);
  51.             }
  52.         }
  53.         return $envelope;
  54.     }
  55.     private function createDeadMessageFromEnvelope(Envelope $envelopeMessageFailedException $e): void
  56.     {
  57.         $this->context->scope(Context::SYSTEM_SCOPE, function () use ($envelope$e): void {
  58.             $encrypted \count($envelope->all(DecryptedStamp::class)) > 0;
  59.             $scheduledTaskId null;
  60.             if ($envelope->getMessage() instanceof ScheduledTask) {
  61.                 $scheduledTaskId $envelope->getMessage()->getTaskId();
  62.             }
  63.             $id Uuid::randomHex();
  64.             $params = [
  65.                 'id' => $id,
  66.                 'originalMessageClass' => \get_class($envelope->getMessage()),
  67.                 'serializedOriginalMessage' => serialize($envelope->getMessage()),
  68.                 'handlerClass' => $e->getHandlerClass(),
  69.                 'encrypted' => $encrypted,
  70.                 'nextExecutionTime' => DeadMessageEntity::calculateNextExecutionTime(1),
  71.                 'exception' => \get_class($e->getException()),
  72.                 'exceptionMessage' => $e->getException()->getMessage(),
  73.                 'exceptionFile' => $e->getException()->getFile(),
  74.                 'exceptionLine' => $e->getException()->getLine(),
  75.                 'scheduledTaskId' => $scheduledTaskId,
  76.             ];
  77.             try {
  78.                 $this->deadMessageRepository->create([$params], $this->context);
  79.             } catch (\Throwable $e) {
  80.                 $params['exceptionMessage'] = ' ';
  81.                 $this->deadMessageRepository->create([$params], $this->context);
  82.             }
  83.         });
  84.     }
  85.     private function handleExistingDeadMessage(DeadMessageEntity $deadMessageMessageFailedException $e): void
  86.     {
  87.         if ($this->isExceptionEqual($deadMessage$e->getException())) {
  88.             $this->incrementErrorCount($deadMessage);
  89.             return;
  90.         }
  91.         $this->deadMessageRepository->delete([
  92.             [
  93.                 'id' => $deadMessage->getId(),
  94.             ],
  95.         ], $this->context);
  96.         $this->createDeadMessageFromExistingMessage($deadMessage$e);
  97.     }
  98.     private function isExceptionEqual(DeadMessageEntity $deadMessage\Throwable $e): bool
  99.     {
  100.         return $deadMessage->getException() === \get_class($e)
  101.             && $deadMessage->getExceptionMessage() === $e->getMessage()
  102.             && $deadMessage->getExceptionFile() === $e->getFile()
  103.             && $deadMessage->getExceptionLine() === $e->getLine();
  104.     }
  105.     private function incrementErrorCount(DeadMessageEntity $deadMessage): void
  106.     {
  107.         $this->context->scope(Context::SYSTEM_SCOPE, function () use ($deadMessage): void {
  108.             $this->deadMessageRepository->update([
  109.                 [
  110.                     'id' => $deadMessage->getId(),
  111.                     'errorCount' => $deadMessage->getErrorCount() + 1,
  112.                     'nextExecutionTime' => DeadMessageEntity::calculateNextExecutionTime($deadMessage->getErrorCount() + 1),
  113.                 ],
  114.             ], $this->context);
  115.         });
  116.     }
  117.     private function createDeadMessageFromExistingMessage(DeadMessageEntity $messageMessageFailedException $e): void
  118.     {
  119.         $this->context->scope(Context::SYSTEM_SCOPE, function () use ($message$e): void {
  120.             $id Uuid::randomHex();
  121.             $this->deadMessageRepository->create([
  122.                 [
  123.                     'id' => $id,
  124.                     'originalMessageClass' => $message->getOriginalMessageClass(),
  125.                     'serializedOriginalMessage' => serialize($message->getOriginalMessage()),
  126.                     'handlerClass' => $e->getHandlerClass(),
  127.                     'encrypted' => $message->isEncrypted(),
  128.                     'nextExecutionTime' => DeadMessageEntity::calculateNextExecutionTime(1),
  129.                     'exception' => \get_class($e->getException()),
  130.                     'exceptionMessage' => $e->getException()->getMessage(),
  131.                     'exceptionFile' => $e->getException()->getFile(),
  132.                     'exceptionLine' => $e->getException()->getLine(),
  133.                 ],
  134.             ], $this->context);
  135.         });
  136.     }
  137.     private function getExistingDeadMessage(Envelope $envelope): ?DeadMessageEntity
  138.     {
  139.         if (!($envelope->getMessage() instanceof RetryMessage)) {
  140.             return null;
  141.         }
  142.         /** @var DeadMessageEntity|null $deadMessage */
  143.         $deadMessage $this->deadMessageRepository
  144.             ->search(new Criteria([$envelope->getMessage()->getDeadMessageId()]), $this->context)
  145.             ->get($envelope->getMessage()->getDeadMessageId());
  146.         return $deadMessage;
  147.     }
  148.     private function handleRetryWebhookMessageFailed(DeadMessageEntity $deadMessage): void
  149.     {
  150.         if (!($deadMessage->getOriginalMessage() instanceof WebhookEventMessage)) {
  151.             return;
  152.         }
  153.         $this->eventDispatcher->dispatch(
  154.             new RetryWebhookMessageFailedEvent($deadMessage$this->context)
  155.         );
  156.     }
  157. }