from string import punctuation
import datetime as dt
from datetime import datetime, timedelta
import time

import twitchio
from twitchio import Scopes
from twitchio.ext import commands as cmds

from botdata import UserAuth
from meta import Bot
from meta.logger import log_wrap
from utils.lib import parse_dur, strfdelta, utc_now

from . import logger
from ..data import HyperfocusData, Hyperfocuser
from .focuschannel import FocusChannel


# Default requested scopes for joining a channel
CHANNEL_SCOPES = Scopes(
    (
        Scopes.channel_bot,
        Scopes.user_read_chat,
        Scopes.user_write_chat,
        Scopes.moderator_manage_chat_messages,
    )
)


class FocusComponent(cmds.Component):
    """
    Chat component implementing "hyperfocus" mode.

    While a chatter has an unexpired Hyperfocuser row, their messages are
    checked by `check_hyperfocus_message` and deleted when they fail it.
    Provides the `hyperfocus`, `unfocus` and `hyperfocused` commands.
    """

    def __init__(self, bot: Bot):
        self.bot = bot
        self.data = bot.dbconn.load_registry(HyperfocusData())

        # channelid -> connected FocusChannel
        self.channels: dict[str, FocusChannel] = {}
        # profileid -> time of the last hyperfocus deletion attempt
        self._last_deleted: dict[int, datetime] = {}

    # ----- API -----
    async def component_load(self):
        """Initialise the hyperfocus data registry."""
        await self.data.init()

    async def get_focus_channel(self, channelid: str, channel: str) -> FocusChannel:
        """
        Get the logged in FocusChannel for the given channel, if possible.

        This is an expensive operation when we need to login to a new channel,
        so don't call this unless we are sure we can log in or need it.

        Raises whatever `get_token_for` or `FocusChannel.connect` raise;
        connection failures are logged before being re-raised.
        """
        cached = self.channels.get(channelid)
        if cached and cached.is_alive:
            return cached

        # Create and login to a new focuschannel, if possible.
        token = await self.get_token_for(channelid)
        session = self.bot._http._session
        ch = FocusChannel(channel, token, session)
        try:
            await ch.connect()
        except Exception:
            logger.exception(
                f"Hyperfocus channel connect failure for '{channel}' ('{channelid}')"
            )
            raise

        # Only cache the channel once it has connected successfully.
        self.channels[channelid] = ch
        return ch

    async def get_token_for(self, channelid: str):
        """
        Get the hyperfocus-capable token for the given channel.

        Raises ValueError if the channel has not granted every scope in
        CHANNEL_SCOPES, or if no stored authorisation row exists for it.
        """
        # TODO: Token invalidation method if it fails.
        # Also should be in a transaction
        # Might technically need to refresh token as well
        scope_rows = await self.bot.data.user_auth_scopes.select_where(userid=channelid)
        scopes = Scopes([row["scope"] for row in scope_rows])

        has_required = all(scope in scopes for scope in CHANNEL_SCOPES)
        if not has_required:
            # TODO: Better auth exception
            raise ValueError(f"Channel '{channelid}' does not have required scopes.")

        auth_row = await UserAuth.fetch(channelid)
        if auth_row is None:
            # Explicit check rather than `assert`, which is stripped under -O.
            raise ValueError(f"Channel '{channelid}' has no stored authorisation.")
        return auth_row.token

    async def get_hyperfocus(self, profileid: int) -> Hyperfocuser | None:
        """
        Get the Hyperfocuser if the user is hyperfocused.

        Returns None when there is no row or the row has already expired.
        """
        row = await Hyperfocuser.fetch(profileid)
        if row and row.ends_at > utc_now():
            return row
        return None

    async def focus_delete_message(self, message: twitchio.ChatMessage):
        """
        Delete the given message through the API, acting as the broadcaster.

        Raises ValueError if the message has no id.
        """
        # This should be impossible, but just in case.
        # None id could cause chat to be wiped
        # Raised explicitly because an `assert` would be stripped under -O.
        if not message.id:
            raise ValueError("Refusing to delete a chat message without an id.")

        # Previous implementation, kept for reference:
        # badge_sets = {badge.set_id for badge in message.badges}
        # if "moderator" in badge_sets or "broadcaster" in badge_sets:
        #     # We need to use the focus channel
        #     #
        #     assert message.broadcaster.name is not None
        #     chan = await self.get_focus_channel(
        #         str(message.broadcaster.id), message.broadcaster.name
        #     )
        #     await chan.delete_msg(str(message.id))
        # else:
        #     await message.broadcaster.delete_chat_messages(
        #         moderator=self.bot.bot_id,
        #         message_id=message.id,
        #     )
        await message.broadcaster.delete_chat_messages(
            moderator=message.broadcaster,
            message_id=message.id,
        )

    def check_hyperfocus_message(self, message: twitchio.ChatMessage) -> bool:
        """
        Check whether the given message is allowed to be sent in hyperfocus.

        A message is allowed when its text starts with a punctuation symbol,
        or when every fragment of type "text" contains no ASCII characters
        once surrounding whitespace, spaces and newlines are removed.
        Fragments of any other type are not inspected.
        """
        allowed = message.text.startswith(tuple(punctuation))
        if not allowed:
            allowed = True
            for fragment in message.fragments:
                if fragment.type != "text":
                    continue
                stripped = fragment.text.strip().replace(" ", "").replace("\n", "")
                if any(char.isascii() for char in stripped):
                    # One offending text fragment is enough to reject.
                    allowed = False
                    break

        if not allowed:
            logger.info(
                f"Message failed hyperfocus check, attempting to delete: {message!r} "
            )
        return allowed

    @cmds.Component.listener()
    async def event_message(self, payload: twitchio.ChatMessage):
        """
        Delete messages from hyperfocused chatters that fail the content check.
        """
        # Check if chatter is currently hyperfocused
        profile = await self.bot.profiles.fetch_profile(payload.chatter, touch=True)
        hyperfocused = await self.get_hyperfocus(profile.profileid)

        # If they are, check the message content for deletion
        if hyperfocused and not self.check_hyperfocus_message(payload):
            # If we need to delete, run delete and send message
            # NOTE(review): as written, the first deletion for a profile never
            # notifies (no previous timestamp), and neither does one within 30s
            # of the previous deletion. Confirm whether the first deletion
            # should send the reminder.
            notify = (
                (last := self._last_deleted.get(profile.profileid))
                and (utc_now() - last).total_seconds() > 30
            )
            try:
                await self.focus_delete_message(payload)
                if notify:
                    await payload.broadcaster.send_message(
                        f"@{payload.chatter.name} Stay focused! "
                        "(You are in !hyperfocus, use !unfocus to come back if you need to!)",
                        sender=self.bot.bot_id,
                    )
            except Exception:
                # Best effort: keep handling chat, but record the traceback.
                logger.warning(
                    f"Failed to delete a hyperfocus message: {payload!r}",
                    exc_info=True,
                )
                if notify:
                    await payload.broadcaster.send_message(
                        f"@{payload.chatter.name} Stay focused! ",
                        sender=self.bot.bot_id,
                    )
            # NOTE(review): placement reconstructed; runs after success or failure.
            self._last_deleted[profile.profileid] = utc_now()

    # ------ Commands -----
    @cmds.command(
        name="hyperfocus", aliases=["hyperf", "hyper", "hypercrocus", "hyperofcus"]
    )
    async def hyperfocus_cmd(self, ctx, *, duration: str | None = None):
        """
        Put the calling chatter into hyperfocus.

        `duration` is either a plain number of minutes or a string accepted by
        `parse_dur`. Without it, the session runs until the next ten-to-the-hour
        mark. A duration that is zero or cannot be parsed gets a usage reply.
        """
        now = utc_now()

        # First parse duration
        if duration:
            if duration.isdigit():
                dur = int(duration) * 60
            else:
                dur = parse_dur(duration)
            if not dur:
                await ctx.reply(
                    "USAGE: '!hyperfocus ' "
                    "For example: '!hyperfocus 10' for 10 minutes or "
                    "'!hyperfocus 1h 10m' for an hour and ten minutes!"
                )
                return
        else:
            # TODO: Add to community configuration
            next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(
                hours=1
            )
            next_block = next_hour - timedelta(minutes=10)
            # `>=` so that landing exactly on the mark does not give a
            # zero-length session.
            if now >= next_block:
                next_block += timedelta(hours=1)
            dur = int((next_block - now).total_seconds())

        end_at = now + timedelta(seconds=dur)

        # Update the row
        profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
        pid = profile.profileid
        comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)

        await Hyperfocuser.table.delete_where(profileid=pid)
        await Hyperfocuser.create(
            profileid=pid,
            started_at=now,
            ends_at=end_at,
            started_in=comm.communityid,
        )

        # TODO: Update channel

        # Whole minutes, rounded up, for the announcement.
        minutes = int(-(-dur // 60))
        await ctx.reply(
            f"{ctx.chatter.name} has gone into HYPERFOCUS! "
            f"They will be in emote and command only mode for the next {minutes} minutes! "
            "Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ "
        )

    @cmds.command(name="unfocus")
    async def unfocus_cmd(self, ctx):
        """Remove the calling chatter's hyperfocus row and welcome them back."""
        profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
        await Hyperfocuser.table.delete_where(profileid=profile.profileid)
        await ctx.reply(
            "Welcome back from focus, hope it went well!"
            " Remember to have a sip and stretch if you need it~"
        )

    @cmds.command(name="hyperfocused")
    async def hyperfocused_cmd(self, ctx, user: twitchio.User | None = None):
        """
        Report whether `user` (default: the calling chatter) is hyperfocused.
        """
        own = user is None
        if own:
            user = ctx.chatter

        profile = await self.bot.profiles.fetch_profile(user, touch=False)
        hyper = await self.get_hyperfocus(profile.profileid)
        if hyper:
            durstr = strfdelta(hyper.ends_at - utc_now())
            await ctx.reply(
                f"{user.name} is in HYPERFOCUS for another {durstr}! "
                "They can only write emojis and commands in this time. Good luck!"
            )
        elif own:
            await ctx.reply(
                "You are not hyperfocused!"
                " Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'"
            )
        else:
            await ctx.reply(f"{user.name} is not hyperfocused!")