diff --git a/src/extensions/tasks.py b/src/extensions/tasks.py
index f514720..7634fe9 100644
--- a/src/extensions/tasks.py
+++ b/src/extensions/tasks.py
@@ -231,7 +231,8 @@ class TaskCog(commands.Cog):
                 title=article.title,
                 url=article.url,
                 subscription=sub_id,
-                blocked=blocked
+                blocked=blocked,
+                channel_id="-_-"
             )
 
             log.debug("successfully tracked %s", article.guid)
diff --git a/src/feed.py b/src/feed.py
index bac579a..23e5660 100644
--- a/src/feed.py
+++ b/src/feed.py
@@ -16,7 +16,7 @@ from sqlalchemy import select, insert, delete, and_
 from sqlalchemy.exc import NoResultFound
 from textwrap import shorten
 
-from mutators import mutator_map
+from mutators import registry as mutator_registry
 from errors import IllegalFeed
 from db import DatabaseManager, RssSourceModel, FeedChannelModel
 from utils import get_rss_data, get_unparsed_feed
@@ -26,8 +26,124 @@ log = logging.getLogger(__name__)
 dumps = lambda _dict: json.dumps(_dict, indent=8)
 
 
+from xml.etree.ElementTree import Element, SubElement, tostring
+from feedparser import parse
+
+class RSSItem:
+    """A single entry of an RSS feed."""
+
+    def __init__(self, title, link, description, pub_date, guid):
+        self.title = title
+        self.link = link
+        self.description = description
+        self.pub_date = pub_date
+        self.guid = guid
+
+    def __str__(self):
+        return self.title
+
+    def create_mutated_copy(self, mutators):
+        pass
+
+    def create_embed(self, sub, feed):
+        """Build an Embed from this item's title, description, link and date."""
+
+        title = shorten(markdownify(self.title, strip=["img", "a"]), 256)
+        desc = shorten(markdownify(self.description, strip=["img"]), 4096)
+
+        author = ""
+        author_url = ""
+
+        icon_url = ""
+        thumb_url = ""
+
+        # Replace HTML with Markdown, and shorten text.
+        # author = shorten(self.source.name, 256)
+
+        # validate urls
+        # author_url = self.source.url if validators.url(self.source.url) else None
+        # icon_url = self.source.icon_url if validators.url(self.source.icon_url) else None
+        # thumb_url = await self.get_thumbnail_url(session) # validation done inside func
+
+        # Combined length validation
+        # Can't exceed combined 6000 characters, [400 Bad Request] if failed.
+        combined_length = len(title) + len(desc) + (len(author) * 2)
+        cutoff = combined_length - 6000
+        # `cutoff` is the excess over the limit, so trim the description by that much.
+        desc = shorten(desc, len(desc) - cutoff) if cutoff > 0 else desc
+
+        # TODO: the embed colour was referenced here but never defined in this
+        # scope; pass `colour=` again once its source (sub/feed?) is decided.
+        embed = Embed(
+            title=title,
+            description=desc,
+            timestamp=self.pub_date,
+            url=self.link if validators.url(self.link) else None,
+        )
+
+        # embed.set_thumbnail(url=icon_url)
+        # embed.set_image(url=thumb_url)
+        # embed.set_author(url=author_url, name=author)
+        # embed.set_footer(text=self.author)
+
+        return embed
+
+
+class RSSFeed:
+    """An RSS feed's channel metadata together with its items."""
+
+    def __init__(self, title, link, description, language='en-gb', pub_date=None, last_build_date=None):
+        self.title = title
+        self.link = link
+        self.description = description
+        self.language = language
+        self.pub_date = pub_date
+        self.last_build_date = last_build_date
+        self.items = []
+
+    def add_item(self, item: RSSItem):
+        if not isinstance(item, RSSItem):
+            raise TypeError("item must be an instance of RSSItem")
+
+        self.items.append(item)
+
+    def __str__(self):
+        return self.title
+
+    @classmethod
+    def from_parsed_feed(cls, parsed_feed):
+        """Build an RSSFeed (and its RSSItems) from a parsed feed object."""
+
+        title = parsed_feed.feed.get('title', 'No title')
+        link = parsed_feed.feed.get('link', 'No link')
+        description = parsed_feed.feed.get('description', 'No description')
+        language = parsed_feed.feed.get('language', 'en-gb')
+        pub_date = parsed_feed.feed.get('published', None)
+        last_build_date = parsed_feed.feed.get('updated', None)
+
+        feed = cls(title, link, description, language, pub_date, last_build_date)
+
+        for entry in parsed_feed.entries:
+            item_title = entry.get('title', 'No title')
+            item_link = entry.get('link', 'No link')
+            item_description = entry.get('description', 'No description')
+            item_pub_data = entry.get('published_parsed', None)
+            item_guid = entry.get('id', None) or entry.get("guid", None)
+
+            # First six fields of the time tuple: year, month, day, hour, minute, second.
+            item_published = datetime(*item_pub_data[:6]) if item_pub_data else None
+
+            item = RSSItem(item_title, item_link, item_description, item_published, item_guid)
+            feed.add_item(item)
+
+        feed.items.reverse()
+
+        return feed
+
+
+
 @dataclass
-class Article:
+class RSSArticle:
     """Represents a news article, or entry from an RSS feed."""
 
     guid: str
@@ -68,21 +184,29 @@ class Article:
             source=source
         )
 
-    def mutate(self, attr: str, mutator: dict):
+    def mutate(self, attr: str, mutator: dict[str, str]):
         """
         Apply a mutation to a certain text attribute of this Article instance.
         """
 
-        log.debug("Applying mutator '%s'", mutator["name"])
+        # WARN:
+        # This could be really bad if the end user is able to affect the 'attr' value.
+        # Shouldn't happen though.
 
-        mutator_value = mutator["value"]
+        log.debug("applying mutator '%s'", mutator["name"])
+        val = mutator["value"]
+
+        try:
+            mutator_impl = mutator_registry.get_mutator(val)
+        except ValueError as err:
+            # Unknown mutator: log it and leave the attribute untouched.
+            log.error(err)
+            return
+
+        setattr(self, attr, mutator_impl.mutate(getattr(self, attr)))
+        log.debug("mutated %s, to: %s", attr, getattr(self, attr))
 
 
-        if mutator_value in mutator_map:
-            setattr(self, attr, mutator_map[mutator_value](getattr(self, attr)))
-            log.debug("mutated %s, to: %s", attr, getattr(self, attr))
-        else:
-            log.warn("Unknown mutator value '%s', skipping", mutator["value"])
 
     async def get_thumbnail_url(self, session: aiohttp.ClientSession) -> str | None:
         """Returns the thumbnail URL for an article.
@@ -165,8 +289,8 @@ class Article:
 
 
 @dataclass
-class Source:
-    """Represents an RSS source."""
+class RSSFeedSource:
+    """Represents an RSS Feed."""
 
     name: str | None
     description: str | None
@@ -232,7 +356,7 @@ class Source:
 
 
 @dataclass
-class RSSFeed:
+class RSSFeedSource_:
 
     uuid: str
     name: str