PYRSS-Bot/src/utils.py
Corban-Lee Jones 08295dfea6
Some checks failed
Build and Push Docker Image / build (push) Failing after 7m7s
efficiency and function for tasks
2024-10-31 23:52:29 +00:00

347 lines
10 KiB
Python

"""A collection of utility functions that can be used in various places."""
import asyncio
# import aiohttp
import logging
# import async_timeout
# from typing import Callable
from discord import Interaction, Embed, Colour, ButtonStyle, Button
from discord.ui import View, button
from discord.ext.commands import Bot
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
async def do_batch_job(iterable: list, func, batch_size: int):
    """Await ``func(item)`` for every item, at most ``batch_size`` at a time.

    All items are scheduled together, but a semaphore caps how many calls to
    ``func`` may be in flight at once.

    Parameters
    ----------
    iterable : list
        The items to process; each one is passed to ``func`` as its only
        argument.
    func
        An async callable accepting a single item. Its return value is
        discarded.
    batch_size : int
        Maximum number of ``func`` calls allowed to run concurrently.
        Must be at least 1.

    Raises
    ------
    ValueError
        If ``batch_size`` is less than 1.
    Exception
        The first exception raised by ``func`` is propagated to the caller
        (default ``asyncio.gather`` behaviour).
    """
    # A semaphore created with 0 permits can never be acquired, so every job
    # would wait forever; fail fast instead of deadlocking the caller.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    semaphore = asyncio.Semaphore(batch_size)

    async def batch_job(item):
        # Hold a permit for the whole duration of the call to ``func``.
        async with semaphore:
            await func(item)

    await asyncio.gather(*(batch_job(item) for item in iterable))
# async def fetch(session, url: str) -> str:
# async with async_timeout.timeout(20):
# async with session.get(url) as response:
# return await response.text()
# async def get_unparsed_feed(url: str, session: aiohttp.ClientSession=None):
# if session is not None:
# return await fetch(session, url)
# async with aiohttp.ClientSession() as session:
# return await fetch(session, url)
# async def get_rss_data(url: str):
# async with aiohttp.ClientSession() as session:
# async with session.get(url) as response:
# items = await response.text(), response.status
# return items
# async def followup(inter: Interaction, *args, **kwargs):
# """Shorthand for following up on an interaction.
# Parameters
# ----------
# inter : Interaction
# Represents an app command interaction.
# """
# await inter.followup.send(*args, **kwargs)
# # https://img.icons8.com/fluency-systems-filled/48/FA5252/trash.png
# class FollowupIcons:
# error = "https://img.icons8.com/fluency-systems-filled/48/DC573C/box-important.png"
# success = "https://img.icons8.com/fluency-systems-filled/48/5BC873/ok--v1.png"
# trash = "https://img.icons8.com/fluency-systems-filled/48/DC573C/trash.png"
# info = "https://img.icons8.com/fluency-systems-filled/48/4598DA/info.png"
# added = "https://img.icons8.com/fluency-systems-filled/48/4598DA/plus.png"
# assigned = "https://img.icons8.com/fluency-systems-filled/48/4598DA/hashtag-large.png"
# class PaginationView(View):
# """A Discord UI View that adds pagination to an embed."""
# def __init__(
# self, bot: Bot, inter: Interaction, embed: Embed, getdata: Callable,
# formatdata: Callable, pagesize: int, initpage: int=1
# ):
# """_summary_
# Args:
# bot (commands.Bot): The discord bot.
# inter (Interaction): Represents a discord command interaction.
# embed (Embed): The base embed to paginate.
# getdata (Callable): A function that provides data, must return Tuple[List[Any], int].
# formatdata (Callable): A formatter function that determines how the data is displayed.
# pagesize (int): The size of each page.
# initpage (int, optional): The initial page. Defaults to 1.
# """
# self.bot = bot
# self.inter = inter
# self.embed = embed
# self.getdata = getdata
# self.formatdata = formatdata
# self.maxpage = None
# self.pagesize = pagesize
# self.index = initpage
# # emoji reference
# self.start_emoji = bot.get_emoji(1204542364073463818)
# self.end_emoji = bot.get_emoji(1204542367752003624)
# super().__init__(timeout=100)
# async def check_user_is_author(self, inter: Interaction) -> bool:
# """Ensure the user is the author of the original command."""
# if inter.user == self.inter.user:
# return True
# await inter.response.defer()
# await (
# Followup(None, "Only the author can interact with this.")
# .error()
# .send(inter, ephemeral=True)
# )
# return False
# async def on_timeout(self):
# """Erase the controls on timeout."""
# message = await self.inter.original_response()
# await message.edit(view=None)
# @staticmethod
# def calc_total_pages(results: int, max_pagesize: int) -> int:
# result = ((results - 1) // max_pagesize) + 1
# return result
# def calc_dataitem_index(self, dataitem_index: int):
# """Calculates a given index to be relative to the sum of all pages items
# Example: dataitem_index = 6
# pagesize = 10
# if page == 1 then return 6
# else return 6 + 10 * (page - 1)"""
# if self.index > 1:
# dataitem_index += self.pagesize * (self.index - 1)
# dataitem_index += 1
# return dataitem_index
# @button(emoji="◀️", style=ButtonStyle.blurple)
# async def backward(self, inter: Interaction, button: Button):
# """
# Action the backwards button.
# """
# self.index -= 1
# await inter.response.defer()
# self.inter = inter
# await self.navigate()
# @button(emoji="▶️", style=ButtonStyle.blurple)
# async def forward(self, inter: Interaction, button: Button):
# """
# Action the forwards button.
# """
# self.index += 1
# await inter.response.defer()
# self.inter = inter
# await self.navigate()
# @button(emoji="⏭️", style=ButtonStyle.blurple)
# async def start_or_end(self, inter: Interaction, button: Button):
# """
# Action the start and end button.
# This button becomes return to start if at end, otherwise skip to end.
# """
# # Determine if should skip to start or end
# if self.index <= self.maxpage // 2:
# self.index = self.maxpage
# else:
# self.index = 1
# await inter.response.defer()
# self.inter = inter
# await self.navigate()
# async def navigate(self):
# """
# Acts as an update method for the entire instance.
# """
# log.debug("navigating to page: %s", self.index)
# self.update_buttons()
# paged_embed = await self.create_paged_embed()
# await self.inter.edit_original_response(embed=paged_embed, view=self)
# async def create_paged_embed(self) -> Embed:
# """
# Returns a copy of the known embed, but with data from the current page.
# """
# embed = self.embed.copy()
# try:
# data, total_results = await self.getdata(self.index, self.pagesize)
# except aiohttp.ClientResponseError as exc:
# log.error(exc)
# await (
# Followup(f"Error · {exc.message}",)
# .footer(f"HTTP {exc.code}")
# .error()
# .send(self.inter)
# )
# raise exc
# self.maxpage = self.calc_total_pages(total_results, self.pagesize)
# log.debug(f"{self.maxpage=!r}")
# for i, item in enumerate(data):
# i = self.calc_dataitem_index(i)
# if asyncio.iscoroutinefunction(self.formatdata):
# key, value = await self.formatdata(i, item)
# else:
# key, value = self.formatdata(i, item)
# embed.add_field(name=key, value=value, inline=False)
# if not total_results:
# embed.description = "There are no results"
# if self.maxpage > 1:
# embed.set_footer(text=f"Page {self.index}/{self.maxpage}")
# return embed
# def update_buttons(self):
# if self.index >= self.maxpage:
# self.children[2].emoji = self.start_emoji
# else:
# self.children[2].emoji = self.end_emoji
# self.children[0].disabled = self.index == 1
# self.children[1].disabled = self.index == self.maxpage
# async def send(self):
# """Send the pagination view. It may be important to defer before invoking this method."""
# log.debug("sending pagination view")
# embed = await self.create_paged_embed()
# if self.maxpage <= 1:
# await self.inter.edit_original_response(embed=embed)
# return
# self.update_buttons()
# await self.inter.edit_original_response(embed=embed, view=self)
# class Followup:
# """Wrapper for a discord embed to follow up an interaction."""
# def __init__(
# self,
# title: str = None,
# description: str = None,
# ):
# self._embed = Embed(
# title=title,
# description=description
# )
# async def send(self, inter: Interaction, message: str = None, ephemeral: bool = False):
# """"""
# await inter.followup.send(content=message, embed=self._embed, ephemeral=ephemeral)
# def fields(self, inline: bool = False, **fields: dict):
# """"""
# for key, value in fields.items():
# self._embed.add_field(name=key, value=value, inline=inline)
# return self
# def image(self, url: str):
# """"""
# self._embed.set_image(url=url)
# return self
# def author(self, name: str, url: str=None, icon_url: str=None):
# """"""
# self._embed.set_author(name=name, url=url, icon_url=icon_url)
# return self
# def footer(self, text: str, icon_url: str = None):
# """"""
# self._embed.set_footer(text=text, icon_url=icon_url)
# return self
# def error(self):
# """"""
# self._embed.colour = Colour.red()
# self._embed.set_thumbnail(url=FollowupIcons.error)
# return self
# def success(self):
# """"""
# self._embed.colour = Colour.green()
# self._embed.set_thumbnail(url=FollowupIcons.success)
# return self
# def info(self):
# """"""
# self._embed.colour = Colour.blue()
# self._embed.set_thumbnail(url=FollowupIcons.info)
# return self
# def added(self):
# """"""
# self._embed.colour = Colour.blue()
# self._embed.set_thumbnail(url=FollowupIcons.added)
# return self
# def assign(self):
# """"""
# self._embed.colour = Colour.blue()
# self._embed.set_thumbnail(url=FollowupIcons.assigned)
# return self
# def trash(self):
# """"""
# self._embed.colour = Colour.red()
# self._embed.set_thumbnail(url=FollowupIcons.trash)
# return self
# def extract_error_info(error: Exception) -> str:
# class_name = error.__class__.__name__
# desc = str(error)
# return class_name, desc