tg_rss_bot/database.py
Alexey Skobkin 2e44df0bbd
All checks were successful
continuous-integration/drone/push Build is passing
#2 #7 Telegram module (#10)
`telegram` module. Closes #2 #7.

Two service classes:

* `Notifier` class
* `CommandProcessor` class

Two [middlewares](https://github.com/eternnoir/pyTelegramBotAPI/tree/master/examples/middleware):

* `UserAuthMiddleware`
* `ExceptionHandlerMiddleware`

One exception for usage in the code called by the bot:

* `DisplayableException`

Merge **ONLY** after #9 is merged.

Reviewed-on: #10
Reviewed-by: Miroslavsckaya <miroslavsckaya@noreply.git.skobk.in>
2022-05-30 23:54:26 +03:00

151 lines
5.8 KiB
Python

import sqlite3
from exceptions import DisplayableException
class Database:
"""Implement interaction with the database."""
def __init__(self, path: str) -> None:
"""Create a database file if not exists."""
# TODO: think about removing check_same_thread=False
self.conn = sqlite3.connect(path, check_same_thread=False)
self.cur = self.conn.cursor()
self.__init_schema()
def add_user(self, telegram_id: int) -> int:
"""Add a user's telegram id to the database and return its database id."""
self.cur.execute('INSERT INTO users (telegram_id) VALUES (?)', [telegram_id])
self.conn.commit()
return self.cur.lastrowid
def find_user(self, telegram_id: int) -> int | None:
"""Get a user's telegram id and return its database id."""
self.cur.execute('SELECT id FROM users WHERE telegram_id = ?', [telegram_id])
row = self.cur.fetchone()
if row is None:
return None
return row[0]
def add_feed(self, url: str) -> int:
"""Add a feed to the database and return its id."""
self.cur.execute('INSERT INTO feeds (url) VALUES (?)', [url])
self.conn.commit()
return self.cur.lastrowid
def find_feed_by_url(self, url: str) -> int | None:
"""Find feed ID by url."""
self.cur.execute('SELECT id FROM feeds WHERE url = ?', [url])
row = self.cur.fetchone()
if row is None:
return None
return row[0]
def subscribe_user_by_url(self, user_id: int, url: str) -> None:
"""Subscribe user to the feed creating it if does not exist yet."""
feed_id = self.find_feed_by_url(url)
if feed_id is None:
feed_id = self.add_feed(url)
if self.is_user_subscribed(user_id, feed_id):
raise DisplayableException('Already subscribed')
self.subscribe_user(user_id, feed_id)
def subscribe_user(self, user_id: int, feed_id: int) -> None:
"""Subscribe a user to the feed."""
self.cur.execute('INSERT INTO subscriptions (user_id, feed_id) VALUES (?, ?)', [user_id, feed_id])
self.conn.commit()
def unsubscribe_user_by_url(self, user_id: int, url: str) -> None:
feed_id = self.find_feed_by_url(url)
if feed_id is None:
raise DisplayableException('Feed does not exist')
if not self.is_user_subscribed(user_id, feed_id):
raise DisplayableException('Not subscribed')
self.unsubscribe_user(user_id, feed_id)
if self.get_feed_subscribers_count(feed_id) == 0:
# Feed is not used anymore. Removing.
self.delete_feed(feed_id)
def unsubscribe_user(self, user_id: int, feed_id: int) -> None:
"""Unsubscribe a user from the feed."""
self.cur.execute('DELETE FROM subscriptions WHERE feed_id = ? AND user_id = ?', [feed_id, user_id])
self.conn.commit()
def is_user_subscribed(self, user_id: int, feed_id: int) -> bool:
"""Check if user subscribed to specific feed."""
self.cur.execute('SELECT 1 FROM subscriptions WHERE user_id = ? AND feed_id = ?', [user_id, feed_id])
row = self.cur.fetchone()
if row is None:
return False
return True
def delete_feed(self, feed_id: int) -> None:
"""Delete a feed."""
self.cur.execute('DELETE FROM feeds WHERE id = ?', [feed_id])
self.conn.commit()
def get_feed_subscribers_count(self, feed_id: int) -> int:
"""Count feed subscribers."""
self.cur.execute('SELECT COUNT(user_id) FROM subscriptions WHERE feed_id = ?', [feed_id])
row = self.cur.fetchone()
if row is None:
return 0
return int(row[0])
def find_feeds(self) -> list:
"""Get a list of feeds."""
self.cur.execute('SELECT * FROM feeds')
return self.cur.fetchall()
def find_user_feeds(self, user_id: int) -> list:
"""Return a list of feeds the user is subscribed to."""
self.cur.execute('SELECT * FROM feeds WHERE id IN (SELECT feed_id FROM subscriptions WHERE user_id = ?)',
[user_id])
return self.cur.fetchall()
def find_feed_items(self, feed_id: int) -> list:
"""Get last feed items."""
self.cur.execute('SELECT * FROM feeds_last_items WHERE feed_id = ?', [feed_id])
return self.cur.fetchall()
def update_feed_items(self, feed_id: int, new_items: list) -> None:
"""Replace last feed items with a list items that receive."""
for i in range(len(new_items)):
new_items[i] = (feed_id,) + new_items[i]
self.cur.execute('DELETE FROM feeds_last_items WHERE feed_id = ?', [feed_id])
self.cur.executemany(
'INSERT INTO feeds_last_items (feed_id, url, title, description, date) VALUES (?, ?, ?, ?, ?)', new_items)
self.conn.commit()
def __init_schema(self):
# TODO: Move to migrations
self.cur.execute(
'CREATE TABLE IF NOT EXISTS users (id INTEGER, telegram_id INTEGER NOT NULL UNIQUE, PRIMARY KEY(id))'
)
self.cur.execute('CREATE TABLE IF NOT EXISTS feeds (id INTEGER, url TEXT NOT NULL UNIQUE, PRIMARY KEY(id))')
self.cur.execute(
'CREATE TABLE IF NOT EXISTS subscriptions ('
' user_id INTEGER,'
' feed_id INTEGER,'
' UNIQUE (user_id, feed_id),'
' FOREIGN KEY(user_id) REFERENCES users(id),'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
')'
)
self.cur.execute(
'CREATE TABLE IF NOT EXISTS feeds_last_items ('
' feed_id INTEGER,'
' url TEXT NOT NULL UNIQUE,'
' title TEXT,'
' description TEXT,'
' date NUMERIC,'
' FOREIGN KEY(feed_id) REFERENCES feeds(id)'
')'
)