tg_rss_bot/telegram.py

158 lines
5.5 KiB
Python
Raw Normal View History

from telebot import TeleBot
from telebot.handler_backends import BaseMiddleware
2022-05-01 23:43:04 +00:00
from telebot.types import Message
import validators
from database import Database
from exceptions import DisplayableException
from rss import FeedItem
2022-05-01 23:43:04 +00:00
2022-05-02 20:44:16 +00:00
2022-05-01 23:43:04 +00:00
class CommandProcessor:
    """Processes user input and dispatches the data to other services.

    Wires the ``/add``, ``/list``, ``/del``, ``/help`` and ``/start`` commands
    to a :class:`TeleBot` instance and forwards subscription changes to the
    database.
    """

    def __init__(self, token: str, database: Database):
        """Create the bot and install its middlewares.

        Args:
            token: Telegram bot API token; must be non-empty.
            database: Storage used to look up feeds and (un)subscribe users.

        Raises:
            ValueError: If ``token`` is ``None`` or empty.
        """
        if not token:
            raise ValueError("Token should not be empty")
        self.bot: TeleBot = TeleBot(token, use_class_middlewares=True)
        # UserAuthMiddleware puts 'user_id' into the handler data dictionary;
        # ExceptionHandlerMiddleware reports handler failures to the user.
        self.bot.setup_middleware(UserAuthMiddleware(database))
        self.bot.setup_middleware(ExceptionHandlerMiddleware(self.bot))
        self.database: Database = database

    def run(self):
        """Run a bot and poll for new messages indefinitely."""
        self.bot.register_message_handler(commands=['add'], callback=self.__add_feed)
        self.bot.register_message_handler(commands=['list'], callback=self.__list_feeds)
        self.bot.register_message_handler(commands=['del'], callback=self.__delete_feed)
        self.bot.register_message_handler(commands=['help', 'start'], callback=self.__command_help)
        # Registered last and without a filter so it acts as the fallback
        # (relies on telebot trying handlers in registration order).
        self.bot.register_message_handler(callback=self.__command_help)

        self.bot.infinity_polling()

    def __command_help(self, message: Message):
        """Reply with the list of supported commands."""
        self.bot.reply_to(
            message,
            'Supported commands:\n'
            ' /add <feed url> - Add new feed\n'
            ' /list - List currently added feeds\n'
            ' /del <feed url> - Remove feed\n'
            ' /help - Get this help message'
        )

    def __add_feed(self, message: Message, data: dict):
        """Subscribe the current user to the feed URL given in the command.

        Args:
            message: Incoming ``/add <feed url>`` message.
            data: Middleware data; ``data['user_id']`` identifies the user.

        Raises:
            DisplayableException: If the URL is missing or invalid.
        """
        url = self.__parse_feed_url(message)
        self.database.subscribe_user_by_url(data['user_id'], url)
        self.bot.reply_to(message, 'Successfully subscribed to feed.')

    def __list_feeds(self, message: Message, data: dict):
        """Reply with the feeds the current user is subscribed to.

        Args:
            message: Incoming ``/list`` message.
            data: Middleware data; ``data['user_id']`` identifies the user.
        """
        feeds = self.database.find_user_feeds(data['user_id'])
        # Each row is indexed positionally: [0] is shown before the colon,
        # [1] after it.
        lines = [f'* {feed[0]}: {feed[1]}\n' for feed in feeds]
        if not lines:
            # A bare "Your feeds:" header with nothing below it is confusing.
            self.bot.reply_to(message, 'You have no feeds yet. Use /add <feed url> to add one.')
            return
        self.bot.reply_to(message, 'Your feeds:\n' + ''.join(lines))

    def __delete_feed(self, message: Message, data: dict):
        """Unsubscribe the current user from the feed URL given in the command.

        Args:
            message: Incoming ``/del <feed url>`` message.
            data: Middleware data; ``data['user_id']`` identifies the user.

        Raises:
            DisplayableException: If the URL is missing or invalid.
        """
        url = self.__parse_feed_url(message)
        self.database.unsubscribe_user_by_url(data['user_id'], url)
        self.bot.reply_to(message, 'Unsubscribed.')

    def __parse_feed_url(self, message: Message) -> str:
        """Extract and validate the feed URL argument of a command message.

        The URL is the second whitespace-separated token of the message text
        (the first one is the command itself).

        Raises:
            DisplayableException: If no URL was given or it fails validation.
        """
        args = (message.text or '').split()
        if len(args) < 2:
            raise DisplayableException('Feed URL should be specified')
        url = args[1]
        if not self.__is_url_valid(url):
            raise DisplayableException('Invalid feed URL')
        return url

    @staticmethod
    def __is_url_valid(url: str) -> bool:
        """Return True only for well-formed ``http://`` / ``https://`` URLs."""
        if not validators.url(url):
            return False
        # For security reasons we should not allow anything except HTTP/HTTPS.
        return url.startswith(('http://', 'https://'))
2022-05-29 21:33:36 +00:00
class Notifier:
    """Sends notifications to users about new RSS feed items."""

    def __init__(self, token: str):
        """Create the bot client used for outgoing messages.

        Args:
            token: Telegram bot API token; must be non-empty.

        Raises:
            ValueError: If ``token`` is ``None`` or empty.
        """
        if not token:
            raise ValueError("Token should not be empty")
        self.bot: TeleBot = TeleBot(token)

    def send_updates(self, chat_id: int, updates: list[FeedItem]):
        """Send notification about new items to the user.

        Args:
            chat_id: Telegram chat to send the messages to.
            updates: Feed items to announce; one message is sent per item.
        """
        for update in updates:
            self.__send_update(chat_id, update)

    def __send_update(self, telegram_id: int, update: FeedItem):
        """Send a single feed item to the chat as an HTML-formatted message."""
        self.bot.send_message(
            chat_id=telegram_id,
            text=self.__format_message(update),
            parse_mode='HTML'
        )

    @staticmethod
    def __format_message(item: FeedItem) -> str:
        """Render a feed item as Telegram HTML: a bold title link, then the description.

        The URL and title come from the feed, so they are escaped before being
        placed into markup; otherwise a stray ``&``, ``<`` or ``"`` would break
        or alter the message markup.
        """
        from html import escape

        url = escape(str(item.url), quote=True)
        title = escape(str(item.title), quote=False)
        # NOTE(review): the description is inserted as-is, as before. It is
        # presumably already limited to Telegram-supported HTML upstream -
        # confirm, and sanitize it where FeedItem is built if it is not.
        return (
            f"<strong><a href=\"{url}\">{title}</a></strong>\n\n"
            f"{item.description}"
        )
class UserAuthMiddleware(BaseMiddleware):
    """Resolves the sender of every message to an internal user ID.

    Unknown senders are registered in the database on first contact, so
    handlers always receive a valid ``data['user_id']``.
    """

    def __init__(self, database: Database):
        """Store the database handle and subscribe to 'message' updates."""
        super().__init__()
        self.update_types = ['message']
        self.database: Database = database

    def pre_process(self, message: Message, data: dict):
        """Look up (or create) the sender and put its ID into the handler data."""
        data['user_id'] = self.__find_or_register_user(message)

    def post_process(self, message: Message, data: dict, exception):
        """Nothing to do after the handler has run."""

    def __find_or_register_user(self, message: Message) -> int:
        """Return the internal ID for the message sender, registering it if unknown."""
        sender_id = message.from_user.id
        known_id = self.database.find_user(sender_id)
        if known_id is not None:
            return known_id
        # First message from this Telegram account: create the user record.
        return self.database.add_user(sender_id)
class ExceptionHandlerMiddleware(BaseMiddleware):
    """Reports handler failures back to the user as a reply message."""

    def __init__(self, bot: TeleBot):
        """Keep the bot used for replies and subscribe to 'message' updates."""
        super().__init__()
        self.update_types = ['message']
        self.bot: TeleBot = bot

    def pre_process(self, message: Message, data: dict):
        """Nothing to do before the handler runs."""
        # pylint: disable=W0613

    def post_process(self, message: Message, data: dict, exception: Exception | None):
        """Reply with an error notice if the handler raised.

        A DisplayableException carries a user-facing message and is shown
        verbatim; any other exception gets a generic notice instead.
        """
        if exception is None:
            return
        if isinstance(exception, DisplayableException):
            notice = 'Error: ' + str(exception)
        else:
            notice = 'Something went wrong. Please try again (maybe later).'
        self.bot.reply_to(message, notice)