197 lines
7.0 KiB
Python
197 lines
7.0 KiB
Python
from string import punctuation
|
|
import datetime as dt
|
|
from datetime import datetime, timedelta
|
|
from math import ceil
|
|
import time
|
|
|
|
import twitchio
|
|
from twitchio import Scopes
|
|
from twitchio.ext import commands as cmds
|
|
|
|
from botdata import UserAuth
|
|
from meta import Bot
|
|
from meta.logger import log_wrap
|
|
from utils.lib import parse_dur, strfdelta, utc_now
|
|
|
|
from . import logger
|
|
|
|
from ..data import HyperfocusData, Hyperfocuser
|
|
|
|
|
|
# Default requested scopes for joining a channel
|
|
CHANNEL_SCOPES = Scopes(
|
|
(
|
|
Scopes.channel_bot,
|
|
Scopes.user_read_chat,
|
|
Scopes.user_write_chat,
|
|
Scopes.moderator_manage_chat_messages,
|
|
)
|
|
)
|
|
|
|
|
|
class FocusComponent(cmds.Component):
|
|
def __init__(self, bot: Bot):
|
|
self.bot = bot
|
|
self.data = bot.dbconn.load_registry(HyperfocusData())
|
|
|
|
self._last_deleted: dict[int, datetime] = {}
|
|
|
|
# ----- API -----
|
|
async def component_load(self):
|
|
await self.data.init()
|
|
|
|
async def get_hyperfocus(self, profileid: int) -> Hyperfocuser | None:
|
|
"""
|
|
Get the Hyperfocuser if the user is hyperfocused.
|
|
"""
|
|
row = await Hyperfocuser.fetch(profileid)
|
|
if row and row.ends_at > utc_now():
|
|
return row
|
|
|
|
async def focus_delete_message(self, message: twitchio.ChatMessage):
|
|
"""Delete the given message."""
|
|
# This should be impossible, but just in case.
|
|
# None id could cause chat to be wiped
|
|
assert message.id is not None
|
|
|
|
await message.broadcaster.delete_chat_messages(
|
|
moderator=message.broadcaster,
|
|
message_id=message.id,
|
|
)
|
|
|
|
def check_hyperfocus_message(self, message: twitchio.ChatMessage):
|
|
"""
|
|
Check whether the given message is allowed to be sent in hyperfocus.
|
|
|
|
This amounts to whether it starts with a punctuation symbol, or it is only emotes and mentions.
|
|
"""
|
|
allowed = message.text.startswith(tuple(punctuation))
|
|
|
|
if not allowed:
|
|
allowed = True
|
|
for fragment in message.fragments:
|
|
if allowed and fragment.type == "text":
|
|
stripped = fragment.text.strip().replace(" ", "").replace("\n", "")
|
|
allowed = all(not char.isascii() for char in stripped)
|
|
|
|
if not allowed:
|
|
logger.info(
|
|
f"Message failed hyperfocus check, attempting to delete: {message!r} "
|
|
)
|
|
|
|
return allowed
|
|
|
|
@cmds.Component.listener()
|
|
async def event_message(self, payload: twitchio.ChatMessage):
|
|
# Check if chatter is currently hyperfocused
|
|
profile = await self.bot.profiles.fetch_profile(payload.chatter, touch=True)
|
|
hyperfocused = await self.get_hyperfocus(profile.profileid)
|
|
|
|
# If they are, check the message content for deletion
|
|
if hyperfocused and not self.check_hyperfocus_message(payload):
|
|
# If we need to delete, run delete and send message
|
|
notify = ( #
|
|
not (last := self._last_deleted.get(profile.profileid))
|
|
or (utc_now() - last).total_seconds() > 30
|
|
)
|
|
try:
|
|
await self.focus_delete_message(payload)
|
|
if notify:
|
|
await payload.broadcaster.send_message(
|
|
f"@{payload.chatter.name} Stay focused! "
|
|
"(You are in !hyperfocus, use !unfocus to come back if you need to!)",
|
|
sender=self.bot.bot_id,
|
|
)
|
|
except Exception:
|
|
logger.warning(
|
|
f"Failed to delete a hyperfocus message: {payload!r}", exc_info=True
|
|
)
|
|
if notify:
|
|
await payload.broadcaster.send_message(
|
|
f"@{payload.chatter.name} Stay focused! ",
|
|
sender=self.bot.bot_id,
|
|
)
|
|
self._last_deleted[profile.profileid] = utc_now()
|
|
|
|
# ------ Commands -----
|
|
@cmds.command(
|
|
name="hyperfocus", aliases=["hyperf", "hyper", "hypercrocus", "hyperofcus"]
|
|
)
|
|
async def hyperfocus_cmd(self, ctx, *, duration: str | None = None):
|
|
now = utc_now()
|
|
|
|
# First parse duration
|
|
if duration and duration.isdigit():
|
|
dur = int(duration) * 60
|
|
elif duration:
|
|
dur = parse_dur(duration)
|
|
if not dur:
|
|
await ctx.reply(
|
|
"USAGE: '!hyperfocus <duration>' "
|
|
"For example: '!hyperfocus 10' for 10 minutes or "
|
|
"'!hyperfocus 1h 10m' for an hour and ten minutes!"
|
|
)
|
|
return
|
|
else:
|
|
# TODO: Add to community configuration
|
|
next_hour = now.replace(minute=0, second=0, microsecond=0) + dt.timedelta(
|
|
hours=1
|
|
)
|
|
next_block = next_hour - dt.timedelta(minutes=10)
|
|
if now > next_block:
|
|
next_block += dt.timedelta(hours=1)
|
|
dur = int((next_block - now).total_seconds())
|
|
|
|
end_at = now + timedelta(seconds=dur)
|
|
|
|
# Update the row
|
|
profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
|
|
pid = profile.profileid
|
|
comm = await self.bot.profiles.fetch_community(ctx.broadcaster, touch=True)
|
|
|
|
await Hyperfocuser.table.delete_where(profileid=pid)
|
|
await Hyperfocuser.create(
|
|
profileid=pid,
|
|
started_at=now,
|
|
ends_at=end_at,
|
|
started_in=comm.communityid,
|
|
)
|
|
|
|
# TODO: Update channel
|
|
minutes = ceil(dur / 60)
|
|
await ctx.reply(
|
|
f"{ctx.chatter.name} has gone into HYPERFOCUS! "
|
|
f"They will be in emote and command only mode for the next {minutes} minutes! "
|
|
"Use !unfocus to come back if you need to, best of luck! ☘️🍀☘️ "
|
|
)
|
|
|
|
@cmds.command(name="unfocus")
|
|
async def unfocus_cmd(self, ctx):
|
|
profile = await self.bot.profiles.fetch_profile(ctx.chatter, touch=True)
|
|
row = await Hyperfocuser.table.delete_where(profileid=profile.profileid)
|
|
|
|
await ctx.reply(
|
|
"Welcome back from focus, hope it went well!"
|
|
" Remember to have a sip and stretch if you need it~"
|
|
)
|
|
|
|
@cmds.command(name="hyperfocused")
|
|
async def hyperfocused_cmd(self, ctx, user: twitchio.User | None = None):
|
|
user, own = (user, False) if user is not None else (ctx.chatter, True)
|
|
|
|
profile = await self.bot.profiles.fetch_profile(user, touch=False)
|
|
|
|
if hyper := (await self.get_hyperfocus(profile.profileid)):
|
|
durstr = strfdelta(hyper.ends_at - utc_now())
|
|
await ctx.reply(
|
|
f"{user.name} is in HYPERFOCUS for another {durstr}! "
|
|
"They can only write emojis and commands in this time. Good luck!"
|
|
)
|
|
elif own:
|
|
await ctx.reply(
|
|
"You are not hyperfocused!"
|
|
" Enter HYPERFOCUS mode for e.g. 10 minutes with '!hyperfocus 10'"
|
|
)
|
|
else:
|
|
await ctx.reply(f"{user.name} is not hyperfocused!")
|