Embeds for interaction responses

This commit is contained in:
Corban-Lee Jones 2023-12-16 14:21:30 +00:00
parent 0770fb3f6f
commit 41887472df
3 changed files with 121 additions and 193 deletions

View File

@ -3,6 +3,7 @@ Extension for the `CommandCog`.
Loading this file via `commands.Bot.load_extension` will add `CommandCog` to the bot.
"""
import json
import logging
import validators
@ -10,13 +11,13 @@ import aiohttp
import textwrap
import feedparser
from markdownify import markdownify
from discord import app_commands, Interaction, Embed
from discord import app_commands, Interaction, Embed, Colour
from discord.ext import commands, tasks
from discord.app_commands import Choice, Group, command, autocomplete
from sqlalchemy import insert, select, update, and_, or_, delete
from db import DatabaseManager, AuditModel, SentArticleModel, RssSourceModel, FeedChannelModel
from feed import Feeds, get_source
from feed import get_source, Source
log = logging.getLogger(__name__)
@ -156,7 +157,15 @@ class CommandCog(commands.Cog):
inter.user.id, database=database
)
await followup(inter, f"RSS source added [{nickname}]({url})", suppress_embeds=True)
embed = Embed(
title=f"New RSS Source: **{nickname}**",
url=url,
colour=Colour.from_str("#59ff00")
)
embed.set_thumbnail(url=feed.get("feed", {}).get("image", {}).get("href"))
# , f"RSS source added [{nickname}]({url})", suppress_embeds=True
await followup(inter, embed=embed)
@rss_group.command(name="remove")
@autocomplete(source=source_autocomplete)
@ -176,16 +185,17 @@ class CommandCog(commands.Cog):
log.debug(f"Attempting to remove RSS source ({source=})")
async with DatabaseManager() as database:
rss_source = (await database.session.execute(
select_result = await database.session.execute(
select(RssSourceModel).filter(
and_(
RssSourceModel.discord_server_id == inter.guild_id,
RssSourceModel.rss_url == source
)
)
)).fetchone()
)
rss_source = select_result.fetchone()
result = await database.session.execute(
delete_result = await database.session.execute(
delete(RssSourceModel).filter(
and_(
RssSourceModel.discord_server_id == inter.guild_id,
@ -194,11 +204,13 @@ class CommandCog(commands.Cog):
)
)
nickname, rss_url = rss_source.nick, rss_source.rss_url
# TODO: `if not result.rowcount` then show unique message and possible matches if any (like how the autocomplete works)
if result.rowcount:
if delete_result.rowcount:
await followup(inter,
f"RSS source deleted successfully\n**[{rss_source.nick}]({rss_source.rss_url})**",
f"RSS source deleted successfully\n**[{nickname}]({rss_url})**",
suppress_embeds=True
)
return
@ -285,10 +297,6 @@ class CommandCog(commands.Cog):
await followup(inter, embeds=embeds)
async def setup(bot):
"""
Setup function for this extension.

View File

@ -1,105 +0,0 @@
"""
Extension for the `test` cog.
Loading this file via `commands.Bot.load_extension` will add the `test` cog to the bot.
"""
import logging
import textwrap
from markdownify import markdownify
from discord import app_commands, Interaction, Embed
from discord.ext import commands, tasks
from sqlalchemy import insert, select, and_
from db import DatabaseManager, AuditModel, SentArticleModel, RssSourceModel
from feed import Feeds, get_source
log = logging.getLogger(__name__)
class Test(commands.Cog):
"""
News cog.
Delivers embeds of news articles to discord channels.
"""
def __init__(self, bot):
super().__init__()
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
log.info(f"{self.__class__.__name__} cog is ready")
async def source_autocomplete(self, inter: Interaction, current: str):
"""
"""
async with DatabaseManager() as database:
whereclause = and_(
RssSourceModel.discord_server_id == inter.guild_id,
RssSourceModel.rss_url.ilike(f"%{current}%")
)
query = select(RssSourceModel).where(whereclause)
result = await database.session.execute(query)
sources = [
app_commands.Choice(name=rss.rss_url, value=rss.rss_url)
for rss in result.scalars().all()
]
return sources
@app_commands.command(name="test-latest-article")
@app_commands.autocomplete(source=source_autocomplete)
async def test_news(self, inter: Interaction, source: str):
await inter.response.defer()
await self.bot.audit("Requesting latest article.", inter.user.id)
try:
source = get_source(source)
article = source.get_latest_article()
except IndexError as e:
log.error(e)
await inter.followup.send("An error occured, it's possible that the source provided was bad.")
return
md_description = markdownify(article.description, strip=("img",))
article_description = textwrap.shorten(md_description, 4096)
embed = Embed(
title=article.title,
description=article_description,
url=article.url,
timestamp=article.published,
)
embed.set_thumbnail(url=source.icon_url)
embed.set_image(url=await article.get_thumbnail_url())
embed.set_footer(text=article.author)
embed.set_author(
name=source.name,
url=source.url,
)
async with DatabaseManager() as database:
query = insert(SentArticleModel).values(
discord_server_id=inter.guild_id,
discord_channel_id=inter.channel_id,
discord_message_id=inter.id,
article_url=article.url
)
await database.session.execute(query)
await inter.followup.send(embed=embed)
async def setup(bot):
"""
Setup function for this extension.
Adds the `ErrorCog` cog to the bot.
"""
cog = Test(bot)
await bot.add_cog(cog)
log.info(f"Added {cog.__class__.__name__} cog")

View File

@ -1,7 +1,9 @@
"""
"""
import json
import logging
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
@ -10,16 +12,70 @@ from bs4 import BeautifulSoup as bs4
from feedparser import FeedParserDict, parse
log = logging.getLogger(__name__)
dumps = lambda _dict: json.dumps(_dict, indent=8)
class Feeds(Enum):
THE_UPPER_LIP = "https://theupperlip.co.uk/rss"
THE_BABYLON_BEE= "https://babylonbee.com/feed"
BBC_NEWS = "https://feeds.bbci.co.uk/news/rss.xml"
@dataclass
class Article:
"""Represents a news article, or entry from an RSS feed."""
title: str | None
description: str | None
url: str | None
published: datetime | None
author: str | None
@classmethod
def from_entry(cls, entry:FeedParserDict):
"""Create an Article from an RSS feed entry.
Parameters
----------
entry : FeedParserDict
An entry pulled from a complete FeedParserDict object.
Returns
-------
Article
The Article created from the feed entry.
"""
log.debug("Creating Article from entry: %s", dumps(entry))
published_parsed = entry.get("published_parsed")
published = datetime(*entry.published_parsed[0:-2]) if published_parsed else None
return cls(
title=entry.get("title"),
description=entry.get("description"),
url=entry.get("link"),
published=published,
author = entry.get("author")
)
async def get_thumbnail_url(self) -> str | None:
"""Returns the thumbnail URL for an article.
Returns
-------
str or None
The thumbnail URL, or None if not found.
"""
log.debug("Fetching thumbnail for article: %s", self)
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
html = await response.text()
soup = bs4(html, "html.parser")
image_element = soup.select_one("meta[property='og:image']")
return image_element.get("content") if image_element else None
@dataclass
class Source:
"""Represents an RSS source."""
name: str | None
url: str | None
@ -28,7 +84,20 @@ class Source:
@classmethod
def from_parsed(cls, feed:FeedParserDict):
# print(json.dumps(feed, indent=8))
"""Returns a Source object from a parsed feed.
Parameters
----------
feed : FeedParserDict
The feed used to create the Source.
Returns
-------
Source
The Source object
"""
log.debug("Creating Source from feed: %s", dumps(feed))
return cls(
name=feed.get("channel", {}).get("title"),
@ -37,86 +106,42 @@ class Source:
feed=feed
)
def get_latest_articles(self, max: int) -> list:
""""""
def get_latest_articles(self, max: int) -> list[Article]:
"""Returns a list of Article objects.
articles = []
Parameters
----------
max : int
The maximum number of articles to return.
for i, entry in enumerate(self.feed.entries):
if i >= max:
break
articles.append(Article.from_entry(entry))
return articles
@dataclass
class Article:
title: str | None
description: str | None
url: str | None
published: datetime | None
author: str | None
@classmethod
def from_parsed(cls, feed:FeedParserDict):
entry = feed.entries[0]
# log.debug(json.dumps(entry, indent=8))
published_parsed = entry.get("published_parsed")
published = datetime(*entry.published_parsed[0:-2]) if published_parsed else None
return cls(
title=entry.get("title"),
description=entry.get("description"),
url=entry.get("link"),
published=published,
author = entry.get("author")
)
@classmethod
def from_entry(cls, entry:FeedParserDict):
published_parsed = entry.get("published_parsed")
published = datetime(*entry.published_parsed[0:-2]) if published_parsed else None
return cls(
title=entry.get("title"),
description=entry.get("description"),
url=entry.get("link"),
published=published,
author = entry.get("author")
)
async def get_thumbnail_url(self):
Returns
-------
list of Article
A list of Article objects.
"""
"""
log.debug("Fetching latest articles from %s, max=%s", self, max)
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
html = await response.text()
# Parse the thumbnail for the news story
soup = bs4(html, "html.parser")
image_element = soup.select_one("meta[property='og:image']")
return image_element.get("content") if image_element else None
return [
Article.from_entry(entry)
for i, entry in enumerate(self.feed.entries)
if i < max
]
def get_source(rss_url: str) -> Source:
"""
"""_summary_
Parameters
----------
rss_url : str
_description_
Returns
-------
Source
_description_
"""
parsed_feed = parse(rss_url)
return Source.from_parsed(parsed_feed)
def get_test():
parsed = parse(Feeds.THE_UPPER_LIP.value)
print(json.dumps(parsed, indent=4))
return parsed