temp trim commands
Some checks are pending
Build and Push Docker Image / build (push) Waiting to run

This commit is contained in:
Corban-Lee Jones 2024-10-31 13:10:46 +00:00
parent eb97dca5c6
commit cf8fb34a29

View File

@ -7,82 +7,82 @@ import logging
from typing import Tuple
from datetime import datetime
import aiohttp
import validators
# import aiohttp
# import validators
from feedparser import FeedParserDict, parse
from discord.ext import commands
from discord import Interaction, TextChannel, Embed, Colour
from discord.app_commands import Choice, choices, Group, autocomplete, rename, command
from discord.errors import Forbidden
from api import API
from feed import Subscription, TrackedContent, ContentFilter
from utils import (
Followup,
PaginationView,
get_rss_data,
)
# from api import API
# from feed import Subscription, TrackedContent, ContentFilter
# from utils import (
# Followup,
# PaginationView,
# get_rss_data,
# )
log = logging.getLogger(__name__)
rss_list_sort_choices = [
Choice(name="Nickname", value=0),
Choice(name="Date Added", value=1)
]
channels_list_sort_choices=[
Choice(name="Feed Nickname", value=0),
Choice(name="Channel ID", value=1),
Choice(name="Date Added", value=2)
]
# rss_list_sort_choices = [
# Choice(name="Nickname", value=0),
# Choice(name="Date Added", value=1)
# ]
# channels_list_sort_choices=[
# Choice(name="Feed Nickname", value=0),
# Choice(name="Channel ID", value=1),
# Choice(name="Date Added", value=2)
# ]
# TODO SECURITY: a potential attack is that the user submits an rss feed then changes the
# target resource. Run a period task to check this.
async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]:
"""Validate a provided RSS source.
# # TODO SECURITY: a potential attack is that the user submits an rss feed then changes the
# # target resource. Run a period task to check this.
# async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]:
# """Validate a provided RSS source.
Parameters
----------
nickname : str
Nickname of the source. Must not contain URL.
url : str
URL of the source. Must be URL with valid status code and be an RSS feed.
# Parameters
# ----------
# nickname : str
# Nickname of the source. Must not contain URL.
# url : str
# URL of the source. Must be URL with valid status code and be an RSS feed.
Returns
-------
str or None
String invalid message if invalid, NoneType if valid.
FeedParserDict or None
The feed parsed from the given URL or None if invalid.
"""
# Returns
# -------
# str or None
# String invalid message if invalid, NoneType if valid.
# FeedParserDict or None
# The feed parsed from the given URL or None if invalid.
# """
# Ensure the URL is valid
if not validators.url(url):
return f"The URL you have entered is malformed or invalid:\n`{url=}`", None
# # Ensure the URL is valid
# if not validators.url(url):
# return f"The URL you have entered is malformed or invalid:\n`{url=}`", None
# Check the nickname is not a URL
if validators.url(nickname):
return "It looks like the nickname you have entered is a URL.\n" \
f"For security reasons, this is not allowed.\n`{nickname=}`", None
# # Check the nickname is not a URL
# if validators.url(nickname):
# return "It looks like the nickname you have entered is a URL.\n" \
# f"For security reasons, this is not allowed.\n`{nickname=}`", None
feed_data, status_code = await get_rss_data(url)
# feed_data, status_code = await get_rss_data(url)
# Check the URL status code is valid
if status_code != 200:
return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None
# # Check the URL status code is valid
# if status_code != 200:
# return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None
# Check the contents is actually an RSS feed.
feed = parse(feed_data)
if not feed.version:
return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None
# # Check the contents is actually an RSS feed.
# feed = parse(feed_data)
# if not feed.version:
# return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None
return None, feed
# return None, feed
tri_choices = [
Choice(name="Yes", value=2),
Choice(name="No (default)", value=1),
Choice(name="All", value=0),
]
# tri_choices = [
# Choice(name="Yes", value=2),
# Choice(name="No (default)", value=1),
# Choice(name="All", value=0),
# ]
class CommandsCog(commands.Cog):
@ -100,206 +100,206 @@ class CommandsCog(commands.Cog):
log.info("%s cog is ready", self.__class__.__name__)
# Group for commands about viewing data
view_group = Group(
name="view",
description="View data.",
guild_only=True
)
# # Group for commands about viewing data
# view_group = Group(
# name="view",
# description="View data.",
# guild_only=True
# )
@view_group.command(name="subscriptions")
async def cmd_list_subs(self, inter: Interaction, search: str = ""):
"""List Subscriptions from this server."""
# @view_group.command(name="subscriptions")
# async def cmd_list_subs(self, inter: Interaction, search: str = ""):
# """List Subscriptions from this server."""
await inter.response.defer()
# await inter.response.defer()
def formatdata(index, item):
item = Subscription.from_dict(item)
# def formatdata(index, item):
# item = Subscription.from_dict(item)
notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes
links = f"[RSS Link]({item.url}) • [API Link]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)"
activeness = "✅ `enabled`" if item.active else "🚫 `disabled`"
# notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes
# links = f"[RSS Link]({item.url}) • [API Link]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)"
# activeness = "✅ `enabled`" if item.active else "🚫 `disabled`"
description = f"🆔 `{item.id}`\n{activeness}\n#️⃣ `{item.channels_count}` 🔽 `{len(item.filters)}`\n"
description = f"{notes}\n" + description if notes else description
description += links
# description = f"🆔 `{item.id}`\n{activeness}\n#️⃣ `{item.channels_count}` 🔽 `{len(item.filters)}`\n"
# description = f"{notes}\n" + description if notes else description
# description += links
key = f"{index}. {item.name}"
return key, description # key, value pair
# key = f"{index}. {item.name}"
# return key, description # key, value pair
async def getdata(page: int, pagesize: int):
async with aiohttp.ClientSession() as session:
api = API(self.bot.api_token, session)
return await api.get_subscriptions(
guild_id=inter.guild.id,
page=page,
page_size=pagesize,
search=search
)
# async def getdata(page: int, pagesize: int):
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# return await api.get_subscriptions(
# guild_id=inter.guild.id,
# page=page,
# page_size=pagesize,
# search=search
# )
embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed
pagination = PaginationView(
self.bot,
inter=inter,
embed=embed,
getdata=getdata,
formatdata=formatdata,
pagesize=10,
initpage=1
)
await pagination.send()
# embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed
# pagination = PaginationView(
# self.bot,
# inter=inter,
# embed=embed,
# getdata=getdata,
# formatdata=formatdata,
# pagesize=10,
# initpage=1
# )
# await pagination.send()
@view_group.command(name="tracked-content")
@choices(blocked=tri_choices)
async def cmd_list_tracked(self, inter: Interaction, search: str = "", blocked: Choice[int] = 1):
"""List Tracked Content from this server""" # TODO: , or a given sub
# @view_group.command(name="tracked-content")
# @choices(blocked=tri_choices)
# async def cmd_list_tracked(self, inter: Interaction, search: str = "", blocked: Choice[int] = 1):
# """List Tracked Content from this server""" # TODO: , or a given sub
await inter.response.defer()
# await inter.response.defer()
# If the user picks an option it's an instance of `Choice` otherwise `str`
# Can't figure a way to select a default choices, so blame discordpy for this mess.
if isinstance(blocked, Choice):
blocked = blocked.value
# # If the user picks an option it's an instance of `Choice` otherwise `str`
# # Can't figure a way to select a default choices, so blame discordpy for this mess.
# if isinstance(blocked, Choice):
# blocked = blocked.value
def formatdata(index, item):
item = TrackedContent.from_dict(item)
sub = Subscription.from_dict(item.subscription)
# def formatdata(index, item):
# item = TrackedContent.from_dict(item)
# sub = Subscription.from_dict(item.subscription)
links = f"[Content Link]({item.url}) · [Message Link](https://discord.com/channels/{sub.guild_id}/{item.channel_id}/{item.message_id}/) · [API Link]({API.API_EXTERNAL_ENDPOINT}tracked-content/{item.id}/)"
delivery_state = "✅ Delivered" if not item.blocked else "🚫 Blocked"
# links = f"[Content Link]({item.url}) · [Message Link](https://discord.com/channels/{sub.guild_id}/{item.channel_id}/{item.message_id}/) · [API Link]({API.API_EXTERNAL_ENDPOINT}tracked-content/{item.id}/)"
# delivery_state = "✅ Delivered" if not item.blocked else "🚫 Blocked"
description = f"🆔 `{item.id}`\n"
description += f"{delivery_state}\n" if blocked == 0 else ""
description += f"➡️ *{sub.name}*\n{links}"
# description = f"🆔 `{item.id}`\n"
# description += f"{delivery_state}\n" if blocked == 0 else ""
# description += f"➡️ *{sub.name}*\n{links}"
key = f"{index}. {item.title}"
return key, description
# key = f"{index}. {item.title}"
# return key, description
def determine_blocked():
match blocked:
case 0: return ""
case 1: return "false"
case 2: return "true"
case _: return ""
# def determine_blocked():
# match blocked:
# case 0: return ""
# case 1: return "false"
# case 2: return "true"
# case _: return ""
async def getdata(page: int, pagesize: int):
async with aiohttp.ClientSession() as session:
api = API(self.bot.api_token, session)
is_blocked = determine_blocked()
return await api.get_tracked_content(
subscription__guild_id=inter.guild_id,
blocked=is_blocked,
page=page,
page_size=pagesize,
search=search,
)
# async def getdata(page: int, pagesize: int):
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# is_blocked = determine_blocked()
# return await api.get_tracked_content(
# subscription__guild_id=inter.guild_id,
# blocked=is_blocked,
# page=page,
# page_size=pagesize,
# search=search,
# )
embed = Followup(f"Tracked Content in {inter.guild.name}").info()._embed
pagination = PaginationView(
self.bot,
inter=inter,
embed=embed,
getdata=getdata,
formatdata=formatdata,
pagesize=10,
initpage=1
)
await pagination.send()
# embed = Followup(f"Tracked Content in {inter.guild.name}").info()._embed
# pagination = PaginationView(
# self.bot,
# inter=inter,
# embed=embed,
# getdata=getdata,
# formatdata=formatdata,
# pagesize=10,
# initpage=1
# )
# await pagination.send()
@view_group.command(name="filters")
async def cmd_list_filters(self, inter: Interaction, search: str = ""):
"""List Filters from this server."""
# @view_group.command(name="filters")
# async def cmd_list_filters(self, inter: Interaction, search: str = ""):
# """List Filters from this server."""
await inter.response.defer()
# await inter.response.defer()
def formatdata(index, item):
item = ContentFilter.from_dict(item)
# def formatdata(index, item):
# item = ContentFilter.from_dict(item)
matching_algorithm = get_algorithm_name(item.matching_algorithm)
whitelist = "Whitelist" if item.is_whitelist else "Blacklist"
sensitivity = "Case insensitive" if item.is_insensitive else "Case sensitive"
# matching_algorithm = get_algorithm_name(item.matching_algorithm)
# whitelist = "Whitelist" if item.is_whitelist else "Blacklist"
# sensitivity = "Case insensitive" if item.is_insensitive else "Case sensitive"
description = f"🆔 `{item.id}`\n"
description += f"🔄 `{matching_algorithm}`\n🟰 `{item.match}`\n"
description += f"✅ `{whitelist}` 🔠 `{sensitivity}`\n"
description += f"[API Link]({API.API_EXTERNAL_ENDPOINT}filter/{item.id}/)"
# description = f"🆔 `{item.id}`\n"
# description += f"🔄 `{matching_algorithm}`\n🟰 `{item.match}`\n"
# description += f"✅ `{whitelist}` 🔠 `{sensitivity}`\n"
# description += f"[API Link]({API.API_EXTERNAL_ENDPOINT}filter/{item.id}/)"
key = f"{index}. {item.name}"
return key, description
# key = f"{index}. {item.name}"
# return key, description
def get_algorithm_name(matching_algorithm: int):
match matching_algorithm:
case 0: return "None"
case 1: return "Any word"
case 2: return "All words"
case 3: return "Exact match"
case 4: return "Regex match"
case 5: return "Fuzzy match"
case _: return "unknown"
# def get_algorithm_name(matching_algorithm: int):
# match matching_algorithm:
# case 0: return "None"
# case 1: return "Any word"
# case 2: return "All words"
# case 3: return "Exact match"
# case 4: return "Regex match"
# case 5: return "Fuzzy match"
# case _: return "unknown"
async def getdata(page, pagesize):
async with aiohttp.ClientSession() as session:
api = API(self.bot.api_token, session)
return await api.get_filters(
guild_id=inter.guild_id,
page=page,
page_size=pagesize,
search=search
)
# async def getdata(page, pagesize):
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# return await api.get_filters(
# guild_id=inter.guild_id,
# page=page,
# page_size=pagesize,
# search=search
# )
embed = Followup(f"Filters in {inter.guild.name}").info()._embed
pagination = PaginationView(
self.bot,
inter=inter,
embed=embed,
getdata=getdata,
formatdata=formatdata,
pagesize=10,
initpage=1
)
await pagination.send()
# embed = Followup(f"Filters in {inter.guild.name}").info()._embed
# pagination = PaginationView(
# self.bot,
# inter=inter,
# embed=embed,
# getdata=getdata,
# formatdata=formatdata,
# pagesize=10,
# initpage=1
# )
# await pagination.send()
# Group for test related commands
test_group = Group(
name="test",
description="Commands to test Bot functionality.",
guild_only=True
)
# # Group for test related commands
# test_group = Group(
# name="test",
# description="Commands to test Bot functionality.",
# guild_only=True
# )
@test_group.command(name="channel-permissions")
async def cmd_test_channel_perms(self, inter: Interaction):
"""Test that the current channel's permissions allow for PYRSS to operate in it."""
# @test_group.command(name="channel-permissions")
# async def cmd_test_channel_perms(self, inter: Interaction):
# """Test that the current channel's permissions allow for PYRSS to operate in it."""
try:
test_message = await inter.channel.send(content="... testing permissions ...")
await self.test_channel_perms(inter.channel)
except Exception as error:
await inter.response.send_message(content=f"Failed: {error}")
return
# try:
# test_message = await inter.channel.send(content="... testing permissions ...")
# await self.test_channel_perms(inter.channel)
# except Exception as error:
# await inter.response.send_message(content=f"Failed: {error}")
# return
await test_message.delete()
await inter.response.send_message(content="Success")
# await test_message.delete()
# await inter.response.send_message(content="Success")
async def test_channel_perms(self, channel: TextChannel):
# async def test_channel_perms(self, channel: TextChannel):
# Test generic message and delete
msg = await channel.send(content="test message")
await msg.delete()
# # Test generic message and delete
# msg = await channel.send(content="test message")
# await msg.delete()
# Test detailed embed
embed = Embed(
title="test title",
description="test description",
colour=Colour.random(),
timestamp=datetime.now(),
url="https://google.com"
)
embed.set_author(name="test author")
embed.set_footer(text="test footer")
embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
embed_msg = await channel.send(embed=embed)
await embed_msg.delete()
# # Test detailed embed
# embed = Embed(
# title="test title",
# description="test description",
# colour=Colour.random(),
# timestamp=datetime.now(),
# url="https://google.com"
# )
# embed.set_author(name="test author")
# embed.set_footer(text="test footer")
# embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
# embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
# embed_msg = await channel.send(embed=embed)
# await embed_msg.delete()
async def setup(bot):