This commit is contained in:
Corban-Lee Jones 2023-12-11 01:19:22 +00:00
parent a61b227c9f
commit fd962a9e76
5 changed files with 241 additions and 1 deletions

View File

@ -1,3 +1,9 @@
# NewsBot
Bot delivering news articles to discord servers.
Bot delivering news articles to discord servers.
Plans
- Multiple news providers
- Choose how much of each provider should be delivered
- Check for duplicate articles between providers, and only deliver preferred provider article

44
src/bot.py Normal file
View File

@ -0,0 +1,44 @@
"""
The discord bot for the application.
"""
import logging
from pathlib import Path
from discord import Intents
from discord.ext import commands
log = logging.getLogger(__name__)
class DiscordBot(commands.Bot):
def __init__(self, BASE_DIR: Path):
super().__init__(command_prefix="-", intents=Intents.all())
self.BASE_DIR = BASE_DIR
async def sync_app_commands(self):
"""
Sync application commands between discord and the bot.
"""
await self.wait_until_ready()
await self.tree.sync()
log.info("Application commands successfully synced")
async def on_ready(self):
"""
Execute init operations that require the bot to be ready.
Ideally should not be manually called, this is handled by discord.py
"""
await self.sync_app_commands()
async def load_extensions(self):
"""
Load any extensions found in the extensions dictionary.
"""
for path in (self.BASE_DIR / "src/extensions").iterdir():
if path.suffix == ".py":
await self.load_extension(f"extensions.{path.stem}")

41
src/extensions/test.py Normal file
View File

@ -0,0 +1,41 @@
"""
Extension for the `test` cog.
Loading this file via `commands.Bot.load_extension` will add the `test` cog to the bot.
"""
import logging
from discord import app_commands, Interaction
from discord.ext import commands, tasks
log = logging.getLogger(__name__)
class Test(commands.Cog):
"""
News cog.
Delivers embeds of news articles to discord channels.
"""
def __init__(self, bot):
super().__init__()
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
log.info(f"{self.__class__.__name__} cog is ready")
@app_commands.command(name="test-command")
async def test_command(self, inter: Interaction):
await inter.response.send_message("test")
async def setup(bot):
"""
Setup function for this extension.
Adds the `ErrorCog` cog to the bot.
"""
cog = Test(bot)
await bot.add_cog(cog)
log.info(f"Added {cog.__class__.__name__} cog")

103
src/logs.py Normal file
View File

@ -0,0 +1,103 @@
"""
Handle async logging for the project.
"""
import sys
import queue
import logging
from logging.handlers import QueueHandler, QueueListener
from datetime import datetime, timedelta
from itertools import count
from typing import TextIO
from pathlib import Path
from os import getenv
LOG_FILENAME_FORMAT_PREFIX = getenv("LOG_FILENAME_FORMAT_PREFIX")
MAX_LOGFILE_AGE_DAYS = getenv("MAX_LOGFILE_AGE_DAYS")
log = logging.getLogger(__name__)
class LogSetup:
def __init__(self, BASE_DIR: Path):
self.BASE_DIR = BASE_DIR
self.LOGS_DIR = BASE_DIR / "logs/"
def _open_file(self) -> TextIO:
"""
Returns a file object for the current log file.
"""
# Create the logs directory if it doesnt exist
self.LOGS_DIR.mkdir(exist_ok=True)
# Create a generator to generate a unique filename
timestamp = datetime.now().strftime(LOG_FILENAME_FORMAT_PREFIX)
filenames = (f'{timestamp}.log' if i == 0 else f'{timestamp}_({i}).log' for i in count())
# Find a filename that doesn't already exist and return it
for filename in filenames:
try:
return (self.LOGS_DIR / filename).open("x", encoding="utf-8")
except FileExistsError:
continue
def _delete_old_logs(self):
"""
Search through the logs directory and delete any expired log files.
"""
for path in self.LOGS_DIR.glob('*.txt'):
prefix = path.stem.split('_')[0]
try:
log_date = datetime.strptime(prefix, LOG_FILENAME_FORMAT_PREFIX)
except ValueError:
log.warning(f'{path.parent} contains a problematic filename: {path.name}')
continue
age = datetime.now() - log_date
if age >= timedelta(days=MAX_LOGFILE_AGE_DAYS):
log.info(f'Removing expired log file: {path.name}')
path.unlink()
@staticmethod
def update_log_levels(logger_names:tuple[str], level:int):
"""
Quick way to update the log level of multiple loggers at once.
"""
for name in logger_names:
logger=logging.getLogger(name)
logger.setLevel(level)
def setup_logs(self, log_level:int=logging.DEBUG) -> str:
"""
Setup a logging queue handler and queue listener.
Also creates a new log file for the current session and deletes old log files.
"""
# Create a queue to pass log records to the listener
log_queue = queue.Queue()
queue_handler = QueueHandler(log_queue)
# Configure the root logger to use the queue
logging.basicConfig(
level=log_level,
handlers=(queue_handler,),
format='[%(asctime)s] [%(levelname)-8s] [%(name)-18s]: %(message)s'
)
# Create a new log file
file = self._open_file()
file_handler = logging.StreamHandler(file) # Stream logs to the log file
sys_handler = logging.StreamHandler(sys.stdout) # Stream logs to the console
# Create a listener to handle the queue
queue_listener = QueueListener(log_queue, file_handler, sys_handler)
queue_listener.start()
# Clear up old log files
self._delete_old_logs()
return file.name

46
src/main.py Normal file
View File

@ -0,0 +1,46 @@
"""
Entry point for the application.
Run this file to get started.
"""
import logging
import asyncio
from os import getenv
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
from bot import DiscordBot
from logs import LogSetup
BASE_DIR = Path(__file__).resolve().parent.parent
async def main():
"""
Entry point function for the application.
Run this function to get started.
"""
# Grab the token before anything else, because if there is no token
# available then the bot cannot be started anyways.
token = getenv("BOT_TOKEN")
if not token:
raise ValueError("Token is empty")
# Setup logging settings
logsetup = LogSetup(BASE_DIR)
logsetup.setup_logs()
logsetup.update_log_levels(
('discord', 'PIL', 'urllib3', 'aiosqlite', 'charset_normalizer'),
level=logging.WARNING
)
async with DiscordBot(BASE_DIR) as bot:
await bot.load_extensions()
await bot.start(token)
if __name__ == "__main__":
asyncio.run(main())