Compare commits

..

2 commits

5 changed files with 211 additions and 35 deletions

6
bot.py
View file

@ -2,13 +2,17 @@ import logging
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
from database import Database
from telegram import CommandProcessor from telegram import CommandProcessor
load_dotenv() load_dotenv()
token = os.getenv('TELEGRAM_TOKEN') token = os.getenv('TELEGRAM_TOKEN')
db_path = os.getenv('DATABASE_PATH')
db = Database(db_path)
bot = CommandProcessor(token, db)
bot = CommandProcessor(token)
logging.info("Starting Telegram bot") logging.info("Starting Telegram bot")
bot.run() bot.run()

View file

@ -1,28 +1,25 @@
import sqlite3 import sqlite3
"""
Classes:
Database - implement intercaction with the database.
"""
class Database(): from exceptions import DisplayableException
"""Implement intercaction with the database."""
class Database:
"""Implement interaction with the database."""
def __init__(self, path: str) -> None: def __init__(self, path: str) -> None:
"""Create a database file if not exists.""" """Create a database file if not exists."""
self.conn = sqlite3.connect(path) # TODO: think about removing check_same_thread=False
self.conn = sqlite3.connect(path, check_same_thread=False)
self.cur = self.conn.cursor() self.cur = self.conn.cursor()
self.cur.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id NUMERIC NOT NULL UNIQUE, PRIMARY KEY(id))') self.__init_schema()
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))')
self.cur.execute('CREATE TABLE IF NOT EXISTS subscriptions (user_id INTEGER, feed_id INTEGER, UNIQUE (user_id, feed_id), FOREIGN KEY(user_id) REFERENCES users(id), FOREIGN KEY(feed_id) REFERENCES feeds(id))')
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds_last_items (feed_id INTEGER, url TEXT NOT NULL UNIQUE, title TEXT, description TEXT, date NUMERIC, FOREIGN KEY(feed_id) REFERENCES feeds(id))')
def add_user(self, telegram_id: str) -> int: def add_user(self, telegram_id: int) -> int:
"""Add a user's telegram id to the database and return its database id.""" """Add a user's telegram id to the database and return its database id."""
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id]) self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id])
self.conn.commit() self.conn.commit()
return self.cur.lastrowid return self.cur.lastrowid
def find_user(self, telegram_id: str) -> int | None: def find_user(self, telegram_id: int) -> int | None:
"""Get a user's telegram id and return its database id.""" """Get a user's telegram id and return its database id."""
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id]) self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id])
row = self.cur.fetchone() row = self.cur.fetchone()
@ -36,21 +33,70 @@ class Database():
self.conn.commit() self.conn.commit()
return self.cur.lastrowid return self.cur.lastrowid
def find_feed_by_url(self, url: str) -> int | None:
"""Find feed ID by url."""
self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url])
row = self.cur.fetchone()
if row is None:
return None
return row[0]
def subscribe_user_by_url(self, user_id: int, url: str) -> None:
"""Subscribe user to the feed creating it if does not exist yet."""
feed_id = self.find_feed_by_url(url)
if feed_id is None:
feed_id = self.add_feed(url)
if self.is_user_subscribed(user_id, feed_id):
raise DisplayableException('Already subscribed')
self.subscribe_user(user_id, feed_id)
def subscribe_user(self, user_id: int, feed_id: int) -> None: def subscribe_user(self, user_id: int, feed_id: int) -> None:
"""Subscribe a user to the feed.""" """Subscribe a user to the feed."""
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id]) self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id])
self.conn.commit() self.conn.commit()
def unsubscribe_user_by_url(self, user_id: int, url: str) -> None:
feed_id = self.find_feed_by_url(url)
if feed_id is None:
raise DisplayableException('Feed does not exist')
if not self.is_user_subscribed(user_id, feed_id):
raise DisplayableException('Not subscribed')
self.unsubscribe_user(user_id, feed_id)
if self.get_feed_subscribers_count(feed_id) == 0:
# Feed is not used anymore. Removing.
self.delete_feed(feed_id)
def unsubscribe_user(self, user_id: int, feed_id: int) -> None: def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
"""Unsubscribe a user from the feed.""" """Unsubscribe a user from the feed."""
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id]) self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id])
self.conn.commit() self.conn.commit()
def is_user_subscribed(self, user_id: int, feed_id: int) -> bool:
"""Check if user subscribed to specific feed."""
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id])
row = self.cur.fetchone()
if row is None:
return False
return True
def delete_feed(self, feed_id: int) -> None: def delete_feed(self, feed_id: int) -> None:
"""Delete a feed.""" """Delete a feed."""
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id]) self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id])
self.conn.commit() self.conn.commit()
def get_feed_subscribers_count(self, feed_id: int) -> int:
"""Count feed subscribers."""
self.cur.execute('SELECT COUNT(user_id) FROM subscriptions WHERE feed_id = ?', [feed_id])
row = self.cur.fetchone()
if row is None:
return 0
return int(row[0])
def find_feeds(self) -> list: def find_feeds(self) -> list:
"""Get a list of feeds.""" """Get a list of feeds."""
self.cur.execute('SELECT * FROM feeds') self.cur.execute('SELECT * FROM feeds')
@ -58,7 +104,8 @@ class Database():
def find_user_feeds(self, user_id: int) -> list: def find_user_feeds(self, user_id: int) -> list:
"""Return a list of feeds the user is subscribed to.""" """Return a list of feeds the user is subscribed to."""
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)', [user_id]) self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)',
[user_id])
return self.cur.fetchall() return self.cur.fetchall()
def find_feed_items(self, feed_id: int) -> list: def find_feed_items(self, feed_id: int) -> list:
@ -72,5 +119,32 @@ class Database():
new_items[i] = (feed_id,) + new_items[i] new_items[i] = (feed_id,) + new_items[i]
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id]) self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id])
self.cur.executemany('INSERT INTO feeds_last_items (feed_id, url, title, description, date) VALUES (?, ?, ?, ?, ?)', new_items) self.cur.executemany(
'INSERT INTO feeds_last_items (feed_id, url, title, description, date) VALUES (?, ?, ?, ?, ?)', new_items)
self.conn.commit() self.conn.commit()
def __init_schema(self):
# TODO: Move to migrations
self.cur.execute(
'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))'
)
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))')
self.cur.execute(
'CREATE TABLE IF NOT EXISTS subscriptions ('
' user_id INTEGER,'
' feed_id INTEGER,'
' UNIQUE (user_id, feed_id),'
' FOREIGN KEY(user_id) REFERENCES users(id),'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
')'
)
self.cur.execute(
'CREATE TABLE IF NOT EXISTS feeds_last_items ('
' feed_id INTEGER,'
' url TEXT NOT NULL UNIQUE,'
' title TEXT,'
' description TEXT,'
' date NUMERIC,'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
')'
)

3
exceptions.py Normal file
View file

@ -0,0 +1,3 @@
class DisplayableException(Exception):
"""Exception which could be safely displayed to the end-user."""
pass

View file

@ -1,8 +1,11 @@
certifi==2021.10.8 certifi==2021.10.8
charset-normalizer==2.0.12 charset-normalizer==2.0.12
decorator==5.1.1
feedparser==6.0.2
idna==3.3 idna==3.3
pyTelegramBotAPI==4.5.0 pyTelegramBotAPI==4.5.0
python-dotenv==0.20.0 python-dotenv==0.20.0
requests==2.27.1 requests==2.27.1
sgmllib3k==1.0.0
urllib3==1.26.9 urllib3==1.26.9
feedparser==6.0.2 validators==0.19.0

View file

@ -1,46 +1,93 @@
import telebot from telebot import TeleBot
from telebot.handler_backends import BaseMiddleware
from telebot.types import Message from telebot.types import Message
import validators
from database import Database
from exceptions import DisplayableException
class CommandProcessor: class CommandProcessor:
"""Processes user input and dispatches the data to other services.""" """Processes user input and dispatches the data to other services."""
bot: telebot.TeleBot def __init__(self, token: str, database: Database):
def __init__(self, token: str):
if token is None or len(token) == 0: if token is None or len(token) == 0:
raise ValueError("Token should not be empty") raise ValueError("Token should not be empty")
self.bot = telebot.TeleBot(token) self.bot: TeleBot = TeleBot(token, use_class_middlewares=True)
self.bot.setup_middleware(UserAuthMiddleware(database))
self.bot.setup_middleware(ExceptionHandlerMiddleware(self.bot))
self.db: Database = database
def run(self): def run(self):
"""Runs a bot and polls for new messages indefinitely.""" """Run a bot and poll for new messages indefinitely."""
self.bot.register_message_handler(commands=['help', 'start'], callback=self.__command_help)
self.bot.register_message_handler(commands=['add'], callback=self.__add_feed) self.bot.register_message_handler(commands=['add'], callback=self.__add_feed)
self.bot.register_message_handler(commands=['list'], callback=self.__list_feeds) self.bot.register_message_handler(commands=['list'], callback=self.__list_feeds)
self.bot.register_message_handler(commands=['del'], callback=self.__delete_feed) self.bot.register_message_handler(commands=['del'], callback=self.__delete_feed)
self.bot.register_message_handler(commands=['help', 'start'], callback=self.__command_help)
self.bot.register_message_handler(callback=self.__command_help)
self.bot.infinity_polling() self.bot.infinity_polling()
def __command_help(self, message: Message): def __command_help(self, message: Message, data: dict):
self.bot.reply_to(message, 'Commands: "/add", "/list", "/del", "/help".') self.bot.reply_to(
message,
'Supported commands:\n'
' /add <feed url> - Add new feed\n '
' /list - List currently added feeds\n'
' /del <feed url> - Remove feed\n'
' /help - Get this help message'
)
def __add_feed(self, message: Message): def __add_feed(self, message: Message, data: dict):
self.bot.reply_to(message, 'Feed added (stub)') args = message.text.split()
if len(args) < 2:
raise DisplayableException('Feed URL should be specified')
def __list_feeds(self, message: Message): url = str(args[1])
self.bot.reply_to(message, 'Your feeds: (stub)') if not self.__is_url_valid(url):
raise DisplayableException('Invalid feed URL')
def __delete_feed(self, message: Message): self.db.subscribe_user_by_url(data['user_id'], url)
self.bot.reply_to(message, 'Feed deleted: (stub)')
self.bot.reply_to(message, 'Successfully subscribed to feed.')
def __list_feeds(self, message: Message, data: dict):
feeds = self.db.find_user_feeds(data['user_id'])
feed_list = ''
for feed in feeds:
feed_list += '* ' + str(feed[0]) + ': ' + feed[1] + '\n'
self.bot.reply_to(message, 'Your feeds:\n' + feed_list)
def __delete_feed(self, message: Message, data: dict):
args = message.text.split()
if len(args) < 2:
raise DisplayableException('Feed URL should be specified')
url = str(args[1])
if not self.__is_url_valid(url):
raise DisplayableException('Invalid feed URL')
self.db.unsubscribe_user_by_url(data['user_id'], url)
self.bot.reply_to(message, 'Unsubscribed.')
def __is_url_valid(self, url: str) -> bool:
if not validators.url(url):
return False
"""For security reasons we should not allow anything except HTTP/HTTPS."""
if not url.startswith(('http://', 'https://')):
return False
return True
class Notifier: class Notifier:
"""Sends notifications to users about new RSS feed items.""" """Sends notifications to users about new RSS feed items."""
bot: telebot.TeleBot
def __init__(self, token: str): def __init__(self, token: str):
self.bot = telebot.TeleBot(token) self.bot: TeleBot = TeleBot(token)
def send_updates(self, chat_id: int, updates: list): def send_updates(self, chat_id: int, updates: list):
"""Send notification about new items to the user""" """Send notification about new items to the user"""
@ -65,3 +112,48 @@ class Notifier:
f"**[{update['title']}]({update['url']})**\n\n" f"**[{update['title']}]({update['url']})**\n\n"
f"{update['text']}" f"{update['text']}"
) )
class UserAuthMiddleware(BaseMiddleware):
"""Transparently authenticates and registers the user if needed."""
def __init__(self, db: Database):
super().__init__()
self.update_types = ['message']
self.db: Database = db
def pre_process(self, message: Message, data: dict):
"""Pre-process the message, find user and add it's ID to the handler data dictionary."""
data['user_id'] = self.__find_or_register_user(message)
def post_process(self, message: Message, data: dict, exception):
"""Post-process the message."""
pass
def __find_or_register_user(self, message: Message) -> int:
telegram_id = message.from_user.id
user_id = self.db.find_user(telegram_id)
if user_id is None:
return self.db.add_user(telegram_id)
return user_id
class ExceptionHandlerMiddleware(BaseMiddleware):
"""Sends messages to the user on exception."""
def __init__(self, bot: TeleBot):
super().__init__()
self.update_types = ['message']
self.bot: TeleBot = bot
def pre_process(self, message, data):
pass
def post_process(self, message: Message, data: dict, exception: Exception | None):
if exception is None:
return
elif type(exception) is DisplayableException:
self.bot.reply_to(message, 'Error: ' + str(exception))
else:
self.bot.reply_to(message, 'Something went wrong. Please try again (maybe later).')