Migrate to PostgreSQL #39

Merged
Miroslavsckaya merged 16 commits from migrate_to_postgresql into master 2022-07-13 22:53:54 +00:00
8 changed files with 77 additions and 72 deletions

View file

@ -33,14 +33,14 @@ the `requirements.txt` file. Otherwise old dependencies could be lost.
## Running the bot
```shell
export TELEGRAM_TOKEN=xxx
export DATABASE_PATH=./database.db
export RSSBOT_TG_TOKEN=xxx
export RSSBOT_DSN=xxx
python bot.py
```
## Running the update
```shell
export TELEGRAM_TOKEN=xxx
export DATABASE_PATH=./database.db
export RSSBOT_TG_TOKEN=xxx
export RSSBOT_DSN=xxx
python update.py
```

6
bot.py
View file

@ -8,8 +8,8 @@ from telegram import CommandProcessor
load_dotenv()
token = os.getenv('TELEGRAM_TOKEN')
db_path = os.getenv('DATABASE_PATH', './bot.db')
token = os.getenv('RSSBOT_TG_TOKEN')
dsn = os.getenv('RSSBOT_DSN')
log_level = os.getenv('LOG_LEVEL', 'INFO')
print('Starting the bot with logging level', log_level.upper())
@ -19,7 +19,7 @@ logging.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S',
)
db = Database(db_path, logging.getLogger('Database'))
db = Database(dsn, logging.getLogger('Database'))
bot = CommandProcessor(token, db, logging.getLogger('CommandProcessor'))
bot.run()

View file

@ -1,7 +1,7 @@
import sqlite3
from logging import Logger
import psycopg2
from psycopg2.extensions import connection
from psycopg2.extras import DictCursor, DictRow
from exceptions import DisplayableException
from rss import FeedItem
@ -9,27 +9,25 @@ from rss import FeedItem
class Database:
"""Implement interaction with the database."""
def __init__(self, path: str, log: Logger) -> None:
def __init__(self, dsn: str, log: Logger) -> None:
"""Create a database file if not exists."""
self.log: Logger = log
self.log.debug('Database.__init__(path=\'%s\')', path)
# TODO: think about removing check_same_thread=False
self.conn = sqlite3.connect(path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self.cur = self.conn.cursor()
self.log.debug('Database.__init__(DSN=\'%s\')', dsn)
self.conn: connection = psycopg2.connect(dsn)
self.cur: DictCursor = self.conn.cursor(cursor_factory=DictCursor)
self.__init_schema()
def add_user(self, telegram_id: int) -> int:
"""Add a user's telegram id to the database and return its database id."""
self.log.debug('add_user(telegram_id=\'%s\')', telegram_id)
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id])
self.cur.execute('INSERT INTO users (telegram_id) VALUES (%s) RETURNING id', [telegram_id])
self.conn.commit()
return self.cur.lastrowid
return self.cur.fetchone()[0]
def find_user(self, telegram_id: int) -> int | None:
"""Get a user's telegram id and return its database id."""
self.log.debug('find_user(telegram_id=\'%s\')', telegram_id)
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id])
self.cur.execute('SELECT id FROM users WHERE telegram_id = %s', [telegram_id])
row = self.cur.fetchone()
if row is None:
return None
@ -38,14 +36,14 @@ class Database:
def add_feed(self, url: str) -> int:
"""Add a feed to the database and return its id."""
self.log.debug('add_feed(url=\'%s\')', url)
self.cur.execute('INSERT INTO feeds (url) VALUES (?)', [url])
self.cur.execute('INSERT INTO feeds (url) VALUES (%s) RETURNING id', [url])
self.conn.commit()
return self.cur.lastrowid
return self.cur.fetchone()[0]
def find_feed_by_url(self, url: str) -> int | None:
"""Find feed ID by url."""
self.log.debug('find_feed_by_url(url=\'%s\')', url)
self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url])
self.cur.execute('SELECT id FROM feeds WHERE url = %s', [url])
row = self.cur.fetchone()
if row is None:
return None
@ -66,7 +64,7 @@ class Database:
def subscribe_user(self, user_id: int, feed_id: int) -> None:
"""Subscribe a user to the feed."""
self.log.debug('subscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id])
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (%s, %s)', [user_id, feed_id])
self.conn.commit()
def unsubscribe_user_by_url(self, user_id: int, url: str) -> None:
@ -88,13 +86,13 @@ class Database:
def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
"""Unsubscribe a user from the feed."""
self.log.debug('unsubscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id])
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = %s AND user_id = %s', [feed_id, user_id])
self.conn.commit()
def is_user_subscribed(self, user_id: int, feed_id: int) -> bool:
"""Check if user subscribed to specific feed."""
self.log.debug('is_user_subscribed(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id])
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = %s AND feed_id = %s', [user_id, feed_id])
row = self.cur.fetchone()
if row is None:
return False
@ -103,42 +101,42 @@ class Database:
def delete_feed(self, feed_id: int) -> None:
"""Delete a feed."""
self.log.debug('delete_feed(feed_id=\'%s\')', feed_id)
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id])
self.cur.execute('DELETE FROM feeds WHERE id = %s', [feed_id])
self.conn.commit()
def get_feed_subscribers_count(self, feed_id: int) -> int:
"""Count feed subscribers."""
self.log.debug('get_feed_subscribers_count(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = ?', [feed_id])
self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = %s', [feed_id])
row = self.cur.fetchone()
return row['amount_subscribers']
def find_feed_subscribers(self, feed_id: int) -> list[int]:
"""Return feed subscribers"""
self.log.debug('find_feed_subscribers(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = ?)',
self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = %s)',
[feed_id])
subscribers = self.cur.fetchall()
return list(map(lambda x: x['telegram_id'], subscribers))
def find_feeds(self) -> list[sqlite3.Row]:
def find_feeds(self) -> list[dict]:
"""Get a list of feeds."""
self.log.debug('find_feeds()')
self.cur.execute('SELECT * FROM feeds')
return self.cur.fetchall()
return self.__dictrow_to_dict_list(self.cur.fetchall())
def find_user_feeds(self, user_id: int) -> list[sqlite3.Row]:
def find_user_feeds(self, user_id: int) -> list[dict]:
"""Return a list of feeds the user is subscribed to."""
self.log.debug('find_user_feeds(user_id=\'%s\')', user_id)
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)',
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = %s)',
[user_id])
return self.cur.fetchall()
return self.__dictrow_to_dict_list(self.cur.fetchall())
def find_feed_items(self, feed_id: int) -> list[sqlite3.Row]:
def find_feed_items(self, feed_id: int) -> list[dict]:
"""Get last feed items."""
self.log.debug('find_feed_items(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = ?', [feed_id])
return self.cur.fetchall()
self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = %s', [feed_id])
return self.__dictrow_to_dict_list(self.cur.fetchall())
def find_feed_items_urls(self, feed_id: int) -> list[str]:
"""Return urls last feed items"""
@ -152,35 +150,34 @@ class Database:
"""Replace last feed items with a list items that receive."""
self.log.debug('update_feed_items(feed_id=\'%s\', new_items=list(%d))', feed_id, len(new_items))
for i, _ in enumerate(new_items):
new_items[i] = [feed_id] + list(new_items[i].__dict__.values())[:-1]
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id])
new_items[i] = [feed_id, new_items[i].url, new_items[i].guid]
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = %s', [feed_id])
self.cur.executemany(
'INSERT INTO feeds_last_items (feed_id, url, title, description) VALUES (?, ?, ?, ?)', new_items)
'INSERT INTO feeds_last_items (feed_id, url, guid) VALUES (%s, %s, %s)', new_items)
self.conn.commit()
def __init_schema(self) -> None:
self.log.debug('__init_schema()')
# TODO: Change to migrations
self.cur.execute(
'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))'
'CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, telegram_id INTEGER NOT NULL UNIQUE)'
)
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))')
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id SERIAL PRIMARY KEY, url TEXT NOT NULL UNIQUE)')
self.cur.execute(
'CREATE TABLE IF NOT EXISTS subscriptions ('
' user_id INTEGER,'
' feed_id INTEGER,'
' UNIQUE (user_id, feed_id),'
' FOREIGN KEY(user_id) REFERENCES users(id),'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
' user_id INTEGER REFERENCES users,'
' feed_id INTEGER REFERENCES feeds,'
' UNIQUE (user_id, feed_id)'
')'
)
self.cur.execute(
'CREATE TABLE IF NOT EXISTS feeds_last_items ('
' feed_id INTEGER,'
' url TEXT NOT NULL UNIQUE,'
' title TEXT,'
' description TEXT,'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
' feed_id INTEGER REFERENCES feeds ON DELETE CASCADE,'
' url TEXT NOT NULL,'
' guid TEXT'
')'
)
@staticmethod
def __dictrow_to_dict_list(rows: list[DictRow]) -> list[dict]:
"""Convert list of DictRows to list of dicts"""
return list(map(dict, rows))

View file

@ -4,6 +4,7 @@ charset-normalizer==2.0.12
decorator==5.1.1
feedparser==6.0.2
idna==3.3
psycopg2-binary==2.9.3
pyTelegramBotAPI==4.5.0
python-dotenv==0.20.0
requests==2.27.1

1
rss.py
View file

@ -9,6 +9,7 @@ class FeedItem:
self.url = item.get('link', '')
self.title = item.get('title', '')
self.description = item.get('summary', '')
self.guid = item.get('id', '')
if 'published' in item:
self.date = datetime.fromtimestamp(mktime(item.published_parsed))
else:

View file

@ -106,7 +106,8 @@ class CommandProcessor:
class Notifier:
"""Sends notifications to users about new RSS feed items."""
BATCH_LIMIT: int = 30
# https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
BATCH_LIMIT: int = 25
sent_counter: int = 0
@ -115,7 +116,7 @@ class Notifier:
self.log.debug('Notifier.__init__(token=\'%s\', logger=%s)', token[:8] + '...', logger)
self.bot: TeleBot = TeleBot(token)
self.html_sanitizer: Cleaner = Cleaner(
tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'span', 'tg-spoiler', 'a', 'code', 'pre'],
tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'tg-spoiler', 'a', 'code', 'pre'],
attributes={"a": ["href", "title"]},
protocols=['http', 'https'],
strip=True,

View file

@ -10,8 +10,8 @@ from telegram import Notifier
load_dotenv()
token = os.getenv('TELEGRAM_TOKEN')
db_path = os.getenv('DATABASE_PATH', './bot.db')
token = os.getenv('RSSBOT_TG_TOKEN')
dsn = os.getenv('RSSBOT_DSN')
log_level = os.getenv('LOG_LEVEL', 'INFO')
print('Starting the updater with logging level', log_level.upper())
@ -21,7 +21,7 @@ logging.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S'
)
db = Database(db_path, logging.getLogger('Database'))
db = Database(dsn, logging.getLogger('Database'))
notifier = Notifier(token, logging.getLogger('Notifier'))
rss_reader = RssReader(logging.getLogger('RssReader'))

View file

@ -24,35 +24,40 @@ class UpdateManager:
feeds = self.database.find_feeds()
self.log.info('Feeds to update: %d', len(feeds))
for feed_id, feed_url in feeds:
self.log.info('Processing [%d] %s', feed_id, feed_url)
feed = self.rss_reader.get_feed(feed_url)
new_items = feed.items
old_items_urls = self.database.find_feed_items_urls(feed_id)
for feed in feeds:
self.log.info('Processing [%d] %s', feed['id'], feed['url'])
feed_obj = self.rss_reader.get_feed(feed['url'])
new_items = feed_obj.items
old_items = self.database.find_feed_items(feed['id'])
diff = self.__calculate_difference(new_items, old_items_urls)
diff = self.__calculate_difference(new_items, old_items)
if not diff:
continue
chat_ids = self.database.find_feed_subscribers(feed_id)
self.notifier.send_updates(chat_ids, diff, feed.title)
self.database.update_feed_items(feed_id, new_items)
chat_ids = self.database.find_feed_subscribers(feed['id'])
self.notifier.send_updates(chat_ids, diff, feed_obj.title)
self.database.update_feed_items(feed['id'], new_items)
def __calculate_difference(self, new_items: list[FeedItem], old_items_urls: list[str]) -> list[FeedItem]:
def __calculate_difference(self, new_items: list[FeedItem], old_items: list[dict]) -> list[FeedItem]:
"""Calculate new feed items."""
self.log.debug(
'__calculate_difference(new_items=list(%d), old_items_urls=list(%d))', len(new_items), len(old_items_urls)
'__calculate_difference(new_items=list(%d), old_items=list(%d))', len(new_items), len(old_items)
)
if not old_items_urls:
if not old_items:
self.log.debug('Old items are empty, returning new')
return new_items
diff = []
guids = [item['guid'] for item in old_items if item['guid']]
urls = [item['url'] for item in old_items]
self.log.debug('Comparing %d new items with %d old', len(new_items), len(old_items_urls))
self.log.debug('Comparing %d new items with %d old', len(new_items), len(old_items))
for item in new_items:
if item.url not in old_items_urls:
if not guids and item.url not in urls:
diff.append(item)
continue
if item.guid not in guids:
diff.append(item)
self.log.debug('%d updates found', len(diff))