Removed unused and commented code

This commit is contained in:
Corban-Lee Jones 2024-02-18 21:43:28 +00:00
parent 2c83430988
commit 9399919b08

View File

@ -5,35 +5,20 @@ Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bo
import logging
from typing import Tuple
from dataclasses import asdict
import aiohttp
import validators
from feedparser import FeedParserDict, parse
from discord.ext import commands
from discord import Interaction, Embed, Colour, TextChannel, Permissions
from discord.app_commands import Choice, Group, autocomplete, choices, rename, command
from sqlalchemy import insert, select, and_, delete
from sqlalchemy.exc import NoResultFound, IntegrityError
from discord import Interaction, TextChannel
from discord.app_commands import Choice, Group, autocomplete, rename
from api import API
from feed import Source, RSSFeed, Subscription, SubscriptionChannel, TrackedContent
from errors import IllegalFeed
from db import (
DatabaseManager,
SentArticleModel,
RssSourceModel,
FeedChannelModel,
AuditModel
)
from feed import Subscription, SubscriptionChannel, TrackedContent
from utils import (
Followup,
PaginationView,
get_rss_data,
followup,
audit,
extract_error_info,
get_unparsed_feed
)
log = logging.getLogger(__name__)
@ -91,24 +76,6 @@ async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, Feed
return None, feed
async def set_all_articles_as_sent(inter, channel: TextChannel, feed_id: int, rss_url: str):
unparsed_feed = await get_unparsed_feed(rss_url)
source = Source.from_parsed(parse(unparsed_feed))
articles = source.get_latest_articles()
async with DatabaseManager() as database:
query = insert(SentArticleModel).values([
{
"discord_server_id": inter.guild_id,
"discord_channel_id": channel.id,
"discord_message_id": -1,
"article_url": article.url,
"feed_channel_id": feed_id
}
for article in articles
])
await database.session.execute(query)
class FeedCog(commands.Cog):
"""
@ -125,27 +92,6 @@ class FeedCog(commands.Cog):
log.info("%s cog is ready", self.__class__.__name__)
# async def autocomplete_channels(self, inter: Interaction, name: str) -> list[Choice]:
# """"""
# log.debug("autocompleting channels '%s'", name)
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# results, _ = await api.get_channel(server=inter.guild_id)
# except Exception as exc:
# log.error(exc)
# return []
# channels = Channel.from_list(results)
# return [
# Choice(name=channel.get_textchannel(self.bot).name, value=channel.id)
# for channel in channels
# ]
async def autocomplete_subscriptions(self, inter: Interaction, name: str) -> list[Choice]:
""""""
@ -192,101 +138,6 @@ class FeedCog(commands.Cog):
for link in subscription_channels
]
# channel_group = Group(
# name="channels",
# description="channel commands",
# guild_only=True
# )
# @channel_group.command(name="add")
# @autocomplete(sub_uuid=autocomplete_subscriptions)
# @rename(sub_uuid="subscription", textchannel="channel")
# async def new_channel(self, inter: Interaction, sub_uuid: str, textchannel: TextChannel):
# """Create a new channel."""
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# await api.create_channel(textchannel.id)
# # sub = await api.get_subscription(sub_uuid)
# # sub["channels"].append(textchannel.id)
# # await api.put_subscription(sub_uuid, sub)
# except Exception as exc:
# return await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# await (
# Followup("Channel Assigned!")
# .fields(
# subscription=sub_uuid,
# channel=textchannel.mention
# )
# .added()
# .send(inter)
# )
# @channel_group.command(name="remove")
# @autocomplete(id=autocomplete_channels)
# @rename(id="choice")
# async def remove_channel(self, inter: Interaction, id: int):
# """Remove a channel."""
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# await api.delete_channel(id)
# except Exception as exc:
# return await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# await (
# Followup("Channel Removed!", str(id))
# .trash()
# .send(inter)
# )
# @channel_group.command(name="list")
# async def list_channels(self, inter: Interaction):
# log.debug("Listing all subscription channels with this server.")
# await inter.response.defer()
# page = 1
# pagesize = 10
# def formatdata(index, item):
# item = Channel.from_dict(item)
# text_channel = item.get_textchannel(self.bot)
# key = f"{index}. {text_channel.mention}"
# value = f"[RSS]({item.rss_url}) · [API]({API.CHANNEL_ENDPOINT}{item.uuid}/)"
# return key, value
# async def getdata(page):
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# return await api.get_subscriptions(
# server=inter.guild.id, page=page, page_size=pagesize
# )
# embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed
# pagination = PaginationView(self.bot, inter, embed, getdata, formatdata, pagesize, page)
# await pagination.send()
subscription_group = Group(
name="subscriptions",
description="subscription commands",
@ -494,537 +345,6 @@ class FeedCog(commands.Cog):
await pagination.send()
# # All RSS commands belong to this group.
# feed_group = Group(
# name="feed",
# description="Commands for RSS sources.",
# default_permissions=Permissions.elevated(),
# guild_only=True # We store guild IDs in the database, so guild only = True
# )
# @feed_group.command(name="new")
# async def add_rssfeed(self, inter: Interaction, name: str, url: str):
# """Add a new RSS Feed for this server.
# Args:
# inter (Interaction): Represents the discord command interaction.
# name (str): A nickname used to refer to this RSS Feed.
# url (str): The URL of the RSS Feed.
# """
# await inter.response.defer()
# try:
# rssfeed = await self.bot.functions.create_new_rssfeed(name, url, inter.guild_id)
# except Exception as exc:
# await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# else:
# await (
# Followup("New RSS Feed")
# .image(rssfeed.image)
# .fields(uuid=rssfeed.uuid, name=name, url=url)
# .added()
# .send(inter)
# )
# @feed_group.command(name="delete")
# @autocomplete(uuid=autocomplete_rssfeed)
# @rename(uuid="rssfeed")
# async def delete_rssfeed(self, inter: Interaction, uuid: str):
# """Delete an existing RSS Feed for this server.
# Args:
# inter (Interaction): Represents the discord command interaction.
# uuid (str): The UUID of the
# """
# await inter.response.defer()
# try:
# rssfeed = await self.bot.functions.delete_rssfeed(uuid)
# except NoResultFound:
# await (
# Followup(
# "Feed Not Found Error",
# "A Feed with these parameters could not be found."
# )
# .error()
# .send(inter)
# )
# else:
# await (
# Followup("Feed Deleted")
# .image(rssfeed.image)
# .fields(uuid=rssfeed.uuid, name=rssfeed.name, url=rssfeed.url)
# .trash()
# .send(inter)
# )
# @feed_group.command(name="list")
# async def list_rssfeeds(self, inter: Interaction):
# """Provides a list of all RSS Feeds
# Args:
# inter (Interaction): Represents the discord command interaction.
# """
# await inter.response.defer()
# page = 1
# pagesize = 10
# try:
# def formatdata(index, item):
# key = f"{index}. {item.name}"
# value = f"[RSS]({item.url}) · [API]({API.RSS_FEED_ENDPOINT}{item.uuid}/)"
# return key, value
# async def getdata(page):
# data, count = await self.bot.functions.get_rssfeeds(inter.guild_id, page, pagesize)
# return data, count
# embed = Followup(f"Available RSS Feeds in {inter.guild.name}").info()._embed
# pagination = PaginationView(self.bot, inter, embed, getdata, formatdata, pagesize, 1)
# await pagination.send()
# except Exception as exc:
# await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# # @feed_group.command(name="fetch")
# # @rename(max_="max")
# # @autocomplete(rss=source_autocomplete)
# async def fetch_rss(self, inter: Interaction, rss: str, max_: int=1):
# """Fetch an item from the specified RSS feed.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# rss : str
# The RSS feed to fetch from.
# max_ : int, optional
# Maximum number of items to fetch, by default 1, limits at 5.
# """
# await inter.response.defer()
# if max_ > 5:
# followup(inter, "It looks like you have requested too many articles.\nThe limit is 5")
# return
# invalid_message, feed = await validate_rss_source("", rss)
# if invalid_message:
# await followup(inter, invalid_message)
# return
# source = Source.from_parsed(feed)
# articles = source.get_latest_articles(max_)
# if not articles:
# await followup(inter, "Sorry, I couldn't find any articles from this feed.")
# return
# async with aiohttp.ClientSession() as session:
# embeds = [await article.to_embed(session) for article in articles]
# async with DatabaseManager() as database:
# query = insert(SentArticleModel).values([
# {
# "discord_server_id": inter.guild_id,
# "discord_channel_id": inter.channel_id,
# "discord_message_id": inter.id,
# "article_url": article.url,
# }
# for article in articles
# ])
# await database.session.execute(query)
# await audit(self,
# f"User is requesting {max_} articles from {source.name}",
# inter.user.id, database=database
# )
# await followup(inter, embeds=embeds)
# # Help ---- ---- ----
# @feed_group.command(name="help")
# async def get_help(self, inter: Interaction):
# """Get help on how to use my commands.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# """
# await inter.response.defer()
# description = (
# "`/feed add <nickname> <url>` \n\n"
# "Save a new RSS feed to the bot. This can be referred to later, when assigning "
# "channels to receive content from these RSS feeds."
# "\n\n\n`/feed remove <option>` \n\n"
# "Remove a previously saved RSS feed. Select the nickname from the shown options "
# "if any. You can re-add the RSS feed to the bot using the `/feeds add` command."
# "\n\n\n`/feed list <sort> <sort_reverse>` \n\n"
# "List all saved RSS feeds numerically. Use the `<sort>` option to order "
# "the results by either nickname or date & time added. Use the `<sort_reverse>` "
# "option to order by ascending or descending in conjunction with the `<sort>` option."
# "\n\n\n`/feed assign <rss> <textchannel>` \n\n"
# "Assign a channel to an RSS feed. Previously saved RSS feeds will be selectable "
# "under the `<rss>` option. The channel will be assumed as the current channel, "
# "unless specified otherwise using the `<textchannel` option."
# "\n\n\n`/feed unassign <option>` \n\n"
# "Unassigned channel from an RSS feed. Previously assigned channels will be shown "
# "as an `<option>`, select one to remove it."
# "\n\n\n`/feed channels` \n\n"
# "List all channels assigned to an RSS feed numerically."
# )
# embed = Embed(
# title="Help",
# description=description,
# colour=Colour.blue(),
# )
# await followup(inter, embed=embed)
# # Channels ---- ---- ----
# async def autocomplete_rss_sources(self, inter: Interaction, nickname: str):
# """_summary_
# Parameters
# ----------
# inter : Interaction
# _description_
# nickname : str
# _description_
# Returns
# -------
# _type_
# _description_
# """
# async with DatabaseManager() as database:
# whereclause = and_(
# RssSourceModel.discord_server_id == inter.guild_id,
# RssSourceModel.nick.ilike(f"%{nickname}%")
# )
# query = select(RssSourceModel).where(whereclause)
# result = await database.session.execute(query)
# sources = [
# Choice(name=rss.nick, value=rss.rss_url)
# for rss in result.scalars().all()
# ]
# log.debug("Autocomplete rss_sources returned %s results", len(sources))
# return sources
# async def autocomplete_existing_feeds(self, inter: Interaction, current: str):
# """Returns a list of existing RSS + Channel feeds.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# current : str
# The current text entered for the autocomplete.
# """
# async with DatabaseManager() as database:
# whereclause = and_(
# FeedChannelModel.discord_server_id == inter.guild_id,
# FeedChannelModel.search_name.ilike(f"%{current}%"),
# RssSourceModel.id == FeedChannelModel.rss_source_id
# )
# query = (
# select(FeedChannelModel, RssSourceModel)
# .where(whereclause)
# .join(RssSourceModel)
# .order_by(FeedChannelModel.discord_channel_id)
# )
# result = await database.session.execute(query)
# feeds = []
# for feed in result.scalars().all():
# channel = inter.guild.get_channel(feed.discord_channel_id)
# feeds.append(Choice(
# name=f"# {channel.name} | {feed.rss_source.nick}",
# value=feed.id
# ))
# log.debug("Autocomplete existing_feeds returned %s results", len(feeds))
# return feeds
# # # All RSS commands belong to this group.
# # channel_group = Group(
# # name="channels",
# # description="Commands for channel assignment.",
# # guild_only=True # These commands belong to channels of
# # )
# @feed_group.command(name="assign")
# @rename(url="feed")
# @autocomplete(url=autocomplete_rss_sources)
# async def include_feed(
# self, inter: Interaction, url: str, channel: TextChannel = None, prevent_spam: bool = True
# ):
# """Include a feed within the specified channel.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# url : int
# The RSS feed to include.
# channel : TextChannel
# The channel to include the feed in.
# """
# await inter.response.defer()
# channel = channel or inter.channel
# try:
# feed_id, source = await self.bot.functions.assign_feed(
# url, channel.name, channel.id, inter.guild_id
# )
# except IntegrityError:
# await (
# Followup(
# "Duplicate Assigned Feed Error",
# f"This Feed has already been assigned to {channel.mention}"
# )
# .error()
# .send(inter)
# )
# except NoResultFound:
# await (
# Followup(
# "Feed Not Found Error",
# "A Feed with these parameters could not be found."
# )
# .error()
# .send(inter)
# )
# else:
# await (
# Followup(
# "Feed Assigned",
# f"I've assigned {channel.mention} to receive content from "
# f"[{source.name}]({source.url})."
# )
# .assign()
# .send(inter)
# )
# if prevent_spam:
# await set_all_articles_as_sent(inter, channel, feed_id, url)
# @feed_group.command(name="unassign")
# @autocomplete(option=autocomplete_existing_feeds)
# async def exclude_feed(self, inter: Interaction, option: int):
# """Undo command for the `/channel include-feed` command.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# option : str
# The RSS feed and channel to exclude.
# """
# await inter.response.defer()
# try:
# await self.bot.functions.unassign_feed(option, inter.guild_id)
# except NoResultFound:
# await (
# Followup(
# "Assigned Feed Not Found",
# "The assigned Feed doesn't exist."
# )
# .error()
# .send(inter)
# )
# else:
# await (
# Followup(
# "Unassigned Feed",
# "Feed has been unassigned."
# )
# .trash()
# .send(inter)
# )
# # async with DatabaseManager() as database:
# # query = delete(FeedChannelModel).where(and_(
# # FeedChannelModel.id == option,
# # FeedChannelModel.discord_server_id == inter.guild_id
# # ))
# # result = await database.session.execute(query)
# # if not result.rowcount:
# # await followup_error(inter,
# # title="Assigned Feed Not Found",
# # message=f"I couldn't find any assigned feeds for the option: {option}"
# # )
# # return
# await followup(inter, "I've removed this item (placeholder response)")
# @feed_group.command(name="channels")
# @choices(sort=channels_list_sort_choices)
# async def list_feeds(self, inter: Interaction, sort: Choice[int] = 0, sort_reverse: bool = False):
# """List all of the channels and their respective included feeds.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# """
# await inter.response.defer()
# description = "Sort By "
# if isinstance(sort, Choice):
# match sort.value, sort_reverse:
# case 0, False:
# order_by = RssSourceModel.nick.asc()
# description += "Nickname "
# case 0, True:
# order_by = RssSourceModel.nick.desc()
# description += "Nickname "
# case 1, False:
# order_by = FeedChannelModel.discord_channel_id.asc()
# description += "Channel ID "
# case 1, True:
# order_by = FeedChannelModel.discord_channel_id.desc()
# description += "Channel ID "
# case 2, False:
# order_by = RssSourceModel.created.desc()
# description += "Date Added "
# case 2, True:
# order_by = RssSourceModel.created.asc()
# description += "Date Added "
# case _, _:
# raise ValueError(f"Unknown sort: {sort}")
# else:
# order_by = FeedChannelModel.discord_channel_id.asc()
# description = ""
# async with DatabaseManager() as database:
# whereclause = and_(
# FeedChannelModel.discord_server_id == inter.guild_id,
# RssSourceModel.id == FeedChannelModel.rss_source_id
# )
# query = (
# select(FeedChannelModel, RssSourceModel)
# .where(whereclause)
# .join(RssSourceModel)
# .order_by(order_by)
# )
# result = await database.session.execute(query)
# feed_channels = result.scalars().all()
# rowcount = len(feed_channels)
# if not feed_channels:
# await followup_error(inter,
# title="No Assigned Feeds Found",
# message="Assign a channel to receive feed content with `/feed assign`."
# )
# return
# output = "\n".join([
# f"{i}. <#{feed.discord_channel_id}> · " # TODO: add icon indicating inaccessible channel, if is the case.
# f"[{feed.rss_source.nick}]({feed.rss_source.rss_url})"
# for i, feed in enumerate(feed_channels)
# ])
# embed = Embed(
# title="Saved Feed Channels",
# description=f"{description}\n{output}",
# colour=Colour.blue()
# )
# embed.set_footer(text=f"Showing {rowcount} results")
# await followup(inter, embed=embed)
# admin_group = Group(
# name="admin",
# description="Administration tasks",
# guild_only=True,
# default_permissions=Permissions.elevated()
# )
# @admin_group.command(name="clear-sent-articles")
# async def clear_sent_articles(self, inter: Interaction):
# """Clear the database of all sent articles.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# """
# await inter.response.defer()
# async with DatabaseManager() as database:
# query = delete(SentArticleModel).where(and_(
# SentArticleModel.discord_server_id == inter.guild_id
# ))
# result = await database.session.execute(query)
# await followup(inter,
# f"{result.rowcount} sent articles have been cleared from the database. "
# "I will no longer recognise these articles as sent, and will send them "
# "again if they appear during the next RSS feed scan."
# )
# audit_group = Group(
# name="audit",
# description="Check audited actions.",
# guild_only=True,
# default_permissions=Permissions.elevated()
# )
# @audit_group.command(name="check")
# async def check_audit_log(self, inter: Interaction):
# """Check the audit log.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# """
# await inter.response.defer()
# async with DatabaseManager() as database:
# query = select(AuditModel).where(and_(
# ))
# result = await database.session.execute(query)
async def setup(bot):
"""
Setup function for this extension.