"""
Handles tasks related to in-game players, such as connect/disconnect alerts.
"""

import re
import logging
from os import getenv
from pathlib import Path
from datetime import datetime

from discord import Colour, Interaction, Permissions, app_commands
from discord.ext import commands, tasks

from utils.reader import LogFileReader
from utils.models import Player

ZOMBOID_FOLDER_PATH = Path(getenv("SPIFFO__ZOMBOID_FOLDER_PATH"))
LOGS_FOLDER_PATH = ZOMBOID_FOLDER_PATH / "Logs"
TIMESTAMP_FORMAT = getenv("SPIFFO__ZOMBOID_LOG_TIMESTAMP_FORMAT", "%d-%m-%y %H:%M:%S.%f")

# Patterns for the three kinds of user-log lines this cog reacts to.  They are
# compiled once here because every polled log line is matched against them.
# The group names are the ones the handlers below read via ``match.group(...)``.
_DEATH_PATTERN = re.compile(
    r"\[(?P<timestamp>[\d\- :\.]+)\] user (?P<username>.+?) died at "
    r"\((?P<x>\d+),(?P<y>\d+),(?P<z>\d+)\) \((?P<cause>.+?)\)"
)
_CONNECTED_PATTERN = re.compile(
    r'\[(?P<timestamp>.*?)\] (?P<steam_id>\d+) "(?P<username>.*?)" fully connected '
    r'\((?P<x>\d+),(?P<y>\d+),(?P<z>\d+)\)'
)
_DISCONNECTED_PATTERN = re.compile(
    r'\[(?P<timestamp>.*?)\] (?P<steam_id>\d+) "(?P<username>.*?)" disconnected player '
    r'\((?P<x>\d+),(?P<y>\d+),(?P<z>\d+)\)'
)

log = logging.getLogger(__name__)


class PlayersCog(commands.Cog):
    """
    Handles tasks related to in-game players.

    Polls the server's ``*_user.txt`` log file every few seconds, updates the
    stored ``Player`` records from the lines it finds, and posts
    connect/disconnect/death alerts to the bot's in-game channel.
    """

    # Reader for the user log file currently being followed.
    file_handler: LogFileReader

    def __init__(self, bot: commands.Bot):
        """Store the bot, open the current user log and start polling it."""
        self.bot = bot
        self.create_file_handler()
        self.listen_for_changes.start()

    def cog_unload(self) -> None:
        """Cancel the polling loop so it does not outlive the cog (e.g. on reload)."""
        self.listen_for_changes.cancel()

    cmd_group = app_commands.Group(
        name="players",
        description="Commands for the players cog.",
        default_permissions=Permissions.all(),
        guild_only=True
    )

    # NOTE: the docstring below doubles as the slash-command description shown
    # in Discord, so its wording is part of the user-facing behaviour.
    @cmd_group.command(name="build-from-logs")
    async def build_from_logs(self, inter: Interaction):
        """
        Build player data from existing and older log files.
        """
        await inter.response.defer()

        # Pause live polling while the stored data is wiped and rebuilt.
        # NOTE(review): ``stop()`` is graceful, so the loop may still be alive
        # when ``start()`` is called again below - confirm this cannot raise
        # "Task is already launched" when the rebuild finishes quickly.
        self.listen_for_changes.stop()
        log.info("Building player data from logs.")

        try:
            # Delete the existing data, as we will reconstruct it.
            await Player.all().delete()

            for log_file in LOGS_FOLDER_PATH.glob("**/*_user.txt"):
                log.debug("building from log file: %s", str(log_file))
                reader = LogFileReader(log_file, track_from_start=True)
                for line in await reader.read():
                    # Historical lines must not trigger channel alerts.
                    await self.process_log_line(line, alert=False)
        finally:
            # Always resume polling, even if a log line failed to process;
            # otherwise live alerts would stay off until the bot restarts.
            self.listen_for_changes.start()

        await inter.followup.send("Completed")

    def create_file_handler(self):
        """Point ``self.file_handler`` at the first ``*_user.txt`` file in the logs folder."""
        # NOTE(review): when no matching file exists this passes ``None`` to
        # LogFileReader - confirm the reader copes with that.
        user_log_file_path = next(LOGS_FOLDER_PATH.glob("*_user.txt"), None)
        self.file_handler = LogFileReader(user_log_file_path)

    @tasks.loop(seconds=3)
    async def listen_for_changes(self):
        """
        Listen for changes in the user.txt log file, and process them.
        """
        log.debug("listening for changes")

        try:
            for line in await self.file_handler.read():
                await self.process_log_line(line)
        except FileNotFoundError:
            log.info("User log file not found, assuming it rotated to a new file.")
            self.create_file_handler()

    async def process_log_line(self, line: str, alert: bool = True):
        """
        Dispatch one user-log line to the matching handler.

        Lines that mention neither a death, a full connection nor a
        disconnection are ignored.  ``alert`` is forwarded to the handler and
        controls whether a message is posted to the in-game channel.
        """
        log.debug("processing log line")

        if "died" in line:
            await self.process_player_death(line, alert)
        elif "fully connected" in line:
            await self.process_connected_player(line, alert)
        elif "disconnected player" in line:
            await self.process_disconnected_player(line, alert)

    async def _send_player_alert(self, player: Player, title: str, colour: Colour) -> None:
        """Post the player's embed, with the given title and colour, to the in-game channel."""
        channel = await self.bot.get_ingame_channel()
        embed = await player.get_embed()
        embed.title = title
        embed.colour = colour
        await channel.send(embed=embed)

    async def process_player_death(self, line: str, alert: bool = False):
        """
        Record a death parsed from ``line`` against an already-known player.

        Nothing is stored when the line does not match the death pattern or
        when no player with that username exists; both cases are logged as
        warnings.  When ``alert`` is true a "Player Has Died" embed is sent.
        """
        log.debug("processing player death")

        re_match = _DEATH_PATTERN.search(line)
        if not re_match:
            log.warning("failed to parse player death log: %s", line)
            return

        username = re_match.group("username")
        player = await Player.get_or_none(username=username)
        if not player:
            log.warning("Player returned none, cannot add death: %s", username)
            return

        await player.add_death(
            coord_x=re_match.group("x"),
            coord_y=re_match.group("y"),
            coord_z=re_match.group("z"),
            cause=re_match.group("cause"),
            timestamp=datetime.strptime(
                re_match.group("timestamp"),
                TIMESTAMP_FORMAT
            )
        )
        await player.save()
        log.debug("successfully registered player death to %s", username)

        if not alert:
            return

        await self._send_player_alert(player, "Player Has Died", Colour.dark_orange())

    async def process_connected_player(self, line: str, alert: bool = False):
        """
        Open a session for the player named in a "fully connected" line.

        The player record is created if it does not exist yet, and its Steam
        summary is refreshed through ``update_outdated_steam_summary``.  If the
        player was flagged dead, the flag is cleared and no alert is sent;
        otherwise, when ``alert`` is true, a "Player Has Connected" embed is
        posted.
        """
        log.debug("processing connected player")

        re_match = _CONNECTED_PATTERN.search(line)
        if not re_match:
            log.warning("Failed to parse player data: %s", line)
            return

        player, _ = await Player.get_or_create(username=re_match.group("username"))
        await player.open_session(
            timestamp=datetime.strptime(re_match.group("timestamp"), TIMESTAMP_FORMAT),
            coord_x=re_match.group("x"),
            coord_y=re_match.group("y"),
            coord_z=re_match.group("z")
        )
        await player.update_outdated_steam_summary(
            re_match.group("steam_id"),
            self.bot.steam_api_key
        )
        await player.save()

        # This connection method is called when the player respawns
        if player.is_dead:
            player.is_dead = False  # player must be alive if fully connected
            await player.save()
            return

        if not alert:
            return

        await self._send_player_alert(player, "Player Has Connected", Colour.brand_green())

    async def process_disconnected_player(self, line: str, alert: bool = False):
        """
        Close the session of the player named in a "disconnected player" line.

        The player record is created if it does not exist yet, and its Steam
        summary is refreshed through ``update_outdated_steam_summary``.  A
        "Player Has Disconnected" embed is posted only when ``alert`` is true
        and the player is not flagged dead.
        """
        log.debug("processing disconnected player")

        re_match = _DISCONNECTED_PATTERN.search(line)
        if not re_match:
            log.warning("Failed to parse player data: %s", line)
            return

        player, _ = await Player.get_or_create(username=re_match.group("username"))
        await player.close_session(
            timestamp=datetime.strptime(re_match.group("timestamp"), TIMESTAMP_FORMAT),
            coord_x=re_match.group("x"),
            coord_y=re_match.group("y"),
            coord_z=re_match.group("z")
        )
        await player.update_outdated_steam_summary(
            re_match.group("steam_id"),
            self.bot.steam_api_key
        )
        await player.save()

        if player.is_dead or not alert:
            return

        await self._send_player_alert(player, "Player Has Disconnected", Colour.brand_red())


async def setup(bot: commands.Bot):
    """Extension entry point: create the players cog and register it on ``bot``."""
    cog = PlayersCog(bot)
    await bot.add_cog(cog)
    log.info("Added %s cog", cog.__class__.__name__)