From 4d7549bddd1923f52ce2b379206714033f395567 Mon Sep 17 00:00:00 2001 From: Alexey Skobkin Date: Mon, 25 Feb 2019 20:01:47 +0300 Subject: [PATCH] Fixing crash on unexpected message type or deserialization error. Adding sentry logging to WS processing. --- .../ProcessWebsocketUpdatesCommand.php | 34 +++++++++++++++---- .../WebSocket/WebSocketMessageProcessor.php | 6 ++-- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php b/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php index 1b45cc8..94c6433 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php @@ -6,6 +6,7 @@ use JMS\Serializer\Serializer; use Leezy\PheanstalkBundle\Proxy\PheanstalkProxy; use Pheanstalk\Job; use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message; +use Skobkin\Bundle\PointToolsBundle\Exception\WebSocket\UnsupportedTypeException; use Skobkin\Bundle\PointToolsBundle\Service\WebSocket\WebSocketMessageProcessor; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\{InputInterface, InputOption}; @@ -28,12 +29,17 @@ class ProcessWebsocketUpdatesCommand extends Command /** @var WebSocketMessageProcessor */ private $messageProcessor; + /** @var \Raven_Client */ + private $sentryClient; + public function __construct( + \Raven_Client $raven, PheanstalkProxy $bsClient, string $bsTubeName, Serializer $serializer, WebSocketMessageProcessor $processor ) { + $this->sentryClient = $raven; $this->serializer = $serializer; $this->messageProcessor = $processor; $this->bsClient = $bsClient; @@ -61,6 +67,7 @@ class ProcessWebsocketUpdatesCommand extends Command /** @var Job $job */ while ($job = $this->bsClient->reserveFromTube($this->bsTubeName, 0)) { try { + /** @var Message $message */ $message = $this->serializer->deserialize($job->getData(), Message::class, 'json'); } catch (\Exception $e) { $output->writeln(sprintf( @@ -68,20 +75,33 @@ class ProcessWebsocketUpdatesCommand extends Command $job->getId(), $job->getData() )); + $this->sentryClient->captureException($e); continue; } - $output->writeln('Processing job #'.$job->getId()); + $output->writeln('Processing job #'.$job->getId().' ('.$message->getA().')'); - if ($this->messageProcessor->processMessage($message)) { - if ($keepJobs) { - $this->bsClient->release($job); + try { + if ($this->messageProcessor->processMessage($message)) { + if ($keepJobs) { + + } else { + $this->bsClient->delete($job); + } } else { - $this->bsClient->delete($job); + } - } else { - $this->bsClient->release($job); + } catch (UnsupportedTypeException $e) { + $output->writeln('Unsupported message type: '.$message->getA()); + $this->sentryClient->captureException($e); + + continue; + } catch (\Exception $e) { + $output->writeln('Message processing error: '.$e->getMessage()); + $this->sentryClient->captureException($e); + + continue; } } } diff --git a/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php b/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php index 6a88a92..faea402 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php @@ -47,16 +47,16 @@ class WebSocketMessageProcessor private function processPost(Message $postData): bool { - // @todo implement + return false; } private function processComment(Message $commentData): bool { - // @todo implement + return false; } private function processRecommendation(Message $recommendData): bool { - // @todo implement + return false; } } \ No newline at end of file