This commit is contained in:
Corban-Lee Jones 2024-01-30 13:54:38 +00:00
commit f2e10e6584
7 changed files with 84 additions and 30 deletions

11
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,11 @@
{
"python.testing.unittestArgs": [
"-v",
"-s",
"./src",
"-p",
"test*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}

View File

@ -1,17 +1,7 @@
# NewsBot
# PYRSS
Bot delivering news articles to discord servers.
An RSS driven Discord bot written in Python.
Plans
Provides user commands for storing RSS feed URLs that can be assigned to any given discord channel.
- Multiple news providers
- Choose how much of each provider should be delivered
- Check for duplicate articles between providers, and only deliver preferred provider article
## Dev Notes:
For the sake of development, the following defintions apply:
- Feed - An RSS feed stored within the database, submitted by a user.
- Assigned Feed - A discord channel set to receive content from a Feed.
Content is shared every 10 minutes as an Embed.

View File

@ -17,11 +17,14 @@ log = logging.getLogger(__name__)
class DiscordBot(commands.Bot):
def __init__(self, BASE_DIR: Path):
activity = Game("Indev")
def __init__(self, BASE_DIR: Path, developing: bool):
activity = Game("Indev") if developing else None
super().__init__(command_prefix="-", intents=Intents.all(), activity=activity)
self.functions = Functions(self)
self.BASE_DIR = BASE_DIR
self.developing = developing
log.info("developing=%s", developing)
async def sync_app_commands(self):
"""

View File

@ -48,13 +48,20 @@ class TaskCog(commands.Cog):
@commands.Cog.listener()
async def on_ready(self):
"""Instructions to call when the cog is ready."""
"""Instructions to execute when the cog is ready."""
self.rss_task.start()
if not self.bot.developing:
self.rss_task.start()
log.info("%s cog is ready", self.__class__.__name__)
@tasks.loop(time=times)
@commands.Cog.listener(name="cog_unload")
async def on_unload(self):
"""Instructions to execute before the cog is unloaded."""
self.rss_task.cancel()
@tasks.loop(minutes=10)
async def rss_task(self):
"""Automated task responsible for processing rss feeds."""
@ -86,7 +93,9 @@ class TaskCog(commands.Cog):
channel = self.bot.get_channel(feed.discord_channel_id)
unparsed_content = await get_unparsed_feed(feed.rss_source.rss_url)
# TODO: integrate the `validate_feed` code into here, also do on list command and show errors.
unparsed_content = await self.bot.functions.get_unparsed_feed(feed.rss_source.rss_url)
parsed_feed = parse(unparsed_content)
source = Source.from_parsed(parsed_feed)
articles = source.get_latest_articles(5)

View File

@ -84,7 +84,7 @@ class LogSetup:
logging.basicConfig(
level=log_level,
handlers=(queue_handler,),
format='[%(asctime)s] [%(levelname)-8s] [%(name)-18s]: %(message)s'
format='[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s'
)
# Create a new log file

View File

@ -31,15 +31,17 @@ async def main():
if not token:
raise ValueError("Token is empty")
developing = bool(getenv("DEVELOPING"))
# Setup logging settings and mute spammy loggers
logsetup = LogSetup(BASE_DIR)
logsetup.setup_logs(logging.DEBUG)
logsetup.setup_logs(logging.DEBUG if developing else logging.INFO)
logsetup.update_log_levels(
('discord', 'PIL', 'urllib3', 'aiosqlite', 'charset_normalizer'),
level=logging.WARNING
)
async with DiscordBot(BASE_DIR) as bot:
async with DiscordBot(BASE_DIR, developing=developing) as bot:
await bot.load_extensions()
await bot.start(token, reconnect=True)

View File

@ -1,15 +1,54 @@
import unittest
from sqlalchemy import select
from sqlalchemy.engine.cursor import CursorResult
from sqlalchemy.engine.result import ChunkedIteratorResult
from db import DatabaseManager, FeedChannelModel, AuditModel
class TestDatabaseConnections(unittest.IsolatedAsyncioTestCase):
"""The purpose of this test, is to ensure that the database connections function properly."""
def test_article_embed():
assert True, ""
async def test_select__feed_channel_model(self):
"""This test runs a select query on the `FeedChannelModel`"""
async with DatabaseManager() as database:
query = select(FeedChannelModel).limit(1000)
result = await database.session.execute(query)
self.assertIsInstance(
result,
ChunkedIteratorResult,
f"Result should be `ChunkedIteratorResult`, not {type(result)!r}"
)
def main():
# test article embed
test_article_embed()
async def test_select__rss_source_model(self):
"""This test runs a select query on the `RssSourceModel`"""
async with DatabaseManager() as database:
query = select(RssSourceModel).limit(1000)
result = await database.session.execute(query)
self.assertIsInstance(
result,
ChunkedIteratorResult,
f"Result should be `ChunkedIteratorResult`, not {type(result)!r}"
)
async def test_select__audit_model(self):
"""This test runs a select query on the `AuditModel`"""
async with DatabaseManager() as database:
query = select(AuditModel).limit(1000)
result = await database.session.execute(query)
self.assertIsInstance(
result,
ChunkedIteratorResult,
f"Result should be `ChunkedIteratorResult`, not {type(result)!r}"
)
if __name__ == "__main__":
main()
unittest.main()