From 82b78924a0adf8b4e728cfe291dffe1994e43b6e Mon Sep 17 00:00:00 2001 From: Interitio Date: Sat, 1 Nov 2025 05:27:18 +1000 Subject: [PATCH] Twitch Hyperfocus version 0 --- data/hyperfocus.sql | 13 ++ hyperfocus/__init__.py | 5 + hyperfocus/data.py | 16 ++ hyperfocus/twitch/__init__.py | 7 + hyperfocus/twitch/component.py | 237 ++++++++++++++++++++++++ hyperfocus/twitch/focuschannel.py | 290 ++++++++++++++++++++++++++++++ 6 files changed, 568 insertions(+) create mode 100644 hyperfocus/data.py create mode 100644 hyperfocus/twitch/component.py create mode 100644 hyperfocus/twitch/focuschannel.py diff --git a/data/hyperfocus.sql b/data/hyperfocus.sql index e69de29..26624d1 100644 --- a/data/hyperfocus.sql +++ b/data/hyperfocus.sql @@ -0,0 +1,13 @@ +BEGIN; + +INSERT INTO version_history (component, from_version, to_version, author) +VALUES ('HYPERFOCUS', 0, 1, 'Initial Creation'); + +CREATE TABLE hyperfocused( + profileid INTEGER PRIMARY KEY REFERENCES user_profiles(profileid) ON DELETE CASCADE ON UPDATE CASCADE, + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + ends_at TIMSTAMPTZ NOT NULL, + started_in INTEGER REFERENCES communities(communityid) ON DELETE SET NULL ON UPDATE CASCADE +); + +COMMIT; diff --git a/hyperfocus/__init__.py b/hyperfocus/__init__.py index e69de29..8ba00fb 100644 --- a/hyperfocus/__init__.py +++ b/hyperfocus/__init__.py @@ -0,0 +1,5 @@ +import logging + +logger = logging.getLogger(__name__) + +from .twitch import * diff --git a/hyperfocus/data.py b/hyperfocus/data.py new file mode 100644 index 0000000..7856fd9 --- /dev/null +++ b/hyperfocus/data.py @@ -0,0 +1,16 @@ +from data import Registry, Table, RowModel +from data.columns import String, Integer, Timestamp + + +class Hyperfocuser(RowModel): + _tablename_ = "hyperfocused" + _cache_ = {} + + profileid = Integer(primary=True) + started_at = Timestamp() + ends_at = Timestamp() + started_in = Integer() + + +class HyperfocusData(Registry): + hyperfocused = Hyperfocuser.table diff --git a/hyperfocus/twitch/__init__.py b/hyperfocus/twitch/__init__.py index e69de29..59cd773 100644 --- a/hyperfocus/twitch/__init__.py +++ b/hyperfocus/twitch/__init__.py @@ -0,0 +1,7 @@ +from .. import logger + + +async def twitch_setup(bot): + from .component import FocusComponent + + await bot.add_component(FocusComponent(bot)) diff --git a/hyperfocus/twitch/component.py b/hyperfocus/twitch/component.py new file mode 100644 index 0000000..4402eee --- /dev/null +++ b/hyperfocus/twitch/component.py @@ -0,0 +1,237 @@ +from string import punctuation +import datetime as dt +from datetime import datetime, timedelta +import time + +import twitchio +from twitchio import Scopes +from twitchio.ext import commands as cmds + +from botdata import UserAuth +from meta import Bot +from meta.logger import log_wrap +from utils.lib import parse_dur, strfdelta, utc_now + +from . import logger + +from ..data import HyperfocusData, Hyperfocuser +from .focuschannel import FocusChannel + +# Default requested scopes for joining a channel +CHANNEL_SCOPES = Scopes( + ( + Scopes.channel_bot, + Scopes.user_read_chat, + Scopes.user_write_chat, + Scopes.moderator_manage_chat_messages, + ) +) + + +class FocusComponent(cmds.Component): + def __init__(self, bot: Bot): + self.bot = bot + self.data = bot.dbconn.load_registry(HyperfocusData()) + + self.channels: dict[str, FocusChannel] = {} + + # ----- API ----- + async def component_load(self): + await self.data.init() + + async def get_focus_channel(self, channelid: str, channel: str): + """ + Get the logged in FocusChannel for the given channel, if possible. + + This is an expensive operation when we need to login to a new channel, + so don't call this unless we are sure we can log in or need it. + """ + if not (ch := self.channels.get(channelid)) or not ch.is_alive: + # Create and login to a new focuschannel, if possible. + token = await self.get_token_for(channelid) + session = self.bot._http._session + ch = FocusChannel(channel, token, session) + try: + await ch.connect() + except Exception: + logger.exception( + f"Hyperfocus channel connect failure for '{channel}' ('{channelid}')" + ) + raise + self.channels[channelid] = ch + + return ch + + async def get_token_for(self, channelid: str): + """ + Get the hyperfocus-capable token for the given channel. + """ + # TODO: Token invalidation method if it fails. + # Also should be in a transaction + # Might technically need to refresh token as well + scope_rows = await self.bot.data.user_auth_scopes.select_where(userid=channelid) + scopes = Scopes([row["scope"] for row in scope_rows]) + + has_required = all(scope in scopes for scope in CHANNEL_SCOPES) + if not has_required: + # TODO: Better auth exception + raise ValueError("Channel '%s' does not have required scopes." % channelid) + + auth_row = await UserAuth.fetch(channelid) + assert auth_row is not None + return auth_row.token + + async def get_hyperfocus(self, profileid: int) -> Hyperfocuser | None: + """ + Get the Hyperfocuser if the user is hyperfocused. + """ + row = Hyperfocuser.fetch(profileid) + if row and row.ends_at < utc_now(): + return row + + async def focus_delete_message(self, message: twitchio.ChatMessage): + """Delete the given message. Uses the API if possible, otherwise opens an IRC channel.""" + # This should be impossible, but just in case. + # None id could cause chat to be wiped + assert message.id is not None + + badge_sets = {badge.set_id for badge in message.badges} + if "moderator" in badge_sets or "broadcaster" in badge_sets: + # We need to use the focus channel + + assert message.broadcaster.name is not None + chan = await self.get_focus_channel( + str(message.broadcaster.id), message.broadcaster.name + ) + await chan.delete_msg(str(message.id)) + else: + await message.broadcaster.delete_chat_messages( + moderator=self.bot.bot_id, + message_id=message.id, + ) + + def check_hyperfocus_message(self, message: twitchio.ChatMessage): + """ + Check whether the given message is allowed to be sent in hyperfocus. + + This amounts to whether it starts with a punctuation symbol, or it is only emotes and mentions. + """ + allowed = message.text.startswith(tuple(punctuation)) + + if not allowed: + allowed = True + for fragment in message.fragments: + if allowed and fragment.type == "text": + stripped = fragment.text.strip().replace(" ", "").replace("\n", "") + allowed = all(not char.isascii() for char in stripped) + + if not allowed: + logger.info( + f"Message failed hyperfocus check, attempting to delete: {message!r} " + ) + + return allowed + + @cmds.Component.listener() + async def event_message(self, payload: twitchio.ChatMessage): + # Check if chatter is currently hyperfocused + profile = await self.bot.profiles.fetch_profile(payload.chatter, touch=True) + hyperfocused = await self.get_hyperfocus(profile.profileid) + + # If they are, check the message content for deletion + if hyperfocused and not self.check_hyperfocus_message(payload): + # If we need to delete, run delete and send message + try: + await self.focus_delete_message(payload) + await payload.broadcaster.send_message( + f"@{payload.chatter.name} Stay focused! " + "(You are in !hyperfocus, use !unfocus to come back if you need to!)", + sender=self.bot.bot_id, + ) + except Exception: + logger.warning(f"Failed to delete a hyperfocus message: {payload!r}") + await payload.broadcaster.send_message( + f"@{payload.chatter.name} Stay focused! ", + sender=self.bot.bot_id, + ) + + # ------ Commands ----- + @cmds.command( + name="hyperfocus", aliases=["hyperf", "hyper", "hypercrocus", "hyperofcus"] + ) + async def hyperfocus_cmd(self, ctx, *, duration: str | None): + now = utc_now() + + # First parse duration + if duration and duration.isdigit(): + dur = int(duration) * 60 + elif duration: + dur = parse_dur(duration) + if not dur: + await ctx.reply( + "USAGE: '!hyperfocus ' " + "For example: '!hyperfocus 10' for 10 minutes or " + "'!hyperfocus 1h 10m' for an hour and ten minutes!" + ) + return + else: + # TODO: Add to community configuration + next_hour = now.replace(minute=0, second=0, microsecond=0) + dt.timedelta( + hours=1 + ) + next_block = next_hour - dt.timedelta(minutes=10) + if now > next_block: + next_block += dt.timedelta(hours=1) + dur = int((next_block - now).total_seconds()) + + end_at = now + timedelta(seconds=dur) + + # Update the row + profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True) + pid = profile.profileid + comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True) + + await Hyperfocuser.table.delete_where(profileid=pid) + await Hyperfocuser.create( + profileid=pid, + started_at=now, + ends_at=end_at, + started_in=comm.communityid, + ) + + # TODO: Update channel + await ctx.reply( + f"{ctx.chatter.name} has gone into HYPERFOCUS! " + "They will be in emote and command only mode for the next {minutes} minutes! " + "Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ " + ) + + @cmds.command(name="unfocus") + async def unfocus_cmd(self, ctx): + profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True) + row = await Hyperfocuser.table.delete_where(profileid=profile.profileid) + + await ctx.reply( + "Welcome back from focus, hope it went well!" + " Remember to have a sip and stretch if you need it~" + ) + + @cmds.command(name="hyperfocused") + async def hyperfocused_cmd(self, ctx, user: twitchio.User | None = None): + user, own = (user, False) if user is not None else (ctx.chatter, True) + + profile = await self.bot.profiles.fetch_profile(user, touch=False) + + if hyper := (await self.get_hyperfocus(profile.profileid)): + durstr = strfdelta(hyper.ends_at - utc_now()) + await ctx.reply( + f"{user.name} is in HYPERFOCUS for another {durstr}! " + "They can only write emojis and commands in this time. Good luck!" + ) + elif own: + await ctx.reply( + "You are not hyperfocused!" + " Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'" + ) + else: + await ctx.reply(f"{user.name} is not hyperfocused!") diff --git a/hyperfocus/twitch/focuschannel.py b/hyperfocus/twitch/focuschannel.py new file mode 100644 index 0000000..56c0219 --- /dev/null +++ b/hyperfocus/twitch/focuschannel.py @@ -0,0 +1,290 @@ +""" +Minimal IRC client intended to connect to one channel and send a message. +""" + +import asyncio +import aiohttp +import time +import re + +from . import logger + + +ACTIONS = ( + "JOIN", + "PART", + "PING", + "PRIVMSG", + "PRIVMSG(ECHO)", + "USERSTATE", + "MODE", + "WHISPER", + "USERNOTICE", + "NOTICE", +) +ACTIONS2 = ("USERSTATE", "ROOMSTATE", "PRIVMSG", "USERNOTICE", "WHISPER") +USER_SUB = re.compile(r":(?P.*)!") +MESSAGE_RE = re.compile( + r":(?P\S+) (?P\S+) (?P\S+)( :(?P.*))?$" +) +FAST_RETURN = { + "RECONNECT": {"code": 0, "action": "RECONNECT"}, + "PING": {"action": "PING"}, +} + + +def parser(data: str, nick: str): + groups = data.split() + action = groups[1] if groups[1] == "JOIN" else groups[-2] + channel = None + message = None + user = None + badges = None + + _group_len = len(groups) + + if action in FAST_RETURN: + return FAST_RETURN[action] + + elif groups[1] in FAST_RETURN: + return FAST_RETURN[groups[1]] + + elif ( + groups[1] in ACTIONS + or (_group_len > 2 and groups[2] in ACTIONS) + or (_group_len > 3 and groups[3] in {"PRIVMSG", "PRIVMSG(ECHO)"}) + ): + result = re.search(MESSAGE_RE, data) + if not result: + logger.error("****** MESSAGE_RE Failed! ******") + raise ValueError("Could not parse message '%s' as IRC message" % data) + user = result.group("useraddr").split("!")[0] + action = result.group("action") + channel = result.group("channel").lstrip("#") + message = result.group("message") + + if action == "WHISPER": + channel = None + + if action in ACTIONS2: + prebadge = groups[0].split(";") + badges = {} + + for badge in prebadge: + badge = badge.split("=") + + try: + badges[badge[0]] = badge[1] + except IndexError: + pass + + if action == "USERSTATE" and badges.get("display-name"): + user = badges["display-name"].lower() + + if action == "USERNOTICE" and badges.get("login"): + user = badges["login"].lower() + elif action not in ACTIONS: + action = None + + if not user: + try: + user = re.search(USER_SUB, groups[0]).group("user") + except (AttributeError, ValueError): + pass + + try: + code = int(groups[1]) + except ValueError: + code = 0 + + batches = [] + if code == 353: + channel = groups[4] + if channel[0] == "#": + channel = channel[1:] + else: + logger.warning(f" (353) parse failed? ||{channel}||") + + if user is None: + user = groups[-1][1:].lower() + for b in groups[5:-1]: + if b[0] == ":": + b = b[1:] + + if "\r\n:" in b: + batches.append(b.split("\r\n:")[0]) + break + else: + batches.append(b) + + return dict( + data=data, + nick=nick, + groups=groups, + action=action, + channel=channel, + user=user, + badges=badges, + code=code, + message=message, + batches=batches, + ) + + +class FocusChannel: + IRC_URI = "wss://irc-ws.chat.twitch.tv:443" + + def __init__(self, channel: str, token: str, session: aiohttp.ClientSession): + self.channel = channel.lower() + self.token = token + self.session = session + + self._socket = None + self._keeper: asyncio.Task | None = None + self._ws_ready_event: asyncio.Event = asyncio.Event() + self._joined: asyncio.Event = asyncio.Event() + + self.is_ready: asyncio.Event = asyncio.Event() + + self.modes = ("commands",) + self._last_ping = 0 + self._reconnect_requested = False + + @property + def is_alive(self): + return self._socket is not None and not self._socket.closed + + async def wait_until_ready(self): + await self.is_ready.wait() + + async def connect(self): + self.is_ready.clear() + + if self._keeper: + self._keeper.cancel() + if self.is_alive: + await self._socket.close() + + try: + self._socket = await self.session.ws_connect(url=self.IRC_URI) + except Exception as e: + logger.error(f"FocusChannel IRC connection failed: {e}") + raise + + self._reconnect_requested = False + self._keeper = asyncio.create_task(self._keep_alive()) + self._ws_ready_event.set() + + async def authenticate(self): + if not self.is_alive: + raise ValueError("Cannot authenticate before connection.") + await self.send(f"PASS oauth:{self.token}\r\n") + await self.send(f"NICK {self.channel}\r\n") + + for cap in self.modes: + await self.send(f"CAP REQ :twitch.tv/{cap}") + + async def join(self): + if not self.is_alive: + raise ValueError("Cannot join before connection.") + + self._joined.clear() + + await self.send(f"JOIN #{self.channel}\r\n") + await self._joined.wait() + + async def send(self, message: str): + await self._socket.send_str(message + "\r\n") + + async def _process_data(self, data: str): + data = data.rstrip() + parsed = parser(data, self.channel) + + if parsed["action"] == "PING": + return await self._ping() + elif parsed["action"] == "RECONNECT": + return await self._reconnect(parsed) + elif parsed["code"] != 0: + return await self._code(parsed, parsed["code"]) + elif data.startswith(":tmi.twitch.tv NOTICE * :Login unsuccessful"): + logger.error(f"Failed to login to Twitch IRC channel {self.channel}") + return await self._close() + else: + # TODO: We could handle other actions here + return None + + async def _code(self, parsed, code: int): + logger.info(f"{self!r} received '{code}': {parsed}") + + if code == 1: + logger.info(f"FocusChannel logged into '{self.channel}'") + elif code == 353: + pass + elif code in (2, 3, 4, 366, 372, 375): + pass + elif code == 376: + pass + else: + pass + + async def _ping(self): + self._last_ping = time.monotonic() + await self.send("PONG :tmi.twitch.tv\r\n") + + async def _close(self): + if self._keeper: + self._keeper.cancel() + + self.is_ready.clear() + if self._socket: + await self._socket.close() + + async def _join(self, parsed): + channel = parsed["channel"] + logger.info(f"ACTION: JOIN:: {channel}") + + if self.channel != channel.lower(): + logger.info( + f"FocusChannel '{self.channel}' got join event for '{channel}'.. ignoring" + ) + else: + self._joined.set() + + async def _keep_alive(self): + await self._ws_ready_event.wait() + self._ws_ready_event.clear() + + if not self._last_ping: + self._last_ping = time.monotonic() + while not self._socket.closed and not self._reconnect_requested: + msg = await self._socket.receive() + + if msg.type is aiohttp.WSMsgType.CLOSED: + logger.error(f"Websocket connection closed: {msg.extra}") + break + data = msg.data + if data: + logger.debug(f" < {data}") + events = data.split("\r\n") + for event in events: + if not event: + continue + try: + await self._process_data(event) + except Exception: + logger.exception( + "Websocket message processing failed: %s", str(event) + ) + + async def _reconnect(self, parsed): + logger.info(f"ACTION: {self!r} recconecting") + self._reconnect_requested = True + if self._keeper: + self._keeper.cancel() + asyncio.create_task(self.connect()) + + async def send_msg(self, content: str): + await self._socket.send_str(content) + + async def delete_msg(self, msgid: str): + return await self.send_msg(f"/delete {msgid}")