Fixing crash on unexpected message type or deserialization error. Adding sentry logging to WS processing.
This commit is contained in:
parent
ac0d905ce4
commit
4d7549bddd
|
@ -6,6 +6,7 @@ use JMS\Serializer\Serializer;
|
||||||
use Leezy\PheanstalkBundle\Proxy\PheanstalkProxy;
|
use Leezy\PheanstalkBundle\Proxy\PheanstalkProxy;
|
||||||
use Pheanstalk\Job;
|
use Pheanstalk\Job;
|
||||||
use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message;
|
use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message;
|
||||||
|
use Skobkin\Bundle\PointToolsBundle\Exception\WebSocket\UnsupportedTypeException;
|
||||||
use Skobkin\Bundle\PointToolsBundle\Service\WebSocket\WebSocketMessageProcessor;
|
use Skobkin\Bundle\PointToolsBundle\Service\WebSocket\WebSocketMessageProcessor;
|
||||||
use Symfony\Component\Console\Command\Command;
|
use Symfony\Component\Console\Command\Command;
|
||||||
use Symfony\Component\Console\Input\{InputInterface, InputOption};
|
use Symfony\Component\Console\Input\{InputInterface, InputOption};
|
||||||
|
@ -28,12 +29,17 @@ class ProcessWebsocketUpdatesCommand extends Command
|
||||||
/** @var WebSocketMessageProcessor */
|
/** @var WebSocketMessageProcessor */
|
||||||
private $messageProcessor;
|
private $messageProcessor;
|
||||||
|
|
||||||
|
/** @var \Raven_Client */
|
||||||
|
private $sentryClient;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
|
\Raven_Client $raven,
|
||||||
PheanstalkProxy $bsClient,
|
PheanstalkProxy $bsClient,
|
||||||
string $bsTubeName,
|
string $bsTubeName,
|
||||||
Serializer $serializer,
|
Serializer $serializer,
|
||||||
WebSocketMessageProcessor $processor
|
WebSocketMessageProcessor $processor
|
||||||
) {
|
) {
|
||||||
|
$this->sentryClient = $raven;
|
||||||
$this->serializer = $serializer;
|
$this->serializer = $serializer;
|
||||||
$this->messageProcessor = $processor;
|
$this->messageProcessor = $processor;
|
||||||
$this->bsClient = $bsClient;
|
$this->bsClient = $bsClient;
|
||||||
|
@ -61,6 +67,7 @@ class ProcessWebsocketUpdatesCommand extends Command
|
||||||
/** @var Job $job */
|
/** @var Job $job */
|
||||||
while ($job = $this->bsClient->reserveFromTube($this->bsTubeName, 0)) {
|
while ($job = $this->bsClient->reserveFromTube($this->bsTubeName, 0)) {
|
||||||
try {
|
try {
|
||||||
|
/** @var Message $message */
|
||||||
$message = $this->serializer->deserialize($job->getData(), Message::class, 'json');
|
$message = $this->serializer->deserialize($job->getData(), Message::class, 'json');
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$output->writeln(sprintf(
|
$output->writeln(sprintf(
|
||||||
|
@ -68,20 +75,33 @@ class ProcessWebsocketUpdatesCommand extends Command
|
||||||
$job->getId(),
|
$job->getId(),
|
||||||
$job->getData()
|
$job->getData()
|
||||||
));
|
));
|
||||||
|
$this->sentryClient->captureException($e);
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
$output->writeln('Processing job #'.$job->getId());
|
$output->writeln('Processing job #'.$job->getId().' ('.$message->getA().')');
|
||||||
|
|
||||||
if ($this->messageProcessor->processMessage($message)) {
|
try {
|
||||||
if ($keepJobs) {
|
if ($this->messageProcessor->processMessage($message)) {
|
||||||
$this->bsClient->release($job);
|
if ($keepJobs) {
|
||||||
|
|
||||||
|
} else {
|
||||||
|
$this->bsClient->delete($job);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
$this->bsClient->delete($job);
|
|
||||||
}
|
}
|
||||||
} else {
|
} catch (UnsupportedTypeException $e) {
|
||||||
$this->bsClient->release($job);
|
$output->writeln('Unsupported message type: '.$message->getA());
|
||||||
|
$this->sentryClient->captureException($e);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
$output->writeln('Message processing error: '.$e->getMessage());
|
||||||
|
$this->sentryClient->captureException($e);
|
||||||
|
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,16 +47,16 @@ class WebSocketMessageProcessor
|
||||||
|
|
||||||
private function processPost(Message $postData): bool
|
private function processPost(Message $postData): bool
|
||||||
{
|
{
|
||||||
// @todo implement
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function processComment(Message $commentData): bool
|
private function processComment(Message $commentData): bool
|
||||||
{
|
{
|
||||||
// @todo implement
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function processRecommendation(Message $recommendData): bool
|
private function processRecommendation(Message $recommendData): bool
|
||||||
{
|
{
|
||||||
// @todo implement
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in a new issue