230 lines
7.6 KiB
Python

# -*- encoding: utf-8 -*-
import logging
from typing import Any
from operator import or_, and_
from functools import reduce
from django_filters import rest_framework as rest_filters
from django.db.models import Q, Count
from django.http import HttpRequest
from django.core.cache import cache
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.core.exceptions import ValidationError
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, permissions, filters, generics
from rest_framework.pagination import PageNumberPagination
from rest_framework.authentication import SessionAuthentication, TokenAuthentication
from apps.home.models import Ticket, TicketPriority, TicketTag
from apps.authentication.models import Department, User
from .serializers import (
TicketSerializer, TicketTagSerializer, TicketPrioritySerializer,
UserSerializer
)
log = logging.getLogger(__name__)
def respond_error(error: Exception, status: Any) -> Response:
logging.error(error)
return Response({
"detail": str(error),
"error": error.__class__.__name__
}, status=status)
class TicketPaginiation(PageNumberPagination):
page_size = 25
page_size_query_param = "page_size"
max_page_size = 100
class TicketListApiView(generics.ListAPIView):
authentication_classes = [SessionAuthentication, TokenAuthentication]
permission_classes = [permissions.IsAuthenticated]
pagination_class = TicketPaginiation
serializer_class = TicketSerializer
queryset = Ticket.objects.all()
filter_backends = [filters.SearchFilter, rest_filters.DjangoFilterBackend, filters.OrderingFilter]
filterset_fields = ["priority", "tags", "author"]
search_fields = ["author__forename", "author__surname", "title", "description"]
ordering_fields = ["create_timestamp", "edit_timestamp"]
def get_queryset(self):
tag_uuids = self.request.query_params.getlist("tags", [])
queryset = self.queryset
log.debug("tag uuids %s", tag_uuids)
for uuid in tag_uuids:
if not uuid: continue
log.debug("uuid %s", uuid)
queryset = queryset.filter(tags__uuid__in=[uuid])
return queryset
# ALLOWED_FILTERS = (
# "uuid__in", "priority", "tags__in", "author__department", "search"
# )
# MATCHER_MAP = {
# "contains": lambda k, v: {k: v[0]},
# "in": lambda k, v: {k: v}
# }
# @method_decorator(cache_page(60 * 5))
# def get(self, request: HttpRequest) -> Response:
# """Returns a response containing tickets matching the given filters."""
# try:
# kwargs = self._parse_querystring(request)
# data = self._new_get_tickets(**kwargs)
# return Response(data, status=status.HTTP_200_OK)
# except KeyError as error:
# return respond_error(error, status.HTTP_400_BAD_REQUEST)
# except ValidationError as error:
# return respond_error(error, status.HTTP_400_BAD_REQUEST)
# def post(self, request):
# """Create a new ticket"""
# serializer = TicketSerializer(data={
# })
# if not serializer.is_valid():
# return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# serializer.save()
# return Response(serializer.data, status=status.HTTP_201_CREATED)
# def _parse_querystring(self, request: HttpRequest) -> dict:
# """"""
# def _new_get_tickets(self, **filters) -> dict:
# """"""
# def _get_tickets(self, request: HttpRequest) -> dict:
# """Returns a response containing tickets matching the given filters.
# Parameters
# ----------
# request : HttpRequest
# The request containing the filters.
# Returns
# -------
# Response
# A response containing the tickets data.
# Raises
# ------
# KeyError
# If any given filter key isn't in `self.ALLOWED_FILTERS`.
# ValidationError
# If any ValidationErrors are raised by Django while applying filters.
# """
# queryset = Ticket.objects.all()
# for key, values in request.GET.lists():
# key = key.removesuffix("[]")
# # format is not intended for this function
# if key == "format":
# continue
# if key not in self.ALLOWED_FILTERS:
# raise KeyError(key)
# if not key.endswith("__in"):
# values = values[0]
# # search filter is a user text input
# if key == "search":
# clean_value = values.split()
# log.debug("clean values %s", clean_value)
# or_filters = [
# Q(**{"title__contains": clean_value}),
# Q(**{"description__contains": clean_value}),
# Q(**{"author__forename__contains": clean_value}),
# Q(**{"author__surname__contains": clean_value})
# ]
# name_values = values.split(" ")
# if len(name_values) == 2:
# log.debug("looking for author %s %s", *name_values)
# users = User.objects.filter(Q(forename__contains=name_values[0]) | Q(surname__contains=name_values[1]))
# log.debug("found %s authors under that name", users.count())
# for user in users:
# log.debug("%s %s", user.fullname.lower(), values.lower())
# if user.fullname.lower() == values.lower():
# log.debug("adding author to filter")
# or_filters.append(Q(**{"author": user}))
# queryset = queryset.filter(reduce(or_, or_filters))
# continue
# if "all" in values:
# continue
# # if it doesn't end with "__in", assume we are looking for an exact match
# if not key.endswith("__in"):
# queryset = queryset.filter(Q(**{key: values}))
# continue
# # Chain the filters to create an AND query (Q() & Q() doesnt work?)
# for value in values:
# filter_kwargs = {key: [value]}
# queryset = queryset.filter(Q(**filter_kwargs))
# tickets = queryset.order_by("-edit_timestamp")
# serializer = TicketSerializer(tickets, many=True)
# log.debug("Query successful showing %s results", tickets.count())
# return serializer.data
class FilterCountApiView(APIView):
permission_classes = [permissions.IsAuthenticated]
@method_decorator(cache_page(60 * 5))
def get(self, request):
priorities = TicketPriority.objects.all()
tags = TicketTag.objects.all()
departments = Department.objects.all()
tickets = Ticket.objects.all()
data = {
"priority_counts": {},
"tag_counts": {},
"department_counts": {},
"ticket_count": tickets.count()
}
for priority in priorities:
data["priority_counts"][str(priority.uuid)] = tickets.filter(priority=priority).count()
for tag in tags:
data["tag_counts"][str(tag.uuid)] = tickets.filter(tags__in=[tag]).count()
for department in departments:
data["department_counts"][str(department.uuid)] = tickets.filter(author__department=department).count()
return Response(data, status=status.HTTP_200_OK)