From 6a399630a4afb7b48738c3c408a28a9b217e5657 Mon Sep 17 00:00:00 2001 From: Corban-Lee Jones Date: Sun, 18 Aug 2024 23:06:44 +0100 Subject: [PATCH] command for testing channel permissions & remove unused code --- src/extensions/rss.py | 272 ++++++------------------------------------ 1 file changed, 38 insertions(+), 234 deletions(-) diff --git a/src/extensions/rss.py b/src/extensions/rss.py index 73951b6..e100ee7 100644 --- a/src/extensions/rss.py +++ b/src/extensions/rss.py @@ -5,13 +5,15 @@ Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bo import logging from typing import Tuple +from datetime import datetime import aiohttp import validators from feedparser import FeedParserDict, parse from discord.ext import commands -from discord import Interaction, TextChannel +from discord import Interaction, TextChannel, Embed, Colour from discord.app_commands import Choice, Group, autocomplete, rename, command +from discord.errors import Forbidden from api import API from feed import Subscription, SubscriptionChannel, TrackedContent @@ -92,239 +94,6 @@ class FeedCog(commands.Cog): log.info("%s cog is ready", self.__class__.__name__) - # async def autocomplete_subscriptions(self, inter: Interaction, name: str) -> list[Choice]: - # """""" - - # log.debug("autocompleting subscriptions '%s'", name) - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # results, _ = await api.get_subscriptions(server=inter.guild_id, search=name) - - # except Exception as exc: - # log.error(exc) - # return [] - - # subscriptions = Subscription.from_list(results) - - # return [ - # Choice(name=sub.name, value=sub.uuid) - # for sub in subscriptions - # ] - - # async def autocomplete_subscription_channels(self, inter: Interaction, uuid: str): - # """""" - - # log.debug("autocompleting subscription channels") - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # results, _ = await api.get_subscription_channels() - - # except Exception as exc: - # log.error(exc) - # return [] - - # subscription_channels = SubscriptionChannel.from_list(results) - - # async def name(link): - # result = self.bot.get_channel(link.id) or await self.bot.fetch_channel(link.id) - # return f"{link.subscription.name} -> #{result.name}" - - # return [ - # Choice(name=await name(link), value=link.uuid) - # for link in subscription_channels - # ] - - # subscription_group = Group( - # name="subscriptions", - # description="subscription commands", - # guild_only=True - # ) - - # @subscription_group.command(name="link") - # @autocomplete(sub_uuid=autocomplete_subscriptions) - # @rename(sub_uuid="subscription") - # async def link_subscription_channel(self, inter: Interaction, sub_uuid: str, channel: TextChannel): - # """ - # Link Subscription to discord.TextChannel. - # """ - - # await inter.response.defer() - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # data = await api.create_subscription_channel(str(channel.id), sub_uuid) - - # except aiohttp.ClientResponseError as exc: - # return await ( - # Followup( - # f"Error · {exc.message}", - # "Ensure you haven't: \n" - # "- Already linked this subscription to this channel\n" - # "- Already linked this subscription to the maximum of 4 channels" - # ) - # .footer(f"HTTP {exc.code}") - # .error() - # .send(inter) - # ) - - # subscription = Subscription.from_dict(data.pop("subscription")) - # data["subscription"] = ( - # f"{subscription.name}\n" - # f"[RSS]({subscription.rss_url}) · " - # f"[API Subscription]({API.SUBSCRIPTION_ENDPOINT}{subscription.uuid}) · " - # f"[API Link]({API.CHANNEL_ENDPOINT}{data['uuid']})" - # ) - - # channel_id = int(data.pop("id")) - # channel = self.bot.get_channel(channel_id) or await self.bot.fetch_channel(channel_id) - # data["channel"] = channel.mention - - # data.pop("creation_datetime") - # data.pop("uuid") - - # await ( - # Followup("Linked!") - # .fields(**data) - # .added() - # .send(inter) - # ) - - # @subscription_group.command(name="unlink") - # @autocomplete(uuid=autocomplete_subscription_channels) - # @rename(uuid="link") - # async def unlink_subscription_channel(self, inter: Interaction, uuid: str): - # """ - # Unlink subscription from discord.TextChannel. - # """ - - # await inter.response.defer() - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # # data = await api.get_subscription(uuid=uuid) - # await api.delete_subscription_channel(uuid=uuid) - # # sub_channel = await SubscriptionChannel.from_dict(data) - - # except Exception as exc: - # return await ( - # Followup(exc.__class__.__name__, str(exc)) - # .error() - # .send(inter) - # ) - - # await ( - # Followup("Subscription unlinked!", uuid) - # .added() - # .send(inter) - # ) - - # @subscription_group.command(name="list-links") - # async def list_subscription(self, inter: Interaction): - # """List Subscriptions Channels in this server.""" - - # await inter.response.defer() - - # async def formatdata(index: int, item: dict) -> tuple[str, str]: - # item = SubscriptionChannel.from_dict(item) - # next_emoji = self.bot.get_emoji(1204542366602502265) - # key = f"{index}. {item.subscription.name} {next_emoji} {item.mention}" - # return key, item.hyperlinks_string - - # async def getdata(page: int, pagesize: int) -> dict: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # return await api.get_subscription_channels( - # subscription__server=inter.guild.id, page=page, page_size=pagesize - # ) - - # embed = Followup(f"Links in {inter.guild.name}").info()._embed - # pagination = PaginationView( - # self.bot, - # inter=inter, - # embed=embed, - # getdata=getdata, - # formatdata=formatdata, - # pagesize=10, - # initpage=1 - # ) - # await pagination.send() - - # @subscription_group.command(name="add") - # async def new_subscription(self, inter: Interaction, name: str, rss_url: str): - # """Subscribe this server to a new RSS Feed.""" - - # await inter.response.defer() - - # try: - # parsed_rssfeed = await self.bot.functions.validate_feed(name, rss_url) - # image_url = parsed_rssfeed.get("feed", {}).get("image", {}).get("href") - - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # data = await api.create_subscription(name, rss_url, image_url, str(inter.guild_id), [-1]) - - # except aiohttp.ClientResponseError as exc: - # return await ( - # Followup( - # f"Error · {exc.message}", - # "Ensure you haven't: \n" - # "- Reused an identical name of an existing Subscription\n" - # "- Already created the maximum of 25 Subscriptions" - # ) - # .footer(f"HTTP {exc.code}") - # .error() - # .send(inter) - # ) - - # # Omit data we dont want the user to see - # data.pop("uuid") - # data.pop("image") - # data.pop("server") - # data.pop("creation_datetime") - - # # Update keys to be more human readable - # data["url"] = data.pop("rss_url") - - # await ( - # Followup("Subscription Added!") - # .fields(**data) - # .image(image_url) - # .added() - # .send(inter) - # ) - - # @subscription_group.command(name="remove") - # @autocomplete(uuid=autocomplete_subscriptions) - # @rename(uuid="choice") - # async def remove_subscriptions(self, inter: Interaction, uuid: str): - # """Unsubscribe this server from an existing RSS Feed.""" - - # await inter.response.defer() - - # try: - # async with aiohttp.ClientSession() as session: - # api = API(self.bot.api_token, session) - # await api.delete_subscription(uuid) - - # except Exception as exc: - # return await ( - # Followup(exc.__class__.__name__, str(exc)) - # .error() - # .send(inter) - # ) - - # await ( - # Followup("Subscription Removed!", uuid) - # .trash() - # .send(inter) - # ) - @command(name="subscriptions") async def list_subscription(self, inter: Interaction): """List Subscriptions from this server.""" @@ -366,6 +135,41 @@ class FeedCog(commands.Cog): await pagination.send() # await Followup("results", str(await getdata(1, 10))).send(inter) + @command(name="test-channel-permissions") + async def cmd_test_channel_perms(self, inter: Interaction): + """Test that the current channel's permissions allow for PYRSS to operate in it.""" + + try: + test_message = await inter.channel.send(content="... testing permissions ...") + await self.test_channel_perms(inter.channel) + except Exception as error: + await inter.response.send_message(content=f"Failed: {error}") + return + + await test_message.delete() + await inter.response.send_message(content="Success") + + async def test_channel_perms(self, channel: TextChannel): + + # Test generic message and delete + msg = await channel.send(content="test message") + await msg.delete() + + # Test detailed embed + embed = Embed( + title="test title", + description="test description", + colour=Colour.random(), + timestamp=datetime.now(), + url="https://google.com" + ) + embed.set_author(name="test author") + embed.set_footer(text="test footer") + embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") + embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png") + embed_msg = await channel.send(embed=embed) + await embed_msg.delete() + async def setup(bot): """