Compare commits

...

10 Commits

Author SHA1 Message Date
93c17112e3 Switch to explicit allowed prefix list. 2025-11-07 23:50:02 +10:00
26dc684851 Fix typo in link 2025-11-01 11:53:42 +10:00
1bd2e9607a fix: Add lock to prevent update race. 2025-11-01 11:49:27 +10:00
a14cc4056e fix: Channel del userid type 2025-11-01 11:41:10 +10:00
3885710689 feat: Add websocket channel 2025-11-01 10:58:26 +10:00
737871d696 Add invite link 2025-11-01 07:21:40 +10:00
41ec25cd21 fix: Caching issue on unfocus 2025-11-01 07:03:30 +10:00
424f7584cb fix: Correct ratelimit for delete notify 2025-11-01 06:58:46 +10:00
baeea1c41b Remove extraneous IRC client 2025-11-01 06:45:36 +10:00
6eabbc0d73 fix: Various typos 2025-11-01 06:38:12 +10:00
5 changed files with 223 additions and 394 deletions

View File

@@ -0,0 +1 @@
from .hyperfocus import *

View File

@@ -6,7 +6,7 @@ VALUES ('HYPERFOCUS', 0, 1, 'Initial Creation');
CREATE TABLE hyperfocused(
profileid INTEGER PRIMARY KEY REFERENCES user_profiles(profileid) ON DELETE CASCADE ON UPDATE CASCADE,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ends_at TIMSTAMPTZ NOT NULL,
ends_at TIMESTAMPTZ NOT NULL,
started_in INTEGER REFERENCES communities(communityid) ON DELETE SET NULL ON UPDATE CASCADE
);

107
hyperfocus/channel.py Normal file
View File

@@ -0,0 +1,107 @@
from collections import defaultdict
from typing import Optional, TypeAlias, TypedDict
import json
from datetime import datetime, timedelta
from meta.sockets import Channel
from modules.profiles.profiles.profiles import ProfilesRegistry
from . import logger
from .data import Hyperfocuser, HyperfocusData
ISOTimestamp: TypeAlias = str
class HyperfocusedPayload(TypedDict):
userid: str
user_name: str
started_at: ISOTimestamp
ends_at: ISOTimestamp
async def prepare_hyperfocuser(profiler: ProfilesRegistry, hyperfocuser: Hyperfocuser):
profile = await profiler.get_profile(hyperfocuser.profileid)
assert profile is not None
return HyperfocusedPayload(
userid=str(hyperfocuser.profileid),
user_name=profile.nickname or "Unknown",
started_at=hyperfocuser.started_at.isoformat(),
ends_at=hyperfocuser.ends_at.isoformat(),
)
class FocusChannel(Channel):
name = "HyperFocus"
def __init__(self, profiler: ProfilesRegistry, focusdata: HyperfocusData, **kwargs):
super().__init__(**kwargs)
self.profiler = profiler
self.focusdata = focusdata
# communityid -> listening websockets
self.communities = defaultdict(set)
async def on_connection(self, websocket, event):
if not (cidstr := event.get("community")):
raise ValueError("Hyperfocus browser missing communityid")
elif not cidstr.isdigit():
raise ValueError("Community id provided is not an integer")
cid = int(cidstr)
community = await self.profiler.get_community(cid)
if community is None:
raise ValueError("Unknown community provided")
await super().on_connection(websocket, event)
self.communities[cid].add(websocket)
focus_rows = await Hyperfocuser.fetch_where(started_in=cid)
await self.send_hyperfocus_put(cid, focus_rows)
async def del_connection(self, websocket):
for wss in self.communities.values():
wss.discard(websocket)
await super().del_connection(websocket)
async def send_hyperfocus_patch(
self, communityid: int, focuser: Hyperfocuser, websocket=None
):
for ws in (websocket,) if websocket else self.communities[communityid]:
await self.send_event(
{
"type": "DO",
"method": "patchFocus",
"args": await prepare_hyperfocuser(self.profiler, focuser),
},
websocket=ws,
)
async def send_hyperfocus_del(self, profileid: int, websocket=None):
await self.send_event(
{
"type": "DO",
"method": "delFocus",
"args": {"userid": str(profileid)},
},
websocket=websocket,
)
async def send_hyperfocus_put(
self, communityid: int, focusers: list[Hyperfocuser], websocket=None
):
payload = []
for focuser in focusers:
fpl = await prepare_hyperfocuser(self.profiler, focuser)
payload.append(fpl)
for ws in (websocket,) if websocket else self.communities[communityid]:
await self.send_event(
{"type": "DO", "method": "putFocus", "args": payload},
websocket=ws,
)
async def send_event(self, event, **kwargs):
logger.info(f"Sending websocket event: {json.dumps(event, indent=1)}")
await super().send_event(event, **kwargs)

View File

@@ -1,6 +1,8 @@
import asyncio
from string import punctuation
import datetime as dt
from datetime import datetime, timedelta
from math import ceil
import time
import twitchio
@@ -10,12 +12,14 @@ from twitchio.ext import commands as cmds
from botdata import UserAuth
from meta import Bot
from meta.logger import log_wrap
from meta.sockets import register_channel
from utils.lib import parse_dur, strfdelta, utc_now
from . import logger
from ..data import HyperfocusData, Hyperfocuser
from .focuschannel import FocusChannel
from ..channel import FocusChannel
# Default requested scopes for joining a channel
CHANNEL_SCOPES = Scopes(
@@ -32,83 +36,35 @@ class FocusComponent(cmds.Component):
def __init__(self, bot: Bot):
self.bot = bot
self.data = bot.dbconn.load_registry(HyperfocusData())
self.channel = FocusChannel(self.bot.profiles.profiles, self.data)
self.channels: dict[str, FocusChannel] = {}
register_channel(self.channel.name, self.channel)
self._last_deleted: dict[int, datetime] = {}
self.hyperfocus_lock = asyncio.Lock()
# ----- API -----
async def component_load(self):
await self.data.init()
async def get_focus_channel(self, channelid: str, channel: str):
"""
Get the logged in FocusChannel for the given channel, if possible.
This is an expensive operation when we need to login to a new channel,
so don't call this unless we are sure we can log in or need it.
"""
if not (ch := self.channels.get(channelid)) or not ch.is_alive:
# Create and login to a new focuschannel, if possible.
token = await self.get_token_for(channelid)
session = self.bot._http._session
ch = FocusChannel(channel, token, session)
try:
await ch.connect()
except Exception:
logger.exception(
f"Hyperfocus channel connect failure for '{channel}' ('{channelid}')"
)
raise
self.channels[channelid] = ch
return ch
async def get_token_for(self, channelid: str):
"""
Get the hyperfocus-capable token for the given channel.
"""
# TODO: Token invalidation method if it fails.
# Also should be in a transaction
# Might technically need to refresh token as well
scope_rows = await self.bot.data.user_auth_scopes.select_where(userid=channelid)
scopes = Scopes([row["scope"] for row in scope_rows])
has_required = all(scope in scopes for scope in CHANNEL_SCOPES)
if not has_required:
# TODO: Better auth exception
raise ValueError("Channel '%s' does not have required scopes." % channelid)
auth_row = await UserAuth.fetch(channelid)
assert auth_row is not None
return auth_row.token
async def get_hyperfocus(self, profileid: int) -> Hyperfocuser | None:
"""
Get the Hyperfocuser if the user is hyperfocused.
"""
row = Hyperfocuser.fetch(profileid)
if row and row.ends_at < utc_now():
row = await Hyperfocuser.fetch(profileid)
if row and row.ends_at > utc_now():
return row
async def focus_delete_message(self, message: twitchio.ChatMessage):
"""Delete the given message. Uses the API if possible, otherwise opens an IRC channel."""
"""Delete the given message."""
# This should be impossible, but just in case.
# None id could cause chat to be wiped
assert message.id is not None
badge_sets = {badge.set_id for badge in message.badges}
if "moderator" in badge_sets or "broadcaster" in badge_sets:
# We need to use the focus channel
assert message.broadcaster.name is not None
chan = await self.get_focus_channel(
str(message.broadcaster.id), message.broadcaster.name
)
await chan.delete_msg(str(message.id))
else:
await message.broadcaster.delete_chat_messages(
moderator=self.bot.bot_id,
message_id=message.id,
)
await message.broadcaster.delete_chat_messages(
moderator=message.broadcaster,
message_id=message.id,
)
def check_hyperfocus_message(self, message: twitchio.ChatMessage):
"""
@@ -116,7 +72,7 @@ class FocusComponent(cmds.Component):
This amounts to whether it starts with a punctuation symbol, or it is only emotes and mentions.
"""
allowed = message.text.startswith(tuple(punctuation))
allowed = message.text.startswith(tuple("!*#%|?><."))
if not allowed:
allowed = True
@@ -134,6 +90,10 @@ class FocusComponent(cmds.Component):
@cmds.Component.listener()
async def event_message(self, payload: twitchio.ChatMessage):
async with self.hyperfocus_lock:
await self.handle_message(payload)
async def handle_message(self, payload: twitchio.ChatMessage):
# Check if chatter is currently hyperfocused
profile = await self.bot.profiles.fetch_profile(payload.chatter, touch=True)
hyperfocused = await self.get_hyperfocus(profile.profileid)
@@ -141,25 +101,53 @@ class FocusComponent(cmds.Component):
# If they are, check the message content for deletion
if hyperfocused and not self.check_hyperfocus_message(payload):
# If we need to delete, run delete and send message
notify = ( #
not (last := self._last_deleted.get(profile.profileid))
or (utc_now() - last).total_seconds() > 30
)
try:
await self.focus_delete_message(payload)
await payload.broadcaster.send_message(
f"@{payload.chatter.name} Stay focused! "
"(You are in !hyperfocus, use !unfocus to come back if you need to!)",
sender=self.bot.bot_id,
)
deleted = True
except Exception:
logger.warning(f"Failed to delete a hyperfocus message: {payload!r}")
await payload.broadcaster.send_message(
f"@{payload.chatter.name} Stay focused! ",
sender=self.bot.bot_id,
logger.warning(
f"Failed to delete a hyperfocus message: {payload!r}", exc_info=True
)
deleted = False
if notify:
self._last_deleted[profile.profileid] = utc_now()
try:
if deleted:
await payload.broadcaster.send_message(
f"@{payload.chatter.name} Stay focused! "
"(You are in !hyperfocus, use !unfocus to come back if you need to!)",
sender=self.bot.bot_id,
)
else:
await payload.broadcaster.send_message(
f"@{payload.chatter.name} Stay focused! ",
sender=self.bot.bot_id,
)
except twitchio.exceptions.HTTPException:
logger.warning(
f"Failed to notify user of hyperfocus deletion: {payload!r}",
exc_info=True,
)
if hyperfocused:
# Send an update to the channel
# TODO: Nicer and more efficient caching of which channel has which users
# TODO: Possible race condition with the commands. Should use locks.
comm = await self.bot.profiles.fetch_community(
payload.broadcaster, touch=True
)
await self.channel.send_hyperfocus_patch(comm.communityid, hyperfocused)
# ------ Commands -----
@cmds.command(
name="hyperfocus", aliases=["hyperf", "hyper", "hypercrocus", "hyperofcus"]
)
async def hyperfocus_cmd(self, ctx, *, duration: str | None):
async def hyperfocus_cmd(self, ctx, *, duration: str | None = None):
now = utc_now()
# First parse duration
@@ -191,30 +179,37 @@ class FocusComponent(cmds.Component):
pid = profile.profileid
comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)
await Hyperfocuser.table.delete_where(profileid=pid)
await Hyperfocuser.create(
profileid=pid,
started_at=now,
ends_at=end_at,
started_in=comm.communityid,
)
async with self.hyperfocus_lock:
await Hyperfocuser.table.delete_where(profileid=pid)
focuser = await Hyperfocuser.create(
profileid=pid,
started_at=now,
ends_at=end_at,
started_in=comm.communityid,
)
# TODO: Update channel
await ctx.reply(
f"{ctx.chatter.name} has gone into HYPERFOCUS! "
"They will be in emote and command only mode for the next {minutes} minutes! "
"Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ "
)
await self.channel.send_hyperfocus_patch(comm.communityid, focuser)
minutes = ceil(dur / 60)
await ctx.reply(
f"{ctx.chatter.name} has gone into HYPERFOCUS! "
f"They will be in emote and command only mode for the next {minutes} minutes! "
"Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ "
)
@cmds.command(name="unfocus")
async def unfocus_cmd(self, ctx):
profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
row = await Hyperfocuser.table.delete_where(profileid=profile.profileid)
async with self.hyperfocus_lock:
row = await Hyperfocuser.fetch(profile.profileid)
if row:
await row.delete()
await self.channel.send_hyperfocus_del(profile.profileid)
await ctx.reply(
"Welcome back from focus, hope it went well!"
" Remember to have a sip and stretch if you need it~"
)
await ctx.reply(
"Welcome back from focus, hope it went well!"
" Remember to have a sip and stretch if you need it~"
)
@cmds.command(name="hyperfocused")
async def hyperfocused_cmd(self, ctx, user: twitchio.User | None = None):
@@ -222,16 +217,32 @@ class FocusComponent(cmds.Component):
profile = await self.bot.profiles.fetch_profile(user, touch=False)
if hyper := (await self.get_hyperfocus(profile.profileid)):
durstr = strfdelta(hyper.ends_at - utc_now())
await ctx.reply(
f"{user.name} is in HYPERFOCUS for another {durstr}! "
"They can only write emojis and commands in this time. Good luck!"
)
elif own:
await ctx.reply(
"You are not hyperfocused!"
" Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'"
)
else:
await ctx.reply(f"{user.name} is not hyperfocused!")
async with self.hyperfocus_lock:
if hyper := (await self.get_hyperfocus(profile.profileid)):
durstr = strfdelta(hyper.ends_at - utc_now())
await ctx.reply(
f"{user.name} is in HYPERFOCUS for another {durstr}! "
"They can only write emojis and commands in this time. Good luck!"
)
elif own:
await ctx.reply(
"You are not hyperfocused!"
" Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'"
)
else:
await ctx.reply(f"{user.name} is not hyperfocused!")
@cmds.command(name="addfocus")
async def addfocus_cmd(self, ctx):
await ctx.reply(
"Add HYPERFOCUS to your channel by authorising me here: https://croccyfocus.thewisewolf.dev/invite"
)
@cmds.command(name="focuslist")
@cmds.is_moderator()
async def focuslist_cmd(self, ctx):
comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)
link = (
f"https://croccyfocus.thewisewolf.dev/widget/?community={comm.communityid}"
)
await ctx.reply(f"Browser source link for your channel's hyperfocus: {link}")

View File

@@ -1,290 +0,0 @@
"""
Minimal IRC client intended to connect to one channel and send a message.
"""
import asyncio
import aiohttp
import time
import re
from . import logger
ACTIONS = (
"JOIN",
"PART",
"PING",
"PRIVMSG",
"PRIVMSG(ECHO)",
"USERSTATE",
"MODE",
"WHISPER",
"USERNOTICE",
"NOTICE",
)
ACTIONS2 = ("USERSTATE", "ROOMSTATE", "PRIVMSG", "USERNOTICE", "WHISPER")
USER_SUB = re.compile(r":(?P<user>.*)!")
MESSAGE_RE = re.compile(
r":(?P<useraddr>\S+) (?P<action>\S+) (?P<channel>\S+)( :(?P<message>.*))?$"
)
FAST_RETURN = {
"RECONNECT": {"code": 0, "action": "RECONNECT"},
"PING": {"action": "PING"},
}
def parser(data: str, nick: str):
groups = data.split()
action = groups[1] if groups[1] == "JOIN" else groups[-2]
channel = None
message = None
user = None
badges = None
_group_len = len(groups)
if action in FAST_RETURN:
return FAST_RETURN[action]
elif groups[1] in FAST_RETURN:
return FAST_RETURN[groups[1]]
elif (
groups[1] in ACTIONS
or (_group_len > 2 and groups[2] in ACTIONS)
or (_group_len > 3 and groups[3] in {"PRIVMSG", "PRIVMSG(ECHO)"})
):
result = re.search(MESSAGE_RE, data)
if not result:
logger.error("****** MESSAGE_RE Failed! ******")
raise ValueError("Could not parse message '%s' as IRC message" % data)
user = result.group("useraddr").split("!")[0]
action = result.group("action")
channel = result.group("channel").lstrip("#")
message = result.group("message")
if action == "WHISPER":
channel = None
if action in ACTIONS2:
prebadge = groups[0].split(";")
badges = {}
for badge in prebadge:
badge = badge.split("=")
try:
badges[badge[0]] = badge[1]
except IndexError:
pass
if action == "USERSTATE" and badges.get("display-name"):
user = badges["display-name"].lower()
if action == "USERNOTICE" and badges.get("login"):
user = badges["login"].lower()
elif action not in ACTIONS:
action = None
if not user:
try:
user = re.search(USER_SUB, groups[0]).group("user")
except (AttributeError, ValueError):
pass
try:
code = int(groups[1])
except ValueError:
code = 0
batches = []
if code == 353:
channel = groups[4]
if channel[0] == "#":
channel = channel[1:]
else:
logger.warning(f" (353) parse failed? ||{channel}||")
if user is None:
user = groups[-1][1:].lower()
for b in groups[5:-1]:
if b[0] == ":":
b = b[1:]
if "\r\n:" in b:
batches.append(b.split("\r\n:")[0])
break
else:
batches.append(b)
return dict(
data=data,
nick=nick,
groups=groups,
action=action,
channel=channel,
user=user,
badges=badges,
code=code,
message=message,
batches=batches,
)
class FocusChannel:
IRC_URI = "wss://irc-ws.chat.twitch.tv:443"
def __init__(self, channel: str, token: str, session: aiohttp.ClientSession):
self.channel = channel.lower()
self.token = token
self.session = session
self._socket = None
self._keeper: asyncio.Task | None = None
self._ws_ready_event: asyncio.Event = asyncio.Event()
self._joined: asyncio.Event = asyncio.Event()
self.is_ready: asyncio.Event = asyncio.Event()
self.modes = ("commands",)
self._last_ping = 0
self._reconnect_requested = False
@property
def is_alive(self):
return self._socket is not None and not self._socket.closed
async def wait_until_ready(self):
await self.is_ready.wait()
async def connect(self):
self.is_ready.clear()
if self._keeper:
self._keeper.cancel()
if self.is_alive:
await self._socket.close()
try:
self._socket = await self.session.ws_connect(url=self.IRC_URI)
except Exception as e:
logger.error(f"FocusChannel IRC connection failed: {e}")
raise
self._reconnect_requested = False
self._keeper = asyncio.create_task(self._keep_alive())
self._ws_ready_event.set()
async def authenticate(self):
if not self.is_alive:
raise ValueError("Cannot authenticate before connection.")
await self.send(f"PASS oauth:{self.token}\r\n")
await self.send(f"NICK {self.channel}\r\n")
for cap in self.modes:
await self.send(f"CAP REQ :twitch.tv/{cap}")
async def join(self):
if not self.is_alive:
raise ValueError("Cannot join before connection.")
self._joined.clear()
await self.send(f"JOIN #{self.channel}\r\n")
await self._joined.wait()
async def send(self, message: str):
await self._socket.send_str(message + "\r\n")
async def _process_data(self, data: str):
data = data.rstrip()
parsed = parser(data, self.channel)
if parsed["action"] == "PING":
return await self._ping()
elif parsed["action"] == "RECONNECT":
return await self._reconnect(parsed)
elif parsed["code"] != 0:
return await self._code(parsed, parsed["code"])
elif data.startswith(":tmi.twitch.tv NOTICE * :Login unsuccessful"):
logger.error(f"Failed to login to Twitch IRC channel {self.channel}")
return await self._close()
else:
# TODO: We could handle other actions here
return None
async def _code(self, parsed, code: int):
logger.info(f"{self!r} received '{code}': {parsed}")
if code == 1:
logger.info(f"FocusChannel logged into '{self.channel}'")
elif code == 353:
pass
elif code in (2, 3, 4, 366, 372, 375):
pass
elif code == 376:
pass
else:
pass
async def _ping(self):
self._last_ping = time.monotonic()
await self.send("PONG :tmi.twitch.tv\r\n")
async def _close(self):
if self._keeper:
self._keeper.cancel()
self.is_ready.clear()
if self._socket:
await self._socket.close()
async def _join(self, parsed):
channel = parsed["channel"]
logger.info(f"ACTION: JOIN:: {channel}")
if self.channel != channel.lower():
logger.info(
f"FocusChannel '{self.channel}' got join event for '{channel}'.. ignoring"
)
else:
self._joined.set()
async def _keep_alive(self):
await self._ws_ready_event.wait()
self._ws_ready_event.clear()
if not self._last_ping:
self._last_ping = time.monotonic()
while not self._socket.closed and not self._reconnect_requested:
msg = await self._socket.receive()
if msg.type is aiohttp.WSMsgType.CLOSED:
logger.error(f"Websocket connection closed: {msg.extra}")
break
data = msg.data
if data:
logger.debug(f" < {data}")
events = data.split("\r\n")
for event in events:
if not event:
continue
try:
await self._process_data(event)
except Exception:
logger.exception(
"Websocket message processing failed: %s", str(event)
)
async def _reconnect(self, parsed):
logger.info(f"ACTION: {self!r} recconecting")
self._reconnect_requested = True
if self._keeper:
self._keeper.cancel()
asyncio.create_task(self.connect())
async def send_msg(self, content: str):
await self._socket.send_str(content)
async def delete_msg(self, msgid: str):
return await self.send_msg(f"/delete {msgid}")