diff --git a/src/extensions/tasks.py b/src/extensions/tasks.py index 2b40d64..2434f13 100644 --- a/src/extensions/tasks.py +++ b/src/extensions/tasks.py @@ -68,6 +68,7 @@ class TaskCog(commands.Cog): Instructions to execute when the cog is ready. """ # self.subscription_task.start() + self.do_task.start() log.info("%s cog is ready", self.__class__.__name__) @commands.Cog.listener(name="cog_unload") @@ -97,6 +98,7 @@ class TaskCog(commands.Cog): end_time = perf_counter() await inter.followup.send(f"completed command in {end_time - start_time:.4f} seconds") + @tasks.loop(time=subscription_task_times) async def do_task(self): log.info("Running task") start_time = perf_counter() @@ -166,8 +168,39 @@ class TaskCog(commands.Cog): if not raw_rss_content: return - channels = await subscription.get_discord_channels(self.bot) contents = await models.Content.from_raw_rss(raw_rss_content, subscription, self.client) + + async def check_duplicate_content(content: models.Content): + params = { + "match_any": True, # allows any param to match, instead of needing all + "item_id": content.item_id, + "item_guid": content.item_guid, + "item_url": content.item_url, + "item_title": content.item_title, + "item_content_hash": content.item_content_hash, + "subscription": content.subscription_id + } + + try: + response = await self.client.get( + self.api_base_url + f"content/", + headers=self.api_headers, + params=params + ) + response.raise_for_status() + + if len(response.json().get("results", [])): + log.debug("found duplicate") + contents.remove(content) + except httpx.HTTPError as exc: + log.error(f"assuming not duplicate {exc}") + + # clear duplicate content + log.debug(f"checking for duplicates (count: {len(contents)})") + await do_batch_job(contents, check_duplicate_content, 15) + log.debug(f"finished looking for duplicates (count: {len(contents)})") + + channels = await subscription.get_discord_channels(self.bot) valid_contents, invalid_contents = subscription.filter_entries(contents) async def send_content(channel: discord.TextChannel): @@ -179,6 +212,12 @@ class TaskCog(commands.Cog): await do_batch_job(channels, send_content, 5) + combined = valid_contents.copy() + combined.extend(invalid_contents) + + tasks = [content.save(self.client, self.api_base_url, self.api_headers) for content in combined] + await asyncio.gather(*tasks) + # TODO: mark invalid contents as blocked end_time = perf_counter()