Working on new db model integration

This commit is contained in:
Corban-Lee Jones 2024-02-11 23:52:32 +00:00
parent ff617b2006
commit 08ba995520
3 changed files with 1025 additions and 540 deletions

View File

@ -39,6 +39,10 @@ class API:
RSS_FEED_ENDPOINT = API_ENDPOINT + "rssfeed/"
FEED_CHANNEL_ENDPOINT = API_ENDPOINT + "feedchannel/"
SUBSCRIPTION_ENDPOINT = API_ENDPOINT + "subscription/"
CHANNEL_ENDPOINT = SUBSCRIPTION_ENDPOINT + "channel/"
TRACKED_ENDPOINT = API_ENDPOINT + "tracked/"
def __init__(self, api_token: str, session: aiohttp.ClientSession):
log.debug("API session initialised")
self.session = session
@ -69,72 +73,262 @@ class API:
return {"json": json, "text": text, "status": status}
async def create_new_rssfeed(self, name: str, url: str, image_url: str, discord_server_id: int) -> dict:
"""Create a new RSS Feed.
async def _post_data(self, url: str, data: dict | aiohttp.FormData) -> dict:
return await self.make_request(
method="POST",
url=url,
data=data
)
Args:
name (str): Name of the RSS Feed.
url (str): URL for the RSS Feed.
image_url (str): URL of the image representation of the RSS Feed.
discord_server_id (int): ID of the discord server behind this item.
async def _put_data(self, url: str, data: dict | aiohttp.FormData) -> dict:
return await self.make_request(
method="PUT",
url=url,
data=data
)
Returns:
dict: JSON representation of the newly created RSS Feed.
async def _get_one(self, url: str) -> dict:
return await self.make_request(
method="GET",
url=url
)
async def _get_many(self, url: str, filters: dict) -> tuple[list[dict], int]:
data = await self.make_request(
method="GET",
url=url,
params=filters
)
content = data["json"]
return content["results"], content["count"]
async def _delete(self, url: str) -> None:
await self.make_request(
method="DELETE",
url=url
)
async def create_subscription(self, name: str, rss_url: str, image_url: str, server_id: str) -> dict:
"""
Create a new Subscription.
"""
log.debug("creating rssfeed: %s %s %s %s", name, url, image_url, discord_server_id)
log.debug("subscribing '%s' to '%s'", server_id, rss_url)
async with self.session.get(image_url) as response:
image_data = await response.read()
# Using formdata to make the image transfer easier.
form = aiohttp.FormData({
"name": name,
"url": url,
"discord_server_id": str(discord_server_id)
})
form = aiohttp.FormData()
form.add_field("name", name)
form.add_field("rss_url", rss_url)
form.add_field("server", server_id)
form.add_field("image", image_data, filename="file.jpg")
data = (await self.make_request("POST", self.RSS_FEED_ENDPOINT, data=form))["json"]
return data
data = await self._post_data(self.SUBSCRIPTION_ENDPOINT, form)
async def get_rssfeed(self, uuid: str) -> dict:
"""Get a particular RSS Feed given it's UUID.
return data["json"]
Args:
uuid (str): Identifier of the desired RSS Feed.
Returns:
dict: A JSON representation of the RSS Feed.
async def get_subscription(self, uuid: str) -> dict:
"""
Retreive a Subscription.
"""
log.debug("getting rssfeed: %s", uuid)
endpoint = f"{self.RSS_FEED_ENDPOINT}/{uuid}/"
data = (await self.make_request("GET", endpoint))["json"]
return data
log.debug("retreiving subscription '%s'", uuid)
async def get_rssfeed_list(self, **filters) -> tuple[list[dict], int]:
"""Get all RSS Feeds with the associated filters.
url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
data = await self._get_one(url)
Returns:
tuple[list[dict], int] list contains dictionaries of each item, int is total items.
return data["json"]
async def get_subscriptions(self, **filters) -> tuple[list[dict], int]:
"""
Retreive multiple Subscriptions.
"""
log.debug("getting list of rss feeds with filters: %s", filters)
data = (await self.make_request("GET", self.RSS_FEED_ENDPOINT, params=filters))["json"]
return data["results"], data["count"]
log.debug("retreiving multiple subscriptions")
async def delete_rssfeed(self, uuid: str) -> int:
"""Delete a specified RSS Feed.
return await self._get_many(self.SUBSCRIPTION_ENDPOINT, filters)
Args:
uuid (str): Identifier of the RSS Feed to delete.
Returns:
int: Status code of the response.
async def delete_subscription(self, uuid: str) -> None:
"""
Delete an existing Subscription.
"""
log.debug("deleting rssfeed: %s", uuid)
endpoint = f"{self.RSS_FEED_ENDPOINT}/{uuid}/"
status = (await self.make_request("DELETE", endpoint))["status"]
return status
log.debug("deleting subscription '%s'", uuid)
url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
await self._delete(url)
async def create_subscription_channel(self, channel_id: int | str, sub_uuid: str) -> dict:
"""
Create a new Channel.
"""
log.debug("creating new subscription channel '%s', '%s'", channel_id, sub_uuid)
form = aiohttp.FormData()
form.add_field("id", str(channel_id))
form.add_field("subscription", sub_uuid)
data = await self._post_data(self.CHANNEL_ENDPOINT, form)
return data["json"]
async def get_subscription_channel(self, uuid: str) -> dict:
"""
Retreive a Channel.
"""
log.debug("retreiving a subscription channel '%s'", uuid)
url = f"{self.CHANNEL_ENDPOINT}{uuid}/"
data = await self._get_one(url)
return data["json"]
async def get_subscription_channels(self, **filters) -> tuple[list[dict], int]:
"""
Retreive multiple Channels.
"""
log.debug("retreiving multiple channels")
return await self._get_many(self.CHANNEL_ENDPOINT, filters)
async def delete_subscription_channel(self, uuid: str) -> None:
"""
Delete an existing Channel.
"""
log.debug("deleting channel '%s'", uuid)
url=f"{self.CHANNEL_ENDPOINT}{uuid}/"
await self._delete(url)
async def create_tracked_content(self, sub_uuid: str, content_url: str) -> dict:
"""
Create a Tracked Content.
"""
log.debug("creating tracked content '%s', '%s'", sub_uuid, content_url)
form = aiohttp.FormData()
form.add_field("subscription", sub_uuid)
form.add_field("content_url", content_url)
data = await self._post_data(self.TRACKED_ENDPOINT, form)
return data["json"]
async def get_tracked_content(self, uuid: str = None, content_url: str = None) -> dict:
"""
Retreive a Tracked Content.
"""
log.debug("retreiving tracked content '%s', '%s'", uuid, content_url)
if not (uuid or content_url) or (uuid and content_url):
raise ValueError(
"Must use only 'uuid' or 'content_url' arguments, cannot use "
"both arguments or none."
)
url = f"{self.TRACKED_ENDPOINT}{uuid or content_url}/"
data = await self._get_one(url)
return data["json"]
async def get_tracked_contents(self, **filters) -> tuple[list[dict], int]:
"""
Retreive multiple Tracked Content.
"""
log.debug("retreiving multiple tracked content")
return await self._get_many(self.TRACKED_ENDPOINT, filters)
async def delete_tracked_content(self, uuid: str) -> None:
"""
Delete a Tracked Content.
"""
log.debug("deleting tracked content '%s'", uuid)
url = f"{self.TRACKED_ENDPOINT}{uuid}/"
await self._delete(url)
async def is_tracked(self, content_url: str) -> bool:
"""
Returns boolean if an item with the given url exists.
"""
raise NotImplementedError
# async def create_new_rssfeed(self, name: str, url: str, image_url: str, discord_server_id: int) -> dict:
# """Create a new RSS Feed.
# Args:
# name (str): Name of the RSS Feed.
# url (str): URL for the RSS Feed.
# image_url (str): URL of the image representation of the RSS Feed.
# discord_server_id (int): ID of the discord server behind this item.
# Returns:
# dict: JSON representation of the newly created RSS Feed.
# """
# log.debug("creating rssfeed: %s %s %s %s", name, url, image_url, discord_server_id)
# async with self.session.get(image_url) as response:
# image_data = await response.read()
# # Using formdata to make the image transfer easier.
# form = aiohttp.FormData({
# "name": name,
# "url": url,
# "discord_server_id": str(discord_server_id)
# })
# form.add_field("image", image_data, filename="file.jpg")
# data = (await self.make_request("POST", self.RSS_FEED_ENDPOINT, data=form))["json"]
# return data
# async def get_rssfeed(self, uuid: str) -> dict:
# """Get a particular RSS Feed given it's UUID.
# Args:
# uuid (str): Identifier of the desired RSS Feed.
# Returns:
# dict: A JSON representation of the RSS Feed.
# """
# log.debug("getting rssfeed: %s", uuid)
# endpoint = f"{self.RSS_FEED_ENDPOINT}/{uuid}/"
# data = (await self.make_request("GET", endpoint))["json"]
# return data
# async def get_rssfeed_list(self, **filters) -> tuple[list[dict], int]:
# """Get all RSS Feeds with the associated filters.
# Returns:
# tuple[list[dict], int] list contains dictionaries of each item, int is total items.
# """
# log.debug("getting list of rss feeds with filters: %s", filters)
# data = (await self.make_request("GET", self.RSS_FEED_ENDPOINT, params=filters))["json"]
# return data["results"], data["count"]
# async def delete_rssfeed(self, uuid: str) -> int:
# """Delete a specified RSS Feed.
# Args:
# uuid (str): Identifier of the RSS Feed to delete.
# Returns:
# int: Status code of the response.
# """
# log.debug("deleting rssfeed: %s", uuid)
# endpoint = f"{self.RSS_FEED_ENDPOINT}/{uuid}/"
# status = (await self.make_request("DELETE", endpoint))["status"]
# return status

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ import json
import logging
from dataclasses import dataclass
from datetime import datetime
from abc import ABC, abstractmethod
import aiohttp
@ -228,6 +229,77 @@ class RSSFeed:
return cls(**data)
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
@dataclass
class DjangoDataModel(ABC):
@staticmethod
@abstractmethod
def parser(item: dict) -> dict:
"""Overwrite this method to parse types, otherwise every attr is of type string."""
return item
@classmethod
def from_list(cls, data: list[dict]) -> list:
return [cls(**cls.parser(item)) for item in data]
@classmethod
def from_dict(cls, data: dict):
return cls(**cls.parser(data))
@dataclass
class Subscription(DjangoDataModel):
uuid: str
name: str
rss_url: str
image_url: str
creation_datetime: datetime
server: int
@staticmethod
def parser(item: dict) -> dict:
item["image_url"] = item.pop("image")
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT)
item["server"] = int(item["server"])
return item
@dataclass
class SubscriptionChannel(DjangoDataModel):
uuid: str
id: int
subscription: str
creation_datetime: datetime
@staticmethod
def parser(item: dict) -> dict:
item["id"] = int(item["id"])
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT)
return item
@dataclass
class TrackedContent(DjangoDataModel):
uuid: str
subscription: str
content_url: str
creation_datetime: datetime
@staticmethod
def parser(item: dict) -> dict:
item["creation_datetime"] = datetime.strptime(item["creation_datetime"], DATETIME_FORMAT)
return item
class Functions:
def __init__(self, bot, api_token: str):