From e000f05df22150755e9d13e296d48ec4279a5350 Mon Sep 17 00:00:00 2001
From: Corban-Lee
Date: Wed, 10 Jul 2024 18:12:39 +0100
Subject: [PATCH] improved filters

---
 src/extensions/tasks.py |  36 +++++++-------
 src/filters.py          | 105 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 121 insertions(+), 20 deletions(-)
 create mode 100644 src/filters.py

diff --git a/src/extensions/tasks.py b/src/extensions/tasks.py
index 75d79f2..3d4cdf7 100644
--- a/src/extensions/tasks.py
+++ b/src/extensions/tasks.py
@@ -4,6 +4,7 @@ Loading this file via `commands.Bot.load_extension` will add `TaskCog` to the bo
 """
 
 import re
+import json
 import logging
 import datetime
 from os import getenv
@@ -18,6 +19,7 @@ from feedparser import parse
 
 from feed import RSSFeed, Subscription, RSSItem
 from utils import get_unparsed_feed
+from filters import match_text
 from api import API
 
 log = logging.getLogger(__name__)
@@ -122,6 +124,8 @@ class TaskCog(commands.Cog):
         channels = [self.bot.get_channel(channel.channel_id) for channel in await sub.get_channels(api)]
         filters = [await api.get_filter(filter_id) for filter_id in sub.filters]
 
+        log.debug(json.dumps(filters, indent=4))
+
         for item in feed.items:
             log.debug("processing item '%s'", item.guid)
 
@@ -129,7 +133,9 @@
                 log.debug("item '%s' older than subscription threshold '%s', skipping", item.pub_date, sub.published_threshold)
                 continue
 
+            log.debug("before")
             blocked = any(self.filter_item(_filter, item) for _filter in filters)
+            log.debug("after")
             mutated_item = item.create_mutated_copy(sub.mutators)
 
             for channel in channels:
@@ -140,29 +146,19 @@
 
     def filter_item(self, _filter: dict, item: RSSItem) -> bool:
         """
-        Returns True if item should be ignored due to filters.
+        Returns `True` if item should be ignored due to filters.
         """
 
-        match_found = False # This is the flag to determine if the content should be filtered
+        log.debug("checking filter")
 
-        keywords = _filter["keywords"].split(",")
-        regex_pattern = _filter["regex"]
-        is_whitelist = _filter["whitelist"]
-
-        log.debug(
-            "trying filter '%s', keyword '%s', regex '%s', is whitelist: '%s'",
-            _filter["name"], keywords, regex_pattern, is_whitelist
-        )
-
-        assert not (keywords and regex_pattern), "Keywords and Regex used, only 1 can be used."
-
-        if regex_pattern:
-            regex = re.compile(regex_pattern)
-            match_found = regex.search(item.title) or regex.search(item.description)
-        else:
-            match_found = any(word in item.title or word in item.description for word in keywords)
-
-        return not match_found if is_whitelist else match_found
+        try:
+            match_found = match_text(_filter, item.title) or match_text(_filter, item.description)
+            log.debug("filter match found? '%s'", match_found)
+            return match_found
+        except Exception as error:
+            log.error(error)
+            input("[paused] >")
+            return False
 
     async def mark_tracked_item(self, api: API, sub: Subscription, item: RSSItem, channel_id: int, blocked: bool):
         try:
diff --git a/src/filters.py b/src/filters.py
new file mode 100644
index 0000000..d42cc07
--- /dev/null
+++ b/src/filters.py
@@ -0,0 +1,105 @@
+import logging
+import re
+
+log = logging.getLogger(__name__)
+
+def _split_match(_match):
+    """
+    Splits the match to individual keywords, getting rid of unnecessary
+    spaces and grouping quoted words together.
+
+    Example:
+        '  some random  words "with   quotes  " and   spaces'
+        ==>
+        ["some", "random", "words", "with\s+quotes", "and", "spaces"]
+    """
+    findterms = re.compile(r'"([^"]+)"|(\S+)').findall
+    normspace = re.compile(r"\s+").sub
+    return [
+        # normspace(" ", (t[0] or t[1]).strip()).replace(" ", r"\s+")
+        re.escape(normspace(" ", (t[0] or t[1]).strip())).replace(r"\ ", r"\s+")
+        for t in findterms(_match)
+    ]
+
+def _match_any(_match: str, matching_to: str, **search_kwargs) -> bool:
+    for word in _split_match(_match):
+        if re.search(rf"\b{word}\b", matching_to, **search_kwargs):
+            return True
+
+    return False
+
+def _match_all(_match: str, matching_to: str, **search_kwargs) -> bool:
+    for word in _split_match(_match):
+        if not re.search(rf"\b{word}\b", matching_to, **search_kwargs):
+            return False
+
+    return True
+
+def _match_literal(_match: str, matching_to: str, **search_kwargs) -> bool:
+    return bool(
+        re.search(
+            rf"\b{re.escape(_match)}\b",
+            matching_to,
+            **search_kwargs,
+        ),
+    )
+
+def _match_regex(_match: str, matching_to: str, **search_kwargs) -> bool:
+    try:
+        return bool(re.search(
+            re.compile(_match, **search_kwargs),
+            matching_to,
+        ))
+    except re.error as err:
+        log.error(err)
+        return False
+
+def _match_fuzzy(_match: str, matching_to: str, **search_kwargs) -> bool:
+    from rapidfuzz import fuzz
+
+    _match = re.sub(r"[^\w\s]", "", _match)
+    text = re.sub(r"[^\w\s]", "", matching_to)
+    if search_kwargs.get("flags", 0) & re.IGNORECASE:
+        _match = _match.lower()
+        text = text.lower()
+
+    return bool(fuzz.partial_ratio(_match, text, score_cutoff=90))
+
+def match_text(_filter: dict, matching_to: str) -> bool:
+    search_kwargs = {}
+
+    algorithm = _filter["matching_algorithm"]
+    _match = _filter["match"]
+    is_whitelist = _filter["is_whitelist"]
+    is_insensitive = _filter["is_insensitive"]
+
+    # If the matching pattern is empty
+    if not _match.strip():
+        return False
+
+    if is_insensitive:
+        search_kwargs = {"flags": re.IGNORECASE}
+
+    log.debug(
+        "matching algorithm '%s', whitelist? '%s', insensitive? '%s'",
+        algorithm, is_whitelist, is_insensitive
+    )
+
+    match algorithm:
+        case 0:
+            return False
+
+        case 1:  # Any
+            return _match_any(_match, matching_to, **search_kwargs)
+
+        case 2:  # All
+            return _match_all(_match, matching_to, **search_kwargs)
+
+        case 3:  # Exact Match
+            return _match_literal(_match, matching_to, **search_kwargs)
+
+        case 4:  # Regular Expression
+            return _match_regex(_match, matching_to, **search_kwargs)
+
+        case 5:  # Fuzzy Match
+            return _match_fuzzy(_match, matching_to, **search_kwargs)
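
Usage sketch (not part of the patch): a minimal example of how filters.match_text is expected to be driven by the filter dicts that api.get_filter returns. The field values below are illustrative assumptions based only on the keys read inside match_text(); the real API payload may carry more fields.

    from filters import match_text

    # Hypothetical filter record; keys mirror those read by match_text().
    example_filter = {
        "matching_algorithm": 1,   # 1 = "Any" keyword matching, per the match statement
        "match": 'breaking "climate change"',
        "is_whitelist": False,
        "is_insensitive": True,
    }

    # Quoted phrases are kept together by _split_match and matching is
    # case-insensitive here, so this prints True ("breaking" matches "Breaking").
    print(match_text(example_filter, "Breaking news on climate change policy"))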