UpdateSubscriptionsCommand refactored and simplified. API delay moved to parameters.yml.

This commit is contained in:
Alexey Skobkin 2017-01-09 05:22:56 +03:00
parent fa76440fa2
commit a6ac3757b6
3 changed files with 187 additions and 91 deletions

View file

@ -13,6 +13,8 @@ parameters:
point_base_url: https://point.im/ point_base_url: https://point.im/
point_api_base_url: https://point.im/api/ point_api_base_url: https://point.im/api/
# Delay between calls when updating users subscribers (in microseconds)
point_api_delay: 500000
point_use_https: true point_use_https: true
point_login: point-tools point_login: point-tools
point_id: 435 point_id: 435

View file

@ -2,17 +2,81 @@
namespace Skobkin\Bundle\PointToolsBundle\Command; namespace Skobkin\Bundle\PointToolsBundle\Command;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Skobkin\Bundle\PointToolsBundle\Entity\Subscription;
use Skobkin\Bundle\PointToolsBundle\Entity\User;
use Skobkin\Bundle\PointToolsBundle\Repository\UserRepository;
use Skobkin\Bundle\PointToolsBundle\Service\SubscriptionsManager; use Skobkin\Bundle\PointToolsBundle\Service\SubscriptionsManager;
use Skobkin\Bundle\PointToolsBundle\Service\UserApi; use Skobkin\Bundle\PointToolsBundle\Service\UserApi;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\Input;
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\Output;
use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Output\OutputInterface;
class UpdateSubscriptionsCommand extends ContainerAwareCommand class UpdateSubscriptionsCommand extends ContainerAwareCommand
{ {
/**
* @var EntityManagerInterface
*/
private $em;
/**
* @var LoggerInterface
*/
private $logger;
/**
* @var UserRepository
*/
private $userRepo;
/**
* @var InputInterface
*/
private $input;
/**
* @var UserApi
*/
private $api;
/**
* @var int
*/
private $apiDelay = 500000;
/**
* @var SubscriptionsManager
*/
private $subscriptionManager;
public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;
}
public function setEntityManager(EntityManagerInterface $em)
{
$this->em = $em;
}
public function setApiClient(UserApi $userApi)
{
$this->api = $userApi;
}
public function setApiDelay(int $microSecs)
{
$this->apiDelay = $microSecs;
}
public function setSubscriptionManager(SubscriptionsManager $subscriptionsManager)
{
$this->subscriptionManager = $subscriptionsManager;
}
protected function configure() protected function configure()
{ {
$this $this
@ -42,139 +106,156 @@ class UpdateSubscriptionsCommand extends ContainerAwareCommand
*/ */
protected function execute(InputInterface $input, OutputInterface $output) protected function execute(InputInterface $input, OutputInterface $output)
{ {
$log = $this->getContainer()->get('logger'); $this->input = $input;
$em = $this->getContainer()->get('doctrine.orm.entity_manager'); $this->userRepo = $this->em->getRepository('SkobkinPointToolsBundle:User');
$userRepository = $em->getRepository('SkobkinPointToolsBundle:User');
$log->debug('UpdateSubscriptionsCommand started.'); $this->logger->debug('UpdateSubscriptionsCommand started.');
/** @var UserApi $api */
$api = $this->getContainer()->get('app.point.api_user');
/** @var SubscriptionsManager $subscriptionsManager */
$subscriptionsManager = $this->getContainer()->get('app.point.subscriptions_manager');
try { try {
$serviceUserId = $this->getContainer()->getParameter('point_id'); $appUserId = $this->getContainer()->getParameter('point_id');
} catch (\InvalidArgumentException $e) { } catch (\InvalidArgumentException $e) {
$log->alert('Could not get point_id parameter from config file', ['exception_message' => $e->getMessage()]); $this->logger->alert('Could not get point_id parameter from config file', ['exception_message' => $e->getMessage()]);
return 1; return 1;
} }
// Beginning transaction for all changes // Beginning transaction for all changes
$em->beginTransaction(); $this->em->beginTransaction();
if ($input->getOption('all-users')) { try {
$usersForUpdate = $userRepository->findAll(); $usersForUpdate = $this->getUsersForUpdate($appUserId);
} else { } catch (\Exception $e) {
$serviceUser = $userRepository->find($serviceUserId); $this->logger->error('Error while getting service subscribers', ['exception' => get_class($e), 'message' => $e->getMessage()]);
if (!$serviceUser) {
$log->info('Service user not found');
// @todo Retrieving user
return 1; return 1;
} }
$log->info('Getting service subscribers'); if (0 === count($usersForUpdate)) {
$this->logger->info('No local subscribers. Finishing.');
try {
$usersForUpdate = $api->getUserSubscribersById($serviceUserId);
} catch (\Exception $e) {
$log->warning(
'Error while getting service subscribers. Fallback to local list.',
[
'user_login' => $serviceUser->getLogin(),
'user_id' => $serviceUser->getId(),
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine()
]
);
$usersForUpdate = [];
foreach ($serviceUser->getSubscribers() as $subscription) {
$usersForUpdate[] = $subscription->getSubscriber();
}
if (!count($usersForUpdate)) {
$log->info('No local subscribers. Finishing.');
return 0; return 0;
} }
$this->logger->info('Processing users subscribers');
$this->updateUsersSubscribers($usersForUpdate);
// Flushing all changes at once to database
$this->em->flush();
$this->em->commit();
$this->logger->debug('Finished');
return 0;
} }
$log->debug('Updating service subscribers'); /**
* @param User[] $users
// Updating service subscribers */
try { private function updateUsersSubscribers(array $users)
$subscriptionsManager->updateUserSubscribers($serviceUser, $usersForUpdate); {
} catch (\Exception $e) {
$log->error(
'Error while updating service subscribers',
[
'user_login' => $serviceUser->getLogin(),
'user_id' => $serviceUser->getId(),
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine()
]
);
return 1;
}
}
$log->info('Processing users subscribers');
// Updating users subscribers // Updating users subscribers
foreach ($usersForUpdate as $user) { foreach ($users as $user) {
$log->info('Processing @'.$user->getLogin()); $this->logger->info('Processing @'.$user->getLogin());
try { try {
$userCurrentSubscribers = $api->getUserSubscribersById($user->getId()); $userCurrentSubscribers = $this->api->getUserSubscribersById($user->getId());
} catch (\Exception $e) { } catch (\Exception $e) {
$log->error( $this->logger->error(
'Error while getting subscribers. Skipping.', 'Error while getting subscribers. Skipping.',
[ [
'user_login' => $user->getLogin(), 'user_login' => $user->getLogin(),
'user_id' => $user->getId(), 'user_id' => $user->getId(),
'message' => $e->getMessage(), 'message' => $e->getMessage(),
'file' => $e->getFile(), 'file' => $e->getFile(),
'line' => $e->getLine() 'line' => $e->getLine(),
] ]
); );
continue; continue;
} }
$log->debug('Updating user subscribers'); $this->logger->debug('Updating user subscribers');
try { try {
// Updating user subscribers // Updating user subscribers
$subscriptionsManager->updateUserSubscribers($user, $userCurrentSubscribers); $this->subscriptionManager->updateUserSubscribers($user, $userCurrentSubscribers);
} catch (\Exception $e) { } catch (\Exception $e) {
$log->error( $this->logger->error(
'Error while updating user subscribers', 'Error while updating user subscribers',
[ [
'user_login' => $user->getLogin(), 'user_login' => $user->getLogin(),
'user_id' => $user->getId(), 'user_id' => $user->getId(),
'message' => $e->getMessage(), 'message' => $e->getMessage(),
'file' => $e->getFile(), 'file' => $e->getFile(),
'line' => $e->getLine() 'line' => $e->getLine(),
] ]
); );
} }
// @todo move to the config usleep($this->apiDelay);
usleep(500000); }
} }
// Flushing all changes at once to database private function getUsersForUpdate(int $appUserId): array
$em->flush(); {
$em->commit(); if ($this->input->getOption('all-users')) {
$usersForUpdate = $this->userRepo->findAll();
$log->debug('Finished'); } else {
/** @var User $serviceUser */
return 0; $serviceUser = $this->userRepo->find($appUserId);
if (!$serviceUser) {
$this->logger->info('Service user not found');
// @todo Retrieving user
throw new \RuntimeException('Service user not found in the database');
}
$this->logger->info('Getting service subscribers');
try {
$usersForUpdate = $this->api->getUserSubscribersById($appUserId);
} catch (\Exception $e) {
$this->logger->warning(
'Error while getting service subscribers. Fallback to local list.',
[
'user_login' => $serviceUser->getLogin(),
'user_id' => $serviceUser->getId(),
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine(),
]
);
$usersForUpdate = [];
/** @var Subscription $subscription */
foreach ($serviceUser->getSubscribers() as $subscription) {
$usersForUpdate[] = $subscription->getSubscriber();
}
}
$this->logger->debug('Updating service subscribers');
// Updating service subscribers
try {
$this->subscriptionManager->updateUserSubscribers($serviceUser, $usersForUpdate);
} catch (\Exception $e) {
$this->logger->error(
'Error while updating service subscribers',
[
'user_login' => $serviceUser->getLogin(),
'user_id' => $serviceUser->getId(),
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine(),
]
);
throw $e;
}
}
return $usersForUpdate;
} }
} }

View file

@ -45,6 +45,19 @@ services:
- '@logger' - '@logger'
# Console commands
app.point.update_subscribers_command:
class: Skobkin\Bundle\PointToolsBundle\Command\UpdateSubscriptionsCommand
calls:
- [setLogger, ['@logger']]
- [setEntityManager, ['@doctrine.orm.entity_manager']]
- [setApiClient, ['@app.point.api_user']]
- [setApiDelay, ['%point_api_delay%']]
- [setSubscriptionManager, ['@app.point.subscriptions_manager']]
tags:
- { name: console.command }
# Factories # Factories
# User factory # User factory
app.point.user_factory: app.point.user_factory: