diff --git a/src/extensions/cmds.py b/src/extensions/cmds.py index 46138e4..308b534 100644 --- a/src/extensions/cmds.py +++ b/src/extensions/cmds.py @@ -7,82 +7,82 @@ import logging from typing import Tuple from datetime import datetime -import aiohttp -import validators +# import aiohttp +# import validators from feedparser import FeedParserDict, parse from discord.ext import commands from discord import Interaction, TextChannel, Embed, Colour from discord.app_commands import Choice, choices, Group, autocomplete, rename, command from discord.errors import Forbidden -from api import API -from feed import Subscription, TrackedContent, ContentFilter -from utils import ( - Followup, - PaginationView, - get_rss_data, -) +# from api import API +# from feed import Subscription, TrackedContent, ContentFilter +# from utils import ( +# Followup, +# PaginationView, +# get_rss_data, +# ) log = logging.getLogger(__name__) -rss_list_sort_choices = [ - Choice(name="Nickname", value=0), - Choice(name="Date Added", value=1) -] -channels_list_sort_choices=[ - Choice(name="Feed Nickname", value=0), - Choice(name="Channel ID", value=1), - Choice(name="Date Added", value=2) -] +# rss_list_sort_choices = [ +# Choice(name="Nickname", value=0), +# Choice(name="Date Added", value=1) +# ] +# channels_list_sort_choices=[ +# Choice(name="Feed Nickname", value=0), +# Choice(name="Channel ID", value=1), +# Choice(name="Date Added", value=2) +# ] -# TODO SECURITY: a potential attack is that the user submits an rss feed then changes the -# target resource. Run a period task to check this. -async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]: - """Validate a provided RSS source. +# # TODO SECURITY: a potential attack is that the user submits an rss feed then changes the +# # target resource. Run a period task to check this. +# async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]: +# """Validate a provided RSS source. - Parameters - ---------- - nickname : str - Nickname of the source. Must not contain URL. - url : str - URL of the source. Must be URL with valid status code and be an RSS feed. +# Parameters +# ---------- +# nickname : str +# Nickname of the source. Must not contain URL. +# url : str +# URL of the source. Must be URL with valid status code and be an RSS feed. - Returns - ------- - str or None - String invalid message if invalid, NoneType if valid. - FeedParserDict or None - The feed parsed from the given URL or None if invalid. - """ +# Returns +# ------- +# str or None +# String invalid message if invalid, NoneType if valid. +# FeedParserDict or None +# The feed parsed from the given URL or None if invalid. +# """ - # Ensure the URL is valid - if not validators.url(url): - return f"The URL you have entered is malformed or invalid:\n`{url=}`", None +# # Ensure the URL is valid +# if not validators.url(url): +# return f"The URL you have entered is malformed or invalid:\n`{url=}`", None - # Check the nickname is not a URL - if validators.url(nickname): - return "It looks like the nickname you have entered is a URL.\n" \ - f"For security reasons, this is not allowed.\n`{nickname=}`", None +# # Check the nickname is not a URL +# if validators.url(nickname): +# return "It looks like the nickname you have entered is a URL.\n" \ +# f"For security reasons, this is not allowed.\n`{nickname=}`", None - feed_data, status_code = await get_rss_data(url) +# feed_data, status_code = await get_rss_data(url) - # Check the URL status code is valid - if status_code != 200: - return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None +# # Check the URL status code is valid +# if status_code != 200: +# return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None - # Check the contents is actually an RSS feed. - feed = parse(feed_data) - if not feed.version: - return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None +# # Check the contents is actually an RSS feed. +# feed = parse(feed_data) +# if not feed.version: +# return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None - return None, feed +# return None, feed -tri_choices = [ - Choice(name="Yes", value=2), - Choice(name="No (default)", value=1), - Choice(name="All", value=0), -] +# tri_choices = [ +# Choice(name="Yes", value=2), +# Choice(name="No (default)", value=1), +# Choice(name="All", value=0), +# ] class CommandsCog(commands.Cog): @@ -100,206 +100,206 @@ class CommandsCog(commands.Cog): log.info("%s cog is ready", self.__class__.__name__) - # Group for commands about viewing data - view_group = Group( - name="view", - description="View data.", - guild_only=True - ) + # # Group for commands about viewing data + # view_group = Group( + # name="view", + # description="View data.", + # guild_only=True + # ) - @view_group.command(name="subscriptions") - async def cmd_list_subs(self, inter: Interaction, search: str = ""): - """List Subscriptions from this server.""" + # @view_group.command(name="subscriptions") + # async def cmd_list_subs(self, inter: Interaction, search: str = ""): + # """List Subscriptions from this server.""" - await inter.response.defer() + # await inter.response.defer() - def formatdata(index, item): - item = Subscription.from_dict(item) + # def formatdata(index, item): + # item = Subscription.from_dict(item) - notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes - links = f"[RSS Link]({item.url}) โ€ข [API Link]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)" - activeness = "โœ… `enabled`" if item.active else "๐Ÿšซ `disabled`" + # notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes + # links = f"[RSS Link]({item.url}) โ€ข [API Link]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)" + # activeness = "โœ… `enabled`" if item.active else "๐Ÿšซ `disabled`" - description = f"๐Ÿ†” `{item.id}`\n{activeness}\n#๏ธโƒฃ `{item.channels_count}` ๐Ÿ”ฝ `{len(item.filters)}`\n" - description = f"{notes}\n" + description if notes else description - description += links + # description = f"๐Ÿ†” `{item.id}`\n{activeness}\n#๏ธโƒฃ `{item.channels_count}` ๐Ÿ”ฝ `{len(item.filters)}`\n" + # description = f"{notes}\n" + description if notes else description + # description += links - key = f"{index}. {item.name}" - return key, description # key, value pair + # key = f"{index}. {item.name}" + # return key, description # key, value pair - async def getdata(page: int, pagesize: int): - async with aiohttp.ClientSession() as session: - api = API(self.bot.api_token, session) - return await api.get_subscriptions( - guild_id=inter.guild.id, - page=page, - page_size=pagesize, - search=search - ) + # async def getdata(page: int, pagesize: int): + # async with aiohttp.ClientSession() as session: + # api = API(self.bot.api_token, session) + # return await api.get_subscriptions( + # guild_id=inter.guild.id, + # page=page, + # page_size=pagesize, + # search=search + # ) - embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed - pagination = PaginationView( - self.bot, - inter=inter, - embed=embed, - getdata=getdata, - formatdata=formatdata, - pagesize=10, - initpage=1 - ) - await pagination.send() + # embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed + # pagination = PaginationView( + # self.bot, + # inter=inter, + # embed=embed, + # getdata=getdata, + # formatdata=formatdata, + # pagesize=10, + # initpage=1 + # ) + # await pagination.send() - @view_group.command(name="tracked-content") - @choices(blocked=tri_choices) - async def cmd_list_tracked(self, inter: Interaction, search: str = "", blocked: Choice[int] = 1): - """List Tracked Content from this server""" # TODO: , or a given sub + # @view_group.command(name="tracked-content") + # @choices(blocked=tri_choices) + # async def cmd_list_tracked(self, inter: Interaction, search: str = "", blocked: Choice[int] = 1): + # """List Tracked Content from this server""" # TODO: , or a given sub - await inter.response.defer() + # await inter.response.defer() - # If the user picks an option it's an instance of `Choice` otherwise `str` - # Can't figure a way to select a default choices, so blame discordpy for this mess. - if isinstance(blocked, Choice): - blocked = blocked.value + # # If the user picks an option it's an instance of `Choice` otherwise `str` + # # Can't figure a way to select a default choices, so blame discordpy for this mess. + # if isinstance(blocked, Choice): + # blocked = blocked.value - def formatdata(index, item): - item = TrackedContent.from_dict(item) - sub = Subscription.from_dict(item.subscription) + # def formatdata(index, item): + # item = TrackedContent.from_dict(item) + # sub = Subscription.from_dict(item.subscription) - links = f"[Content Link]({item.url}) ยท [Message Link](https://discord.com/channels/{sub.guild_id}/{item.channel_id}/{item.message_id}/) ยท [API Link]({API.API_EXTERNAL_ENDPOINT}tracked-content/{item.id}/)" - delivery_state = "โœ… Delivered" if not item.blocked else "๐Ÿšซ Blocked" + # links = f"[Content Link]({item.url}) ยท [Message Link](https://discord.com/channels/{sub.guild_id}/{item.channel_id}/{item.message_id}/) ยท [API Link]({API.API_EXTERNAL_ENDPOINT}tracked-content/{item.id}/)" + # delivery_state = "โœ… Delivered" if not item.blocked else "๐Ÿšซ Blocked" - description = f"๐Ÿ†” `{item.id}`\n" - description += f"{delivery_state}\n" if blocked == 0 else "" - description += f"โžก๏ธ *{sub.name}*\n{links}" + # description = f"๐Ÿ†” `{item.id}`\n" + # description += f"{delivery_state}\n" if blocked == 0 else "" + # description += f"โžก๏ธ *{sub.name}*\n{links}" - key = f"{index}. {item.title}" - return key, description + # key = f"{index}. {item.title}" + # return key, description - def determine_blocked(): - match blocked: - case 0: return "" - case 1: return "false" - case 2: return "true" - case _: return "" + # def determine_blocked(): + # match blocked: + # case 0: return "" + # case 1: return "false" + # case 2: return "true" + # case _: return "" - async def getdata(page: int, pagesize: int): - async with aiohttp.ClientSession() as session: - api = API(self.bot.api_token, session) - is_blocked = determine_blocked() - return await api.get_tracked_content( - subscription__guild_id=inter.guild_id, - blocked=is_blocked, - page=page, - page_size=pagesize, - search=search, - ) + # async def getdata(page: int, pagesize: int): + # async with aiohttp.ClientSession() as session: + # api = API(self.bot.api_token, session) + # is_blocked = determine_blocked() + # return await api.get_tracked_content( + # subscription__guild_id=inter.guild_id, + # blocked=is_blocked, + # page=page, + # page_size=pagesize, + # search=search, + # ) - embed = Followup(f"Tracked Content in {inter.guild.name}").info()._embed - pagination = PaginationView( - self.bot, - inter=inter, - embed=embed, - getdata=getdata, - formatdata=formatdata, - pagesize=10, - initpage=1 - ) - await pagination.send() + # embed = Followup(f"Tracked Content in {inter.guild.name}").info()._embed + # pagination = PaginationView( + # self.bot, + # inter=inter, + # embed=embed, + # getdata=getdata, + # formatdata=formatdata, + # pagesize=10, + # initpage=1 + # ) + # await pagination.send() - @view_group.command(name="filters") - async def cmd_list_filters(self, inter: Interaction, search: str = ""): - """List Filters from this server.""" + # @view_group.command(name="filters") + # async def cmd_list_filters(self, inter: Interaction, search: str = ""): + # """List Filters from this server.""" - await inter.response.defer() + # await inter.response.defer() - def formatdata(index, item): - item = ContentFilter.from_dict(item) + # def formatdata(index, item): + # item = ContentFilter.from_dict(item) - matching_algorithm = get_algorithm_name(item.matching_algorithm) - whitelist = "Whitelist" if item.is_whitelist else "Blacklist" - sensitivity = "Case insensitive" if item.is_insensitive else "Case sensitive" + # matching_algorithm = get_algorithm_name(item.matching_algorithm) + # whitelist = "Whitelist" if item.is_whitelist else "Blacklist" + # sensitivity = "Case insensitive" if item.is_insensitive else "Case sensitive" - description = f"๐Ÿ†” `{item.id}`\n" - description += f"๐Ÿ”„ `{matching_algorithm}`\n๐ŸŸฐ `{item.match}`\n" - description += f"โœ… `{whitelist}` ๐Ÿ”  `{sensitivity}`\n" - description += f"[API Link]({API.API_EXTERNAL_ENDPOINT}filter/{item.id}/)" + # description = f"๐Ÿ†” `{item.id}`\n" + # description += f"๐Ÿ”„ `{matching_algorithm}`\n๐ŸŸฐ `{item.match}`\n" + # description += f"โœ… `{whitelist}` ๐Ÿ”  `{sensitivity}`\n" + # description += f"[API Link]({API.API_EXTERNAL_ENDPOINT}filter/{item.id}/)" - key = f"{index}. {item.name}" - return key, description + # key = f"{index}. {item.name}" + # return key, description - def get_algorithm_name(matching_algorithm: int): - match matching_algorithm: - case 0: return "None" - case 1: return "Any word" - case 2: return "All words" - case 3: return "Exact match" - case 4: return "Regex match" - case 5: return "Fuzzy match" - case _: return "unknown" + # def get_algorithm_name(matching_algorithm: int): + # match matching_algorithm: + # case 0: return "None" + # case 1: return "Any word" + # case 2: return "All words" + # case 3: return "Exact match" + # case 4: return "Regex match" + # case 5: return "Fuzzy match" + # case _: return "unknown" - async def getdata(page, pagesize): - async with aiohttp.ClientSession() as session: - api = API(self.bot.api_token, session) - return await api.get_filters( - guild_id=inter.guild_id, - page=page, - page_size=pagesize, - search=search - ) + # async def getdata(page, pagesize): + # async with aiohttp.ClientSession() as session: + # api = API(self.bot.api_token, session) + # return await api.get_filters( + # guild_id=inter.guild_id, + # page=page, + # page_size=pagesize, + # search=search + # ) - embed = Followup(f"Filters in {inter.guild.name}").info()._embed - pagination = PaginationView( - self.bot, - inter=inter, - embed=embed, - getdata=getdata, - formatdata=formatdata, - pagesize=10, - initpage=1 - ) - await pagination.send() + # embed = Followup(f"Filters in {inter.guild.name}").info()._embed + # pagination = PaginationView( + # self.bot, + # inter=inter, + # embed=embed, + # getdata=getdata, + # formatdata=formatdata, + # pagesize=10, + # initpage=1 + # ) + # await pagination.send() - # Group for test related commands - test_group = Group( - name="test", - description="Commands to test Bot functionality.", - guild_only=True - ) + # # Group for test related commands + # test_group = Group( + # name="test", + # description="Commands to test Bot functionality.", + # guild_only=True + # ) - @test_group.command(name="channel-permissions") - async def cmd_test_channel_perms(self, inter: Interaction): - """Test that the current channel's permissions allow for PYRSS to operate in it.""" + # @test_group.command(name="channel-permissions") + # async def cmd_test_channel_perms(self, inter: Interaction): + # """Test that the current channel's permissions allow for PYRSS to operate in it.""" - try: - test_message = await inter.channel.send(content="... testing permissions ...") - await self.test_channel_perms(inter.channel) - except Exception as error: - await inter.response.send_message(content=f"Failed: {error}") - return + # try: + # test_message = await inter.channel.send(content="... testing permissions ...") + # await self.test_channel_perms(inter.channel) + # except Exception as error: + # await inter.response.send_message(content=f"Failed: {error}") + # return - await test_message.delete() - await inter.response.send_message(content="Success") + # await test_message.delete() + # await inter.response.send_message(content="Success") - async def test_channel_perms(self, channel: TextChannel): + # async def test_channel_perms(self, channel: TextChannel): - # Test generic message and delete - msg = await channel.send(content="test message") - await msg.delete() + # # Test generic message and delete + # msg = await channel.send(content="test message") + # await msg.delete() - # Test detailed embed - embed = Embed( - title="test title", - description="test description", - colour=Colour.random(), - timestamp=datetime.now(), - url="https://google.com" - ) - embed.set_author(name="test author") - embed.set_footer(text="test footer") - embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") - embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") - embed_msg = await channel.send(embed=embed) - await embed_msg.delete() + # # Test detailed embed + # embed = Embed( + # title="test title", + # description="test description", + # colour=Colour.random(), + # timestamp=datetime.now(), + # url="https://google.com" + # ) + # embed.set_author(name="test author") + # embed.set_footer(text="test footer") + # embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") + # embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") + # embed_msg = await channel.send(embed=embed) + # await embed_msg.delete() async def setup(bot):