diff --git a/app/config/parameters.yml.dist b/app/config/parameters.yml.dist index d501131..accad42 100644 --- a/app/config/parameters.yml.dist +++ b/app/config/parameters.yml.dist @@ -6,6 +6,11 @@ parameters: database_user: point database_password: ~ + # Message Queue settings + beanstalkd_host: 'localhost' + beanstalkd_port: 11300 + beanstalkd_ws_updates_tube: 'point-websocket-updates' + mailer_transport: smtp mailer_host: 127.0.0.1 mailer_user: ~ diff --git a/app/config/services.yml b/app/config/services.yml index 21617a4..0ccf746 100644 --- a/app/config/services.yml +++ b/app/config/services.yml @@ -101,6 +101,12 @@ services: # Send message Skobkin\Bundle\PointToolsBundle\Command\TelegramSendMessageCommand: tags: [{ name: console.command }] + # WebSocket MQ processing + Skobkin\Bundle\PointToolsBundle\Command\ProcessWebsocketUpdatesCommand: + arguments: + $bsTubeName: '%beanstalkd_ws_updates_tube%' + tags: + - { name: console.command } # Entity repositories as services diff --git a/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php b/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php index b321802..3f76d60 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php @@ -2,12 +2,40 @@ namespace Skobkin\Bundle\PointToolsBundle\Command; +use JMS\Serializer\Serializer; +use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message; +use Skobkin\Bundle\PointToolsBundle\Service\WebSocket\WebSocketMessageProcessor; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; +/** + * This command processes WebSocket updates MQ and stores new content in the DB + */ class ProcessWebsocketUpdatesCommand extends Command { + /** @var string */ + private $bsTubeName; + + /** @var Serializer */ + private $serializer; + + /** @var WebSocketMessageProcessor */ + private $messageProcessor; + + public function __construct( + Serializer $serializer, + WebSocketMessageProcessor $processor, + string $bsTubeName + ) { + $this->serializer = $serializer; + $this->messageProcessor = $processor; + $this->bsTubeName = $bsTubeName; + + parent::__construct(); + } + + /** {@inheritdoc} */ protected function configure() { @@ -21,5 +49,14 @@ class ProcessWebsocketUpdatesCommand extends Command protected function execute(InputInterface $input, OutputInterface $output) { + foreach ($updates as $update) { + $message = $this->serializer->deserialize($update, Message::class, 'json'); + + if ($this->messageProcessor->processMessage($message)) { + // BS delete item + } else { + // BS return to queue + } + } } } diff --git a/src/Skobkin/Bundle/PointToolsBundle/DTO/Api/WebSocket/Message.php b/src/Skobkin/Bundle/PointToolsBundle/DTO/Api/WebSocket/Message.php new file mode 100644 index 0000000..cb45c9f --- /dev/null +++ b/src/Skobkin/Bundle/PointToolsBundle/DTO/Api/WebSocket/Message.php @@ -0,0 +1,290 @@ +a; + } + + public function isComment(): bool + { + return self::TYPE_COMMENT === $this->a; + } + + public function isRecommendation(): bool + { + return self::TYPE_RECOMMENDATION === $this->a; + } + + /** + * @throws \RuntimeException + * @throws UnsupportedTypeException + */ + public function isValid(): bool + { + switch ($this->a) { + case self::TYPE_POST: + if ( + null !== $this->author && + null !== $this->html && + null !== $this->postId && + null !== $this->private && + null !== $this->tags + ) { + return true; + } + break; + + case self::TYPE_COMMENT; + if ( + null !== $this->author && + null !== $this->commentId && + null !== $this->html && + null !== $this->postId && + null !== $this->text + ) { + return true; + } + break; + + case self::TYPE_RECOMMENDATION; + if ( + null !== $this->author && + null !== $this->postId + ) { + return true; + } + break; + + case null: + throw new \RuntimeException('Message has NULL type.'); + break; + + default: + throw new UnsupportedTypeException(sprintf('Type \'%s\' is not supported.', $this->a)); + } + + return false; + } + + public function getA(): string + { + return $this->a; + } + + public function setA(string $a): void + { + $this->a = $a; + } + + public function getAuthor(): string + { + return $this->author; + } + + public function setAuthor(string $author): void + { + $this->author = $author; + } + + public function getAuthorName(): ?string + { + return $this->authorName; + } + + public function setAuthorName(?string $authorName): void + { + $this->authorName = $authorName; + } + + public function getCommentId(): ?int + { + return $this->commentId; + } + + public function setCommentId(?int $commentId): void + { + $this->commentId = $commentId; + } + + public function getCut(): ?bool + { + return $this->cut; + } + + public function setCut(?bool $cut): void + { + $this->cut = $cut; + } + + public function getFiles(): ?array + { + return $this->files; + } + + public function setFiles(?array $files): void + { + $this->files = $files; + } + + public function getHtml(): ?string + { + return $this->html; + } + + public function setHtml(?string $html): void + { + $this->html = $html; + } + + public function getLink(): ?string + { + return $this->link; + } + + public function setLink(?string $link): void + { + $this->link = $link; + } + + public function getPostId(): string + { + return $this->postId; + } + + public function setPostId(string $postId): void + { + $this->postId = $postId; + } + + public function getPrivate(): ?bool + { + return $this->private; + } + + public function setPrivate(?bool $private): void + { + $this->private = $private; + } + + public function getTags(): ?array + { + return $this->tags; + } + + public function setTags(?array $tags): void + { + $this->tags = $tags; + } + + public function getText(): string + { + return $this->text; + } + + public function setText(string $text): void + { + $this->text = $text; + } + + public function getTitle(): ?string + { + return $this->title; + } + + public function setTitle(?string $title): void + { + $this->title = $title; + } + + public function getToCommentId(): ?string + { + return $this->toCommentId; + } + + public function setToCommentId(?string $toCommentId): void + { + $this->toCommentId = $toCommentId; + } + + public function getToText(): ?string + { + return $this->toText; + } + + public function setToText(?string $toText): void + { + $this->toText = $toText; + } + + public function getToUsers(): ?array + { + return $this->toUsers; + } + + public function setToUsers(?array $toUsers): void + { + $this->toUsers = $toUsers; + } +} \ No newline at end of file diff --git a/src/Skobkin/Bundle/PointToolsBundle/Exception/WebSocket/UnsupportedTypeException.php b/src/Skobkin/Bundle/PointToolsBundle/Exception/WebSocket/UnsupportedTypeException.php new file mode 100644 index 0000000..4e3ceff --- /dev/null +++ b/src/Skobkin/Bundle/PointToolsBundle/Exception/WebSocket/UnsupportedTypeException.php @@ -0,0 +1,9 @@ + + serialized_name: 'files' + html: + type: string + serialized_name: 'html' + link: + type: string + serialized_name: 'link' + postId: + type: string + serialized_name: 'post_id' + private: + type: bool + serialized_name: 'private' + tags: + type: array + serialized_name: 'tags' + text: + type: string + serialized_name: 'text' + title: + type: string + serialized_name: 'title' + toCommentId: + type: string + serialized_name: 'to_comment_id' + toText: + type: string + serialized_name: 'to_text' + toUsers: + type: array + serialized_name: 'to_users' diff --git a/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php b/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php new file mode 100644 index 0000000..c497f03 --- /dev/null +++ b/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php @@ -0,0 +1,63 @@ +postFactory = $postFactory; + $this->commentFactory = $commentFactory; + } + + /** + * Returns true on success (all data saved) + */ + public function processMessage(Message $message): bool + { + if (!$message->isValid()) { + return false; + } + + switch (true) { + case $message->isPost(): + return $this->processPost($message); + break; + + case $message->isComment(): + return $this->processComment($message); + break; + + case $message->isRecommendation(): + return $this->processRecommendation($message); + break; + } + + return false; + } + + private function processPost(Message $postData): bool + { + + } + + private function processComment(Message $commentData): bool + { + + } + + private function processRecommendation(Message $recommendData): bool + { + + } +} \ No newline at end of file