command for testing channel permissions & remove unused code

This commit is contained in:
Corban-Lee Jones 2024-08-18 23:06:44 +01:00
parent 88d31d2427
commit 6a399630a4

View File

@ -5,13 +5,15 @@ Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bo
import logging
from typing import Tuple
from datetime import datetime
import aiohttp
import validators
from feedparser import FeedParserDict, parse
from discord.ext import commands
from discord import Interaction, TextChannel
from discord import Interaction, TextChannel, Embed, Colour
from discord.app_commands import Choice, Group, autocomplete, rename, command
from discord.errors import Forbidden
from api import API
from feed import Subscription, SubscriptionChannel, TrackedContent
@ -92,239 +94,6 @@ class FeedCog(commands.Cog):
log.info("%s cog is ready", self.__class__.__name__)
# async def autocomplete_subscriptions(self, inter: Interaction, name: str) -> list[Choice]:
# """"""
# log.debug("autocompleting subscriptions '%s'", name)
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# results, _ = await api.get_subscriptions(server=inter.guild_id, search=name)
# except Exception as exc:
# log.error(exc)
# return []
# subscriptions = Subscription.from_list(results)
# return [
# Choice(name=sub.name, value=sub.uuid)
# for sub in subscriptions
# ]
# async def autocomplete_subscription_channels(self, inter: Interaction, uuid: str):
# """"""
# log.debug("autocompleting subscription channels")
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# results, _ = await api.get_subscription_channels()
# except Exception as exc:
# log.error(exc)
# return []
# subscription_channels = SubscriptionChannel.from_list(results)
# async def name(link):
# result = self.bot.get_channel(link.id) or await self.bot.fetch_channel(link.id)
# return f"{link.subscription.name} -> #{result.name}"
# return [
# Choice(name=await name(link), value=link.uuid)
# for link in subscription_channels
# ]
# subscription_group = Group(
# name="subscriptions",
# description="subscription commands",
# guild_only=True
# )
# @subscription_group.command(name="link")
# @autocomplete(sub_uuid=autocomplete_subscriptions)
# @rename(sub_uuid="subscription")
# async def link_subscription_channel(self, inter: Interaction, sub_uuid: str, channel: TextChannel):
# """
# Link Subscription to discord.TextChannel.
# """
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# data = await api.create_subscription_channel(str(channel.id), sub_uuid)
# except aiohttp.ClientResponseError as exc:
# return await (
# Followup(
# f"Error · {exc.message}",
# "Ensure you haven't: \n"
# "- Already linked this subscription to this channel\n"
# "- Already linked this subscription to the maximum of 4 channels"
# )
# .footer(f"HTTP {exc.code}")
# .error()
# .send(inter)
# )
# subscription = Subscription.from_dict(data.pop("subscription"))
# data["subscription"] = (
# f"{subscription.name}\n"
# f"[RSS]({subscription.rss_url}) · "
# f"[API Subscription]({API.SUBSCRIPTION_ENDPOINT}{subscription.uuid}) · "
# f"[API Link]({API.CHANNEL_ENDPOINT}{data['uuid']})"
# )
# channel_id = int(data.pop("id"))
# channel = self.bot.get_channel(channel_id) or await self.bot.fetch_channel(channel_id)
# data["channel"] = channel.mention
# data.pop("creation_datetime")
# data.pop("uuid")
# await (
# Followup("Linked!")
# .fields(**data)
# .added()
# .send(inter)
# )
# @subscription_group.command(name="unlink")
# @autocomplete(uuid=autocomplete_subscription_channels)
# @rename(uuid="link")
# async def unlink_subscription_channel(self, inter: Interaction, uuid: str):
# """
# Unlink subscription from discord.TextChannel.
# """
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# # data = await api.get_subscription(uuid=uuid)
# await api.delete_subscription_channel(uuid=uuid)
# # sub_channel = await SubscriptionChannel.from_dict(data)
# except Exception as exc:
# return await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# await (
# Followup("Subscription unlinked!", uuid)
# .added()
# .send(inter)
# )
# @subscription_group.command(name="list-links")
# async def list_subscription(self, inter: Interaction):
# """List Subscriptions Channels in this server."""
# await inter.response.defer()
# async def formatdata(index: int, item: dict) -> tuple[str, str]:
# item = SubscriptionChannel.from_dict(item)
# next_emoji = self.bot.get_emoji(1204542366602502265)
# key = f"{index}. {item.subscription.name} {next_emoji} {item.mention}"
# return key, item.hyperlinks_string
# async def getdata(page: int, pagesize: int) -> dict:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# return await api.get_subscription_channels(
# subscription__server=inter.guild.id, page=page, page_size=pagesize
# )
# embed = Followup(f"Links in {inter.guild.name}").info()._embed
# pagination = PaginationView(
# self.bot,
# inter=inter,
# embed=embed,
# getdata=getdata,
# formatdata=formatdata,
# pagesize=10,
# initpage=1
# )
# await pagination.send()
# @subscription_group.command(name="add")
# async def new_subscription(self, inter: Interaction, name: str, rss_url: str):
# """Subscribe this server to a new RSS Feed."""
# await inter.response.defer()
# try:
# parsed_rssfeed = await self.bot.functions.validate_feed(name, rss_url)
# image_url = parsed_rssfeed.get("feed", {}).get("image", {}).get("href")
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# data = await api.create_subscription(name, rss_url, image_url, str(inter.guild_id), [-1])
# except aiohttp.ClientResponseError as exc:
# return await (
# Followup(
# f"Error · {exc.message}",
# "Ensure you haven't: \n"
# "- Reused an identical name of an existing Subscription\n"
# "- Already created the maximum of 25 Subscriptions"
# )
# .footer(f"HTTP {exc.code}")
# .error()
# .send(inter)
# )
# # Omit data we dont want the user to see
# data.pop("uuid")
# data.pop("image")
# data.pop("server")
# data.pop("creation_datetime")
# # Update keys to be more human readable
# data["url"] = data.pop("rss_url")
# await (
# Followup("Subscription Added!")
# .fields(**data)
# .image(image_url)
# .added()
# .send(inter)
# )
# @subscription_group.command(name="remove")
# @autocomplete(uuid=autocomplete_subscriptions)
# @rename(uuid="choice")
# async def remove_subscriptions(self, inter: Interaction, uuid: str):
# """Unsubscribe this server from an existing RSS Feed."""
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# await api.delete_subscription(uuid)
# except Exception as exc:
# return await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# await (
# Followup("Subscription Removed!", uuid)
# .trash()
# .send(inter)
# )
@command(name="subscriptions")
async def list_subscription(self, inter: Interaction):
"""List Subscriptions from this server."""
@ -366,6 +135,41 @@ class FeedCog(commands.Cog):
await pagination.send()
# await Followup("results", str(await getdata(1, 10))).send(inter)
@command(name="test-channel-permissions")
async def cmd_test_channel_perms(self, inter: Interaction):
"""Test that the current channel's permissions allow for PYRSS to operate in it."""
try:
test_message = await inter.channel.send(content="... testing permissions ...")
await self.test_channel_perms(inter.channel)
except Exception as error:
await inter.response.send_message(content=f"Failed: {error}")
return
await test_message.delete()
await inter.response.send_message(content="Success")
async def test_channel_perms(self, channel: TextChannel):
# Test generic message and delete
msg = await channel.send(content="test message")
await msg.delete()
# Test detailed embed
embed = Embed(
title="test title",
description="test description",
colour=Colour.random(),
timestamp=datetime.now(),
url="https://google.com"
)
embed.set_author(name="test author")
embed.set_footer(text="test footer")
embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
embed_msg = await channel.send(embed=embed)
await embed_msg.delete()
async def setup(bot):
"""