Compare commits

...

10 Commits

Author SHA1 Message Date
93c17112e3 Switch to explicit allowed prefix list. 2025-11-07 23:50:02 +10:00
26dc684851 Fix typo in link 2025-11-01 11:53:42 +10:00
1bd2e9607a fix: Add lock to prevent update race. 2025-11-01 11:49:27 +10:00
a14cc4056e fix: Channel del userid type 2025-11-01 11:41:10 +10:00
3885710689 feat: Add websocket channel 2025-11-01 10:58:26 +10:00
737871d696 Add invite link 2025-11-01 07:21:40 +10:00
41ec25cd21 fix: Caching issue on unfocus 2025-11-01 07:03:30 +10:00
424f7584cb fix: Correct ratelimit for delete notify 2025-11-01 06:58:46 +10:00
baeea1c41b Remove extraneous IRC client 2025-11-01 06:45:36 +10:00
6eabbc0d73 fix: Various typos 2025-11-01 06:38:12 +10:00
5 changed files with 223 additions and 394 deletions

View File

@@ -0,0 +1 @@
from .hyperfocus import *

View File

@@ -6,7 +6,7 @@ VALUES ('HYPERFOCUS', 0, 1, 'Initial Creation');
CREATE TABLE hyperfocused( CREATE TABLE hyperfocused(
profileid INTEGER PRIMARY KEY REFERENCES user_profiles(profileid) ON DELETE CASCADE ON UPDATE CASCADE, profileid INTEGER PRIMARY KEY REFERENCES user_profiles(profileid) ON DELETE CASCADE ON UPDATE CASCADE,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ends_at TIMSTAMPTZ NOT NULL, ends_at TIMESTAMPTZ NOT NULL,
started_in INTEGER REFERENCES communities(communityid) ON DELETE SET NULL ON UPDATE CASCADE started_in INTEGER REFERENCES communities(communityid) ON DELETE SET NULL ON UPDATE CASCADE
); );

107
hyperfocus/channel.py Normal file
View File

@@ -0,0 +1,107 @@
from collections import defaultdict
from typing import Optional, TypeAlias, TypedDict
import json
from datetime import datetime, timedelta
from meta.sockets import Channel
from modules.profiles.profiles.profiles import ProfilesRegistry
from . import logger
from .data import Hyperfocuser, HyperfocusData
ISOTimestamp: TypeAlias = str
class HyperfocusedPayload(TypedDict):
userid: str
user_name: str
started_at: ISOTimestamp
ends_at: ISOTimestamp
async def prepare_hyperfocuser(profiler: ProfilesRegistry, hyperfocuser: Hyperfocuser):
profile = await profiler.get_profile(hyperfocuser.profileid)
assert profile is not None
return HyperfocusedPayload(
userid=str(hyperfocuser.profileid),
user_name=profile.nickname or "Unknown",
started_at=hyperfocuser.started_at.isoformat(),
ends_at=hyperfocuser.ends_at.isoformat(),
)
class FocusChannel(Channel):
name = "HyperFocus"
def __init__(self, profiler: ProfilesRegistry, focusdata: HyperfocusData, **kwargs):
super().__init__(**kwargs)
self.profiler = profiler
self.focusdata = focusdata
# communityid -> listening websockets
self.communities = defaultdict(set)
async def on_connection(self, websocket, event):
if not (cidstr := event.get("community")):
raise ValueError("Hyperfocus browser missing communityid")
elif not cidstr.isdigit():
raise ValueError("Community id provided is not an integer")
cid = int(cidstr)
community = await self.profiler.get_community(cid)
if community is None:
raise ValueError("Unknown community provided")
await super().on_connection(websocket, event)
self.communities[cid].add(websocket)
focus_rows = await Hyperfocuser.fetch_where(started_in=cid)
await self.send_hyperfocus_put(cid, focus_rows)
async def del_connection(self, websocket):
for wss in self.communities.values():
wss.discard(websocket)
await super().del_connection(websocket)
async def send_hyperfocus_patch(
self, communityid: int, focuser: Hyperfocuser, websocket=None
):
for ws in (websocket,) if websocket else self.communities[communityid]:
await self.send_event(
{
"type": "DO",
"method": "patchFocus",
"args": await prepare_hyperfocuser(self.profiler, focuser),
},
websocket=ws,
)
async def send_hyperfocus_del(self, profileid: int, websocket=None):
await self.send_event(
{
"type": "DO",
"method": "delFocus",
"args": {"userid": str(profileid)},
},
websocket=websocket,
)
async def send_hyperfocus_put(
self, communityid: int, focusers: list[Hyperfocuser], websocket=None
):
payload = []
for focuser in focusers:
fpl = await prepare_hyperfocuser(self.profiler, focuser)
payload.append(fpl)
for ws in (websocket,) if websocket else self.communities[communityid]:
await self.send_event(
{"type": "DO", "method": "putFocus", "args": payload},
websocket=ws,
)
async def send_event(self, event, **kwargs):
logger.info(f"Sending websocket event: {json.dumps(event, indent=1)}")
await super().send_event(event, **kwargs)

View File

@@ -1,6 +1,8 @@
import asyncio
from string import punctuation from string import punctuation
import datetime as dt import datetime as dt
from datetime import datetime, timedelta from datetime import datetime, timedelta
from math import ceil
import time import time
import twitchio import twitchio
@@ -10,12 +12,14 @@ from twitchio.ext import commands as cmds
from botdata import UserAuth from botdata import UserAuth
from meta import Bot from meta import Bot
from meta.logger import log_wrap from meta.logger import log_wrap
from meta.sockets import register_channel
from utils.lib import parse_dur, strfdelta, utc_now from utils.lib import parse_dur, strfdelta, utc_now
from . import logger from . import logger
from ..data import HyperfocusData, Hyperfocuser from ..data import HyperfocusData, Hyperfocuser
from .focuschannel import FocusChannel from ..channel import FocusChannel
# Default requested scopes for joining a channel # Default requested scopes for joining a channel
CHANNEL_SCOPES = Scopes( CHANNEL_SCOPES = Scopes(
@@ -32,83 +36,35 @@ class FocusComponent(cmds.Component):
def __init__(self, bot: Bot): def __init__(self, bot: Bot):
self.bot = bot self.bot = bot
self.data = bot.dbconn.load_registry(HyperfocusData()) self.data = bot.dbconn.load_registry(HyperfocusData())
self.channel = FocusChannel(self.bot.profiles.profiles, self.data)
self.channels: dict[str, FocusChannel] = {} register_channel(self.channel.name, self.channel)
self._last_deleted: dict[int, datetime] = {}
self.hyperfocus_lock = asyncio.Lock()
# ----- API ----- # ----- API -----
async def component_load(self): async def component_load(self):
await self.data.init() await self.data.init()
async def get_focus_channel(self, channelid: str, channel: str):
"""
Get the logged in FocusChannel for the given channel, if possible.
This is an expensive operation when we need to login to a new channel,
so don't call this unless we are sure we can log in or need it.
"""
if not (ch := self.channels.get(channelid)) or not ch.is_alive:
# Create and login to a new focuschannel, if possible.
token = await self.get_token_for(channelid)
session = self.bot._http._session
ch = FocusChannel(channel, token, session)
try:
await ch.connect()
except Exception:
logger.exception(
f"Hyperfocus channel connect failure for '{channel}' ('{channelid}')"
)
raise
self.channels[channelid] = ch
return ch
async def get_token_for(self, channelid: str):
"""
Get the hyperfocus-capable token for the given channel.
"""
# TODO: Token invalidation method if it fails.
# Also should be in a transaction
# Might technically need to refresh token as well
scope_rows = await self.bot.data.user_auth_scopes.select_where(userid=channelid)
scopes = Scopes([row["scope"] for row in scope_rows])
has_required = all(scope in scopes for scope in CHANNEL_SCOPES)
if not has_required:
# TODO: Better auth exception
raise ValueError("Channel '%s' does not have required scopes." % channelid)
auth_row = await UserAuth.fetch(channelid)
assert auth_row is not None
return auth_row.token
async def get_hyperfocus(self, profileid: int) -> Hyperfocuser | None: async def get_hyperfocus(self, profileid: int) -> Hyperfocuser | None:
""" """
Get the Hyperfocuser if the user is hyperfocused. Get the Hyperfocuser if the user is hyperfocused.
""" """
row = Hyperfocuser.fetch(profileid) row = await Hyperfocuser.fetch(profileid)
if row and row.ends_at < utc_now(): if row and row.ends_at > utc_now():
return row return row
async def focus_delete_message(self, message: twitchio.ChatMessage): async def focus_delete_message(self, message: twitchio.ChatMessage):
"""Delete the given message. Uses the API if possible, otherwise opens an IRC channel.""" """Delete the given message."""
# This should be impossible, but just in case. # This should be impossible, but just in case.
# None id could cause chat to be wiped # None id could cause chat to be wiped
assert message.id is not None assert message.id is not None
badge_sets = {badge.set_id for badge in message.badges} await message.broadcaster.delete_chat_messages(
if "moderator" in badge_sets or "broadcaster" in badge_sets: moderator=message.broadcaster,
# We need to use the focus channel message_id=message.id,
)
assert message.broadcaster.name is not None
chan = await self.get_focus_channel(
str(message.broadcaster.id), message.broadcaster.name
)
await chan.delete_msg(str(message.id))
else:
await message.broadcaster.delete_chat_messages(
moderator=self.bot.bot_id,
message_id=message.id,
)
def check_hyperfocus_message(self, message: twitchio.ChatMessage): def check_hyperfocus_message(self, message: twitchio.ChatMessage):
""" """
@@ -116,7 +72,7 @@ class FocusComponent(cmds.Component):
This amounts to whether it starts with a punctuation symbol, or it is only emotes and mentions. This amounts to whether it starts with a punctuation symbol, or it is only emotes and mentions.
""" """
allowed = message.text.startswith(tuple(punctuation)) allowed = message.text.startswith(tuple("!*#%|?><."))
if not allowed: if not allowed:
allowed = True allowed = True
@@ -134,6 +90,10 @@ class FocusComponent(cmds.Component):
@cmds.Component.listener() @cmds.Component.listener()
async def event_message(self, payload: twitchio.ChatMessage): async def event_message(self, payload: twitchio.ChatMessage):
async with self.hyperfocus_lock:
await self.handle_message(payload)
async def handle_message(self, payload: twitchio.ChatMessage):
# Check if chatter is currently hyperfocused # Check if chatter is currently hyperfocused
profile = await self.bot.profiles.fetch_profile(payload.chatter, touch=True) profile = await self.bot.profiles.fetch_profile(payload.chatter, touch=True)
hyperfocused = await self.get_hyperfocus(profile.profileid) hyperfocused = await self.get_hyperfocus(profile.profileid)
@@ -141,25 +101,53 @@ class FocusComponent(cmds.Component):
# If they are, check the message content for deletion # If they are, check the message content for deletion
if hyperfocused and not self.check_hyperfocus_message(payload): if hyperfocused and not self.check_hyperfocus_message(payload):
# If we need to delete, run delete and send message # If we need to delete, run delete and send message
notify = ( #
not (last := self._last_deleted.get(profile.profileid))
or (utc_now() - last).total_seconds() > 30
)
try: try:
await self.focus_delete_message(payload) await self.focus_delete_message(payload)
await payload.broadcaster.send_message( deleted = True
f"@{payload.chatter.name} Stay focused! "
"(You are in !hyperfocus, use !unfocus to come back if you need to!)",
sender=self.bot.bot_id,
)
except Exception: except Exception:
logger.warning(f"Failed to delete a hyperfocus message: {payload!r}") logger.warning(
await payload.broadcaster.send_message( f"Failed to delete a hyperfocus message: {payload!r}", exc_info=True
f"@{payload.chatter.name} Stay focused! ",
sender=self.bot.bot_id,
) )
deleted = False
if notify:
self._last_deleted[profile.profileid] = utc_now()
try:
if deleted:
await payload.broadcaster.send_message(
f"@{payload.chatter.name} Stay focused! "
"(You are in !hyperfocus, use !unfocus to come back if you need to!)",
sender=self.bot.bot_id,
)
else:
await payload.broadcaster.send_message(
f"@{payload.chatter.name} Stay focused! ",
sender=self.bot.bot_id,
)
except twitchio.exceptions.HTTPException:
logger.warning(
f"Failed to notify user of hyperfocus deletion: {payload!r}",
exc_info=True,
)
if hyperfocused:
# Send an update to the channel
# TODO: Nicer and more efficient caching of which channel has which users
# TODO: Possible race condition with the commands. Should use locks.
comm = await self.bot.profiles.fetch_community(
payload.broadcaster, touch=True
)
await self.channel.send_hyperfocus_patch(comm.communityid, hyperfocused)
# ------ Commands ----- # ------ Commands -----
@cmds.command( @cmds.command(
name="hyperfocus", aliases=["hyperf", "hyper", "hypercrocus", "hyperofcus"] name="hyperfocus", aliases=["hyperf", "hyper", "hypercrocus", "hyperofcus"]
) )
async def hyperfocus_cmd(self, ctx, *, duration: str | None): async def hyperfocus_cmd(self, ctx, *, duration: str | None = None):
now = utc_now() now = utc_now()
# First parse duration # First parse duration
@@ -191,30 +179,37 @@ class FocusComponent(cmds.Component):
pid = profile.profileid pid = profile.profileid
comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True) comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)
await Hyperfocuser.table.delete_where(profileid=pid) async with self.hyperfocus_lock:
await Hyperfocuser.create( await Hyperfocuser.table.delete_where(profileid=pid)
profileid=pid, focuser = await Hyperfocuser.create(
started_at=now, profileid=pid,
ends_at=end_at, started_at=now,
started_in=comm.communityid, ends_at=end_at,
) started_in=comm.communityid,
)
# TODO: Update channel await self.channel.send_hyperfocus_patch(comm.communityid, focuser)
await ctx.reply(
f"{ctx.chatter.name} has gone into HYPERFOCUS! " minutes = ceil(dur / 60)
"They will be in emote and command only mode for the next {minutes} minutes! " await ctx.reply(
"Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ " f"{ctx.chatter.name} has gone into HYPERFOCUS! "
) f"They will be in emote and command only mode for the next {minutes} minutes! "
"Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ "
)
@cmds.command(name="unfocus") @cmds.command(name="unfocus")
async def unfocus_cmd(self, ctx): async def unfocus_cmd(self, ctx):
profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True) profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
row = await Hyperfocuser.table.delete_where(profileid=profile.profileid) async with self.hyperfocus_lock:
row = await Hyperfocuser.fetch(profile.profileid)
if row:
await row.delete()
await self.channel.send_hyperfocus_del(profile.profileid)
await ctx.reply( await ctx.reply(
"Welcome back from focus, hope it went well!" "Welcome back from focus, hope it went well!"
" Remember to have a sip and stretch if you need it~" " Remember to have a sip and stretch if you need it~"
) )
@cmds.command(name="hyperfocused") @cmds.command(name="hyperfocused")
async def hyperfocused_cmd(self, ctx, user: twitchio.User | None = None): async def hyperfocused_cmd(self, ctx, user: twitchio.User | None = None):
@@ -222,16 +217,32 @@ class FocusComponent(cmds.Component):
profile = await self.bot.profiles.fetch_profile(user, touch=False) profile = await self.bot.profiles.fetch_profile(user, touch=False)
if hyper := (await self.get_hyperfocus(profile.profileid)): async with self.hyperfocus_lock:
durstr = strfdelta(hyper.ends_at - utc_now()) if hyper := (await self.get_hyperfocus(profile.profileid)):
await ctx.reply( durstr = strfdelta(hyper.ends_at - utc_now())
f"{user.name} is in HYPERFOCUS for another {durstr}! " await ctx.reply(
"They can only write emojis and commands in this time. Good luck!" f"{user.name} is in HYPERFOCUS for another {durstr}! "
) "They can only write emojis and commands in this time. Good luck!"
elif own: )
await ctx.reply( elif own:
"You are not hyperfocused!" await ctx.reply(
" Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'" "You are not hyperfocused!"
) " Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'"
else: )
await ctx.reply(f"{user.name} is not hyperfocused!") else:
await ctx.reply(f"{user.name} is not hyperfocused!")
@cmds.command(name="addfocus")
async def addfocus_cmd(self, ctx):
await ctx.reply(
"Add HYPERFOCUS to your channel by authorising me here: https://croccyfocus.thewisewolf.dev/invite"
)
@cmds.command(name="focuslist")
@cmds.is_moderator()
async def focuslist_cmd(self, ctx):
comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)
link = (
f"https://croccyfocus.thewisewolf.dev/widget/?community={comm.communityid}"
)
await ctx.reply(f"Browser source link for your channel's hyperfocus: {link}")

View File

@@ -1,290 +0,0 @@
"""
Minimal IRC client intended to connect to one channel and send a message.
"""
import asyncio
import aiohttp
import time
import re
from . import logger
ACTIONS = (
"JOIN",
"PART",
"PING",
"PRIVMSG",
"PRIVMSG(ECHO)",
"USERSTATE",
"MODE",
"WHISPER",
"USERNOTICE",
"NOTICE",
)
ACTIONS2 = ("USERSTATE", "ROOMSTATE", "PRIVMSG", "USERNOTICE", "WHISPER")
USER_SUB = re.compile(r":(?P<user>.*)!")
MESSAGE_RE = re.compile(
r":(?P<useraddr>\S+) (?P<action>\S+) (?P<channel>\S+)( :(?P<message>.*))?$"
)
FAST_RETURN = {
"RECONNECT": {"code": 0, "action": "RECONNECT"},
"PING": {"action": "PING"},
}
def parser(data: str, nick: str):
groups = data.split()
action = groups[1] if groups[1] == "JOIN" else groups[-2]
channel = None
message = None
user = None
badges = None
_group_len = len(groups)
if action in FAST_RETURN:
return FAST_RETURN[action]
elif groups[1] in FAST_RETURN:
return FAST_RETURN[groups[1]]
elif (
groups[1] in ACTIONS
or (_group_len > 2 and groups[2] in ACTIONS)
or (_group_len > 3 and groups[3] in {"PRIVMSG", "PRIVMSG(ECHO)"})
):
result = re.search(MESSAGE_RE, data)
if not result:
logger.error("****** MESSAGE_RE Failed! ******")
raise ValueError("Could not parse message '%s' as IRC message" % data)
user = result.group("useraddr").split("!")[0]
action = result.group("action")
channel = result.group("channel").lstrip("#")
message = result.group("message")
if action == "WHISPER":
channel = None
if action in ACTIONS2:
prebadge = groups[0].split(";")
badges = {}
for badge in prebadge:
badge = badge.split("=")
try:
badges[badge[0]] = badge[1]
except IndexError:
pass
if action == "USERSTATE" and badges.get("display-name"):
user = badges["display-name"].lower()
if action == "USERNOTICE" and badges.get("login"):
user = badges["login"].lower()
elif action not in ACTIONS:
action = None
if not user:
try:
user = re.search(USER_SUB, groups[0]).group("user")
except (AttributeError, ValueError):
pass
try:
code = int(groups[1])
except ValueError:
code = 0
batches = []
if code == 353:
channel = groups[4]
if channel[0] == "#":
channel = channel[1:]
else:
logger.warning(f" (353) parse failed? ||{channel}||")
if user is None:
user = groups[-1][1:].lower()
for b in groups[5:-1]:
if b[0] == ":":
b = b[1:]
if "\r\n:" in b:
batches.append(b.split("\r\n:")[0])
break
else:
batches.append(b)
return dict(
data=data,
nick=nick,
groups=groups,
action=action,
channel=channel,
user=user,
badges=badges,
code=code,
message=message,
batches=batches,
)
class FocusChannel:
IRC_URI = "wss://irc-ws.chat.twitch.tv:443"
def __init__(self, channel: str, token: str, session: aiohttp.ClientSession):
self.channel = channel.lower()
self.token = token
self.session = session
self._socket = None
self._keeper: asyncio.Task | None = None
self._ws_ready_event: asyncio.Event = asyncio.Event()
self._joined: asyncio.Event = asyncio.Event()
self.is_ready: asyncio.Event = asyncio.Event()
self.modes = ("commands",)
self._last_ping = 0
self._reconnect_requested = False
@property
def is_alive(self):
return self._socket is not None and not self._socket.closed
async def wait_until_ready(self):
await self.is_ready.wait()
async def connect(self):
self.is_ready.clear()
if self._keeper:
self._keeper.cancel()
if self.is_alive:
await self._socket.close()
try:
self._socket = await self.session.ws_connect(url=self.IRC_URI)
except Exception as e:
logger.error(f"FocusChannel IRC connection failed: {e}")
raise
self._reconnect_requested = False
self._keeper = asyncio.create_task(self._keep_alive())
self._ws_ready_event.set()
async def authenticate(self):
if not self.is_alive:
raise ValueError("Cannot authenticate before connection.")
await self.send(f"PASS oauth:{self.token}\r\n")
await self.send(f"NICK {self.channel}\r\n")
for cap in self.modes:
await self.send(f"CAP REQ :twitch.tv/{cap}")
async def join(self):
if not self.is_alive:
raise ValueError("Cannot join before connection.")
self._joined.clear()
await self.send(f"JOIN #{self.channel}\r\n")
await self._joined.wait()
async def send(self, message: str):
await self._socket.send_str(message + "\r\n")
async def _process_data(self, data: str):
data = data.rstrip()
parsed = parser(data, self.channel)
if parsed["action"] == "PING":
return await self._ping()
elif parsed["action"] == "RECONNECT":
return await self._reconnect(parsed)
elif parsed["code"] != 0:
return await self._code(parsed, parsed["code"])
elif data.startswith(":tmi.twitch.tv NOTICE * :Login unsuccessful"):
logger.error(f"Failed to login to Twitch IRC channel {self.channel}")
return await self._close()
else:
# TODO: We could handle other actions here
return None
async def _code(self, parsed, code: int):
logger.info(f"{self!r} received '{code}': {parsed}")
if code == 1:
logger.info(f"FocusChannel logged into '{self.channel}'")
elif code == 353:
pass
elif code in (2, 3, 4, 366, 372, 375):
pass
elif code == 376:
pass
else:
pass
async def _ping(self):
self._last_ping = time.monotonic()
await self.send("PONG :tmi.twitch.tv\r\n")
async def _close(self):
if self._keeper:
self._keeper.cancel()
self.is_ready.clear()
if self._socket:
await self._socket.close()
async def _join(self, parsed):
channel = parsed["channel"]
logger.info(f"ACTION: JOIN:: {channel}")
if self.channel != channel.lower():
logger.info(
f"FocusChannel '{self.channel}' got join event for '{channel}'.. ignoring"
)
else:
self._joined.set()
async def _keep_alive(self):
await self._ws_ready_event.wait()
self._ws_ready_event.clear()
if not self._last_ping:
self._last_ping = time.monotonic()
while not self._socket.closed and not self._reconnect_requested:
msg = await self._socket.receive()
if msg.type is aiohttp.WSMsgType.CLOSED:
logger.error(f"Websocket connection closed: {msg.extra}")
break
data = msg.data
if data:
logger.debug(f" < {data}")
events = data.split("\r\n")
for event in events:
if not event:
continue
try:
await self._process_data(event)
except Exception:
logger.exception(
"Websocket message processing failed: %s", str(event)
)
async def _reconnect(self, parsed):
logger.info(f"ACTION: {self!r} recconecting")
self._reconnect_requested = True
if self._keeper:
self._keeper.cancel()
asyncio.create_task(self.connect())
async def send_msg(self, content: str):
await self._socket.send_str(content)
async def delete_msg(self, msgid: str):
return await self.send_msg(f"/delete {msgid}")