PYRSS-Bot/src/models.py
Corban-Lee Jones eb97dca5c6
Some checks failed
Build and Push Docker Image / build (push) Has been cancelled
process models in task
2024-10-31 13:10:27 +00:00

368 lines
11 KiB
Python

import re
import logging
import hashlib
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
from dataclasses import dataclass
import httpx
import discord
import rapidfuzz
import feedparser
# Module-level logger, named after this module, shared by every model below.
log = logging.getLogger(__name__)
@dataclass
class DjangoDataModel(ABC):
    """Abstract base for dataclasses that are built from plain dictionaries.

    Subclasses implement ``parser`` to turn a raw item dictionary into the
    keyword arguments accepted by their own constructor.
    """

    @staticmethod
    @abstractmethod
    def parser(item: dict) -> dict:
        """Return constructor keyword arguments derived from ``item``.

        The base implementation hands ``item`` back untouched.
        """
        return item

    @classmethod
    def from_list(cls, data: list[dict]) -> list:
        """Build one instance per dictionary in ``data``, preserving order."""
        instances = []
        for raw in data:
            kwargs = cls.parser(raw)
            instances.append(cls(**kwargs))
        return instances

    @classmethod
    def from_dict(cls, data: dict):
        """Build a single instance from the dictionary ``data``."""
        kwargs = cls.parser(data)
        return cls(**kwargs)
@dataclass(slots=True)
class Server(DjangoDataModel):
    """Server record whose ``id`` is stored as an ``int``."""

    id: int
    name: str
    icon_hash: str
    is_bot_operational: bool
    active: bool

    @staticmethod
    def parser(item: dict) -> dict:
        """Coerce the ``id`` entry of ``item`` to ``int`` in place and return it."""
        raw_id = item.pop("id")
        item["id"] = int(raw_id)
        return item
class MatchingAlgorithm(Enum):
    """Strategies a content filter can use to compare its pattern to text."""

    NONE = 0
    ANY = 1
    ALL = 2
    LITERAL = 3
    REGEX = 4
    FUZZY = 5
    AUTO = 6

    @classmethod
    def from_value(cls, value: int):
        """Return the member whose value equals ``value``.

        Raises:
            ValueError: if no member has that value.
        """
        try:
            # Enum's own by-value lookup replaces the hand-written member scan.
            return cls(value)
        except ValueError:
            # ``cls.__name__`` is the enum's name; ``cls.__class__.__name__``
            # would name the metaclass instead.
            raise ValueError(f"No {cls.__name__} for value: {value}") from None
@dataclass(slots=True)
class ContentFilter(DjangoDataModel):
id: int
server_id: int
name: str
matching_pattern: str
matching_algorithm: MatchingAlgorithm
is_insensitive: bool
is_whitelist: bool
@staticmethod
def parser(item: dict) -> dict:
item["id"] = item.pop("id")
item["server_id"] = item.pop("server")
item["matching_pattern"] = item.pop("match")
item["matching_algorithm"] = MatchingAlgorithm.from_value(item.pop("matching_algorithm"))
return item
@property
def _regex_flags(self):
return re.IGNORECASE if self.is_insensitive else 0
@property
def cleaned_matching_pattern(self):
"""
Splits the pattern to individual keywords, getting rid of unnecessary
spaces and grouping quoted words together.
"""
findterms = re.compile(r'"([^"]+)"|(\S+)').findall
normspace = re.compile(r"\s+").sub
return [
re.escape(normspace(" ", (t[0] or t[1]).strip())).replace(r"\ ", r"\s+")
for t in findterms(self.matching_pattern)
]
def _match_any(self, matching_against: str):
for word in self.cleaned_matching_pattern:
if re.search(rf"\b{word}\b", matching_against, self._regex_flags):
return True
return False
def _match_all(self, matching_against: str):
for word in self.cleaned_matching_pattern:
if re.search(rf"\b{word}\b", matching_against, self._regex_flags):
return False
return True
def _match_literal(self, matching_against: str):
return bool(
re.search(
rf"\b{re.escape(self.matching_pattern)}\b",
matching_against,
self._regex_flags
)
)
def _match_regex(self, matching_against: str):
try:
return bool(re.search(
re.compile(self.matching_pattern, self._regex_flags),
matching_against
))
except re.error as exc:
log.error(f"Filter regex error: {exc}")
return False
def _match_fuzzy(self, matching_against: str):
matching_against = re.sub(r"[^\w\s]", "", matching_against)
matching_pattern = re.sub(r"[^\w\s]", "", self.matching_pattern)
if self.is_insensitive:
matching_against = matching_against.lower()
matching_pattern = matching_pattern.lower()
return rapidfuzz.fuzz.partial_ratio(
matching_against,
matching_pattern,
score_cutoff=90
)
def _get_algorithm_func(self):
match self.matching_algorithm:
case MatchingAlgorithm.NONE: return
case MatchingAlgorithm.ANY: return self._match_any
case MatchingAlgorithm.ALL: return self._match_all
case MatchingAlgorithm.LITERAL: return self._match_literal
case MatchingAlgorithm.REGEX: return self._match_regex
case MatchingAlgorithm.FUZZY: return self._match_fuzzy
case _: return
def matches(self, content) -> bool:
log.debug(f"applying filter: {self}")
if not self.matching_pattern.strip():
return False
algorithm_func = self._get_algorithm_func()
if not algorithm_func:
log.error(f"Bad algorithm function: {self.matching_algorithm}")
return False
match_found = algorithm_func(content.item_title) or algorithm_func(content.item_description)
log.debug(f"filter match found: {match_found}")
return not match_found if self.is_whitelist else match_found
@dataclass(slots=True)
class MessageMutator(DjangoDataModel):
    """Named mutator carrying an ``id``, a ``name`` and a ``value``."""

    id: int
    name: str
    value: str

    @staticmethod
    def parser(item: dict) -> dict:
        """Re-insert the ``id`` entry of ``item`` unchanged and return ``item``."""
        identifier = item.pop("id")
        item["id"] = identifier
        return item
@dataclass(slots=True)
class MessageStyle(DjangoDataModel):
    """Message presentation settings, as listed in the fields below."""

    id: int
    server_id: int
    name: str
    colour: str
    is_embed: bool
    is_hyperlinked: bool
    show_author: bool
    show_timestamp: bool
    show_images: bool
    fetch_images: bool
    title_mutator: dict | None
    description_mutator: dict | None
    auto_created: bool

    @staticmethod
    def parser(item: dict) -> dict:
        """Rename raw keys to field names, casting the identifiers to ``int``.

        ``server`` becomes ``server_id``, and the ``*_mutator_detail`` entries
        are moved to ``title_mutator`` / ``description_mutator`` as they are.
        """
        # (raw key, field name) pairs whose values are cast to int.
        integer_keys = (("id", "id"), ("server", "server_id"))
        # (raw key, field name) pairs whose values are carried over untouched.
        passthrough_keys = (
            ("title_mutator_detail", "title_mutator"),
            ("description_mutator_detail", "description_mutator"),
        )

        for raw_key, field_name in integer_keys:
            item[field_name] = int(item.pop(raw_key))
        for raw_key, field_name in passthrough_keys:
            item[field_name] = item.pop(raw_key)

        return item
@dataclass(slots=True)
class UniqueContentRule(DjangoDataModel):
    """Named rule carrying an integer ``id``, a ``name`` and a ``value``."""

    id: int
    name: str
    value: str

    @staticmethod
    def parser(item: dict) -> dict:
        """Cast the ``id`` entry of ``item`` to ``int`` in place and return it."""
        rule_id = int(item.pop("id"))
        item["id"] = rule_id
        return item
@dataclass(slots=True)
class DiscordChannel(DjangoDataModel):
    """Channel record: integer ``id``, ``name`` and an NSFW flag."""

    id: int
    name: str
    is_nsfw: bool

    @staticmethod
    def parser(item: dict) -> dict:
        """Cast the ``id`` entry of ``item`` to ``int`` in place and return it."""
        channel_id = int(item.pop("id"))
        item["id"] = channel_id
        return item
@dataclass(slots=True)
class Subscription(DjangoDataModel):
    """A feed subscription with its channels, filters and message style.

    ``server`` is optional: it defaults to None and is attached through the
    ``server`` property.
    """

    id: int
    server_id: int
    name: str
    url: str
    created_at: datetime
    updated_at: datetime
    extra_notes: str
    active: bool
    publish_threshold: datetime
    channels: list[DiscordChannel]
    filters: list[ContentFilter]
    message_style: MessageStyle
    unique_rules: list[UniqueContentRule]  # parser() assigns a list here
    _server: Server | None = None

    @staticmethod
    def _parse_datetime(value: str) -> datetime:
        """Parse an ISO-8601 timestamp with or without fractional seconds.

        Raises:
            ValueError: if ``value`` matches neither accepted format.
        """
        for fmt in ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
            try:
                return datetime.strptime(value, fmt)
            except ValueError:
                continue
        raise ValueError(f"Unrecognised timestamp format: {value!r}")

    @staticmethod
    def parser(item: dict) -> dict:
        """Convert a raw subscription dictionary into constructor arguments.

        Identifiers are cast to ``int``, timestamps are parsed into
        ``datetime`` objects, and the nested ``*_detail`` entries are turned
        into their model classes.
        """
        item["id"] = int(item.pop("id"))
        item["server_id"] = int(item.pop("server"))
        # Each field previously required exactly one format; both are
        # accepted now so fractional seconds may be present or absent.
        item["created_at"] = Subscription._parse_datetime(item.pop("created_at"))
        item["updated_at"] = Subscription._parse_datetime(item.pop("updated_at"))
        item["publish_threshold"] = Subscription._parse_datetime(item.pop("publish_threshold"))
        item["channels"] = DiscordChannel.from_list(item.pop("channels_detail"))
        item["filters"] = ContentFilter.from_list(item.pop("filters_detail"))
        item["message_style"] = MessageStyle.from_dict(item.pop("message_style_detail"))
        item["unique_rules"] = UniqueContentRule.from_list(item.pop("unique_rules_detail"))
        return item

    @property
    def server(self) -> Server | None:
        """The attached ``Server``, or None if none has been set."""
        return self._server

    @server.setter
    def server(self, server: Server):
        self._server = server

    def _server_name(self) -> str | None:
        """Name of the attached server, or None when no server is attached."""
        return self._server.name if self._server is not None else None

    async def get_rss_content(self, client: httpx.AsyncClient) -> str | None:
        """Fetch the feed at ``self.url`` and return the response text.

        Returns None, after logging, when the request fails or when the
        ``Content-Type`` header does not contain ``text/xml``.
        """
        try:
            response = await client.get(self.url)
            response.raise_for_status()
        except httpx.HTTPError as exc:
            # Log our own URL: ``exc.request`` raises if no request is attached.
            log.error("(%s) HTTP Exception for %s - %s", type(exc), self.url, exc)
            return None

        # Default to "" so a missing header is rejected instead of raising
        # TypeError on ``in None``.
        content_type = response.headers.get("Content-Type", "")
        if "text/xml" not in content_type:
            log.warning("Invalid 'Content-Type' header: %s (must contain 'text/xml')", content_type)
            return None

        return response.text

    async def get_discord_channels(self, bot) -> list:
        """Resolve this subscription's channels through ``bot``.

        ``bot.get_channel`` is tried first and ``bot.fetch_channel`` is used
        when it returns nothing. Channels that raise ``discord.Forbidden``
        are logged and left out of the result.
        """
        channels = []

        for channel_detail in self.channels:
            try:
                channel = bot.get_channel(channel_detail.id)
                channels.append(channel or await bot.fetch_channel(channel_detail.id))
            except discord.Forbidden:
                # Report from our own record: ``channel`` is None whenever
                # the fetch is what failed.
                log.error(
                    "Forbidden channel: (%s, %s) from (%s, %s)",
                    channel_detail.name,
                    channel_detail.id,
                    self._server_name(),
                    self.server_id,
                )

        return channels

    def filter_entries(self, contents: list) -> tuple[list, list]:
        """Split ``contents`` into ``(valid, invalid)`` lists.

        An item is invalid as soon as any filter's ``matches`` returns True.
        """
        log.debug("filtering entries for %s in %s", self.name, self._server_name())

        valid_contents = []
        invalid_contents = []

        for content in contents:
            log.debug("filtering: '%s'", content.item_title)
            if any(content_filter.matches(content) for content_filter in self.filters):
                invalid_contents.append(content)
            else:
                valid_contents.append(content)

        log.debug(
            "filtered content: valid:%s, invalid:%s",
            len(valid_contents),
            len(invalid_contents),
        )
        return valid_contents, invalid_contents
@dataclass(slots=True)
class Content(DjangoDataModel):
    """A single feed item tied to the subscription it came from.

    ``subscription`` is optional: it defaults to None and is attached
    through the ``subscription`` property.
    """

    id: int
    subscription_id: int
    item_id: str
    item_guid: str
    item_url: str
    item_title: str
    item_description: str
    _subscription: Subscription | None = None

    @staticmethod
    def parser(item: dict) -> dict:
        """Rename the raw ``subscription`` key to ``subscription_id``."""
        item["id"] = item.pop("id")
        item["subscription_id"] = item.pop("subscription")
        return item

    @classmethod
    def from_raw_rss(cls, raw_rss_content: str, subscription: Subscription):
        """Parse raw feed text into a list of instances, one per entry.

        Every instance gets ``id`` -1, is linked to ``subscription``, and
        uses an empty string for any entry field that is missing.
        """
        parsed_rss = feedparser.parse(raw_rss_content)
        contents = []

        for entry in parsed_rss.entries:
            # Build through ``cls`` so a subclass gets instances of itself.
            content = cls.from_dict({
                "id": -1,
                "subscription": subscription.id,
                "item_id": entry.get("id", ""),
                "item_guid": entry.get("guid", ""),
                "item_url": entry.get("link", ""),
                "item_title": entry.get("title", ""),
                "item_description": entry.get("description", ""),
            })
            content.subscription = subscription
            contents.append(content)

        return contents

    @property
    def subscription(self) -> Subscription | None:
        """The attached ``Subscription``, or None if none has been set."""
        return self._subscription

    @subscription.setter
    def subscription(self, subscription: Subscription):
        self._subscription = subscription