116 lines
3.4 KiB
Python
116 lines
3.4 KiB
Python
"""
|
|
Extension for the `TaskCog`.
|
|
Loading this file via `commands.Bot.load_extension` will add `TaskCog` to the bot.
|
|
"""
|
|
|
|
import logging
|
|
import async_timeout
|
|
|
|
import aiohttp
|
|
from feedparser import parse
|
|
from sqlalchemy import insert, select, and_
|
|
from discord import Interaction, app_commands, TextChannel
|
|
from discord.ext import commands, tasks
|
|
from discord.errors import Forbidden
|
|
|
|
from feed import Source, Article, get_unparsed_feed
|
|
from db import DatabaseManager, FeedChannelModel, RssSourceModel, SentArticleModel
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class TaskCog(commands.Cog):
|
|
"""
|
|
Tasks cog.
|
|
"""
|
|
|
|
def __init__(self, bot):
|
|
super().__init__()
|
|
self.bot = bot
|
|
|
|
@commands.Cog.listener()
|
|
async def on_ready(self):
|
|
"""Instructions to call when the cog is ready."""
|
|
|
|
self.rss_task.start() # pylint disable=E1101
|
|
|
|
log.info("%s cog is ready", self.__class__.__name__)
|
|
|
|
# @app_commands.command(name="test-trigger-task")
|
|
async def test_trigger_task(self, inter: Interaction):
|
|
await inter.response.defer()
|
|
await self.rss_task()
|
|
await inter.followup.send("done")
|
|
|
|
@tasks.loop(minutes=10)
|
|
async def rss_task(self):
|
|
log.info("Running rss task")
|
|
|
|
async with DatabaseManager() as database:
|
|
query = select(FeedChannelModel, RssSourceModel).join(RssSourceModel)
|
|
result = await database.session.execute(query)
|
|
feeds = result.scalars().all()
|
|
|
|
for feed in feeds:
|
|
await self.process_feed(feed, database)
|
|
|
|
log.info("Finished rss task")
|
|
|
|
async def process_feed(self, feed: FeedChannelModel, database):
|
|
log.info("Processing feed: %s", feed.id)
|
|
channel = self.bot.get_channel(feed.discord_channel_id)
|
|
|
|
unparsed_content = await get_unparsed_feed(feed.rss_source.rss_url)
|
|
parsed_feed = parse(unparsed_content)
|
|
source = Source.from_parsed(parsed_feed)
|
|
articles = source.get_latest_articles(5)
|
|
|
|
if not articles:
|
|
log.info("No articles to process")
|
|
return
|
|
|
|
for article in articles:
|
|
await self.process_article(article, channel, database)
|
|
|
|
async def process_article(self, article: Article, channel: TextChannel, database):
|
|
log.info("Processing article: %s", article.url)
|
|
|
|
query = select(SentArticleModel).where(and_(
|
|
SentArticleModel.article_url == article.url,
|
|
SentArticleModel.discord_channel_id == channel.id
|
|
))
|
|
result = await database.session.execute(query)
|
|
|
|
if result.scalars().all():
|
|
log.info("Article already processed: %s", article.url)
|
|
return
|
|
|
|
embed = await article.to_embed()
|
|
try:
|
|
await channel.send(embed=embed)
|
|
except Forbidden:
|
|
log.error("Forbidden: %s · %s", channel.name, channel.id)
|
|
return
|
|
|
|
query = insert(SentArticleModel).values(
|
|
article_url = article.url,
|
|
discord_channel_id = channel.id,
|
|
discord_server_id = channel.guild.id,
|
|
discord_message_id = -1
|
|
)
|
|
await database.session.execute(query)
|
|
|
|
log.info("new Article processed: %s", article.url)
|
|
|
|
|
|
|
|
async def setup(bot):
|
|
"""
|
|
Setup function for this extension.
|
|
Adds `TaskCog` to the bot.
|
|
"""
|
|
|
|
cog = TaskCog(bot)
|
|
await bot.add_cog(cog)
|
|
log.info("Added %s cog", cog.__class__.__name__)
|