From 2e44df0bbd506de45619a06739b8abdc5175812e Mon Sep 17 00:00:00 2001 From: Alexey Skobkin Date: Mon, 30 May 2022 23:54:26 +0300 Subject: [PATCH] #2 #7 Telegram module (#10) `telegram` module. Closes #2 #7. Two service classes: * `Notifier` class * `CommandProcessor` class Two [middlewares](https://github.com/eternnoir/pyTelegramBotAPI/tree/master/examples/middleware): * `UserAuthMiddleware` * `ExceptionHandlerMiddleware` One exception for usage in the code called by the bot: * `DisplayableException` Merge **ONLY** after #9 is merged. Reviewed-on: https://git.skobk.in/Miroslavsckaya/tg_rss_bot/pulls/10 Reviewed-by: Miroslavsckaya --- .drone.yml | 2 +- README.md | 7 ++ bot.py | 18 +++++ database.py | 104 ++++++++++++++++++++++++----- exceptions.py | 2 + pylama.ini | 2 +- requirements.txt | 6 +- telegram.py | 169 +++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 292 insertions(+), 18 deletions(-) create mode 100644 bot.py create mode 100644 exceptions.py create mode 100644 telegram.py diff --git a/.drone.yml b/.drone.yml index 4e296fa..2a18ece 100644 --- a/.drone.yml +++ b/.drone.yml @@ -24,7 +24,7 @@ steps: commands: - python -m venv .venv - source ./.venv/bin/activate - - 'pip install pylama pylama\[all\]' + - 'pip install pylama pylama\[all\] > /dev/null' - pylama when: event: diff --git a/README.md b/README.md index b19ac07..f90e6dc 100644 --- a/README.md +++ b/README.md @@ -29,3 +29,10 @@ pip freeze > requirements.txt **Do not forget** to install the latest dependencies before adding new dependencies and rewriting the `requirements.txt` file. Otherwise old dependencies could be lost. + +## Running the bot + +```shell +export TELEGRAM_TOKEN=xxx +python bot.py +``` diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..6cc7d6b --- /dev/null +++ b/bot.py @@ -0,0 +1,18 @@ +import logging +import os +from dotenv import load_dotenv + +from database import Database +from telegram import CommandProcessor + + +load_dotenv() + +token = os.getenv('TELEGRAM_TOKEN') +db_path = os.getenv('DATABASE_PATH') + +db = Database(db_path) +bot = CommandProcessor(token, db) + +logging.info("Starting Telegram bot") +bot.run() diff --git a/database.py b/database.py index f8f4542..6c74564 100644 --- a/database.py +++ b/database.py @@ -1,28 +1,25 @@ import sqlite3 -""" -Classes: -Database - implement intercaction with the database. -""" -class Database(): - """Implement intercaction with the database.""" +from exceptions import DisplayableException + + +class Database: + """Implement interaction with the database.""" def __init__(self, path: str) -> None: """Create a database file if not exists.""" - self.conn = sqlite3.connect(path) + # TODO: think about removing check_same_thread=False + self.conn = sqlite3.connect(path, check_same_thread=False) self.cur = self.conn.cursor() - self.cur.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id NUMERIC NOT NULL UNIQUE, PRIMARY KEY(id))') - self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))') - self.cur.execute('CREATE TABLE IF NOT EXISTS subscriptions (user_id INTEGER, feed_id INTEGER, UNIQUE (user_id, feed_id), FOREIGN KEY(user_id) REFERENCES users(id), FOREIGN KEY(feed_id) REFERENCES feeds(id))') - self.cur.execute('CREATE TABLE IF NOT EXISTS feeds_last_items (feed_id INTEGER, url TEXT NOT NULL UNIQUE, title TEXT, description TEXT, date NUMERIC, FOREIGN KEY(feed_id) REFERENCES feeds(id))') + self.__init_schema() - def add_user(self, telegram_id: str) -> int: + def add_user(self, telegram_id: int) -> int: """Add a user's telegram id to the database and return its database id.""" self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id]) self.conn.commit() return self.cur.lastrowid - def find_user(self, telegram_id: str) -> int | None: + def find_user(self, telegram_id: int) -> int | None: """Get a user's telegram id and return its database id.""" self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id]) row = self.cur.fetchone() @@ -36,21 +33,70 @@ class Database(): self.conn.commit() return self.cur.lastrowid + def find_feed_by_url(self, url: str) -> int | None: + """Find feed ID by url.""" + self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url]) + row = self.cur.fetchone() + if row is None: + return None + return row[0] + + def subscribe_user_by_url(self, user_id: int, url: str) -> None: + """Subscribe user to the feed creating it if does not exist yet.""" + feed_id = self.find_feed_by_url(url) + if feed_id is None: + feed_id = self.add_feed(url) + + if self.is_user_subscribed(user_id, feed_id): + raise DisplayableException('Already subscribed') + + self.subscribe_user(user_id, feed_id) + def subscribe_user(self, user_id: int, feed_id: int) -> None: """Subscribe a user to the feed.""" self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id]) self.conn.commit() + def unsubscribe_user_by_url(self, user_id: int, url: str) -> None: + feed_id = self.find_feed_by_url(url) + if feed_id is None: + raise DisplayableException('Feed does not exist') + + if not self.is_user_subscribed(user_id, feed_id): + raise DisplayableException('Not subscribed') + + self.unsubscribe_user(user_id, feed_id) + + if self.get_feed_subscribers_count(feed_id) == 0: + # Feed is not used anymore. Removing. + self.delete_feed(feed_id) + def unsubscribe_user(self, user_id: int, feed_id: int) -> None: """Unsubscribe a user from the feed.""" self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id]) self.conn.commit() + def is_user_subscribed(self, user_id: int, feed_id: int) -> bool: + """Check if user subscribed to specific feed.""" + self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id]) + row = self.cur.fetchone() + if row is None: + return False + return True + def delete_feed(self, feed_id: int) -> None: """Delete a feed.""" self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id]) self.conn.commit() + def get_feed_subscribers_count(self, feed_id: int) -> int: + """Count feed subscribers.""" + self.cur.execute('SELECT COUNT(user_id) FROM subscriptions WHERE feed_id = ?', [feed_id]) + row = self.cur.fetchone() + if row is None: + return 0 + return int(row[0]) + def find_feeds(self) -> list: """Get a list of feeds.""" self.cur.execute('SELECT * FROM feeds') @@ -58,7 +104,8 @@ class Database(): def find_user_feeds(self, user_id: int) -> list: """Return a list of feeds the user is subscribed to.""" - self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)', [user_id]) + self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)', + [user_id]) return self.cur.fetchall() def find_feed_items(self, feed_id: int) -> list: @@ -72,5 +119,32 @@ class Database(): new_items[i] = (feed_id,) + new_items[i] self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id]) - self.cur.executemany('INSERT INTO feeds_last_items (feed_id, url, title, description, date) VALUES (?, ?, ?, ?, ?)', new_items) + self.cur.executemany( + 'INSERT INTO feeds_last_items (feed_id, url, title, description, date) VALUES (?, ?, ?, ?, ?)', new_items) self.conn.commit() + + def __init_schema(self): + # TODO: Move to migrations + self.cur.execute( + 'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))' + ) + self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))') + self.cur.execute( + 'CREATE TABLE IF NOT EXISTS subscriptions (' + ' user_id INTEGER,' + ' feed_id INTEGER,' + ' UNIQUE (user_id, feed_id),' + ' FOREIGN KEY(user_id) REFERENCES users(id),' + ' FOREIGN KEY(feed_id) REFERENCES feeds(id)' + ')' + ) + self.cur.execute( + 'CREATE TABLE IF NOT EXISTS feeds_last_items (' + ' feed_id INTEGER,' + ' url TEXT NOT NULL UNIQUE,' + ' title TEXT,' + ' description TEXT,' + ' date NUMERIC,' + ' FOREIGN KEY(feed_id) REFERENCES feeds(id)' + ')' + ) diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000..a1effe0 --- /dev/null +++ b/exceptions.py @@ -0,0 +1,2 @@ +class DisplayableException(Exception): + """Exception which could be safely displayed to the end-user.""" diff --git a/pylama.ini b/pylama.ini index b0a5d70..a17cf17 100644 --- a/pylama.ini +++ b/pylama.ini @@ -3,7 +3,7 @@ format = pylint skip = .venv/* linters = pyflakes,pylint,pycodestyle -#ignore = F0401,C0111,E731 +ignore = F0401,C0114,R0903 [pylama:pylint] max_line_length = 120 diff --git a/requirements.txt b/requirements.txt index 5a8b4cb..46d6009 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,11 @@ certifi==2021.10.8 charset-normalizer==2.0.12 +decorator==5.1.1 +feedparser==6.0.2 idna==3.3 pyTelegramBotAPI==4.5.0 +python-dotenv==0.20.0 requests==2.27.1 +sgmllib3k==1.0.0 urllib3==1.26.9 -feedparser==6.0.2 \ No newline at end of file +validators==0.19.0 diff --git a/telegram.py b/telegram.py new file mode 100644 index 0000000..bf39403 --- /dev/null +++ b/telegram.py @@ -0,0 +1,169 @@ +import time + +from telebot import TeleBot +from telebot.handler_backends import BaseMiddleware +from telebot.types import Message +import validators + +from database import Database +from exceptions import DisplayableException +from rss import FeedItem + + +class CommandProcessor: + """Processes user input and dispatches the data to other services.""" + + def __init__(self, token: str, database: Database): + if token is None or len(token) == 0: + raise ValueError("Token should not be empty") + self.bot: TeleBot = TeleBot(token, use_class_middlewares=True) + self.bot.setup_middleware(UserAuthMiddleware(database)) + self.bot.setup_middleware(ExceptionHandlerMiddleware(self.bot)) + self.database: Database = database + + def run(self): + """Run a bot and poll for new messages indefinitely.""" + self.bot.register_message_handler(commands=['add'], callback=self.__add_feed) + self.bot.register_message_handler(commands=['list'], callback=self.__list_feeds) + self.bot.register_message_handler(commands=['del'], callback=self.__delete_feed) + self.bot.register_message_handler(commands=['help', 'start'], callback=self.__command_help) + self.bot.register_message_handler(callback=self.__command_help) + + self.bot.infinity_polling() + + def __command_help(self, message: Message): + self.bot.reply_to( + message, + 'Supported commands:\n' + ' /add - Add new feed\n' + ' /list - List currently added feeds\n' + ' /del - Remove feed\n' + ' /help - Get this help message' + ) + + def __add_feed(self, message: Message, data: dict): + args = message.text.split() + if len(args) < 2: + raise DisplayableException('Feed URL should be specified') + + url = str(args[1]) + if not self.__is_url_valid(url): + raise DisplayableException('Invalid feed URL') + + self.database.subscribe_user_by_url(data['user_id'], url) + + self.bot.reply_to(message, 'Successfully subscribed to feed.') + + def __list_feeds(self, message: Message, data: dict): + feeds = self.database.find_user_feeds(data['user_id']) + + feed_list = '' + for feed in feeds: + feed_list += '* ' + str(feed[0]) + ': ' + feed[1] + '\n' + + self.bot.reply_to(message, 'Your feeds:\n' + feed_list) + + def __delete_feed(self, message: Message, data: dict): + args = message.text.split() + if len(args) < 2: + raise DisplayableException('Feed URL should be specified') + + url = str(args[1]) + if not self.__is_url_valid(url): + raise DisplayableException('Invalid feed URL') + + self.database.unsubscribe_user_by_url(data['user_id'], url) + + self.bot.reply_to(message, 'Unsubscribed.') + + @staticmethod + def __is_url_valid(url: str) -> bool: + if not validators.url(url): + return False + + # For security reasons we should not allow anything except HTTP/HTTPS. + if not url.startswith(('http://', 'https://')): + return False + return True + + +class Notifier: + """Sends notifications to users about new RSS feed items.""" + + BATCH_LIMIT: int = 30 + + sent_counter: int = 0 + + def __init__(self, token: str): + self.bot: TeleBot = TeleBot(token) + + def send_updates(self, chat_ids: list[int], updates: list[FeedItem]): + """Send notification about new items to the user""" + for chat_id in chat_ids: + for update in updates: + self.__send_update(chat_id, update) + self.sent_counter += 1 + if self.sent_counter >= self.BATCH_LIMIT: + # TODO: probably implement better later + time.sleep(1) + self.sent_counter = 0 + + def __send_update(self, chat_id: int, update: FeedItem): + self.bot.send_message( + chat_id=chat_id, + text=self.__format_message(update), + parse_mode='HTML' + ) + + @staticmethod + def __format_message(item: FeedItem) -> str: + return ( + f"{item.title}\n\n" + f"{item.description}" + ) + + +class UserAuthMiddleware(BaseMiddleware): + """Transparently authenticates and registers the user if needed.""" + + def __init__(self, database: Database): + super().__init__() + self.update_types = ['message'] + self.database: Database = database + + def pre_process(self, message: Message, data: dict): + """Pre-process update, find user and add it's ID to the handler data dictionary.""" + data['user_id'] = self.__find_or_register_user(message) + + def post_process(self, message: Message, data: dict, exception): + """Post-process update.""" + + def __find_or_register_user(self, message: Message) -> int: + telegram_id = message.from_user.id + + user_id = self.database.find_user(telegram_id) + if user_id is None: + return self.database.add_user(telegram_id) + return user_id + + +class ExceptionHandlerMiddleware(BaseMiddleware): + """Sends messages to the user on exception.""" + + def __init__(self, bot: TeleBot): + super().__init__() + self.update_types = ['message'] + self.bot: TeleBot = bot + + def pre_process(self, message: Message, data: dict): + """Pre-process update.""" + + # pylint: disable=W0613 + def post_process(self, message: Message, data: dict, exception: Exception | None): + """Post-process update. Send user an error notification.""" + if exception is None: + return + if isinstance(exception, DisplayableException): + self.bot.reply_to(message, 'Error: ' + str(exception)) + else: + self.bot.reply_to(message, 'Something went wrong. Please try again (maybe later).')