Files
hyperfocus-plugin/hyperfocus/twitch/component.py
2025-11-01 11:53:42 +10:00

249 lines
9.2 KiB
Python

import asyncio
from string import punctuation
import datetime as dt
from datetime import datetime, timedelta
from math import ceil
import time
import twitchio
from twitchio import Scopes
from twitchio.ext import commands as cmds
from botdata import UserAuth
from meta import Bot
from meta.logger import log_wrap
from meta.sockets import register_channel
from utils.lib import parse_dur, strfdelta, utc_now
from . import logger
from ..data import HyperfocusData, Hyperfocuser
from ..channel import FocusChannel
# OAuth scopes requested by default when the bot is authorised to join a
# channel: act as the channel's bot, read and write chat, and manage
# (moderate) chat messages.
CHANNEL_SCOPES = Scopes(
    (
        Scopes.channel_bot,
        Scopes.user_read_chat,
        Scopes.user_write_chat,
        Scopes.moderator_manage_chat_messages,
    )
)
class FocusComponent(cmds.Component):
    """
    Chat component implementing "hyperfocus" mode.

    While a chatter has an unexpired Hyperfocuser row, every message they send
    is checked with ``check_hyperfocus_message`` and deleted if it fails.
    The component also provides the ``!hyperfocus``, ``!unfocus``,
    ``!hyperfocused``, ``!addfocus`` and ``!focuslist`` chat commands.
    """

    def __init__(self, bot: Bot):
        self.bot = bot
        self.data = bot.dbconn.load_registry(HyperfocusData())
        self.channel = FocusChannel(self.bot.profiles.profiles, self.data)
        register_channel(self.channel.name, self.channel)

        # profileid -> time we last sent that chatter a "Stay focused!" reminder.
        self._last_deleted: dict[int, datetime] = {}
        # Held while handling each chat message and while the commands
        # read or modify Hyperfocuser rows.
        self.hyperfocus_lock = asyncio.Lock()

    # ----- API -----
    async def component_load(self):
        """Initialise the hyperfocus data registry."""
        await self.data.init()

    async def get_hyperfocus(self, profileid: int) -> Hyperfocuser | None:
        """
        Get the Hyperfocuser if the user is hyperfocused.

        Returns None when there is no row for this profile,
        or when the stored row has already expired.
        """
        row = await Hyperfocuser.fetch(profileid)
        if row and row.ends_at > utc_now():
            return row
        return None

    async def focus_delete_message(self, message: twitchio.ChatMessage):
        """
        Delete the given message.

        Raises ValueError if the message has no id.
        """
        # This should be impossible, but just in case:
        # a missing id could cause chat to be wiped.
        # Raise explicitly rather than assert, so the guard survives `python -O`.
        if not message.id:
            raise ValueError("Refusing to delete a chat message without an id.")

        await message.broadcaster.delete_chat_messages(
            moderator=message.broadcaster,
            message_id=message.id,
        )

    def check_hyperfocus_message(self, message: twitchio.ChatMessage) -> bool:
        """
        Check whether the given message is allowed to be sent in hyperfocus.

        A message is allowed when either:
          - its text starts with an ASCII punctuation symbol
            (any character of ``string.punctuation``), or
          - every fragment of type "text" contains no ASCII characters once
            surrounding whitespace, spaces and newlines are removed.
            Fragments of any other type are not inspected.

        Logs when the message is rejected. Returns True if allowed.
        """
        allowed = message.text.startswith(tuple(punctuation))
        if not allowed:
            allowed = all(
                not char.isascii()
                for fragment in message.fragments
                if fragment.type == "text"
                for char in fragment.text.strip().replace(" ", "").replace("\n", "")
            )

        if not allowed:
            logger.info(
                f"Message failed hyperfocus check, attempting to delete: {message!r} "
            )
        return allowed

    @cmds.Component.listener()
    async def event_message(self, payload: twitchio.ChatMessage):
        """Handle each incoming chat message while holding the hyperfocus lock."""
        async with self.hyperfocus_lock:
            await self.handle_message(payload)

    async def handle_message(self, payload: twitchio.ChatMessage):
        """
        Enforce hyperfocus on a single chat message.

        If the chatter is hyperfocused and the message is not allowed,
        try to delete it and remind the chatter (at most once every
        30 seconds per chatter). Any message from a hyperfocused chatter
        also triggers a hyperfocus patch to the focus channel.
        """
        # Check if chatter is currently hyperfocused
        profile = await self.bot.profiles.fetch_profile(payload.chatter, touch=True)
        hyperfocused = await self.get_hyperfocus(profile.profileid)
        if not hyperfocused:
            return

        # They are, so check the message content for deletion
        if not self.check_hyperfocus_message(payload):
            # Decide up front whether this chatter is due a reminder.
            last = self._last_deleted.get(profile.profileid)
            notify = last is None or (utc_now() - last).total_seconds() > 30

            try:
                await self.focus_delete_message(payload)
                deleted = True
            except Exception:
                # Best effort: a failed deletion must not stop message handling.
                logger.warning(
                    f"Failed to delete a hyperfocus message: {payload!r}", exc_info=True
                )
                deleted = False

            if notify:
                self._last_deleted[profile.profileid] = utc_now()
                reminder = f"@{payload.chatter.name} Stay focused! "
                if deleted:
                    # Only explain the deletion when it actually happened.
                    reminder += (
                        "(You are in !hyperfocus, use !unfocus to come back if you need to!)"
                    )
                try:
                    await payload.broadcaster.send_message(
                        reminder,
                        sender=self.bot.bot_id,
                    )
                except twitchio.exceptions.HTTPException:
                    logger.warning(
                        f"Failed to notify user of hyperfocus deletion: {payload!r}",
                        exc_info=True,
                    )

        # Send an update to the channel
        # TODO: Nicer and more efficient caching of which channel has which users
        # NOTE: event_message calls this under hyperfocus_lock, the same lock
        # the commands take when modifying Hyperfocuser rows.
        comm = await self.bot.profiles.fetch_community(
            payload.broadcaster, touch=True
        )
        await self.channel.send_hyperfocus_patch(comm.communityid, hyperfocused)

    # ------ Commands -----
    @cmds.command(
        name="hyperfocus", aliases=["hyperf", "hyper", "hypercrocus", "hyperofcus"]
    )
    async def hyperfocus_cmd(self, ctx, *, duration: str | None = None):
        """
        Put the calling chatter into hyperfocus.

        ``duration`` is either a plain number of minutes, or a duration string
        understood by ``parse_dur``. Without a duration, the session runs until
        ten minutes before the next full hour (or the hour after that, if that
        point has already been reached).

        Any existing hyperfocus row for the chatter is replaced.
        A zero, unparseable or out-of-range duration gets the usage message.
        """
        usage = (
            "USAGE: '!hyperfocus <duration>' "
            "For example: '!hyperfocus 10' for 10 minutes or "
            "'!hyperfocus 1h 10m' for an hour and ten minutes!"
        )
        now = utc_now()

        # First parse duration
        if duration:
            # isdecimal() rather than isdigit(): isdigit() also accepts
            # characters such as superscripts, which int() cannot parse.
            if duration.isdecimal():
                dur = int(duration) * 60
            else:
                dur = parse_dur(duration)
            # Reject unparseable and zero-length durations on both paths.
            if not dur:
                await ctx.reply(usage)
                return
        else:
            # TODO: Add to community configuration
            next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(
                hours=1
            )
            next_block = next_hour - timedelta(minutes=10)
            # >= so that landing exactly on the boundary does not
            # produce a zero-length session.
            if now >= next_block:
                next_block += timedelta(hours=1)
            dur = int((next_block - now).total_seconds())

        try:
            end_at = now + timedelta(seconds=dur)
        except OverflowError:
            # Duration too large to represent as a timedelta/datetime.
            await ctx.reply(usage)
            return

        # Update the row
        profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
        pid = profile.profileid
        comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)

        async with self.hyperfocus_lock:
            # Replace any previous session for this profile.
            await Hyperfocuser.table.delete_where(profileid=pid)
            focuser = await Hyperfocuser.create(
                profileid=pid,
                started_at=now,
                ends_at=end_at,
                started_in=comm.communityid,
            )
            await self.channel.send_hyperfocus_patch(comm.communityid, focuser)

        minutes = ceil(dur / 60)
        await ctx.reply(
            f"{ctx.chatter.name} has gone into HYPERFOCUS! "
            f"They will be in emote and command only mode for the next {minutes} minutes! "
            "Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ "
        )

    @cmds.command(name="unfocus")
    async def unfocus_cmd(self, ctx):
        """
        End the calling chatter's hyperfocus, if they have one.

        The welcome-back reply is sent whether or not a row existed.
        """
        profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
        async with self.hyperfocus_lock:
            row = await Hyperfocuser.fetch(profile.profileid)
            if row:
                await row.delete()
                await self.channel.send_hyperfocus_del(profile.profileid)

        await ctx.reply(
            "Welcome back from focus, hope it went well!"
            " Remember to have a sip and stretch if you need it~"
        )

    @cmds.command(name="hyperfocused")
    async def hyperfocused_cmd(self, ctx, user: twitchio.User | None = None):
        """
        Report whether ``user`` (default: the calling chatter) is hyperfocused,
        and if so for how much longer.
        """
        user, own = (user, False) if user is not None else (ctx.chatter, True)
        profile = await self.bot.profiles.fetch_profile(user, touch=False)

        # Only the lookup needs the lock; reply after releasing it so that
        # message handling is not blocked while the reply is sent.
        async with self.hyperfocus_lock:
            hyper = await self.get_hyperfocus(profile.profileid)

        if hyper:
            durstr = strfdelta(hyper.ends_at - utc_now())
            await ctx.reply(
                f"{user.name} is in HYPERFOCUS for another {durstr}! "
                "They can only write emojis and commands in this time. Good luck!"
            )
        elif own:
            await ctx.reply(
                "You are not hyperfocused!"
                " Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'"
            )
        else:
            await ctx.reply(f"{user.name} is not hyperfocused!")

    @cmds.command(name="addfocus")
    async def addfocus_cmd(self, ctx):
        """Reply with the invite link for adding hyperfocus to a channel."""
        await ctx.reply(
            "Add HYPERFOCUS to your channel by authorising me here: https://croccyfocus.thewisewolf.dev/invite"
        )

    @cmds.command(name="focuslist")
    @cmds.is_moderator()
    async def focuslist_cmd(self, ctx):
        """Reply with the browser-source widget link for this channel's community."""
        comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)
        link = (
            f"https://croccyfocus.thewisewolf.dev/widget/?community={comm.communityid}"
        )
        await ctx.reply(f"Browser source link for your channel's hyperfocus: {link}")