fix(timers): Improve timer loading.
This commit is contained in:
@@ -12,6 +12,7 @@ from meta.logger import log_wrap
|
||||
from meta.sharding import THIS_SHARD
|
||||
from meta.monitor import ComponentMonitor, ComponentStatus, StatusLevel
|
||||
from utils.lib import utc_now
|
||||
from utils.ratelimits import limit_concurrency
|
||||
|
||||
from wards import low_management_ward
|
||||
|
||||
@@ -48,16 +49,38 @@ class TimerCog(LionCog):
|
||||
self.timer_options = TimerOptions()
|
||||
|
||||
self.ready = False
|
||||
self.timers = defaultdict(dict)
|
||||
self.timers: dict[int, dict[int, Timer]] = defaultdict(dict)
|
||||
|
||||
async def _monitor(self):
|
||||
timers = [timer for tguild in self.timers.values() for timer in tguild.values()]
|
||||
state = (
|
||||
"<TimerState"
|
||||
" loaded={loaded}"
|
||||
" guilds={guilds}"
|
||||
" members={members}"
|
||||
" running={running}"
|
||||
" launched={launched}"
|
||||
" looping={looping}"
|
||||
" locked={locked}"
|
||||
" voice_locked={voice_locked}"
|
||||
">"
|
||||
)
|
||||
data = dict(
|
||||
loaded=len(timers),
|
||||
guilds=len(set(timer.data.guildid for timer in timers)),
|
||||
members=sum(len(timer.members) for timer in timers),
|
||||
running=sum(1 for timer in timers if timer.running),
|
||||
launched=sum(1 for timer in timers if timer._run_task and not timer._run_task.done()),
|
||||
looping=sum(1 for timer in timers if timer._loop_task and not timer._loop_task.done()),
|
||||
locked=sum(1 for timer in timers if timer._lock.locked()),
|
||||
voice_locked=sum(1 for timer in timers if timer._voice_update_lock.locked()),
|
||||
)
|
||||
if not self.ready:
|
||||
level = StatusLevel.STARTING
|
||||
info = "(STARTING) Not ready. {timers} timers loaded."
|
||||
info = f"(STARTING) Not ready. {state}"
|
||||
else:
|
||||
level = StatusLevel.OKAY
|
||||
info = "(OK) {timers} timers loaded."
|
||||
data = dict(timers=len(self.timers))
|
||||
info = f"(OK) Ready. {state}"
|
||||
return ComponentStatus(level, info, info, data)
|
||||
|
||||
async def cog_load(self):
|
||||
@@ -79,15 +102,12 @@ class TimerCog(LionCog):
|
||||
Clears caches and stops run-tasks for each active timer.
|
||||
Does not exist until all timers have completed background tasks.
|
||||
"""
|
||||
timers = (timer for tguild in self.timers.values() for timer in tguild.values())
|
||||
try:
|
||||
await asyncio.gather(*(timer.unload() for timer in timers))
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Exception encountered while unloading `TimerCog`"
|
||||
)
|
||||
timers = [timer for tguild in self.timers.values() for timer in tguild.values()]
|
||||
self.timers.clear()
|
||||
|
||||
if timers:
|
||||
await self._unload_timers(timers)
|
||||
|
||||
async def cog_check(self, ctx: LionContext):
|
||||
if not self.ready:
|
||||
raise CheckFailure(
|
||||
@@ -101,6 +121,20 @@ class TimerCog(LionCog):
|
||||
else:
|
||||
return True
|
||||
|
||||
@log_wrap(action='Unload Timers')
|
||||
async def _unload_timers(self, timers: list[Timer]):
|
||||
"""
|
||||
Unload all active timers.
|
||||
"""
|
||||
tasks = [asyncio.create_task(timer.unload()) for timer in timers]
|
||||
for timer, task in zip(timers, tasks):
|
||||
try:
|
||||
await task
|
||||
except Exception:
|
||||
logger.exception(
|
||||
f"Unexpected exception while unloading timer {timer!r}"
|
||||
)
|
||||
|
||||
async def _load_timers(self, timer_data: list[TimerData.Timer]):
|
||||
"""
|
||||
Factored method to load a list of timers from data rows.
|
||||
@@ -108,6 +142,7 @@ class TimerCog(LionCog):
|
||||
guildids = set()
|
||||
to_delete = []
|
||||
to_create = []
|
||||
to_unload = []
|
||||
for row in timer_data:
|
||||
channel = self.bot.get_channel(row.channelid)
|
||||
if not channel:
|
||||
@@ -115,6 +150,12 @@ class TimerCog(LionCog):
|
||||
else:
|
||||
guildids.add(row.guildid)
|
||||
to_create.append(row)
|
||||
if row.guildid in self.timers:
|
||||
if row.channelid in self.timers[row.guildid]:
|
||||
to_unload.append(self.timers[row.guildid].pop(row.channelid))
|
||||
|
||||
if to_unload:
|
||||
await self._unload_timers(to_unload)
|
||||
|
||||
if guildids:
|
||||
lguilds = await self.bot.core.lions.fetch_guilds(*guildids)
|
||||
@@ -145,37 +186,57 @@ class TimerCog(LionCog):
|
||||
# Re-launch and update running timers
|
||||
for timer in to_launch:
|
||||
timer.launch()
|
||||
tasks = [
|
||||
asyncio.create_task(timer.update_status_card()) for timer in to_launch
|
||||
]
|
||||
if tasks:
|
||||
|
||||
coros = [timer.update_status_card() for timer in to_launch]
|
||||
if coros:
|
||||
i = 0
|
||||
async for task in limit_concurrency(coros, 10):
|
||||
try:
|
||||
await asyncio.gather(*tasks)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Exception occurred updating timer status for running timers."
|
||||
await task
|
||||
except discord.HTTPException:
|
||||
timer = to_launch[i]
|
||||
logger.warning(
|
||||
f"Unhandled discord exception while updating timer status for {timer!r}",
|
||||
exc_info=True
|
||||
)
|
||||
except Exception:
|
||||
timer = to_launch[i]
|
||||
logger.exception(
|
||||
f"Unexpected exception while updating timer status for {timer!r}",
|
||||
exc_info=True
|
||||
)
|
||||
i += 1
|
||||
logger.info(
|
||||
f"Updated and launched {len(to_launch)} running timers."
|
||||
)
|
||||
|
||||
# Update stopped timers
|
||||
tasks = [
|
||||
asyncio.create_task(timer.update_status_card()) for timer in to_update
|
||||
]
|
||||
if tasks:
|
||||
coros = [timer.update_status_card(render=False) for timer in to_update]
|
||||
if coros:
|
||||
i = 0
|
||||
async for task in limit_concurrency(coros, 10):
|
||||
try:
|
||||
await asyncio.gather(*tasks)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Exception occurred updating timer status for stopped timers."
|
||||
await task
|
||||
except discord.HTTPException:
|
||||
timer = to_update[i]
|
||||
logger.warning(
|
||||
f"Unhandled discord exception while updating timer status for {timer!r}",
|
||||
exc_info=True
|
||||
)
|
||||
except Exception:
|
||||
timer = to_update[i]
|
||||
logger.exception(
|
||||
f"Unexpected exception while updating timer status for {timer!r}",
|
||||
exc_info=True
|
||||
)
|
||||
i += 1
|
||||
logger.info(
|
||||
f"Updated {len(to_update)} stopped timers."
|
||||
)
|
||||
|
||||
# Update timer registry
|
||||
self.timers.update(timer_reg)
|
||||
for gid, gtimers in timer_reg.items():
|
||||
self.timers[gid].update(gtimers)
|
||||
|
||||
@LionCog.listener('on_ready')
|
||||
@log_wrap(action='Init Timers')
|
||||
@@ -185,10 +246,14 @@ class TimerCog(LionCog):
|
||||
"""
|
||||
self.ready = False
|
||||
self.timers = defaultdict(dict)
|
||||
if self.timers:
|
||||
timers = [timer for tguild in self.timers.values() for timer in tguild.values()]
|
||||
await self._unload_timers(timers)
|
||||
self.timers.clear()
|
||||
|
||||
# Fetch timers in guilds on this shard
|
||||
# TODO: Join with guilds and filter by guilds we are still in
|
||||
timer_data = await self.data.Timer.fetch_where(THIS_SHARD)
|
||||
guildids = [guild.id for guild in self.bot.guilds]
|
||||
timer_data = await self.data.Timer.fetch_where(guildid=guildids)
|
||||
await self._load_timers(timer_data)
|
||||
|
||||
# Ready to handle events
|
||||
|
||||
@@ -7,7 +7,7 @@ from datetime import timedelta, datetime
|
||||
import discord
|
||||
|
||||
from meta import LionBot
|
||||
from meta.logger import log_wrap, log_context
|
||||
from meta.logger import log_wrap, log_context, set_logging_context
|
||||
from utils.lib import MessageArgs, utc_now, replace_multiple
|
||||
from core.lion_guild import LionGuild
|
||||
from core.data import CoreData
|
||||
@@ -61,7 +61,7 @@ class Timer:
|
||||
log_context.set(f"tid: {self.data.channelid}")
|
||||
|
||||
# State
|
||||
self.last_seen: dict[int, int] = {} # memberid -> last seen timestamp
|
||||
self.last_seen: dict[int, datetime] = {} # memberid -> last seen timestamp
|
||||
self.status_view: Optional[TimerStatusUI] = None # Current TimerStatusUI
|
||||
self.last_status_message: Optional[discord.Message] = None # Last deliever notification message
|
||||
self._hook: Optional[CoreData.LionHook] = None # Cached notification webhook
|
||||
@@ -384,7 +384,7 @@ class Timer:
|
||||
tasks = []
|
||||
after_tasks = []
|
||||
# Submit channel name update request
|
||||
after_tasks.append(asyncio.create_task(self._update_channel_name()))
|
||||
after_tasks.append(asyncio.create_task(self._update_channel_name(), name='Update-name'))
|
||||
|
||||
if kick and (threshold := self.warning_threshold(from_stage)):
|
||||
now = utc_now()
|
||||
@@ -397,12 +397,25 @@ class Timer:
|
||||
elif last_seen < threshold:
|
||||
needs_kick.append(member)
|
||||
|
||||
t = self.bot.translator.t
|
||||
if self.channel and self.channel.permissions_for(self.channel.guild.me).move_members:
|
||||
for member in needs_kick:
|
||||
tasks.append(member.edit(voice_channel=None))
|
||||
tasks.append(
|
||||
asyncio.create_task(
|
||||
member.edit(
|
||||
voice_channel=None,
|
||||
reason=t(_p(
|
||||
'timer|disconnect|audit_reason',
|
||||
"Disconnecting inactive member from timer."
|
||||
), locale=self.locale.value)
|
||||
),
|
||||
name="Disconnect-timer-member"
|
||||
)
|
||||
)
|
||||
|
||||
notify_hook = await self.get_notification_webhook()
|
||||
if needs_kick and notify_hook:
|
||||
t = self.bot.translator.t
|
||||
if needs_kick and notify_hook and self.channel:
|
||||
if self.channel.permissions_for(self.channel.guild.me).move_members:
|
||||
kick_message = t(_np(
|
||||
'timer|kicked_message',
|
||||
"{mentions} was removed from {channel} because they were inactive! "
|
||||
@@ -415,20 +428,34 @@ class Timer:
|
||||
mentions=', '.join(member.mention for member in needs_kick),
|
||||
tick=self.bot.config.emojis.tick
|
||||
)
|
||||
tasks.append(notify_hook.send(kick_message))
|
||||
else:
|
||||
kick_message = t(_p(
|
||||
'timer|kick_failed',
|
||||
"**Warning!** Timer {channel} is configured to disconnect on inactivity, "
|
||||
"but I lack the 'Move Members' permission to do this!"
|
||||
), locale=self.locale.value).format(
|
||||
channel=self.channel.mention
|
||||
)
|
||||
tasks.append(asyncio.create_task(notify_hook.send(kick_message), name='kick-message'))
|
||||
|
||||
if self.voice_alerts:
|
||||
after_tasks.append(asyncio.create_task(self._voice_alert(to_stage)))
|
||||
after_tasks.append(asyncio.create_task(self._voice_alert(to_stage), name='voice-alert'))
|
||||
|
||||
if tasks:
|
||||
for task in tasks:
|
||||
try:
|
||||
await asyncio.gather(*tasks)
|
||||
await task
|
||||
except discord.Forbidden:
|
||||
logger.warning(
|
||||
f"Unexpected forbidden during pre-task {task!r} for change stage in timer {self!r}"
|
||||
)
|
||||
except discord.HTTPException:
|
||||
logger.warning(
|
||||
f"Unexpected API error during pre-task {task!r} for change stage in timer {self!r}"
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(f"Exception occurred during pre-tasks for change stage in timer {self!r}")
|
||||
logger.exception(f"Exception occurred during pre-task {task!r} for change stage in timer {self!r}")
|
||||
|
||||
print("Sending Status")
|
||||
await self.send_status()
|
||||
print("Sent Status")
|
||||
|
||||
if after_tasks:
|
||||
try:
|
||||
@@ -444,7 +471,7 @@ class Timer:
|
||||
if not stage:
|
||||
return
|
||||
|
||||
if not self.channel or not self.channel.permissions_for(self.guild.me).speak:
|
||||
if not self.guild or not self.channel or not self.channel.permissions_for(self.guild.me).speak:
|
||||
return
|
||||
|
||||
async with self.lguild.voice_lock:
|
||||
@@ -480,15 +507,16 @@ class Timer:
|
||||
|
||||
# Quit when we finish playing or after 10 seconds, whichever comes first
|
||||
sleep_task = asyncio.create_task(asyncio.sleep(10))
|
||||
wait_task = asyncio.create_task(finished.wait())
|
||||
wait_task = asyncio.create_task(finished.wait(), name='timer-voice-waiting')
|
||||
_, pending = await asyncio.wait([sleep_task, wait_task], return_when=asyncio.FIRST_COMPLETED)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
|
||||
if self.guild and self.guild.voice_client:
|
||||
await self.guild.voice_client.disconnect(force=True)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Exception occurred while playing voice alert for timer {self!r}"
|
||||
f"Exception occurred while playing voice alert for timer {self!r}"
|
||||
)
|
||||
|
||||
def stageline(self, stage: Stage):
|
||||
@@ -511,7 +539,7 @@ class Timer:
|
||||
)
|
||||
return stageline
|
||||
|
||||
async def current_status(self, with_notify=True, with_warnings=True) -> MessageArgs:
|
||||
async def current_status(self, with_notify=True, with_warnings=True, render=True) -> MessageArgs:
|
||||
"""
|
||||
Message arguments for the current timer status message.
|
||||
"""
|
||||
@@ -520,7 +548,7 @@ class Timer:
|
||||
ctx_locale.set(self.locale.value)
|
||||
stage = self.current_stage
|
||||
|
||||
if self.running:
|
||||
if self.running and stage is not None:
|
||||
stageline = self.stageline(stage)
|
||||
warningline = ""
|
||||
needs_warning = []
|
||||
@@ -530,7 +558,7 @@ class Timer:
|
||||
last_seen = self.last_seen.get(member.id, None)
|
||||
if last_seen is None:
|
||||
last_seen = self.last_seen[member.id] = now
|
||||
elif last_seen < threshold:
|
||||
elif threshold and last_seen < threshold:
|
||||
needs_warning.append(member)
|
||||
if needs_warning:
|
||||
warningline = t(_p(
|
||||
@@ -567,13 +595,16 @@ class Timer:
|
||||
|
||||
await ui.refresh()
|
||||
|
||||
card = await get_timer_card(self.bot, self, stage)
|
||||
rawargs = dict(content=content, view=ui)
|
||||
|
||||
if render:
|
||||
try:
|
||||
card = await get_timer_card(self.bot, self, stage)
|
||||
await card.render()
|
||||
file = card.as_file(f"pomodoro_{self.data.channelid}.png")
|
||||
args = MessageArgs(content=content, file=file, view=ui)
|
||||
rawargs['file'] = card.as_file(f"pomodoro_{self.data.channelid}.png")
|
||||
except RenderingException:
|
||||
args = MessageArgs(content=content, view=ui)
|
||||
pass
|
||||
args = MessageArgs(**rawargs)
|
||||
|
||||
return args
|
||||
|
||||
@@ -764,12 +795,16 @@ class Timer:
|
||||
f"Timer <tid: {channelid}> deleted. Reason given: {reason!r}"
|
||||
)
|
||||
|
||||
@log_wrap(action='Timer Loop')
|
||||
@log_wrap(isolate=True, stack=())
|
||||
async def _runloop(self):
|
||||
"""
|
||||
Main loop which controls the
|
||||
regular stage changes and status updates.
|
||||
"""
|
||||
set_logging_context(
|
||||
action=f"TimerLoop {self.data.channelid}",
|
||||
context=f"tid: {self.data.channelid}",
|
||||
)
|
||||
# Allow updating with 10 seconds of drift to the next stage change
|
||||
drift = 10
|
||||
|
||||
@@ -785,6 +820,11 @@ class Timer:
|
||||
|
||||
self._state = current = self.current_stage
|
||||
while True:
|
||||
if current is None:
|
||||
logger.exception(
|
||||
f"Closing timer loop because current state is None. Timer {self!r}"
|
||||
)
|
||||
break
|
||||
to_next_stage = (current.end - utc_now()).total_seconds()
|
||||
|
||||
# TODO: Consider request rate and load
|
||||
@@ -812,12 +852,18 @@ class Timer:
|
||||
|
||||
if current.end < utc_now():
|
||||
self._state = self.current_stage
|
||||
task = asyncio.create_task(self.notify_change_stage(current, self._state))
|
||||
task = asyncio.create_task(
|
||||
self.notify_change_stage(current, self._state),
|
||||
name='notify-change-stage'
|
||||
)
|
||||
background_tasks.add(task)
|
||||
task.add_done_callback(background_tasks.discard)
|
||||
current = self._state
|
||||
elif self.members:
|
||||
task = asyncio.create_task(self._update_channel_name())
|
||||
task = asyncio.create_task(
|
||||
self._update_channel_name(),
|
||||
name='regular-channel-update'
|
||||
)
|
||||
background_tasks.add(task)
|
||||
task.add_done_callback(background_tasks.discard)
|
||||
task = asyncio.create_task(self.update_status_card())
|
||||
@@ -825,7 +871,13 @@ class Timer:
|
||||
task.add_done_callback(background_tasks.discard)
|
||||
|
||||
if background_tasks:
|
||||
try:
|
||||
await asyncio.gather(*background_tasks)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
f"Unexpected error while finishing background tasks for timer {self!r}",
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
def launch(self):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user