rewrite: New Pomodoro Timer system.

This commit is contained in:
2023-05-19 09:45:06 +03:00
parent 8d5840c696
commit 4aa2587c45
29 changed files with 2860 additions and 12 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -46,6 +46,7 @@ loading = <:cyclebigger:975880828611600404>
tick = :✅:
clock = :⏱️:
warning = :⚠️:
coin = <:coin:975880967485022239>

View File

@@ -730,6 +730,28 @@ CREATE TABLE season_stats(
-- }}}
-- Pomodoro Data {{{
ALTER TABLE timers ADD COLUMN ownerid BIGINT REFERENCES user_config;
ALTER TABLE timers ADD COLUMN manager_roleid BIGINT;
ALTER TABLE timers ADD COLUMN last_messageid BIGINT;
ALTER TABLE timers ADD COLUMN voice_alerts BOOLEAN;
ALTER TABLE timers ADD COLUMN auto_restart BOOLEAN;
ALTER TABLE timers RENAME COLUMN text_channelid TO notification_channelid;
ALTER TABLE timers ALTER COLUMN last_started DROP NOT NULL;
-- }}}
-- Webhooks {{{
CREATE TABLE channel_webhooks(
channelid BIGINT NOT NULL PRIMARY KEY,
webhookid BIGINT NOT NULL,
token TEXT NOT NULL
);
-- }}}
INSERT INTO VersionHistory (version, author) VALUES (13, 'v12-v13 migration');
COMMIT;

View File

@@ -2,6 +2,7 @@ from enum import Enum
from itertools import chain
from psycopg import sql
from cachetools import TTLCache
import discord
from data import Table, Registry, Column, RowModel, RegisterEnum
from data.models import WeakCache
@@ -346,3 +347,23 @@ class CoreData(Registry, name="core"):
(guildid, tuple(untracked), userid)
)
return (await curs.fetchone()) or (None, None)
class LionHook(RowModel):
"""
Schema
------
CREATE TABLE channel_webhooks(
channelid BIGINT NOT NULL PRIMARY KEY,
webhookid BIGINT NOT NULL,
token TEXT NOT NULL
);
"""
_tablename_ = 'channel_webhooks'
_cache_ = {}
channelid = Integer(primary=True)
webhookid = Integer()
token = String()
def as_webhook(self, **kwargs):
return discord.Webhook.partial(self.webhookid, self.token, **kwargs)

View File

@@ -1,5 +1,6 @@
from typing import Optional, TYPE_CHECKING
from enum import Enum
import asyncio
import pytz
import discord
@@ -48,7 +49,7 @@ class LionGuild(Timezoned):
No guarantee is made that the client is in the corresponding Guild,
or that the corresponding Guild even exists.
"""
__slots__ = ('bot', 'data', 'guildid', 'config', '_guild', '__weakref__')
__slots__ = ('bot', 'data', 'guildid', 'config', '_guild', 'voice_lock', '__weakref__')
Config = GuildConfig
settings = Config.settings
@@ -62,6 +63,11 @@ class LionGuild(Timezoned):
self.config = self.Config(self.guildid, data)
# Guild-specific voice lock
# Every module which uses voice alerts should hold this lock throughout the alert.
# Avoids voice race-states
self.voice_lock = asyncio.Lock()
@property
def guild(self):
if self._guild is None:
@@ -77,6 +83,10 @@ class LionGuild(Timezoned):
def timezone(self) -> pytz.timezone:
return self.config.timezone.value
@property
def locale(self) -> str:
return self.config.get('guild_locale').value
async def touch_discord_model(self, guild: discord.Guild):
"""
Update saved Discord model attributes for this guild.

View File

@@ -77,6 +77,10 @@ class LionBot(Bot):
with logging_context(stack=["Running"]):
await self.connect(reconnect=reconnect)
def dispatch(self, event_name: str, *args, **kwargs):
with logging_context(action=f"Dispatch {event_name}"):
super().dispatch(event_name, *args, **kwargs)
async def on_ready(self):
logger.info(
f"Logged in as {self.application.name}\n"

View File

@@ -5,10 +5,12 @@ active = [
'.config',
'.user_config',
'.economy',
'.ranks',
'.reminders',
'.shop',
'.tasklist',
'.statistics',
'.pomodoro',
'.test',
]

View File

@@ -0,0 +1,10 @@
import logging
from babel.translator import LocalBabel
babel = LocalBabel('Pomodoro')
logger = logging.getLogger(__name__)
async def setup(bot):
from .cog import TimerCog
await bot.add_cog(TimerCog(bot))

844
src/modules/pomodoro/cog.py Normal file
View File

@@ -0,0 +1,844 @@
from typing import Optional
from collections import defaultdict
import asyncio
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
from meta import LionCog, LionBot, LionContext
from meta.logger import log_wrap
from meta.sharding import THIS_SHARD
from utils.lib import utc_now
from wards import low_management
from . import babel, logger
from .data import TimerData
from .lib import TimerRole
from .settings import TimerSettings
from .settingui import TimerConfigUI
from .timer import Timer
from .options import TimerOptions
from .ui import TimerStatusUI
from .ui.config import TimerOptionsUI
_p = babel._p
_param_options = {
'focus_length': (TimerOptions.FocusLength, TimerRole.MANAGER),
'break_length': (TimerOptions.BreakLength, TimerRole.MANAGER),
'notification_channel': (TimerOptions.NotificationChannel, TimerRole.ADMIN),
'inactivity_threshold': (TimerOptions.InactivityThreshold, TimerRole.OWNER),
'manager_role': (TimerOptions.ManagerRole, TimerRole.ADMIN),
'voice_alerts': (TimerOptions.VoiceAlerts, TimerRole.OWNER),
'name': (TimerOptions.BaseName, TimerRole.OWNER),
'channel_name': (TimerOptions.ChannelFormat, TimerRole.OWNER),
}
class TimerCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.data = bot.db.load_registry(TimerData())
self.settings = TimerSettings()
self.timer_options = TimerOptions()
self.ready = False
self.timers = defaultdict(dict)
async def cog_load(self):
await self.data.init()
self.bot.core.guild_config.register_model_setting(self.settings.PomodoroChannel)
configcog = self.bot.get_cog('ConfigCog')
self.crossload_group(self.configure_group, configcog.configure_group)
if self.bot.is_ready():
await self.initialise()
async def cog_unload(self):
"""
Detach TimerCog and unload components.
Clears caches and stops run-tasks for each active timer.
Does not exist until all timers have completed background tasks.
"""
timers = (timer for tguild in self.timers.values() for timer in tguild.values())
try:
await asyncio.gather(*(timer.unload() for timer in timers))
except Exception:
logger.exception(
"Exception encountered while unloading `TimerCog`"
)
self.timers.clear()
async def _load_timers(self, timer_data: list[TimerData.Timer]):
"""
Factored method to load a list of timers from data rows.
"""
guildids = set()
to_delete = []
to_create = []
for row in timer_data:
channel = self.bot.get_channel(row.channelid)
if not channel:
to_delete.append(row.channelid)
else:
guildids.add(row.guildid)
to_create.append(row)
if guildids:
lguilds = await self.bot.core.lions.fetch_guilds(*guildids)
else:
lguilds = []
now = utc_now()
to_launch = []
to_update = []
timer_reg = defaultdict(dict)
for row in to_create:
timer = Timer(self.bot, row, lguilds[row.guildid])
if timer.running:
to_launch.append(timer)
else:
to_update.append(timer)
timer_reg[row.guildid][row.channelid] = timer
timer.last_seen = {member.id: now for member in timer.members}
# Delete non-existent timers
if to_delete:
await self.data.Timer.table.delete_where(channelid=to_delete)
idstr = ', '.join(map(str, to_delete))
logger.info(
f"Destroyed {len(to_delete)} timers with missing voice channels: {idstr}"
)
# Re-launch and update running timers
for timer in to_launch:
timer.launch()
tasks = [
asyncio.create_task(timer.update_status_card()) for timer in to_launch
]
if tasks:
try:
await asyncio.gather(*tasks)
except Exception:
logger.exception(
"Exception occurred updating timer status for running timers."
)
logger.info(
f"Updated and launched {len(to_launch)} running timers."
)
# Update stopped timers
tasks = [
asyncio.create_task(timer.update_status_card()) for timer in to_update
]
if tasks:
try:
await asyncio.gather(*tasks)
except Exception:
logger.exception(
"Exception occurred updating timer status for stopped timers."
)
logger.info(
f"Updated {len(to_update)} stopped timers."
)
# Update timer registry
self.timers.update(timer_reg)
@LionCog.listener('on_ready')
@log_wrap(action='Init Timers')
async def initialise(self):
"""
Restore timers.
"""
self.ready = False
self.timers = defaultdict(dict)
# Fetch timers in guilds on this shard
# TODO: Join with guilds and filter by guilds we are still in
timer_data = await self.data.Timer.fetch_where(THIS_SHARD)
await self._load_timers(timer_data)
# Ready to handle events
self.ready = True
logger.info("Timer system ready to process events.")
# ----- Event Handlers -----
@LionCog.listener('on_voice_state_update')
@log_wrap(action='Timer Voice Events')
async def timer_voice_events(self, member, before, after):
if not self.ready:
# Trust initialise to trigger update status
return
if member.bot:
return
# If a member is leaving or joining a running timer, trigger a status update
if before.channel != after.channel:
leaving = self.get_channel_timer(before.channel.id) if before.channel else None
joining = self.get_channel_timer(after.channel.id) if after.channel else None
tasks = []
if leaving:
tasks.append(leaving.update_status_card())
if joining is not None:
joining.last_seen[member.id] = utc_now()
if not joining.running and joining.auto_restart:
tasks.append(joining.start())
else:
tasks.append(joining.update_status_card())
if tasks:
try:
await asyncio.gather(*tasks)
except Exception:
logger.exception(
"Exception occurred while handling timer voice event. "
f"Leaving: {leaving!r} "
f"Joining: {joining!r}"
)
@LionCog.listener('on_guild_remove')
@log_wrap(action='Unload Guild Timers')
async def _unload_guild_timers(self, guild: discord.Guild):
"""
When we leave a guild, perform an unload for all timers in the Guild.
"""
if not self.ready:
# Trust initialiser to ignore the guild
return
timers = self.timers.pop(guild.id)
tasks = []
for timer in timers:
tasks.append(asyncio.create_task(timer.unload()))
if tasks:
try:
await asyncio.gather(*tasks)
except Exception:
logger.warning(
"Exception occurred while unloading timers for removed guild.",
exc_info=True
)
logger.info(
f"Unloaded {len(timers)} from removed guild <gid: {guild.id}>."
)
@LionCog.listener('on_guild_join')
@log_wrap(action='Load Guild Timers')
async def _load_guild_timers(self, guild: discord.Guild):
"""
When we join a guild, reload any saved timers for this guild.
"""
timer_data = await self.data.Timer.fetch_where(guildid=guild.id)
if timer_data:
await self._load_timers(timer_data)
@LionCog.listener('on_guild_channel_delete')
@log_wrap(action='Destroy Channel Timer')
async def _destroy_channel_timer(self, channel: discord.abc.GuildChannel):
"""
If a voice channel with a timer was deleted, destroy the timer.
"""
timer = self.get_channel_timer(channel.id)
if timer is not None:
await timer.destroy(reason="Voice Channel Deleted")
@LionCog.listener('on_guildset_pomodoro_channel')
@log_wrap(action='Update Pomodoro Channels')
async def _update_pomodoro_channels(self, guildid: int, setting: TimerSettings.PomodoroChannel):
"""
Request a send_status for all guild timers which need to move channel.
"""
timers = self.get_guild_timers(guildid).values()
tasks = []
for timer in timers:
current_channel = timer.notification_channel
current_hook = timer._hook
if current_channel and (not current_hook or current_hook.channelid != current_channel.id):
tasks.append(asyncio.create_task(timer.send_status()))
if tasks:
try:
await asyncio.gather(*tasks)
except Exception:
logger.warning(
"Exception occurred which refreshing status for timers with new notification_channel.",
exc_info=True
)
# ----- Timer API -----
def get_guild_timers(self, guildid: int) -> dict[int, Timer]:
"""
Get all timers in the given guild as a map channelid -> Timer.
"""
return self.timers[guildid]
def get_channel_timer(self, channelid: int) -> Optional[Timer]:
"""
Get the timer bound to the given channel, or None if it does not exist.
"""
channel = self.bot.get_channel(channelid)
if channel:
return self.timers[channel.guild.id].get(channelid, None)
async def create_timer(self, **kwargs):
timer_data = await self.data.Timer.create(**kwargs)
lguild = await self.bot.core.lions.fetch_guild(timer_data.guildid)
timer = Timer(self.bot, timer_data, lguild)
self.timers[timer_data.guildid][timer_data.channelid] = timer
return timer
async def destroy_timer(self, timer: Timer, **kwargs):
"""
Destroys the provided timer and removes it from the registry.
"""
self.timers[timer.data.guildid].pop(timer.data.channelid, None)
await timer.destroy(**kwargs)
# ----- Timer Commands -----
@cmds.hybrid_group(
name=_p('cmd:pomodoro', "pomodoro"),
desc=_p('cmd:pomodoro|desc', "Base group for all pomodoro timer commands.")
)
@cmds.guild_only()
async def pomodoro_group(self, ctx: LionContext):
...
# -- User Display Commands --
@pomodoro_group.command(
name=_p('cmd:pomodoro_status', "show"),
description=_p('cmd:pomodoro_status|desc', "Display the status of a single pomodoro timer.")
)
@appcmds.rename(
channel=_p('cmd:pomodoro_status|param:channel', "timer_channel")
)
@appcmds.describe(
channel=_p(
'cmd:pomodoro_status|param:channel|desc',
"The channel for which you want to view the timer."
)
)
async def cmd_pomodoro_status(self, ctx: LionContext, channel: discord.VoiceChannel):
t = self.bot.translator.t
if not ctx.guild:
return
if not ctx.interaction:
return
# Check if a timer exists in the given channel
timer = self.get_channel_timer(channel.id)
if timer is None:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_status|error:no_timer',
"The channel {channel} does not have a timer set up!"
)).format(channel=channel.mention)
)
await ctx.reply(embed=embed, ephemeral=True)
else:
# Display the timer status ephemerally
status = await timer.current_status(with_notify=False, with_warnings=False)
await ctx.reply(**status.send_args, ephemeral=True)
@pomodoro_group.command(
name=_p('cmd:pomodoro_list', "list"),
description=_p('cmd:pomodoro_list|desc', "List the available pomodoro timers.")
)
async def cmd_pomodoro_list(self, ctx: LionContext):
t = self.bot.translator.t
if not ctx.guild:
return
if not ctx.interaction:
return
timers = list(self.get_guild_timers(ctx.guild.id).values())
visible_timers = [
timer for timer in timers
if timer.channel and timer.channel.permissions_for(ctx.author).view_channel
]
if not timers:
# No timers in this guild!
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_list|error:no_timers',
"No timers have been setup in this server!\n"
"You can ask an admin to create one with {command}."
)).format(command='`/pomodoro admin create`')
)
# TODO: Update command mention when we have command mentions
await ctx.reply(embed=embed, ephemeral=True)
elif not visible_timers:
# Timers exist, but the member can't see any
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_list|error:no_visible_timers',
"There are no visible timers in this server!"
))
)
await ctx.reply(embed=embed, ephemeral=True)
else:
# Timers exist and are visible!
embed = discord.Embed(
colour=discord.Colour.orange(),
title=t(_p(
'cmd:pomodoro_list|embed:timer_list|title',
"Pomodoro Timers in **{guild}**"
)).format(guild=ctx.guild.name),
)
for timer in visible_timers:
stage = timer.current_stage
if stage is None:
if timer.auto_restart:
lazy_status = _p(
'cmd:pomodoro_list|status:stopped_auto',
"`{pattern}` timer is stopped with no members!\nJoin {channel} to restart it."
)
else:
lazy_status = _p(
'cmd:pomodoro_list|status:stopped_manual',
"`{pattern}` timer is stopped with `{members}` members!\n"
"Join {channel} and press `Start` to start it!"
)
else:
if stage.focused:
lazy_status = _p(
'cmd:pomodoro_list|status:running_focus',
"`{pattern}` timer is running with `{members}` members!\n"
"Currently **focusing**, with break starting {timestamp}"
)
else:
lazy_status = _p(
'cmd:pomodoro_list|status:running_break',
"`{pattern}` timer is running with `{members}` members!\n"
"Currently **resting**, with focus starting {timestamp}"
)
status = t(lazy_status).format(
pattern=timer.pattern,
channel=timer.channel.mention,
members=len(timer.members),
timestamp=f"<t:{int(stage.end.timestamp())}:R>" if stage else None
)
embed.add_field(name=timer.channel.mention, value=status, inline=False)
await ctx.reply(embed=embed, ephemeral=False)
# -- Admin Commands --
@pomodoro_group.group(
name=_p('cmd:pomodoro_admin', "admin"),
desc=_p('cmd:pomodoro_admin|desc', "Command group for pomodoro admin controls.")
)
async def pomodoro_admin_group(self, ctx: LionContext):
...
@pomodoro_admin_group.command(
name=_p('cmd:pomodoro_create', "create"),
description=_p(
'cmd:pomodoro_create|desc',
"Create a new Pomodoro timer. Requires admin permissions."
)
)
@appcmds.rename(
channel=_p('cmd:pomodoro_create|param:channel', "timer_channel"),
**{param: option._display_name for param, (option, _) in _param_options.items()}
)
@appcmds.describe(
channel=_p(
'cmd:pomodoro_create|param:channel|desc',
"Voice channel to create the timer in. (Defaults to your current channel, or makes a new one.)"
),
**{param: option._desc for param, (option, _) in _param_options.items()}
)
async def cmd_pomodoro_create(self, ctx: LionContext,
focus_length: appcmds.Range[int, 1, 24*60],
break_length: appcmds.Range[int, 1, 24*60],
channel: Optional[discord.VoiceChannel] = None,
notification_channel: Optional[discord.TextChannel | discord.VoiceChannel] = None,
inactivity_threshold: Optional[appcmds.Range[int, 0, 127]] = None,
manager_role: Optional[discord.Role] = None,
voice_alerts: Optional[bool] = None,
name: Optional[appcmds.Range[str, 0, 100]] = None,
channel_name: Optional[appcmds.Range[str, 0, 100]] = None,
):
t = self.bot.translator.t
# Type guards
if not ctx.guild:
return
if not ctx.interaction:
return
# Check permissions
if not ctx.author.guild_permissions.administrator:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_create|error:insufficient_perms',
"Only server administrators can create timers!"
))
)
await ctx.reply(embed=embed, ephemeral=True)
return
# If a voice channel was not given, attempt to resolve it or make one
if channel is None:
# Resolving order: command channel, author voice channel, new channel
if ctx.channel.type is discord.ChannelType.voice:
channel = ctx.channel
elif ctx.author.voice and ctx.author.voice.channel:
channel = ctx.author.voice.channel
else:
# Attempt to create new channel in current category
if ctx.guild.me.guild_permissions.manage_channels:
try:
channel = await ctx.guild.create_voice_channel(
name="Timer",
reason="Creating Pomodoro Voice Channel",
category=ctx.channel.category
)
except discord.HTTPException:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'cmd:pomodoro_create|error:channel_create_failed|title',
"Could not create pomodoro voice channel!"
)),
description=t(_p(
'cmd:pomodoro_create|error:channel_create|desc',
"Failed to create a new pomodoro voice channel due to an unknown "
"Discord communication error. "
"Please try creating the channel manually and pass it to the "
"`timer_channel` argument of this command."
))
)
await ctx.reply(embed=embed, ephemeral=True)
return
else:
# Error
embed = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'cmd:pomodoro_create|error:channel_create_permissions|title',
"Could not create pomodoro voice channel!"
)),
description=t(_p(
'cmd:pomodoro_create|error:channel_create_permissions|desc',
"No `timer_channel` was provided, and I lack the `MANAGE_CHANNELS` permission "
"needed to create a new voice channel."
))
)
await ctx.reply(embed=embed, ephemeral=True)
return
# At this point, we have a voice channel
# Make sure a timer does not already exist in the channel
if (self.get_channel_timer(channel.id)) is not None:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_create|error:timer_exists',
"A timer already exists in {channel}! Use `/pomodoro admin edit` to modify it."
)).format(channel=channel.mention)
)
await ctx.reply(embed=embed, ephemeral=True)
return
# Build the creation arguments from the rest of the provided args
provided = {
'focus_length': focus_length * 60,
'break_length': break_length * 60,
'notification_channel': notification_channel,
'inactivity_threshold': inactivity_threshold,
'manager_role': manager_role,
'voice_alerts': voice_alerts,
'name': name or channel.name,
'channel_name': channel_name or None,
}
create_args = {'channelid': channel.id, 'guildid': channel.guild.id}
for param, value in provided.items():
if value is not None:
setting, _ = _param_options[param]
create_args[setting._column] = setting._data_from_value(channel.id, value)
# Permission checks and input checking done
await ctx.interaction.response.defer(thinking=True)
# Create timer
timer = await self.create_timer(**create_args)
# Start timer
await timer.start()
# Ack with a config UI
ui = TimerOptionsUI(self.bot, timer, TimerRole.ADMIN, callerid=ctx.author.id)
await ui.run(
ctx.interaction,
content=t(_p(
'cmd:pomodoro_create|response:success|content',
"Timer created successfully! Use the panel below to reconfigure."
))
)
await ui.wait()
@pomodoro_admin_group.command(
name=_p('cmd:pomodoro_destroy', "destroy"),
description=_p(
'cmd:pomodoro_destroy|desc',
"Delete a pomodoro timer from a voice channel. Requires admin permissions."
)
)
@appcmds.rename(
channel=_p('cmd:pomodoro_destroy|param:channel', "timer_channel"),
)
@appcmds.describe(
channel=_p('cmd:pomodoro_destroy|param:channel', "Channel with the timer to delete."),
)
async def cmd_pomodoro_delete(self, ctx: LionContext, channel: discord.VoiceChannel):
t = self.bot.translator.t
# Type guards
if not ctx.guild:
return
if not ctx.interaction:
return
# Check the timer actually exists
timer = self.get_channel_timer(channel.id)
if timer is None:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_destroy|error:no_timer',
"This channel doesn't have an attached pomodoro timer!"
))
)
await ctx.interaction.response.send_message(embed=embed, ephemeral=True)
return
# Check the user has sufficient permissions to delete the timer
# TODO: Should we drop the admin requirement down to manage channel?
timer_role = timer.get_member_role(ctx.author)
if timer.owned:
if timer_role < TimerRole.OWNER:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_destroy|error:insufficient_perms|owned',
"You need to be an administrator or own this channel to remove this timer!"
))
)
await ctx.interaction.response.send_message(embed=embed, ephemeral=True)
return
elif timer_role is not TimerRole.ADMIN:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_destroy|error:insufficient_perms|notowned',
"You need to be a server administrator to remove this timer!"
))
)
await ctx.interaction.response.send_message(embed=embed, ephemeral=True)
return
await ctx.interaction.response.defer(thinking=True)
await self.destroy_timer(timer, reason="Deleted by command")
embed = discord.Embed(
colour=discord.Colour.brand_green(),
description=t(_p(
'cmd:pomdoro_destroy|response:success|description',
"Timer successfully removed from {channel}."
)).format(channel=channel.mention)
)
await ctx.interaction.edit_original_response(embed=embed)
@pomodoro_admin_group.command(
name=_p('cmd:pomodoro_edit', "edit"),
description=_p(
'cmd:pomodoro_edit|desc',
"Edit a Timer"
)
)
@appcmds.rename(
channel=_p('cmd:pomodoro_edit|param:channel', "timer_channel"),
**{param: option._display_name for param, (option, _) in _param_options.items()}
)
@appcmds.describe(
channel=_p(
'cmd:pomodoro_edit|param:channel|desc',
"Channel holding the timer to edit."
),
**{param: option._desc for param, (option, _) in _param_options.items()}
)
async def cmd_pomodoro_edit(self, ctx: LionContext,
channel: discord.VoiceChannel,
focus_length: Optional[appcmds.Range[int, 1, 24*60]] = None,
break_length: Optional[appcmds.Range[int, 1, 24*60]] = None,
notification_channel: Optional[discord.TextChannel | discord.VoiceChannel] = None,
inactivity_threshold: Optional[appcmds.Range[int, 0, 127]] = None,
manager_role: Optional[discord.Role] = None,
voice_alerts: Optional[bool] = None,
name: Optional[appcmds.Range[str, 0, 100]] = None,
channel_name: Optional[appcmds.Range[str, 0, 100]] = None,
):
t = self.bot.translator.t
provided = {
'focus_length': focus_length * 60 if focus_length else None,
'break_length': break_length * 60 if break_length else None,
'notification_channel': notification_channel,
'inactivity_threshold': inactivity_threshold,
'manager_role': manager_role,
'voice_alerts': voice_alerts,
'name': name or None,
'channel_name': channel_name or None,
}
modified = set(param for param, value in provided.items() if value is not None)
# Type guards
if not ctx.guild:
return
if not ctx.interaction:
return
# Check the timer actually exists
timer = self.get_channel_timer(channel.id)
if timer is None:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_edit|error:no_timer',
"This channel doesn't have an attached pomodoro timer to edit!"
))
)
await ctx.interaction.response.send_message(embed=embed, ephemeral=True)
return
# Check that the author has sufficient permissions to update the timer at all
timer_role = timer.get_member_role(ctx.author)
if timer_role is TimerRole.OTHER:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:pomodoro_edit|error:insufficient_perms|role:other',
"Insufficient permissions to modifiy this timer!\n"
"You need to be a server administrator, own this channel, or have the timer manager role."
))
)
await ctx.reply(embed=embed, ephemeral=True)
return
# Check that the author has sufficient permissions to modify the requested items
# And build the list of arguments to write
update_args = {}
for param in modified:
setting, required = _param_options[param]
if timer_role < required:
if required is TimerRole.OWNER and not timer.owned:
required = TimerRole.ADMIN
elif required is TimerRole.MANAGER and timer.data.manager_roleid is None:
required = TimerRole.ADMIN
if required is TimerRole.ADMIN:
error = t(_p(
'cmd:pomodoro_edit|error:insufficient_permissions|role_needed:admin',
"You need to be a guild admin to modify this option!"
))
elif required is TimerRole.OWNER:
error = t(_p(
'cmd:pomodoro_edit|error:insufficient_permissions|role_needed:owner',
"You need to be a channel owner or guild admin to modify this option!"
))
elif required is TimerRole.MANAGER:
error = t(_p(
'cmd:pomodoro_edit|error:insufficient_permissions|role_needed:manager',
"You need to be a guild admin or have the manager role to modify this option!"
))
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=error
)
await ctx.reply(embed=embed, ephemeral=True)
return
update_args[setting._column] = setting._data_from_value(channel.id, provided[param])
await ctx.interaction.response.defer(thinking=True)
if update_args:
# Update the timer data
await timer.data.update(**update_args)
# Regenerate or refresh the timer
if ('focus_length' in modified) or ('break_length' in modified):
await timer.start()
elif ('notification_channel' in modified):
await timer.send_status()
else:
await timer.update_status_card()
# Show the config UI
ui = TimerOptionsUI(self.bot, timer, timer_role)
await ui.run(ctx.interaction)
await ui.wait()
# ----- Guild Config Commands -----
@LionCog.placeholder_group
@cmds.hybrid_group('configure', with_app_command=False)
async def configure_group(self, ctx: LionContext):
...
@configure_group.command(
name=_p('cmd:configure_pomodoro', "pomodoro"),
description=_p('cmd:configure_pomodoro|desc', "Configure Pomodoro Timer System")
)
@appcmds.rename(
pomodoro_channel=TimerSettings.PomodoroChannel._display_name
)
@appcmds.describe(
pomodoro_channel=TimerSettings.PomodoroChannel._desc
)
@cmds.check(low_management)
async def configure_pomodoro_command(self, ctx: LionContext,
pomodoro_channel: Optional[discord.VoiceChannel | discord.TextChannel] = None):
t = self.bot.translator.t
# Type checking guards
if not ctx.guild:
return
if not ctx.interaction:
return
await ctx.interaction.response.defer(thinking=True)
pomodoro_channel_setting = await self.settings.PomodoroChannel.get(ctx.guild.id)
if pomodoro_channel is not None:
# VALIDATE PERMISSIONS!
pomodoro_channel_setting.value = pomodoro_channel
await pomodoro_channel_setting.write()
modified = True
else:
modified = False
if modified:
line = pomodoro_channel_setting.update_message
embed = discord.Embed(
colour=discord.Colour.brand_green(),
description=f"{self.bot.config.emojis.tick} {line}"
)
await ctx.reply(embed=embed)
if ctx.channel.id not in TimerConfigUI._listening or not modified:
ui = TimerConfigUI(self.bot, ctx.guild.id, ctx.channel.id)
await ui.run(ctx.interaction)
await ui.wait()

View File

@@ -0,0 +1,46 @@
from data import Registry, RowModel
from data.columns import Integer, Bool, Timestamp, String
class TimerData(Registry):
class Timer(RowModel):
"""
Schema
------
CREATE TABLE timers(
channelid BIGINT PRIMARY KEY,
guildid BIGINT NOT NULL REFERENCES guild_config (guildid),
ownerid BIGINT REFERENCES user_config,
manager_roleid BIGINT,
notification_channelid BIGINT,
focus_length INTEGER NOT NULL,
break_length INTEGER NOT NULL,
last_started TIMESTAMPTZ,
last_messageid BIGINT,
voice_alerts BOOLEAN,
inactivity_threshold INTEGER,
auto_restart BOOLEAN,
channel_name TEXT,
pretty_name TEXT
);
CREATE INDEX timers_guilds ON timers (guildid);
"""
_tablename_ = 'timers'
channelid = Integer(primary=True)
guildid = Integer()
ownerid = Integer()
manager_roleid = Integer()
last_started = Timestamp()
focus_length = Integer()
break_length = Integer()
auto_restart = Bool()
inactivity_threshold = Integer()
notification_channelid = Integer()
last_messageid = Integer()
voice_alerts = Bool()
channel_name = String()
pretty_name = String()

View File

@@ -0,0 +1,55 @@
from typing import TYPE_CHECKING
from meta import LionBot
from utils.lib import utc_now
from gui.cards import FocusTimerCard, BreakTimerCard
if TYPE_CHECKING:
from .timer import Timer, Stage
from tracking.voice.cog import VoiceTrackerCog
async def get_timer_card(bot: LionBot, timer: 'Timer', stage: 'Stage'):
voicecog: 'VoiceTrackerCog' = bot.get_cog('VoiceTrackerCog')
name = timer.base_name
if stage is not None:
duration = stage.duration
remaining = (stage.end - utc_now()).total_seconds()
else:
remaining = duration = timer.data.focus_length
card_users = []
guildid = timer.data.guildid
for member in timer.members:
session_data = None
if voicecog is not None:
session = voicecog.get_session(guildid, member.id)
session_data = session.data
if session_data:
session_duration = (utc_now() - session_data.start_time).total_seconds()
tag = session_data.tag
else:
session_duration = 0
tag = None
card_user = (
(member.id, member.avatar.key),
session_duration,
tag,
)
card_users.append(card_user)
if stage is None or stage.focused:
card_cls = FocusTimerCard
else:
card_cls = BreakTimerCard
return card_cls(
name,
remaining,
duration,
users=card_users,
)

View File

@@ -0,0 +1,27 @@
import os
from enum import IntEnum
from meta import conf
from . import babel
_p = babel._p
class TimerRole(IntEnum):
ADMIN = 3
OWNER = 2
MANAGER = 1
OTHER = 0
channel_name_keys = [
("{remaining}", _p('formatstring:channel_name|key:remaining', "{remaining}")),
("{stage}", _p('formatstring:channel_name|key:stage', "{stage}")),
("{members}", _p('formatstring:channel_name|key:members', "{members}")),
("{name}", _p('formatstring:channel_name|key:name', "{name}")),
("{pattern}", _p('formatstring:channel_name|key:pattern', "{pattern}")),
]
focus_alert_path = os.path.join(conf.bot.asset_path, 'pomodoro', 'focus_alert.wav')
break_alert_path = os.path.join(conf.bot.asset_path, 'pomodoro', 'break_alert.wav')

View File

@@ -0,0 +1,252 @@
from typing import Optional
import discord
from meta import LionBot
from meta.errors import UserInputError
from babel.translator import ctx_translator
from settings import ModelData
from settings.groups import SettingGroup, ModelConfig, SettingDotDict
from settings.setting_types import (
ChannelSetting, RoleSetting, BoolSetting, StringSetting, IntegerSetting, DurationSetting
)
from .data import TimerData
from . import babel
_p = babel._p
class TimerConfig(ModelConfig):
settings = SettingDotDict()
_model_settings = set()
model = TimerData.Timer
class TimerOptions(SettingGroup):
@TimerConfig.register_model_setting
class VoiceChannel(ModelData, ChannelSetting):
setting_id = 'voice_channel'
_display_name = _p('timerset:voice_channel', "channel")
_desc = _p(
'timerset:voice_channel|desc',
"Channel in which to track timer members and send alerts."
)
_model = TimerData.Timer
_column = TimerData.Timer.channelid.name
@TimerConfig.register_model_setting
class NotificationChannel(ModelData, ChannelSetting):
setting_id = 'notification_channel'
_display_name = _p('timerset:notification_channel', "notification_channel")
_desc = _p(
'timerset:notification_channel|desc',
"Channel to which to send timer status cards and notifications."
)
_model = TimerData.Timer
_column = TimerData.Timer.notification_channelid.name
@classmethod
async def _check_value(cls, parent_id: int, value: Optional[discord.abc.GuildChannel], **kwargs):
if value is not None:
# TODO: Check we either have or can create a webhook
# TODO: Check we can send messages, embeds, and files
...
@classmethod
def _format_data(cls, parent_id, data, timer=None, **kwargs):
actual = timer.notification_channel if timer is not None else None
if data is None and actual is not None:
t = ctx_translator.get().t
formatted = t(_p(
'timerset:notification_channel|format:notset',
"Not Set (Using {channel})"
)).format(channel=actual.mention)
else:
formatted = super()._format_data(parent_id, data, **kwargs)
return formatted
@TimerConfig.register_model_setting
class InactivityThreshold(ModelData, IntegerSetting):
setting_id = 'inactivity_threshold'
_display_name = _p('timerset:inactivity_threshold|inactivity_threshold', "inactivity_threshold")
_desc = _p(
'timerset:inactivity_threshold|desc',
"Number of inactive focus+break stages before a member is removed from the timer."
)
_accepts = _p(
'timerset:inactivity_threshold|desc',
"How many timer cycles before kicking inactive members."
)
_model = TimerData.Timer
_column = TimerData.Timer.inactivity_threshold.name
@property
def input_formatted(self):
return str(self._data) if self._data is not None else ''
@TimerConfig.register_model_setting
class ManagerRole(ModelData, RoleSetting):
setting_id = 'manager_role'
_display_name = _p('timerset:manager_role', "manager_role")
_desc = _p(
'timerset:manager_role|desc',
"Role allowed to start, stop, and edit the focus/break lengths."
)
_model = TimerData.Timer
_column = TimerData.Timer.manager_roleid.name
@classmethod
def _format_data(cls, parent_id, data, timer=None, **kwargs):
if data is None:
t = ctx_translator.get().t
formatted = t(_p(
'timerset:manager_role|format:notset',
"Not Set (Only Admins may start/stop or edit pattern)"
))
else:
formatted = super()._format_data(parent_id, data, **kwargs)
return formatted
@TimerConfig.register_model_setting
class VoiceAlerts(ModelData, BoolSetting):
setting_id = 'voice_alerts'
_display_name = _p('timerset:voice_alerts', "voice_alerts")
_desc = _p(
'timerset:voice_alerts|desc',
"Whether to join the voice channel and announce focus and break stages."
)
_default = True
_model = TimerData.Timer
_column = TimerData.Timer.voice_alerts.name
@TimerConfig.register_model_setting
class BaseName(ModelData, StringSetting):
setting_id = 'base_name'
_display_name = _p('timerset:base_name', "name")
_desc = _p(
'timerset:base_name|desc',
"Timer name, as shown on the timer card."
)
_accepts = _p(
'timerset:base_name|accepts',
"Any short name, shown on the timer card."
)
# TODO: Consider ways of localising string setting defaults?
# Probably using the default property?
_default = "Timer"
_model = TimerData.Timer
_column = TimerData.Timer.pretty_name.name
@TimerConfig.register_model_setting
class ChannelFormat(ModelData, StringSetting):
setting_id = 'channel_name_format'
_display_name = _p('timerset:channel_name_format', "channel_name")
_desc = _p(
'timerset:channel_name_format|desc',
"Auto-updating voice channel name, accepting {remaining}, {name}, {pattern}, and {stage} keys."
)
_accepts = _p(
'timerset:channel_name|accepts',
"Timer channel name, with keys {remaining}, {name}, {pattern}, and {stage}."
)
_default = "{name} {pattern} - {stage}"
_model = TimerData.Timer
_column = TimerData.Timer.channel_name.name
@TimerConfig.register_model_setting
class FocusLength(ModelData, DurationSetting):
setting_id = 'focus_length'
_display_name = _p('timerset:focus_length', "focus_length")
_desc = _p(
'timerset:focus_length|desc',
"Length of the focus stage of the timer in minutes."
)
_virtual = True
_accepts = _p(
'timerset:focus_length|accepts',
"A positive integer number of minutes."
)
_required = True
_model = TimerData.Timer
_column = TimerData.Timer.focus_length.name
_default_multiplier = 60
allow_zero = False
_show_days = False
@property
def input_formatted(self):
return str(int(self._data // 60)) if self._data else None
@classmethod
async def _parse_string(cls, parent_id, string, **kwargs):
try:
return await super()._parse_string(parent_id, string, **kwargs)
except UserInputError:
t = ctx_translator.get().t
raise UserInputError(
t(_p(
'timerset:focus_length|desc',
"Please enter a positive number of minutes."
))
)
@TimerConfig.register_model_setting
class BreakLength(ModelData, DurationSetting):
setting_id = 'break_length'
_display_name = _p('timerset:break_length', "break_length")
_desc = _p(
'timerset:break_length|desc',
"Length of the break stage of the timer in minutes."
)
_virtual = True
_accepts = _p(
'timerset:break_length|accepts',
"A positive integer number of minutes."
)
_required = True
_model = TimerData.Timer
_column = TimerData.Timer.break_length.name
_default_multiplier = 60
allow_zero = False
_show_days = False
@property
def input_formatted(self):
return str(int(self._data // 60)) if self._data else None
@classmethod
async def _parse_string(cls, parent_id, string, **kwargs):
try:
return await super()._parse_string(parent_id, string, **kwargs)
except UserInputError:
t = ctx_translator.get().t
raise UserInputError(
t(_p(
'timerset:break_length|desc',
"Please enter a positive number of minutes."
))
)

View File

@@ -0,0 +1,47 @@
from settings import ModelData
from settings.groups import SettingGroup
from settings.setting_types import ChannelSetting
from core.data import CoreData
from babel.translator import ctx_translator
from . import babel
_p = babel._p
class TimerSettings(SettingGroup):
class PomodoroChannel(ModelData, ChannelSetting):
setting_id = 'pomodoro_channel'
_event = 'guildset_pomodoro_channel'
_display_name = _p('guildset:pomodoro_channel', "pomodoro_channel")
_desc = _p(
'guildset:pomodoro_channel|desc',
"Default central notification channel for pomodoro timers."
)
_long_desc = _p(
'guildset:pomodoro_channel|long_desc',
"Pomodoro timers which do not have a custom notification channel set will send "
"timer notifications in this channel. "
"If this setting is not set, pomodoro notifications will default to the "
"timer voice channel itself."
)
_model = CoreData.Guild
_column = CoreData.Guild.pomodoro_channel.name
@property
def update_message(self) -> str:
t = ctx_translator.get().t
value = self.value
if value is not None:
resp = t(_p(
'guildset:pomodoro_channel|set_response|set',
"Pomodoro timer notifications will now default to {channel}"
)).format(channel=value.mention)
else:
resp = t(_p(
'guildset:pomodoro_channel|set_response|unset',
"Pomodoro timer notifications will now default to their voice channel."
))
return resp

View File

@@ -0,0 +1,84 @@
import asyncio
import discord
from discord.ui.select import select, ChannelSelect
from meta import LionBot
from utils.ui import ConfigUI, DashboardSection
from utils.lib import MessageArgs
from .settings import TimerSettings
from . import babel
_p = babel._p
class TimerConfigUI(ConfigUI):
setting_classes = (
TimerSettings.PomodoroChannel,
)
def __init__(self, bot: LionBot, guildid: int, channelid: int, **kwargs):
self.settings = bot.get_cog('TimerCog').settings
super().__init__(bot, guildid, channelid, **kwargs)
# ----- UI Components -----
@select(cls=ChannelSelect, channel_types=[discord.ChannelType.text, discord.ChannelType.voice],
placeholder="CHANNEL_SELECT_PLACEHOLDER",
min_values=0, max_values=1)
async def channel_menu(self, selection: discord.Interaction, selected: ChannelSelect):
await selection.response.defer()
setting = self.instances[0]
setting.value = selected.values[0] if selected.values else None
await setting.write()
async def refresh_channel_menu(self):
self.channel_menu.placeholder = self.bot.translator.t(_p(
'ui:timer_config|menu:channels|placeholder',
"Select Pomodoro Notification Channel"
))
# ----- UI Flow -----
async def make_message(self) -> MessageArgs:
t = self.bot.translator.t
title = t(_p(
'ui:timer_config|embed|title',
"Timer Configuration Panel"
))
embed = discord.Embed(
colour=discord.Colour.orange(),
title=title
)
for setting in self.instances:
embed.add_field(**setting.embed_field, inline=False)
args = MessageArgs(embed=embed)
return args
async def reload(self):
lguild = await self.bot.core.lions.fetch_guild(self.guildid)
self.instances = (
lguild.config.get(TimerSettings.PomodoroChannel.setting_id),
)
async def refresh_components(self):
await asyncio.gather(
self.refresh_channel_menu(),
self.edit_button_refresh(),
self.close_button_refresh(),
self.reset_button_refresh(),
)
self.set_layout(
(self.channel_menu,),
(self.edit_button, self.reset_button, self.close_button)
)
class TimerDashboard(DashboardSection):
section_name = _p(
'dash:pomodoro|title',
"Pomodoro Configuration"
)
configui = TimerConfigUI
setting_classes = TimerConfigUI.setting_classes

View File

@@ -0,0 +1,800 @@
from typing import Optional, TYPE_CHECKING
import math
from collections import namedtuple
import asyncio
from datetime import timedelta, datetime
import discord
from meta import LionBot
from meta.logger import log_wrap, log_context
from utils.lib import MessageArgs, utc_now, replace_multiple
from core.lion_guild import LionGuild
from core.data import CoreData
from babel.translator import ctx_locale
from . import babel, logger
from .data import TimerData
from .ui import TimerStatusUI
from .graphics import get_timer_card
from .lib import TimerRole, channel_name_keys, focus_alert_path, break_alert_path
from .options import TimerConfig, TimerOptions
if TYPE_CHECKING:
from babel.cog import LocaleSettings
_p, _np = babel._p, babel._np
Stage = namedtuple('Stage', ['focused', 'start', 'duration', 'end'])
class Timer:
__slots__ = (
'bot',
'data',
'lguild',
'locale',
'config',
'last_seen',
'status_view',
'last_status_message',
'_hook',
'_state',
'_lock',
'_last_voice_update',
'_voice_update_task',
'_voice_update_lock',
'_run_task',
'_loop_task',
)
break_name = _p('timer|stage:break|name', "BREAK")
focus_name = _p('timer|stage:focus|name', "FOCUS")
def __init__(self, bot: LionBot, data: TimerData.Timer, lguild: LionGuild):
self.bot = bot
self.data = data
self.lguild = lguild
self.locale: LocaleSettings.GuildLocale = lguild.config.get('guild_locale')
self.config = TimerConfig(data.channelid, data)
log_context.set(f"tid: {self.data.channelid}")
# State
self.last_seen: dict[int, int] = {} # memberid -> last seen timestamp
self.status_view: Optional[TimerStatusUI] = None # Current TimerStatusUI
self.last_status_message: Optional[discord.Message] = None # Last deliever notification message
self._hook: Optional[CoreData.LionHook] = None # Cached notification webhook
self._state: Optional[Stage] = None # The currently active Stage
self._lock = asyncio.Lock() # Stage change and CRUD lock
# Timestamp of the last voice update, used to compute the next update time
self._last_voice_update = None
# Wait task for the current pending channel name update
self._voice_update_task = None
# Lock to prevent channel name update race
self._voice_update_lock = asyncio.Lock()
# Wait task for the update loop. May be safely cancelled to pause updates.
self._run_task = None
# Main loop task. Should not be cancelled.
self._loop_task = None
def __repr__(self):
return (
"<Timer "
f"channelid={self.data.channelid} "
f"channel='{self.channel}' "
f"guildid={self.data.guildid} "
f"guild='{self.guild}' "
f"members={len(self.members)} "
f"pattern='{self.data.focus_length}/{self.data.break_length}' "
f"base_name={self.data.pretty_name!r} "
f"format_string={self.data.channel_name!r}"
">"
)
# Consider exposing configurable settings through a Settings interface, for ease of formatting.
@property
def auto_restart(self) -> bool:
"""
Whether to automatically restart a stopped timer when a user joins.
"""
return bool(self.data.auto_restart)
@property
def guild(self) -> Optional[discord.Guild]:
"""
The discord.Guild that this timer belongs to.
"""
return self.bot.get_guild(self.data.guildid)
@property
def channel(self) -> Optional[discord.VoiceChannel]:
"""
The discord VoiceChannel that this timer lives in.
"""
return self.bot.get_channel(self.data.channelid)
@property
def notification_channel(self) -> Optional[discord.abc.Messageable]:
"""
The Messageable channel to which to send timer notifications.
"""
if cid := self.data.notification_channelid:
channel = self.bot.get_channel(cid)
else:
channel = self.lguild.config.get('pomodoro_channel').value
if channel is None:
channel = self.channel
return channel
async def get_notification_webhook(self) -> Optional[discord.Webhook]:
channel = self.notification_channel
if channel:
cid = channel.id
if self._hook and self._hook.channelid == cid:
hook = self._hook
else:
hook = self._hook = await self.bot.core.data.LionHook.fetch(cid)
if not hook:
# Attempt to create and save webhook
try:
if channel.permissions_for(channel.guild.me).manage_webhooks:
avatar = self.bot.user.avatar
avatar_data = (await avatar.to_file()).fp.read() if avatar else None
webhook = await channel.create_webhook(
avatar=avatar_data,
name=f"{self.bot.user.name} Pomodoro",
reason="Pomodoro Notifications"
)
hook = await self.bot.core.data.LionHook.create(
channelid=channel.id,
token=webhook.token,
webhookid=webhook.id
)
elif channel.permissions_for(channel.guild.me).send_messages:
await channel.send(
"I require the `manage_webhooks` permission to send pomodoro notifications here!"
)
except discord.HTTPException:
logger.warning(
"Unexpected Exception caught while creating timer notification webhook "
f"for timer: {self!r}",
exc_info=True
)
if hook:
return hook.as_webhook(client=self.bot)
@property
def members(self) -> list[discord.Member]:
"""
The list of members of the current timer.
Uses voice channel member cache as source-of-truth.
"""
if (chan := self.channel):
members = [member for member in chan.members if not member.bot]
else:
members = []
return members
@property
def owned(self) -> bool:
"""
Whether this timer is "owned".
Owned timers have slightly different UI.
"""
return bool(self.data.ownerid)
@property
def running(self) -> bool:
"""
Whether this timer is currently running.
"""
return bool(self.data.last_started)
@property
def channel_name(self) -> str:
"""
The configured formatted name of the voice channel.
Usually does not match the actual voice channel name due to Discord ratelimits.
"""
channel_name_format = self.data.channel_name or "{name} - {stage}"
channel_name = replace_multiple(channel_name_format, self.channel_name_map())
# Truncate to maximum name length
return channel_name[:100]
@property
def base_name(self) -> str:
if not (name := self.data.pretty_name):
pattern = f"{int(self.data.focus_length // 60)}/{int(self.data.break_length // 60)}"
name = self.bot.translator.t(_p(
'timer|default_base_name',
"Timer {pattern}"
), locale=self.locale.value).format(pattern=pattern)
return name
@property
def pattern(self) -> str:
data = self.data
return f"{int(data.focus_length // 60)}/{int(data.break_length // 60)}"
def channel_name_map(self):
"""
Compute the replace map used to format the channel name.
"""
t = self.bot.translator.t
stage = self.current_stage
pattern = self.pattern
name = self.base_name
if stage is not None:
remaining = str(int(5 * math.ceil((stage.end - utc_now()).total_seconds() / 60)))
stagestr = t(self.focus_name if stage.focused else self.break_name, locale=self.locale.value)
else:
remaining = str(self.data.focus_length // 60)
stagestr = t(self.focus_name, locale=self.locale.value)
mapping = {
'{remaining}': remaining,
'{stage}': stagestr,
'{members}': str(len(self.members)),
'{name}': name,
'{pattern}': pattern
}
return mapping
@property
def voice_alerts(self) -> bool:
"""
Whether voice alerts are enabled for this timer.
Takes into account the default.
"""
if (alerts := self.data.voice_alerts) is None:
alerts = True
return alerts
@property
def current_stage(self) -> Optional[Stage]:
"""
Calculate the current stage.
Returns None if the timer is currently stopped.
"""
if self.running:
now = utc_now()
focusl = self.data.focus_length
breakl = self.data.break_length
interval = focusl + breakl
diff = (now - self.data.last_started).total_seconds()
diff %= interval
if diff > focusl:
stage_focus = False
stage_start = now - timedelta(seconds=(diff - focusl))
stage_duration = breakl
else:
stage_focus = True
stage_start = now - timedelta(seconds=diff)
stage_duration = focusl
stage_end = stage_start + timedelta(seconds=stage_duration)
stage = Stage(stage_focus, stage_start, stage_duration, stage_end)
else:
stage = None
return stage
@property
def inactivity_threshold(self):
if (threshold := self.data.inactivity_threshold) is None:
threshold = 3
return threshold
def get_member_role(self, member: discord.Member) -> TimerRole:
"""
Calculate the highest timer permission level of the given member.
"""
if member.guild_permissions.administrator:
role = TimerRole.ADMIN
elif member.id == self.data.ownerid:
role = TimerRole.OWNER
elif (roleid := self.data.manager_roleid) and roleid in (r.id for r in member.roles):
role = TimerRole.MANAGER
else:
role = TimerRole.OTHER
return role
@log_wrap(action="Start Timer")
async def start(self):
"""
Start a new or stopped timer.
May also be used to restart the timer.
"""
try:
async with self._lock:
now = utc_now()
self.last_seen = {
member.id: now for member in self.members
}
await self.data.update(last_started=now)
await self.send_status(with_warnings=False)
self.launch()
except Exception:
logger.exception(
f"Exception occurred while starting timer! Timer {self!r}"
)
else:
logger.info(
f"Starting timer {self!r}"
)
def warning_threshold(self, state: Stage) -> Optional[datetime]:
"""
Timestamp warning threshold for last_seen, from the given stage.
Members who have not been since this time are "at risk",
and will be kicked on the next stage change.
"""
if self.inactivity_threshold > 0:
diff = self.inactivity_threshold * (self.data.break_length + self.data.focus_length)
threshold = utc_now() - timedelta(seconds=diff)
else:
threshold = None
return threshold
@log_wrap(action='Timer Change Stage')
async def notify_change_stage(self, from_stage, to_stage, kick=True):
"""
Notify timer members that the stage has changed.
This includes deleting the last status message,
sending a new status message, pinging members, running the voice alert,
and kicking inactive members if `kick` is True.
"""
if not self.members:
t = self.bot.translator.t
await self.stop(auto_restart=True)
return
async with self._lock:
tasks = []
after_tasks = []
# Submit channel name update request
after_tasks.append(asyncio.create_task(self._update_channel_name()))
if kick and (threshold := self.warning_threshold(from_stage)):
now = utc_now()
# Kick people who need kicking
needs_kick = []
for member in self.members:
last_seen = self.last_seen.get(member.id, None)
if last_seen is None:
last_seen = self.last_seen[member.id] = now
elif last_seen < threshold:
needs_kick.append(member)
for member in needs_kick:
tasks.append(member.edit(voice_channel=None))
notify_hook = await self.get_notification_webhook()
if needs_kick:
t = self.bot.translator.t
kick_message = t(_np(
'timer|kicked_message',
"{mentions} was removed from {channel} because they were inactive! "
"Remember to press {tick} to register your presence every stage.",
"{mentions} were removed from {channel} because they were inactive! "
"Remember to press {tick} to register your presence every stage.",
len(needs_kick)
), locale=self.locale.value).format(
channel=self.channel.mention,
mentions=', '.join(member.mention for member in needs_kick),
tick=self.bot.config.emojis.tick
)
tasks.append(notify_hook.send(kick_message))
if self.voice_alerts:
after_tasks.append(asyncio.create_task(self._voice_alert(to_stage)))
if tasks:
try:
await asyncio.gather(*tasks)
except Exception:
logger.exception(f"Exception occurred during pre-tasks for change stage in timer {self!r}")
print("Sending Status")
await self.send_status()
print("Sent Status")
if after_tasks:
try:
await asyncio.gather(*after_tasks)
except Exception:
logger.exception(f"Exception occurred during post-tasks for change stage in timer {self!r}")
@log_wrap(action='Voice Alert')
async def _voice_alert(self, stage: Stage):
"""
Join the voice channel, play the associated alert, and leave the channel.
"""
if not stage:
return
if not self.channel.permissions_for(self.guild.me).speak:
return
async with self.lguild.voice_lock:
try:
if self.guild.voice_client:
print("Disconnecting")
await self.guild.voice_client.disconnect(force=True)
print("Disconnected")
alert_file = focus_alert_path if stage.focused else break_alert_path
try:
print("Connecting")
voice_client = await self.channel.connect(timeout=60, reconnect=False)
print("Connected")
except asyncio.TimeoutError:
logger.warning(f"Timed out while connecting to voice channel in timer {self!r}")
return
with open(alert_file, 'rb') as audio_stream:
finished = asyncio.Event()
def voice_callback(error):
if error:
try:
raise error
except Exception:
logger.exception(
f"Callback exception occured while playing voice alert for timer {self!r}"
)
finished.set()
voice_client.play(discord.PCMAudio(audio_stream), after=voice_callback)
# Quit when we finish playing or after 10 seconds, whichever comes first
sleep_task = asyncio.create_task(asyncio.sleep(10))
wait_task = asyncio.create_task(finished.wait())
_, pending = await asyncio.wait([sleep_task, wait_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
await self.guild.voice_client.disconnect(force=True)
except Exception:
logger.exception(
"Exception occurred while playing voice alert for timer {self!r}"
)
def stageline(self, stage: Stage):
t = self.bot.translator.t
ctx_locale.set(self.locale.value)
if stage.focused:
lazy_stageline = _p(
'timer|status|stage:focus|statusline',
"{channel} is now in **FOCUS**! Good luck, **BREAK** starts {timestamp}"
)
else:
lazy_stageline = _p(
'timer|status|stage:break|statusline',
"{channel} is now on **BREAK**! Take a rest, **FOCUS** starts {timestamp}"
)
stageline = t(lazy_stageline).format(
channel=self.channel.mention,
timestamp=f"<t:{int(stage.end.timestamp())}:R>"
)
return stageline
async def current_status(self, with_notify=True, with_warnings=True) -> MessageArgs:
"""
Message arguments for the current timer status message.
"""
t = self.bot.translator.t
now = utc_now()
ctx_locale.set(self.locale.value)
stage = self.current_stage
if self.running:
stageline = self.stageline(stage)
warningline = ""
needs_warning = []
if with_warnings and self.inactivity_threshold > 0:
threshold = self.warning_threshold(stage)
for member in self.members:
last_seen = self.last_seen.get(member.id, None)
if last_seen is None:
last_seen = self.last_seen[member.id] = now
elif last_seen < threshold:
needs_warning.append(member)
if needs_warning:
warningline = t(_p(
'timer|status|warningline',
"**Warning:** {mentions}, please press {tick} to avoid being removed on the next stage."
)).format(
mentions=' '.join(member.mention for member in needs_warning),
tick=self.bot.config.emojis.tick
)
if with_notify and self.members:
# TODO: Handle case with too many members
notifyline = ''.join(member.mention for member in self.members if member not in needs_warning)
notifyline = f"||{notifyline}||"
else:
notifyline = ""
content = "\n".join(string for string in (stageline, warningline, notifyline) if string)
elif self.auto_restart:
content = t(_p(
'timer|status|stopped:auto',
"Timer stopped! Join {channel} to start the timer."
)).format(channel=self.channel.mention)
else:
content = t(_p(
'timer|status|stopped:manual',
"Timer stopped! Press `Start` to restart the timer."
)).format(channel=self.channel.mention)
card = await get_timer_card(self.bot, self, stage)
await card.render()
if (ui := self.status_view) is None:
ui = self.status_view = TimerStatusUI(self.bot, self, self.channel)
await ui.refresh()
return MessageArgs(
content=content,
file=card.as_file(f"pomodoro_{self.data.channelid}.png"),
view=ui
)
@log_wrap(action='Send Timer Status')
async def send_status(self, delete_last=True, **kwargs):
"""
Send a new status card to the notification channel.
"""
notify_hook = await self.get_notification_webhook()
if not notify_hook:
return
# Delete last notification message if possible
last_message_id = self.data.last_messageid
if delete_last and last_message_id:
try:
if self.last_status_message:
await self.last_status_message.delete()
else:
await notify_hook.delete_message(last_message_id)
except discord.HTTPException:
logger.debug(
f"Timer {self!r} failed to delete last status message {last_message_id}"
)
last_message_id = None
self.last_status_message = None
# Send new notification message
args = await self.current_status(**kwargs)
logger.debug(
f"Timer {self!r} is sending a new status: {args.send_args}"
)
try:
message = await notify_hook.send(**args.send_args, wait=True)
last_message_id = message.id
self.last_status_message = message
except discord.NotFound:
if self._hook is not None:
await self._hook.delete()
self._hook = None
# To avoid killing the client on an infinite loop (which should be impossible)
await asyncio.sleep(1)
await self.send_status(delete_last, **kwargs)
except discord.HTTPException:
pass
# Save last message id
if last_message_id != self.data.last_messageid:
await self.data.update(last_messageid=last_message_id)
@log_wrap(action='Update Timer Status')
async def update_status_card(self, **kwargs):
"""
Update the last status card sent.
"""
async with self._lock:
args = await self.current_status(**kwargs)
logger.debug(
f"Timer {self!r} is updating last status with new status: {args.edit_args}"
)
last_message = self.last_status_message
if last_message is None and self.data.last_messageid is not None:
# Attempt to retrieve previous message
notify_hook = await self.get_notification_webhook()
try:
if notify_hook:
last_message = await notify_hook.fetch_message(self.data.last_messageid)
except discord.HTTPException:
last_message = None
self.last_status_message = None
except Exception:
logger.exception(
f"Unhandled exception while updating timer last status for timer {self!r}"
)
repost = last_message is None
if not repost:
try:
await last_message.edit(**args.edit_args)
self.last_status_message = last_message
except discord.NotFound:
repost = True
except discord.HTTPException:
# Unexpected issue with sending the status message
logger.exception(
f"Exception occurred updating status for Timer {self!r}"
)
if repost:
await self.send_status(delete_last=False, with_notify=False)
async def _update_channel_name(self):
"""
Submit a task to update the voice channel name.
Attempts to ensure that only one task is running at a time.
Attempts to wait until the next viable channel update slot (via ratelimit).
"""
if self._voice_update_task and not self._voice_update_task.done():
# Voice update request already submitted
return
async with self._voice_update_lock:
if self._last_voice_update:
to_wait = ((self._last_voice_update + timedelta(minutes=5)) - utc_now()).total_seconds()
if to_wait > 0:
self._voice_update_task = asyncio.create_task(asyncio.sleep(to_wait))
try:
await self._voice_update_task
except asyncio.CancelledError:
return
if not self.channel:
return
if not self.channel.permissions_for(self.guild.me).manage_channels:
return
new_name = self.channel_name
if new_name == self.channel.name:
return
self._last_voice_update = utc_now()
await self.channel.edit(name=self.channel_name)
@log_wrap(action="Stop Timer")
async def stop(self, auto_restart=False):
"""
Stop the timer.
Stops the run loop, and updates the last status message to a stopped message.
"""
try:
async with self._lock:
if self._run_task and not self._run_task.done():
self._run_task.cancel()
await self.data.update(last_started=None, auto_restart=auto_restart)
await self.update_status_card()
except Exception:
logger.exception(f"Exception while stopping Timer {self!r}!")
else:
logger.info(f"Timer {self!r} has stopped. Auto restart is {'on' if auto_restart else 'off'}")
@log_wrap(action="Destroy Timer")
async def destroy(self, reason: str = None):
"""
Deconstructs the timer, stopping all tasks.
"""
async with self._lock:
if self._run_task and not self._run_task.done():
self._run_task.cancel()
channelid = self.data.channelid
await self.data.delete()
if self.last_status_message:
try:
await self.last_status_message.delete()
except discord.HTTPException:
pass
logger.info(
f"Timer <tid: {channelid}> deleted. Reason given: {reason!r}"
)
@log_wrap(stack=['Timer Loop'])
async def _runloop(self):
"""
Main loop which controls the
regular stage changes and status updates.
"""
# Allow updating with 10 seconds of drift to the next stage change
drift = 10
if not self.running:
# Nothing to do
return
if not self.channel:
# Underlying Discord objects do not exist, destroy the timer.
await self.destroy(reason="Underlying channel no longer exists.")
return
background_tasks = set()
self._state = current = self.current_stage
while True:
to_next_stage = (current.end - utc_now()).total_seconds()
# TODO: Consider request rate and load
if to_next_stage > 1 * 60 - drift:
time_to_sleep = 1 * 60
else:
time_to_sleep = to_next_stage
self._run_task = asyncio.create_task(asyncio.sleep(time_to_sleep))
try:
await self._run_task
except asyncio.CancelledError:
break
if not self.running:
# We somehow stopped without cancelling the run task?
logger.warning(
f"Closing timer loop because we are no longer running. This should not happen! Timer {self!r}"
)
break
if not self.channel:
# Probably left the guild or the channel was deleted
await self.destroy(reason="Underlying channel no longer exists")
break
if current.end < utc_now():
self._state = self.current_stage
task = asyncio.create_task(self.notify_change_stage(current, self._state))
task.add_done_callback(background_tasks.discard)
current = self._state
elif self.members:
task = asyncio.create_task(self._update_channel_name())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
task = asyncio.create_task(self.update_status_card())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
if background_tasks:
await asyncio.gather(*background_tasks)
def launch(self):
"""
Launch the update loop, if the timer is running, otherwise do nothing.
"""
if self._loop_task and not self._loop_task.done():
self._loop_task.cancel()
if self.running:
self._loop_task = asyncio.create_task(self._runloop())
async def unload(self):
"""
Unload the timer without changing stored state.
Waits for all background tasks to complete.
"""
async with self._lock:
if self._loop_task and not self._loop_task.done():
if self._run_task and not self._run_task.done():
self._run_task.cancel()
await self._loop_task

View File

@@ -0,0 +1 @@
from .status import TimerStatusUI

View File

@@ -0,0 +1,315 @@
from typing import Optional, TYPE_CHECKING
import asyncio
import discord
from discord.ui.button import button, Button, ButtonStyle
from discord.ui.select import select, Select, RoleSelect, ChannelSelect
from meta import LionBot, conf
from utils.lib import utc_now, MessageArgs, error_embed
from utils.ui import MessageUI
from babel.translator import ctx_locale
from .. import babel
from ..lib import TimerRole
from ..options import TimerOptions
from .edit import TimerEditor
if TYPE_CHECKING:
from ..timer import Timer
_p = babel._p
class TimerOptionsUI(MessageUI):
"""
View options for and reconfigure a single timer.
"""
def __init__(self, bot: LionBot, timer: 'Timer', role: TimerRole, **kwargs):
self.locale = timer.locale.value
ctx_locale.set(self.locale)
super().__init__(**kwargs)
self.bot = bot
self.timer = timer
self.role = role
@button(label="EDIT_PLACEHOLDER", style=ButtonStyle.blurple)
async def edit_button(self, press: discord.Interaction, pressed: Button):
"""
Open the edit modal to modify focus/break/threshold/name/format_string
"""
modal = await TimerEditor.open_editor(self.bot, press, self.timer, press.user, callback=self.refresh)
await modal.wait()
async def refresh_edit_button(self):
self.edit_button.label = self.bot.translator.t(_p(
'ui:timer_options|button:edit|label',
"Edit"
))
@button(label="VOICE_ALERT_PLACEHOLDER", style=ButtonStyle.green)
async def voice_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True, ephemeral=True)
value = not self.timer.voice_alerts
setting = self.timer.config.get('voice_alerts')
setting.value = value
await setting.write()
await self.refresh(thinking=press)
async def refresh_voice_button(self):
button = self.voice_button
button.label = self.bot.translator.t(_p(
'ui:timer_options|button:voice_alerts|label',
"Voice Alerts"
))
if self.timer.voice_alerts:
button.style = ButtonStyle.green
else:
button.style = ButtonStyle.grey
@button(label="DELETE_PLACEHOLDER", style=ButtonStyle.red)
async def delete_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True)
channelid = self.timer.data.channelid
# Destroy through cog to maintain cache
cog = self.bot.get_cog('TimerCog')
await cog.destroy_timer(self.timer, reason="Manually destroyed through OptionUI")
await self.quit()
t = self.bot.translator.t
embed = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'ui:timer_options|button:delete|success|title',
"Timer Deleted"
)),
description=t(_p(
'ui:timer_options|button:delete|success|description',
"The timer in {channel} has been removed."
)).format(channel=f"<#{channelid}>")
)
await press.edit_original_response(embed=embed)
async def refresh_delete_button(self):
self.delete_button.label = self.bot.translator.t(_p(
'ui:timer_options|button:delete|label',
"Delete"
))
@button(emoji=conf.emojis.cancel, style=ButtonStyle.red)
async def close_button(self, press: discord.Interaction, pressed: Button):
print("HERE")
await press.response.defer()
await self.quit()
async def refresh_close_button(self):
pass
@select(cls=ChannelSelect, placeholder="VOICE_CHANNEL_PLACEHOLDER", channel_types=[discord.ChannelType.voice])
async def voice_menu(self, selection: discord.Interaction, selected):
...
async def refresh_voice_menu(self):
self.voice_menu.placeholder = self.bot.translator.t(_p(
'ui:timer_options|menu:voice_channel|placeholder',
"Set Voice Channel"
))
@select(cls=ChannelSelect,
placeholder="NOTIFICATION_PLACEHOLDER",
channel_types=[discord.ChannelType.text, discord.ChannelType.voice],
min_values=0, max_values=1)
async def notification_menu(self, selection: discord.Interaction, selected):
await selection.response.defer(thinking=True, ephemeral=True)
value = selected.values[0] if selected.values else None
setting = self.timer.config.get('notification_channel')
if issue := await setting._check_value(self.timer.data.channelid, value):
await selection.edit_original_response(embed=error_embed(issue))
else:
setting.value = value
await setting.write()
await self.timer.send_status()
await self.refresh(thinking=selection)
async def refresh_notification_menu(self):
self.notification_menu.placeholder = self.bot.translator.t(_p(
'ui:timer_options|menu:notification_channel|placeholder',
"Set Notification Channel"
))
@select(cls=RoleSelect, placeholder="ROLE_PLACEHOLDER", min_values=0, max_values=1)
async def manage_role_menu(self, selection: discord.Interaction, selected):
await selection.response.defer(thinking=True, ephemeral=True)
value = selected.values[0] if selected.values else None
setting = self.timer.config.get('manager_role')
setting.value = value
await setting.write()
await self.refresh(thinking=selection)
async def refresh_manage_role_menu(self):
self.manage_role_menu.placeholder = self.bot.translator.t(_p(
'ui:timer_options|menu:manager_role|placeholder',
"Set Manager Role"
))
# ----- UI FLow -----
async def make_message(self) -> MessageArgs:
t = self.bot.translator.t
title = t(_p(
'ui:timer_options|embed|title',
"Timer Control Panel for {channel}"
)).format(channel=f"<#{self.timer.data.channelid}>")
table = await TimerOptions().make_setting_table(self.timer.data.channelid, timer=self.timer)
footer = t(_p(
'ui:timer_options|embed|footer',
"Hover over the option names to view descriptions."
))
embed = discord.Embed(
colour=discord.Colour.orange(),
title=title,
description=table
)
embed.set_footer(text=footer)
# Add pattern field
embed.add_field(
name=t(_p('ui:timer_options|embed|field:pattern|name', "Pattern")),
value=t(_p(
'ui:timer_options|embed|field:pattern|value',
"**`{focus_len} minutes`** focus\n**`{break_len} minutes`** break"
)).format(
focus_len=self.timer.data.focus_length // 60,
break_len=self.timer.data.break_length // 60
)
)
# Add channel name field
embed.add_field(
name=t(_p(
'ui:timer_options|embed|field:channel_name|name',
"Channel Name Preview"
)),
value=t(_p(
'ui:timer_options|embed|field:channel_name|value',
"**`{name}`**\n(The actual channel name may not match due to ratelimits.)"
)).format(name=self.timer.channel_name)
)
# Add issue field (if there are any permission issues).
issues = await self._get_issues()
if issues:
embed.add_field(
name=t(_p(
'ui:timer_options|embed|field:issues|name',
"Issues"
)),
value='\n'.join(f"{conf.emojis.warning} {issue}" for issue in issues),
inline=False
)
return MessageArgs(embed=embed)
async def _get_issues(self):
"""
Report any issues with the timer setup, particularly with permissions.
"""
t = self.bot.translator.t
issues = []
if self.timer.channel is None:
issues.append(
t(_p(
'ui:timer_options|issue:no_voice_channel',
"The configured voice channel does not exist! Please update it below."
))
)
else:
channel = self.timer.channel
# Check join and speak permissions
my_voice_permissions = channel.permissions_for(channel.guild.me)
if self.timer.voice_alerts and not (my_voice_permissions.connect and my_voice_permissions.speak):
issues.append(
t(_p(
'ui:timer_options|issue:cannot_speak',
"Voice alerts are on, but I don't have speaking permissions in {channel}"
)).format(channel=channel.mention)
)
if not my_voice_permissions.manage_channels:
issues.append(
t(_p(
'ui:timer_options|issue:cannot_change_name',
"I cannot update the name of {channel}! (Needs `MANAGE_CHANNELS` permission)"
)).format(channel=channel.mention)
)
notif_channelid = self.timer.data.notification_channelid
if notif_channelid:
channel = self.bot.get_channel(notif_channelid)
if channel is None:
issues.append(
t(_p(
'ui:timer_options|issue:notif_channel_dne',
"Configured notification channel does not exist!"
))
)
else:
perms = channel.permissions_for(channel.guild.me)
if not (perms.embed_links and perms.attach_files):
issues.append(
t(_p(
'ui:timer_options|issue:notif_channel_write',
"I cannot attach files (`ATTACH_FILES`) or send embeds (`EMBED_LINKS`) in {channel}"
)).format(channel=channel.mention)
)
if not (perms.manage_webhooks):
issues.append(
t(_p(
'ui:timer_options|issues:cannot_make_webhooks',
"I cannot create the notification webhook (`MANAGE_WEBHOOKS`) in {channel}"
)).format(channel=channel.mention)
)
return issues
async def refresh_layout(self):
if self.timer.owned:
# Owned timers cannot change their voice channel, text channel, or manager role.
await asyncio.gather(
self.refresh_edit_button(),
self.refresh_close_button(),
self.refresh_voice_button(),
self.refresh_delete_button(),
)
self.set_layout(
(self.edit_button, self.voice_button, self.close_button)
)
else:
await asyncio.gather(
self.refresh_edit_button(),
self.refresh_close_button(),
self.refresh_voice_button(),
self.refresh_delete_button(),
# self.refresh_voice_menu(),
self.refresh_manage_role_menu(),
self.refresh_notification_menu()
)
self.set_layout(
# (self.voice_menu,),
(self.notification_menu,),
(self.manage_role_menu,),
(self.edit_button, self.voice_button, self.delete_button, self.close_button)
)
async def reload(self):
"""
Nothing to reload.
"""
pass

View File

@@ -0,0 +1,113 @@
from typing import Optional, TYPE_CHECKING
import discord
from discord.ui.text_input import TextInput, TextStyle
from meta import LionBot
from meta.errors import UserInputError
from utils.ui import FastModal, error_handler_for, ModalRetryUI
from utils.lib import parse_duration
from .. import babel, logger
from ..lib import TimerRole
from ..options import TimerOptions
if TYPE_CHECKING:
from ..timer import Timer
_p = babel._p
class TimerEditor(FastModal):
"""
Timer Option Editor
Appearence depends on caller role
(i.e. managers may edit focus/break times, owners may edit all fields.)
"""
@classmethod
async def open_editor(cls, bot: LionBot, interaction: discord.Interaction,
timer: 'Timer',
actor: discord.Member,
callback=None):
role = timer.get_member_role(actor)
if role >= TimerRole.OWNER:
settings = [
TimerOptions.FocusLength,
TimerOptions.BreakLength,
TimerOptions.InactivityThreshold,
TimerOptions.BaseName,
TimerOptions.ChannelFormat
]
elif role is TimerRole.MANAGER:
settings = [
TimerOptions.FocusLength,
TimerOptions.BreakLength,
]
else:
# This should be impossible
raise ValueError("Timer Editor Opened by Invalid Role")
instances = []
inputs = []
for setting in settings:
instance = timer.config.get(setting.setting_id)
input_field = instance.input_field
instances.append(instance)
inputs.append(input_field)
self = cls(
*inputs,
title=bot.translator.t(_p(
'modal:timer_editor|title',
"Timer Option Editor"
))
)
@self.submit_callback(timeout=10*60)
async def _edit_timer_callback(submit: discord.Interaction):
# Parse each field
# Parse errors will raise UserInputError and hence trigger ModalRetryUI
try:
data = timer.data
update_args = {}
modified = set()
for instance, field in zip(instances, inputs):
try:
parsed = await instance.from_string(data.channelid, field.value)
except UserInputError as e:
_msg = f"`{instance.display_name}:` {e._msg}"
raise UserInputError(_msg, info=e.info, details=e.details)
update_args[parsed._column] = parsed._data
if data.data[parsed._column] != parsed._data:
modified.add(instance.setting_id)
# Parsing successful, ack the submission
await submit.response.defer(thinking=False)
if modified:
await data.update(**update_args)
if ('focus_length' in modified or 'break_length' in modified) and timer.running:
# Regenerate timer
await timer.start()
else:
# Just update last status
# This will also update the warning list if the inactivity threshold is modified
await timer.update_status_card()
except UserInputError:
raise
except Exception:
logger.exception(
"Unhandled exception occurred during timer edit submission callback."
)
if callback is not None:
await callback(submit)
# Editor prepared, now send and return
await interaction.response.send_modal(self)
return self
@error_handler_for(UserInputError)
async def rerequest(self, interaction: discord.Interaction, error: UserInputError):
await ModalRetryUI(self, error.msg).respond_to(interaction)

View File

@@ -0,0 +1,162 @@
from typing import Optional, TYPE_CHECKING
import asyncio
import discord
from discord.ui.button import button, Button, ButtonStyle
from meta import LionBot, conf
from utils.lib import utc_now
from utils.ui import LeoUI
from babel.translator import ctx_locale
from .. import babel
from ..lib import TimerRole
from .config import TimerOptionsUI
from .edit import TimerEditor
if TYPE_CHECKING:
from ..timer import Timer
_p = babel._p
class TimerStatusUI(LeoUI):
"""
UI representing a single Timer Status message.
Not intended to persist across multiple stages/notifications.
Does support updates.
"""
def __init__(self, bot: LionBot, timer: 'Timer', channel: discord.abc.GuildChannel, show_present=True, **kwargs):
# Set the locale context before it is copied in LeoUI
# This is propagated via dispatch to component handlers
self.locale = timer.locale.value
ctx_locale.set(self.locale)
super().__init__(timeout=None, **kwargs)
self.bot = bot
self.timer = timer
self.channel = channel
self.show_present = show_present
@button(label="PRESENT_PLACEHOLDER", emoji=conf.emojis.tick, style=ButtonStyle.green)
async def present_button(self, press: discord.Interaction, pressed: Button):
"""
Pressed to indicate the user is present.
Does not send a visible response.
"""
await press.response.defer()
member: discord.Member = press.user
if member.voice and member.voice.channel and member.voice.channel.id == self.timer.data.channelid:
self.timer.last_seen[member.id] = utc_now()
async def refresh_present_button(self):
t = self.bot.translator.t
self.present_button.label = t(_p(
'ui:timer_status|button:present|label',
"Present"
), locale=self.locale)
@button(label="EDIT_PLACEHOLDER", style=ButtonStyle.blurple)
async def edit_button(self, press: discord.Interaction, pressed: Button):
"""
Pressed to edit the timer. Response depends on role-level of user.
"""
role = self.timer.get_member_role(press.user)
if role >= TimerRole.OWNER:
# Open ephemeral config UI
await press.response.defer(thinking=True, ephemeral=True)
ui = TimerOptionsUI(self.bot, self.timer, role, callerid=press.user.id)
await ui.run(press)
elif role is TimerRole.MANAGER:
# Open config modal for work/break times
modal = await TimerEditor.open_editor(self.bot, press, self.timer, press.user)
await modal.wait()
else:
# No permissions
t = self.bot.translator.t
error_msg = t(_p(
'ui:timer_status|button:edit|error:no_permissions',
"Configuring this timer requires guild admin permissions or the configured manager role!"
))
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=error_msg
)
await press.response.send_message(embed=embed, ephemeral=True)
async def refresh_edit_button(self):
t = self.bot.translator.t
self.edit_button.label = t(_p(
'ui:timer_status|button:edit|label',
"Options"
), locale=self.locale)
@button(label="START_PLACEHOLDER", style=ButtonStyle.green)
async def start_button(self, press: discord.Interaction, pressed: Button):
"""
Start a stopped timer.
"""
t = self.bot.translator.t
if self.timer.running:
# Timer is already running. Race condition? (Should be impossible)
# TODO: Log
await press.response.send_message(
embed=discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'ui:timer_status|button:start|error:already_running',
"Cannot start a timer that is already running!"
))
),
ephemeral=True
)
else:
# Start the timer
await press.response.defer()
await self.timer.start()
async def refresh_start_button(self):
t = self.bot.translator.t
self.start_button.label = t(_p(
'ui:timer_status|button:start|label',
"Start"
), locale=self.locale)
@button(label="STOP PLACEHOLDER", style=ButtonStyle.red)
async def stop_button(self, press: discord.Interaction, pressed: Button):
"""
Stop a running timer.
Note that unlike starting, stopping is allowed to be idempotent.
"""
await press.response.defer()
await self.timer.stop()
async def refresh_stop_button(self):
t = self.bot.translator.t
self.stop_button.label = t(_p(
'ui:timer_status|button:stop|label',
"Stop"
), locale=self.locale)
async def refresh(self):
"""
Refresh the internal UI components based on the current state of the Timer.
"""
await asyncio.gather(
self.refresh_present_button(),
self.refresh_edit_button(),
self.refresh_stop_button(),
self.refresh_start_button(),
)
if self.timer.running:
self.set_layout(
(self.present_button, self.edit_button, self.stop_button)
)
else:
self.set_layout(
(self.present_button, self.edit_button, self.start_button)
)

View File

@@ -12,8 +12,7 @@ from wards import sys_admin
from settings.groups import SettingGroup
class LeoSettings(LionCog, group_name='leo'):
__cog_is_app_commands_group__ = True
class LeoSettings(LionCog):
depends = {'CoreCog'}
def __init__(self, bot: LionBot):
@@ -21,7 +20,18 @@ class LeoSettings(LionCog, group_name='leo'):
self.bot_setting_groups: list[SettingGroup] = []
@cmds.hybrid_command(
@cmds.hybrid_group(
name="leo"
)
@cmds.check(sys_admin)
async def leo_group(self, ctx: LionContext):
"""
Base command group for global leo-only functions.
Only accessible by sysadmins.
"""
...
@leo_group.command(
name='dashboard',
description="Global setting dashboard"
)
@@ -36,7 +46,19 @@ class LeoSettings(LionCog, group_name='leo'):
description = group.description.format(ctx=ctx, bot=ctx.bot).strip()
embed.add_field(
name=group.title.format(ctx=ctx, bot=ctx.bot),
value=f"{description}\n{table}"
value=f"{description}\n{table}",
inline=False
)
await ctx.reply(embed=embed)
@leo_group.group(
name='configure',
description="Leo Configuration Group"
)
@cmds.check(sys_admin)
async def leo_configure_group(self, ctx: LionContext):
"""
Base command group for global configuration of Leo.
"""
...

View File

@@ -255,7 +255,6 @@ class VoiceTrackerCog(LionCog):
@LionCog.listener("on_voice_state_update")
@log_wrap(action='Voice Track')
@log_wrap(action='Voice Event')
async def session_voice_tracker(self, member, before, after):
"""
Spawns the correct tasks from members joining, leaving, and changing live state.
@@ -265,6 +264,9 @@ class VoiceTrackerCog(LionCog):
# Rely on initialisation to handle current state
return
if member.bot:
return
# Check user blacklist
blacklists = self.bot.get_cog('Blacklists')
if member.id in blacklists.user_blacklist:

View File

@@ -316,7 +316,7 @@ class MessageUI(LeoUI):
"""
raise NotImplementedError
async def draw(self, interaction, force_followup=False):
async def draw(self, interaction, force_followup=False, **kwargs):
"""
Send the UI as a response or followup to the given interaction.
@@ -332,10 +332,10 @@ class MessageUI(LeoUI):
as_followup = force_followup or interaction.response.is_done()
if as_followup:
self._message = await interaction.followup.send(**args.send_args, view=self)
self._message = await interaction.followup.send(**args.send_args, **kwargs, view=self)
else:
self._original = interaction
await interaction.response.send_message(**args.send_args, view=self)
await interaction.response.send_message(**args.send_args, **kwargs, view=self)
async def redraw(self, thinking: Optional[discord.Interaction] = None):
"""

View File

@@ -76,8 +76,8 @@ class FastModal(LeoModal):
return
try:
await coro(interaction, *pass_args, **pass_kwargs)
except Exception as error:
await self.on_error(interaction, error)
except Exception:
raise
finally:
if once:
self._waiters.remove(wrapped_callback)
@@ -100,12 +100,20 @@ class FastModal(LeoModal):
await super().on_error(interaction, error)
async def on_submit(self, interaction):
print("On submit")
old_result = self._result
self._result = asyncio.get_event_loop().create_future()
old_result.set_result(interaction)
tasks = []
for waiter in self._waiters:
asyncio.create_task(waiter(interaction), name=f"leo-ui-fastmodal-{self.id}-callback-{waiter.__name__}")
task = asyncio.create_task(
waiter(interaction),
name=f"leo-ui-fastmodal-{self.id}-callback-{waiter.__name__}"
)
tasks.append(task)
if tasks:
await asyncio.gather(*tasks)
async def input(