166 lines
4.6 KiB
Python
166 lines
4.6 KiB
Python
# -*- encoding: utf-8 -*-
|
|
|
|
import logging
|
|
import requests
|
|
from datetime import timedelta
|
|
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
from django.http import JsonResponse
|
|
from django.views.generic import View, TemplateView
|
|
from django.shortcuts import redirect
|
|
from django.contrib.auth import authenticate, login
|
|
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class DiscordLoginAction(View):
    """Redirect the browser to the configured Discord OAuth2 URL."""

    def get(self, request, *args, **kwargs):
        """Return a redirect to ``settings.DISCORD_OAUTH2_URL``."""
        oauth2_url = settings.DISCORD_OAUTH2_URL
        return redirect(oauth2_url)
|
|
|
|
|
|
class DiscordLoginRedirect(View):
    """Handle the callback request made after the user visits Discord's OAuth2 page."""

    def get(self, request, *args, **kwargs):
        """
        Endpoint function directed to once authorized from Discord.
        This method will "login" the user.

        Redirects to ``auth:login`` when the callback carries an ``error``
        parameter, when no ``code`` is supplied, when the code exchange or
        the user lookup does not return the expected data, or when
        ``authenticate`` rejects the user. Redirects to ``home:index`` on
        success.
        """
        if request.GET.get("error"):
            return redirect("auth:login")

        code = request.GET.get("code")
        if not code:
            log.warning("discord callback received without a code")
            return redirect("auth:login")

        exchange_data = self.exchange_code(code)
        access_token = exchange_data.get("access_token")
        if not access_token:
            # An error payload has no access token; only log the error
            # field so that no credentials end up in the logs.
            log.warning("discord code exchange failed: %s", exchange_data.get("error"))
            return redirect("auth:login")

        # Get raw user data from discord
        raw_user_data = self.get_raw_user_data(access_token)
        if "id" not in raw_user_data:
            log.warning("discord user lookup returned no user id")
            return redirect("auth:login")

        # Add the token and expires datetime to the user data
        token_expire_datetime = timezone.now() + timedelta(seconds=exchange_data["expires_in"])
        raw_user_data["token_expires"] = token_expire_datetime
        raw_user_data["access_token"] = access_token
        raw_user_data["refresh_token"] = exchange_data["refresh_token"]

        # authenticate (creates user if not exists) and login
        discord_user = authenticate(request, discord_user_data=raw_user_data)
        if discord_user is None:
            # authenticate() returns None when no backend accepts the
            # credentials; login() must not be called with None.
            log.warning("authentication backend rejected the discord user")
            return redirect("auth:login")

        login(request, discord_user)

        return redirect("home:index")

    def exchange_code(self, code: str) -> dict:
        """
        Exchanges the given code for an access token.
        A call is made to the Discord API.

        The request template is read from
        ``settings.DISCORD_CODE_EXCHANGE_REQUEST`` (keys ``data`` and
        ``headers``). Returns the decoded JSON body of the response.
        """
        log.debug("exchanging code for access token")

        request_data = settings.DISCORD_CODE_EXCHANGE_REQUEST

        # Copy the template before adding the code: the settings dict is
        # shared by every request, so writing into it would leak one user's
        # code into another request's payload.
        payload = dict(request_data["data"])
        payload["code"] = code

        # Fetch the access token
        response = requests.post(
            url=f"{settings.DISCORD_API_URL}/oauth2/token",
            data=payload,
            headers=request_data["headers"],
        )

        return response.json()

    def refresh_token(self, refresh_token: str) -> dict:
        """
        Refresh the access token if expired, using the refresh
        token. Returns a new access token, expire time and refresh
        token.

        The request template is read from
        ``settings.DISCORD_REFRESH_TOKEN_REQUEST`` (keys ``data`` and
        ``headers``). Returns the decoded JSON body of the response.
        """
        log.debug("refreshing access token")

        request_data = settings.DISCORD_REFRESH_TOKEN_REQUEST

        # Copy the shared settings template instead of mutating it.
        payload = dict(request_data["data"])
        payload["refresh_token"] = refresh_token

        response = requests.post(
            url=f"{settings.DISCORD_API_URL}/oauth2/token",
            data=payload,
            headers=request_data["headers"],
        )

        return response.json()

    def get_raw_user_data(self, access_token: str):
        """
        Fetches the raw user data using the given access token.
        A call is made to the Discord API.

        Returns the decoded JSON body. When the body has an ``id`` but no
        ``avatar``, ``avatar`` is set to ``int(id) % 5``.
        """
        log.debug("fetching raw user data")

        response = requests.get(
            url=f"{settings.DISCORD_API_URL}/users/@me",
            headers={"Authorization": f"Bearer {access_token}"},
        )

        data = response.json()

        # Assign the default avatar. Skipped when the payload has no "id"
        # (e.g. an error body), which previously raised KeyError.
        # NOTE(review): Discord's docs derive the default avatar index from
        # the discriminator or (id >> 22) % 6 -- confirm id % 5 is intended.
        if not data.get("avatar") and "id" in data:
            data["avatar"] = int(data["id"]) % 5

        return data
|
|
|
|
|
|
class Login(TemplateView):
    """Render the ``accounts/login.html`` template."""

    template_name = "accounts/login.html"
|
|
|
|
|
|
class GuildsView(View):
    """Return, as JSON, the current user's Discord guilds that pass ``_has_permissions``."""

    # Bit 3 (0x8) of a guild's permission bitfield.
    # NOTE(review): Discord documents this bit as ADMINISTRATOR -- confirm.
    ADMINISTRATOR_PERMISSION = 1 << 3

    def get(self, request, *args, **kwargs):
        """
        Fetch ``/users/@me/guilds`` with the user's access token.

        Responds with 401 when ``request.user`` carries no access token.
        When Discord answers with a non-200 status, its body and status
        are passed through unchanged. Otherwise the guild list is filtered
        with ``_has_permissions``.
        """
        access_token = getattr(request.user, "access_token", None)
        if not access_token:
            # A user object without this attribute would otherwise raise
            # AttributeError and surface as a 500.
            return JsonResponse({"error": "authentication required"}, status=401)

        response = requests.get(
            url=f"{settings.DISCORD_API_URL}/users/@me/guilds",
            headers={"Authorization": f"Bearer {access_token}"},
        )

        content = response.json()
        status = response.status_code

        if status != 200:
            log.warning("Bad status code getting guilds: %s", status)
            return JsonResponse(content, safe=False, status=status)

        # Reuse the already-decoded body rather than parsing it a second time.
        valid_guilds = [guild for guild in content if self._has_permissions(guild)]

        return JsonResponse(valid_guilds, safe=False, status=status)

    def _has_permissions(self, guild):
        """
        Return a truthy value when the guild's ``permissions`` bitfield has
        ``ADMINISTRATOR_PERMISSION`` set, or when its ``owner`` flag is truthy.
        """
        permissions = int(guild["permissions"])
        is_owner = guild["owner"]
        required = self.ADMINISTRATOR_PERMISSION

        return (permissions & required) == required or is_owner
|
|
|
|
|
|
class GuildChannelsView(View):
    """Return, as JSON, the channels of the guild named by the ``guild`` query parameter."""

    def get(self, request, *args, **kwargs):
        """
        Fetch ``/guilds/<guild>/channels`` using the bot token.

        Responds with 400 when ``guild`` is missing or is not made up of
        ASCII digits only. Otherwise Discord's JSON body is returned with
        Discord's status code.
        """
        log.debug("fetching channels from guid")

        guild_id = request.GET.get("guild")

        # The value is user-controlled and is interpolated into a URL that
        # is requested with the bot token, so only plain digits are let
        # through (no "/", "..", "?" or a missing value rendered as "None").
        if not guild_id or not all(char in "0123456789" for char in guild_id):
            return JsonResponse({"error": "invalid guild id"}, status=400)

        # NOTE(review): nothing here checks that the requesting user is
        # logged in or belongs to the guild -- confirm this is enforced
        # elsewhere (e.g. in the URL configuration).
        response = requests.get(
            url=f"{settings.DISCORD_API_URL}/guilds/{guild_id}/channels",
            headers={"Authorization": f"Bot {settings.BOT_TOKEN}"},
        )

        status = response.status_code
        if status != 200:
            log.warning("Bad status code getting guild channels: %s", status)

        # Pass Discord's status through, as GuildsView does, so that an
        # upstream error is not reported to the client as a 200.
        return JsonResponse(response.json(), safe=False, status=status)
|
|
|
|
|
|
class SaveGuildView(View):
    """Placeholder view: no HTTP method handlers are defined yet."""
|