compatability with api

This commit is contained in:
Corban-Lee Jones 2024-06-07 17:36:54 +01:00
parent 307d341ec2
commit 4f4d827d3c
5 changed files with 184 additions and 215 deletions

View File

@ -108,228 +108,167 @@ class API:
url=url
)
async def create_subscription(self, name: str, rss_url: str, image_url: str, server_id: str, targets: list) -> dict:
async def get_subscriptions(self, **filters):
"""
Create a new Subscription.
Get multiple subscriptions.
"""
log.debug("subscribing '%s' to '%s'", server_id, rss_url)
log.debug("getting multiple subscriptions")
async with self.session.get(image_url) as response:
image_data = await response.read()
return await self._get_many(self.API_ENDPOINT + "subscription/", filters)
form = aiohttp.FormData()
form.add_field("name", name)
form.add_field("rss_url", rss_url)
form.add_field("server", server_id)
form.add_field("targets", ";".join(map(str, targets)))
form.add_field("image", image_data, filename="file.jpg")
data = await self._post_data(self.SUBSCRIPTION_ENDPOINT, form)
return data["json"]
async def get_subscription(self, uuid: str) -> dict:
"""
Retreive a Subscription.
"""
log.debug("retreiving subscription '%s'", uuid)
url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
data = await self._get_one(url)
return data["json"]
async def get_subscriptions(self, **filters) -> tuple[list[dict], int]:
"""
Retreive multiple Subscriptions.
"""
log.debug("retreiving multiple subscriptions")
return await self._get_many(self.SUBSCRIPTION_ENDPOINT, filters)
async def delete_subscription(self, uuid: str) -> None:
"""
Delete an existing Subscription.
"""
log.debug("deleting subscription '%s'", uuid)
url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
await self._delete(url)
async def create_subscription_channel(self, channel_id: int | str, sub_uuid: str) -> dict:
"""
Create a new Channel.
"""
log.debug("creating new subscription channel '%s', '%s'", channel_id, sub_uuid)
form = aiohttp.FormData()
form.add_field("id", str(channel_id))
form.add_field("subscription", sub_uuid)
data = await self._post_data(self.CHANNEL_ENDPOINT, form)
return data["json"]
async def get_subscription_channel(self, uuid: str) -> dict:
"""
Retreive a Channel.
"""
log.debug("retreiving a subscription channel '%s'", uuid)
url = f"{self.CHANNEL_ENDPOINT}{uuid}/"
data = await self._get_one(url)
return data["json"]
async def get_subscription_channels(self, **filters) -> tuple[list[dict], int]:
"""
Retreive multiple Channels.
"""
log.debug("retreiving multiple channels")
return await self._get_many(self.CHANNEL_ENDPOINT, filters)
async def delete_subscription_channel(self, uuid: str) -> None:
"""
Delete an existing Channel.
"""
log.debug("deleting channel '%s'", uuid)
url=f"{self.CHANNEL_ENDPOINT}{uuid}/"
await self._delete(url)
async def create_tracked_content(self, sub_uuid: str, content_url: str) -> dict:
"""
Create a Tracked Content.
"""
log.debug("creating tracked content '%s', '%s'", sub_uuid, content_url)
form = aiohttp.FormData()
form.add_field("subscription", sub_uuid)
form.add_field("content_url", content_url)
data = await self._post_data(self.TRACKED_ENDPOINT, form)
return data["json"]
async def get_tracked_content(self, uuid: str = None, content_url: str = None) -> dict:
"""
Retreive a Tracked Content.
"""
log.debug("retreiving tracked content '%s', '%s'", uuid, content_url)
if not (uuid or content_url) or (uuid and content_url):
raise ValueError(
"Must use only 'uuid' or 'content_url' arguments, cannot use "
"both arguments or none."
)
url = f"{self.TRACKED_ENDPOINT}{uuid or content_url}/"
data = await self._get_one(url)
return data["json"]
async def get_tracked_contents(self, **filters) -> tuple[list[dict], int]:
"""
Retreive multiple Tracked Content.
"""
log.debug("retreiving multiple tracked content")
return await self._get_many(self.TRACKED_ENDPOINT, filters)
async def delete_tracked_content(self, uuid: str) -> None:
"""
Delete a Tracked Content.
"""
log.debug("deleting tracked content '%s'", uuid)
url = f"{self.TRACKED_ENDPOINT}{uuid}/"
await self._delete(url)
async def is_tracked(self, content_url: str) -> bool:
"""
Returns boolean if an item with the given url exists.
"""
raise NotImplementedError
# async def create_new_rssfeed(self, name: str, url: str, image_url: str, discord_server_id: int) -> dict:
# """Create a new RSS Feed.
# Args:
# name (str): Name of the RSS Feed.
# url (str): URL for the RSS Feed.
# image_url (str): URL of the image representation of the RSS Feed.
# discord_server_id (int): ID of the discord server behind this item.
# Returns:
# dict: JSON representation of the newly created RSS Feed.
# async def create_subscription(self, name: str, rss_url: str, image_url: str, server_id: str, targets: list) -> dict:
# """
# Create a new Subscription.
# """
# log.debug("creating rssfeed: %s %s %s %s", name, url, image_url, discord_server_id)
# log.debug("subscribing '%s' to '%s'", server_id, rss_url)
# async with self.session.get(image_url) as response:
# image_data = await response.read()
# # Using formdata to make the image transfer easier.
# form = aiohttp.FormData({
# "name": name,
# "url": url,
# "discord_server_id": str(discord_server_id)
# })
# form = aiohttp.FormData()
# form.add_field("name", name)
# form.add_field("rss_url", rss_url)
# form.add_field("server", server_id)
# form.add_field("targets", ";".join(map(str, targets)))
# form.add_field("image", image_data, filename="file.jpg")
# data = (await self.make_request("POST", self.RSS_FEED_ENDPOINT, data=form))["json"]
# return data
# data = await self._post_data(self.SUBSCRIPTION_ENDPOINT, form)
# async def get_rssfeed(self, uuid: str) -> dict:
# """Get a particular RSS Feed given it's UUID.
# return data["json"]
# Args:
# uuid (str): Identifier of the desired RSS Feed.
# Returns:
# dict: A JSON representation of the RSS Feed.
# async def get_subscription(self, uuid: str) -> dict:
# """
# Retreive a Subscription.
# """
# log.debug("getting rssfeed: %s", uuid)
# endpoint = f"{self.RSS_FEED_ENDPOINT}/{uuid}/"
# data = (await self.make_request("GET", endpoint))["json"]
# return data
# log.debug("retreiving subscription '%s'", uuid)
# async def get_rssfeed_list(self, **filters) -> tuple[list[dict], int]:
# """Get all RSS Feeds with the associated filters.
# url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
# data = await self._get_one(url)
# Returns:
# tuple[list[dict], int] list contains dictionaries of each item, int is total items.
# return data["json"]
# async def get_subscriptions(self, **filters) -> tuple[list[dict], int]:
# """
# Retreive multiple Subscriptions.
# """
# log.debug("getting list of rss feeds with filters: %s", filters)
# data = (await self.make_request("GET", self.RSS_FEED_ENDPOINT, params=filters))["json"]
# return data["results"], data["count"]
# log.debug("retreiving multiple subscriptions")
# async def delete_rssfeed(self, uuid: str) -> int:
# """Delete a specified RSS Feed.
# return await self._get_many(self.SUBSCRIPTION_ENDPOINT, filters)
# Args:
# uuid (str): Identifier of the RSS Feed to delete.
# Returns:
# int: Status code of the response.
# async def delete_subscription(self, uuid: str) -> None:
# """
# Delete an existing Subscription.
# """
# log.debug("deleting rssfeed: %s", uuid)
# endpoint = f"{self.RSS_FEED_ENDPOINT}/{uuid}/"
# status = (await self.make_request("DELETE", endpoint))["status"]
# return status
# log.debug("deleting subscription '%s'", uuid)
# url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
# await self._delete(url)
# async def create_subscription_channel(self, channel_id: int | str, sub_uuid: str) -> dict:
# """
# Create a new Channel.
# """
# log.debug("creating new subscription channel '%s', '%s'", channel_id, sub_uuid)
# form = aiohttp.FormData()
# form.add_field("id", str(channel_id))
# form.add_field("subscription", sub_uuid)
# data = await self._post_data(self.CHANNEL_ENDPOINT, form)
# return data["json"]
# async def get_subscription_channel(self, uuid: str) -> dict:
# """
# Retreive a Channel.
# """
# log.debug("retreiving a subscription channel '%s'", uuid)
# url = f"{self.CHANNEL_ENDPOINT}{uuid}/"
# data = await self._get_one(url)
# return data["json"]
# async def get_subscription_channels(self, **filters) -> tuple[list[dict], int]:
# """
# Retreive multiple Channels.
# """
# log.debug("retreiving multiple channels")
# return await self._get_many(self.CHANNEL_ENDPOINT, filters)
# async def delete_subscription_channel(self, uuid: str) -> None:
# """
# Delete an existing Channel.
# """
# log.debug("deleting channel '%s'", uuid)
# url=f"{self.CHANNEL_ENDPOINT}{uuid}/"
# await self._delete(url)
# async def create_tracked_content(self, sub_uuid: str, content_url: str) -> dict:
# """
# Create a Tracked Content.
# """
# log.debug("creating tracked content '%s', '%s'", sub_uuid, content_url)
# form = aiohttp.FormData()
# form.add_field("subscription", sub_uuid)
# form.add_field("content_url", content_url)
# data = await self._post_data(self.TRACKED_ENDPOINT, form)
# return data["json"]
# async def get_tracked_content(self, uuid: str = None, content_url: str = None) -> dict:
# """
# Retreive a Tracked Content.
# """
# log.debug("retreiving tracked content '%s', '%s'", uuid, content_url)
# if not (uuid or content_url) or (uuid and content_url):
# raise ValueError(
# "Must use only 'uuid' or 'content_url' arguments, cannot use "
# "both arguments or none."
# )
# url = f"{self.TRACKED_ENDPOINT}{uuid or content_url}/"
# data = await self._get_one(url)
# return data["json"]
# async def get_tracked_contents(self, **filters) -> tuple[list[dict], int]:
# """
# Retreive multiple Tracked Content.
# """
# log.debug("retreiving multiple tracked content")
# return await self._get_many(self.TRACKED_ENDPOINT, filters)
# async def delete_tracked_content(self, uuid: str) -> None:
# """
# Delete a Tracked Content.
# """
# log.debug("deleting tracked content '%s'", uuid)
# url = f"{self.TRACKED_ENDPOINT}{uuid}/"
# await self._delete(url)
# async def is_tracked(self, content_url: str) -> bool:
# """
# Returns boolean if an item with the given url exists.
# """
# raise NotImplementedError

View File

@ -68,11 +68,11 @@ class DiscordBot(commands.Bot):
query = insert(AuditModel).values(discord_user_id=user_id, message=message)
log.debug("Logging audit")
if database:
await database.session.execute(query)
return
async with DatabaseManager() as database:
await database.session.execute(query)
log.debug("Audit logged")

View File

@ -333,15 +333,24 @@ class FeedCog(commands.Cog):
def formatdata(index, item):
item = Subscription.from_dict(item)
channels = f"{item.channels_count}{' channels' if item.channels_count != 1 else ' channel'}"
filters = f"{len(item.filters)}{' filters' if len(item.filters) != 1 else ' filter'}"
notes = item.extra_notes[:25] + "..." if len(item.extra_notes) > 28 else item.extra_notes
links = f"[RSS URL]({item.url}) · [API URL]({API.API_ENDPOINT}subscription/{item.id}/)"
description = f"{channels}, {filters}\n"
description += f"{notes}\n" if notes else ""
description += links
key = f"{index}. {item.name}"
value = f"[RSS]({item.rss_url}) · [API]({API.SUBSCRIPTION_ENDPOINT}{item.uuid}/)"
return key, value
return key, description # key, value pair
async def getdata(page: int, pagesize: int):
async with aiohttp.ClientSession() as session:
api = API(self.bot.api_token, session)
return await api.get_subscriptions(
server=inter.guild.id, page=page, page_size=pagesize
guild_id=inter.guild.id, page=page, page_size=pagesize
)
embed = Followup(f"Subscriptions in {inter.guild.name}").info()._embed
@ -355,6 +364,7 @@ class FeedCog(commands.Cog):
initpage=1
)
await pagination.send()
# await Followup("results", str(await getdata(1, 10))).send(inter)
async def setup(bot):

View File

@ -75,6 +75,9 @@ class TaskCog(commands.Cog):
async def rss_task(self):
"""Automated task responsible for processing rss feeds."""
log.debug("sub task disabled")
return
log.info("Running subscription task")
time = process_time()

View File

@ -238,7 +238,7 @@ class DjangoDataModel(ABC):
@staticmethod
@abstractmethod
def parser(item: dict) -> dict:
"""Overwrite this method to parse types, otherwise every attr is of type string."""
"""Overwrite this method to parse types."""
return item
@classmethod
@ -253,25 +253,42 @@ class DjangoDataModel(ABC):
@dataclass(slots=True)
class Subscription(DjangoDataModel):
uuid: str
id: int
name: str
rss_url: str
image_url: str
url: str
guild_id: int
creation_datetime: datetime
server: int
targets: list[int]
extra_notes: str
filters: list[int]
active: bool
channels_count: int
@staticmethod
def parser(item: dict) -> dict:
item["guild_id"] = int(item["guild_id"])
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], "%Y-%m-%dT%H:%M:%S.%f%z")
item["image_url"] = item.pop("image")
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT)
item["server"] = int(item["server"])
item["targets"] = item["targets"].split(";")
return item
# uuid: str
# name: str
# rss_url: str
# image_url: str
# creation_datetime: datetime
# server: int
# targets: list[int]
# extra_notes: str
# active: bool
# @staticmethod
# def parser(item: dict) -> dict:
# item["image_url"] = item.pop("image")
# item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT)
# item["server"] = int(item["server"])
# item["targets"] = item["targets"].split(";")
# return item
@dataclass(slots=True)
class SubscriptionChannel(DjangoDataModel):