from __future__ import annotations import json import copy import logging from dataclasses import dataclass from datetime import datetime, timezone from abc import ABC, abstractmethod import aiohttp import validators from discord import Embed, Colour from bs4 import BeautifulSoup as bs4 from feedparser import FeedParserDict from markdownify import markdownify from textwrap import shorten from mutators import registry as mutator_registry from api import API DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" log = logging.getLogger(__name__) dumps = lambda _dict: json.dumps(_dict, indent=8) @dataclass(slots=True) class RSSItem: """Represents an entry from an RSS feed item list.""" guid: str link: str title: str description: str pub_date: datetime content_image_url: str thumb_image_url: str entry: FeedParserDict @classmethod def from_parsed_entry(cls, entry: FeedParserDict) -> RSSItem: """Returns an instance of `RSSItem` from a given `FeedParserDict`. Parameters ---------- entry: FeedParserDict The represented entry. Returns ------- RSSItem """ guid = entry.get('id', None) or entry.get("guid") link = entry.get('link', "") title = entry.get('title', "") description = entry.get('description', "") pub_date = entry.get('published_parsed', None) pub_date = datetime(*pub_date[0:6] if pub_date else None, tzinfo=timezone.utc) content_image_url = entry.get("media_content", [{}])[0].get("url") thumb_image_url = entry.get("media_thumbnail", [{}])[0].get("url") return cls(guid, link, title, description, pub_date, content_image_url, thumb_image_url, entry) def create_mutated_copy(self, mutators: dict[str, dict[str, str]]) -> RSSItem: """Returns a copy of `self` with the specified `mutations`. Parameters ---------- mutators: dict[str, dict[str, str]] Mutations to apply on the copy. Returns ------- RSSItem The copy of self. """ item_copy = copy.copy(self) def apply_mutation(item: RSSItem, attr: str, mutator: dict[str, str]): """Applies a specified `mutator` on the given `item`'s `attr`. Parameters ---------- item: RSSItem An RSSItem to mutate. attr: str The attribute of the RSSItem to mutate. mutator: dict[str, str] The mutator to apply. """ try: mutator = mutator_registry.get_mutator(mutator["value"]) except ValueError as err: log.error(err) return # mutator couldn't be found, so early return setattr(item, attr, mutator.mutate(getattr(item, attr))) for field in ("title", "description"): for mutator in mutators[field]: apply_mutation(item_copy, field, mutator) return item_copy async def to_embed(self, sub: Subscription, feed: RSSFeed, session: aiohttp.ClientSession) -> Embed: """Creates and returns a Discord Embed for this instance. Parameters ---------- sub: Subscription The subscription that this RSSItem derived from. feed: RSSFeed The feed containing this RSSItem in its entries. session: aiohttp.ClientSession A client session used to fetch thumbnail url if set. Returns ------- discord.Embed """ log.debug("Creating embed of item: %s", self.guid) # Replace HTML with Markdown, and shorten text. title = shorten(markdownify(self.title, strip=["img", "a"]), 256) desc = shorten(markdownify(self.description, strip=["img"]), 4096) author = shorten(feed.title, 256) # Combined length validation # Can't exceed combined 6000 characters, [400 Bad Request] if failed. combined_length = len(title) + len(desc) + (len(author) * 2) cutoff = combined_length - 6000 desc = shorten(desc, cutoff) if cutoff > 0 else desc embed = Embed( title=title, description=desc, timestamp=self.pub_date, url=self.link if validators.url(self.link) else None, colour=Colour.from_str("#" + sub.embed_colour) ) if sub.article_fetch_image: img_url = self.content_image_url if validators.url(self.content_image_url) else await self.get_thumbnail_url(session) img_url = self.thumb_image_url if not img_url and validators.url(self.thumb_image_url) else img_url embed.set_image(url=img_url) embed.set_thumbnail(url=feed.image_href if validators.url(feed.image_href) else None) embed.set_author(name=author, url=feed.link) embed.set_footer(text=sub.name) return embed async def get_thumbnail_url(self, session: aiohttp.ClientSession) -> str | None: """Returns the thumbnail URL for an article. Parameters ---------- session : aiohttp.ClientSession A client session used to get the thumbnail. Returns ------- str or None The thumbnail URL, or None if not found. """ log.debug("Fetching thumbnail for article: %s", self.guid) try: async with session.get(self.link, timeout=15) as response: html = await response.text() except aiohttp.InvalidURL as error: log.error("invalid thumbnail url: %s", error) return None soup = bs4(html, "html.parser") image_element = soup.select_one("meta[property='og:image']") if not image_element: return None image_content = image_element.get("content") return image_content if validators.url(image_content) else None @dataclass(slots=True) class RSSFeed: """Represents an RSS Feed, including its items.""" title: str description: str link: str lang: str last_build_date: datetime | None image_href: str items: list[RSSItem] = None def __post_init__(self): self.items = [] # can't use factory with dataclass slots, so this is second best. def add_item(self, item: RSSItem): """Add a given `RSSItem` to this feed's list of entries. Parameters ---------- item: RSSItem The item to add. """ if not isinstance(item, RSSItem): raise TypeError("item must be an instance of RSSItem") self.items.append(item) @classmethod def from_parsed_feed(cls, pf: FeedParserDict): """Returns an instance of `RSSItem` from a given `FeedParserDict`. Parameters ---------- pf: FeedParserDict The parsed feed being represented. Returns ------- RSSItem """ title = pf.feed.get('title', None) description = pf.feed.get('description', None) link = pf.feed.get('link', None) language = pf.feed.get('language', None) last_build_date = pf.feed.get('updated_parsed', None) if last_build_date: last_build_date = datetime(*last_build_date[0:-2]) image_href = pf.feed.get("image", {}).get("href") feed = cls(title, description, link, language, last_build_date, image_href) for entry in pf.entries: item = RSSItem.from_parsed_entry(entry) feed.add_item(item) feed.items.reverse() # order so that older items are processed first return feed @dataclass class DjangoDataModel(ABC): @staticmethod @abstractmethod def parser(item: dict) -> dict: """Overwrite this method to parse types.""" return item @classmethod def from_list(cls, data: list[dict]) -> list: return [cls(**cls.parser(item)) for item in data] @classmethod def from_dict(cls, data: dict): return cls(**cls.parser(data)) @dataclass(slots=True) class GuildSettings(DjangoDataModel): id: int guild_id: int default_embed_colour: str active: bool @staticmethod def parser(item: dict) -> dict: return item @dataclass(slots=True) class Subscription(DjangoDataModel): id: int name: str url: str guild_id: int creation_datetime: datetime extra_notes: str filters: list[int] mutators: dict[str, list[dict]] article_fetch_image: bool embed_colour: str published_threshold: datetime active: bool channels_count: int unique_content_rules: list @staticmethod def parser(item: dict) -> dict: item["guild_id"] = int(item["guild_id"]) item["creation_datetime"] = datetime.strptime(item["creation_datetime"], "%Y-%m-%dT%H:%M:%S.%f%z") item["mutators"] = { "title": item.pop("article_title_mutators"), "description": item.pop("article_desc_mutators") } item["published_threshold"] = datetime.strptime(item["published_threshold"], "%Y-%m-%dT%H:%M:%S%z") item["unique_content_rules"] = item.get("unique_content_rules", []) return item async def get_channels(self, api): channel_data, _ = await api.get_subscription_channels(subscription=self.id) return SubChannel.from_list(channel_data) @dataclass(slots=True) class SubChannel(DjangoDataModel): id: int channel_id: int channel_name: str subscription: int @staticmethod def parser(item: dict) -> dict: item["channel_id"] = int(item["channel_id"]) item["subscription"] = int(item["subscription"]) return item @property def mention(self) -> str: return f"<#{self.channel_id}>" @dataclass(slots=True) class TrackedContent(DjangoDataModel): id: int guid: str title: str url: str subscription: str channel_id: int message_id: int blocked: bool creation_datetime: datetime @staticmethod def parser(item: dict) -> dict: item["creation_datetime"] = datetime.strptime(item["creation_datetime"], "%Y-%m-%dT%H:%M:%S.%f%z") return item @dataclass(slots=True) class ContentFilter(DjangoDataModel): id: int name: str matching_algorithm: int match: str is_insensitive: bool is_whitelist: bool guild_id: int @staticmethod def parser(item: dict) -> dict: item["guild_id"] = int(item["guild_id"]) # stored as str due to a django/sqlite bug, convert back to int return item