logger = $logger; } public function setEntityManager(EntityManagerInterface $em): void { $this->em = $em; } public function setUserRepository(UserRepository $repository): void { $this->userRepo = $repository; } public function setApiClient(UserApi $userApi): void { $this->api = $userApi; } public function setApiDelay(int $microSecs): void { $this->apiDelay = $microSecs; } public function setSubscriptionManager(SubscriptionsManager $subscriptionsManager): void { $this->subscriptionManager = $subscriptionsManager; } protected function configure() { $this ->setName('point:update:subscriptions') ->setDescription('Update subscriptions of users subscribed to service') ->addOption( 'all-users', null, InputOption::VALUE_NONE, 'If set, command will check subscribers of all service users instead of service subscribers only' ) ->addOption( 'check-only', null, InputOption::VALUE_NONE, 'If set, command will not perform write operations in the database' ) // @todo add option for checking only selected user ; } /** * @param InputInterface $input * @param OutputInterface $output * * @return int */ protected function execute(InputInterface $input, OutputInterface $output) { $this->input = $input; $this->logger->debug('UpdateSubscriptionsCommand started.'); try { $appUserId = $this->getContainer()->getParameter('point_id'); } catch (\InvalidArgumentException $e) { $this->logger->alert('Could not get point_id parameter from config file', ['exception_message' => $e->getMessage()]); return 1; } $this->progress = new ProgressBar($output); $this->progress->setFormat('debug'); // Beginning transaction for all changes $this->em->beginTransaction(); $this->progress->setMessage('Getting service subscribers'); try { $usersForUpdate = $this->getUsersForUpdate($appUserId); } catch (\Exception $e) { $this->logger->error('Error while getting service subscribers', ['exception' => get_class($e), 'message' => $e->getMessage()]); return 1; } if (0 === count($usersForUpdate)) { $this->logger->info('No local subscribers. Finishing.'); return 0; } $this->logger->info('Processing users subscribers'); $this->progress->setMessage('Processing users subscribers'); $this->progress->start(count($usersForUpdate)); $this->updateUsersSubscribers($usersForUpdate); $this->progress->finish(); // Flushing all changes at once to database $this->em->flush(); $this->em->commit(); $this->logger->debug('Finished'); return 0; } /** * @param User[] $users */ private function updateUsersSubscribers(array $users): void { // Updating users subscribers foreach ($users as $user) { usleep($this->apiDelay); $this->progress->advance(); $this->logger->info('Processing @'.$user->getLogin()); try { $userCurrentSubscribers = $this->api->getUserSubscribersById($user->getId()); } catch (UserNotFoundException $e) { $this->logger->warning('User not found. Marking as removed', ['login' => $user->getLogin(), 'user_id' => $user->getId()]); $user->markAsRemoved(); continue; } catch (\Exception $e) { $this->logger->error( 'Error while getting subscribers. Skipping.', [ 'user_login' => $user->getLogin(), 'user_id' => $user->getId(), 'message' => $e->getMessage(), 'file' => $e->getFile(), 'line' => $e->getLine(), ] ); continue; } $this->logger->debug('Updating user subscribers'); try { // Updating user subscribers $this->subscriptionManager->updateUserSubscribers($user, $userCurrentSubscribers); } catch (\Exception $e) { $this->logger->error( 'Error while updating user subscribers', [ 'user_login' => $user->getLogin(), 'user_id' => $user->getId(), 'message' => $e->getMessage(), 'file' => $e->getFile(), 'line' => $e->getLine(), ] ); } } } private function getUsersForUpdate(int $appUserId): array { $usersForUpdate = []; if ($this->input->getOption('all-users')) { $usersForUpdate = $this->userRepo->findBy(['removed' => false]); } else { /** @var User $serviceUser */ try { $serviceUser = $this->userRepo->findActiveUserWithSubscribers($appUserId); } catch (\Exception $e) { $this->logger->error('Error while getting active user with subscribers', ['app_user_id' => $appUserId]); throw $e; } if (!$serviceUser) { $this->logger->critical('Service user not found or marked as removed'); // @todo Retrieving user throw new \RuntimeException('Service user not found in the database'); } $this->logger->info('Getting service subscribers'); try { $usersForUpdate = $this->api->getUserSubscribersById($appUserId); } catch (UserNotFoundException $e) { $this->logger->critical('Service user deleted or API response is invalid'); throw $e; } catch (\Exception $e) { $this->logger->warning( 'Error while getting service subscribers. Fallback to local list.', [ 'user_login' => $serviceUser->getLogin(), 'user_id' => $serviceUser->getId(), 'message' => $e->getMessage(), 'file' => $e->getFile(), 'line' => $e->getLine(), ] ); /** @var Subscription $subscription */ foreach ((array) $serviceUser->getSubscribers() as $subscription) { $usersForUpdate[] = $subscription->getSubscriber(); } } $this->logger->debug('Updating service subscribers'); // Updating service subscribers try { $this->subscriptionManager->updateUserSubscribers($serviceUser, $usersForUpdate); } catch (\Exception $e) { $this->logger->error( 'Error while updating service subscribers', [ 'user_login' => $serviceUser->getLogin(), 'user_id' => $serviceUser->getId(), 'message' => $e->getMessage(), 'file' => $e->getFile(), 'line' => $e->getLine(), ] ); throw $e; } } return $usersForUpdate; } }