From 26ee4522fcf9ae417626e0dc57be342b06b66794 Mon Sep 17 00:00:00 2001 From: Alexey Skobkin Date: Sat, 19 Jan 2019 16:27:16 +0300 Subject: [PATCH] Unfinished work on WS comment retrieval, --- .../Version20180427143940.php | 4 + app/config/services.yml | 1 + .../ProcessWebsocketUpdatesCommand.php | 25 ++++- .../DTO/Api/WebSocket/Message.php | 97 ++++++++++++++++--- .../Bundle/PointToolsBundle/Entity/User.php | 1 + .../Service/Factory/Blogs/CommentFactory.php | 18 ++++ .../Service/Factory/UserFactory.php | 21 +++- .../WebSocket/WebSocketMessageProcessor.php | 2 +- 8 files changed, 148 insertions(+), 21 deletions(-) diff --git a/app/DoctrineMigrations/Version20180427143940.php b/app/DoctrineMigrations/Version20180427143940.php index 795184e..35d4306 100644 --- a/app/DoctrineMigrations/Version20180427143940.php +++ b/app/DoctrineMigrations/Version20180427143940.php @@ -9,6 +9,7 @@ use Doctrine\DBAL\Schema\Schema; * Issue #44 - Post and Comment schema refactoring. * - Post subscription status added. * - Comments parent-child relations removed. + * - User login index * - Other adjustments */ class Version20180427143940 extends AbstractMigration @@ -28,6 +29,8 @@ class Version20180427143940 extends AbstractMigration $this->addSql('ALTER TABLE posts.comments ALTER number TYPE INT'); $this->addSql('ALTER TABLE posts.comments ALTER number DROP DEFAULT'); $this->addSql('CREATE UNIQUE INDEX UNIQ_6289997596901F54 ON posts.comments (number)'); + + $this->addSql('CREATE INDEX idx_user_login ON users.users (login)'); } public function down(Schema $schema) @@ -43,5 +46,6 @@ class Version20180427143940 extends AbstractMigration $this->addSql('ALTER TABLE posts.comments ALTER number DROP DEFAULT'); $this->addSql('ALTER TABLE posts.comments ADD CONSTRAINT fk_62899975727aca70 FOREIGN KEY (parent_id) REFERENCES posts.comments (id) NOT DEFERRABLE INITIALLY IMMEDIATE'); $this->addSql('CREATE INDEX idx_62899975727aca70 ON posts.comments (parent_id)'); + $this->addSql('DROP INDEX users.idx_user_login'); } } diff --git a/app/config/services.yml b/app/config/services.yml index 0ccf746..57d163d 100644 --- a/app/config/services.yml +++ b/app/config/services.yml @@ -104,6 +104,7 @@ services: # WebSocket MQ processing Skobkin\Bundle\PointToolsBundle\Command\ProcessWebsocketUpdatesCommand: arguments: + $bsClient: '@leezy.pheanstalk.primary' $bsTubeName: '%beanstalkd_ws_updates_tube%' tags: - { name: console.command } diff --git a/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php b/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php index 3f76d60..85c5703 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Command/ProcessWebsocketUpdatesCommand.php @@ -3,6 +3,8 @@ namespace Skobkin\Bundle\PointToolsBundle\Command; use JMS\Serializer\Serializer; +use Leezy\PheanstalkBundle\Proxy\PheanstalkProxy; +use Pheanstalk\Job; use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message; use Skobkin\Bundle\PointToolsBundle\Service\WebSocket\WebSocketMessageProcessor; use Symfony\Component\Console\Command\Command; @@ -14,6 +16,9 @@ use Symfony\Component\Console\Output\OutputInterface; */ class ProcessWebsocketUpdatesCommand extends Command { + /** @var PheanstalkProxy */ + private $bsClient; + /** @var string */ private $bsTubeName; @@ -24,12 +29,14 @@ class ProcessWebsocketUpdatesCommand extends Command private $messageProcessor; public function __construct( + PheanstalkProxy $bsClient, + string $bsTubeName, Serializer $serializer, - WebSocketMessageProcessor $processor, - string $bsTubeName + WebSocketMessageProcessor $processor ) { $this->serializer = $serializer; $this->messageProcessor = $processor; + $this->bsClient = $bsClient; $this->bsTubeName = $bsTubeName; parent::__construct(); @@ -48,9 +55,19 @@ class ProcessWebsocketUpdatesCommand extends Command /** {@inheritdoc} */ protected function execute(InputInterface $input, OutputInterface $output) { + /** @var Job $job */ + while ($job = $this->bsClient->reserveFromTube($this->bsTubeName, 0)) { + try { + $message = $this->serializer->deserialize($job->getData(), Message::class, 'json'); + } catch (\Exception $e) { + $output->writeln(sprintf( + 'Error while deserializing #%d data: \'%s\'', + $job->getId(), + $job->getData() + )); - foreach ($updates as $update) { - $message = $this->serializer->deserialize($update, Message::class, 'json'); + continue; + } if ($this->messageProcessor->processMessage($message)) { // BS delete item diff --git a/src/Skobkin/Bundle/PointToolsBundle/DTO/Api/WebSocket/Message.php b/src/Skobkin/Bundle/PointToolsBundle/DTO/Api/WebSocket/Message.php index cb45c9f..1eae7d4 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/DTO/Api/WebSocket/Message.php +++ b/src/Skobkin/Bundle/PointToolsBundle/DTO/Api/WebSocket/Message.php @@ -12,30 +12,55 @@ class Message implements ValidableInterface { public const TYPE_COMMENT = 'comment'; public const TYPE_POST = 'post'; - public const TYPE_RECOMMENDATION = 'ok'; + public const TYPE_POST_RECOMMENDATION = 'rec'; + public const TYPE_COMMENT_RECOMMENDATION = 'ok'; - /** @var string */ + /** + * Event type. @see Message::TYPE_* constants + * + * @var string + */ private $a; - /** @var string */ + /** + * Login of the user + * + * @var string + */ private $author; /** @var string|null */ private $authorName; - /** @var int|null */ + /** + * Number of the comment in the thread + * + * @var int|null + */ private $commentId; - /** @var bool|null */ + /** + * ??? + * + * @var bool|null + */ private $cut; - /** @var string[]|null */ + /** + * Array of file paths + * + * @var string[]|null + */ private $files; /** @var string|null */ private $html; - /** @var string|null */ + /** + * @deprecated Link in the Post::type=feed posts + * + * @var string|null + */ private $link; /** @var string */ @@ -44,22 +69,49 @@ class Message implements ValidableInterface /** @var bool|null */ private $private; - /** @var string[]|null */ + /** + * Number of the comment in the thread for recommendation with text + * + * @var int|null + */ + private $rcid; + + /** + * Array of tags + * + * @var string[]|null + */ private $tags; /** @var string */ private $text; - /** @var string|null */ + /** + * @deprecated ??? + * + * @var string|null + */ private $title; - /** @var string|null */ + /** + * Number of the comment to which this comment is answering + * + * @var string|null + */ private $toCommentId; - /** @var string|null */ + /** + * Text quotation of the comment to which this comment is answering + * + * @var string|null + */ private $toText; - /** @var string[]|null */ + /** + * Array of logins of users to which post is addressed + * + * @var string[]|null + */ private $toUsers; public function isPost(): bool @@ -72,9 +124,14 @@ class Message implements ValidableInterface return self::TYPE_COMMENT === $this->a; } - public function isRecommendation(): bool + public function isCommentRecommendation(): bool { - return self::TYPE_RECOMMENDATION === $this->a; + return self::TYPE_COMMENT_RECOMMENDATION === $this->a; + } + + public function isPostRecommendation(): bool + { + return self::TYPE_POST_RECOMMENDATION === $this->a; } /** @@ -108,7 +165,7 @@ class Message implements ValidableInterface } break; - case self::TYPE_RECOMMENDATION; + case self::TYPE_COMMENT_RECOMMENDATION; if ( null !== $this->author && null !== $this->postId @@ -228,6 +285,16 @@ class Message implements ValidableInterface $this->private = $private; } + public function getRcid(): ?int + { + return $this->rcid; + } + + public function setRcid(?int $rcid): void + { + $this->rcid = $rcid; + } + public function getTags(): ?array { return $this->tags; diff --git a/src/Skobkin/Bundle/PointToolsBundle/Entity/User.php b/src/Skobkin/Bundle/PointToolsBundle/Entity/User.php index 1b7dc0a..5b67a9e 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Entity/User.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Entity/User.php @@ -7,6 +7,7 @@ use Doctrine\ORM\Mapping as ORM; /** * @ORM\Table(name="users", schema="users", indexes={ + * @ORM\Index(name="idx_user_login", columns={"login"}), * @ORM\Index(name="idx_user_public", columns={"public"}), * @ORM\Index(name="idx_user_removed", columns={"is_removed"}) * }) diff --git a/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/Blogs/CommentFactory.php b/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/Blogs/CommentFactory.php index 8223692..33b294d 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/Blogs/CommentFactory.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/Blogs/CommentFactory.php @@ -3,6 +3,8 @@ namespace Skobkin\Bundle\PointToolsBundle\Service\Factory\Blogs; use Psr\Log\LoggerInterface; +use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message; +use Skobkin\Bundle\PointToolsBundle\Entity\Blogs\Comment; use Skobkin\Bundle\PointToolsBundle\Repository\Blogs\{CommentRepository, PostRepository}; use Skobkin\Bundle\PointToolsBundle\Service\Factory\{AbstractFactory, UserFactory}; @@ -25,4 +27,20 @@ class CommentFactory extends AbstractFactory $this->commentRepository = $commentRepository; $this->postRepository = $postRepository; } + + public function findOrCreateFromWebsocketMessage(Message $message): Comment + { + if ($message->isValid()) { + throw new \InvalidArgumentException('Comment is invalid'); + } + if ($message->isComment()) { + throw new \InvalidArgumentException(sprintf( + 'Invalid Message object provided. %s expected, %s given', + Message::TYPE_COMMENT, + $message->getA() + )); + } + + + } } \ No newline at end of file diff --git a/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/UserFactory.php b/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/UserFactory.php index 63f7762..016b86a 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/UserFactory.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Service/Factory/UserFactory.php @@ -7,6 +7,7 @@ use Skobkin\Bundle\PointToolsBundle\DTO\Api\User as UserDTO; use Skobkin\Bundle\PointToolsBundle\Entity\User; use Skobkin\Bundle\PointToolsBundle\Repository\UserRepository; use Skobkin\Bundle\PointToolsBundle\Exception\Factory\InvalidUserDataException; +use Skobkin\Bundle\PointToolsBundle\Service\Api\UserApi; class UserFactory extends AbstractFactory { @@ -15,11 +16,29 @@ class UserFactory extends AbstractFactory /** @var UserRepository */ private $userRepository; + /** @var UserApi */ + private $userApi; - public function __construct(LoggerInterface $logger, UserRepository $userRepository) + public function __construct(LoggerInterface $logger, UserRepository $userRepository, UserApi $userApi) { parent::__construct($logger); $this->userRepository = $userRepository; + $this->userApi = $userApi; + } + + public function findOrCreateByLogin(string $login, bool $retrieveMissingFromApi = true): User + { + /** @var User $user */ + if (null === $user = $this->userRepository->findBy(['login' => $login])) { + if ($retrieveMissingFromApi) { + $user = $this->userApi->getUserByLogin($login); + } else { + // TODO neen more specific exception + throw new \RuntimeException(sprintf('User \'%s\' not found in the database. Api retrieval disabled.', $login)); + } + } + + return $user; } /** diff --git a/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php b/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php index c497f03..ccbd56e 100644 --- a/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php +++ b/src/Skobkin/Bundle/PointToolsBundle/Service/WebSocket/WebSocketMessageProcessor.php @@ -38,7 +38,7 @@ class WebSocketMessageProcessor return $this->processComment($message); break; - case $message->isRecommendation(): + case $message->isCommentRecommendation(): return $this->processRecommendation($message); break; }