Corban-Lee Jones 36a744159f
All checks were successful
Build and Push Docker Image / build (push) Successful in 15s
add missing await statement
2024-10-13 00:25:26 +01:00

234 lines
8.0 KiB
Python

# -*- encoding: utf-8 -*-
import logging
import httpx
from django.conf import settings
from asgiref.sync import sync_to_async
from django.shortcuts import redirect
from django.http import JsonResponse, HttpResponseNotFound, HttpResponseNotAllowed
from django.views.generic import TemplateView, View
from apps.home.models import Server, DiscordChannel
from apps.api.serializers import ServerSerializer
from apps.authentication.models import DiscordUser, ServerMember
log = logging.getLogger(__name__)
class IndexView(TemplateView):
"""
View for the Index page.
"""
template_name = "home/index.html"
@sync_to_async
def get_user_access_token(user: DiscordUser) -> str:
return user.access_token
@sync_to_async
def is_user_authenticated(user: DiscordUser) -> bool:
return user.is_authenticated
class GuildsView(View):
"""
Fetches the related guilds to the currently authenticated user from Discord.
Will return a filtered list of the results, excluding servers where the
user isn't an administrator or owner.
Valid servers will also be stored in the database for future reference,
along-side the user-server relationship as a member.
"""
async def get(self, request, *args, **kwargs):
if not await is_user_authenticated(request.user):
return redirect("/oauth2/login")
access_token = await get_user_access_token(request.user)
guilds_data, status = await self._get_guilds_data(access_token)
# Send back the error data if status is bad
if status != 200:
return JsonResponse(guilds_data, status=status, safe=False)
cleaned_guilds_data = await self._clean_guilds_data(request.user, guilds_data)
return JsonResponse(cleaned_guilds_data, safe=False)
async def _get_guilds_data(self, access_token: str) -> tuple[list[dict], int]:
"""
Returns the raw guild data and a response status code
from the Discord API.
"""
async with httpx.AsyncClient() as client:
response = await client.get(
url=f"{settings.DISCORD_API_URL}/users/@me/guilds",
headers={"Authorization": f"Bearer {access_token}"}
)
return response.json(), response.status_code
async def _clean_guilds_data(self, user: DiscordUser, guilds_data: list[dict]) -> list[dict]:
"""
Returns a filtered copy of the given `guilds_data`, without the guilds
where the given `user` is not an administrator or owner of.
Also, for each guild, creates/updates an object to represent it, and
another to represent the user/guild relationship as a member.
"""
cleaned_guilds_data = []
for item in guilds_data:
cleaned_data = await self._setup_server(user, item)
if cleaned_data:
cleaned_guilds_data.append(cleaned_data)
return cleaned_guilds_data
async def _setup_server(self, user: DiscordUser, item: dict) -> dict[str] | None:
"""
Create or update a server and user's membership to said server.
Returns the cleaned server data, or NoneType if the user isn't
an admin or owner.
"""
# Collect some commonly used server data
server_id = item["id"]
is_owner = item["owner"]
permissions = item["permissions"]
admin_perm = 1 << 3
# Skip servers where the user isn't an administrator or owner.
# If an older member object exists, delete that too.
if not ((int(permissions) & admin_perm) == admin_perm or is_owner):
await self._try_delete_member(user, server_id)
return
# Create or update an existing server matching the given ID
server, created = await Server.objects.aupdate_or_create(
id=server_id,
defaults={
"name": item["name"],
"icon_hash": item["icon"]
}
)
# Create or update a member object linking the user to the server
await ServerMember.objects.aupdate_or_create(
user=user,
server=server,
defaults={
"permissions": permissions,
"is_owner": is_owner
}
)
return ServerSerializer(server).data
@staticmethod
async def _try_delete_member(user: DiscordUser, server_id: int):
"""
Attempt to delete any existing server member linked to the given
server id.
"""
try:
member = await ServerMember.objects.aget(user=user, server_id=server_id)
await member.adelete()
except ServerMember.DoesNotExist:
pass
class ChannelsView(View):
async def get(self, request, *args, **kwargs):
if not await is_user_authenticated(request.user):
return redirect("/oauth2/login")
guild_id = request.GET.get("guild")
try: server = await Server.objects.aget(pk=guild_id)
except Server.DoesNotExist: return HttpResponseNotFound("Server not found.")
if not await ServerMember.objects.filter(server=server, user=request.user).aexists():
return HttpResponseNotAllowed("You aren't a member of this server.")
channels_data, status = await self._get_channel_data(guild_id)
# Not authorized means the bot isn't operational, we need to save that
if status == 403:
server.is_bot_operational = False
await server.asave()
# Send back the error data if status is bad
if status != 200:
return JsonResponse(channels_data, status=status, safe=False)
# Because the status is 200, we know the Bot is functional, set that param
server.is_bot_operational = True
await server.asave()
cleaned_channels_data = await self._clean_channels_data(server, channels_data)
await self._cleanup_dead_channels(server, cleaned_channels_data)
return JsonResponse(cleaned_channels_data, safe=False)
async def _get_channel_data(self, guild_id: int) -> tuple[list[dict], int]:
"""
Returns the raw channel data and a response status code
from the Discord API.
"""
async with httpx.AsyncClient() as client:
response = await client.get(
url=f"{settings.DISCORD_API_URL}/guilds/{guild_id}/channels",
headers={"Authorization": f"Bot {settings.BOT_TOKEN}"}
)
return response.json(), response.status_code
async def _clean_channels_data(self, server: Server, channels_data) -> list[dict]:
"""
Returns a sorted & cleaned list of channel data, also performs a
database setup for each channel to be stored.
"""
cleaned_channels_data = []
for item in channels_data:
cleaned_data = await self._setup_channel(server, item)
if cleaned_data:
cleaned_channels_data.append(cleaned_data)
cleaned_channels_data.sort(key=lambda ch: ch.get("position"))
return cleaned_channels_data
async def _setup_channel(self, server: Server, data: dict) -> dict:
"""
Create or update an instance of DiscordChannel representing the given
`data` dictionary, returns the data or `NoneType` if `data['type'] != 0`.
"""
# Type 0 = TextChannel, the only one we want
if data.get("type") != 0:
return
await DiscordChannel.objects.aupdate_or_create(
id=data["id"],
server=server,
defaults={
"name": data["name"],
"is_nsfw": data["nsfw"]
}
)
return data
async def _cleanup_dead_channels(self, server: Server, channel_data: list[dict]):
"""
Deletes any unused instances of `DiscordChannel` against the given server.
"""
channel_ids = [item["id"] for item in channel_data]
count, _ = await DiscordChannel.objects.filter(server=server).exclude(id__in=channel_ids).adelete()
log.info("Deleted %s dead DiscordChannel object(s)", count)