This commit is contained in:
Corban-Lee Jones 2024-06-18 00:25:14 +01:00
parent f5766e0390
commit 745fda9bbe
4 changed files with 114 additions and 4 deletions

View File

@ -5,6 +5,7 @@ async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
beautifulsoup4==4.12.3
click==8.1.7
discord.py==2.3.2
feedparser==6.0.11
frozenlist==1.4.1
@ -20,5 +21,6 @@ six==1.16.0
soupsieve==2.5
SQLAlchemy==2.0.23
typing_extensions==4.10.0
uwuify==1.3.1
validators==0.22.0
yarl==1.9.4

View File

@ -146,7 +146,7 @@ class TaskCog(commands.Cog):
if not articles:
log.debug("No articles found")
embeds = await self.get_articles_as_embeds(api, session, sub.id, filters, articles)
embeds = await self.get_articles_as_embeds(api, session, sub.id, sub.mutators, filters, articles)
await self.send_embeds_in_chunks(embeds, channels)
async def get_articles_as_embeds(
@ -154,6 +154,7 @@ class TaskCog(commands.Cog):
api: API,
session: aiohttp.ClientSession,
sub_id: int,
mutators: list[dict],
filters: list[dict],
articles: list[Article]
) -> list[Embed]:
@ -163,7 +164,7 @@ class TaskCog(commands.Cog):
embeds = []
for article in articles:
embed = await self.process_article(api, session, sub_id, filters, article)
embed = await self.process_article(api, session, sub_id, mutators, filters, article)
if embed:
embeds.append(embed)
@ -206,6 +207,7 @@ class TaskCog(commands.Cog):
api: API,
session: aiohttp.ClientSession,
sub_id: int,
mutators: list[dict],
filters: list[dict],
article: Article
) -> Embed | None:
@ -219,6 +221,9 @@ class TaskCog(commands.Cog):
blocked = any(self.filter_article(_filter, article) for _filter in filters)
log.debug("filter result: %s", "blocked" if blocked else "ok")
for mutator in mutators:
article.mutate(mutator)
try:
await api.create_tracked_content(
guid=article.guid,

View File

@ -1,11 +1,10 @@
import ssl
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from abc import ABC, abstractmethod
from random import shuffle, sample
import aiohttp
import validators
@ -17,6 +16,7 @@ from sqlalchemy import select, insert, delete, and_
from sqlalchemy.exc import NoResultFound
from textwrap import shorten
import mutators
from errors import IllegalFeed
from db import DatabaseManager, RssSourceModel, FeedChannelModel
from utils import get_rss_data, get_unparsed_feed
@ -68,6 +68,57 @@ class Article:
source=source
)
def mutate(self, mutator: dict):
log.debug("Applying mutator '%s'", mutator["name"])
match mutator["value"]:
case "UWU_TITLE":
self.title = mutators.uwu(self.title)
case "UWU_DESC":
self.description = mutators.uwu(self.description)
case "GIB_TITLE":
self.title = mutators.gibberish(self.title)
case "GIB_DESC":
self.description = mutators.gibberish(self.description)
case "L3_TITLE":
self.title = mutators.leet(self.title)
case "L3_DESC":
self.description = mutators.leet(self.description)
case "REV_TITLE":
self.title = mutators.reverse(self.title)
case "REV_DESC":
self.description = mutators.reverse(self.description)
case "RND_TITLE":
self.title = mutators.shuffle(self.title)
case "RND_DESC":
self.description = mutators.shuffle(self.description)
case "PGL_TITLE":
self.title = mutators.pig_latin(self.title)
case "PGL_DESC":
self.description = mutators.pig_latin(self.description)
case "RCS_TITLE":
self.title = mutators.random_case(self.title)
case "RCS_DESC":
self.description = mutators.random_case(self.description)
case _:
log.warn("Unknown mutator value '%s', skipping", mutator["value"])
async def get_thumbnail_url(self, session: aiohttp.ClientSession) -> str | None:
"""Returns the thumbnail URL for an article.
@ -266,6 +317,7 @@ class Subscription(DjangoDataModel):
creation_datetime: datetime
extra_notes: str
filters: list[int]
mutators: list[dict]
active: bool
channels_count: int

51
src/mutators.py Normal file
View File

@ -0,0 +1,51 @@
import random
import uwuify
leet_map = {'a': '4', 'e': '3', 'i': '1', 'o': '0', 's': '5', 't': '7'}
def uwu(text: str) -> str:
return uwuify.uwu(text)
def gibberish(text: str) -> str:
return " ".join(["".join(
random.sample(word, len(word)))
for word in text.split()
])
def leet(text: str) -> str:
return "".join([
leet_map.get(char.lower(), char)
for char in text
])
def reverse(text: str) -> str:
return text[::-1]
def shuffle(text: str) -> str:
words = text.split()
random.shuffle(words)
return " ".join(words)
def _pig_latin_word(word: str) -> str:
first_vowel = 0
for char in word:
if char in "aeiou":
break
first_vowel += 1
if first_vowel == 0:
return word + "way"
else:
return word[first_vowel:] + word[:first_vowel] + "ay"
def pig_latin(text: str) -> str:
return " ".join([_pig_latin_word(word) for word in text.split()])
def random_case(text: str) -> str:
return "".join([
char.upper() if random.random() > 0.5 else char.lower()
for char in text
])