API Integration

This commit is contained in:
Corban-Lee Jones 2024-01-30 13:53:04 +00:00
parent 93fe4ebfce
commit 32df589ed2
2 changed files with 172 additions and 163 deletions

View File

@ -6,6 +6,7 @@ Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bo
import logging
from typing import Tuple
import aiohttp
import validators
from feedparser import FeedParserDict, parse
from discord.ext import commands
@ -14,7 +15,8 @@ from discord.app_commands import Choice, Group, autocomplete, choices, rename
from sqlalchemy import insert, select, and_, delete
from sqlalchemy.exc import NoResultFound, IntegrityError
from feed import Source
from api import API
from feed import Source, RSSFeed
from errors import IllegalFeed
from db import (
DatabaseManager,
@ -25,6 +27,7 @@ from db import (
)
from utils import (
Followup,
PaginationView,
get_rss_data,
followup,
audit,
@ -121,6 +124,19 @@ class FeedCog(commands.Cog):
log.info("%s cog is ready", self.__class__.__name__)
async def autocomplete_rssfeed(self, inter: Interaction, name: str) -> list[Choice]:
async with aiohttp.ClientSession() as session:
data = await API(session).get_rssfeed_list()
rssfeeds = RSSFeed.from_list(data)
choices = [
Choice(name=item.name, value=item.uuid)
for item in rssfeeds
]
return choices
async def source_autocomplete(self, inter: Interaction, nickname: str):
"""Provides RSS source autocomplete functionality for commands.
@ -154,69 +170,55 @@ class FeedCog(commands.Cog):
# All RSS commands belong to this group.
feed_group = Group(
name="feed",
description="Commands for rss sources.",
description="Commands for RSS sources.",
default_permissions=Permissions.elevated(),
guild_only=True # We store guild IDs in the database, so guild only = True
)
@feed_group.command(name="add")
async def add_rss_source(self, inter: Interaction, nickname: str, url: str):
"""Add a new Feed for this server.
@feed_group.command(name="new")
async def add_rssfeed(self, inter: Interaction, name: str, url: str):
"""Add a new RSS Feed for this server.
Parameters
----------
inter : Interaction
Represents an app command interaction.
nickname : str
A name used to identify the Feed.
url : str
The Feed URL.
Args:
inter (Interaction): Represents the discord command interaction.
name (str): A nickname used to refer to this RSS Feed.
url (str): The URL of the RSS Feed.
"""
await inter.response.defer()
try:
source = await self.bot.functions.create_new_feed(nickname, url, inter.guild_id)
except IllegalFeed as error:
title, desc = extract_error_info(error)
await Followup(title, desc).fields(**error.items).error().send(inter)
except IntegrityError as error:
rssfeed = await self.bot.functions.create_new_rssfeed(name, url, inter.guild_id)
except Exception as exc:
await (
Followup(
"Duplicate Feed Error",
"A Feed with the same nickname already exist."
)
.fields(nickname=nickname)
Followup(exc.__class__.__name__, str(exc))
.error()
.send(inter)
)
else:
await (
Followup("Feed Added")
.image(source.icon_url)
.fields(nickname=nickname, url=url)
Followup("New RSS Feed")
.image(rssfeed.image)
.fields(uuid=rssfeed.uuid, name=name, url=url)
.added()
.send(inter)
)
@feed_group.command(name="remove")
@rename(url="option")
@autocomplete(url=source_autocomplete)
async def remove_rss_source(self, inter: Interaction, url: str):
"""Delete an existing Feed from this server.
@feed_group.command(name="delete")
@autocomplete(uuid=autocomplete_rssfeed)
@rename(uuid="rssfeed")
async def delete_rssfeed(self, inter: Interaction, uuid: str):
"""Delete an existing RSS Feed for this server.
Parameters
----------
inter : Interaction
Represents an app command interaction.
url : str
The Feed to be removed. Autocomplete or enter the URL.
Args:
inter (Interaction): Represents the discord command interaction.
uuid (str): The UUID of the
"""
await inter.response.defer()
try:
source = await self.bot.functions.delete_feed(url, inter.guild_id)
rssfeed = await self.bot.functions.delete_rssfeed(uuid)
except NoResultFound:
await (
Followup(
@ -229,49 +231,88 @@ class FeedCog(commands.Cog):
else:
await (
Followup("Feed Deleted")
.image(source.icon_url)
.fields(url=url)
.image(rssfeed.image)
.fields(uuid=rssfeed.uuid, name=rssfeed.name, url=rssfeed.url)
.trash()
.send(inter)
)
@feed_group.command(name="list")
async def list_rss_sources(self, inter: Interaction):
"""Provides a with a list of Feeds available for this server.
async def list_rssfeeds(self, inter: Interaction):
"""Provides a list of all RSS Feeds
Parameters
----------
inter : Interaction
Represents an app command interaction.
Args:
inter (Interaction): Represents the discord command interaction.
"""
await inter.response.defer()
page = 1
try:
sources = await self.bot.functions.get_feeds(inter.guild_id)
except NoResultFound:
rssfeeds, total_results = await self.bot.functions.get_rssfeeds(inter.guild_id, page)
except Exception as exc:
await (
Followup(
"Feeds Not Found Error",
"There are no available Feeds for this server.\n"
"Add a new feed with `/feed add`."
)
Followup(exc.__class__.__name__, str(exc))
.error()
.send()
)
else:
description = "\n".join([
f"{i}. **[{source.name}]({source.url})**"
for i, source in enumerate(sources)
])
await (
Followup(
f"Available Feeds in {inter.guild.name}",
description
)
.info()
.send(inter)
)
else:
# description = "\n\n".join(
# f"{item.name}\n{item.url}\n{item.uuid}"
# for item in rssfeeds
# )
# fields = {
# f"{i+1}.": f"{item.name}\n{item.url}\n{item.uuid}"
# for i, item in enumerate(rssfeeds)
# }
def formatdata(item):
return item.name, f"{item.url}\n{item.uuid}"
async def getdata(page):
data, count = await self.bot.functions.get_rssfeeds(inter.guild_id, page)
return data
embed = Followup(f"Available RSS Feeds in {inter.guild.name}").info()._embed
maxpage = PaginationView.calc_total_pages(total_results, 10)
pagination = PaginationView(inter, embed, getdata, formatdata, maxpage, 1)
await pagination.send()
# await (
# Followup(f"Available RSS Feeds in {inter.guild.name}")
# .info()
# .fields(**fields)
# .footer(f"Page {page}")
# .send(inter)
# )
# try:
# sources = await self.bot.functions.get_feeds(inter.guild_id)
# except NoResultFound:
# await (
# Followup(
# "Feeds Not Found Error",
# "There are no available Feeds for this server.\n"
# "Add a new feed with `/feed add`."
# )
# .error()
# .send()
# )
# else:
# description = "\n".join([
# f"{i}. **[{source.name}]({source.url})**"
# for i, source in enumerate(sources)
# ])
# await (
# Followup(
# f"Available Feeds in {inter.guild.name}",
# description
# )
# .info()
# .send(inter)
# )
# @feed_group.command(name="fetch")

View File

@ -1,4 +1,5 @@
import ssl
import json
import logging
from dataclasses import dataclass
@ -18,6 +19,7 @@ from textwrap import shorten
from errors import IllegalFeed
from db import DatabaseManager, RssSourceModel, FeedChannelModel
from utils import get_rss_data, get_unparsed_feed
from api import API
log = logging.getLogger(__name__)
dumps = lambda _dict: json.dumps(_dict, indent=8)
@ -131,7 +133,7 @@ class Article:
@dataclass
class Source:
"""Represents an RSS source."""
name: str | None
url: str | None
icon_url: str | None
@ -164,7 +166,9 @@ class Source:
@classmethod
async def from_url(cls, url: str):
unparsed_content = await get_unparsed_feed(url)
return cls.from_parsed(parse(unparsed_content))
source = cls.from_parsed(parse(unparsed_content))
source.url = url
return source
def get_latest_articles(self, max: int = 999) -> list[Article]:
"""Returns a list of Article objects.
@ -189,6 +193,32 @@ class Source:
]
@dataclass
class RSSFeed:
uuid: str
name: str
url: str
image: str
discord_server_id: id
created_at: str
@classmethod
def from_list(cls, data: list) -> list:
result = []
for item in data:
key = "discord_server_id"
item[key] = int(item.get(key))
result.append(cls(**item))
return result
@classmethod
def from_dict(cls, data: dict):
return cls(**data)
class Functions:
def __init__(self, bot):
@ -245,106 +275,44 @@ class Functions:
return feed
async def create_new_feed(self, nickname: str, url: str, guild_id: int) -> Source:
"""Create a new Feed, and return it as a Source object.
async def create_new_rssfeed(self, name: str, url: str, guild_id: int) -> RSSFeed:
Parameters
----------
nickname : str
Human readable nickname used to refer to the feed.
url : str
URL to fetch content from the feed.
guild_id : int
Discord Server ID associated with the feed.
Returns
-------
Source
Dataclass containing attributes of the feed.
log.info("Creating new Feed: %s", name)
parsed_feed = await self.validate_feed(name, url)
source = Source.from_parsed(parsed_feed)
async with aiohttp.ClientSession() as session:
data = await API(session).create_new_rssfeed(name, url, source.icon_url, guild_id)
return RSSFeed.from_dict(data)
async def delete_rssfeed(self, uuid: str) -> RSSFeed:
log.info("Deleting Feed '%s'", uuid)
async with aiohttp.ClientSession() as session:
api = API(session)
data = await api.get_rssfeed(uuid)
await api.delete_rssfeed(uuid)
return RSSFeed.from_dict(data)
async def get_rssfeeds(self, guild_id: int, page: int) -> list[RSSFeed]:
"""Get a list of RSS Feeds.
Args:
guild_id (int): The guild_id to filter by.
Returns:
list[RSSFeed]: Resulting list of RSS Feeds
"""
log.info("Creating new Feed: %s - %s", nickname, guild_id)
async with aiohttp.ClientSession() as session:
data, count = await API(session).get_rssfeed_list(discord_server_id=guild_id, page=page)
parsed_feed = await self.validate_feed(nickname, url)
async with DatabaseManager() as database:
query = insert(RssSourceModel).values(
discord_server_id=guild_id,
rss_url=url,
nick=nickname
)
await database.session.execute(query)
log.info("Created Feed: %s - %s", nickname, guild_id)
return Source.from_parsed(parsed_feed)
async def delete_feed(self, url: str, guild_id: int) -> Source:
"""Delete an existing Feed, then return it as a Source object.
Parameters
----------
url : str
URL of the feed, used in the whereclause.
guild_id : int
Discord Server ID of the feed, used in the whereclause.
Returns
-------
Source
Dataclass containing attributes of the feed.
"""
log.info("Deleting Feed: %s - %s", url, guild_id)
async with DatabaseManager() as database:
whereclause = and_(
RssSourceModel.discord_server_id == guild_id,
RssSourceModel.rss_url == url
)
# Select the Feed entry, because an exception is raised if not found.
select_query = select(RssSourceModel).filter(whereclause)
select_result = await database.session.execute(select_query)
select_result.scalars().one()
delete_query = delete(RssSourceModel).filter(whereclause)
await database.session.execute(delete_query)
log.info("Deleted Feed: %s - %s", url, guild_id)
return await Source.from_url(url)
async def get_feeds(self, guild_id: int) -> list[Source]:
"""Returns a list of fetched Feed objects from the database.
Note: a request will be made too all found Feed URLs.
Parameters
----------
guild_id : int
The Discord Server ID, used to filter down the Feed query.
Returns
-------
list[Source]
List of Source objects, resulting from the query.
Raises
------
NoResultFound
Raised if no results are found.
"""
async with DatabaseManager() as database:
whereclause = and_(RssSourceModel.discord_server_id == guild_id)
query = select(RssSourceModel).where(whereclause)
result = await database.session.execute(query)
rss_sources = result.scalars().all()
if not rss_sources:
raise NoResultFound
return [await Source.from_url(feed.rss_url) for feed in rss_sources]
return RSSFeed.from_list(data), count
async def assign_feed(
self, url: str, channel_name: str, channel_id: int, guild_id: int