Corban-Lee Jones 2d3f4b6294
All checks were successful
Build and Push Docker Image / build (push) Successful in 13s
tidy up GuildsView
abstraction
comments
restructure of existing code ...
2024-10-11 11:49:26 +01:00

227 lines
7.7 KiB
Python

# -*- encoding: utf-8 -*-
import logging
import httpx
from django.conf import settings
from asgiref.sync import sync_to_async
from django.shortcuts import redirect
from django.http import JsonResponse, HttpResponseNotFound, HttpResponseNotAllowed
from django.views.generic import TemplateView, View
from apps.home.models import Server, DiscordChannel
from apps.authentication.models import DiscordUser, ServerMember
log = logging.getLogger(__name__)
class IndexView(TemplateView):
"""
View for the Index page.
"""
template_name = "home/index.html"
@sync_to_async
def get_user_access_token(user: DiscordUser) -> str:
return user.access_token
@sync_to_async
def is_user_authenticated(user: DiscordUser) -> bool:
return user.is_authenticated
class GuildsView(View):
"""
Fetches the related guilds to the currently authenticated user from Discord.
Will return a filtered list of the results, excluding servers where the
user isn't an administrator or owner.
Valid servers will also be stored in the database for future reference,
along-side the user-server relationship as a member.
"""
async def get(self, request, *args, **kwargs):
if not await is_user_authenticated(request.user):
return redirect("/oauth2/login")
access_token = await get_user_access_token(request.user)
guilds_data, status = await self._get_guilds_data(access_token)
# Send back the error data if status is bad
if status != 200:
return JsonResponse(guilds_data, status=status, safe=False)
cleaned_guilds_data = await self._clean_guilds_data(request.user, guilds_data)
return JsonResponse(cleaned_guilds_data, safe=False)
async def _get_guilds_data(self, access_token: str) -> tuple[list[dict], int]:
"""
Returns the raw guild data and a response status code
from the Discord API.
"""
async with httpx.AsyncClient() as client:
response = await client.get(
url=f"{settings.DISCORD_API_URL}/users/@me/guilds",
headers={"Authorization": f"Bearer {access_token}"}
)
return response.json(), response.status_code
async def _clean_guilds_data(self, user: DiscordUser, guilds_data: list[dict]) -> list[dict]:
"""
Returns a filtered copy of the given `guilds_data`, without the guilds
where the given `user` is not an administrator or owner of.
Also, for each guild, creates/updates an object to represent it, and
another to represent the user/guild relationship as a member.
"""
cleaned_guilds_data = []
for item in guilds_data:
setup_success = await self._setup_server(user, item)
if setup_success:
cleaned_guilds_data.append(item)
return cleaned_guilds_data
async def _setup_server(self, user: DiscordUser, item: dict) -> bool:
"""
Create or update a server and user's membership to said server.
Returns `True` if successful, returns `False` if the user isn't an
administrator or owner of the given server.
If `False`, will also attempt to delete any existing member object
linked to the given server.
"""
# Collect some commonly used server data
server_id = item["id"]
is_owner = item["owner"]
permissions = item["permissions"]
admin_perm = 1 << 3
# Skip servers where the user isn't an administrator or owner.
# If an older member object exists, delete that too.
if not ((int(permissions) & admin_perm) == admin_perm or is_owner):
await self._try_delete_member(user, server_id)
return False
# Create or update an existing server matching the given ID
server = await Server.objects.aupdate_or_create(
id=server_id,
defaults={
"name": item["name"],
"icon_hash": item["icon"]
}
)
# Create or update a member object linking the user to the server
await ServerMember.objects.aupdate_or_create(
user=user,
server=server[0],
defaults={
"permissions": permissions,
"is_owner": is_owner
}
)
return True
@staticmethod
async def _try_delete_member(user: DiscordUser, server_id: int):
"""
Attempt to delete any existing server member linked to the given
server id.
"""
try:
member = await ServerMember.objects.aget(user=user, server_id=server_id)
await member.adelete()
except ServerMember.DoesNotExist:
pass
class ChannelsView(View):
async def get(self, request, *args, **kwargs):
if not await is_user_authenticated(request.user):
return redirect("/oauth2/login")
guild_id = request.GET.get("guild")
try: server = await Server.objects.aget(pk=guild_id)
except Server.DoesNotExist: return HttpResponseNotFound("Server not found.")
if not ServerMember.objects.filter(server=server, user=request.user).aexists():
return HttpResponseNotAllowed("You aren't a member of this server.")
channels_data, status = await self._get_channel_data(guild_id)
# Send back the error data if status is bad
if status != 200:
return JsonResponse(channels_data, status=status, safe=False)
cleaned_channels_data = await self._clean_channels_data(server, channels_data)
await self._cleanup_dead_channels(server, cleaned_channels_data)
return JsonResponse(cleaned_channels_data, safe=False)
async def _get_channel_data(self, guild_id: int) -> tuple[list[dict], int]:
"""
Returns the raw channel data and a response status code
from the Discord API.
"""
async with httpx.AsyncClient() as client:
response = await client.get(
url=f"{settings.DISCORD_API_URL}/guilds/{guild_id}/channels",
headers={"Authorization": f"Bot {settings.BOT_TOKEN}"}
)
return response.json(), response.status_code
async def _clean_channels_data(self, server: Server, channels_data) -> list[dict]:
"""
Returns a sorted & cleaned list of channel data, also performs a
database setup for each channel to be stored.
"""
cleaned_channels_data = []
for item in channels_data:
cleaned_data = await self._setup_channel(server, item)
if cleaned_data:
cleaned_channels_data.append(cleaned_data)
cleaned_channels_data.sort(key=lambda ch: ch.get("position"))
return cleaned_channels_data
async def _setup_channel(self, server: Server, data: dict) -> dict:
"""
Create or update an instance of DiscordChannel representing the given
`data` dictionary, returns the data or `NoneType` if `data['type'] != 0`.
"""
# Type 0 = TextChannel, the only one we want
if data.get("type") != 0:
return
await DiscordChannel.objects.aupdate_or_create(
id=data["id"],
server=server,
defaults={
"name": data["name"],
"is_nsfw": data["nsfw"]
}
)
return data
async def _cleanup_dead_channels(self, server: Server, channel_data: list[dict]):
"""
Deletes any unused instances of `DiscordChannel` against the given server.
"""
channel_ids = [item["id"] for item in channel_data]
count, _ = await DiscordChannel.objects.filter(server=server).exclude(id__in=channel_ids).adelete()
log.info("Deleted %s dead DiscordChannel object(s)", count)