From 7e6dcb006f186a098355dbe16259d5b57bc52f88 Mon Sep 17 00:00:00 2001 From: Interitio Date: Sun, 18 Aug 2024 21:51:38 +1000 Subject: [PATCH] feature: Add streamalerts cog. --- data/schema.sql | 51 +++ src/bot.py | 2 +- src/modules/__init__.py | 1 + src/modules/streamalerts/__init__.py | 8 + src/modules/streamalerts/cog.py | 609 +++++++++++++++++++++++++++ src/modules/streamalerts/data.py | 105 +++++ src/modules/streamalerts/editor.py | 369 ++++++++++++++++ src/modules/streamalerts/settings.py | 264 ++++++++++++ 8 files changed, 1408 insertions(+), 1 deletion(-) create mode 100644 src/modules/streamalerts/__init__.py create mode 100644 src/modules/streamalerts/cog.py create mode 100644 src/modules/streamalerts/data.py create mode 100644 src/modules/streamalerts/editor.py create mode 100644 src/modules/streamalerts/settings.py diff --git a/data/schema.sql b/data/schema.sql index dbd057fc..03b7ef59 100644 --- a/data/schema.sql +++ b/data/schema.sql @@ -1355,6 +1355,57 @@ CREATE TABLE premium_guild_contributions( -- }}} +-- Stream Alerts {{{ + +-- DROP TABLE IF EXISTS stream_alerts; +-- DROP TABLE IF EXISTS streams; +-- DROP TABLE IF EXISTS alert_channels; +-- DROP TABLE IF EXISTS streamers; + +CREATE TABLE streamers( + userid BIGINT PRIMARY KEY, + login_name TEXT NOT NULL, + display_name TEXT NOT NULL +); + +CREATE TABLE alert_channels( + subscriptionid SERIAL PRIMARY KEY, + guildid BIGINT NOT NULL, + channelid BIGINT NOT NULL, + streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE, + created_by BIGINT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + paused BOOLEAN NOT NULL DEFAULT FALSE, + end_delete BOOLEAN NOT NULL DEFAULT FALSE, + live_message TEXT, + end_message TEXT +); +CREATE INDEX alert_channels_guilds ON alert_channels (guildid); +CREATE UNIQUE INDEX alert_channels_channelid_streamerid ON alert_channels (channelid, streamerid); + +CREATE TABLE streams( + streamid SERIAL PRIMARY KEY, + streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE, + start_at TIMESTAMPTZ NOT NULL, + twitch_stream_id BIGINT, + game_name TEXT, + title TEXT, + end_at TIMESTAMPTZ +); + +CREATE TABLE stream_alerts( + alertid SERIAL PRIMARY KEY, + streamid INTEGER NOT NULL REFERENCES streams (streamid) ON DELETE CASCADE, + subscriptionid INTEGER NOT NULL REFERENCES alert_channels (subscriptionid) ON DELETE CASCADE, + sent_at TIMESTAMPTZ NOT NULL, + messageid BIGINT NOT NULL, + resolved_at TIMESTAMPTZ +); + + +-- }}} + + -- Analytics Data {{{ CREATE SCHEMA "analytics"; diff --git a/src/bot.py b/src/bot.py index d643fa73..6170fa09 100644 --- a/src/bot.py +++ b/src/bot.py @@ -70,7 +70,7 @@ async def main(): async with aiohttp.ClientSession() as session: async with LionBot( - command_prefix='!leo!', + command_prefix='!', intents=intents, appname=appname, shardname=shardname, diff --git a/src/modules/__init__.py b/src/modules/__init__.py index 416fbac9..62696f1b 100644 --- a/src/modules/__init__.py +++ b/src/modules/__init__.py @@ -24,6 +24,7 @@ active = [ '.sponsors', '.topgg', '.premium', + '.streamalerts', '.test', ] diff --git a/src/modules/streamalerts/__init__.py b/src/modules/streamalerts/__init__.py new file mode 100644 index 00000000..4fd5c1b3 --- /dev/null +++ b/src/modules/streamalerts/__init__.py @@ -0,0 +1,8 @@ +import logging +from meta import LionBot + +logger = logging.getLogger(__name__) + +async def setup(bot: LionBot): + from .cog import AlertCog + await bot.add_cog(AlertCog(bot)) diff --git a/src/modules/streamalerts/cog.py b/src/modules/streamalerts/cog.py new file mode 100644 index 00000000..8762cd7a --- /dev/null +++ b/src/modules/streamalerts/cog.py @@ -0,0 +1,609 @@ +import asyncio +from typing import Optional + +import discord +from discord.ext import commands as cmds +from discord import app_commands as appcmds + +from twitchAPI.twitch import Twitch +from twitchAPI.helper import first + +from meta import LionBot, LionCog, LionContext +from meta.errors import UserInputError +from meta.logger import log_wrap +from utils.lib import utc_now +from data.conditions import NULL + +from . import logger +from .data import AlertsData +from .settings import AlertConfig, AlertSettings +from .editor import AlertEditorUI + + +class AlertCog(LionCog): + POLL_PERIOD = 60 + + def __init__(self, bot: LionBot): + self.bot = bot + self.data = bot.db.load_registry(AlertsData()) + self.twitch = None + self.alert_settings = AlertSettings() + + self.poll_task = None + self.event_tasks = set() + + # Cache of currently live streams, maps streamerid -> stream + self.live_streams = {} + + # Cache of streamers we are watching state changes for + # Map of streamerid -> streamer + self.watching = {} + + async def cog_load(self): + await self.data.init() + + await self.twitch_login() + await self.load_subs() + self.poll_task = asyncio.create_task(self.poll_live()) + + async def twitch_login(self): + # TODO: Probably abstract this out to core or a dedicated core cog + # Also handle refresh tokens + if self.twitch is not None: + await self.twitch.close() + self.twitch = None + + self.twitch = await Twitch( + self.bot.config.twitch['app_id'].strip(), + self.bot.config.twitch['app_secret'].strip() + ) + + async def load_subs(self): + # Load active subscriptions + active_subs = await self.data.AlertChannel.fetch_where() + to_watch = {sub.streamerid for sub in active_subs} + live_streams = await self.data.Stream.fetch_where( + self.data.Stream.end_at != NULL + ) + to_watch.union(stream.streamerid for stream in live_streams) + + # Load associated streamers + watching = {} + if to_watch: + streamers = await self.data.Streamer.fetch_where( + userid=list(to_watch) + ) + for streamer in streamers: + watching[streamer.userid] = streamer + + self.watching = watching + self.live_streams = {stream.streamerid: stream for stream in live_streams} + + logger.info( + f"Watching {len(watching)} streamers for state changes. " + f"Loaded {len(live_streams)} (previously) live streams into cache." + ) + + async def poll_live(self): + # Every PERIOD seconds, + # request get_streams for the streamers we are currently watching. + # Check if they are in the live_stream cache, + # and update cache and data and fire-and-forget start/stop events as required. + # TODO: Logging + # TODO: Error handling so the poll loop doesn't die from temporary errors + # And when it does die it gets logged properly. + if not self.twitch: + raise ValueError("Attempting to start alert poll-loop before twitch set.") + + block_i = 0 + + self.polling = True + while self.polling: + await asyncio.sleep(self.POLL_PERIOD) + + to_request = list(self.watching.keys()) + if not to_request: + continue + # Each loop we request the 'next' slice of 100 userids + blocks = [to_request[i:i+100] for i in range(0, len(to_request), 100)] + block_i += 1 + block_i %= len(blocks) + block = blocks[block_i] + + streaming = {} + async for stream in self.twitch.get_streams(user_id=block, first=100): + # Note we set page size to 100 + # So we should never get repeat or missed streams + # Since we can request a max of 100 userids anyway. + streaming[stream.user_id] = stream + + started = set(streaming.keys()).difference(self.live_streams.keys()) + ended = set(self.live_streams.keys()).difference(streaming.keys()) + + for streamerid in started: + stream = streaming[streamerid] + stream_data = await self.data.Stream.create( + streamerid=stream.user_id, + start_at=stream.started_at, + twitch_stream_id=stream.id, + game_name=stream.game_name, + title=stream.title, + ) + self.live_streams[streamerid] = stream_data + task = asyncio.create_task(self.on_stream_start(stream_data)) + self.event_tasks.add(task) + task.add_done_callback(self.event_tasks.discard) + + for streamerid in ended: + stream_data = self.live_streams.pop(streamerid) + await stream_data.update(end_at=utc_now()) + task = asyncio.create_task(self.on_stream_end(stream_data)) + self.event_tasks.add(task) + task.add_done_callback(self.event_tasks.discard) + + async def on_stream_start(self, stream_data): + # Get channel subscriptions listening for this streamer + uid = stream_data.streamerid + logger.info(f"Streamer started streaming! {stream_data=}") + subbed = await self.data.AlertChannel.fetch_where(streamerid=uid) + + # Fulfill those alerts + for sub in subbed: + try: + # If the sub is paused, don't create the alert + await self.sub_alert(sub, stream_data) + except discord.HTTPException: + # TODO: Needs to be handled more gracefully at user level + # Retry logic? + logger.warning( + f"Could not complete subscription {sub=} for {stream_data=}", exc_info=True + ) + except Exception: + logger.exception( + f"Unexpected exception completing {sub=} for {stream_data=}" + ) + raise + + async def subscription_error(self, subscription, stream_data, err_msg): + """ + Handle a subscription fulfill failure. + Stores the error message for user display, + and deletes the subscription after some number of errors. + # TODO + """ + logger.warning( + f"Subscription error {subscription=} {stream_data=} {err_msg=}" + ) + + async def sub_alert(self, subscription, stream_data): + # Base alert behaviour is just to send a message + # and create an alert row + + channel = self.bot.get_channel(subscription.channelid) + if channel is None or not isinstance(channel, discord.abc.Messageable): + # Subscription channel is gone! + # Or the Discord channel cache died + await self.subscription_error( + subscription, stream_data, + "Subscription channel no longer exists." + ) + return + permissions = channel.permissions_for(channel.guild.me) + if not (permissions.send_messages and permissions.embed_links): + await self.subscription_error( + subscription, stream_data, + "Insufficient permissions to post alert message." + ) + return + + # Build message + streamer = await self.data.Streamer.fetch(stream_data.streamerid) + if not streamer: + # Streamer was deleted while handling the alert + # Just quietly ignore + # Don't error out because the stream data row won't exist anymore + logger.warning( + f"Cancelling alert for subscription {subscription.subscriptionid}" + " because the streamer no longer exists." + ) + return + + alert_config = AlertConfig(subscription.subscriptionid, subscription) + paused = alert_config.get(self.alert_settings.AlertPaused.setting_id) + if paused.value: + logger.info(f"Skipping alert for subscription {subscription=} because it is paused.") + return + + live_message = alert_config.get(self.alert_settings.AlertMessage.setting_id) + + formatter = await live_message.generate_formatter(self.bot, stream_data, streamer) + formatted = await formatter(live_message.value) + args = live_message.value_to_args(subscription.subscriptionid, formatted) + + try: + message = await channel.send(**args.send_args) + except discord.HTTPException as e: + logger.warning( + f"Message send failure while sending streamalert {subscription.subscriptionid}", + exc_info=True + ) + await self.subscription_error( + subscription, stream_data, + "Failed to post live alert." + ) + return + + # Store sent alert + alert = await self.data.StreamAlert.create( + streamid=stream_data.streamid, + subscriptionid=subscription.subscriptionid, + sent_at=utc_now(), + messageid=message.id + ) + logger.debug( + f"Fulfilled subscription {subscription.subscriptionid} with alert {alert.alertid}" + ) + + async def on_stream_end(self, stream_data): + # Get channel subscriptions listening for this streamer + uid = stream_data.streamerid + logger.info(f"Streamer stopped streaming! {stream_data=}") + subbed = await self.data.AlertChannel.fetch_where(streamerid=uid) + + # Resolve subscriptions + for sub in subbed: + try: + await self.sub_resolve(sub, stream_data) + except discord.HTTPException: + # TODO: Needs to be handled more gracefully at user level + # Retry logic? + logger.warning( + f"Could not resolve subscription {sub=} for {stream_data=}", exc_info=True + ) + except Exception: + logger.exception( + f"Unexpected exception resolving {sub=} for {stream_data=}" + ) + raise + + async def sub_resolve(self, subscription, stream_data): + # Check if there is a current active alert to resolve + alerts = await self.data.StreamAlert.fetch_where( + streamid=stream_data.streamid, + subscriptionid=subscription.subscriptionid, + ) + if not alerts: + logger.info( + f"Resolution requested for subscription {subscription.subscriptionid} with stream {stream_data.streamid} " + "but no active alerts were found." + ) + return + alert = alerts[0] + if alert.resolved_at is not None: + # Alert was already resolved + # This is okay, Twitch might have just sent the stream ending twice + logger.info( + f"Resolution requested for subscription {subscription.subscriptionid} with stream {stream_data.streamid} " + "but alert was already resolved." + ) + return + + # Check if message is to be deleted or edited (or nothing) + alert_config = AlertConfig(subscription.subscriptionid, subscription) + del_setting = alert_config.get(self.alert_settings.AlertEndDelete.setting_id) + edit_setting = alert_config.get(self.alert_settings.AlertEndMessage.setting_id) + + if (delmsg := del_setting.value) or (edit_setting.value): + # Find the message + message = None + channel = self.bot.get_channel(subscription.channelid) + if channel: + try: + message = await channel.fetch_message(alert.messageid) + except discord.HTTPException: + # Message was probably deleted already + # Or permissions were changed + # Or Discord connection broke + pass + else: + # Channel went after posting the alert + # Or Discord cache sucks + # Nothing we can do, just mark it handled + pass + if message: + if delmsg: + # Delete the message + try: + await message.delete() + except discord.HTTPException: + logger.warning( + f"Discord exception while del-resolve live alert {alert=}", + exc_info=True + ) + else: + # Edit message with custom arguments + streamer = await self.data.Streamer.fetch(stream_data.streamerid) + formatter = await edit_setting.generate_formatter(self.bot, stream_data, streamer) + formatted = await formatter(edit_setting.value) + args = edit_setting.value_to_args(subscription.subscriptionid, formatted) + try: + await message.edit(**args.edit_args) + except discord.HTTPException: + logger.warning( + f"Discord exception while edit-resolve live alert {alert=}", + exc_info=True + ) + else: + # Explicitly don't need to do anything to the alert + pass + + # Save alert as resolved + await alert.update(resolved_at=utc_now()) + + async def cog_unload(self): + if self.poll_task is not None and not self.poll_task.cancelled(): + self.poll_task.cancel() + + if self.twitch is not None: + await self.twitch.close() + self.twitch = None + + # ----- Commands ----- + @cmds.hybrid_group( + name='streamalert', + description=( + "Create and configure stream live-alerts." + ) + ) + @cmds.guild_only() + @appcmds.default_permissions(manage_channels=True) + async def streamalert_group(self, ctx: LionContext): + # Placeholder group, method not used + raise NotImplementedError + + @streamalert_group.command( + name='create', + description=( + "Subscribe a Discord channel to notifications when a Twitch stream goes live." + ) + ) + @appcmds.describe( + streamer="Name of the twitch channel to watch.", + channel="Which Discord channel to send live alerts in.", + message="Custom message to send when the channel goes live (may be edited later)." + ) + @appcmds.default_permissions(manage_channels=True) + async def streamalert_create_cmd(self, ctx: LionContext, + streamer: str, + channel: discord.TextChannel, + message: Optional[str]): + # Type guards + assert ctx.guild is not None, "Guild-only command has no guild ctx." + assert self.twitch is not None, "Twitch command run with no twitch obj." + + # Wards + if not channel.permissions_for(ctx.author).manage_channels: + await ctx.error_reply( + "Sorry, you need the `MANAGE_CHANNELS` permission " + "to add a stream alert to a channel." + ) + return + + # Look up the specified streamer + tw_user = await first(self.twitch.get_users(logins=[streamer])) + if not tw_user: + await ctx.error_reply( + f"Sorry, could not find `{streamer}` on Twitch! " + "Make sure you use the name in their channel url." + ) + return + + # Create streamer data if it doesn't already exist + streamer_data = await self.data.Streamer.fetch_or_create( + tw_user.id, + login_name=tw_user.login, + display_name=tw_user.display_name, + ) + + # Add subscription to alerts list + sub_data = await self.data.AlertChannel.create( + streamerid=streamer_data.userid, + guildid=channel.guild.id, + channelid=channel.id, + created_by=ctx.author.id, + paused=False + ) + + # Add to watchlist + self.watching[streamer_data.userid] = streamer_data + + # Open AlertEditorUI for the new subscription + # TODO + await ctx.reply("StreamAlert Created.") + + async def alert_acmpl(self, interaction: discord.Interaction, partial: str): + if not interaction.guild: + raise ValueError("Cannot acmpl alert in guildless interaction.") + + # Get all alerts in the server + alerts = await self.data.AlertChannel.fetch_where(guildid=interaction.guild_id) + + if not alerts: + # No alerts available + options = [ + appcmds.Choice( + name="No stream alerts are set up in this server!", + value=partial + ) + ] + else: + options = [] + for alert in alerts: + streamer = await self.data.Streamer.fetch(alert.streamerid) + if streamer is None: + # Should be impossible by foreign key condition + # Might be a stale cache + continue + channel = interaction.guild.get_channel(alert.channelid) + display = f"{streamer.display_name} in #{channel.name if channel else 'unknown'}" + if partial.lower() in display.lower(): + # Matching option + options.append(appcmds.Choice(name=display, value=str(alert.subscriptionid))) + if not options: + options.append( + appcmds.Choice( + name=f"No stream alerts matching {partial}"[:25], + value=partial + ) + ) + return options + + async def resolve_alert(self, interaction: discord.Interaction, alert_str: str): + if not interaction.guild: + raise ValueError("Resolving alert outside of a guild.") + # Expect alert_str to be the integer subscriptionid + if not alert_str.isdigit(): + raise UserInputError( + f"No stream alerts in this server matching `{alert_str}`!" + ) + alert = await self.data.AlertChannel.fetch(int(alert_str)) + if not alert or not alert.guildid == interaction.guild_id: + raise UserInputError( + "Could not find the selected alert! Please try again." + ) + return alert + + @streamalert_group.command( + name='edit', + description=( + "Update settings for an existing Twitch stream alert." + ) + ) + @appcmds.describe( + alert="Which alert do you want to edit?", + # TODO: Other settings here + ) + @appcmds.default_permissions(manage_channels=True) + async def streamalert_edit_cmd(self, ctx: LionContext, alert: str): + # Type guards + assert ctx.guild is not None, "Guild-only command has no guild ctx." + assert self.twitch is not None, "Twitch command run with no twitch obj." + assert ctx.interaction is not None, "Twitch command needs interaction ctx." + + # Look up provided alert + sub_data = await self.resolve_alert(ctx.interaction, alert) + + # Check user permissions for editing this alert + channel = ctx.guild.get_channel(sub_data.channelid) + permlevel = channel if channel else ctx.guild + if not permlevel.permissions_for(ctx.author).manage_channels: + await ctx.error_reply( + "Sorry, you need the `MANAGE_CHANNELS` permission " + "in this channel to edit the stream alert." + ) + return + # If edit options have been given, save edits and retouch cache if needed + # If not, open AlertEditorUI + ui = AlertEditorUI(bot=self.bot, sub_data=sub_data, callerid=ctx.author.id) + await ui.run(ctx.interaction) + await ui.wait() + + @streamalert_edit_cmd.autocomplete('alert') + async def streamalert_edit_cmd_alert_acmpl(self, interaction, partial): + return await self.alert_acmpl(interaction, partial) + + @streamalert_group.command( + name='pause', + description=( + "Pause a streamalert." + ) + ) + @appcmds.describe( + alert="Which alert do you want to pause?", + ) + @appcmds.default_permissions(manage_channels=True) + async def streamalert_pause_cmd(self, ctx: LionContext, alert: str): + # Type guards + assert ctx.guild is not None, "Guild-only command has no guild ctx." + assert self.twitch is not None, "Twitch command run with no twitch obj." + assert ctx.interaction is not None, "Twitch command needs interaction ctx." + + # Look up provided alert + sub_data = await self.resolve_alert(ctx.interaction, alert) + + # Check user permissions for editing this alert + channel = ctx.guild.get_channel(sub_data.channelid) + permlevel = channel if channel else ctx.guild + if not permlevel.permissions_for(ctx.author).manage_channels: + await ctx.error_reply( + "Sorry, you need the `MANAGE_CHANNELS` permission " + "in this channel to edit the stream alert." + ) + return + + await sub_data.update(paused=True) + await ctx.reply("This alert is now paused!") + + @streamalert_group.command( + name='unpause', + description=( + "Resume a streamalert." + ) + ) + @appcmds.describe( + alert="Which alert do you want to unpause?", + ) + @appcmds.default_permissions(manage_channels=True) + async def streamalert_unpause_cmd(self, ctx: LionContext, alert: str): + # Type guards + assert ctx.guild is not None, "Guild-only command has no guild ctx." + assert self.twitch is not None, "Twitch command run with no twitch obj." + assert ctx.interaction is not None, "Twitch command needs interaction ctx." + + # Look up provided alert + sub_data = await self.resolve_alert(ctx.interaction, alert) + + # Check user permissions for editing this alert + channel = ctx.guild.get_channel(sub_data.channelid) + permlevel = channel if channel else ctx.guild + if not permlevel.permissions_for(ctx.author).manage_channels: + await ctx.error_reply( + "Sorry, you need the `MANAGE_CHANNELS` permission " + "in this channel to edit the stream alert." + ) + return + + await sub_data.update(paused=False) + await ctx.reply("This alert has been unpaused!") + + @streamalert_group.command( + name='remove', + description=( + "Deactivate a streamalert entirely (see /streamalert pause to temporarily pause it)." + ) + ) + @appcmds.describe( + alert="Which alert do you want to remove?", + ) + @appcmds.default_permissions(manage_channels=True) + async def streamalert_remove_cmd(self, ctx: LionContext, alert: str): + # Type guards + assert ctx.guild is not None, "Guild-only command has no guild ctx." + assert self.twitch is not None, "Twitch command run with no twitch obj." + assert ctx.interaction is not None, "Twitch command needs interaction ctx." + + # Look up provided alert + sub_data = await self.resolve_alert(ctx.interaction, alert) + + # Check user permissions for editing this alert + channel = ctx.guild.get_channel(sub_data.channelid) + permlevel = channel if channel else ctx.guild + if not permlevel.permissions_for(ctx.author).manage_channels: + await ctx.error_reply( + "Sorry, you need the `MANAGE_CHANNELS` permission " + "in this channel to edit the stream alert." + ) + return + + await sub_data.delete() + await ctx.reply("This alert has been deleted.") diff --git a/src/modules/streamalerts/data.py b/src/modules/streamalerts/data.py new file mode 100644 index 00000000..645907f0 --- /dev/null +++ b/src/modules/streamalerts/data.py @@ -0,0 +1,105 @@ +from data import Registry, RowModel +from data.columns import Integer, Bool, Timestamp, String +from data.models import WeakCache +from cachetools import TTLCache + + +class AlertsData(Registry): + class Streamer(RowModel): + """ + Schema + ------ + CREATE TABLE streamers( + userid BIGINT PRIMARY KEY, + login_name TEXT NOT NULL, + display_name TEXT NOT NULL + ); + """ + _tablename_ = 'streamers' + _cache_ = {} + + userid = Integer(primary=True) + login_name = String() + display_name = String() + + class AlertChannel(RowModel): + """ + Schema + ------ + CREATE TABLE alert_channels( + subscriptionid SERIAL PRIMARY KEY, + guildid BIGINT NOT NULL, + channelid BIGINT NOT NULL, + streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE, + created_by BIGINT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + paused BOOLEAN NOT NULL DEFAULT FALSE, + end_delete BOOLEAN NOT NULL DEFAULT FALSE, + live_message TEXT, + end_message TEXT + ); + CREATE INDEX alert_channels_guilds ON alert_channels (guildid); + CREATE UNIQUE INDEX alert_channels_channelid_streamerid ON alert_channels (channelid, streamerid); + """ + _tablename_ = 'alert_channels' + _cache_ = {} + + subscriptionid = Integer(primary=True) + guildid = Integer() + channelid = Integer() + streamerid = Integer() + display_name = Integer() + created_by = Integer() + created_at = Timestamp() + paused = Bool() + end_delete = Bool() + live_message = String() + end_message = String() + + class Stream(RowModel): + """ + Schema + ------ + CREATE TABLE streams( + streamid SERIAL PRIMARY KEY, + streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE, + start_at TIMESTAMPTZ NOT NULL, + twitch_stream_id BIGINT, + game_name TEXT, + title TEXT, + end_at TIMESTAMPTZ + ); + """ + _tablename_ = 'streams' + _cache_ = WeakCache(TTLCache(maxsize=100, ttl=24*60*60)) + + streamid = Integer(primary=True) + streamerid = Integer() + start_at = Timestamp() + twitch_stream_id = Integer() + game_name = String() + title = String() + end_at = Timestamp() + + class StreamAlert(RowModel): + """ + Schema + ------ + CREATE TABLE stream_alerts( + alertid SERIAL PRIMARY KEY, + streamid INTEGER NOT NULL REFERENCES streams (streamid) ON DELETE CASCADE, + subscriptionid INTEGER NOT NULL REFERENCES alert_channels (subscriptionid) ON DELETE CASCADE, + sent_at TIMESTAMPTZ NOT NULL, + messageid BIGINT NOT NULL, + resolved_at TIMESTAMPTZ + ); + """ + _tablename_ = 'stream_alerts' + _cache_ = WeakCache(TTLCache(maxsize=1000, ttl=24*60*60)) + + alertid = Integer(primary=True) + streamid = Integer() + subscriptionid = Integer() + sent_at = Timestamp() + messageid = Integer() + resolved_at = Timestamp() diff --git a/src/modules/streamalerts/editor.py b/src/modules/streamalerts/editor.py new file mode 100644 index 00000000..e87f1759 --- /dev/null +++ b/src/modules/streamalerts/editor.py @@ -0,0 +1,369 @@ +import asyncio +import datetime as dt +from collections import namedtuple +from functools import wraps +from typing import TYPE_CHECKING + +import discord +from discord.ui.button import button, Button, ButtonStyle +from discord.ui.select import select, Select, SelectOption, ChannelSelect + +from meta import LionBot, conf + +from utils.lib import MessageArgs, tabulate, utc_now +from utils.ui import MessageUI +from utils.ui.msgeditor import MsgEditor + +from .settings import AlertSettings as Settings +from .settings import AlertConfig as Config +from .data import AlertsData + +if TYPE_CHECKING: + from .cog import AlertCog + + +FakeStream = namedtuple( + 'FakeStream', + ["streamid", "streamerid", "start_at", "twitch_stream_id", "game_name", "title", "end_at"] +) + + +class AlertEditorUI(MessageUI): + setting_classes = ( + Settings.AlertPaused, + Settings.AlertEndDelete, + Settings.AlertEndMessage, + Settings.AlertMessage, + Settings.AlertChannel, + ) + + def __init__(self, bot: LionBot, sub_data: AlertsData.AlertChannel, **kwargs): + super().__init__(**kwargs) + + self.bot = bot + self.sub_data = sub_data + self.subid = sub_data.subscriptionid + self.cog: 'AlertCog' = bot.get_cog('AlertCog') + self.config = Config(self.subid, sub_data) + + # ----- UI API ----- + def preview_stream_data(self): + # TODO: Probably makes sense to factor this out to the cog + # Or even generate it in the formatters themselves + data = self.sub_data + return FakeStream( + -1, + data.streamerid, + utc_now() - dt.timedelta(hours=1), + -1, + "Discord Admin", + "Testing Go Live Message", + utc_now() + ) + + def call_and_refresh(self, func): + """ + Generate a wrapper which runs coroutine 'func' and then refreshes the UI. + """ + # TODO: Check whether the UI has finished interaction + @wraps(func) + async def wrapped(*args, **kwargs): + await func(*args, **kwargs) + await self.refresh() + return wrapped + + # ----- UI Components ----- + + # Pause button + @button(label="PAUSE_PLACEHOLDER", style=ButtonStyle.blurple) + async def pause_button(self, press: discord.Interaction, pressed: Button): + await press.response.defer(thinking=True, ephemeral=True) + setting = self.config.get(Settings.AlertPaused.setting_id) + setting.value = not setting.value + await setting.write() + await self.refresh(thinking=press) + + async def pause_button_refresh(self): + button = self.pause_button + if self.config.get(Settings.AlertPaused.setting_id).value: + button.label = "UnPause" + button.style = ButtonStyle.grey + else: + button.label = "Pause" + button.style = ButtonStyle.green + + # Delete button + @button(label="Delete Alert", style=ButtonStyle.red) + async def delete_button(self, press: discord.Interaction, pressed: Button): + await press.response.defer(thinking=True, ephemeral=True) + await self.sub_data.delete() + embed = discord.Embed( + colour=discord.Colour.brand_green(), + description="Stream alert removed." + ) + await press.edit_original_response(embed=embed) + await self.close() + + # Close button + @button(emoji=conf.emojis.cancel, style=ButtonStyle.red) + async def close_button(self, press: discord.Interaction, pressed: Button): + await press.response.defer(thinking=False) + await self.close() + + # Edit Alert button + @button(label="Edit Alert", style=ButtonStyle.blurple) + async def edit_alert_button(self, press: discord.Interaction, pressed: Button): + # Spawn MsgEditor for the live alert + await press.response.defer(thinking=True, ephemeral=True) + + setting = self.config.get(Settings.AlertMessage.setting_id) + + stream = self.preview_stream_data() + streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid) + + editor = MsgEditor( + self.bot, + setting.value, + callback=self.call_and_refresh(setting.editor_callback), + formatter=await setting.generate_formatter(self.bot, stream, streamer), + callerid=press.user.id + ) + self._slaves.append(editor) + await editor.run(press) + + # Edit End message + @button(label="Edit Ending Alert", style=ButtonStyle.blurple) + async def edit_end_button(self, press: discord.Interaction, pressed: Button): + # Spawn MsgEditor for the ending alert + await press.response.defer(thinking=True, ephemeral=True) + await self.open_end_editor(press) + + async def open_end_editor(self, respond_to: discord.Interaction): + setting = self.config.get(Settings.AlertEndMessage.setting_id) + # Start from current live alert data if not set + if not setting.value: + alert_setting = self.config.get(Settings.AlertMessage.setting_id) + setting.value = alert_setting.value + + stream = self.preview_stream_data() + streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid) + + editor = MsgEditor( + self.bot, + setting.value, + callback=self.call_and_refresh(setting.editor_callback), + formatter=await setting.generate_formatter(self.bot, stream, streamer), + callerid=respond_to.user.id + ) + self._slaves.append(editor) + await editor.run(respond_to) + return editor + + # Ending Mode Menu + @select( + cls=Select, + placeholder="Select action to take when the stream ends", + options=[SelectOption(label="DUMMY")], + min_values=0, max_values=1 + ) + async def ending_mode_menu(self, selection: discord.Interaction, selected: Select): + if not selected.values: + await selection.response.defer() + return + + await selection.response.defer(thinking=True, ephemeral=True) + value = selected.values[0] + + if value == '0': + # In Do Nothing case, + # Ensure Delete is off and custom edit message is unset + setting = self.config.get(Settings.AlertEndDelete.setting_id) + if setting.value: + setting.value = False + await setting.write() + setting = self.config.get(Settings.AlertEndMessage.setting_id) + if setting.value: + setting.value = None + await setting.write() + + await self.refresh(thinking=selection) + elif value == '1': + # In Delete Alert case, + # Set the delete setting to True + setting = self.config.get(Settings.AlertEndDelete.setting_id) + if not setting.value: + setting.value = True + await setting.write() + + await self.refresh(thinking=selection) + elif value == '2': + # In Edit Message case, + # Set the delete setting to False, + setting = self.config.get(Settings.AlertEndDelete.setting_id) + if setting.value: + setting.value = False + await setting.write() + + # And open the edit message editor + await self.open_end_editor(selection) + await self.refresh() + + async def ending_mode_menu_refresh(self): + # Build menu options + options = [ + SelectOption( + label="Do Nothing", + description="Don't modify the live alert message.", + value="0", + ), + SelectOption( + label="Delete Alert After Stream", + description="Delete the live alert message.", + value="1", + ), + SelectOption( + label="Edit Alert After Stream", + description="Edit the live alert message to a custom message. Opens editor.", + value="2", + ), + ] + + # Calculate the correct default + if self.config.get(Settings.AlertEndDelete.setting_id).value: + options[1].default = True + elif self.config.get(Settings.AlertEndMessage.setting_id).value: + options[2].default = True + + self.ending_mode_menu.options = options + + # Edit channel menu + @select(cls=ChannelSelect, + placeholder="Select Alert Channel", + channel_types=[discord.ChannelType.text, discord.ChannelType.voice], + min_values=0, max_values=1) + async def channel_menu(self, selection: discord.Interaction, selected): + if selected.values: + await selection.response.defer(thinking=True, ephemeral=True) + setting = self.config.get(Settings.AlertChannel.setting_id) + setting.value = selected.values[0] + await setting.write() + await self.refresh(thinking=selection) + else: + await selection.response.defer(thinking=False) + + async def channel_menu_refresh(self): + # current = self.config.get(Settings.AlertChannel.setting_id).value + # TODO: Check if discord-typed menus can have defaults yet + # Impl in stable dpy, but not released to pip yet + ... + + # ----- UI Flow ----- + async def make_message(self) -> MessageArgs: + streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid) + if streamer is None: + raise ValueError("Streamer row does not exist in AlertEditor") + name = streamer.display_name + + # Build relevant setting table + table_map = {} + table_map['Channel'] = self.config.get(Settings.AlertChannel.setting_id).formatted + table_map['Streamer'] = f"https://www.twitch.tv/{streamer.login_name}" + table_map['Paused'] = self.config.get(Settings.AlertPaused.setting_id).formatted + + prop_table = '\n'.join(tabulate(*table_map.items())) + + embed = discord.Embed( + colour=discord.Colour.dark_green(), + title=f"Stream Alert for {name}", + description=prop_table, + timestamp=utc_now() + ) + + message_setting = self.config.get(Settings.AlertMessage.setting_id) + message_desc_lines = [ + f"An alert message will be posted to {table_map['Channel']}.", + f"Press `{self.edit_alert_button.label}`" + " to preview or edit the alert.", + "The following keys will be substituted in the alert message." + ] + keytable = tabulate(*message_setting._subkey_desc.items()) + for line in keytable: + message_desc_lines.append(f"> {line}") + + embed.add_field( + name=f"When {name} goes live", + value='\n'.join(message_desc_lines), + inline=False + ) + + # Determine the ending behaviour + del_setting = self.config.get(Settings.AlertEndDelete.setting_id) + end_msg_setting = self.config.get(Settings.AlertEndMessage.setting_id) + + if del_setting.value: + # Deleting + end_msg_desc = "The live alert message will be deleted." + ... + elif end_msg_setting.value: + # Editing + lines = [ + "The live alert message will edited to the configured message.", + f"Press `{self.edit_end_button.label}` to preview or edit the message.", + "The following substitution keys are supported " + "*in addition* to the live alert keys." + ] + keytable = tabulate( + *[(k, v) for k, v in end_msg_setting._subkey_desc.items() if k not in message_setting._subkey_desc] + ) + for line in keytable: + lines.append(f"> {line}") + end_msg_desc = '\n'.join(lines) + else: + # Doing nothing + end_msg_desc = "The live alert message will not be changed." + + embed.add_field( + name=f"When {name} ends their stream", + value=end_msg_desc, + inline=False + ) + + return MessageArgs(embed=embed) + + async def reload(self): + await self.sub_data.refresh() + # Note self.config references the sub_data, and doesn't need reloading. + + async def refresh_layout(self): + to_refresh = ( + self.pause_button_refresh(), + self.channel_menu_refresh(), + self.ending_mode_menu_refresh(), + ) + await asyncio.gather(*to_refresh) + + show_end_edit = ( + not self.config.get(Settings.AlertEndDelete.setting_id).value + and + self.config.get(Settings.AlertEndMessage.setting_id).value + ) + + + if not show_end_edit: + # Don't show edit end button + buttons = ( + self.edit_alert_button, + self.pause_button, self.delete_button, self.close_button + ) + else: + buttons = ( + self.edit_alert_button, self.edit_end_button, + self.pause_button, self.delete_button, self.close_button + ) + + self.set_layout( + buttons, + (self.ending_mode_menu,), + (self.channel_menu,), + ) + diff --git a/src/modules/streamalerts/settings.py b/src/modules/streamalerts/settings.py new file mode 100644 index 00000000..42e2e947 --- /dev/null +++ b/src/modules/streamalerts/settings.py @@ -0,0 +1,264 @@ +from typing import Optional, Any +import json + +from meta.LionBot import LionBot +from settings import ModelData +from settings.groups import SettingGroup, ModelConfig, SettingDotDict +from settings.setting_types import BoolSetting, ChannelSetting +from core.setting_types import MessageSetting +from babel.translator import LocalBabel +from utils.lib import recurse_map, replace_multiple, tabulate + +from .data import AlertsData + + +babel = LocalBabel('streamalerts') +_p = babel._p + + +class AlertConfig(ModelConfig): + settings = SettingDotDict() + _model_settings = set() + model = AlertsData.AlertChannel + + +class AlertSettings(SettingGroup): + @AlertConfig.register_model_setting + class AlertMessage(ModelData, MessageSetting): + setting_id = 'alert_live_message' + _display_name = _p('', 'live_message') + + _desc = _p( + '', + 'Message sent to the channel when the streamer goes live.' + ) + _long_desc = _p( + '', + 'Message sent to the attached channel when the Twitch streamer goes live.' + ) + _accepts = _p('', 'JSON formatted greeting message data') + _default = json.dumps({'content': "**{display_name}** just went live at {channel_link}"}) + + _model = AlertsData.AlertChannel + _column = AlertsData.AlertChannel.live_message.name + + _subkey_desc = { + '{display_name}': "Twitch channel name (with capitalisation)", + '{login_name}': "Twitch channel login name (as in url)", + '{channel_link}': "Link to the live twitch channel", + '{stream_start}': "Numeric timestamp when stream went live", + } + # TODO: More stuff + + @property + def update_message(self) -> str: + return "The go-live notification message has been updated!" + + @classmethod + async def generate_formatter(cls, bot: LionBot, stream: AlertsData.Stream, streamer: AlertsData.Streamer, **kwargs): + """ + Generate a formatter function for this message + from the provided stream and streamer data. + + The formatter function accepts and returns a message data dict. + """ + async def formatter(data_dict: Optional[dict[str, Any]]): + if not data_dict: + return None + + mapping = { + '{display_name}': streamer.display_name, + '{login_name}': streamer.login_name, + '{channel_link}': f"https://www.twitch.tv/{streamer.login_name}", + '{stream_start}': int(stream.start_at.timestamp()), + } + + recurse_map( + lambda loc, value: replace_multiple(value, mapping) if isinstance(value, str) else value, + data_dict, + ) + return data_dict + return formatter + + async def editor_callback(self, editor_data): + self.value = editor_data + await self.write() + + def _desc_table(self, show_value: Optional[str] = None) -> list[tuple[str, str]]: + lines = super()._desc_table(show_value=show_value) + keytable = tabulate(*self._subkey_desc.items(), colon='') + expline = ( + "The following placeholders will be substituted with their values." + ) + keyfield = ( + "Placeholders", + expline + '\n' + '\n'.join(f"> {line}" for line in keytable) + ) + lines.append(keyfield) + return lines + + @AlertConfig.register_model_setting + class AlertEndMessage(ModelData, MessageSetting): + """ + Custom ending message to edit the live alert to. + If not set, doesn't edit the alert. + """ + setting_id = 'alert_end_message' + _display_name = _p('', 'end_message') + + _desc = _p( + '', + 'Optional message to edit the live alert with when the stream ends.' + ) + _long_desc = _p( + '', + "If set, and `end_delete` is not on, " + "the live alert will be edited with this custom message " + "when the stream ends." + ) + _accepts = _p('', 'JSON formatted greeting message data') + _default = None + + _model = AlertsData.AlertChannel + _column = AlertsData.AlertChannel.end_message.name + + _subkey_desc = { + '{display_name}': "Twitch channel name (with capitalisation)", + '{login_name}': "Twitch channel login name (as in url)", + '{channel_link}': "Link to the live twitch channel", + '{stream_start}': "Numeric timestamp when stream went live", + '{stream_end}': "Numeric timestamp when stream ended", + } + + @property + def update_message(self) -> str: + if self.value: + return "The stream ending message has been updated." + else: + return "The stream ending message has been unset." + + @classmethod + async def generate_formatter(cls, bot: LionBot, stream: AlertsData.Stream, streamer: AlertsData.Streamer, **kwargs): + """ + Generate a formatter function for this message + from the provided stream and streamer data. + + The formatter function accepts and returns a message data dict. + """ + # TODO: Fake stream data maker (namedtuple?) for previewing + async def formatter(data_dict: Optional[dict[str, Any]]): + if not data_dict: + return None + + mapping = { + '{display_name}': streamer.display_name, + '{login_name}': streamer.login_name, + '{channel_link}': f"https://www.twitch.tv/{streamer.login_name}", + '{stream_start}': int(stream.start_at.timestamp()), + '{stream_end}': int(stream.end_at.timestamp()), + } + + recurse_map( + lambda loc, value: replace_multiple(value, mapping) if isinstance(value, str) else value, + data_dict, + ) + return data_dict + return formatter + + async def editor_callback(self, editor_data): + self.value = editor_data + await self.write() + + def _desc_table(self, show_value: Optional[str] = None) -> list[tuple[str, str]]: + lines = super()._desc_table(show_value=show_value) + keytable = tabulate(*self._subkey_desc.items(), colon='') + expline = ( + "The following placeholders will be substituted with their values." + ) + keyfield = ( + "Placeholders", + expline + '\n' + '\n'.join(f"> {line}" for line in keytable) + ) + lines.append(keyfield) + return lines + ... + + @AlertConfig.register_model_setting + class AlertEndDelete(ModelData, BoolSetting): + """ + Whether to delete the live alert after the stream ends. + """ + setting_id = 'alert_end_delete' + _display_name = _p('', 'end_delete') + _desc = _p( + '', + 'Whether to delete the live alert after the stream ends.' + ) + _long_desc = _p( + '', + "If enabled, the live alert message will be deleted when the stream ends. " + "This overrides the `end_message` setting." + ) + _default = False + + _model = AlertsData.AlertChannel + _column = AlertsData.AlertChannel.end_delete.name + + @property + def update_message(self) -> str: + if self.value: + return "The live alert will be deleted at the end of the stream." + else: + return "The live alert will not be deleted when the stream ends." + + @AlertConfig.register_model_setting + class AlertPaused(ModelData, BoolSetting): + """ + Whether this live alert is currently paused. + """ + setting_id = 'alert_paused' + _display_name = _p('', 'paused') + _desc = _p( + '', + "Whether the alert is currently paused." + ) + _long_desc = _p( + '', + "Paused alerts will not trigger live notifications, " + "although the streams will still be tracked internally." + ) + _default = False + + _model = AlertsData.AlertChannel + _column = AlertsData.AlertChannel.paused.name + + @property + def update_message(self): + if self.value: + return "This alert is now paused" + else: + return "This alert has been unpaused" + + @AlertConfig.register_model_setting + class AlertChannel(ModelData, ChannelSetting): + """ + The channel associated to this alert. + """ + setting_id = 'alert_channel' + _display_name = _p('', 'channel') + _desc = _p( + '', + "The Discord channel this live alert will be sent in." + ) + _long_desc = _desc + + # Note that this cannot actually be None, + # as there is no UI pathway to unset the setting. + _default = None + + _model = AlertsData.AlertChannel + _column = AlertsData.AlertChannel.channelid.name + + @property + def update_message(self): + return f"This alert will now be posted to {self.value.channel.mention}"