Migrate to PostgreSQL (#39)
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #39
Reviewed-by: Alexey Skobkin <skobkin-ru@ya.ru>
This commit is contained in:
Miroslavsckaya 2022-07-14 01:53:53 +03:00
parent 92ced4c91b
commit e0992b2351
8 changed files with 77 additions and 72 deletions

View file

@ -33,14 +33,14 @@ the `requirements.txt` file. Otherwise old dependencies could be lost.
## Running the bot ## Running the bot
```shell ```shell
export TELEGRAM_TOKEN=xxx export RSSBOT_TG_TOKEN=xxx
export DATABASE_PATH=./database.db export RSSBOT_DSN=xxx
python bot.py python bot.py
``` ```
## Running the update ## Running the update
```shell ```shell
export TELEGRAM_TOKEN=xxx export RSSBOT_TG_TOKEN=xxx
export DATABASE_PATH=./database.db export RSSBOT_DSN=xxx
python update.py python update.py
``` ```

6
bot.py
View file

@ -8,8 +8,8 @@ from telegram import CommandProcessor
load_dotenv() load_dotenv()
token = os.getenv('TELEGRAM_TOKEN') token = os.getenv('RSSBOT_TG_TOKEN')
db_path = os.getenv('DATABASE_PATH', './bot.db') dsn = os.getenv('RSSBOT_DSN')
log_level = os.getenv('LOG_LEVEL', 'INFO') log_level = os.getenv('LOG_LEVEL', 'INFO')
print('Starting the bot with logging level', log_level.upper()) print('Starting the bot with logging level', log_level.upper())
@ -19,7 +19,7 @@ logging.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S', datefmt='%Y-%m-%d %H:%M:%S',
) )
db = Database(db_path, logging.getLogger('Database')) db = Database(dsn, logging.getLogger('Database'))
bot = CommandProcessor(token, db, logging.getLogger('CommandProcessor')) bot = CommandProcessor(token, db, logging.getLogger('CommandProcessor'))
bot.run() bot.run()

View file

@ -1,7 +1,7 @@
import sqlite3
from logging import Logger from logging import Logger
import psycopg2
from psycopg2.extensions import connection
from psycopg2.extras import DictCursor, DictRow
from exceptions import DisplayableException from exceptions import DisplayableException
from rss import FeedItem from rss import FeedItem
@ -9,27 +9,25 @@ from rss import FeedItem
class Database: class Database:
"""Implement interaction with the database.""" """Implement interaction with the database."""
def __init__(self, path: str, log: Logger) -> None: def __init__(self, dsn: str, log: Logger) -> None:
"""Create a database file if not exists.""" """Create a database file if not exists."""
self.log: Logger = log self.log: Logger = log
self.log.debug('Database.__init__(path=\'%s\')', path) self.log.debug('Database.__init__(DSN=\'%s\')', dsn)
# TODO: think about removing check_same_thread=False self.conn: connection = psycopg2.connect(dsn)
self.conn = sqlite3.connect(path, check_same_thread=False) self.cur: DictCursor = self.conn.cursor(cursor_factory=DictCursor)
self.conn.row_factory = sqlite3.Row
self.cur = self.conn.cursor()
self.__init_schema() self.__init_schema()
def add_user(self, telegram_id: int) -> int: def add_user(self, telegram_id: int) -> int:
"""Add a user's telegram id to the database and return its database id.""" """Add a user's telegram id to the database and return its database id."""
self.log.debug('add_user(telegram_id=\'%s\')', telegram_id) self.log.debug('add_user(telegram_id=\'%s\')', telegram_id)
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id]) self.cur.execute('INSERT INTO users (telegram_id) VALUES (%s) RETURNING id', [telegram_id])
self.conn.commit() self.conn.commit()
return self.cur.lastrowid return self.cur.fetchone()[0]
def find_user(self, telegram_id: int) -> int | None: def find_user(self, telegram_id: int) -> int | None:
"""Get a user's telegram id and return its database id.""" """Get a user's telegram id and return its database id."""
self.log.debug('find_user(telegram_id=\'%s\')', telegram_id) self.log.debug('find_user(telegram_id=\'%s\')', telegram_id)
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id]) self.cur.execute('SELECT id FROM users WHERE telegram_id = %s', [telegram_id])
row = self.cur.fetchone() row = self.cur.fetchone()
if row is None: if row is None:
return None return None
@ -38,14 +36,14 @@ class Database:
def add_feed(self, url: str) -> int: def add_feed(self, url: str) -> int:
"""Add a feed to the database and return its id.""" """Add a feed to the database and return its id."""
self.log.debug('add_feed(url=\'%s\')', url) self.log.debug('add_feed(url=\'%s\')', url)
self.cur.execute('INSERT INTO feeds (url) VALUES (?)', [url]) self.cur.execute('INSERT INTO feeds (url) VALUES (%s) RETURNING id', [url])
self.conn.commit() self.conn.commit()
return self.cur.lastrowid return self.cur.fetchone()[0]
def find_feed_by_url(self, url: str) -> int | None: def find_feed_by_url(self, url: str) -> int | None:
"""Find feed ID by url.""" """Find feed ID by url."""
self.log.debug('find_feed_by_url(url=\'%s\')', url) self.log.debug('find_feed_by_url(url=\'%s\')', url)
self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url]) self.cur.execute('SELECT id FROM feeds WHERE url = %s', [url])
row = self.cur.fetchone() row = self.cur.fetchone()
if row is None: if row is None:
return None return None
@ -66,7 +64,7 @@ class Database:
def subscribe_user(self, user_id: int, feed_id: int) -> None: def subscribe_user(self, user_id: int, feed_id: int) -> None:
"""Subscribe a user to the feed.""" """Subscribe a user to the feed."""
self.log.debug('subscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id) self.log.debug('subscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id]) self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (%s, %s)', [user_id, feed_id])
self.conn.commit() self.conn.commit()
def unsubscribe_user_by_url(self, user_id: int, url: str) -> None: def unsubscribe_user_by_url(self, user_id: int, url: str) -> None:
@ -88,13 +86,13 @@ class Database:
def unsubscribe_user(self, user_id: int, feed_id: int) -> None: def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
"""Unsubscribe a user from the feed.""" """Unsubscribe a user from the feed."""
self.log.debug('unsubscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id) self.log.debug('unsubscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id]) self.cur.execute('DELETE FROM subscriptions WHERE feed_id = %s AND user_id = %s', [feed_id, user_id])
self.conn.commit() self.conn.commit()
def is_user_subscribed(self, user_id: int, feed_id: int) -> bool: def is_user_subscribed(self, user_id: int, feed_id: int) -> bool:
"""Check if user subscribed to specific feed.""" """Check if user subscribed to specific feed."""
self.log.debug('is_user_subscribed(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id) self.log.debug('is_user_subscribed(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id]) self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = %s AND feed_id = %s', [user_id, feed_id])
row = self.cur.fetchone() row = self.cur.fetchone()
if row is None: if row is None:
return False return False
@ -103,42 +101,42 @@ class Database:
def delete_feed(self, feed_id: int) -> None: def delete_feed(self, feed_id: int) -> None:
"""Delete a feed.""" """Delete a feed."""
self.log.debug('delete_feed(feed_id=\'%s\')', feed_id) self.log.debug('delete_feed(feed_id=\'%s\')', feed_id)
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id]) self.cur.execute('DELETE FROM feeds WHERE id = %s', [feed_id])
self.conn.commit() self.conn.commit()
def get_feed_subscribers_count(self, feed_id: int) -> int: def get_feed_subscribers_count(self, feed_id: int) -> int:
"""Count feed subscribers.""" """Count feed subscribers."""
self.log.debug('get_feed_subscribers_count(feed_id=\'%s\')', feed_id) self.log.debug('get_feed_subscribers_count(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = ?', [feed_id]) self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = %s', [feed_id])
row = self.cur.fetchone() row = self.cur.fetchone()
return row['amount_subscribers'] return row['amount_subscribers']
def find_feed_subscribers(self, feed_id: int) -> list[int]: def find_feed_subscribers(self, feed_id: int) -> list[int]:
"""Return feed subscribers""" """Return feed subscribers"""
self.log.debug('find_feed_subscribers(feed_id=\'%s\')', feed_id) self.log.debug('find_feed_subscribers(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = ?)', self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = %s)',
[feed_id]) [feed_id])
subscribers = self.cur.fetchall() subscribers = self.cur.fetchall()
return list(map(lambda x: x['telegram_id'], subscribers)) return list(map(lambda x: x['telegram_id'], subscribers))
def find_feeds(self) -> list[sqlite3.Row]: def find_feeds(self) -> list[dict]:
"""Get a list of feeds.""" """Get a list of feeds."""
self.log.debug('find_feeds()') self.log.debug('find_feeds()')
self.cur.execute('SELECT * FROM feeds') self.cur.execute('SELECT * FROM feeds')
return self.cur.fetchall() return self.__dictrow_to_dict_list(self.cur.fetchall())
def find_user_feeds(self, user_id: int) -> list[sqlite3.Row]: def find_user_feeds(self, user_id: int) -> list[dict]:
"""Return a list of feeds the user is subscribed to.""" """Return a list of feeds the user is subscribed to."""
self.log.debug('find_user_feeds(user_id=\'%s\')', user_id) self.log.debug('find_user_feeds(user_id=\'%s\')', user_id)
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)', self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = %s)',
[user_id]) [user_id])
return self.cur.fetchall() return self.__dictrow_to_dict_list(self.cur.fetchall())
def find_feed_items(self, feed_id: int) -> list[sqlite3.Row]: def find_feed_items(self, feed_id: int) -> list[dict]:
"""Get last feed items.""" """Get last feed items."""
self.log.debug('find_feed_items(feed_id=\'%s\')', feed_id) self.log.debug('find_feed_items(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = ?', [feed_id]) self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = %s', [feed_id])
return self.cur.fetchall() return self.__dictrow_to_dict_list(self.cur.fetchall())
def find_feed_items_urls(self, feed_id: int) -> list[str]: def find_feed_items_urls(self, feed_id: int) -> list[str]:
"""Return urls last feed items""" """Return urls last feed items"""
@ -152,35 +150,34 @@ class Database:
"""Replace last feed items with a list items that receive.""" """Replace last feed items with a list items that receive."""
self.log.debug('update_feed_items(feed_id=\'%s\', new_items=list(%d))', feed_id, len(new_items)) self.log.debug('update_feed_items(feed_id=\'%s\', new_items=list(%d))', feed_id, len(new_items))
for i, _ in enumerate(new_items): for i, _ in enumerate(new_items):
new_items[i] = [feed_id] + list(new_items[i].__dict__.values())[:-1] new_items[i] = [feed_id, new_items[i].url, new_items[i].guid]
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = %s', [feed_id])
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id])
self.cur.executemany( self.cur.executemany(
'INSERT INTO feeds_last_items (feed_id, url, title, description) VALUES (?, ?, ?, ?)', new_items) 'INSERT INTO feeds_last_items (feed_id, url, guid) VALUES (%s, %s, %s)', new_items)
self.conn.commit() self.conn.commit()
def __init_schema(self) -> None: def __init_schema(self) -> None:
self.log.debug('__init_schema()') self.log.debug('__init_schema()')
# TODO: Change to migrations
self.cur.execute( self.cur.execute(
'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))' 'CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, telegram_id INTEGER NOT NULL UNIQUE)'
) )
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))') self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id SERIAL PRIMARY KEY, url TEXT NOT NULL UNIQUE)')
self.cur.execute( self.cur.execute(
'CREATE TABLE IF NOT EXISTS subscriptions (' 'CREATE TABLE IF NOT EXISTS subscriptions ('
' user_id INTEGER,' ' user_id INTEGER REFERENCES users,'
' feed_id INTEGER,' ' feed_id INTEGER REFERENCES feeds,'
' UNIQUE (user_id, feed_id),' ' UNIQUE (user_id, feed_id)'
' FOREIGN KEY(user_id) REFERENCES users(id),'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
')' ')'
) )
self.cur.execute( self.cur.execute(
'CREATE TABLE IF NOT EXISTS feeds_last_items (' 'CREATE TABLE IF NOT EXISTS feeds_last_items ('
' feed_id INTEGER,' ' feed_id INTEGER REFERENCES feeds ON DELETE CASCADE,'
' url TEXT NOT NULL UNIQUE,' ' url TEXT NOT NULL,'
' title TEXT,' ' guid TEXT'
' description TEXT,'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
')' ')'
) )
@staticmethod
def __dictrow_to_dict_list(rows: list[DictRow]) -> list[dict]:
"""Convert list of DictRows to list of dicts"""
return list(map(dict, rows))

View file

@ -4,6 +4,7 @@ charset-normalizer==2.0.12
decorator==5.1.1 decorator==5.1.1
feedparser==6.0.2 feedparser==6.0.2
idna==3.3 idna==3.3
psycopg2-binary==2.9.3
pyTelegramBotAPI==4.5.0 pyTelegramBotAPI==4.5.0
python-dotenv==0.20.0 python-dotenv==0.20.0
requests==2.27.1 requests==2.27.1

1
rss.py
View file

@ -9,6 +9,7 @@ class FeedItem:
self.url = item.get('link', '') self.url = item.get('link', '')
self.title = item.get('title', '') self.title = item.get('title', '')
self.description = item.get('summary', '') self.description = item.get('summary', '')
self.guid = item.get('id', '')
if 'published' in item: if 'published' in item:
self.date = datetime.fromtimestamp(mktime(item.published_parsed)) self.date = datetime.fromtimestamp(mktime(item.published_parsed))
else: else:

View file

@ -106,7 +106,8 @@ class CommandProcessor:
class Notifier: class Notifier:
"""Sends notifications to users about new RSS feed items.""" """Sends notifications to users about new RSS feed items."""
BATCH_LIMIT: int = 30 # https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
BATCH_LIMIT: int = 25
sent_counter: int = 0 sent_counter: int = 0
@ -115,7 +116,7 @@ class Notifier:
self.log.debug('Notifier.__init__(token=\'%s\', logger=%s)', token[:8] + '...', logger) self.log.debug('Notifier.__init__(token=\'%s\', logger=%s)', token[:8] + '...', logger)
self.bot: TeleBot = TeleBot(token) self.bot: TeleBot = TeleBot(token)
self.html_sanitizer: Cleaner = Cleaner( self.html_sanitizer: Cleaner = Cleaner(
tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'span', 'tg-spoiler', 'a', 'code', 'pre'], tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'tg-spoiler', 'a', 'code', 'pre'],
attributes={"a": ["href", "title"]}, attributes={"a": ["href", "title"]},
protocols=['http', 'https'], protocols=['http', 'https'],
strip=True, strip=True,

View file

@ -10,8 +10,8 @@ from telegram import Notifier
load_dotenv() load_dotenv()
token = os.getenv('TELEGRAM_TOKEN') token = os.getenv('RSSBOT_TG_TOKEN')
db_path = os.getenv('DATABASE_PATH', './bot.db') dsn = os.getenv('RSSBOT_DSN')
log_level = os.getenv('LOG_LEVEL', 'INFO') log_level = os.getenv('LOG_LEVEL', 'INFO')
print('Starting the updater with logging level', log_level.upper()) print('Starting the updater with logging level', log_level.upper())
@ -21,7 +21,7 @@ logging.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S' datefmt='%Y-%m-%d %H:%M:%S'
) )
db = Database(db_path, logging.getLogger('Database')) db = Database(dsn, logging.getLogger('Database'))
notifier = Notifier(token, logging.getLogger('Notifier')) notifier = Notifier(token, logging.getLogger('Notifier'))
rss_reader = RssReader(logging.getLogger('RssReader')) rss_reader = RssReader(logging.getLogger('RssReader'))

View file

@ -24,35 +24,40 @@ class UpdateManager:
feeds = self.database.find_feeds() feeds = self.database.find_feeds()
self.log.info('Feeds to update: %d', len(feeds)) self.log.info('Feeds to update: %d', len(feeds))
for feed_id, feed_url in feeds: for feed in feeds:
self.log.info('Processing [%d] %s', feed_id, feed_url) self.log.info('Processing [%d] %s', feed['id'], feed['url'])
feed = self.rss_reader.get_feed(feed_url) feed_obj = self.rss_reader.get_feed(feed['url'])
new_items = feed.items new_items = feed_obj.items
old_items_urls = self.database.find_feed_items_urls(feed_id) old_items = self.database.find_feed_items(feed['id'])
diff = self.__calculate_difference(new_items, old_items_urls) diff = self.__calculate_difference(new_items, old_items)
if not diff: if not diff:
continue continue
chat_ids = self.database.find_feed_subscribers(feed_id) chat_ids = self.database.find_feed_subscribers(feed['id'])
self.notifier.send_updates(chat_ids, diff, feed.title) self.notifier.send_updates(chat_ids, diff, feed_obj.title)
self.database.update_feed_items(feed_id, new_items) self.database.update_feed_items(feed['id'], new_items)
def __calculate_difference(self, new_items: list[FeedItem], old_items_urls: list[str]) -> list[FeedItem]: def __calculate_difference(self, new_items: list[FeedItem], old_items: list[dict]) -> list[FeedItem]:
"""Calculate new feed items.""" """Calculate new feed items."""
self.log.debug( self.log.debug(
'__calculate_difference(new_items=list(%d), old_items_urls=list(%d))', len(new_items), len(old_items_urls) '__calculate_difference(new_items=list(%d), old_items=list(%d))', len(new_items), len(old_items)
) )
if not old_items_urls: if not old_items:
self.log.debug('Old items are empty, returning new') self.log.debug('Old items are empty, returning new')
return new_items return new_items
diff = [] diff = []
guids = [item['guid'] for item in old_items if item['guid']]
urls = [item['url'] for item in old_items]
self.log.debug('Comparing %d new items with %d old', len(new_items), len(old_items_urls)) self.log.debug('Comparing %d new items with %d old', len(new_items), len(old_items))
for item in new_items: for item in new_items:
if item.url not in old_items_urls: if not guids and item.url not in urls:
diff.append(item)
continue
if item.guid not in guids:
diff.append(item) diff.append(item)
self.log.debug('%d updates found', len(diff)) self.log.debug('%d updates found', len(diff))