diff --git a/requirements.txt b/requirements.txt index b076812..890f171 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ async-timeout==4.0.3 asyncpg==0.29.0 attrs==23.2.0 beautifulsoup4==4.12.3 +click==8.1.7 discord.py==2.3.2 feedparser==6.0.11 frozenlist==1.4.1 @@ -20,5 +21,6 @@ six==1.16.0 soupsieve==2.5 SQLAlchemy==2.0.23 typing_extensions==4.10.0 +uwuify==1.3.1 validators==0.22.0 yarl==1.9.4 diff --git a/src/extensions/tasks.py b/src/extensions/tasks.py index f82ef1e..fb43909 100644 --- a/src/extensions/tasks.py +++ b/src/extensions/tasks.py @@ -146,7 +146,7 @@ class TaskCog(commands.Cog): if not articles: log.debug("No articles found") - embeds = await self.get_articles_as_embeds(api, session, sub.id, filters, articles) + embeds = await self.get_articles_as_embeds(api, session, sub.id, sub.mutators, filters, articles) await self.send_embeds_in_chunks(embeds, channels) async def get_articles_as_embeds( @@ -154,6 +154,7 @@ class TaskCog(commands.Cog): api: API, session: aiohttp.ClientSession, sub_id: int, + mutators: list[dict], filters: list[dict], articles: list[Article] ) -> list[Embed]: @@ -163,7 +164,7 @@ class TaskCog(commands.Cog): embeds = [] for article in articles: - embed = await self.process_article(api, session, sub_id, filters, article) + embed = await self.process_article(api, session, sub_id, mutators, filters, article) if embed: embeds.append(embed) @@ -206,6 +207,7 @@ class TaskCog(commands.Cog): api: API, session: aiohttp.ClientSession, sub_id: int, + mutators: list[dict], filters: list[dict], article: Article ) -> Embed | None: @@ -219,6 +221,9 @@ class TaskCog(commands.Cog): blocked = any(self.filter_article(_filter, article) for _filter in filters) log.debug("filter result: %s", "blocked" if blocked else "ok") + for mutator in mutators: + article.mutate(mutator) + try: await api.create_tracked_content( guid=article.guid, diff --git a/src/feed.py b/src/feed.py index 1308e1e..29fab90 100644 --- a/src/feed.py +++ b/src/feed.py @@ -1,12 +1,11 @@ -import ssl import json import logging from dataclasses import dataclass from datetime import datetime from abc import ABC, abstractmethod +from random import shuffle, sample - import aiohttp import validators from discord import Embed, Colour @@ -17,6 +16,7 @@ from sqlalchemy import select, insert, delete, and_ from sqlalchemy.exc import NoResultFound from textwrap import shorten +import mutators from errors import IllegalFeed from db import DatabaseManager, RssSourceModel, FeedChannelModel from utils import get_rss_data, get_unparsed_feed @@ -68,6 +68,57 @@ class Article: source=source ) + def mutate(self, mutator: dict): + + log.debug("Applying mutator '%s'", mutator["name"]) + + match mutator["value"]: + + case "UWU_TITLE": + self.title = mutators.uwu(self.title) + + case "UWU_DESC": + self.description = mutators.uwu(self.description) + + case "GIB_TITLE": + self.title = mutators.gibberish(self.title) + + case "GIB_DESC": + self.description = mutators.gibberish(self.description) + + case "L3_TITLE": + self.title = mutators.leet(self.title) + + case "L3_DESC": + self.description = mutators.leet(self.description) + + case "REV_TITLE": + self.title = mutators.reverse(self.title) + + case "REV_DESC": + self.description = mutators.reverse(self.description) + + case "RND_TITLE": + self.title = mutators.shuffle(self.title) + + case "RND_DESC": + self.description = mutators.shuffle(self.description) + + case "PGL_TITLE": + self.title = mutators.pig_latin(self.title) + + case "PGL_DESC": + self.description = mutators.pig_latin(self.description) + + case "RCS_TITLE": + self.title = mutators.random_case(self.title) + + case "RCS_DESC": + self.description = mutators.random_case(self.description) + + case _: + log.warn("Unknown mutator value '%s', skipping", mutator["value"]) + async def get_thumbnail_url(self, session: aiohttp.ClientSession) -> str | None: """Returns the thumbnail URL for an article. @@ -266,6 +317,7 @@ class Subscription(DjangoDataModel): creation_datetime: datetime extra_notes: str filters: list[int] + mutators: list[dict] active: bool channels_count: int diff --git a/src/mutators.py b/src/mutators.py new file mode 100644 index 0000000..802f482 --- /dev/null +++ b/src/mutators.py @@ -0,0 +1,51 @@ +import random + +import uwuify + +leet_map = {'a': '4', 'e': '3', 'i': '1', 'o': '0', 's': '5', 't': '7'} + +def uwu(text: str) -> str: + return uwuify.uwu(text) + +def gibberish(text: str) -> str: + return " ".join(["".join( + random.sample(word, len(word))) + for word in text.split() + ]) + +def leet(text: str) -> str: + return "".join([ + leet_map.get(char.lower(), char) + for char in text + ]) + +def reverse(text: str) -> str: + return text[::-1] + +def shuffle(text: str) -> str: + words = text.split() + random.shuffle(words) + return " ".join(words) + +def _pig_latin_word(word: str) -> str: + first_vowel = 0 + for char in word: + if char in "aeiou": + break + + first_vowel += 1 + + if first_vowel == 0: + return word + "way" + + else: + return word[first_vowel:] + word[:first_vowel] + "ay" + +def pig_latin(text: str) -> str: + return " ".join([_pig_latin_word(word) for word in text.split()]) + +def random_case(text: str) -> str: + return "".join([ + char.upper() if random.random() > 0.5 else char.lower() + for char in text + ]) \ No newline at end of file