Migrate to PostgreSQL #39
|
@ -33,14 +33,14 @@ the `requirements.txt` file. Otherwise old dependencies could be lost.
|
|||
## Running the bot
|
||||
|
||||
```shell
|
||||
export TELEGRAM_TOKEN=xxx
|
||||
export DATABASE_PATH=./database.db
|
||||
export RSSBOT_TG_TOKEN=xxx
|
||||
export RSSBOT_DSN=xxx
|
||||
python bot.py
|
||||
```
|
||||
## Running the update
|
||||
|
||||
```shell
|
||||
export TELEGRAM_TOKEN=xxx
|
||||
export DATABASE_PATH=./database.db
|
||||
export RSSBOT_TG_TOKEN=xxx
|
||||
export RSSBOT_DSN=xxx
|
||||
python update.py
|
||||
```
|
6
bot.py
6
bot.py
|
@ -8,8 +8,8 @@ from telegram import CommandProcessor
|
|||
|
||||
load_dotenv()
|
||||
|
||||
token = os.getenv('TELEGRAM_TOKEN')
|
||||
db_path = os.getenv('DATABASE_PATH', './bot.db')
|
||||
token = os.getenv('RSSBOT_TG_TOKEN')
|
||||
dsn = os.getenv('RSSBOT_DSN')
|
||||
skobkin marked this conversation as resolved
Outdated
|
||||
log_level = os.getenv('LOG_LEVEL', 'INFO')
|
||||
|
||||
print('Starting the bot with logging level', log_level.upper())
|
||||
|
@ -19,7 +19,7 @@ logging.basicConfig(
|
|||
datefmt='%Y-%m-%d %H:%M:%S',
|
||||
)
|
||||
|
||||
db = Database(db_path, logging.getLogger('Database'))
|
||||
db = Database(dsn, logging.getLogger('Database'))
|
||||
bot = CommandProcessor(token, db, logging.getLogger('CommandProcessor'))
|
||||
|
||||
bot.run()
|
||||
|
|
89
database.py
89
database.py
|
@ -1,7 +1,7 @@
|
|||
import sqlite3
|
||||
|
||||
from logging import Logger
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.extensions import connection
|
||||
from psycopg2.extras import DictCursor, DictRow
|
||||
from exceptions import DisplayableException
|
||||
from rss import FeedItem
|
||||
|
||||
|
@ -9,27 +9,25 @@ from rss import FeedItem
|
|||
class Database:
|
||||
"""Implement interaction with the database."""
|
||||
|
||||
def __init__(self, path: str, log: Logger) -> None:
|
||||
def __init__(self, dsn: str, log: Logger) -> None:
|
||||
"""Create a database file if not exists."""
|
||||
self.log: Logger = log
|
||||
self.log.debug('Database.__init__(path=\'%s\')', path)
|
||||
# TODO: think about removing check_same_thread=False
|
||||
self.conn = sqlite3.connect(path, check_same_thread=False)
|
||||
self.conn.row_factory = sqlite3.Row
|
||||
self.cur = self.conn.cursor()
|
||||
self.log.debug('Database.__init__(DSN=\'%s\')', dsn)
|
||||
self.conn: connection = psycopg2.connect(dsn)
|
||||
self.cur: DictCursor = self.conn.cursor(cursor_factory=DictCursor)
|
||||
skobkin marked this conversation as resolved
Outdated
skobkin
commented
Let's not forget about type hints.
|
||||
self.__init_schema()
|
||||
|
||||
def add_user(self, telegram_id: int) -> int:
|
||||
"""Add a user's telegram id to the database and return its database id."""
|
||||
self.log.debug('add_user(telegram_id=\'%s\')', telegram_id)
|
||||
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id])
|
||||
self.cur.execute('INSERT INTO users (telegram_id) VALUES (%s) RETURNING id', [telegram_id])
|
||||
self.conn.commit()
|
||||
return self.cur.lastrowid
|
||||
return self.cur.fetchone()[0]
|
||||
|
||||
skobkin marked this conversation as resolved
Outdated
skobkin
commented
WAT
- https://www.psycopg.org/docs/cursor.html#cursor.lastrowid or
- https://www.postgresql.org/docs/current/sql-insert.html (`RETURNING`)
|
||||
def find_user(self, telegram_id: int) -> int | None:
|
||||
"""Get a user's telegram id and return its database id."""
|
||||
self.log.debug('find_user(telegram_id=\'%s\')', telegram_id)
|
||||
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id])
|
||||
self.cur.execute('SELECT id FROM users WHERE telegram_id = %s', [telegram_id])
|
||||
row = self.cur.fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
|
@ -38,14 +36,14 @@ class Database:
|
|||
def add_feed(self, url: str) -> int:
|
||||
"""Add a feed to the database and return its id."""
|
||||
self.log.debug('add_feed(url=\'%s\')', url)
|
||||
self.cur.execute('INSERT INTO feeds (url) VALUES (?)', [url])
|
||||
self.cur.execute('INSERT INTO feeds (url) VALUES (%s) RETURNING id', [url])
|
||||
self.conn.commit()
|
||||
return self.cur.lastrowid
|
||||
return self.cur.fetchone()[0]
|
||||
|
||||
skobkin marked this conversation as resolved
Outdated
skobkin
commented
WAT
|
||||
def find_feed_by_url(self, url: str) -> int | None:
|
||||
"""Find feed ID by url."""
|
||||
self.log.debug('find_feed_by_url(url=\'%s\')', url)
|
||||
self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url])
|
||||
self.cur.execute('SELECT id FROM feeds WHERE url = %s', [url])
|
||||
row = self.cur.fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
|
@ -66,7 +64,7 @@ class Database:
|
|||
def subscribe_user(self, user_id: int, feed_id: int) -> None:
|
||||
"""Subscribe a user to the feed."""
|
||||
self.log.debug('subscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
|
||||
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id])
|
||||
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (%s, %s)', [user_id, feed_id])
|
||||
self.conn.commit()
|
||||
|
||||
def unsubscribe_user_by_url(self, user_id: int, url: str) -> None:
|
||||
|
@ -88,13 +86,13 @@ class Database:
|
|||
def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
|
||||
"""Unsubscribe a user from the feed."""
|
||||
self.log.debug('unsubscribe_user(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
|
||||
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id])
|
||||
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = %s AND user_id = %s', [feed_id, user_id])
|
||||
self.conn.commit()
|
||||
|
||||
def is_user_subscribed(self, user_id: int, feed_id: int) -> bool:
|
||||
"""Check if user subscribed to specific feed."""
|
||||
self.log.debug('is_user_subscribed(user_id=\'%s\', feed_id=\'%s\')', user_id, feed_id)
|
||||
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id])
|
||||
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = %s AND feed_id = %s', [user_id, feed_id])
|
||||
row = self.cur.fetchone()
|
||||
if row is None:
|
||||
return False
|
||||
|
@ -103,42 +101,42 @@ class Database:
|
|||
def delete_feed(self, feed_id: int) -> None:
|
||||
"""Delete a feed."""
|
||||
self.log.debug('delete_feed(feed_id=\'%s\')', feed_id)
|
||||
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id])
|
||||
self.cur.execute('DELETE FROM feeds WHERE id = %s', [feed_id])
|
||||
self.conn.commit()
|
||||
|
||||
def get_feed_subscribers_count(self, feed_id: int) -> int:
|
||||
"""Count feed subscribers."""
|
||||
self.log.debug('get_feed_subscribers_count(feed_id=\'%s\')', feed_id)
|
||||
self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = ?', [feed_id])
|
||||
self.cur.execute('SELECT COUNT(user_id) AS amount_subscribers FROM subscriptions WHERE feed_id = %s', [feed_id])
|
||||
row = self.cur.fetchone()
|
||||
return row['amount_subscribers']
|
||||
|
||||
def find_feed_subscribers(self, feed_id: int) -> list[int]:
|
||||
"""Return feed subscribers"""
|
||||
self.log.debug('find_feed_subscribers(feed_id=\'%s\')', feed_id)
|
||||
self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = ?)',
|
||||
self.cur.execute('SELECT telegram_id FROM users WHERE id IN (SELECT user_id FROM subscriptions WHERE feed_id = %s)',
|
||||
[feed_id])
|
||||
subscribers = self.cur.fetchall()
|
||||
return list(map(lambda x: x['telegram_id'], subscribers))
|
||||
|
||||
def find_feeds(self) -> list[sqlite3.Row]:
|
||||
def find_feeds(self) -> list[dict]:
|
||||
"""Get a list of feeds."""
|
||||
skobkin marked this conversation as resolved
Outdated
skobkin
commented
You can import `DictRow`.
|
||||
self.log.debug('find_feeds()')
|
||||
self.cur.execute('SELECT * FROM feeds')
|
||||
return self.cur.fetchall()
|
||||
return self.__dictrow_to_dict_list(self.cur.fetchall())
|
||||
|
||||
def find_user_feeds(self, user_id: int) -> list[sqlite3.Row]:
|
||||
def find_user_feeds(self, user_id: int) -> list[dict]:
|
||||
"""Return a list of feeds the user is subscribed to."""
|
||||
self.log.debug('find_user_feeds(user_id=\'%s\')', user_id)
|
||||
skobkin marked this conversation as resolved
Outdated
skobkin
commented
I forgot to mention this earlier, but it's better not to expose specifics of the database implementation in public methods.
We can fix that by returning a non-library-specific structure. For example `list[dict]` or [`list[collections.OrderedDict]`](https://realpython.com/python-ordereddict/) instead of [`list[psycopg2.extras.DictRow]`](https://github.com/psycopg/psycopg2/blob/1d3a89a0bba621dc1cc9b32db6d241bd2da85ad1/lib/extras.py#L160-L211).
|
||||
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)',
|
||||
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = %s)',
|
||||
[user_id])
|
||||
return self.cur.fetchall()
|
||||
return self.__dictrow_to_dict_list(self.cur.fetchall())
|
||||
|
||||
def find_feed_items(self, feed_id: int) -> list[sqlite3.Row]:
|
||||
def find_feed_items(self, feed_id: int) -> list[dict]:
|
||||
"""Get last feed items."""
|
||||
self.log.debug('find_feed_items(feed_id=\'%s\')', feed_id)
|
||||
self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = ?', [feed_id])
|
||||
return self.cur.fetchall()
|
||||
self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = %s', [feed_id])
|
||||
return self.__dictrow_to_dict_list(self.cur.fetchall())
|
||||
|
||||
def find_feed_items_urls(self, feed_id: int) -> list[str]:
|
||||
"""Return urls last feed items"""
|
||||
|
@ -152,35 +150,34 @@ class Database:
|
|||
"""Replace last feed items with a list items that receive."""
|
||||
self.log.debug('update_feed_items(feed_id=\'%s\', new_items=list(%d))', feed_id, len(new_items))
|
||||
for i, _ in enumerate(new_items):
|
||||
new_items[i] = [feed_id] + list(new_items[i].__dict__.values())[:-1]
|
||||
|
||||
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id])
|
||||
new_items[i] = [feed_id, new_items[i].url, new_items[i].guid]
|
||||
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = %s', [feed_id])
|
||||
self.cur.executemany(
|
||||
'INSERT INTO feeds_last_items (feed_id, url, title, description) VALUES (?, ?, ?, ?)', new_items)
|
||||
'INSERT INTO feeds_last_items (feed_id, url, guid) VALUES (%s, %s, %s)', new_items)
|
||||
self.conn.commit()
|
||||
|
||||
def __init_schema(self) -> None:
|
||||
self.log.debug('__init_schema()')
|
||||
# TODO: Change to migrations
|
||||
self.cur.execute(
|
||||
'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))'
|
||||
'CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, telegram_id INTEGER NOT NULL UNIQUE)'
|
||||
)
|
||||
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))')
|
||||
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id SERIAL PRIMARY KEY, url TEXT NOT NULL UNIQUE)')
|
||||
self.cur.execute(
|
||||
'CREATE TABLE IF NOT EXISTS subscriptions ('
|
||||
' user_id INTEGER,'
|
||||
' feed_id INTEGER,'
|
||||
' UNIQUE (user_id, feed_id),'
|
||||
' FOREIGN KEY(user_id) REFERENCES users(id),'
|
||||
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
|
||||
' user_id INTEGER REFERENCES users,'
|
||||
' feed_id INTEGER REFERENCES feeds,'
|
||||
' UNIQUE (user_id, feed_id)'
|
||||
')'
|
||||
)
|
||||
self.cur.execute(
|
||||
'CREATE TABLE IF NOT EXISTS feeds_last_items ('
|
||||
' feed_id INTEGER,'
|
||||
' url TEXT NOT NULL UNIQUE,'
|
||||
' title TEXT,'
|
||||
' description TEXT,'
|
||||
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
|
||||
' feed_id INTEGER REFERENCES feeds ON DELETE CASCADE,'
|
||||
' url TEXT NOT NULL,'
|
||||
' guid TEXT'
|
||||
')'
|
||||
)
|
||||
|
||||
skobkin marked this conversation as resolved
Outdated
skobkin
commented
Do not forget to use the GUID as the basis for comparison. URLs are not reliable in our case.
|
||||
@staticmethod
|
||||
def __dictrow_to_dict_list(rows: list[DictRow]) -> list[dict]:
|
||||
"""Convert list of DictRows to list of dicts"""
|
||||
return list(map(dict, rows))
|
||||
|
|
|
@ -4,6 +4,7 @@ charset-normalizer==2.0.12
|
|||
decorator==5.1.1
|
||||
feedparser==6.0.2
|
||||
idna==3.3
|
||||
psycopg2-binary==2.9.3
|
||||
pyTelegramBotAPI==4.5.0
|
||||
python-dotenv==0.20.0
|
||||
requests==2.27.1
|
||||
|
|
1
rss.py
1
rss.py
|
@ -9,6 +9,7 @@ class FeedItem:
|
|||
self.url = item.get('link', '')
|
||||
self.title = item.get('title', '')
|
||||
self.description = item.get('summary', '')
|
||||
self.guid = item.get('id', '')
|
||||
if 'published' in item:
|
||||
self.date = datetime.fromtimestamp(mktime(item.published_parsed))
|
||||
else:
|
||||
|
|
|
@ -106,7 +106,8 @@ class CommandProcessor:
|
|||
class Notifier:
|
||||
"""Sends notifications to users about new RSS feed items."""
|
||||
|
||||
BATCH_LIMIT: int = 30
|
||||
# https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
|
||||
BATCH_LIMIT: int = 25
|
||||
|
||||
sent_counter: int = 0
|
||||
|
||||
|
@ -115,7 +116,7 @@ class Notifier:
|
|||
self.log.debug('Notifier.__init__(token=\'%s\', logger=%s)', token[:8] + '...', logger)
|
||||
self.bot: TeleBot = TeleBot(token)
|
||||
self.html_sanitizer: Cleaner = Cleaner(
|
||||
tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'span', 'tg-spoiler', 'a', 'code', 'pre'],
|
||||
tags=['b', 'strong', 'i', 'em', 'u', 'ins', 's', 'strike', 'del', 'tg-spoiler', 'a', 'code', 'pre'],
|
||||
attributes={"a": ["href", "title"]},
|
||||
protocols=['http', 'https'],
|
||||
strip=True,
|
||||
|
|
|
@ -10,8 +10,8 @@ from telegram import Notifier
|
|||
|
||||
load_dotenv()
|
||||
|
||||
token = os.getenv('TELEGRAM_TOKEN')
|
||||
db_path = os.getenv('DATABASE_PATH', './bot.db')
|
||||
token = os.getenv('RSSBOT_TG_TOKEN')
|
||||
dsn = os.getenv('RSSBOT_DSN')
|
||||
log_level = os.getenv('LOG_LEVEL', 'INFO')
|
||||
|
||||
print('Starting the updater with logging level', log_level.upper())
|
||||
|
@ -21,7 +21,7 @@ logging.basicConfig(
|
|||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
|
||||
db = Database(db_path, logging.getLogger('Database'))
|
||||
db = Database(dsn, logging.getLogger('Database'))
|
||||
notifier = Notifier(token, logging.getLogger('Notifier'))
|
||||
rss_reader = RssReader(logging.getLogger('RssReader'))
|
||||
|
||||
|
|
|
@ -24,35 +24,40 @@ class UpdateManager:
|
|||
feeds = self.database.find_feeds()
|
||||
self.log.info('Feeds to update: %d', len(feeds))
|
||||
|
||||
for feed_id, feed_url in feeds:
|
||||
self.log.info('Processing [%d] %s', feed_id, feed_url)
|
||||
feed = self.rss_reader.get_feed(feed_url)
|
||||
new_items = feed.items
|
||||
old_items_urls = self.database.find_feed_items_urls(feed_id)
|
||||
for feed in feeds:
|
||||
self.log.info('Processing [%d] %s', feed['id'], feed['url'])
|
||||
feed_obj = self.rss_reader.get_feed(feed['url'])
|
||||
new_items = feed_obj.items
|
||||
old_items = self.database.find_feed_items(feed['id'])
|
||||
|
||||
diff = self.__calculate_difference(new_items, old_items_urls)
|
||||
diff = self.__calculate_difference(new_items, old_items)
|
||||
|
||||
if not diff:
|
||||
continue
|
||||
|
||||
chat_ids = self.database.find_feed_subscribers(feed_id)
|
||||
self.notifier.send_updates(chat_ids, diff, feed.title)
|
||||
self.database.update_feed_items(feed_id, new_items)
|
||||
chat_ids = self.database.find_feed_subscribers(feed['id'])
|
||||
self.notifier.send_updates(chat_ids, diff, feed_obj.title)
|
||||
self.database.update_feed_items(feed['id'], new_items)
|
||||
|
||||
def __calculate_difference(self, new_items: list[FeedItem], old_items_urls: list[str]) -> list[FeedItem]:
|
||||
def __calculate_difference(self, new_items: list[FeedItem], old_items: list[dict]) -> list[FeedItem]:
|
||||
"""Calculate new feed items."""
|
||||
self.log.debug(
|
||||
'__calculate_difference(new_items=list(%d), old_items_urls=list(%d))', len(new_items), len(old_items_urls)
|
||||
'__calculate_difference(new_items=list(%d), old_items=list(%d))', len(new_items), len(old_items)
|
||||
)
|
||||
if not old_items_urls:
|
||||
if not old_items:
|
||||
self.log.debug('Old items are empty, returning new')
|
||||
return new_items
|
||||
|
||||
diff = []
|
||||
guids = [item['guid'] for item in old_items if item['guid']]
|
||||
urls = [item['url'] for item in old_items]
|
||||
|
||||
self.log.debug('Comparing %d new items with %d old', len(new_items), len(old_items_urls))
|
||||
self.log.debug('Comparing %d new items with %d old', len(new_items), len(old_items))
|
||||
for item in new_items:
|
||||
if item.url not in old_items_urls:
|
||||
if not guids and item.url not in urls:
|
||||
diff.append(item)
|
||||
continue
|
||||
if item.guid not in guids:
|
||||
diff.append(item)
|
||||
|
||||
self.log.debug('%d updates found', len(diff))
|
||||
|
|
Loading…
Reference in a new issue
DSN literally means "Data Source Name". So if we have no other data source, we don't need a prefix here.
That doesn't mean it's wrong or that you should delete it right now, though.