diff --git a/src/extensions/cmd.py b/src/extensions/cmd.py index 22515ad..01ac0e3 100644 --- a/src/extensions/cmd.py +++ b/src/extensions/cmd.py @@ -3,6 +3,7 @@ Extension for the `CommandCog`. Loading this file via `commands.Bot.load_extension` will add `CommandCog` to the bot. """ +import json import logging import validators @@ -10,13 +11,13 @@ import aiohttp import textwrap import feedparser from markdownify import markdownify -from discord import app_commands, Interaction, Embed +from discord import app_commands, Interaction, Embed, Colour from discord.ext import commands, tasks from discord.app_commands import Choice, Group, command, autocomplete from sqlalchemy import insert, select, update, and_, or_, delete from db import DatabaseManager, AuditModel, SentArticleModel, RssSourceModel, FeedChannelModel -from feed import Feeds, get_source +from feed import get_source, Source log = logging.getLogger(__name__) @@ -156,7 +157,15 @@ class CommandCog(commands.Cog): inter.user.id, database=database ) - await followup(inter, f"RSS source added [{nickname}]({url})", suppress_embeds=True) + embed = Embed( + title=f"New RSS Source: **{nickname}**", + url=url, + colour=Colour.from_str("#59ff00") + ) + embed.set_thumbnail(url=feed.get("feed", {}).get("image", {}).get("href")) + + # , f"RSS source added [{nickname}]({url})", suppress_embeds=True + await followup(inter, embed=embed) @rss_group.command(name="remove") @autocomplete(source=source_autocomplete) @@ -176,16 +185,17 @@ class CommandCog(commands.Cog): log.debug(f"Attempting to remove RSS source ({source=})") async with DatabaseManager() as database: - rss_source = (await database.session.execute( + select_result = await database.session.execute( select(RssSourceModel).filter( and_( RssSourceModel.discord_server_id == inter.guild_id, RssSourceModel.rss_url == source ) ) - )).fetchone() + ) + rss_source = select_result.fetchone() - result = await database.session.execute( + delete_result = await database.session.execute( delete(RssSourceModel).filter( and_( RssSourceModel.discord_server_id == inter.guild_id, @@ -194,11 +204,13 @@ class CommandCog(commands.Cog): ) ) + nickname, rss_url = rss_source.nick, rss_source.rss_url + # TODO: `if not result.rowcount` then show unique message and possible matches if any (like how the autocomplete works) - if result.rowcount: + if delete_result.rowcount: await followup(inter, - f"RSS source deleted successfully\n**[{rss_source.nick}]({rss_source.rss_url})**", + f"RSS source deleted successfully\n**[{nickname}]({rss_url})**", suppress_embeds=True ) return @@ -285,10 +297,6 @@ class CommandCog(commands.Cog): await followup(inter, embeds=embeds) - - - - async def setup(bot): """ Setup function for this extension. diff --git a/src/extensions/test.py b/src/extensions/test.py deleted file mode 100644 index 7651254..0000000 --- a/src/extensions/test.py +++ /dev/null @@ -1,105 +0,0 @@ -""" -Extension for the `test` cog. -Loading this file via `commands.Bot.load_extension` will add the `test` cog to the bot. -""" - -import logging - -import textwrap -from markdownify import markdownify -from discord import app_commands, Interaction, Embed -from discord.ext import commands, tasks -from sqlalchemy import insert, select, and_ - -from db import DatabaseManager, AuditModel, SentArticleModel, RssSourceModel -from feed import Feeds, get_source - -log = logging.getLogger(__name__) - - -class Test(commands.Cog): - """ - News cog. - Delivers embeds of news articles to discord channels. - """ - - def __init__(self, bot): - super().__init__() - self.bot = bot - - @commands.Cog.listener() - async def on_ready(self): - log.info(f"{self.__class__.__name__} cog is ready") - - async def source_autocomplete(self, inter: Interaction, current: str): - """ - - """ - - async with DatabaseManager() as database: - whereclause = and_( - RssSourceModel.discord_server_id == inter.guild_id, - RssSourceModel.rss_url.ilike(f"%{current}%") - ) - query = select(RssSourceModel).where(whereclause) - result = await database.session.execute(query) - sources = [ - app_commands.Choice(name=rss.rss_url, value=rss.rss_url) - for rss in result.scalars().all() - ] - - return sources - - @app_commands.command(name="test-latest-article") - @app_commands.autocomplete(source=source_autocomplete) - async def test_news(self, inter: Interaction, source: str): - - await inter.response.defer() - await self.bot.audit("Requesting latest article.", inter.user.id) - - try: - source = get_source(source) - article = source.get_latest_article() - except IndexError as e: - log.error(e) - await inter.followup.send("An error occured, it's possible that the source provided was bad.") - return - - md_description = markdownify(article.description, strip=("img",)) - article_description = textwrap.shorten(md_description, 4096) - - embed = Embed( - title=article.title, - description=article_description, - url=article.url, - timestamp=article.published, - ) - embed.set_thumbnail(url=source.icon_url) - embed.set_image(url=await article.get_thumbnail_url()) - embed.set_footer(text=article.author) - embed.set_author( - name=source.name, - url=source.url, - ) - - async with DatabaseManager() as database: - query = insert(SentArticleModel).values( - discord_server_id=inter.guild_id, - discord_channel_id=inter.channel_id, - discord_message_id=inter.id, - article_url=article.url - ) - await database.session.execute(query) - - await inter.followup.send(embed=embed) - - -async def setup(bot): - """ - Setup function for this extension. - Adds the `ErrorCog` cog to the bot. - """ - - cog = Test(bot) - await bot.add_cog(cog) - log.info(f"Added {cog.__class__.__name__} cog") diff --git a/src/feed.py b/src/feed.py index 09b377d..d4a8474 100644 --- a/src/feed.py +++ b/src/feed.py @@ -1,7 +1,9 @@ +""" + +""" import json import logging -from enum import Enum from dataclasses import dataclass from datetime import datetime @@ -10,16 +12,70 @@ from bs4 import BeautifulSoup as bs4 from feedparser import FeedParserDict, parse log = logging.getLogger(__name__) +dumps = lambda _dict: json.dumps(_dict, indent=8) -class Feeds(Enum): - THE_UPPER_LIP = "https://theupperlip.co.uk/rss" - THE_BABYLON_BEE= "https://babylonbee.com/feed" - BBC_NEWS = "https://feeds.bbci.co.uk/news/rss.xml" +@dataclass +class Article: + """Represents a news article, or entry from an RSS feed.""" + + title: str | None + description: str | None + url: str | None + published: datetime | None + author: str | None + + @classmethod + def from_entry(cls, entry:FeedParserDict): + """Create an Article from an RSS feed entry. + + Parameters + ---------- + entry : FeedParserDict + An entry pulled from a complete FeedParserDict object. + + Returns + ------- + Article + The Article created from the feed entry. + """ + + log.debug("Creating Article from entry: %s", dumps(entry)) + + published_parsed = entry.get("published_parsed") + published = datetime(*entry.published_parsed[0:-2]) if published_parsed else None + + return cls( + title=entry.get("title"), + description=entry.get("description"), + url=entry.get("link"), + published=published, + author = entry.get("author") + ) + + async def get_thumbnail_url(self) -> str | None: + """Returns the thumbnail URL for an article. + + Returns + ------- + str or None + The thumbnail URL, or None if not found. + """ + + log.debug("Fetching thumbnail for article: %s", self) + + async with aiohttp.ClientSession() as session: + async with session.get(self.url) as response: + html = await response.text() + + soup = bs4(html, "html.parser") + image_element = soup.select_one("meta[property='og:image']") + return image_element.get("content") if image_element else None @dataclass class Source: + """Represents an RSS source.""" name: str | None url: str | None @@ -28,7 +84,20 @@ class Source: @classmethod def from_parsed(cls, feed:FeedParserDict): - # print(json.dumps(feed, indent=8)) + """Returns a Source object from a parsed feed. + + Parameters + ---------- + feed : FeedParserDict + The feed used to create the Source. + + Returns + ------- + Source + The Source object + """ + + log.debug("Creating Source from feed: %s", dumps(feed)) return cls( name=feed.get("channel", {}).get("title"), @@ -37,86 +106,42 @@ class Source: feed=feed ) - def get_latest_articles(self, max: int) -> list: - """""" + def get_latest_articles(self, max: int) -> list[Article]: + """Returns a list of Article objects. - articles = [] + Parameters + ---------- + max : int + The maximum number of articles to return. - for i, entry in enumerate(self.feed.entries): - if i >= max: - break - - articles.append(Article.from_entry(entry)) - - return articles - - -@dataclass -class Article: - - title: str | None - description: str | None - url: str | None - published: datetime | None - author: str | None - - @classmethod - def from_parsed(cls, feed:FeedParserDict): - entry = feed.entries[0] - # log.debug(json.dumps(entry, indent=8)) - - published_parsed = entry.get("published_parsed") - published = datetime(*entry.published_parsed[0:-2]) if published_parsed else None - - return cls( - title=entry.get("title"), - description=entry.get("description"), - url=entry.get("link"), - published=published, - author = entry.get("author") - ) - - @classmethod - def from_entry(cls, entry:FeedParserDict): - - published_parsed = entry.get("published_parsed") - published = datetime(*entry.published_parsed[0:-2]) if published_parsed else None - - return cls( - title=entry.get("title"), - description=entry.get("description"), - url=entry.get("link"), - published=published, - author = entry.get("author") - ) - - - async def get_thumbnail_url(self): + Returns + ------- + list of Article + A list of Article objects. """ - """ + log.debug("Fetching latest articles from %s, max=%s", self, max) - async with aiohttp.ClientSession() as session: - async with session.get(self.url) as response: - html = await response.text() - - # Parse the thumbnail for the news story - soup = bs4(html, "html.parser") - image_element = soup.select_one("meta[property='og:image']") - return image_element.get("content") if image_element else None + return [ + Article.from_entry(entry) + for i, entry in enumerate(self.feed.entries) + if i < max + ] def get_source(rss_url: str) -> Source: - """ + """_summary_ + Parameters + ---------- + rss_url : str + _description_ + + Returns + ------- + Source + _description_ """ parsed_feed = parse(rss_url) return Source.from_parsed(parsed_feed) - - -def get_test(): - - parsed = parse(Feeds.THE_UPPER_LIP.value) - print(json.dumps(parsed, indent=4)) - return parsed