WIP: migrate to PostgreSQL
Some checks failed
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is failing

This commit is contained in:
mitsuha_s 2022-07-11 00:25:44 +00:00
parent 92ced4c91b
commit 1bdb00e8e7
6 changed files with 43 additions and 43 deletions

View file

@ -33,14 +33,14 @@ the `requirements.txt` file. Otherwise old dependencies could be lost.
## Running the bot ## Running the bot
```shell ```shell
export TELEGRAM_TOKEN=xxx export RSSBOT_TG_TOKEN=xxx
export DATABASE_PATH=./database.db export RSSBOT_DSN=xxx
python bot.py python bot.py
``` ```
## Running the update ## Running the update
```shell ```shell
export TELEGRAM_TOKEN=xxx export RSSBOT_TG_TOKEN=xxx
export DATABASE_PATH=./database.db export RSSBOT_DSN=xxx
python update.py python update.py
``` ```

6
bot.py
View file

@ -8,8 +8,8 @@ from telegram import CommandProcessor
load_dotenv() load_dotenv()
token = os.getenv('TELEGRAM_TOKEN') token = os.getenv('RSSBOT_TG_TOKEN')
db_path = os.getenv('DATABASE_PATH', './bot.db') db_dsn = os.getenv('RSSBOT_DSN',)
log_level = os.getenv('LOG_LEVEL', 'INFO') log_level = os.getenv('LOG_LEVEL', 'INFO')
print('Starting the bot with logging level', log_level.upper()) print('Starting the bot with logging level', log_level.upper())
@ -19,7 +19,7 @@ logging.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S', datefmt='%Y-%m-%d %H:%M:%S',
) )
db = Database(db_path, logging.getLogger('Database')) db = Database(db_dsn, logging.getLogger('Database'))
bot = CommandProcessor(token, db, logging.getLogger('CommandProcessor')) bot = CommandProcessor(token, db, logging.getLogger('CommandProcessor'))
bot.run() bot.run()

View file

@ -1,4 +1,5 @@
import sqlite3 import psycopg2
import psycopg2.extras
from logging import Logger from logging import Logger
@ -9,27 +10,25 @@ from rss import FeedItem
class Database: class Database:
"""Implement interaction with the database.""" """Implement interaction with the database."""
def __init__(self, path: str, log: Logger) -> None: def __init__(self, dsn: str, log: Logger) -> None:
"""Create a database file if not exists.""" """Create a database file if not exists."""
self.log: Logger = log self.log: Logger = log
self.log.debug('Database.__init__(path=\'%s\')', path) self.log.debug('Database.__init__(DSN=\'%s\')', dsn)
# TODO: think about removing check_same_thread=False self.conn = psycopg2.connect(dsn)
self.conn = sqlite3.connect(path, check_same_thread=False) self.cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
self.conn.row_factory = sqlite3.Row
self.cur = self.conn.cursor()
self.__init_schema() self.__init_schema()
def add_user(self, telegram_id: int) -> int: def add_user(self, telegram_id: int) -> int:
"""Add a user's telegram id to the database and return its database id.""" """Add a user's telegram id to the database and return its database id."""
self.log.debug('add_user(telegram_id=\'%s\')', telegram_id) self.log.debug('add_user(telegram_id=\'%s\')', telegram_id)
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id]) self.cur.execute('INSERT INTO users (telegram_id) VALUES (%s)', [telegram_id])
self.conn.commit() self.conn.commit()
return self.cur.lastrowid return self.find_user(telegram_id)
def find_user(self, telegram_id: int) -> int | None: def find_user(self, telegram_id: int) -> int | None:
"""Get a user's telegram id and return its database id.""" """Get a user's telegram id and return its database id."""
self.log.debug('find_user(telegram_id=\'%s\')', telegram_id) self.log.debug('find_user(telegram_id=\'%s\')', telegram_id)
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id]) self.cur.execute('SELECT id FROM users WHERE telegram_id = %s', [telegram_id])
row = self.cur.fetchone() row = self.cur.fetchone()
if row is None: if row is None:
return None return None
@ -38,14 +37,14 @@ class Database:
def add_feed(self, url: str) -> int: def add_feed(self, url: str) -> int:
"""Add a feed to the database and return its id.""" """Add a feed to the database and return its id."""
self.log.debug('add_feed(url=\'%s\')', url) self.log.debug('add_feed(url=\'%s\')', url)
self.cur.execute('INSERT INTO feeds (url) VALUES (?)', [url]) self.cur.execute('INSERT INTO feeds (url) VALUES (%s)', [url])
self.conn.commit() self.conn.commit()
return self.cur.lastrowid return self.find_feed_by_url(url)
def find_feed_by_url(self, url: str) -> int | None: def find_feed_by_url(self, url: str) -> int | None:
"""Find feed ID by url.""" """Find feed ID by url."""
self.log.debug('find_feed_by_url(url=\'%s\')', url) self.log.debug('find_feed_by_url(url=\'%s\')', url)
self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url]) self.cur.execute('SELECT id FROM feeds WHERE url = %s', [url])
row = self.cur.fetchone() row = self.cur.fetchone()
if row is None: if row is None:
return None return None
@ -66,7 +65,7 @@ class Database:
def subscribe_user(self, user_id: int, feed_id: int) -> None: def subscribe_user(self, user_id: int, feed_id: int) -> None:
"""Subscribe a user to the feed.""" """Subscribe a user to the feed."""
self.log.debug('subscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id) self.log.debug('subscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id]) self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (%s, %s)', [user_id, feed_id])
self.conn.commit() self.conn.commit()
def unsubscribe_user_by_url(self, user_id: int, url: str) -> None: def unsubscribe_user_by_url(self, user_id: int, url: str) -> None:
@ -88,13 +87,13 @@ class Database:
def unsubscribe_user(self, user_id: int, feed_id: int) -> None: def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
"""Unsubscribe a user from the feed.""" """Unsubscribe a user from the feed."""
self.log.debug('unsubscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id) self.log.debug('unsubscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id]) self.cur.execute('DELETE FROM subscriptions WHERE feed_id = %s AND user_id = %s', [feed_id, user_id])
self.conn.commit() self.conn.commit()
def is_user_subscribed(self, user_id: int, feed_id: int) -> bool: def is_user_subscribed(self, user_id: int, feed_id: int) -> bool:
"""Check if user subscribed to specific feed.""" """Check if user subscribed to specific feed."""
self.log.debug('is_user_subscribed(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id) self.log.debug('is_user_subscribed(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id]) self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = %s AND feed_id = %s', [user_id, feed_id])
row = self.cur.fetchone() row = self.cur.fetchone()
if row is None: if row is None:
return False return False
@ -103,41 +102,41 @@ class Database:
def delete_feed(self, feed_id: int) -> None: def delete_feed(self, feed_id: int) -> None:
"""Delete a feed.""" """Delete a feed."""
self.log.debug('delete_feed(feed_id=\'%s\')', feed_id) self.log.debug('delete_feed(feed_id=\'%s\')', feed_id)
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id]) self.cur.execute('DELETE FROM feeds WHERE id = %s', [feed_id])
self.conn.commit() self.conn.commit()
def get_feed_subscribers_count(self, feed_id: int) -> int: def get_feed_subscribers_count(self, feed_id: int) -> int:
"""Count feed subscribers.""" """Count feed subscribers."""
self.log.debug('get_feed_subscribers_count(feed_id=\'%s\')', feed_id) self.log.debug('get_feed_subscribers_count(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = ?', [feed_id]) self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = %s', [feed_id])
row = self.cur.fetchone() row = self.cur.fetchone()
return row['amount_subscribers'] return row['amount_subscribers']
def find_feed_subscribers(self, feed_id: int) -> list[int]: def find_feed_subscribers(self, feed_id: int) -> list[int]:
"""Return feed subscribers""" """Return feed subscribers"""
self.log.debug('find_feed_subscribers(feed_id=\'%s\')', feed_id) self.log.debug('find_feed_subscribers(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = ?)', self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = %s)',
[feed_id]) [feed_id])
subscribers = self.cur.fetchall() subscribers = self.cur.fetchall()
return list(map(lambda x: x['telegram_id'], subscribers)) return list(map(lambda x: x['telegram_id'], subscribers))
def find_feeds(self) -> list[sqlite3.Row]: def find_feeds(self) -> list[psycopg2.extras.DictRow]:
"""Get a list of feeds.""" """Get a list of feeds."""
self.log.debug('find_feeds()') self.log.debug('find_feeds()')
self.cur.execute('SELECT * FROM feeds') self.cur.execute('SELECT * FROM feeds')
return self.cur.fetchall() return self.cur.fetchall()
def find_user_feeds(self, user_id: int) -> list[sqlite3.Row]: def find_user_feeds(self, user_id: int) -> list[psycopg2.extras.DictRow]:
"""Return a list of feeds the user is subscribed to.""" """Return a list of feeds the user is subscribed to."""
self.log.debug('find_user_feeds(user_id=\'%s\')', user_id) self.log.debug('find_user_feeds(user_id=\'%s\')', user_id)
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)', self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = %s)',
[user_id]) [user_id])
return self.cur.fetchall() return self.cur.fetchall()
def find_feed_items(self, feed_id: int) -> list[sqlite3.Row]: def find_feed_items(self, feed_id: int) -> list[psycopg2.extras.DictRow]:
"""Get last feed items.""" """Get last feed items."""
self.log.debug('find_feed_items(feed_id=\'%s\')', feed_id) self.log.debug('find_feed_items(feed_id=\'%s\')', feed_id)
self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = ?', [feed_id]) self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = %s', [feed_id])
return self.cur.fetchall() return self.cur.fetchall()
def find_feed_items_urls(self, feed_id: int) -> list[str]: def find_feed_items_urls(self, feed_id: int) -> list[str]:
@ -154,18 +153,17 @@ class Database:
for i, _ in enumerate(new_items): for i, _ in enumerate(new_items):
new_items[i] = [feed_id] + list(new_items[i].__dict__.values())[:-1] new_items[i] = [feed_id] + list(new_items[i].__dict__.values())[:-1]
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id]) self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = %s', [feed_id])
self.cur.executemany( self.cur.executemany(
'INSERT INTO feeds_last_items (feed_id, url, title, description) VALUES (?, ?, ?, ?)', new_items) 'INSERT INTO feeds_last_items (feed_id, url, title, description) VALUES (%s, %s, %s, %s)', new_items)
self.conn.commit() self.conn.commit()
def __init_schema(self) -> None: def __init_schema(self) -> None:
self.log.debug('__init_schema()') self.log.debug('__init_schema()')
# TODO: Change to migrations
self.cur.execute( self.cur.execute(
'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))' 'CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, telegram_id INTEGER NOT NULL UNIQUE)'
) )
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))') self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id SERIAL PRIMARY KEY, url TEXT NOT NULL UNIQUE)')
self.cur.execute( self.cur.execute(
'CREATE TABLE IF NOT EXISTS subscriptions (' 'CREATE TABLE IF NOT EXISTS subscriptions ('
' user_id INTEGER,' ' user_id INTEGER,'
@ -178,7 +176,7 @@ class Database:
self.cur.execute( self.cur.execute(
'CREATE TABLE IF NOT EXISTS feeds_last_items (' 'CREATE TABLE IF NOT EXISTS feeds_last_items ('
' feed_id INTEGER,' ' feed_id INTEGER,'
' url TEXT NOT NULL UNIQUE,' ' url TEXT NOT NULL,'
' title TEXT,' ' title TEXT,'
' description TEXT,' ' description TEXT,'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)' ' FOREIGN KEY(feed_id) REFERENCES feeds(id)'

View file

@ -4,6 +4,7 @@ charset-normalizer==2.0.12
decorator==5.1.1 decorator==5.1.1
feedparser==6.0.2 feedparser==6.0.2
idna==3.3 idna==3.3
psycopg2-binary==2.9.3
pyTelegramBotAPI==4.5.0 pyTelegramBotAPI==4.5.0
python-dotenv==0.20.0 python-dotenv==0.20.0
requests==2.27.1 requests==2.27.1

View file

@ -106,7 +106,8 @@ class CommandProcessor:
class Notifier: class Notifier:
"""Sends notifications to users about new RSS feed items.""" """Sends notifications to users about new RSS feed items."""
BATCH_LIMIT: int = 30 # https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
BATCH_LIMIT: int = 25
sent_counter: int = 0 sent_counter: int = 0
@ -115,7 +116,7 @@ class Notifier:
self.log.debug('Notifier.__init__(token=\'%s\', logger=%s)', token[:8] + '...', logger) self.log.debug('Notifier.__init__(token=\'%s\', logger=%s)', token[:8] + '...', logger)
self.bot: TeleBot = TeleBot(token) self.bot: TeleBot = TeleBot(token)
self.html_sanitizer: Cleaner = Cleaner( self.html_sanitizer: Cleaner = Cleaner(
tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'span', 'tg-spoiler', 'a', 'code', 'pre'], tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'tg-spoiler', 'a', 'code', 'pre'],
attributes={"a": ["href", "title"]}, attributes={"a": ["href", "title"]},
protocols=['http', 'https'], protocols=['http', 'https'],
strip=True, strip=True,
@ -156,7 +157,7 @@ class Notifier:
self.log.debug('__count_request_and_wait()') self.log.debug('__count_request_and_wait()')
if self.sent_counter >= self.BATCH_LIMIT: if self.sent_counter >= self.BATCH_LIMIT:
self.log.debug('Requests limit exceeded, sleeping for a second') self.log.debug('Requests limit exceeded, sleeping for a second')
time.sleep(1) time.sleep(2)
self.log.debug('Resetting counter') self.log.debug('Resetting counter')
self.sent_counter = 0 self.sent_counter = 0
self.sent_counter += 1 self.sent_counter += 1

View file

@ -10,8 +10,8 @@ from telegram import Notifier
load_dotenv() load_dotenv()
token = os.getenv('TELEGRAM_TOKEN') token = os.getenv('RSSBOT_TG_TOKEN')
db_path = os.getenv('DATABASE_PATH', './bot.db') db_dsn = os.getenv('RSSBOT_DSN')
log_level = os.getenv('LOG_LEVEL', 'INFO') log_level = os.getenv('LOG_LEVEL', 'INFO')
print('Starting the updater with logging level', log_level.upper()) print('Starting the updater with logging level', log_level.upper())
@ -21,7 +21,7 @@ logging.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S' datefmt='%Y-%m-%d %H:%M:%S'
) )
db = Database(db_path, logging.getLogger('Database')) db = Database(db_dsn, logging.getLogger('Database'))
notifier = Notifier(token, logging.getLogger('Notifier')) notifier = Notifier(token, logging.getLogger('Notifier'))
rss_reader = RssReader(logging.getLogger('RssReader')) rss_reader = RssReader(logging.getLogger('RssReader'))