improved filters

This commit is contained in:
Corban-Lee Jones 2024-07-10 18:12:39 +01:00
parent ab788ed379
commit e000f05df2
2 changed files with 118 additions and 20 deletions

View File

@ -4,6 +4,7 @@ Loading this file via `commands.Bot.load_extension` will add `TaskCog` to the bo
"""
import re
import json
import logging
import datetime
from os import getenv
@ -18,6 +19,7 @@ from feedparser import parse
from feed import RSSFeed, Subscription, RSSItem
from utils import get_unparsed_feed
from filters import match_text
from api import API
log = logging.getLogger(__name__)
@ -122,6 +124,8 @@ class TaskCog(commands.Cog):
channels = [self.bot.get_channel(channel.channel_id) for channel in await sub.get_channels(api)]
filters = [await api.get_filter(filter_id) for filter_id in sub.filters]
log.debug(json.dumps(filters, indent=4))
for item in feed.items:
log.debug("processing item '%s'", item.guid)
@ -129,7 +133,9 @@ class TaskCog(commands.Cog):
log.debug("item '%s' older than subscription threshold '%s', skipping", item.pub_date, sub.published_threshold)
continue
log.debug("before")
blocked = any(self.filter_item(_filter, item) for _filter in filters)
log.debug("after")
mutated_item = item.create_mutated_copy(sub.mutators)
for channel in channels:
@ -140,29 +146,19 @@ class TaskCog(commands.Cog):
def filter_item(self, _filter: dict, item: RSSItem) -> bool:
    """
    Returns `True` if item should be ignored due to filters.

    The filter is tested against the item's title first and then its
    description; the first match wins.

    Args:
        _filter: Filter definition passed unchanged to `match_text`.
        item: The feed item whose `title` and `description` are checked.

    Returns:
        `True` when either text matches the filter, `False` when neither
        matches or when evaluating the filter raised an error.
    """
    try:
        match_found = any(
            match_text(_filter, text)
            for text in (item.title, item.description)
        )
    except Exception:
        # Best effort: a broken filter must not abort processing of the
        # remaining items, so log the traceback and treat it as "no match".
        # (Never block on stdin here - this runs inside the bot's task loop.)
        log.exception("failed to apply filter to item")
        return False

    log.debug("filter match found? '%s'", match_found)
    return match_found
async def mark_tracked_item(self, api: API, sub: Subscription, item: RSSItem, channel_id: int, blocked: bool):
try:

102
src/filters.py Normal file
View File

@ -0,0 +1,102 @@
import logging
import re
def _split_match(_match):
"""
Splits the match to individual keywords, getting rid of unnecessary
spaces and grouping quoted words together.
Example:
' some random words "with quotes " and spaces'
==>
["some", "random", "words", "with+quotes", "and", "spaces"]
"""
findterms = re.compile(r'"([^"]+)"|(\S+)').findall
normspace = re.compile(r"\s+").sub
return [
# normspace(" ", (t[0] or t[1]).strip()).replace(" ", r"\s+")
re.escape(normspace(" ", (t[0] or t[1]).strip())).replace(r"\ ", r"\s+")
for t in findterms(_match)
]
def _match_any(_match: str, matching_to: str, **search_kwargs) -> bool:
    """
    Report whether at least one term of `_match` occurs in `matching_to`.

    The terms come from `_split_match`; each one is searched for wrapped in
    `\\b` word-boundary anchors. `search_kwargs` is forwarded unchanged to
    `re.search`.
    """
    return any(
        re.search(rf"\b{term}\b", matching_to, **search_kwargs) is not None
        for term in _split_match(_match)
    )
def _match_all(_match: str, matching_to: str, **search_kwargs) -> bool:
    """
    Report whether every term of `_match` occurs in `matching_to`.

    The terms come from `_split_match`; each one is searched for wrapped in
    `\\b` word-boundary anchors. `search_kwargs` is forwarded unchanged to
    `re.search`.

    Returns:
        `True` only when there is at least one term and all terms are
        found. A match string that yields no terms matches nothing.
    """
    terms = _split_match(_match)
    if not terms:
        # all() over an empty sequence is True; an empty pattern must not
        # be treated as "matches everything".
        return False
    return all(
        re.search(rf"\b{term}\b", matching_to, **search_kwargs) is not None
        for term in terms
    )
def _match_literal(_match: str, matching_to: str, **search_kwargs) -> bool:
return bool(
re.search(
rf"\b{re.escape(_match)}\b",
matching_to,
**search_kwargs,
),
)
def _match_regex(_match: str, matching_to: str, **search_kwargs) -> bool:
try:
return bool(re.search(
re.compile(_match, **search_kwargs),
matching_to,
))
except re.error as err:
log.error(err)
return False
def _match_fuzzy(_match: str, matching_to: str, **search_kwargs) -> bool:
    """
    Report whether `_match` approximately occurs in `matching_to`.

    Punctuation (anything that is neither a word character nor whitespace)
    is removed from both strings before they are compared with
    `rapidfuzz.fuzz.partial_ratio`.

    Args:
        _match: The text to look for.
        matching_to: The text to search.
        **search_kwargs: Only `flags` is consulted; when it contains
            `re.IGNORECASE` both strings are lower-cased before comparing.

    Returns:
        `True` when the partial-ratio score reaches the cutoff of 90,
        otherwise `False`. A pattern that is empty once punctuation is
        stripped matches nothing.
    """
    # Local import: only the fuzzy matcher needs rapidfuzz.
    from rapidfuzz import fuzz

    score_cutoff = 90

    pattern = re.sub(r"[^\w\s]", "", _match)
    text = re.sub(r"[^\w\s]", "", matching_to)

    # Case sensitivity arrives the same way as for the regex-based
    # matchers: through the `flags` keyword argument.
    if search_kwargs.get("flags", 0) & re.IGNORECASE:
        pattern = pattern.lower()
        text = text.lower()

    if not pattern.strip():
        return False

    # partial_ratio returns 0 when the score is below score_cutoff.
    score = fuzz.partial_ratio(pattern, text, score_cutoff=score_cutoff)
    return score >= score_cutoff
def match_text(_filter: dict, matching_to: str) -> bool:
search_kwargs = {}
algorithm = _filter["matching_algorithm"]
_match = _filter["match"]
is_whitelist = _filter["is_whitelist"]
is_insensitive = _filter["is_insensitive"]
# If the matching pattern is empty
if not _match.strip():
return False
if is_insensitive:
search_kwargs = {"flags": re.IGNORECASE}
log.debug(
"matching algorithm '%s', whitelist? '%s', insensitive? '%s'",
algorithm, is_whitelist, is_insensitive
)
match algorithm:
case 0:
return False
case 1: # Any
return _match_any(_match, matching_to, **search_kwargs)
case 2: # All
return _match_all(_match, matching_to, **search_kwargs)
case 3: # Exact Match
return _match_literal(_match, matching_to, **search_kwargs)
case 4: # Regular Expression
return _match_regex(_match, matching_to, **search_kwargs)
case 5: # Fuzzy Match
return _match_fuzzy(_match, matching_to, **search_kwargs)