# -*- encoding: utf-8 -*-

import logging

import httpx
from django.conf import settings
from asgiref.sync import sync_to_async
from django.shortcuts import redirect
from django.http import (
    HttpResponseForbidden,
    HttpResponseNotAllowed,
    HttpResponseNotFound,
    JsonResponse,
)
from django.views.generic import TemplateView, View

from apps.home.models import Server, DiscordChannel
from apps.api.serializers import ServerSerializer
from apps.authentication.models import DiscordUser, ServerMember

log = logging.getLogger(__name__)


class IndexView(TemplateView):
    """
    View for the Index page.
    """

    template_name = "home/index.html"


@sync_to_async
def get_user_access_token(user: DiscordUser) -> str:
    """
    Return the access token stored on the given user.
    Wrapped with `sync_to_async` so it can be awaited from async views.
    """
    return user.access_token


@sync_to_async
def is_user_authenticated(user: DiscordUser) -> bool:
    """
    Return whether the given user is authenticated.
    Wrapped with `sync_to_async` so it can be awaited from async views.
    """
    return user.is_authenticated


class GuildsView(View):
    """
    Fetches the related guilds to the currently authenticated user from Discord.
    Will return a filtered list of the results, excluding servers where the
    user isn't an administrator or owner.

    Valid servers will also be stored in the database for future reference,
    along-side the user-server relationship as a member.
    """

    # Bit checked against a guild's `permissions` value to decide whether
    # the user counts as an administrator.
    ADMINISTRATOR_PERMISSION = 1 << 3

    async def get(self, request, *args, **kwargs):
        """
        Respond with the cleaned guild list as JSON.

        Redirects unauthenticated users to the login page. If the Discord API
        answers with anything other than 200, its payload and status code are
        passed through unchanged.
        """
        if not await is_user_authenticated(request.user):
            return redirect("/oauth2/login")

        access_token = await get_user_access_token(request.user)
        guilds_data, status = await self._get_guilds_data(access_token)

        # Send back the error data if status is bad
        if status != 200:
            return JsonResponse(guilds_data, status=status, safe=False)

        cleaned_guilds_data = await self._clean_guilds_data(request.user, guilds_data)
        return JsonResponse(cleaned_guilds_data, safe=False)

    async def _get_guilds_data(self, access_token: str) -> tuple[list[dict], int]:
        """
        Returns the raw guild data and a response status code from the Discord API.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                url=f"{settings.DISCORD_API_URL}/users/@me/guilds",
                headers={"Authorization": f"Bearer {access_token}"},
            )

        return response.json(), response.status_code

    async def _clean_guilds_data(self, user: DiscordUser, guilds_data: list[dict]) -> list[dict]:
        """
        Returns a filtered copy of the given `guilds_data`, without the guilds
        where the given `user` is not an administrator or owner of.

        Also, for each guild, creates/updates an object to represent it, and
        another to represent the user/guild relationship as a member.
        """
        cleaned_guilds_data = []
        for item in guilds_data:
            cleaned_data = await self._setup_server(user, item)
            if cleaned_data:
                cleaned_guilds_data.append(cleaned_data)

        return cleaned_guilds_data

    async def _setup_server(self, user: DiscordUser, item: dict) -> dict | None:
        """
        Create or update a server and user's membership to said server.
        Returns the cleaned (serialized) server data, or `None` if the user
        isn't an admin or owner.
        """
        # Collect some commonly used server data
        server_id = item["id"]
        is_owner = item["owner"]
        permissions = item["permissions"]
        admin_perm = self.ADMINISTRATOR_PERMISSION

        # Skip servers where the user isn't an administrator or owner.
        # If an older member object exists, delete that too.
        is_admin = (int(permissions) & admin_perm) == admin_perm
        if not (is_admin or is_owner):
            await self._try_delete_member(user, server_id)
            return None

        # Create or update an existing server matching the given ID
        server, _created = await Server.objects.aupdate_or_create(
            id=server_id,
            defaults={
                "name": item["name"],
                "icon_hash": item["icon"],
            },
        )

        # Create or update a member object linking the user to the server
        await ServerMember.objects.aupdate_or_create(
            user=user,
            server=server,
            defaults={
                "permissions": permissions,
                "is_owner": is_owner,
            },
        )

        return ServerSerializer(server).data

    @staticmethod
    async def _try_delete_member(user: DiscordUser, server_id: int):
        """
        Attempt to delete any existing server member linked to the given
        server id. Does nothing when no such member exists.
        """
        try:
            member = await ServerMember.objects.aget(user=user, server_id=server_id)
        except ServerMember.DoesNotExist:
            return

        await member.adelete()


class ChannelsView(View):
    """
    Fetches the channels of a stored server from Discord using the bot token,
    keeps the stored `DiscordChannel` objects in sync with the result, and
    returns the text channels as JSON.

    The server is selected through the `guild` query parameter.
    """

    async def get(self, request, *args, **kwargs):
        """
        Respond with the cleaned channel list for the requested server.

        Redirects unauthenticated users to the login page, answers 404 when
        the server is unknown and 403 when the user has no membership stored
        for it. Non-200 answers from the Discord API are passed through.
        """
        if not await is_user_authenticated(request.user):
            return redirect("/oauth2/login")

        guild_id = request.GET.get("guild")

        try:
            server = await Server.objects.aget(pk=guild_id)
        except (Server.DoesNotExist, ValueError):
            # ValueError: a malformed `guild` value that cannot be converted
            # for the lookup is treated the same as an unknown server.
            return HttpResponseNotFound("Server not found.")

        if not await ServerMember.objects.filter(server=server, user=request.user).aexists():
            # HttpResponseNotAllowed would treat the message as the list of
            # permitted HTTP methods (405 + `Allow` header); a missing
            # membership is a permission problem, so answer 403 with the text.
            return HttpResponseForbidden("You aren't a member of this server.")

        channels_data, status = await self._get_channel_data(guild_id)

        # Not authorized means the bot isn't operational, we need to save that
        if status == 403:
            server.is_bot_operational = False
            await server.asave()

        # Send back the error data if status is bad
        if status != 200:
            return JsonResponse(channels_data, status=status, safe=False)

        # Because the status is 200, we know the Bot is functional, set that param
        server.is_bot_operational = True
        await server.asave()

        cleaned_channels_data = await self._clean_channels_data(server, channels_data)
        await self._cleanup_dead_channels(server, cleaned_channels_data)

        return JsonResponse(cleaned_channels_data, safe=False)

    async def _get_channel_data(self, guild_id: int | str) -> tuple[list[dict], int]:
        """
        Returns the raw channel data and a response status code from the Discord API.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                url=f"{settings.DISCORD_API_URL}/guilds/{guild_id}/channels",
                headers={"Authorization": f"Bot {settings.BOT_TOKEN}"},
            )

        return response.json(), response.status_code

    async def _clean_channels_data(self, server: Server, channels_data: list[dict]) -> list[dict]:
        """
        Returns a sorted & cleaned list of channel data, also performs a
        database setup for each channel to be stored.

        Only the channels accepted by `_setup_channel` are kept; the result
        is ordered by each channel's `position` value.
        """
        cleaned_channels_data = []
        for item in channels_data:
            cleaned_data = await self._setup_channel(server, item)
            if cleaned_data:
                cleaned_channels_data.append(cleaned_data)

        cleaned_channels_data.sort(key=lambda ch: ch.get("position"))
        return cleaned_channels_data

    async def _setup_channel(self, server: Server, data: dict) -> dict | None:
        """
        Create or update an instance of DiscordChannel representing the given
        `data` dictionary, returns the data or `None` if `data['type'] != 0`.
        """
        # Type 0 = TextChannel, the only one we want
        if data.get("type") != 0:
            return None

        await DiscordChannel.objects.aupdate_or_create(
            id=data["id"],
            server=server,
            defaults={
                "name": data["name"],
                # `nsfw` may be absent from the payload; treat that as False
                # instead of failing the whole request with a KeyError.
                "is_nsfw": data.get("nsfw", False),
            },
        )

        return data

    async def _cleanup_dead_channels(self, server: Server, channel_data: list[dict]):
        """
        Deletes any unused instances of `DiscordChannel` against the given
        server, i.e. those whose id is not present in `channel_data`.
        """
        channel_ids = [item["id"] for item in channel_data]
        stale_channels = DiscordChannel.objects.filter(server=server).exclude(id__in=channel_ids)
        count, _ = await stale_channels.adelete()
        log.info("Deleted %s dead DiscordChannel object(s)", count)