PYRSS-Bot/src/utils.py

337 lines
9.8 KiB
Python

"""A collection of utility functions that can be used in various places."""
import asyncio
import logging
from typing import Callable, Tuple

import aiohttp
import async_timeout
from discord import Interaction, Embed, Colour, ButtonStyle, Button
from discord.ext.commands import Bot
from discord.ui import View, button
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
async def fetch(session, url: str) -> str:
    """Request ``url`` through ``session`` and return the response body as text.

    Both the GET request and the reading of the body run inside a
    20 second ``async_timeout`` guard.
    """
    async with async_timeout.timeout(20), session.get(url) as resp:
        body = await resp.text()
        return body
async def get_unparsed_feed(url: str, session: aiohttp.ClientSession=None):
    """Return the raw text found at ``url``.

    When the caller supplies ``session`` it is used as-is; otherwise a
    temporary ``aiohttp.ClientSession`` is opened for this single request
    and closed again afterwards.
    """
    if session is None:
        # No session supplied: open one that only lives for this call.
        async with aiohttp.ClientSession() as temp_session:
            return await fetch(temp_session, url)

    return await fetch(session, url)
async def get_rss_data(url: str):
    """GET ``url`` with a fresh session and return ``(body_text, http_status)``."""
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        body = await response.text()
        status = response.status
    return body, status
async def followup(inter: Interaction, *args, **kwargs):
    """Send a follow-up message for an interaction.

    Parameters
    ----------
    inter : Interaction
        The app command interaction to follow up on.
    *args, **kwargs
        Passed through unchanged to ``inter.followup.send``.
    """
    sender = inter.followup
    await sender.send(*args, **kwargs)
# https://img.icons8.com/fluency-systems-filled/48/FA5252/trash.png
class FollowupIcons:
    """Icon URLs (icons8, 48px) used as thumbnails on follow-up embeds."""

    # DC573C (red) tint
    error: str = "https://img.icons8.com/fluency-systems-filled/48/DC573C/box-important.png"
    trash: str = "https://img.icons8.com/fluency-systems-filled/48/DC573C/trash.png"

    # 5BC873 (green) tint
    success: str = "https://img.icons8.com/fluency-systems-filled/48/5BC873/ok--v1.png"

    # 4598DA (blue) tint
    info: str = "https://img.icons8.com/fluency-systems-filled/48/4598DA/info.png"
    added: str = "https://img.icons8.com/fluency-systems-filled/48/4598DA/plus.png"
    assigned: str = "https://img.icons8.com/fluency-systems-filled/48/4598DA/hashtag-large.png"
class PaginationView(View):
    """A Discord UI View that adds pagination to an embed.

    The view carries three buttons, in this order: back, forward, and a
    combined "skip to end / return to start" button. Page data is pulled
    lazily through ``getdata`` each time a page is rendered.
    """

    def __init__(
        self, bot: Bot, inter: Interaction, embed: Embed, getdata: Callable,
        formatdata: Callable, pagesize: int, initpage: int=1
    ):
        """Set up the view; nothing is sent until :meth:`send` is awaited.

        Args:
            bot (commands.Bot): The discord bot, used to look up custom emoji.
            inter (Interaction): Represents a discord command interaction.
            embed (Embed): The base embed to paginate; it is copied per page.
            getdata (Callable): Awaited as ``getdata(page, pagesize)``; must
                return ``Tuple[List[Any], int]`` (page items, total results).
            formatdata (Callable): Called as ``formatdata(index, item)`` and
                must return ``(name, value)`` for one embed field. May be a
                plain function or a coroutine function.
            pagesize (int): The size of each page.
            initpage (int, optional): The initial page. Defaults to 1.
        """
        self.bot = bot
        self.inter = inter
        self.embed = embed
        self.getdata = getdata
        self.formatdata = formatdata
        self.maxpage = None  # unknown until the first page has been fetched
        self.pagesize = pagesize
        self.index = initpage

        # Custom emoji for the start/end button. get_emoji() returns None when
        # the emoji is not available to the bot; assigning None to a button
        # without a label would leave it empty, so fall back to unicode.
        start_emoji = bot.get_emoji(1204542364073463818)
        end_emoji = bot.get_emoji(1204542367752003624)
        self.start_emoji = "⏮️" if start_emoji is None else start_emoji
        self.end_emoji = "⏭️" if end_emoji is None else end_emoji

        super().__init__(timeout=100)

    async def check_user_is_author(self, inter: Interaction) -> bool:
        """Ensure the user is the author of the original command.

        Returns True when ``inter`` comes from the same user as the stored
        interaction. Otherwise defers ``inter``, sends an ephemeral error
        follow-up and returns False.

        NOTE(review): nothing inside this class calls this method. If it is
        meant to gate the buttons it presumably has to be hooked up as the
        view's interaction check -- confirm the intent before wiring it.
        """
        if inter.user == self.inter.user:
            return True

        await inter.response.defer()
        await (
            Followup(None, "Only the author can interact with this.")
            .error()
            .send(inter, ephemeral=True)
        )
        return False

    async def on_timeout(self):
        """Erase the controls on timeout."""
        # Editing through the interaction avoids fetching the message first.
        await self.inter.edit_original_response(view=None)

    @staticmethod
    def calc_total_pages(results: int, max_pagesize: int) -> int:
        """Return how many pages are needed for ``results`` items.

        This is a ceiling division; zero results yields zero pages.
        """
        return ((results - 1) // max_pagesize) + 1

    def calc_dataitem_index(self, dataitem_index: int):
        """Turn a zero-based index within the current page into a one-based
        index across all pages.

        Example: dataitem_index = 6, pagesize = 10
            page 1 -> 7
            page 3 -> 6 + 10 * (3 - 1) + 1 = 27
        """
        preceding = self.pagesize * (self.index - 1) if self.index > 1 else 0
        return dataitem_index + preceding + 1

    async def _go_to_page(self, inter: Interaction, page: int):
        """Clamp ``page`` to the known page range, then redraw the message."""
        self.index = max(1, min(page, self.maxpage))
        await inter.response.defer()
        # Later edits go through the most recent (deferred) interaction.
        self.inter = inter
        await self.navigate()

    @button(emoji="◀️", style=ButtonStyle.blurple)
    async def backward(self, inter: Interaction, button: Button):
        """
        Action the backwards button.
        """
        await self._go_to_page(inter, self.index - 1)

    @button(emoji="▶️", style=ButtonStyle.blurple)
    async def forward(self, inter: Interaction, button: Button):
        """
        Action the forwards button.
        """
        await self._go_to_page(inter, self.index + 1)

    @button(emoji="⏭️", style=ButtonStyle.blurple)
    async def start_or_end(self, inter: Interaction, button: Button):
        """
        Action the start and end button.
        This button becomes return to start if at end, otherwise skip to end.
        """
        # Same condition update_buttons() uses to pick the emoji, so the
        # action always matches what the button currently shows.
        at_end = self.index >= self.maxpage
        await self._go_to_page(inter, 1 if at_end else self.maxpage)

    async def navigate(self):
        """
        Acts as an update method for the entire instance.
        """
        log.debug("navigating to page: %s", self.index)

        # Build the embed first: it refreshes self.maxpage, which the button
        # state depends on.
        paged_embed = await self.create_paged_embed()
        self.update_buttons()
        await self.inter.edit_original_response(embed=paged_embed, view=self)

    async def create_paged_embed(self) -> Embed:
        """
        Returns a copy of the known embed, but with data from the current page.

        Also refreshes ``self.maxpage`` from the total reported by ``getdata``.

        Raises:
            aiohttp.ClientResponseError: re-raised from ``getdata`` after an
                error follow-up has been sent on the stored interaction.
        """
        embed = self.embed.copy()

        try:
            data, total_results = await self.getdata(self.index, self.pagesize)
        except aiohttp.ClientResponseError as exc:
            log.error(exc)
            await (
                Followup(f"Error · {exc.message}")
                .footer(f"HTTP {exc.status}")
                .error()
                .send(self.inter)
            )
            raise

        self.maxpage = self.calc_total_pages(total_results, self.pagesize)
        log.debug("self.maxpage=%r", self.maxpage)

        # The formatter may be sync or async; decide once, not per item.
        formatter_is_async = asyncio.iscoroutinefunction(self.formatdata)
        for position, item in enumerate(data):
            number = self.calc_dataitem_index(position)
            if formatter_is_async:
                key, value = await self.formatdata(number, item)
            else:
                key, value = self.formatdata(number, item)
            embed.add_field(name=key, value=value, inline=False)

        if not total_results:
            embed.description = "There are no results"

        if self.maxpage > 1:
            embed.set_footer(text=f"Page {self.index}/{self.maxpage}")

        return embed

    def update_buttons(self):
        """Sync emoji and disabled state of the buttons with the current page."""
        # children order follows definition order: back, forward, start/end.
        back_btn, forward_btn, jump_btn = self.children[:3]
        at_end = self.index >= self.maxpage

        jump_btn.emoji = self.start_emoji if at_end else self.end_emoji
        back_btn.disabled = self.index <= 1
        forward_btn.disabled = at_end

    async def send(self):
        """Send the pagination view. It may be important to defer before invoking this method."""
        log.debug("sending pagination view")
        embed = await self.create_paged_embed()

        # A single page (or no results) needs no controls.
        if self.maxpage <= 1:
            await self.inter.edit_original_response(embed=embed)
            return

        self.update_buttons()
        await self.inter.edit_original_response(embed=embed, view=self)
class Followup:
    """Wrapper for a discord embed to follow up an interaction.

    Every method except :meth:`send` returns ``self`` so calls can be chained.
    """

    def __init__(
        self,
        title: str = None,
        description: str = None,
    ):
        self._embed = Embed(title=title, description=description)

    async def send(self, inter: Interaction, message: str = None, ephemeral: bool = False):
        """Send the embed (plus optional ``message`` text) as a follow-up to ``inter``."""
        await inter.followup.send(content=message, embed=self._embed, ephemeral=ephemeral)

    def fields(self, inline: bool = False, **fields: dict):
        """Add one embed field per keyword argument (keyword = name)."""
        for field_name in fields:
            self._embed.add_field(name=field_name, value=fields[field_name], inline=inline)
        return self

    def image(self, url: str):
        """Set the embed image."""
        self._embed.set_image(url=url)
        return self

    def author(self, name: str, url: str=None, icon_url: str=None):
        """Set the embed author."""
        self._embed.set_author(name=name, url=url, icon_url=icon_url)
        return self

    def footer(self, text: str, icon_url: str = None):
        """Set the embed footer."""
        self._embed.set_footer(text=text, icon_url=icon_url)
        return self

    def _decorate(self, colour, icon_url: str):
        """Apply an embed colour together with its thumbnail icon."""
        self._embed.colour = colour
        self._embed.set_thumbnail(url=icon_url)
        return self

    def error(self):
        """Red embed with the error icon."""
        return self._decorate(Colour.red(), FollowupIcons.error)

    def success(self):
        """Green embed with the success icon."""
        return self._decorate(Colour.green(), FollowupIcons.success)

    def info(self):
        """Blue embed with the info icon."""
        return self._decorate(Colour.blue(), FollowupIcons.info)

    def added(self):
        """Blue embed with the added icon."""
        return self._decorate(Colour.blue(), FollowupIcons.added)

    def assign(self):
        """Blue embed with the assigned icon."""
        return self._decorate(Colour.blue(), FollowupIcons.assigned)

    def trash(self):
        """Red embed with the trash icon."""
        return self._decorate(Colour.red(), FollowupIcons.trash)
def extract_error_info(error: Exception) -> Tuple[str, str]:
    """Return ``(class_name, description)`` for ``error``.

    ``class_name`` is the name of the exception's class and ``description``
    is ``str(error)``.
    """
    class_name = type(error).__name__
    desc = str(error)
    return class_name, desc