Greatly improved logging

Now using a config file for logging to the terminal and to the log file.

---

Also added custom exception for a missing token.
This commit is contained in:
Corban-Lee Jones 2024-02-26 18:08:01 +00:00
parent 9a2da70e38
commit a9ccf0f791
6 changed files with 83 additions and 118 deletions

4
.gitignore vendored
View File

@ -1,6 +1,10 @@
# Databases
db.sqlite
# Logging
*.log
*.log.*
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/

0
logs/.gitkeep Normal file
View File

48
logs/config.json Normal file
View File

@ -0,0 +1,48 @@
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)s %(message)s"
},
"detail": {
"format": "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
},
"complex": {
"format": "[%(levelname)s|%(module)s|L%(lineno)d] %(asctime)s %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S%z"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "complex",
"filename": "logs/pyrss.log",
"maxBytes": 1048576,
"backupCount": 3
},
"queue_handler": {
"class": "logging.handlers.QueueHandler",
"handlers": [
"stdout",
"file"
],
"respect_handler_level": true
}
},
"loggers": {
"root": {
"level": "DEBUG",
"handlers": [
"queue_handler"
]
}
}
}

View File

@ -3,3 +3,9 @@ class IllegalFeed(Exception):
def __init__(self, message: str, **items):
super().__init__(message)
self.items = items
class TokenMissingError(Exception):
    """
    Raised when a required token (e.g. bot or API token) couldn't be found.
    """

View File

@ -1,102 +0,0 @@
"""
Handle async logging for the project.
"""
import sys
import queue
import logging
from logging.handlers import QueueHandler, QueueListener
from datetime import datetime, timedelta
from itertools import count
from typing import TextIO
from pathlib import Path
from os import getenv
# strftime format used as the log-file name prefix. Default to ISO date so a
# missing env var doesn't leave this as None (strftime(None) would raise).
LOG_FILENAME_FORMAT_PREFIX = getenv("LOG_FILENAME_FORMAT_PREFIX") or "%Y-%m-%d"

# Log files older than this many days are deleted. The original
# int(getenv(...)) raised TypeError when the variable was unset; fall back
# to a sensible 7-day retention instead of crashing at import time.
MAX_LOGFILE_AGE_DAYS = int(getenv("MAX_LOGFILE_AGE_DAYS") or 7)

log = logging.getLogger(__name__)
class LogSetup:
    """
    Manage per-session log files for the project.

    Creates a uniquely named log file for each session, streams log records
    to it (and to stdout) through a queue, and prunes log files older than
    ``MAX_LOGFILE_AGE_DAYS``.
    """

    def __init__(self, logs_dir: Path):
        self.logs_dir = logs_dir
        # Keep a handle on the listener so it can be stopped cleanly at exit.
        self.queue_listener = None

    def _open_file(self) -> TextIO:
        """
        Create and return a file object for a new, uniquely named log file.
        """
        # Create the logs directory if it doesn't exist
        self.logs_dir.mkdir(exist_ok=True)

        # Candidate names: "<timestamp>.log", then "<timestamp>_(1).log", ...
        timestamp = datetime.now().strftime(LOG_FILENAME_FORMAT_PREFIX)
        filenames = (
            f'{timestamp}.log' if i == 0 else f'{timestamp}_({i}).log'
            for i in count()
        )

        # "x" mode fails if the file already exists, guaranteeing a fresh
        # file; count() is infinite so this loop always returns.
        for filename in filenames:
            try:
                return (self.logs_dir / filename).open("x", encoding="utf-8")
            except FileExistsError:
                continue

    def _delete_old_logs(self):
        """
        Search through the logs directory and delete any expired log files.
        """
        for path in self.logs_dir.glob('*.log'):
            prefix = path.stem.split('_')[0]
            try:
                log_date = datetime.strptime(prefix, LOG_FILENAME_FORMAT_PREFIX)
            except ValueError:
                # Lazy %-args: the message is only built if the record is emitted.
                log.warning('%s contains a problematic filename: %s',
                            path.parent, path.name)
                continue

            age = datetime.now() - log_date
            if age >= timedelta(days=MAX_LOGFILE_AGE_DAYS):
                log.info('Removing expired log file: %s', path.name)
                path.unlink()

    @staticmethod
    def update_log_levels(logger_names: tuple[str], level: int):
        """
        Quick way to update the log level of multiple loggers at once.
        """
        for name in logger_names:
            logging.getLogger(name).setLevel(level)

    def setup_logs(self, log_level: int = logging.DEBUG) -> str:
        """
        Setup a logging queue handler and queue listener.

        Also creates a new log file for the current session and deletes old
        log files. The listener is registered to stop at interpreter exit so
        queued records are flushed (the original started it and leaked it).

        :param log_level: level applied to the root logger.
        :returns: name of the newly created log file.
        """
        import atexit  # local import: only needed once, when logging is configured

        # Create a queue to pass log records to the listener
        log_queue = queue.Queue()
        queue_handler = QueueHandler(log_queue)

        # Configure the root logger to use the queue
        logging.basicConfig(
            level=log_level,
            handlers=(queue_handler,),
            format='[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s'
        )

        # Create a new log file
        file = self._open_file()
        file_handler = logging.StreamHandler(file)       # stream to the log file
        sys_handler = logging.StreamHandler(sys.stdout)  # stream to the console

        # Create a listener to handle the queue; stop it at exit so buffered
        # records are not lost.
        self.queue_listener = QueueListener(log_queue, file_handler, sys_handler)
        self.queue_listener.start()
        atexit.register(self.queue_listener.stop)

        # Clear up old log files
        self._delete_old_logs()

        return file.name

View File

@ -3,8 +3,11 @@ Entry point for the application.
Run this file to get started.
"""
import logging
import json
import atexit
import asyncio
import logging
import logging.config
from os import getenv
from pathlib import Path
@ -14,38 +17,44 @@ from dotenv import load_dotenv
load_dotenv(override=True)
from bot import DiscordBot
from logs import LogSetup
from errors import TokenMissingError
BASE_DIR = Path(__file__).resolve().parent.parent
async def main():
"""
Entry point function for the application.
point function for the application.
Run this function to get started.
"""
# Grab the token before anything else, because if there is no token
# available then the bot cannot be started anyways.
# Discord Bot token
bot_token = getenv("BOT_TOKEN")
if not bot_token:
raise ValueError("Bot Token is empty")
raise TokenMissingError("'BOT_TOKEN' environment variable cannot be missing or blank.")
# ^ same story for the API token. Without it the API cannot be
# interacted with, so grab it first.
# Web API token
api_token = getenv("API_TOKEN")
if not api_token:
raise ValueError("API Token is empty")
raise TokenMissingError("'API_TOKEN' environment variable cannot be missing or blank.")
# Effectively debug mode, defaults to True
developing = getenv("DEVELOPING", "False") == "True"
# Setup logging settings and mute spammy loggers
logsetup = LogSetup(BASE_DIR / "logs/")
logsetup.setup_logs(logging.DEBUG if developing else logging.INFO)
logsetup.update_log_levels(
("discord", "PIL", "urllib3", "aiosqlite", "charset_normalizer"),
level=logging.WARNING
)
# Logging setup
log_config_path = BASE_DIR / "logs" / "config.json"
if not log_config_path.exists():
raise FileNotFoundError(log_config_path)
with open(log_config_path, "r", encoding="utf-8") as file:
log_config = json.load(file)
logging.config.dictConfig(log_config)
# start the logging queue handler thread
queue_handler = logging.getHandlerByName("queue_handler")
if queue_handler is not None:
queue_handler.listener.start()
atexit.register(queue_handler.listener.stop)
async with DiscordBot(BASE_DIR, developing=developing, api_token=api_token) as bot:
await bot.load_extensions()