Compare commits

...

23 Commits

Author SHA1 Message Date
85c5c96392 fix TypeError with empty title, desc, pub_date rssItems
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
Build and Push Docker Image / build (pull_request) Successful in 8s
2024-09-10 10:49:22 +01:00
1d78a05807 task group and exception checker 2024-09-10 10:44:50 +01:00
e43d7aacf9 search option for view commands
All checks were successful
Build and Push Docker Image / build (push) Successful in 7s
2024-08-21 17:20:48 +01:00
7b1a293891 Merge pull request 'v0.2.0' (#20) from staging into master
All checks were successful
Build and Push Docker Image / build (push) Successful in 10s
Reviewed-on: https://gitea.corbz.dev/corbz/PYRSS-Bot/pulls/20
2024-08-19 21:06:52 +00:00
a2e8128bb6 Merge branch 'dev' into staging
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
Build and Push Docker Image / build (pull_request) Successful in 8s
2024-08-19 22:05:08 +01:00
be47f5be68 Merge branch 'master' into staging
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
2024-08-19 22:01:29 +01:00
21b1e22a5e Bump version: 0.1.1 → 0.2.0
All checks were successful
Build and Push Docker Image / build (push) Successful in 12s
Build and Push Docker Image / build (pull_request) Successful in 7s
2024-08-19 22:01:11 +01:00
8716e4bfa2 ready for release
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
2024-08-19 21:59:51 +01:00
12c7e81325 item id over index 2024-08-19 21:59:38 +01:00
a71866ed6e command list tracked content
All checks were successful
Build and Push Docker Image / build (push) Successful in 7s
2024-08-19 21:21:21 +01:00
213024ba9f logs, comments, code cleanup 2024-08-19 21:21:08 +01:00
f231403c08 fix issues with tracked content model 2024-08-19 21:20:48 +01:00
1c8a8d20df Update CHANGELOG.md
next version is not v0.2.0

not v0.1.2

this is due to largers changes being made.
2024-08-19 20:35:01 +01:00
dd8d4a0485 rss.py > cmds.py & tracked content command
tracked content command not finished
2024-08-19 20:34:30 +01:00
389fbfee26 Guild settings and task rewrites 2024-08-19 20:33:55 +01:00
fbac613895 Update CHANGELOG.md 2024-08-18 23:09:24 +01:00
6748274cc9 guild settings checks 2024-08-18 23:09:20 +01:00
6a399630a4 command for testing channel permissions & remove unused code 2024-08-18 23:06:44 +01:00
88d31d2427 get guild settings
All checks were successful
Build and Push Docker Image / build (push) Successful in 12s
2024-08-18 01:01:28 +01:00
1cc2aa2a08 Update launch.json 2024-08-18 01:01:20 +01:00
81ae171b02 Merge pull request 'v0.1.1' (#16) from staging into master
All checks were successful
Build and Push Docker Image / build (push) Successful in 9s
Reviewed-on: https://gitea.corbz.dev/corbz/PYRSS-Bot/pulls/16
2024-08-17 19:53:40 +00:00
163c589271 syntax fix ??
All checks were successful
Build and Push Docker Image / build (push) Successful in 7s
Build and Push Docker Image / build (pull_request) Successful in 8s
debugging why this action wont run
2024-08-17 20:51:32 +01:00
46533e5ffa filename fix 2024-08-17 20:47:28 +01:00
10 changed files with 396 additions and 465 deletions

View File

@ -1,4 +1,4 @@
[bumpversion]
current_version = 0.1.1
current_version = 0.2.0
commit = True
tag = True

View File

@ -48,7 +48,7 @@ jobs:
- name: Tag & Push Docker image
run: |
# Push the branch-specific or version-specific tag
# Push the branch-specific or version-specific tag
docker tag pyrss-bot:${{ env.TAG }} xordk/pyrss-bot:${{ env.TAG }}
docker push xordk/pyrss-bot:${{ env.TAG }}

2
.vscode/launch.json vendored
View File

@ -5,7 +5,7 @@
"version": "0.2.0",
"configurations": [
{
"name": "Python: NewsBot",
"name": "Python: PYRSS Bot",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/src/main.py",

View File

@ -1,5 +1,14 @@
**v0.2.0**
- Fix: Fetch channels if not found in bot cache (error fix)
- Enhancement: command to test a channel's permissions allow for the Bot to function
- Enhancement: account for active state from a server's settings (`GuildSettings`)
- Enhancement: command to view tracked content from the server or a given subscription of the same server.
- Other: code optimisation & `GuildSettings` dataclass
- Other: Cleaned out many instances of unused code
**v0.1.1**
- Docs: Start of changelog
- Enhancement: Versioning with tagged docker images
- Enhancement: Versioning with tagged docker images

View File

@ -132,6 +132,15 @@ class API:
return await self._get_many(self.API_ENDPOINT + "subchannel/", filters)
async def get_guild_settings(self, **filters) -> tuple[list[dict], int]:
"""
Get many guild settings.
"""
log.debug("getting multiple guild settings")
return await self._get_many(self.API_ENDPOINT + "guild-settings/", filters)
async def create_tracked_content(self, **data) -> dict:
"""
Create an instance of tracked content.

236
src/extensions/cmds.py Normal file
View File

@ -0,0 +1,236 @@
"""
Extension for the `FeedCog`.
Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bot.
"""
import logging
from typing import Tuple
from datetime import datetime
import aiohttp
import validators
from feedparser import FeedParserDict, parse
from discord.ext import commands
from discord import Interaction, TextChannel, Embed, Colour
from discord.app_commands import Choice, Group, autocomplete, rename, command
from discord.errors import Forbidden
from api import API
from feed import Subscription, TrackedContent
from utils import (
Followup,
PaginationView,
get_rss_data,
)
log = logging.getLogger(__name__)
rss_list_sort_choices = [
Choice(name="Nickname", value=0),
Choice(name="Date Added", value=1)
]
channels_list_sort_choices=[
Choice(name="Feed Nickname", value=0),
Choice(name="Channel ID", value=1),
Choice(name="Date Added", value=2)
]
# TODO SECURITY: a potential attack is that the user submits an rss feed then changes the
# target resource. Run a period task to check this.
async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]:
"""Validate a provided RSS source.
Parameters
----------
nickname : str
Nickname of the source. Must not contain URL.
url : str
URL of the source. Must be URL with valid status code and be an RSS feed.
Returns
-------
str or None
String invalid message if invalid, NoneType if valid.
FeedParserDict or None
The feed parsed from the given URL or None if invalid.
"""
# Ensure the URL is valid
if not validators.url(url):
return f"The URL you have entered is malformed or invalid:\n`{url=}`", None
# Check the nickname is not a URL
if validators.url(nickname):
return "It looks like the nickname you have entered is a URL.\n" \
f"For security reasons, this is not allowed.\n`{nickname=}`", None
feed_data, status_code = await get_rss_data(url)
# Check the URL status code is valid
if status_code != 200:
return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None
# Check the contents is actually an RSS feed.
feed = parse(feed_data)
if not feed.version:
return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None
return None, feed
class CommandsCog(commands.Cog):
"""
Command cog.
"""
def __init__(self, bot: commands.Bot):
super().__init__()
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
"""Instructions to call when the cog is ready."""
log.info("%s cog is ready", self.__class__.__name__)
# Group for commands about viewing data
view_group = Group(
name="view",
description="View data.",
guild_only=True
)
@view_group.command(name="subscriptions")
async def cmd_list_subs(self, inter: Interaction, search: str = ""):
"""List Subscriptions from this server."""
await inter.response.defer()
def formatdata(index, item):
item = Subscription.from_dict(item)
channels = f"{item.channels_count}{' channels' if item.channels_count != 1 else ' channel'}"
filters = f"{len(item.filters)}{' filters' if len(item.filters) != 1 else ' filter'}"
notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes
links = f"[RSS Link]({item.url}) · [API Link]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)"
description = f"{channels}, {filters}\n"
description += f"{notes}\n" if notes else ""
description += links
key = f"{index}. {item.name}"
return key, description # key, value pair
async def getdata(page: int, pagesize: int):
async with aiohttp.ClientSession() as session:
api = API(self.bot.api_token, session)
return await api.get_subscriptions(
guild_id=inter.guild.id,
page=page,
page_size=pagesize,
search=search
)
embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed
pagination = PaginationView(
self.bot,
inter=inter,
embed=embed,
getdata=getdata,
formatdata=formatdata,
pagesize=10,
initpage=1
)
await pagination.send()
@view_group.command(name="tracked-content")
async def cmd_list_tracked(self, inter: Interaction, search: str = ""):
"""List Tracked Content from this server, or a given sub"""
await inter.response.defer()
def formatdata(index, item):
item = TrackedContent.from_dict(item)
sub = Subscription.from_dict(item.subscription)
links = f"[Content Link]({item.url}) · [Message Link](https://discord.com/channels/{sub.guild_id}/{item.channel_id}/{item.message_id}/)"
description = f"Subscription: {sub.name}\n{links}"
key = f"{item.id}. {item.title}"
return key, description
async def getdata(page: int, pagesize: int):
async with aiohttp.ClientSession() as session:
api = API(self.bot.api_token, session)
return await api.get_tracked_content(
subscription__guild_id=inter.guild_id,
page=page,
page_size=pagesize,
search=search
)
embed = Followup(f"Tracked Content in {inter.guild.name}").info()._embed
pagination = PaginationView(
self.bot,
inter=inter,
embed=embed,
getdata=getdata,
formatdata=formatdata,
pagesize=10,
initpage=1
)
await pagination.send()
# Group for test related commands
test_group = Group(
name="test",
description="Commands to test Bot functionality.",
guild_only=True
)
@test_group.command(name="channel-permissions")
async def cmd_test_channel_perms(self, inter: Interaction):
"""Test that the current channel's permissions allow for PYRSS to operate in it."""
try:
test_message = await inter.channel.send(content="... testing permissions ...")
await self.test_channel_perms(inter.channel)
except Exception as error:
await inter.response.send_message(content=f"Failed: {error}")
return
await test_message.delete()
await inter.response.send_message(content="Success")
async def test_channel_perms(self, channel: TextChannel):
# Test generic message and delete
msg = await channel.send(content="test message")
await msg.delete()
# Test detailed embed
embed = Embed(
title="test title",
description="test description",
colour=Colour.random(),
timestamp=datetime.now(),
url="https://google.com"
)
embed.set_author(name="test author")
embed.set_footer(text="test footer")
embed.set_thumbnail(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
embed.set_image(url="https://www.google.com/images/branding/googlelogo/2x/googlelogo_light_color_272x92dp.png")
embed_msg = await channel.send(embed=embed)
await embed_msg.delete()
async def setup(bot):
"""
Setup function for this extension.
Adds `CommandsCog` to the bot.
"""
cog = CommandsCog(bot)
await bot.add_cog(cog)
log.info("Added %s cog", cog.__class__.__name__)

View File

@ -1,378 +0,0 @@
"""
Extension for the `FeedCog`.
Loading this file via `commands.Bot.load_extension` will add `FeedCog` to the bot.
"""
import logging
from typing import Tuple
import aiohttp
import validators
from feedparser import FeedParserDict, parse
from discord.ext import commands
from discord import Interaction, TextChannel
from discord.app_commands import Choice, Group, autocomplete, rename, command
from api import API
from feed import Subscription, SubscriptionChannel, TrackedContent
from utils import (
Followup,
PaginationView,
get_rss_data,
)
log = logging.getLogger(__name__)
rss_list_sort_choices = [
Choice(name="Nickname", value=0),
Choice(name="Date Added", value=1)
]
channels_list_sort_choices=[
Choice(name="Feed Nickname", value=0),
Choice(name="Channel ID", value=1),
Choice(name="Date Added", value=2)
]
# TODO SECURITY: a potential attack is that the user submits an rss feed then changes the
# target resource. Run a period task to check this.
async def validate_rss_source(nickname: str, url: str) -> Tuple[str | None, FeedParserDict | None]:
"""Validate a provided RSS source.
Parameters
----------
nickname : str
Nickname of the source. Must not contain URL.
url : str
URL of the source. Must be URL with valid status code and be an RSS feed.
Returns
-------
str or None
String invalid message if invalid, NoneType if valid.
FeedParserDict or None
The feed parsed from the given URL or None if invalid.
"""
# Ensure the URL is valid
if not validators.url(url):
return f"The URL you have entered is malformed or invalid:\n`{url=}`", None
# Check the nickname is not a URL
if validators.url(nickname):
return "It looks like the nickname you have entered is a URL.\n" \
f"For security reasons, this is not allowed.\n`{nickname=}`", None
feed_data, status_code = await get_rss_data(url)
# Check the URL status code is valid
if status_code != 200:
return f"The URL provided returned an invalid status code:\n{url=}, {status_code=}", None
# Check the contents is actually an RSS feed.
feed = parse(feed_data)
if not feed.version:
return f"The provided URL '{url}' does not seem to be a valid RSS feed.", None
return None, feed
class FeedCog(commands.Cog):
"""
Command cog.
"""
def __init__(self, bot: commands.Bot):
super().__init__()
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
"""Instructions to call when the cog is ready."""
log.info("%s cog is ready", self.__class__.__name__)
# async def autocomplete_subscriptions(self, inter: Interaction, name: str) -> list[Choice]:
# """"""
# log.debug("autocompleting subscriptions '%s'", name)
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# results, _ = await api.get_subscriptions(server=inter.guild_id, search=name)
# except Exception as exc:
# log.error(exc)
# return []
# subscriptions = Subscription.from_list(results)
# return [
# Choice(name=sub.name, value=sub.uuid)
# for sub in subscriptions
# ]
# async def autocomplete_subscription_channels(self, inter: Interaction, uuid: str):
# """"""
# log.debug("autocompleting subscription channels")
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# results, _ = await api.get_subscription_channels()
# except Exception as exc:
# log.error(exc)
# return []
# subscription_channels = SubscriptionChannel.from_list(results)
# async def name(link):
# result = self.bot.get_channel(link.id) or await self.bot.fetch_channel(link.id)
# return f"{link.subscription.name} -> #{result.name}"
# return [
# Choice(name=await name(link), value=link.uuid)
# for link in subscription_channels
# ]
# subscription_group = Group(
# name="subscriptions",
# description="subscription commands",
# guild_only=True
# )
# @subscription_group.command(name="link")
# @autocomplete(sub_uuid=autocomplete_subscriptions)
# @rename(sub_uuid="subscription")
# async def link_subscription_channel(self, inter: Interaction, sub_uuid: str, channel: TextChannel):
# """
# Link Subscription to discord.TextChannel.
# """
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# data = await api.create_subscription_channel(str(channel.id), sub_uuid)
# except aiohttp.ClientResponseError as exc:
# return await (
# Followup(
# f"Error · {exc.message}",
# "Ensure you haven't: \n"
# "- Already linked this subscription to this channel\n"
# "- Already linked this subscription to the maximum of 4 channels"
# )
# .footer(f"HTTP {exc.code}")
# .error()
# .send(inter)
# )
# subscription = Subscription.from_dict(data.pop("subscription"))
# data["subscription"] = (
# f"{subscription.name}\n"
# f"[RSS]({subscription.rss_url}) · "
# f"[API Subscription]({API.SUBSCRIPTION_ENDPOINT}{subscription.uuid}) · "
# f"[API Link]({API.CHANNEL_ENDPOINT}{data['uuid']})"
# )
# channel_id = int(data.pop("id"))
# channel = self.bot.get_channel(channel_id) or await self.bot.fetch_channel(channel_id)
# data["channel"] = channel.mention
# data.pop("creation_datetime")
# data.pop("uuid")
# await (
# Followup("Linked!")
# .fields(**data)
# .added()
# .send(inter)
# )
# @subscription_group.command(name="unlink")
# @autocomplete(uuid=autocomplete_subscription_channels)
# @rename(uuid="link")
# async def unlink_subscription_channel(self, inter: Interaction, uuid: str):
# """
# Unlink subscription from discord.TextChannel.
# """
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# # data = await api.get_subscription(uuid=uuid)
# await api.delete_subscription_channel(uuid=uuid)
# # sub_channel = await SubscriptionChannel.from_dict(data)
# except Exception as exc:
# return await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# await (
# Followup("Subscription unlinked!", uuid)
# .added()
# .send(inter)
# )
# @subscription_group.command(name="list-links")
# async def list_subscription(self, inter: Interaction):
# """List Subscriptions Channels in this server."""
# await inter.response.defer()
# async def formatdata(index: int, item: dict) -> tuple[str, str]:
# item = SubscriptionChannel.from_dict(item)
# next_emoji = self.bot.get_emoji(1204542366602502265)
# key = f"{index}. {item.subscription.name} {next_emoji} {item.mention}"
# return key, item.hyperlinks_string
# async def getdata(page: int, pagesize: int) -> dict:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# return await api.get_subscription_channels(
# subscription__server=inter.guild.id, page=page, page_size=pagesize
# )
# embed = Followup(f"Links in {inter.guild.name}").info()._embed
# pagination = PaginationView(
# self.bot,
# inter=inter,
# embed=embed,
# getdata=getdata,
# formatdata=formatdata,
# pagesize=10,
# initpage=1
# )
# await pagination.send()
# @subscription_group.command(name="add")
# async def new_subscription(self, inter: Interaction, name: str, rss_url: str):
# """Subscribe this server to a new RSS Feed."""
# await inter.response.defer()
# try:
# parsed_rssfeed = await self.bot.functions.validate_feed(name, rss_url)
# image_url = parsed_rssfeed.get("feed", {}).get("image", {}).get("href")
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# data = await api.create_subscription(name, rss_url, image_url, str(inter.guild_id), [-1])
# except aiohttp.ClientResponseError as exc:
# return await (
# Followup(
# f"Error · {exc.message}",
# "Ensure you haven't: \n"
# "- Reused an identical name of an existing Subscription\n"
# "- Already created the maximum of 25 Subscriptions"
# )
# .footer(f"HTTP {exc.code}")
# .error()
# .send(inter)
# )
# # Omit data we dont want the user to see
# data.pop("uuid")
# data.pop("image")
# data.pop("server")
# data.pop("creation_datetime")
# # Update keys to be more human readable
# data["url"] = data.pop("rss_url")
# await (
# Followup("Subscription Added!")
# .fields(**data)
# .image(image_url)
# .added()
# .send(inter)
# )
# @subscription_group.command(name="remove")
# @autocomplete(uuid=autocomplete_subscriptions)
# @rename(uuid="choice")
# async def remove_subscriptions(self, inter: Interaction, uuid: str):
# """Unsubscribe this server from an existing RSS Feed."""
# await inter.response.defer()
# try:
# async with aiohttp.ClientSession() as session:
# api = API(self.bot.api_token, session)
# await api.delete_subscription(uuid)
# except Exception as exc:
# return await (
# Followup(exc.__class__.__name__, str(exc))
# .error()
# .send(inter)
# )
# await (
# Followup("Subscription Removed!", uuid)
# .trash()
# .send(inter)
# )
@command(name="subscriptions")
async def list_subscription(self, inter: Interaction):
"""List Subscriptions from this server."""
await inter.response.defer()
def formatdata(index, item):
item = Subscription.from_dict(item)
channels = f"{item.channels_count}{' channels' if item.channels_count != 1 else ' channel'}"
filters = f"{len(item.filters)}{' filters' if len(item.filters) != 1 else ' filter'}"
notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes
links = f"[RSS URL]({item.url}) · [API URL]({API.API_EXTERNAL_ENDPOINT}subscription/{item.id}/)"
description = f"{channels}, {filters}\n"
description += f"{notes}\n" if notes else ""
description += links
key = f"{index}. {item.name}"
return key, description # key, value pair
async def getdata(page: int, pagesize: int):
async with aiohttp.ClientSession() as session:
api = API(self.bot.api_token, session)
return await api.get_subscriptions(
guild_id=inter.guild.id, page=page, page_size=pagesize
)
embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed
pagination = PaginationView(
self.bot,
inter=inter,
embed=embed,
getdata=getdata,
formatdata=formatdata,
pagesize=10,
initpage=1
)
await pagination.send()
# await Followup("results", str(await getdata(1, 10))).send(inter)
async def setup(bot):
"""
Setup function for this extension.
Adds `FeedCog` to the bot.
"""
cog = FeedCog(bot)
await bot.add_cog(cog)
log.info("Added %s cog", cog.__class__.__name__)

View File

@ -18,7 +18,7 @@ from discord.ext import commands, tasks
from discord.errors import Forbidden
from feedparser import parse
from feed import RSSFeed, Subscription, RSSItem
from feed import RSSFeed, Subscription, RSSItem, GuildSettings
from utils import get_unparsed_feed
from filters import match_text
from api import API
@ -65,13 +65,24 @@ class TaskCog(commands.Cog):
"""
self.subscription_task.cancel()
@app_commands.command(name="debug-trigger-task")
async def debug_trigger_task(self, inter):
group = app_commands.Group(
name="task",
description="Commands for tasks",
guild_only=True
)
@group.command(name="trigger")
async def cmd_trigger_task(self, inter):
await inter.response.defer()
start_time = perf_counter()
await self.subscription_task()
end_time = perf_counter()
await inter.followup.send(f"completed in {end_time - start_time:.4f} seconds")
try:
await self.subscription_task()
except Exception as error:
await inter.followup.send(str(error))
finally:
end_time = perf_counter()
await inter.followup.send(f"completed in {end_time - start_time:.4f} seconds")
@tasks.loop(time=subscription_task_times)
async def subscription_task(self):
@ -83,43 +94,88 @@ class TaskCog(commands.Cog):
async with aiohttp.ClientSession() as session:
self.api = API(self.bot.api_token, session)
subscriptions = await self.get_subscriptions()
await self.process_subscriptions(subscriptions)
await self.execute_task()
end_time = perf_counter()
log.debug(f"task completed in {end_time - start_time:.4f} seconds")
async def get_subscriptions(self) -> list[Subscription]:
async def execute_task(self):
"""Execute the task directly."""
# Filter out inactive guild IDs using related settings
guild_ids = [guild.id for guild in self.bot.guilds]
sub_data = []
guild_settings = await self.get_guild_settings(guild_ids)
active_guild_ids = [settings.guild_id for settings in guild_settings if settings.active]
subscriptions = await self.get_subscriptions(active_guild_ids)
await self.process_subscriptions(subscriptions)
async def get_guild_settings(self, guild_ids: list[int]) -> list[int]:
"""Returns a list of guild settings from the Bot's guilds, if they exist."""
guild_settings = []
# Iterate infinitely taking the iter no. as `page`
# data will be empty after last page reached.
for page, _ in enumerate(iter(int, 1)):
try:
log.debug("fetching page '%s'", page + 1)
sub_data.extend(
(await self.api.get_subscriptions(server__in=guild_ids, page=page+1))[0]
)
except aiohttp.ClientResponseError as error:
match error.status:
case 404:
log.debug("final page reached '%s'", page)
break
case 403:
log.critical(error)
self.subscription_task.cancel()
return [] # returning an empty list should gracefully end the task
case _:
log.error(error)
break
except Exception as error:
log.error("Exception while gathering page data %s", error)
data = await self.get_guild_settings_page(guild_ids, page)
if not data:
break
guild_settings.extend(data[0])
return Subscription.from_list(sub_data)
# Only return active guild IDs
return GuildSettings.from_list(guild_settings)
async def get_guild_settings_page(self, guild_ids: list[int], page: int) -> list[dict]:
"""Returns an individual page of guild settings."""
try:
return await self.api.get_guild_settings(guild_id__in=guild_ids, page=page+1)
except aiohttp.ClientResponseError as error:
self.handle_pagination_error(error)
return []
def handle_pagination_error(self, error: aiohttp.ClientResponseError):
"""Handle the error cases from pagination attempts."""
match error.status:
case 404:
log.debug("final page reached")
case 403:
log.critical("[403] Bot likely lacks permissions: %s", error, exc_info=True)
self.subscription_task.cancel() # can't do task without proper auth, so cancel permanently
case _:
log.debug(error)
async def get_subscriptions(self, guild_ids: list[int]) -> list[Subscription]:
"""Get a list of `Subscription`s matching the given `guild_ids`."""
subscriptions = []
# Iterate infinitely taking the iter no. as `page`
# data will be empty after last page reached.
for page, _ in enumerate(iter(int, 1)):
data = await self.get_subs_page(guild_ids, page)
if not data:
break
subscriptions.extend(data[0])
return Subscription.from_list(subscriptions)
async def get_subs_page(self, guild_ids: list[int], page: int) -> list[Subscription]:
"""Returns an individual page of subscriptions."""
try:
return await self.api.get_subscriptions(guild_id__in=guild_ids, page=page+1)
except aiohttp.ClientResponseError as error:
self.handle_pagination_error(error)
return []
async def process_subscriptions(self, subscriptions: list[Subscription]):
"""Process a given list of `Subscription`s."""
async def process_single_subscription(sub: Subscription):
log.debug("processing subscription '%s' for '%s'", sub.id, sub.guild_id)
@ -143,7 +199,7 @@ class TaskCog(commands.Cog):
async def process_items(self, sub: Subscription, feed: RSSFeed):
log.debug("processing items")
channels = [self.bot.get_channel(channel.channel_id) for channel in await sub.get_channels(self.api)]
channels = await self.fetch_or_get_channels(await sub.get_channels(self.api))
filters = [await self.api.get_filter(filter_id) for filter_id in sub.filters]
for item in feed.items:
@ -154,11 +210,23 @@ class TaskCog(commands.Cog):
continue
blocked = any(self.filter_item(_filter, item) for _filter in filters)
mutated_item = item.create_mutated_copy(sub.mutators)
mutated_item = item.create_mutated_copy(sub.mutators) if sub.mutators else None
for channel in channels:
await self.track_and_send(sub, feed, item, mutated_item, channel, blocked)
async def fetch_or_get_channels(self, channels_data: list[dict]):
channels = []
for data in channels_data:
try:
channel = self.bot.get_channel(data.channel_id)
channels.append(channel or await self.bot.fetch_channel(data.channel_id))
except Forbidden:
log.error(f"Forbidden Channel '{data.channel_id}'")
return channels
def filter_item(self, _filter: dict, item: RSSItem) -> bool:
"""
Returns `True` if item should be ignored due to filters.
@ -168,7 +236,7 @@ class TaskCog(commands.Cog):
log.debug("filter match found? '%s'", match_found)
return match_found
async def track_and_send(self, sub: Subscription, feed: RSSFeed, item: RSSItem, mutated_item: RSSItem, channel: TextChannel, blocked: bool):
async def track_and_send(self, sub: Subscription, feed: RSSFeed, item: RSSItem, mutated_item: RSSItem | None, channel: TextChannel, blocked: bool):
message_id = -1
log.debug("track and send func %s, %s", item.guid, item.title)
@ -186,10 +254,11 @@ class TaskCog(commands.Cog):
if not blocked:
try:
log.debug("sending '%s', exists '%s'", item.guid, result[1])
message = await channel.send(embed=await mutated_item.to_embed(sub, feed, self.api.session))
sendable_item = mutated_item or item
message = await channel.send(embed=await sendable_item.to_embed(sub, feed, self.api.session))
message_id = message.id
except Forbidden as error:
log.error(error)
except Forbidden:
log.error(f"Forbidden to send to channel {channel.id}")
await self.mark_tracked_item(sub, item, channel.id, message_id, blocked)

View File

@ -51,13 +51,13 @@ class RSSItem:
RSSItem
"""
guid = entry.get('id', None) or entry.get("guid", None)
link = entry.get('link', None)
title = entry.get('title', None)
description = entry.get('description', None)
guid = entry.get('id', None) or entry.get("guid")
link = entry.get('link', "")
title = entry.get('title', "")
description = entry.get('description', "")
pub_date = entry.get('published_parsed', None)
pub_date = datetime(*pub_date[0:6], tzinfo=timezone.utc)
pub_date = datetime(*pub_date[0:6] if pub_date else None, tzinfo=timezone.utc)
content_image_url = entry.get("media_content", [{}])[0].get("url")
thumb_image_url = entry.get("media_thumbnail", [{}])[0].get("url")
@ -128,6 +128,8 @@ class RSSItem:
discord.Embed
"""
log.debug("Creating embed of item: %s", self.guid)
# Replace HTML with Markdown, and shorten text.
title = shorten(markdownify(self.title, strip=["img", "a"]), 256)
desc = shorten(markdownify(self.description, strip=["img"]), 4096)
@ -270,6 +272,19 @@ class DjangoDataModel(ABC):
return cls(**cls.parser(data))
@dataclass(slots=True)
class GuildSettings(DjangoDataModel):
id: int
guild_id: int
default_embed_colour: str
active: bool
@staticmethod
def parser(item: dict) -> dict:
return item
@dataclass(slots=True)
class Subscription(DjangoDataModel):
@ -324,51 +339,21 @@ class SubChannel(DjangoDataModel):
return f"<#{self.channel_id}>"
@dataclass(slots=True)
class SubscriptionChannel(DjangoDataModel):
uuid: str
id: int
subscription: Subscription
creation_datetime: datetime
@staticmethod
def parser(item: dict) -> dict:
item["id"] = int(item["id"])
item["subscription"] = Subscription.from_dict(item.pop("subscription"))
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT)
return item
@property
def mention(self) -> str:
"""
Returns the `id` as a string in the discord mention format.
"""
return f"<#{self.id}>"
@property
def hyperlinks_string(self) -> str:
""""""
api_hyperlink = f"[API]({API.CHANNEL_ENDPOINT}{self.uuid}/)"
rss_hyperlink = f"[RSS]({self.subscription.rss_url})"
value = f"{rss_hyperlink} · {api_hyperlink}"
return value
@dataclass(slots=True)
class TrackedContent(DjangoDataModel):
uuid: str
id: int
guid: str
title: str
url: str
subscription: str
content_url: str
channel_id: int
message_id: int
blocked: bool
creation_datetime: datetime
@staticmethod
def parser(item: dict) -> dict:
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT)
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], "%Y-%m-%dT%H:%M:%S.%f%z")
return item

View File

@ -83,8 +83,6 @@ class PaginationView(View):
self.index = initpage
# emoji reference
next_emoji = bot.get_emoji(1204542366602502265)
prev_emoji = bot.get_emoji(1204542365432422470)
self.start_emoji = bot.get_emoji(1204542364073463818)
self.end_emoji = bot.get_emoji(1204542367752003624)
@ -113,7 +111,6 @@ class PaginationView(View):
@staticmethod
def calc_total_pages(results: int, max_pagesize: int) -> int:
result = ((results - 1) // max_pagesize) + 1
log.debug("total pages calculated: %s", result)
return result
def calc_dataitem_index(self, dataitem_index: int):
@ -200,6 +197,7 @@ class PaginationView(View):
raise exc
self.maxpage = self.calc_total_pages(total_results, self.pagesize)
log.debug(f"{self.maxpage=!r}")
for i, item in enumerate(data):
i = self.calc_dataitem_index(i)
@ -228,6 +226,9 @@ class PaginationView(View):
self.children[1].disabled = self.index == self.maxpage
async def send(self):
"""Send the pagination view. It may be important to defer before invoking this method."""
log.debug("sending pagination view")
embed = await self.create_paged_embed()
if self.maxpage <= 1: