Testing basic feeds

This commit is contained in:
Corban-Lee Jones 2023-12-13 19:31:28 +00:00
parent cec1db209b
commit 3971007780
7 changed files with 215 additions and 16 deletions

View File

@ -1,14 +1,22 @@
aiohttp==3.9.1
aiosignal==1.3.1
aiosqlite==0.19.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.1.0
beautifulsoup4==4.12.2
discord.py==2.3.2
feedparser==6.0.11
frozenlist==1.4.0
greenlet==3.0.2
idna==3.6
markdownify==0.11.6
multidict==6.0.4
psycopg2==2.9.9
psycopg2-binary==2.9.9
python-dotenv==1.0.0
sgmllib3k==1.0.0
six==1.16.0
soupsieve==2.5
SQLAlchemy==2.0.23
typing_extensions==4.9.0
yarl==1.9.4

View File

@ -41,4 +41,11 @@ class DiscordBot(commands.Bot):
for path in (self.BASE_DIR / "src/extensions").iterdir():
if path.suffix == ".py":
await self.load_extension(f"extensions.{path.stem}")
await self.load_extension(f"extensions.{path.stem}")
async def audit(self, message: str, user_id: int):
async with DatabaseManager() as database:
message = f"Requesting latest article"
query = insert(AuditModel).values(discord_user_id=user_id, message=message)
await database.session.execute(query)

View File

@ -23,11 +23,12 @@ class DatabaseManager:
Asynchronous database context manager.
"""
def __init__(self):
def __init__(self, no_commit: bool = False):
database_url = self.get_database_url()
self.engine = create_async_engine(database_url, future=True)
self.session_maker = sessionmaker(self.engine, class_=AsyncSession)
self.session = None
self.no_commit = no_commit
@staticmethod
def get_database_url(use_async=True):
@ -35,13 +36,31 @@ class DatabaseManager:
Returns a connection string for the database.
"""
if DB_TYPE not in ("sqlite", "mariadb", "mysql", "postgresql"):
raise ValueError(f"Unknown Database Type: {DB_TYPE}")
# TODO finish support for mysql, mariadb, etc
is_sqlite = DB_TYPE == "sqlite"
url = f"{DB_TYPE}://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}"
url_addon = ""
# This looks fucking ugly
#
match use_async, DB_TYPE:
case True, "sqlite":
url_addon = "aiosqlite"
case True, "postgresql":
url_addon = "asyncpg"
case False, "sqlite":
pass
case False, "postgresql":
pass
case _, _:
raise ValueError(f"Unknown Database Type: {DB_TYPE}")
url = url.replace(":/", f"+{url_addon}:/") if url_addon else url
url = f"sqlite:///{DB_HOST}" if is_sqlite else f"{DB_TYPE}://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}"
url = url.replace(":/", "+aiosqlite:/" if is_sqlite else "+asyncpg:/") if use_async else url
return url
@ -52,7 +71,9 @@ class DatabaseManager:
return self
async def __aexit__(self, *_):
await self.session.commit()
if not self.no_commit:
await self.session.commit()
await self.session.close()
self.session = None
await self.engine.dispose()

View File

@ -5,11 +5,14 @@ Loading this file via `commands.Bot.load_extension` will add the `test` cog to t
import logging
from discord import app_commands, Interaction
import textwrap
from markdownify import markdownify
from discord import app_commands, Interaction, Embed
from discord.ext import commands, tasks
from sqlalchemy import insert, select
from db import DatabaseManager, AuditModel
from feed import Article, Source, Parser, Feeds, get_source
log = logging.getLogger(__name__)
@ -28,15 +31,55 @@ class Test(commands.Cog):
async def on_ready(self):
log.info(f"{self.__class__.__name__} cog is ready")
@app_commands.command(name="test-command")
@app_commands.command(name="test-bbc")
async def test_command(self, inter: Interaction):
async with DatabaseManager() as database:
message = f"Test command has been invoked successfully!"
query = insert(AuditModel).values(discord_user_id=inter.user.id, message=message)
await database.session.execute(query)
await inter.response.defer()
await self.bot.audit("Requesting latest article.", inter.user_id)
await inter.response.send_message("the audit log test was successful")
source = get_source(Feeds.THE_UPPER_LIP)
article = source.get_latest_article()
md_description = markdownify(article.description)
article_description = textwrap.shorten(md_description, 4096)
embed = Embed(
title=article.title,
description=article_description,
url=article.url,
)
embed.set_author(
name=source.name,
url=source.url,
icon_url=source.icon_url
)
await inter.followup.send(embed=embed)
@app_commands.command(name="test-upperlip")
async def test_command(self, inter: Interaction):
await inter.response.defer()
await self.bot.audit("Requesting latest article.", inter.user_id)
source = get_source(Feeds.THE_UPPER_LIP)
article = source.get_latest_article()
md_description = markdownify(article.description)
article_description = textwrap.shorten(md_description, 4096)
embed = Embed(
title=article.title,
description=article_description,
url=article.url,
)
embed.set_author(
name=source.name,
url=source.url,
icon_url=source.icon_url
)
await inter.followup.send(embed=embed)
async def setup(bot):

2
src/feed/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from .parser import Article, Source, Parser
from .feed import Feeds, get_source

69
src/feed/feed.py Normal file
View File

@ -0,0 +1,69 @@
import json
from enum import Enum
from dataclasses import dataclass
from feedparser import FeedParserDict, parse
class Feeds(Enum):
THE_UPPER_LIP = "https://theupperlip.co.uk/rss"
THE_BABYLON_BEE= ""
@dataclass
class Source:
name: str
url: str
icon_url: str
feed: FeedParserDict
@classmethod
def from_parsed(cls, feed:FeedParserDict):
# print(json.dumps(feed, indent=8))
return cls(
name=feed.channel.title,
url=feed.channel.link,
icon_url=feed.feed.image.href,
feed=feed
)
def get_latest_article(self):
return Article.from_parsed(self.feed)
@dataclass
class Article:
title: str
description: str
url: str
thumbnail_url: str
@classmethod
def from_parsed(cls, feed:FeedParserDict):
entry = feed.entries[0]
return cls(
title=entry.title,
description=entry.description,
url=entry.link,
thumbnail_url=None
)
def get_source(feed: Feeds) -> Source:
"""
"""
parsed_feed = parse(feed.value)
return Source.from_parsed(parsed_feed)
def get_test():
parsed = parse(Feeds.THE_UPPER_LIP.value)
print(json.dumps(parsed, indent=4))
return parsed

49
src/feed/parser.py Normal file
View File

@ -0,0 +1,49 @@
import textwrap
from datetime import datetime
from dataclasses import dataclass
import feedparser
from .feed import Feeds
class Parser:
def __init__(self, feed:Feeds):
self.feed_url = feed.value
def get_latest(self):
result = feedparser.parse(self.feed_url)
entry = result.entries[0]
return Article(
title=entry.title,
description=entry.description,
content="", # textwrap.shorten(100, entry.content),
url=entry.link,
thumbnail_url=""
)
def get_source(self):
return Source()
@dataclass
class Article:
title: str
description: str
content: str
url: str
thumbnail_url: str
@dataclass
class Source:
name: str
description: str
url: str
icon_url: str
last_updated: datetime