WS 'post' processing draft.
This commit is contained in:
parent
dbc1c060f8
commit
a12bf9d9a2
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
namespace Skobkin\Bundle\PointToolsBundle\Command;
|
namespace Skobkin\Bundle\PointToolsBundle\Command;
|
||||||
|
|
||||||
|
use Doctrine\ORM\EntityManagerInterface;
|
||||||
use JMS\Serializer\Serializer;
|
use JMS\Serializer\Serializer;
|
||||||
use Leezy\PheanstalkBundle\Proxy\PheanstalkProxy;
|
use Leezy\PheanstalkBundle\Proxy\PheanstalkProxy;
|
||||||
use Pheanstalk\Job;
|
use Pheanstalk\Job;
|
||||||
|
@ -17,6 +18,9 @@ use Symfony\Component\Console\Output\OutputInterface;
|
||||||
*/
|
*/
|
||||||
class ProcessWebsocketUpdatesCommand extends Command
|
class ProcessWebsocketUpdatesCommand extends Command
|
||||||
{
|
{
|
||||||
|
/** @var EntityManagerInterface */
|
||||||
|
private $em;
|
||||||
|
|
||||||
/** @var PheanstalkProxy */
|
/** @var PheanstalkProxy */
|
||||||
private $bsClient;
|
private $bsClient;
|
||||||
|
|
||||||
|
@ -33,12 +37,14 @@ class ProcessWebsocketUpdatesCommand extends Command
|
||||||
private $sentryClient;
|
private $sentryClient;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
|
EntityManagerInterface $em,
|
||||||
\Raven_Client $raven,
|
\Raven_Client $raven,
|
||||||
PheanstalkProxy $bsClient,
|
PheanstalkProxy $bsClient,
|
||||||
string $bsTubeName,
|
string $bsTubeName,
|
||||||
Serializer $serializer,
|
Serializer $serializer,
|
||||||
WebSocketMessageProcessor $processor
|
WebSocketMessageProcessor $processor
|
||||||
) {
|
) {
|
||||||
|
$this->em = $em;
|
||||||
$this->sentryClient = $raven;
|
$this->sentryClient = $raven;
|
||||||
$this->serializer = $serializer;
|
$this->serializer = $serializer;
|
||||||
$this->messageProcessor = $processor;
|
$this->messageProcessor = $processor;
|
||||||
|
@ -84,17 +90,19 @@ class ProcessWebsocketUpdatesCommand extends Command
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if ($this->messageProcessor->processMessage($message)) {
|
if ($this->messageProcessor->processMessage($message)) {
|
||||||
|
$this->em->flush();
|
||||||
|
|
||||||
if (!$keepJobs) {
|
if (!$keepJobs) {
|
||||||
$this->bsClient->delete($job);
|
$this->bsClient->delete($job);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (UnsupportedTypeException $e) {
|
} catch (UnsupportedTypeException $e) {
|
||||||
$output->writeln('Unsupported message type: '.$message->getA());
|
$output->writeln(' Unsupported message type: '.$message->getA());
|
||||||
$this->sentryClient->captureException($e);
|
$this->sentryClient->captureException($e);
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$output->writeln('Message processing error: '.$e->getMessage());
|
$output->writeln(' Message processing error: '.$e->getMessage());
|
||||||
$this->sentryClient->captureException($e);
|
$this->sentryClient->captureException($e);
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -295,7 +295,6 @@ class Message implements ValidableInterface
|
||||||
return $this->hasCommonMandatoryData() && (
|
return $this->hasCommonMandatoryData() && (
|
||||||
// Text can be empty ("") though
|
// Text can be empty ("") though
|
||||||
null !== $this->text &&
|
null !== $this->text &&
|
||||||
null !== $this->private &&
|
|
||||||
null !== $this->tags
|
null !== $this->tags
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,7 +110,7 @@ class Post
|
||||||
private $comments;
|
private $comments;
|
||||||
|
|
||||||
|
|
||||||
public function __construct(string $id, User $author, \DateTime $createdAt, string $type)
|
public function __construct(string $id, User $author, \DateTime $createdAt, string $type = self::TYPE_POST)
|
||||||
{
|
{
|
||||||
$this->id = $id;
|
$this->id = $id;
|
||||||
$this->author = $author;
|
$this->author = $author;
|
||||||
|
|
|
@ -191,15 +191,24 @@ class User
|
||||||
return $this->createdAt;
|
return $this->createdAt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function updateCreatedAt(\DateTime $date): self
|
||||||
|
{
|
||||||
|
$this->createdAt = $date;
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
public function getUpdatedAt(): ?\DateTime
|
public function getUpdatedAt(): ?\DateTime
|
||||||
{
|
{
|
||||||
return $this->updatedAt;
|
return $this->updatedAt;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function updatePrivacy(?bool $public, ?bool $whitelistOnly): void
|
public function updatePrivacy(bool $public, bool $whitelistOnly): self
|
||||||
{
|
{
|
||||||
$this->public = $public;
|
$this->public = $public;
|
||||||
$this->whitelistOnly = $whitelistOnly;
|
$this->whitelistOnly = $whitelistOnly;
|
||||||
|
|
||||||
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function isPublic(): ?bool
|
public function isPublic(): ?bool
|
||||||
|
|
|
@ -151,7 +151,11 @@ class PostFactory extends AbstractFactory
|
||||||
if (null === $post = $this->postRepository->find($message->getPostId())) {
|
if (null === $post = $this->postRepository->find($message->getPostId())) {
|
||||||
/** @var User $author */
|
/** @var User $author */
|
||||||
if (null === $author = $this->userRepository->find($message->getAuthorId())) {
|
if (null === $author = $this->userRepository->find($message->getAuthorId())) {
|
||||||
// @todo create user
|
$author = $this->userFactory->findOrCreateFromIdLoginAndName(
|
||||||
|
$message->getAuthorId(),
|
||||||
|
$message->getAuthor(),
|
||||||
|
$message->getAuthorName()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
$post = new Post(
|
$post = new Post(
|
||||||
|
@ -163,7 +167,26 @@ class PostFactory extends AbstractFactory
|
||||||
$this->postRepository->add($post);
|
$this->postRepository->add($post);
|
||||||
}
|
}
|
||||||
|
|
||||||
$post->setText($message->getText());
|
$post
|
||||||
|
->setText($message->getText())
|
||||||
|
->setPrivate((bool) $message->getPrivate())
|
||||||
|
;
|
||||||
|
|
||||||
|
try {
|
||||||
|
$this->updatePostTags($post, $message->getTags() ?: []);
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
$this->logger->error('Error while updating post tags');
|
||||||
|
throw $e;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$this->updatePostFiles($post, $message->getFiles() ?: []);
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
$this->logger->error('Error while updating post files');
|
||||||
|
throw $e;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $post;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function findOrCreateFromApiDto(ApiPost $postData, User $author): Post
|
private function findOrCreateFromApiDto(ApiPost $postData, User $author): Post
|
||||||
|
@ -181,7 +204,7 @@ class PostFactory extends AbstractFactory
|
||||||
|
|
||||||
$post
|
$post
|
||||||
->setText($postData->getText())
|
->setText($postData->getText())
|
||||||
->setPrivate($postData->getPrivate())
|
->setPrivate((bool) $postData->getPrivate())
|
||||||
;
|
;
|
||||||
|
|
||||||
return $post;
|
return $post;
|
||||||
|
|
|
@ -20,27 +20,25 @@ class UserFactory extends AbstractFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param UserDTO $userData
|
|
||||||
*
|
|
||||||
* @return User
|
|
||||||
*
|
|
||||||
* @throws InvalidUserDataException
|
* @throws InvalidUserDataException
|
||||||
*/
|
*/
|
||||||
public function findOrCreateFromDTO(UserDTO $userData): User
|
public function findOrCreateFromDTO(UserDTO $userData): User
|
||||||
{
|
{
|
||||||
// @todo LOG
|
|
||||||
|
|
||||||
if (!$userData->isValid()) {
|
if (!$userData->isValid()) {
|
||||||
throw new InvalidUserDataException('Invalid user data', $userData);
|
throw new InvalidUserDataException('Invalid user data', $userData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$createdAt = \DateTime::createFromFormat(self::DATE_FORMAT, $userData->getCreated()) ?: new \DateTime();
|
||||||
|
|
||||||
/** @var User $user */
|
/** @var User $user */
|
||||||
if (null === ($user = $this->userRepository->find($userData->getId()))) {
|
if (null === ($user = $this->userRepository->find($userData->getId()))) {
|
||||||
$user = new User(
|
$user = new User(
|
||||||
$userData->getId(),
|
$userData->getId(),
|
||||||
\DateTime::createFromFormat(self::DATE_FORMAT, $userData->getCreated()) ?: new \DateTime()
|
$createdAt
|
||||||
);
|
);
|
||||||
$this->userRepository->add($user);
|
$this->userRepository->add($user);
|
||||||
|
} else {
|
||||||
|
$user->updateCreatedAt($createdAt);
|
||||||
}
|
}
|
||||||
|
|
||||||
$user->updateLoginAndName($userData->getLogin(), $userData->getName());
|
$user->updateLoginAndName($userData->getLogin(), $userData->getName());
|
||||||
|
@ -52,6 +50,17 @@ class UserFactory extends AbstractFactory
|
||||||
return $user;
|
return $user;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function findOrCreateFromIdLoginAndName(int $id, string $login, ?string $name): User
|
||||||
|
{
|
||||||
|
if (null === $user = $this->userRepository->find($id)) {
|
||||||
|
// We're using current date now but next time when we'll be updating user from API it'll be fixed
|
||||||
|
$user = new User($id, new \DateTime(), $login, $name);
|
||||||
|
$this->userRepository->add($user);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $user;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return User[]
|
* @return User[]
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -47,7 +47,9 @@ class WebSocketMessageProcessor
|
||||||
|
|
||||||
private function processPost(Message $postData): bool
|
private function processPost(Message $postData): bool
|
||||||
{
|
{
|
||||||
return false;
|
$this->postFactory->findOrCreateFromWebsocketDto($postData);
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function processComment(Message $commentData): bool
|
private function processComment(Message $commentData): bool
|
||||||
|
|
Loading…
Reference in a new issue