PYRSS-Bot/src/api.py
Corban-Lee Jones cd551c595f
All checks were successful
Build and Push Docker Image / build (push) Successful in 12s
temp log for testing
2024-08-13 23:42:13 +01:00

318 lines
9.2 KiB
Python

import logging
from os import getenv
import aiohttp
log = logging.getLogger(__name__)
class APIException(Exception):
    """Base class for the API-related exceptions defined in this module."""
class NotCreatedException(APIException):
    """API exception subtype.

    NOTE(review): judging by the name, this is meant for a create request
    that did not produce a resource; nothing in the visible code raises it.
    """
class BadStatusException(APIException):
    """API exception subtype.

    NOTE(review): judging by the name, this is meant for an unexpected HTTP
    status; nothing in the visible code raises it.
    """
def normalize_api_host(host: str):
if not host:
raise ValueError("host cannot be empty")
if not host.endswith("/"):
host += "/"
return host
class API:
    """Asynchronous client for the project's REST API.

    The endpoint URLs below are class attributes built from the ``API_HOST``
    and ``API_EXTERNAL_HOST`` environment variables. They are evaluated when
    the class body runs, so ``normalize_api_host`` raises ``ValueError`` at
    that point if either variable is unset or empty.
    """

    API_HOST = normalize_api_host(getenv("API_HOST"))
    API_ENDPOINT = API_HOST + "api/"

    API_EXTERNAL_HOST = normalize_api_host(getenv("API_EXTERNAL_HOST"))
    API_EXTERNAL_ENDPOINT = API_EXTERNAL_HOST + "api/"

    RSS_FEED_ENDPOINT = API_ENDPOINT + "rssfeed/"
    FEED_CHANNEL_ENDPOINT = API_ENDPOINT + "feedchannel/"

    SUBSCRIPTION_ENDPOINT = API_ENDPOINT + "subscription/"
    CHANNEL_ENDPOINT = SUBSCRIPTION_ENDPOINT + "channel/"
    TRACKED_ENDPOINT = API_ENDPOINT + "tracked/"

    def __init__(self, api_token: str, session: aiohttp.ClientSession):
        """Store the HTTP session and build the token authorization header.

        Args:
            api_token (str): Token sent as ``Authorization: Token <api_token>``.
            session (aiohttp.ClientSession): Session used for every request.
        """
        log.debug("API session initialised")
        self.session = session
        self.token_headers = {"Authorization": f"Token {api_token}"}

    async def make_request(self, method: str, url: str, **kwargs) -> dict:
        """Make a request to the given API endpoint.

        Args:
            method (str): The request method to use, examples: GET, POST, DELETE...
            url (str): The API endpoint to request to.
            **kwargs: Passed into self.session.request.

        Returns:
            dict: ``{"json": ..., "text": ..., "status": ...}``. ``json`` holds
            the decoded body and ``text`` is ``None`` when the response
            decodes as JSON; otherwise ``json`` is ``None`` and ``text``
            holds the raw body.

        Raises:
            aiohttp.ClientResponseError: If the response status is 400 or higher.
        """
        # The Authorization header is deliberately kept out of the log line
        # so the API token never ends up in log output.
        log.debug("request to '%s', method '%s', kwargs '%s'", url, method, kwargs)

        async with self.session.request(method, url, headers=self.token_headers, **kwargs) as response:
            response.raise_for_status()

            try:
                payload = await response.json()
                text = None
            except aiohttp.ContentTypeError:
                # Body is not JSON; fall back to returning it as plain text.
                payload = None
                text = await response.text()

            status = response.status
            log.debug("request to '%s', response '%s', kwargs '%s'", url, status, kwargs)

            return {"json": payload, "text": text, "status": status}

    async def _post_data(self, url: str, data: dict | aiohttp.FormData) -> dict:
        """POST *data* to *url* and return the ``make_request`` result dict."""
        return await self.make_request(
            method="POST",
            url=url,
            data=data
        )

    async def _put_data(self, url: str, data: dict | aiohttp.FormData) -> dict:
        """PUT *data* to *url* and return the ``make_request`` result dict."""
        return await self.make_request(
            method="PUT",
            url=url,
            data=data
        )

    async def _get_one(self, url: str) -> dict:
        """GET *url* and return only the decoded JSON body."""
        data = await self.make_request(
            method="GET",
            url=url
        )
        return data["json"]

    async def _get_many(self, url: str, filters: dict) -> tuple[list[dict], int]:
        """GET *url* with *filters* as query parameters.

        The JSON body is expected to contain ``"results"`` and ``"count"``
        keys; they are returned as a ``(results, count)`` tuple.
        """
        data = await self.make_request(
            method="GET",
            url=url,
            params=filters
        )
        content = data["json"]
        return content["results"], content["count"]

    async def _delete(self, url: str) -> None:
        """Send a DELETE request to *url*, discarding the response."""
        await self.make_request(
            method="DELETE",
            url=url
        )

    async def get_subscriptions(self, **filters) -> tuple[list[dict], int]:
        """
        Get multiple subscriptions as a ``(results, count)`` tuple.
        """
        log.debug("getting multiple subscriptions")
        return await self._get_many(self.API_ENDPOINT + "subscription/", filters)

    async def get_subscription_channels(self, **filters) -> tuple[list[dict], int]:
        """
        Get many subscription channels as a ``(results, count)`` tuple.
        """
        log.debug("getting multiple channels")
        return await self._get_many(self.API_ENDPOINT + "subchannel/", filters)

    async def create_tracked_content(self, **data) -> dict:
        """
        Create an instance of tracked content.

        Returns the full ``make_request`` result dict (json, text, status).
        """
        log.debug("creating tracked content")
        return await self._post_data(self.API_ENDPOINT + "tracked-content/", data)

    async def get_tracked_content(self, **filters) -> tuple[list[dict], int]:
        """
        Get tracked content matching *filters* as a ``(results, count)`` tuple.
        """
        log.debug("getting tracked content")
        return await self._get_many(self.API_ENDPOINT + "tracked-content/", filters)

    async def get_filter(self, filter_id: int) -> dict:
        """
        Get an instance of Filter.
        """
        log.debug("getting a filter")
        return await self._get_one(f"{self.API_ENDPOINT}filter/{filter_id}")
# async def create_subscription(self, name: str, rss_url: str, image_url: str, server_id: str, targets: list) -> dict:
# """
# Create a new Subscription.
# """
# log.debug("subscribing '%s' to '%s'", server_id, rss_url)
# async with self.session.get(image_url) as response:
# image_data = await response.read()
# form = aiohttp.FormData()
# form.add_field("name", name)
# form.add_field("rss_url", rss_url)
# form.add_field("server", server_id)
# form.add_field("targets", ";".join(map(str, targets)))
# form.add_field("image", image_data, filename="file.jpg")
# data = await self._post_data(self.SUBSCRIPTION_ENDPOINT, form)
# return data["json"]
# async def get_subscription(self, uuid: str) -> dict:
# """
# Retrieve a Subscription.
# """
# log.debug("retreiving subscription '%s'", uuid)
# url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
# data = await self._get_one(url)
# return data["json"]
# async def get_subscriptions(self, **filters) -> tuple[list[dict], int]:
# """
# Retrieve multiple Subscriptions.
# """
# log.debug("retreiving multiple subscriptions")
# return await self._get_many(self.SUBSCRIPTION_ENDPOINT, filters)
# async def delete_subscription(self, uuid: str) -> None:
# """
# Delete an existing Subscription.
# """
# log.debug("deleting subscription '%s'", uuid)
# url=f"{self.SUBSCRIPTION_ENDPOINT}{uuid}/"
# await self._delete(url)
# async def create_subscription_channel(self, channel_id: int | str, sub_uuid: str) -> dict:
# """
# Create a new Channel.
# """
# log.debug("creating new subscription channel '%s', '%s'", channel_id, sub_uuid)
# form = aiohttp.FormData()
# form.add_field("id", str(channel_id))
# form.add_field("subscription", sub_uuid)
# data = await self._post_data(self.CHANNEL_ENDPOINT, form)
# return data["json"]
# async def get_subscription_channel(self, uuid: str) -> dict:
# """
# Retrieve a Channel.
# """
# log.debug("retreiving a subscription channel '%s'", uuid)
# url = f"{self.CHANNEL_ENDPOINT}{uuid}/"
# data = await self._get_one(url)
# return data["json"]
# async def get_subscription_channels(self, **filters) -> tuple[list[dict], int]:
# """
# Retrieve multiple Channels.
# """
# log.debug("retreiving multiple channels")
# return await self._get_many(self.CHANNEL_ENDPOINT, filters)
# async def delete_subscription_channel(self, uuid: str) -> None:
# """
# Delete an existing Channel.
# """
# log.debug("deleting channel '%s'", uuid)
# url=f"{self.CHANNEL_ENDPOINT}{uuid}/"
# await self._delete(url)
# async def create_tracked_content(self, sub_uuid: str, content_url: str) -> dict:
# """
# Create a Tracked Content.
# """
# log.debug("creating tracked content '%s', '%s'", sub_uuid, content_url)
# form = aiohttp.FormData()
# form.add_field("subscription", sub_uuid)
# form.add_field("content_url", content_url)
# data = await self._post_data(self.TRACKED_ENDPOINT, form)
# return data["json"]
# async def get_tracked_content(self, uuid: str = None, content_url: str = None) -> dict:
# """
# Retrieve a Tracked Content.
# """
# log.debug("retreiving tracked content '%s', '%s'", uuid, content_url)
# if not (uuid or content_url) or (uuid and content_url):
# raise ValueError(
# "Must use only 'uuid' or 'content_url' arguments, cannot use "
# "both arguments or none."
# )
# url = f"{self.TRACKED_ENDPOINT}{uuid or content_url}/"
# data = await self._get_one(url)
# return data["json"]
# async def get_tracked_contents(self, **filters) -> tuple[list[dict], int]:
# """
# Retrieve multiple Tracked Content.
# """
# log.debug("retreiving multiple tracked content")
# return await self._get_many(self.TRACKED_ENDPOINT, filters)
# async def delete_tracked_content(self, uuid: str) -> None:
# """
# Delete a Tracked Content.
# """
# log.debug("deleting tracked content '%s'", uuid)
# url = f"{self.TRACKED_ENDPOINT}{uuid}/"
# await self._delete(url)
# async def is_tracked(self, content_url: str) -> bool:
# """
# Returns boolean if an item with the given url exists.
# """
# raise NotImplementedError