diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 56339ff..bc0108a 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,4 +1,4 @@ [bumpversion] -current_version = 0.1.1 +current_version = 0.2.0 commit = True tag = True diff --git a/.vscode/launch.json b/.vscode/launch.json index e298949..fe67e8b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,7 +5,7 @@ "version": "0.2.0", "configurations": [ { - "name": "Python: NewsBot", + "name": "Python: PYRSS Bot", "type": "python", "request": "launch", "program": "${workspaceFolder}/src/main.py", diff --git a/CHANGELOG.md b/CHANGELOG.md index 075686a..9a31efc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ +**v0.2.0** + +- Fix: Fetch channels if not found in bot cache (error fix) +- Enhancement: command to test a channel's permissions allow for the Bot to function +- Enhancement: account for active state from a server's settings (`GuildSettings`) +- Enhancement: command to view tracked content from the server or a given subscription of the same server. +- Other: code optimisation & `GuildSettings` dataclass +- Other: Cleaned out many instances of unused code + **v0.1.1** - Docs: Start of changelog -- Enhancement: Versioning with tagged docker images \ No newline at end of file +- Enhancement: Versioning with tagged docker images diff --git a/src/api.py b/src/api.py index fb96347..1016ccf 100644 --- a/src/api.py +++ b/src/api.py @@ -132,6 +132,15 @@ class API: return await self._get_many(self.API_ENDPOINT + "subchannel/", filters) + async def get_guild_settings(self, **filters) -> tuple[list[dict], int]: + """ + Get many guild settings. + """ + + log.debug("getting multiple guild settings") + + return await self._get_many(self.API_ENDPOINT + "guild-settings/", filters) + async def create_tracked_content(self, **data) -> dict: """ Create an instance of tracked content. diff --git a/src/extensions/cmds.py b/src/extensions/cmds.py new file mode 100644 index 0000000..287bdc6 --- /dev/null +++ b/src/extensions/cmds.py @@ -0,0 +1,230 @@ +""" +Extension for the `FeedCog`. +Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bot. +""" + +import logging +from typing import Tuple +from datetime import datetime + +import aiohttp +import validators +from feedparser import FeedParserDict, parse +from discord.ext import commands +from discord import Interaction, TextChannel, Embed, Colour +from discord.app_commands import Choice, Group, autocomplete, rename, command +from discord.errors import Forbidden + +from api import API +from feed import Subscription, TrackedContent +from utils import ( + Followup, + PaginationView, + get_rss_data, +) + +log = logging.getLogger(__name__) + +rss_list_sort_choices = [ + Choice(name="Nickname", value=0), + Choice(name="Date Added", value=1) +] +channels_list_sort_choices=[ + Choice(name="Feed Nickname", value=0), + Choice(name="Channel ID", value=1), + Choice(name="Date Added", value=2) +] + +# TODO SECURITY: a potential attack is that the user submits an rss feed then changes the +# target resource. Run a period task to check this. +async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]: + """Validate a provided RSS source. + + Parameters + ---------- + nickname : str + Nickname of the source. Must not contain URL. + url : str + URL of the source. Must be URL with valid status code and be an RSS feed. + + Returns + ------- + str or None + String invalid message if invalid, NoneType if valid. + FeedParserDict or None + The feed parsed from the given URL or None if invalid. + """ + + # Ensure the URL is valid + if not validators.url(url): + return f"The URL you have entered is malformed or invalid:\n`{url=}`", None + + # Check the nickname is not a URL + if validators.url(nickname): + return "It looks like the nickname you have entered is a URL.\n" \ + f"For security reasons, this is not allowed.\n`{nickname=}`", None + + + feed_data, status_code = await get_rss_data(url) + + # Check the URL status code is valid + if status_code != 200: + return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None + + # Check the contents is actually an RSS feed. + feed = parse(feed_data) + if not feed.version: + return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None + + return None, feed + + +class CommandsCog(commands.Cog): + """ + Command cog. + """ + + def __init__(self, bot: commands.Bot): + super().__init__() + self.bot = bot + + @commands.Cog.listener() + async def on_ready(self): + """Instructions to call when the cog is ready.""" + + log.info("%s cog is ready", self.__class__.__name__) + + # Group for commands about viewing data + view_group = Group( + name="view", + description="View data.", + guild_only=True + ) + + @view_group.command(name="subscriptions") + async def cmd_list_subs(self, inter: Interaction): + """List Subscriptions from this server.""" + + await inter.response.defer() + + def formatdata(index, item): + item = Subscription.from_dict(item) + + channels = f"{item.channels_count}{' channels' if item.channels_count != 1 else ' channel'}" + filters = f"{len(item.filters)}{' filters' if len(item.filters) != 1 else ' filter'}" + notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes + links = f"[RSS Link]({item.url}) · [API Link]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)" + + description = f"{channels}, {filters}\n" + description += f"{notes}\n" if notes else "" + description += links + + key = f"{index}. {item.name}" + return key, description # key, value pair + + async def getdata(page: int, pagesize: int): + async with aiohttp.ClientSession() as session: + api = API(self.bot.api_token, session) + return await api.get_subscriptions( + guild_id=inter.guild.id, page=page, page_size=pagesize + ) + + embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed + pagination = PaginationView( + self.bot, + inter=inter, + embed=embed, + getdata=getdata, + formatdata=formatdata, + pagesize=10, + initpage=1 + ) + await pagination.send() + + @view_group.command(name="tracked-content") + async def cmd_list_tracked(self, inter: Interaction): + """List Tracked Content from this server, or a given sub""" + + await inter.response.defer() + + def formatdata(index, item): + item = TrackedContent.from_dict(item) + sub = Subscription.from_dict(item.subscription) + + links = f"[Content Link]({item.url}) · [Message Link](https://discord.com/channels/{sub.guild_id}/{item.channel_id}/{item.message_id}/)" + description = f"Subscription: {sub.name}\n{links}" + + key = f"{item.id}. {item.title}" + return key, description + + async def getdata(page: int, pagesize: int): + async with aiohttp.ClientSession() as session: + api = API(self.bot.api_token, session) + return await api.get_tracked_content( + subscription__guild_id=inter.guild_id, page=page, page_size=pagesize + ) + + embed = Followup(f"Tracked Content in {inter.guild.name}").info()._embed + pagination = PaginationView( + self.bot, + inter=inter, + embed=embed, + getdata=getdata, + formatdata=formatdata, + pagesize=10, + initpage=1 + ) + await pagination.send() + + # Group for test related commands + test_group = Group( + name="test", + description="Commands to test Bot functionality.", + guild_only=True + ) + + @test_group.command(name="channel-permissions") + async def cmd_test_channel_perms(self, inter: Interaction): + """Test that the current channel's permissions allow for PYRSS to operate in it.""" + + try: + test_message = await inter.channel.send(content="... testing permissions ...") + await self.test_channel_perms(inter.channel) + except Exception as error: + await inter.response.send_message(content=f"Failed: {error}") + return + + await test_message.delete() + await inter.response.send_message(content="Success") + + async def test_channel_perms(self, channel: TextChannel): + + # Test generic message and delete + msg = await channel.send(content="test message") + await msg.delete() + + # Test detailed embed + embed = Embed( + title="test title", + description="test description", + colour=Colour.random(), + timestamp=datetime.now(), + url="https://google.com" + ) + embed.set_author(name="test author") + embed.set_footer(text="test footer") + embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") + embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") + embed_msg = await channel.send(embed=embed) + await embed_msg.delete() + + +async def setup(bot): + """ + Setup function for this extension. + Adds `CommandsCog` to the bot. + """ + + cog = CommandsCog(bot) + await bot.add_cog(cog) + log.info("Added %s cog", cog.__class__.__name__) diff --git a/src/extensions/rss.py b/src/extensions/rss.py deleted file mode 100644 index 73951b6..0000000 --- a/src/extensions/rss.py +++ /dev/null @@ -1,378 +0,0 @@ -""" -Extension for the `FeedCog`. -Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bot. -""" - -import logging -from typing import Tuple - -import aiohttp -import validators -from feedparser import FeedParserDict, parse -from discord.ext import commands -from discord import Interaction, TextChannel -from discord.app_commands import Choice, Group, autocomplete, rename, command - -from api import API -from feed import Subscription, SubscriptionChannel, TrackedContent -from utils import ( - Followup, - PaginationView, - get_rss_data, -) - -log = logging.getLogger(__name__) - -rss_list_sort_choices = [ - Choice(name="Nickname", value=0), - Choice(name="Date Added", value=1) -] -channels_list_sort_choices=[ - Choice(name="Feed Nickname", value=0), - Choice(name="Channel ID", value=1), - Choice(name="Date Added", value=2) -] - -# TODO SECURITY: a potential attack is that the user submits an rss feed then changes the -# target resource. Run a period task to check this. -async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]: - """Validate a provided RSS source. - - Parameters - ---------- - nickname : str - Nickname of the source. Must not contain URL. - url : str - URL of the source. Must be URL with valid status code and be an RSS feed. - - Returns - ------- - str or None - String invalid message if invalid, NoneType if valid. - FeedParserDict or None - The feed parsed from the given URL or None if invalid. - """ - - # Ensure the URL is valid - if not validators.url(url): - return f"The URL you have entered is malformed or invalid:\n`{url=}`", None - - # Check the nickname is not a URL - if validators.url(nickname): - return "It looks like the nickname you have entered is a URL.\n" \ - f"For security reasons, this is not allowed.\n`{nickname=}`", None - - - feed_data, status_code = await get_rss_data(url) - - # Check the URL status code is valid - if status_code != 200: - return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None - - # Check the contents is actually an RSS feed. - feed = parse(feed_data) - if not feed.version: - return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None - - return None, feed - - -class FeedCog(commands.Cog): - """ - Command cog. - """ - - def __init__(self, bot: commands.Bot): - super().__init__() - self.bot = bot - - @commands.Cog.listener() - async def on_ready(self): - """Instructions to call when the cog is ready.""" - - log.info("%s cog is ready", self.__class__.__name__) - - # async def autocomplete_subscriptions(self, inter: Interaction, name: str) -> list[Choice]: - # """""" - - # log.debug("autocompleting subscriptions '%s'", name) - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # results, _ = await api.get_subscriptions(server=inter.guild_id, search=name) - - # except Exception as exc: - # log.error(exc) - # return [] - - # subscriptions = Subscription.from_list(results) - - # return [ - # Choice(name=sub.name, value=sub.uuid) - # for sub in subscriptions - # ] - - # async def autocomplete_subscription_channels(self, inter: Interaction, uuid: str): - # """""" - - # log.debug("autocompleting subscription channels") - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # results, _ = await api.get_subscription_channels() - - # except Exception as exc: - # log.error(exc) - # return [] - - # subscription_channels = SubscriptionChannel.from_list(results) - - # async def name(link): - # result = self.bot.get_channel(link.id) or await self.bot.fetch_channel(link.id) - # return f"{link.subscription.name} -> #{result.name}" - - # return [ - # Choice(name=await name(link), value=link.uuid) - # for link in subscription_channels - # ] - - # subscription_group = Group( - # name="subscriptions", - # description="subscription commands", - # guild_only=True - # ) - - # @subscription_group.command(name="link") - # @autocomplete(sub_uuid=autocomplete_subscriptions) - # @rename(sub_uuid="subscription") - # async def link_subscription_channel(self, inter: Interaction, sub_uuid: str, channel: TextChannel): - # """ - # Link Subscription to discord.TextChannel. - # """ - - # await inter.response.defer() - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # data = await api.create_subscription_channel(str(channel.id), sub_uuid) - - # except aiohttp.ClientResponseError as exc: - # return await ( - # Followup( - # f"Error · {exc.message}", - # "Ensure you haven't: \n" - # "- Already linked this subscription to this channel\n" - # "- Already linked this subscription to the maximum of 4 channels" - # ) - # .footer(f"HTTP {exc.code}") - # .error() - # .send(inter) - # ) - - # subscription = Subscription.from_dict(data.pop("subscription")) - # data["subscription"] = ( - # f"{subscription.name}\n" - # f"[RSS]({subscription.rss_url}) · " - # f"[API Subscription]({API.SUBSCRIPTION_ENDPOINT}{subscription.uuid}) · " - # f"[API Link]({API.CHANNEL_ENDPOINT}{data['uuid']})" - # ) - - # channel_id = int(data.pop("id")) - # channel = self.bot.get_channel(channel_id) or await self.bot.fetch_channel(channel_id) - # data["channel"] = channel.mention - - # data.pop("creation_datetime") - # data.pop("uuid") - - # await ( - # Followup("Linked!") - # .fields(**data) - # .added() - # .send(inter) - # ) - - # @subscription_group.command(name="unlink") - # @autocomplete(uuid=autocomplete_subscription_channels) - # @rename(uuid="link") - # async def unlink_subscription_channel(self, inter: Interaction, uuid: str): - # """ - # Unlink subscription from discord.TextChannel. - # """ - - # await inter.response.defer() - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # # data = await api.get_subscription(uuid=uuid) - # await api.delete_subscription_channel(uuid=uuid) - # # sub_channel = await SubscriptionChannel.from_dict(data) - - # except Exception as exc: - # return await ( - # Followup(exc.__class__.__name__, str(exc)) - # .error() - # .send(inter) - # ) - - # await ( - # Followup("Subscription unlinked!", uuid) - # .added() - # .send(inter) - # ) - - # @subscription_group.command(name="list-links") - # async def list_subscription(self, inter: Interaction): - # """List Subscriptions Channels in this server.""" - - # await inter.response.defer() - - # async def formatdata(index: int, item: dict) -> tuple[str, str]: - # item = SubscriptionChannel.from_dict(item) - # next_emoji = self.bot.get_emoji(1204542366602502265) - # key = f"{index}. {item.subscription.name} {next_emoji} {item.mention}" - # return key, item.hyperlinks_string - - # async def getdata(page: int, pagesize: int) -> dict: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # return await api.get_subscription_channels( - # subscription__server=inter.guild.id, page=page, page_size=pagesize - # ) - - # embed = Followup(f"Links in {inter.guild.name}").info()._embed - # pagination = PaginationView( - # self.bot, - # inter=inter, - # embed=embed, - # getdata=getdata, - # formatdata=formatdata, - # pagesize=10, - # initpage=1 - # ) - # await pagination.send() - - # @subscription_group.command(name="add") - # async def new_subscription(self, inter: Interaction, name: str, rss_url: str): - # """Subscribe this server to a new RSS Feed.""" - - # await inter.response.defer() - - # try: - # parsed_rssfeed = await self.bot.functions.validate_feed(name, rss_url) - # image_url = parsed_rssfeed.get("feed", {}).get("image", {}).get("href") - - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # data = await api.create_subscription(name, rss_url, image_url, str(inter.guild_id), [-1]) - - # except aiohttp.ClientResponseError as exc: - # return await ( - # Followup( - # f"Error · {exc.message}", - # "Ensure you haven't: \n" - # "- Reused an identical name of an existing Subscription\n" - # "- Already created the maximum of 25 Subscriptions" - # ) - # .footer(f"HTTP {exc.code}") - # .error() - # .send(inter) - # ) - - # # Omit data we dont want the user to see - # data.pop("uuid") - # data.pop("image") - # data.pop("server") - # data.pop("creation_datetime") - - # # Update keys to be more human readable - # data["url"] = data.pop("rss_url") - - # await ( - # Followup("Subscription Added!") - # .fields(**data) - # .image(image_url) - # .added() - # .send(inter) - # ) - - # @subscription_group.command(name="remove") - # @autocomplete(uuid=autocomplete_subscriptions) - # @rename(uuid="choice") - # async def remove_subscriptions(self, inter: Interaction, uuid: str): - # """Unsubscribe this server from an existing RSS Feed.""" - - # await inter.response.defer() - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # await api.delete_subscription(uuid) - - # except Exception as exc: - # return await ( - # Followup(exc.__class__.__name__, str(exc)) - # .error() - # .send(inter) - # ) - - # await ( - # Followup("Subscription Removed!", uuid) - # .trash() - # .send(inter) - # ) - - @command(name="subscriptions") - async def list_subscription(self, inter: Interaction): - """List Subscriptions from this server.""" - - await inter.response.defer() - - def formatdata(index, item): - item = Subscription.from_dict(item) - - channels = f"{item.channels_count}{' channels' if item.channels_count != 1 else ' channel'}" - filters = f"{len(item.filters)}{' filters' if len(item.filters) != 1 else ' filter'}" - notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes - links = f"[RSS URL]({item.url}) · [API URL]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)" - - description = f"{channels}, {filters}\n" - description += f"{notes}\n" if notes else "" - description += links - - key = f"{index}. {item.name}" - return key, description # key, value pair - - async def getdata(page: int, pagesize: int): - async with aiohttp.ClientSession() as session: - api = API(self.bot.api_token, session) - return await api.get_subscriptions( - guild_id=inter.guild.id, page=page, page_size=pagesize - ) - - embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed - pagination = PaginationView( - self.bot, - inter=inter, - embed=embed, - getdata=getdata, - formatdata=formatdata, - pagesize=10, - initpage=1 - ) - await pagination.send() - # await Followup("results", str(await getdata(1, 10))).send(inter) - - -async def setup(bot): - """ - Setup function for this extension. - Adds `FeedCog` to the bot. - """ - - cog = FeedCog(bot) - await bot.add_cog(cog) - log.info("Added %s cog", cog.__class__.__name__) diff --git a/src/extensions/tasks.py b/src/extensions/tasks.py index b3e0cea..c718623 100644 --- a/src/extensions/tasks.py +++ b/src/extensions/tasks.py @@ -18,7 +18,7 @@ from discord.ext import commands, tasks from discord.errors import Forbidden from feedparser import parse -from feed import RSSFeed, Subscription, RSSItem +from feed import RSSFeed, Subscription, RSSItem, GuildSettings from utils import get_unparsed_feed from filters import match_text from api import API @@ -83,43 +83,88 @@ class TaskCog(commands.Cog): async with aiohttp.ClientSession() as session: self.api = API(self.bot.api_token, session) - subscriptions = await self.get_subscriptions() - await self.process_subscriptions(subscriptions) + await self.execute_task() end_time = perf_counter() log.debug(f"task completed in {end_time - start_time:.4f} seconds") - async def get_subscriptions(self) -> list[Subscription]: + async def execute_task(self): + """Execute the task directly.""" + + # Filter out inactive guild IDs using related settings guild_ids = [guild.id for guild in self.bot.guilds] - sub_data = [] + guild_settings = await self.get_guild_settings(guild_ids) + active_guild_ids = [settings.guild_id for settings in guild_settings if settings.active] + subscriptions = await self.get_subscriptions(active_guild_ids) + await self.process_subscriptions(subscriptions) + + async def get_guild_settings(self, guild_ids: list[int]) -> list[int]: + """Returns a list of guild settings from the Bot's guilds, if they exist.""" + + guild_settings = [] + + # Iterate infinitely taking the iter no. as `page` + # data will be empty after last page reached. for page, _ in enumerate(iter(int, 1)): - try: - log.debug("fetching page '%s'", page + 1) - sub_data.extend( - (await self.api.get_subscriptions(server__in=guild_ids, page=page+1))[0] - ) - except aiohttp.ClientResponseError as error: - match error.status: - case 404: - log.debug("final page reached '%s'", page) - break - case 403: - log.critical(error) - self.subscription_task.cancel() - return [] # returning an empty list should gracefully end the task - case _: - log.error(error) - break - - except Exception as error: - log.error("Exception while gathering page data %s", error) + data = await self.get_guild_settings_page(guild_ids, page) + if not data: break + guild_settings.extend(data[0]) - return Subscription.from_list(sub_data) + # Only return active guild IDs + return GuildSettings.from_list(guild_settings) + + async def get_guild_settings_page(self, guild_ids: list[int], page: int) -> list[dict]: + """Returns an individual page of guild settings.""" + + try: + return await self.api.get_guild_settings(guild_id__in=guild_ids, page=page+1) + except aiohttp.ClientResponseError as error: + self.handle_pagination_error(error) + return [] + + def handle_pagination_error(self, error: aiohttp.ClientResponseError): + """Handle the error cases from pagination attempts.""" + + match error.status: + case 404: + log.debug("final page reached") + case 403: + log.critical("[403] Bot likely lacks permissions: %s", error, exc_info=True) + self.subscription_task.cancel() # can't do task without proper auth, so cancel permanently + case _: + log.debug(error) + + async def get_subscriptions(self, guild_ids: list[int]) -> list[Subscription]: + """Get a list of `Subscription`s matching the given `guild_ids`.""" + + subscriptions = [] + + # Iterate infinitely taking the iter no. as `page` + # data will be empty after last page reached. + for page, _ in enumerate(iter(int, 1)): + data = await self.get_subs_page(guild_ids, page) + if not data: + break + + subscriptions.extend(data[0]) + + return Subscription.from_list(subscriptions) + + async def get_subs_page(self, guild_ids: list[int], page: int) -> list[Subscription]: + """Returns an individual page of subscriptions.""" + + try: + return await self.api.get_subscriptions(guild_id__in=guild_ids, page=page+1) + except aiohttp.ClientResponseError as error: + self.handle_pagination_error(error) + return [] async def process_subscriptions(self, subscriptions: list[Subscription]): + """Process a given list of `Subscription`s.""" + async def process_single_subscription(sub: Subscription): log.debug("processing subscription '%s' for '%s'", sub.id, sub.guild_id) @@ -143,7 +188,7 @@ class TaskCog(commands.Cog): async def process_items(self, sub: Subscription, feed: RSSFeed): log.debug("processing items") - channels = [self.bot.get_channel(channel.channel_id) for channel in await sub.get_channels(self.api)] + channels = await self.fetch_or_get_channels(await sub.get_channels(self.api)) filters = [await self.api.get_filter(filter_id) for filter_id in sub.filters] for item in feed.items: @@ -159,6 +204,18 @@ class TaskCog(commands.Cog): for channel in channels: await self.track_and_send(sub, feed, item, mutated_item, channel, blocked) + async def fetch_or_get_channels(self, channels_data: list[dict]): + channels = [] + + for data in channels_data: + try: + channel = self.bot.get_channel(data.channel_id) + channels.append(channel or await self.bot.fetch_channel(data.channel_id)) + except Forbidden: + log.error(f"Forbidden Channel '{data.channel_id}'") + + return channels + def filter_item(self, _filter: dict, item: RSSItem) -> bool: """ Returns `True` if item should be ignored due to filters. @@ -188,8 +245,8 @@ class TaskCog(commands.Cog): log.debug("sending '%s', exists '%s'", item.guid, result[1]) message = await channel.send(embed=await mutated_item.to_embed(sub, feed, self.api.session)) message_id = message.id - except Forbidden as error: - log.error(error) + except Forbidden: + log.error(f"Forbidden to send to channel {channel.id}") await self.mark_tracked_item(sub, item, channel.id, message_id, blocked) diff --git a/src/feed.py b/src/feed.py index 0269e3b..443e86d 100644 --- a/src/feed.py +++ b/src/feed.py @@ -270,6 +270,19 @@ class DjangoDataModel(ABC): return cls(**cls.parser(data)) +@dataclass(slots=True) +class GuildSettings(DjangoDataModel): + + id: int + guild_id: int + default_embed_colour: str + active: bool + + @staticmethod + def parser(item: dict) -> dict: + return item + + @dataclass(slots=True) class Subscription(DjangoDataModel): @@ -324,51 +337,21 @@ class SubChannel(DjangoDataModel): return f"<#{self.channel_id}>" -@dataclass(slots=True) -class SubscriptionChannel(DjangoDataModel): - - uuid: str - id: int - subscription: Subscription - creation_datetime: datetime - - @staticmethod - def parser(item: dict) -> dict: - - item["id"] = int(item["id"]) - item["subscription"] = Subscription.from_dict(item.pop("subscription")) - item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT) - return item - - @property - def mention(self) -> str: - """ - Returns the `id` as a string in the discord mention format. - """ - - return f"<#{self.id}>" - - @property - def hyperlinks_string(self) -> str: - """""" - - api_hyperlink = f"[API]({API.CHANNEL_ENDPOINT}{self.uuid}/)" - rss_hyperlink = f"[RSS]({self.subscription.rss_url})" - value = f"{rss_hyperlink} · {api_hyperlink}" - - return value - - @dataclass(slots=True) class TrackedContent(DjangoDataModel): - uuid: str + id: int + guid: str + title: str + url: str subscription: str - content_url: str + channel_id: int + message_id: int + blocked: bool creation_datetime: datetime @staticmethod def parser(item: dict) -> dict: - item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT) + item["creation_datetime"] = datetime.strptime(item["creation_datetime"], "%Y-%m-%dT%H:%M:%S.%f%z") return item diff --git a/src/utils.py b/src/utils.py index 6b4dc30..e4fdede 100644 --- a/src/utils.py +++ b/src/utils.py @@ -83,8 +83,6 @@ class PaginationView(View): self.index = initpage # emoji reference - next_emoji = bot.get_emoji(1204542366602502265) - prev_emoji = bot.get_emoji(1204542365432422470) self.start_emoji = bot.get_emoji(1204542364073463818) self.end_emoji = bot.get_emoji(1204542367752003624) @@ -113,7 +111,6 @@ class PaginationView(View): @staticmethod def calc_total_pages(results: int, max_pagesize: int) -> int: result = ((results - 1) // max_pagesize) + 1 - log.debug("total pages calculated: %s", result) return result def calc_dataitem_index(self, dataitem_index: int): @@ -200,6 +197,7 @@ class PaginationView(View): raise exc self.maxpage = self.calc_total_pages(total_results, self.pagesize) + log.debug(f"{self.maxpage=!r}") for i, item in enumerate(data): i = self.calc_dataitem_index(i) @@ -228,6 +226,9 @@ class PaginationView(View): self.children[1].disabled = self.index == self.maxpage async def send(self): + """Send the pagination view. It may be important to defer before invoking this method.""" + + log.debug("sending pagination view") embed = await self.create_paged_embed() if self.maxpage <= 1: