#2 #7 Telegram module (#10)
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
`telegram` module. Closes #2 #7. Two service classes: * `Notifier` class * `CommandProcessor` class Two [middlewares](https://github.com/eternnoir/pyTelegramBotAPI/tree/master/examples/middleware): * `UserAuthMiddleware` * `ExceptionHandlerMiddleware` One exception for usage in the code called by the bot: * `DisplayableException` Merge **ONLY** after #9 is merged. Reviewed-on: #10 Reviewed-by: Miroslavsckaya <miroslavsckaya@noreply.git.skobk.in>
This commit is contained in:
parent
22dcc4ef06
commit
2e44df0bbd
|
@ -24,7 +24,7 @@ steps:
|
||||||
commands:
|
commands:
|
||||||
- python -m venv .venv
|
- python -m venv .venv
|
||||||
- source ./.venv/bin/activate
|
- source ./.venv/bin/activate
|
||||||
- 'pip install pylama pylama\[all\]'
|
- 'pip install pylama pylama\[all\] > /dev/null'
|
||||||
- pylama
|
- pylama
|
||||||
when:
|
when:
|
||||||
event:
|
event:
|
||||||
|
|
|
@ -29,3 +29,10 @@ pip freeze > requirements.txt
|
||||||
|
|
||||||
**Do not forget** to install the latest dependencies before adding new dependencies and rewriting
|
**Do not forget** to install the latest dependencies before adding new dependencies and rewriting
|
||||||
the `requirements.txt` file. Otherwise old dependencies could be lost.
|
the `requirements.txt` file. Otherwise old dependencies could be lost.
|
||||||
|
|
||||||
|
## Running the bot
|
||||||
|
|
||||||
|
```shell
|
||||||
|
export TELEGRAM_TOKEN=xxx
|
||||||
|
python bot.py
|
||||||
|
```
|
||||||
|
|
18
bot.py
Normal file
18
bot.py
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
from database import Database
|
||||||
|
from telegram import CommandProcessor
|
||||||
|
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
token = os.getenv('TELEGRAM_TOKEN')
|
||||||
|
db_path = os.getenv('DATABASE_PATH')
|
||||||
|
|
||||||
|
db = Database(db_path)
|
||||||
|
bot = CommandProcessor(token, db)
|
||||||
|
|
||||||
|
logging.info("Starting Telegram bot")
|
||||||
|
bot.run()
|
104
database.py
104
database.py
|
@ -1,28 +1,25 @@
|
||||||
import sqlite3
|
import sqlite3
|
||||||
"""
|
|
||||||
Classes:
|
|
||||||
Database - implement intercaction with the database.
|
|
||||||
"""
|
|
||||||
|
|
||||||
class Database():
|
from exceptions import DisplayableException
|
||||||
"""Implement intercaction with the database."""
|
|
||||||
|
|
||||||
|
class Database:
|
||||||
|
"""Implement interaction with the database."""
|
||||||
|
|
||||||
def __init__(self, path: str) -> None:
|
def __init__(self, path: str) -> None:
|
||||||
"""Create a database file if not exists."""
|
"""Create a database file if not exists."""
|
||||||
self.conn = sqlite3.connect(path)
|
# TODO: think about removing check_same_thread=False
|
||||||
|
self.conn = sqlite3.connect(path, check_same_thread=False)
|
||||||
self.cur = self.conn.cursor()
|
self.cur = self.conn.cursor()
|
||||||
self.cur.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id NUMERIC NOT NULL UNIQUE, PRIMARY KEY(id))')
|
self.__init_schema()
|
||||||
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))')
|
|
||||||
self.cur.execute('CREATE TABLE IF NOT EXISTS subscriptions (user_id INTEGER, feed_id INTEGER, UNIQUE (user_id, feed_id), FOREIGN KEY(user_id) REFERENCES users(id), FOREIGN KEY(feed_id) REFERENCES feeds(id))')
|
|
||||||
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds_last_items (feed_id INTEGER, url TEXT NOT NULL UNIQUE, title TEXT, description TEXT, date NUMERIC, FOREIGN KEY(feed_id) REFERENCES feeds(id))')
|
|
||||||
|
|
||||||
def add_user(self, telegram_id: str) -> int:
|
def add_user(self, telegram_id: int) -> int:
|
||||||
"""Add a user's telegram id to the database and return its database id."""
|
"""Add a user's telegram id to the database and return its database id."""
|
||||||
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id])
|
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id])
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
return self.cur.lastrowid
|
return self.cur.lastrowid
|
||||||
|
|
||||||
def find_user(self, telegram_id: str) -> int | None:
|
def find_user(self, telegram_id: int) -> int | None:
|
||||||
"""Get a user's telegram id and return its database id."""
|
"""Get a user's telegram id and return its database id."""
|
||||||
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id])
|
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id])
|
||||||
row = self.cur.fetchone()
|
row = self.cur.fetchone()
|
||||||
|
@ -36,21 +33,70 @@ class Database():
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
return self.cur.lastrowid
|
return self.cur.lastrowid
|
||||||
|
|
||||||
|
def find_feed_by_url(self, url: str) -> int | None:
|
||||||
|
"""Find feed ID by url."""
|
||||||
|
self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url])
|
||||||
|
row = self.cur.fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return row[0]
|
||||||
|
|
||||||
|
def subscribe_user_by_url(self, user_id: int, url: str) -> None:
|
||||||
|
"""Subscribe user to the feed creating it if does not exist yet."""
|
||||||
|
feed_id = self.find_feed_by_url(url)
|
||||||
|
if feed_id is None:
|
||||||
|
feed_id = self.add_feed(url)
|
||||||
|
|
||||||
|
if self.is_user_subscribed(user_id, feed_id):
|
||||||
|
raise DisplayableException('Already subscribed')
|
||||||
|
|
||||||
|
self.subscribe_user(user_id, feed_id)
|
||||||
|
|
||||||
def subscribe_user(self, user_id: int, feed_id: int) -> None:
|
def subscribe_user(self, user_id: int, feed_id: int) -> None:
|
||||||
"""Subscribe a user to the feed."""
|
"""Subscribe a user to the feed."""
|
||||||
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id])
|
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id])
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
|
|
||||||
|
def unsubscribe_user_by_url(self, user_id: int, url: str) -> None:
|
||||||
|
feed_id = self.find_feed_by_url(url)
|
||||||
|
if feed_id is None:
|
||||||
|
raise DisplayableException('Feed does not exist')
|
||||||
|
|
||||||
|
if not self.is_user_subscribed(user_id, feed_id):
|
||||||
|
raise DisplayableException('Not subscribed')
|
||||||
|
|
||||||
|
self.unsubscribe_user(user_id, feed_id)
|
||||||
|
|
||||||
|
if self.get_feed_subscribers_count(feed_id) == 0:
|
||||||
|
# Feed is not used anymore. Removing.
|
||||||
|
self.delete_feed(feed_id)
|
||||||
|
|
||||||
def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
|
def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
|
||||||
"""Unsubscribe a user from the feed."""
|
"""Unsubscribe a user from the feed."""
|
||||||
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id])
|
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id])
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
|
|
||||||
|
def is_user_subscribed(self, user_id: int, feed_id: int) -> bool:
|
||||||
|
"""Check if user subscribed to specific feed."""
|
||||||
|
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id])
|
||||||
|
row = self.cur.fetchone()
|
||||||
|
if row is None:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
def delete_feed(self, feed_id: int) -> None:
|
def delete_feed(self, feed_id: int) -> None:
|
||||||
"""Delete a feed."""
|
"""Delete a feed."""
|
||||||
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id])
|
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id])
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
|
|
||||||
|
def get_feed_subscribers_count(self, feed_id: int) -> int:
|
||||||
|
"""Count feed subscribers."""
|
||||||
|
self.cur.execute('SELECT COUNT(user_id) FROM subscriptions WHERE feed_id = ?', [feed_id])
|
||||||
|
row = self.cur.fetchone()
|
||||||
|
if row is None:
|
||||||
|
return 0
|
||||||
|
return int(row[0])
|
||||||
|
|
||||||
def find_feeds(self) -> list:
|
def find_feeds(self) -> list:
|
||||||
"""Get a list of feeds."""
|
"""Get a list of feeds."""
|
||||||
self.cur.execute('SELECT * FROM feeds')
|
self.cur.execute('SELECT * FROM feeds')
|
||||||
|
@ -58,7 +104,8 @@ class Database():
|
||||||
|
|
||||||
def find_user_feeds(self, user_id: int) -> list:
|
def find_user_feeds(self, user_id: int) -> list:
|
||||||
"""Return a list of feeds the user is subscribed to."""
|
"""Return a list of feeds the user is subscribed to."""
|
||||||
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)', [user_id])
|
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)',
|
||||||
|
[user_id])
|
||||||
return self.cur.fetchall()
|
return self.cur.fetchall()
|
||||||
|
|
||||||
def find_feed_items(self, feed_id: int) -> list:
|
def find_feed_items(self, feed_id: int) -> list:
|
||||||
|
@ -72,5 +119,32 @@ class Database():
|
||||||
new_items[i] = (feed_id,) + new_items[i]
|
new_items[i] = (feed_id,) + new_items[i]
|
||||||
|
|
||||||
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id])
|
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id])
|
||||||
self.cur.executemany('INSERT INTO feeds_last_items (feed_id, url, title, description, date) VALUES (?, ?, ?, ?, ?)', new_items)
|
self.cur.executemany(
|
||||||
|
'INSERT INTO feeds_last_items (feed_id, url, title, description, date) VALUES (?, ?, ?, ?, ?)', new_items)
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
|
|
||||||
|
def __init_schema(self):
|
||||||
|
# TODO: Move to migrations
|
||||||
|
self.cur.execute(
|
||||||
|
'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))'
|
||||||
|
)
|
||||||
|
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))')
|
||||||
|
self.cur.execute(
|
||||||
|
'CREATE TABLE IF NOT EXISTS subscriptions ('
|
||||||
|
' user_id INTEGER,'
|
||||||
|
' feed_id INTEGER,'
|
||||||
|
' UNIQUE (user_id, feed_id),'
|
||||||
|
' FOREIGN KEY(user_id) REFERENCES users(id),'
|
||||||
|
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
|
||||||
|
')'
|
||||||
|
)
|
||||||
|
self.cur.execute(
|
||||||
|
'CREATE TABLE IF NOT EXISTS feeds_last_items ('
|
||||||
|
' feed_id INTEGER,'
|
||||||
|
' url TEXT NOT NULL UNIQUE,'
|
||||||
|
' title TEXT,'
|
||||||
|
' description TEXT,'
|
||||||
|
' date NUMERIC,'
|
||||||
|
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
|
||||||
|
')'
|
||||||
|
)
|
||||||
|
|
2
exceptions.py
Normal file
2
exceptions.py
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
class DisplayableException(Exception):
|
||||||
|
"""Exception which could be safely displayed to the end-user."""
|
|
@ -3,7 +3,7 @@
|
||||||
format = pylint
|
format = pylint
|
||||||
skip = .venv/*
|
skip = .venv/*
|
||||||
linters = pyflakes,pylint,pycodestyle
|
linters = pyflakes,pylint,pycodestyle
|
||||||
#ignore = F0401,C0111,E731
|
ignore = F0401,C0114,R0903
|
||||||
|
|
||||||
[pylama:pylint]
|
[pylama:pylint]
|
||||||
max_line_length = 120
|
max_line_length = 120
|
||||||
|
|
|
@ -1,7 +1,11 @@
|
||||||
certifi==2021.10.8
|
certifi==2021.10.8
|
||||||
charset-normalizer==2.0.12
|
charset-normalizer==2.0.12
|
||||||
|
decorator==5.1.1
|
||||||
|
feedparser==6.0.2
|
||||||
idna==3.3
|
idna==3.3
|
||||||
pyTelegramBotAPI==4.5.0
|
pyTelegramBotAPI==4.5.0
|
||||||
|
python-dotenv==0.20.0
|
||||||
requests==2.27.1
|
requests==2.27.1
|
||||||
|
sgmllib3k==1.0.0
|
||||||
urllib3==1.26.9
|
urllib3==1.26.9
|
||||||
feedparser==6.0.2
|
validators==0.19.0
|
||||||
|
|
169
telegram.py
Normal file
169
telegram.py
Normal file
|
@ -0,0 +1,169 @@
|
||||||
|
import time
|
||||||
|
|
||||||
|
from telebot import TeleBot
|
||||||
|
from telebot.handler_backends import BaseMiddleware
|
||||||
|
from telebot.types import Message
|
||||||
|
import validators
|
||||||
|
|
||||||
|
from database import Database
|
||||||
|
from exceptions import DisplayableException
|
||||||
|
from rss import FeedItem
|
||||||
|
|
||||||
|
|
||||||
|
class CommandProcessor:
|
||||||
|
"""Processes user input and dispatches the data to other services."""
|
||||||
|
|
||||||
|
def __init__(self, token: str, database: Database):
|
||||||
|
if token is None or len(token) == 0:
|
||||||
|
raise ValueError("Token should not be empty")
|
||||||
|
self.bot: TeleBot = TeleBot(token, use_class_middlewares=True)
|
||||||
|
self.bot.setup_middleware(UserAuthMiddleware(database))
|
||||||
|
self.bot.setup_middleware(ExceptionHandlerMiddleware(self.bot))
|
||||||
|
self.database: Database = database
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""Run a bot and poll for new messages indefinitely."""
|
||||||
|
self.bot.register_message_handler(commands=['add'], callback=self.__add_feed)
|
||||||
|
self.bot.register_message_handler(commands=['list'], callback=self.__list_feeds)
|
||||||
|
self.bot.register_message_handler(commands=['del'], callback=self.__delete_feed)
|
||||||
|
self.bot.register_message_handler(commands=['help', 'start'], callback=self.__command_help)
|
||||||
|
self.bot.register_message_handler(callback=self.__command_help)
|
||||||
|
|
||||||
|
self.bot.infinity_polling()
|
||||||
|
|
||||||
|
def __command_help(self, message: Message):
|
||||||
|
self.bot.reply_to(
|
||||||
|
message,
|
||||||
|
'Supported commands:\n'
|
||||||
|
' /add <feed url> - Add new feed\n'
|
||||||
|
' /list - List currently added feeds\n'
|
||||||
|
' /del <feed url> - Remove feed\n'
|
||||||
|
' /help - Get this help message'
|
||||||
|
)
|
||||||
|
|
||||||
|
def __add_feed(self, message: Message, data: dict):
|
||||||
|
args = message.text.split()
|
||||||
|
if len(args) < 2:
|
||||||
|
raise DisplayableException('Feed URL should be specified')
|
||||||
|
|
||||||
|
url = str(args[1])
|
||||||
|
if not self.__is_url_valid(url):
|
||||||
|
raise DisplayableException('Invalid feed URL')
|
||||||
|
|
||||||
|
self.database.subscribe_user_by_url(data['user_id'], url)
|
||||||
|
|
||||||
|
self.bot.reply_to(message, 'Successfully subscribed to feed.')
|
||||||
|
|
||||||
|
def __list_feeds(self, message: Message, data: dict):
|
||||||
|
feeds = self.database.find_user_feeds(data['user_id'])
|
||||||
|
|
||||||
|
feed_list = ''
|
||||||
|
for feed in feeds:
|
||||||
|
feed_list += '* ' + str(feed[0]) + ': ' + feed[1] + '\n'
|
||||||
|
|
||||||
|
self.bot.reply_to(message, 'Your feeds:\n' + feed_list)
|
||||||
|
|
||||||
|
def __delete_feed(self, message: Message, data: dict):
|
||||||
|
args = message.text.split()
|
||||||
|
if len(args) < 2:
|
||||||
|
raise DisplayableException('Feed URL should be specified')
|
||||||
|
|
||||||
|
url = str(args[1])
|
||||||
|
if not self.__is_url_valid(url):
|
||||||
|
raise DisplayableException('Invalid feed URL')
|
||||||
|
|
||||||
|
self.database.unsubscribe_user_by_url(data['user_id'], url)
|
||||||
|
|
||||||
|
self.bot.reply_to(message, 'Unsubscribed.')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __is_url_valid(url: str) -> bool:
|
||||||
|
if not validators.url(url):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# For security reasons we should not allow anything except HTTP/HTTPS.
|
||||||
|
if not url.startswith(('http://', 'https://')):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class Notifier:
|
||||||
|
"""Sends notifications to users about new RSS feed items."""
|
||||||
|
|
||||||
|
BATCH_LIMIT: int = 30
|
||||||
|
|
||||||
|
sent_counter: int = 0
|
||||||
|
|
||||||
|
def __init__(self, token: str):
|
||||||
|
self.bot: TeleBot = TeleBot(token)
|
||||||
|
|
||||||
|
def send_updates(self, chat_ids: list[int], updates: list[FeedItem]):
|
||||||
|
"""Send notification about new items to the user"""
|
||||||
|
for chat_id in chat_ids:
|
||||||
|
for update in updates:
|
||||||
|
self.__send_update(chat_id, update)
|
||||||
|
self.sent_counter += 1
|
||||||
|
if self.sent_counter >= self.BATCH_LIMIT:
|
||||||
|
# TODO: probably implement better later
|
||||||
|
time.sleep(1)
|
||||||
|
self.sent_counter = 0
|
||||||
|
|
||||||
|
def __send_update(self, chat_id: int, update: FeedItem):
|
||||||
|
self.bot.send_message(
|
||||||
|
chat_id=chat_id,
|
||||||
|
text=self.__format_message(update),
|
||||||
|
parse_mode='HTML'
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __format_message(item: FeedItem) -> str:
|
||||||
|
return (
|
||||||
|
f"<strong><a href=\"{item.url}\">{item.title}</a></strong>\n\n"
|
||||||
|
f"{item.description}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class UserAuthMiddleware(BaseMiddleware):
|
||||||
|
"""Transparently authenticates and registers the user if needed."""
|
||||||
|
|
||||||
|
def __init__(self, database: Database):
|
||||||
|
super().__init__()
|
||||||
|
self.update_types = ['message']
|
||||||
|
self.database: Database = database
|
||||||
|
|
||||||
|
def pre_process(self, message: Message, data: dict):
|
||||||
|
"""Pre-process update, find user and add it's ID to the handler data dictionary."""
|
||||||
|
data['user_id'] = self.__find_or_register_user(message)
|
||||||
|
|
||||||
|
def post_process(self, message: Message, data: dict, exception):
|
||||||
|
"""Post-process update."""
|
||||||
|
|
||||||
|
def __find_or_register_user(self, message: Message) -> int:
|
||||||
|
telegram_id = message.from_user.id
|
||||||
|
|
||||||
|
user_id = self.database.find_user(telegram_id)
|
||||||
|
if user_id is None:
|
||||||
|
return self.database.add_user(telegram_id)
|
||||||
|
return user_id
|
||||||
|
|
||||||
|
|
||||||
|
class ExceptionHandlerMiddleware(BaseMiddleware):
|
||||||
|
"""Sends messages to the user on exception."""
|
||||||
|
|
||||||
|
def __init__(self, bot: TeleBot):
|
||||||
|
super().__init__()
|
||||||
|
self.update_types = ['message']
|
||||||
|
self.bot: TeleBot = bot
|
||||||
|
|
||||||
|
def pre_process(self, message: Message, data: dict):
|
||||||
|
"""Pre-process update."""
|
||||||
|
|
||||||
|
# pylint: disable=W0613
|
||||||
|
def post_process(self, message: Message, data: dict, exception: Exception | None):
|
||||||
|
"""Post-process update. Send user an error notification."""
|
||||||
|
if exception is None:
|
||||||
|
return
|
||||||
|
if isinstance(exception, DisplayableException):
|
||||||
|
self.bot.reply_to(message, 'Error: ' + str(exception))
|
||||||
|
else:
|
||||||
|
self.bot.reply_to(message, 'Something went wrong. Please try again (maybe later).')
|
Loading…
Reference in a new issue