Unfinished work on WS comment retrieval,
This commit is contained in:
parent
63b27dc312
commit
26ee4522fc
|
@ -9,6 +9,7 @@ use Doctrine\DBAL\Schema\Schema;
|
||||||
* Issue #44 - Post and Comment schema refactoring.
|
* Issue #44 - Post and Comment schema refactoring.
|
||||||
* - Post subscription status added.
|
* - Post subscription status added.
|
||||||
* - Comments parent-child relations removed.
|
* - Comments parent-child relations removed.
|
||||||
|
* - User login index
|
||||||
* - Other adjustments
|
* - Other adjustments
|
||||||
*/
|
*/
|
||||||
class Version20180427143940 extends AbstractMigration
|
class Version20180427143940 extends AbstractMigration
|
||||||
|
@ -28,6 +29,8 @@ class Version20180427143940 extends AbstractMigration
|
||||||
$this->addSql('ALTER TABLE posts.comments ALTER number TYPE INT');
|
$this->addSql('ALTER TABLE posts.comments ALTER number TYPE INT');
|
||||||
$this->addSql('ALTER TABLE posts.comments ALTER number DROP DEFAULT');
|
$this->addSql('ALTER TABLE posts.comments ALTER number DROP DEFAULT');
|
||||||
$this->addSql('CREATE UNIQUE INDEX UNIQ_6289997596901F54 ON posts.comments (number)');
|
$this->addSql('CREATE UNIQUE INDEX UNIQ_6289997596901F54 ON posts.comments (number)');
|
||||||
|
|
||||||
|
$this->addSql('CREATE INDEX idx_user_login ON users.users (login)');
|
||||||
}
|
}
|
||||||
|
|
||||||
public function down(Schema $schema)
|
public function down(Schema $schema)
|
||||||
|
@ -43,5 +46,6 @@ class Version20180427143940 extends AbstractMigration
|
||||||
$this->addSql('ALTER TABLE posts.comments ALTER number DROP DEFAULT');
|
$this->addSql('ALTER TABLE posts.comments ALTER number DROP DEFAULT');
|
||||||
$this->addSql('ALTER TABLE posts.comments ADD CONSTRAINT fk_62899975727aca70 FOREIGN KEY (parent_id) REFERENCES posts.comments (id) NOT DEFERRABLE INITIALLY IMMEDIATE');
|
$this->addSql('ALTER TABLE posts.comments ADD CONSTRAINT fk_62899975727aca70 FOREIGN KEY (parent_id) REFERENCES posts.comments (id) NOT DEFERRABLE INITIALLY IMMEDIATE');
|
||||||
$this->addSql('CREATE INDEX idx_62899975727aca70 ON posts.comments (parent_id)');
|
$this->addSql('CREATE INDEX idx_62899975727aca70 ON posts.comments (parent_id)');
|
||||||
|
$this->addSql('DROP INDEX users.idx_user_login');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,6 +104,7 @@ services:
|
||||||
# WebSocket MQ processing
|
# WebSocket MQ processing
|
||||||
Skobkin\Bundle\PointToolsBundle\Command\ProcessWebsocketUpdatesCommand:
|
Skobkin\Bundle\PointToolsBundle\Command\ProcessWebsocketUpdatesCommand:
|
||||||
arguments:
|
arguments:
|
||||||
|
$bsClient: '@leezy.pheanstalk.primary'
|
||||||
$bsTubeName: '%beanstalkd_ws_updates_tube%'
|
$bsTubeName: '%beanstalkd_ws_updates_tube%'
|
||||||
tags:
|
tags:
|
||||||
- { name: console.command }
|
- { name: console.command }
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
namespace Skobkin\Bundle\PointToolsBundle\Command;
|
namespace Skobkin\Bundle\PointToolsBundle\Command;
|
||||||
|
|
||||||
use JMS\Serializer\Serializer;
|
use JMS\Serializer\Serializer;
|
||||||
|
use Leezy\PheanstalkBundle\Proxy\PheanstalkProxy;
|
||||||
|
use Pheanstalk\Job;
|
||||||
use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message;
|
use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message;
|
||||||
use Skobkin\Bundle\PointToolsBundle\Service\WebSocket\WebSocketMessageProcessor;
|
use Skobkin\Bundle\PointToolsBundle\Service\WebSocket\WebSocketMessageProcessor;
|
||||||
use Symfony\Component\Console\Command\Command;
|
use Symfony\Component\Console\Command\Command;
|
||||||
|
@ -14,6 +16,9 @@ use Symfony\Component\Console\Output\OutputInterface;
|
||||||
*/
|
*/
|
||||||
class ProcessWebsocketUpdatesCommand extends Command
|
class ProcessWebsocketUpdatesCommand extends Command
|
||||||
{
|
{
|
||||||
|
/** @var PheanstalkProxy */
|
||||||
|
private $bsClient;
|
||||||
|
|
||||||
/** @var string */
|
/** @var string */
|
||||||
private $bsTubeName;
|
private $bsTubeName;
|
||||||
|
|
||||||
|
@ -24,12 +29,14 @@ class ProcessWebsocketUpdatesCommand extends Command
|
||||||
private $messageProcessor;
|
private $messageProcessor;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
|
PheanstalkProxy $bsClient,
|
||||||
|
string $bsTubeName,
|
||||||
Serializer $serializer,
|
Serializer $serializer,
|
||||||
WebSocketMessageProcessor $processor,
|
WebSocketMessageProcessor $processor
|
||||||
string $bsTubeName
|
|
||||||
) {
|
) {
|
||||||
$this->serializer = $serializer;
|
$this->serializer = $serializer;
|
||||||
$this->messageProcessor = $processor;
|
$this->messageProcessor = $processor;
|
||||||
|
$this->bsClient = $bsClient;
|
||||||
$this->bsTubeName = $bsTubeName;
|
$this->bsTubeName = $bsTubeName;
|
||||||
|
|
||||||
parent::__construct();
|
parent::__construct();
|
||||||
|
@ -48,9 +55,19 @@ class ProcessWebsocketUpdatesCommand extends Command
|
||||||
/** {@inheritdoc} */
|
/** {@inheritdoc} */
|
||||||
protected function execute(InputInterface $input, OutputInterface $output)
|
protected function execute(InputInterface $input, OutputInterface $output)
|
||||||
{
|
{
|
||||||
|
/** @var Job $job */
|
||||||
|
while ($job = $this->bsClient->reserveFromTube($this->bsTubeName, 0)) {
|
||||||
|
try {
|
||||||
|
$message = $this->serializer->deserialize($job->getData(), Message::class, 'json');
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
$output->writeln(sprintf(
|
||||||
|
'Error while deserializing #%d data: \'%s\'',
|
||||||
|
$job->getId(),
|
||||||
|
$job->getData()
|
||||||
|
));
|
||||||
|
|
||||||
foreach ($updates as $update) {
|
continue;
|
||||||
$message = $this->serializer->deserialize($update, Message::class, 'json');
|
}
|
||||||
|
|
||||||
if ($this->messageProcessor->processMessage($message)) {
|
if ($this->messageProcessor->processMessage($message)) {
|
||||||
// BS delete item
|
// BS delete item
|
||||||
|
|
|
@ -12,30 +12,55 @@ class Message implements ValidableInterface
|
||||||
{
|
{
|
||||||
public const TYPE_COMMENT = 'comment';
|
public const TYPE_COMMENT = 'comment';
|
||||||
public const TYPE_POST = 'post';
|
public const TYPE_POST = 'post';
|
||||||
public const TYPE_RECOMMENDATION = 'ok';
|
public const TYPE_POST_RECOMMENDATION = 'rec';
|
||||||
|
public const TYPE_COMMENT_RECOMMENDATION = 'ok';
|
||||||
|
|
||||||
/** @var string */
|
/**
|
||||||
|
* Event type. @see Message::TYPE_* constants
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
private $a;
|
private $a;
|
||||||
|
|
||||||
/** @var string */
|
/**
|
||||||
|
* Login of the user
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
private $author;
|
private $author;
|
||||||
|
|
||||||
/** @var string|null */
|
/** @var string|null */
|
||||||
private $authorName;
|
private $authorName;
|
||||||
|
|
||||||
/** @var int|null */
|
/**
|
||||||
|
* Number of the comment in the thread
|
||||||
|
*
|
||||||
|
* @var int|null
|
||||||
|
*/
|
||||||
private $commentId;
|
private $commentId;
|
||||||
|
|
||||||
/** @var bool|null */
|
/**
|
||||||
|
* ???
|
||||||
|
*
|
||||||
|
* @var bool|null
|
||||||
|
*/
|
||||||
private $cut;
|
private $cut;
|
||||||
|
|
||||||
/** @var string[]|null */
|
/**
|
||||||
|
* Array of file paths
|
||||||
|
*
|
||||||
|
* @var string[]|null
|
||||||
|
*/
|
||||||
private $files;
|
private $files;
|
||||||
|
|
||||||
/** @var string|null */
|
/** @var string|null */
|
||||||
private $html;
|
private $html;
|
||||||
|
|
||||||
/** @var string|null */
|
/**
|
||||||
|
* @deprecated Link in the Post::type=feed posts
|
||||||
|
*
|
||||||
|
* @var string|null
|
||||||
|
*/
|
||||||
private $link;
|
private $link;
|
||||||
|
|
||||||
/** @var string */
|
/** @var string */
|
||||||
|
@ -44,22 +69,49 @@ class Message implements ValidableInterface
|
||||||
/** @var bool|null */
|
/** @var bool|null */
|
||||||
private $private;
|
private $private;
|
||||||
|
|
||||||
/** @var string[]|null */
|
/**
|
||||||
|
* Number of the comment in the thread for recommendation with text
|
||||||
|
*
|
||||||
|
* @var int|null
|
||||||
|
*/
|
||||||
|
private $rcid;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Array of tags
|
||||||
|
*
|
||||||
|
* @var string[]|null
|
||||||
|
*/
|
||||||
private $tags;
|
private $tags;
|
||||||
|
|
||||||
/** @var string */
|
/** @var string */
|
||||||
private $text;
|
private $text;
|
||||||
|
|
||||||
/** @var string|null */
|
/**
|
||||||
|
* @deprecated ???
|
||||||
|
*
|
||||||
|
* @var string|null
|
||||||
|
*/
|
||||||
private $title;
|
private $title;
|
||||||
|
|
||||||
/** @var string|null */
|
/**
|
||||||
|
* Number of the comment to which this comment is answering
|
||||||
|
*
|
||||||
|
* @var string|null
|
||||||
|
*/
|
||||||
private $toCommentId;
|
private $toCommentId;
|
||||||
|
|
||||||
/** @var string|null */
|
/**
|
||||||
|
* Text quotation of the comment to which this comment is answering
|
||||||
|
*
|
||||||
|
* @var string|null
|
||||||
|
*/
|
||||||
private $toText;
|
private $toText;
|
||||||
|
|
||||||
/** @var string[]|null */
|
/**
|
||||||
|
* Array of logins of users to which post is addressed
|
||||||
|
*
|
||||||
|
* @var string[]|null
|
||||||
|
*/
|
||||||
private $toUsers;
|
private $toUsers;
|
||||||
|
|
||||||
public function isPost(): bool
|
public function isPost(): bool
|
||||||
|
@ -72,9 +124,14 @@ class Message implements ValidableInterface
|
||||||
return self::TYPE_COMMENT === $this->a;
|
return self::TYPE_COMMENT === $this->a;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function isRecommendation(): bool
|
public function isCommentRecommendation(): bool
|
||||||
{
|
{
|
||||||
return self::TYPE_RECOMMENDATION === $this->a;
|
return self::TYPE_COMMENT_RECOMMENDATION === $this->a;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function isPostRecommendation(): bool
|
||||||
|
{
|
||||||
|
return self::TYPE_POST_RECOMMENDATION === $this->a;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -108,7 +165,7 @@ class Message implements ValidableInterface
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case self::TYPE_RECOMMENDATION;
|
case self::TYPE_COMMENT_RECOMMENDATION;
|
||||||
if (
|
if (
|
||||||
null !== $this->author &&
|
null !== $this->author &&
|
||||||
null !== $this->postId
|
null !== $this->postId
|
||||||
|
@ -228,6 +285,16 @@ class Message implements ValidableInterface
|
||||||
$this->private = $private;
|
$this->private = $private;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function getRcid(): ?int
|
||||||
|
{
|
||||||
|
return $this->rcid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function setRcid(?int $rcid): void
|
||||||
|
{
|
||||||
|
$this->rcid = $rcid;
|
||||||
|
}
|
||||||
|
|
||||||
public function getTags(): ?array
|
public function getTags(): ?array
|
||||||
{
|
{
|
||||||
return $this->tags;
|
return $this->tags;
|
||||||
|
|
|
@ -7,6 +7,7 @@ use Doctrine\ORM\Mapping as ORM;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @ORM\Table(name="users", schema="users", indexes={
|
* @ORM\Table(name="users", schema="users", indexes={
|
||||||
|
* @ORM\Index(name="idx_user_login", columns={"login"}),
|
||||||
* @ORM\Index(name="idx_user_public", columns={"public"}),
|
* @ORM\Index(name="idx_user_public", columns={"public"}),
|
||||||
* @ORM\Index(name="idx_user_removed", columns={"is_removed"})
|
* @ORM\Index(name="idx_user_removed", columns={"is_removed"})
|
||||||
* })
|
* })
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
namespace Skobkin\Bundle\PointToolsBundle\Service\Factory\Blogs;
|
namespace Skobkin\Bundle\PointToolsBundle\Service\Factory\Blogs;
|
||||||
|
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Skobkin\Bundle\PointToolsBundle\DTO\Api\WebSocket\Message;
|
||||||
|
use Skobkin\Bundle\PointToolsBundle\Entity\Blogs\Comment;
|
||||||
use Skobkin\Bundle\PointToolsBundle\Repository\Blogs\{CommentRepository, PostRepository};
|
use Skobkin\Bundle\PointToolsBundle\Repository\Blogs\{CommentRepository, PostRepository};
|
||||||
use Skobkin\Bundle\PointToolsBundle\Service\Factory\{AbstractFactory, UserFactory};
|
use Skobkin\Bundle\PointToolsBundle\Service\Factory\{AbstractFactory, UserFactory};
|
||||||
|
|
||||||
|
@ -25,4 +27,20 @@ class CommentFactory extends AbstractFactory
|
||||||
$this->commentRepository = $commentRepository;
|
$this->commentRepository = $commentRepository;
|
||||||
$this->postRepository = $postRepository;
|
$this->postRepository = $postRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function findOrCreateFromWebsocketMessage(Message $message): Comment
|
||||||
|
{
|
||||||
|
if ($message->isValid()) {
|
||||||
|
throw new \InvalidArgumentException('Comment is invalid');
|
||||||
|
}
|
||||||
|
if ($message->isComment()) {
|
||||||
|
throw new \InvalidArgumentException(sprintf(
|
||||||
|
'Invalid Message object provided. %s expected, %s given',
|
||||||
|
Message::TYPE_COMMENT,
|
||||||
|
$message->getA()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -7,6 +7,7 @@ use Skobkin\Bundle\PointToolsBundle\DTO\Api\User as UserDTO;
|
||||||
use Skobkin\Bundle\PointToolsBundle\Entity\User;
|
use Skobkin\Bundle\PointToolsBundle\Entity\User;
|
||||||
use Skobkin\Bundle\PointToolsBundle\Repository\UserRepository;
|
use Skobkin\Bundle\PointToolsBundle\Repository\UserRepository;
|
||||||
use Skobkin\Bundle\PointToolsBundle\Exception\Factory\InvalidUserDataException;
|
use Skobkin\Bundle\PointToolsBundle\Exception\Factory\InvalidUserDataException;
|
||||||
|
use Skobkin\Bundle\PointToolsBundle\Service\Api\UserApi;
|
||||||
|
|
||||||
class UserFactory extends AbstractFactory
|
class UserFactory extends AbstractFactory
|
||||||
{
|
{
|
||||||
|
@ -15,11 +16,29 @@ class UserFactory extends AbstractFactory
|
||||||
/** @var UserRepository */
|
/** @var UserRepository */
|
||||||
private $userRepository;
|
private $userRepository;
|
||||||
|
|
||||||
|
/** @var UserApi */
|
||||||
|
private $userApi;
|
||||||
|
|
||||||
public function __construct(LoggerInterface $logger, UserRepository $userRepository)
|
public function __construct(LoggerInterface $logger, UserRepository $userRepository, UserApi $userApi)
|
||||||
{
|
{
|
||||||
parent::__construct($logger);
|
parent::__construct($logger);
|
||||||
$this->userRepository = $userRepository;
|
$this->userRepository = $userRepository;
|
||||||
|
$this->userApi = $userApi;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function findOrCreateByLogin(string $login, bool $retrieveMissingFromApi = true): User
|
||||||
|
{
|
||||||
|
/** @var User $user */
|
||||||
|
if (null === $user = $this->userRepository->findBy(['login' => $login])) {
|
||||||
|
if ($retrieveMissingFromApi) {
|
||||||
|
$user = $this->userApi->getUserByLogin($login);
|
||||||
|
} else {
|
||||||
|
// TODO neen more specific exception
|
||||||
|
throw new \RuntimeException(sprintf('User \'%s\' not found in the database. Api retrieval disabled.', $login));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return $user;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -38,7 +38,7 @@ class WebSocketMessageProcessor
|
||||||
return $this->processComment($message);
|
return $this->processComment($message);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case $message->isRecommendation():
|
case $message->isCommentRecommendation():
|
||||||
return $this->processRecommendation($message);
|
return $this->processRecommendation($message);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue