fix (schedule): Bugfixes.

Fix ScheduleUI issue where clear button was not enabling.
Fix ScheduleUI menus showing soon entries.
Fix ScheduleUI time format being T instead of f.
Fix ScheduleUI cancel menu out of order.
Add special case format for `_format_until` with 0 distance.

Add `TimeSlot` repr.
Add `Sesson` repr.
Fix issue where noshow could potentially deadlock sessions.
Fix issue where `create_booking` could add garbage to cache.
Add `bot.idlock` for snowflake locking.
Remove valid channel check from clock off.
Changed implementation of batchrun.
Add `cog.nowid` for easier introspection.
Add more logging.
This commit is contained in:
2023-07-08 08:57:00 +03:00
parent 78fb398e03
commit 043f358f57
7 changed files with 309 additions and 173 deletions

View File

@@ -1,6 +1,7 @@
from typing import List, Optional, TYPE_CHECKING
import logging
import asyncio
from weakref import WeakValueDictionary
import discord
from discord.utils import MISSING
@@ -44,6 +45,8 @@ class LionBot(Bot):
self.core: Optional['CoreCog'] = None
self.translator = translator
self._locks = WeakValueDictionary()
async def setup_hook(self) -> None:
log_context.set(f"APP: {self.application_id}")
await self.app_ipc.connect()
@@ -81,6 +84,12 @@ class LionBot(Bot):
with logging_context(action=f"Dispatch {event_name}"):
super().dispatch(event_name, *args, **kwargs)
def idlock(self, snowflakeid):
lock = self._locks.get(snowflakeid, None)
if lock is None:
lock = self._locks[snowflakeid] = asyncio.Lock()
return lock
async def on_ready(self):
logger.info(
f"Logged in as {self.application.name}\n"

View File

@@ -57,6 +57,11 @@ class ScheduleCog(LionCog):
self.session_channels = self.settings.SessionChannels._cache
@property
def nowid(self):
now = utc_now()
return time_to_slotid(now)
async def cog_load(self):
await self.data.init()
@@ -143,6 +148,7 @@ class ScheduleCog(LionCog):
Every hour, starting at start_at,
the spawn loop will use `_spawner` to ensure the next slotid has been launched.
"""
logger.info(f"Started scheduled session spawner at {start_at}")
next_spawn = start_at
while True:
try:
@@ -185,6 +191,7 @@ class ScheduleCog(LionCog):
lock = self._slot_locks.get(slotid, None)
if lock is None:
lock = self._slot_locks[slotid] = asyncio.Lock()
logger.debug(f"Getting slotlock <slotid: {slotid}> (locked: {lock.locked()})")
return lock
@log_wrap(action='Cancel Booking')
@@ -255,21 +262,12 @@ class ScheduleCog(LionCog):
nextsession = nextslot.sessions.get(guildid, None) if nextslot else None
nextmember = (userid in nextsession.members) if nextsession else None
unlock = None
if (nextmember is None) or not (nextsession.prepared):
async with self.bot.idlock(room.id):
try:
if nextmember:
unlock = nextsession.lock
await unlock.acquire()
update = (not nextsession.prepared)
else:
update = True
if update:
await room.set_permissions(member, overwrite=None)
except discord.HTTPException:
pass
finally:
if unlock is not None:
unlock.release()
elif slot is not None and member is None:
# Should not happen
logger.error(
@@ -305,6 +303,9 @@ class ScheduleCog(LionCog):
blacklists depending on guild settings,
and notifies the user.
"""
logger.debug(
"Handling TimeSlot noshow for members: {}".format(', '.join(map(str, memberids)))
)
now = utc_now()
nowid = time_to_slotid(now)
member_model = self.data.ScheduleSessionMember
@@ -358,6 +359,9 @@ class ScheduleCog(LionCog):
tasks.append(task)
# TODO: Logging and some error handling
await asyncio.gather(*tasks, return_exceptions=True)
logger.info(
f"Applied scheduled session blacklist to {len(to_blacklist)} missing members."
)
# Now cancel future sessions for members who were not blacklisted and are not currently clocked on
to_clear = []
@@ -380,7 +384,12 @@ class ScheduleCog(LionCog):
bookingids = [(b.slotid, b.guildid, b.userid) for b in bookings]
if bookingids:
await self.cancel_bookings(*bookingids, refund=False)
# TODO: Logging and error handling
logger.info(
f"Cancelled future sessions for {len(to_clear)} missing members."
)
logger.debug(
"Completed NoShow handling"
)
@log_wrap(action='Create Booking')
async def create_booking(self, guildid, userid, *slotids):
@@ -390,12 +399,14 @@ class ScheduleCog(LionCog):
Probably best refactored into an interactive method,
with some parts in slot and session.
"""
logger.debug(
f"Creating bookings for member <uid: {userid}> in <gid: {guildid}> "
f"for slotids: {', '.join(map(str, slotids))}"
)
t = self.bot.translator.t
locks = [self.slotlock(slotid) for slotid in slotids]
await asyncio.gather(*(lock.acquire() for lock in locks))
try:
conn = await self.bot.db.get_connection()
async with conn.transaction():
# Validate bookings
guild_data = await self.data.ScheduleGuild.fetch_or_create(guildid)
config = ScheduleConfig(guildid, guild_data)
@@ -452,15 +463,16 @@ class ScheduleCog(LionCog):
"One or more requested timeslots are already booked!"
))
raise UserInputError(error)
conn = await self.bot.db.get_connection()
# Booking request is now validated. Perform bookings.
# Fetch or create session data
await self.data.ScheduleSlot.fetch_multiple(*slotids)
session_data = await self.data.ScheduleSession.fetch_multiple(
*((guildid, slotid) for slotid in slotids)
)
async with conn.transaction():
# Create transactions
economy = self.bot.get_cog('Economy')
trans_data = (
@@ -513,11 +525,25 @@ class ScheduleCog(LionCog):
mem, connect=True, view_channel=True
)
except discord.HTTPException:
pass
logger.info(
f"Could not set room permissions for newly booked session "
f"<uid: {userid}> in {session!r}",
exc_info=True
)
logger.info(
f"Member <uid: {userid}> in <gid: {guildid}> booked scheduled sessions: " +
', '.join(map(str, slotids))
)
except UserInputError:
raise
except Exception:
logger.exception(
"Unexpected exception occurred while booking scheduled sessions."
)
raise
finally:
for lock in locks:
lock.release()
# TODO: Logging and error handling
return booking_data
# Event listeners
@@ -592,7 +618,7 @@ class ScheduleCog(LionCog):
member.clock_on(session_data.start_time)
session.update_status_soon()
logger.debug(
f"Clocked on member {member.data!r} with session {session_data!r}"
f"Clocked on member {member.data!r} in session {session!r}"
)
except Exception:
logger.exception(
@@ -616,11 +642,11 @@ class ScheduleCog(LionCog):
member = session.members.get(session_data.userid, None) if session else None
if member is not None:
async with session.lock:
if session.listening and session.validate_channel(session_data.channelid):
if session.listening and member.clock_start is not None:
member.clock_off(ended_at)
session.update_status_soon()
logger.debug(
f"Clocked off member {member.data!r} from session {session_data!r}"
f"Clocked off member {member.data!r} from session {session!r}"
)
except Exception:
logger.exception(

View File

@@ -80,6 +80,26 @@ class ScheduledSession:
self._updater = None
self._status_task = None
def __repr__(self):
return ' '.join(
"<ScheduledSession"
f"slotid={self.slotid}",
f"guildid={self.guildid}",
f"lobbyid={ch.id if (ch := self.lobby_channel) else None}",
f"roomid={ch.id if (ch := self.room_channel) else None}",
f"members={len(self.members)}",
f"listening={self.listening}",
f"prepared={self.prepared}",
f"opened={self.opened}",
f"cancelled={self.cancelled}",
f"locked={self.lock.locked()}",
f"status_message={msg.id if (msg := self.status_message) else None}",
f"lobby_hook={hook.webhookid if (hook := self._hook) else None}",
f"last_update={self._last_update}",
f"updater_running={True if (self._updater and not self._updater.done()) else False}",
">"
)
# Setting shortcuts
@property
def room_channel(self) -> Optional[discord.VoiceChannel]:
@@ -185,7 +205,7 @@ class ScheduledSession:
self._hook = None
except discord.HTTPException:
logger.warning(
f"Exception occurred sending to webhooks for scheduled session {self.data!r}",
f"Exception occurred sending to webhooks for scheduled session {self!r}",
exc_info=True
)
@@ -221,14 +241,12 @@ class ScheduledSession:
await room.edit(overwrites=overwrites)
except discord.HTTPException:
logger.warning(
f"Unexpected discord exception received while preparing schedule session room <cid: {room.id}> "
f"in guild <gid: {self.guildid}> for timeslot <sid: {self.slotid}>.",
f"Unexpected discord exception received while preparing schedule session room {self!r}",
exc_info=True
)
else:
logger.debug(
f"Prepared schedule session room <cid: {room.id}> "
f"in guild <gid: {self.guildid}> for timeslot <sid: {self.slotid}>.",
f"Prepared schedule session room for session {self!r}"
)
else:
t = self.bot.translator.t
@@ -266,13 +284,11 @@ class ScheduledSession:
await room.edit(overwrites=overwrites)
except discord.HTTPException:
logger.exception(
f"Unhandled discord exception received while opening schedule session room <cid: {room.id}> "
f"in guild <gid: {self.guildid}> for timeslot <sid: {self.slotid}>."
f"Unhandled discord exception received while opening schedule session room {self!r}"
)
else:
logger.debug(
f"Opened schedule session room <cid: {room.id}> "
f"in guild <gid: {self.guildid}> for timeslot <sid: {self.slotid}>.",
f"Opened schedule session room for session {self!r}"
)
else:
t = self.bot.translator.t
@@ -485,7 +501,7 @@ class ScheduledSession:
except discord.HTTPException:
# Unexpected issue updating the message
logger.exception(
f"Exception occurred updating status for scheduled session {self.data!r}"
f"Exception occurred updating status for scheduled session {self!r}"
)
if repost and resend and self.members:
@@ -531,12 +547,11 @@ class ScheduledSession:
await self.update_status()
except asyncio.CancelledError:
logger.debug(
f"Cancelled scheduled session update loop <slotid: {self.slotid}> ,gid: {self.guildid}>"
f"Cancelled scheduled session update loop for session {self!r}"
)
except Exception:
logger.exception(
"Unknown exception encountered during session "
f"update loop <slotid: {self.slotid}> ,gid: {self.guildid}>"
"Unknown exception encountered during session update loop for session {self!r} "
)
def start_updating(self):

View File

@@ -13,12 +13,13 @@ from core.lion_member import LionMember
from core.lion_guild import LionGuild
from tracking.voice.session import SessionState
from utils.data import as_duration, MEMBERS, TemporaryTable
from utils.ratelimits import Bucket
from modules.economy.cog import Economy
from modules.economy.data import EconomyData, TransactionType
from .. import babel, logger
from ..data import ScheduleData as Data
from ..lib import slotid_to_utc, batchrun_per_second
from ..lib import slotid_to_utc, batchrun_per_second, limit_concurrency
from ..settings import ScheduleSettings
from .session import ScheduledSession
@@ -63,6 +64,41 @@ class TimeSlot:
self.run_task = None
self.loaded = False
def __repr__(self):
if self.closing.is_set():
state = 'closing'
elif self.opened.is_set():
state = 'opened'
elif self.opening.is_set():
state = 'opening'
elif self.preparing.is_set():
state = 'preparing'
elif self.loaded:
state = 'loaded'
else:
state = 'unloaded'
if self.run_task:
if self.run_task.cancelled():
running = 'Cancelled'
elif self.run_task.done():
running = 'Done'
else:
running = 'Running'
else:
running = 'None'
return (
"<TimeSlot "
f"slotid={self.slotid} "
f"state='{state}' "
f"sessions={len(self.sessions)} "
f"members={sum(len(s.members) for s in self.sessions.values())} "
f"loaded={self.loaded} "
f"run_task='{running}'"
">"
)
@log_wrap(action="Fetch sessions")
async def fetch(self):
"""
@@ -81,7 +117,7 @@ class TimeSlot:
self.sessions.update(sessions)
self.loaded = True
logger.info(
f"Timeslot <slotid: {self.slotid}> finished preloading {len(self.sessions)} guilds. Ready to open."
f"Timeslot {self!r}> finished preloading {len(self.sessions)} guilds. Ready to open."
)
@log_wrap(action="Load sessions")
@@ -129,7 +165,7 @@ class TimeSlot:
sessions[row.guildid] = session
logger.debug(
f"Timeslot <slotid: {self.slotid}> "
f"Timeslot {self!r} "
f"loaded guild data for {len(sessions)} guilds: {', '.join(map(str, guildids))}"
)
return sessions
@@ -204,10 +240,12 @@ class TimeSlot:
This does not take the session lock for setting perms, because this is race-safe
(aside from potentially leaving extra permissions, which will be overwritten by `open`).
"""
logger.debug(f"Running prepare for time slot <slotid: {self.slotid}> with {len(sessions)} sessions.")
logger.debug(f"Running prepare for time slot: {self!r}")
try:
coros = [session.prepare(save=False) for session in sessions if session.can_run]
await batchrun_per_second(coros, 5)
bucket = Bucket(5, 1)
coros = [bucket.wrapped(session.prepare(save=False)) for session in sessions if session.can_run]
async for task in limit_concurrency(coros, 5):
await task
# Save messageids
tmptable = TemporaryTable(
@@ -227,11 +265,11 @@ class TimeSlot:
).from_expr(tmptable)
except Exception:
logger.exception(
f"Unhandled exception while preparing timeslot <slotid: {self.slotid}>."
f"Unhandled exception while preparing timeslot: {self!r}"
)
else:
logger.info(
f"Prepared {len(sessions)} for scheduled session timeslot <slotid: {self.slotid}>"
f"Prepared {len(sessions)} for scheduled session timeslot: {self!r}"
)
@log_wrap(action="Open Sessions")
@@ -269,12 +307,14 @@ class TimeSlot:
session.start_updating()
# Bulk run guild open to open session rooms
bucket = Bucket(5, 1)
voice_coros = [
session.open_room()
bucket.wrapped(session.open_room())
for session in fresh
if session.room_channel is not None and session.data.opened_at is None
]
await batchrun_per_second(voice_coros, 5)
async for task in limit_concurrency(voice_coros, 5):
await task
await asyncio.gather(*message_tasks)
await asyncio.gather(*notify_tasks)
@@ -297,11 +337,11 @@ class TimeSlot:
).from_expr(tmptable)
except Exception:
logger.exception(
f"Unhandled exception while opening sessions for timeslot <slotid: {self.slotid}>."
f"Unhandled exception while opening sessions for timeslot: {self!r}"
)
else:
logger.info(
f"Opened {len(sessions)} sessions for scheduled session timeslot <slotid: {self.slotid}>"
f"Opened {len(sessions)} sessions for scheduled session timeslot: {self!r}"
)
@log_wrap(action="Close Sessions")
@@ -394,11 +434,11 @@ class TimeSlot:
await self.cog.handle_noshow(*did_not_show)
except Exception:
logger.exception(
f"Unhandled exception while closing sessions for timeslot <slotid: {self.slotid}>."
f"Unhandled exception while closing sessions for timeslot: {self!r}"
)
else:
logger.info(
f"Closed {len(sessions)} for scheduled session timeslot <slotid: {self.slotid}>"
f"Closed {len(sessions)} for scheduled session timeslot: {self!r}"
)
def launch(self) -> asyncio.Task:
@@ -420,32 +460,30 @@ class TimeSlot:
if now < self.start_at:
await discord.utils.sleep_until(self.prep_at)
self.preparing.set()
logger.info(f"Active timeslot preparing. {self!r}")
await self.prepare(list(self.sessions.values()))
else:
logger.info(f"Active timeslot prepared. {self!r}")
await discord.utils.sleep_until(self.start_at)
else:
self.preparing.set()
self.opening.set()
logger.info(f"Active timeslot opening. {self!r}")
await self.open(list(self.sessions.values()))
logger.info(f"Active timeslot opened. {self!r}")
self.opened.set()
await discord.utils.sleep_until(self.end_at)
self.closing.set()
logger.info(f"Active timeslot closing. {self!r}")
await self.close(list(self.sessions.values()), consequences=True)
logger.info(f"Active timeslot closed. {self!r}")
except asyncio.CancelledError:
if self.closing.is_set():
state = 'closing'
elif self.opened.is_set():
state = 'opened'
elif self.opening.is_set():
state = 'opening'
elif self.preparing.is_set():
state = 'preparing'
logger.info(
f"Deactivating active time slot <slotid: {self.slotid}> "
f"with state '{state}'."
f"Deactivating active time slot: {self!r}"
)
except Exception:
logger.exception(
f"Unexpected exception occurred while running active time slot <slotid: {self.slotid}>."
f"Unexpected exception occurred while running active time slot: {self!r}."
)
@log_wrap(action="Slot Cleanup")

View File

@@ -2,6 +2,7 @@ import asyncio
import itertools
import datetime as dt
from . import logger
from utils.ratelimits import Bucket
@@ -39,3 +40,34 @@ async def batchrun_per_second(awaitables, batchsize):
task = asyncio.create_task(awaitable)
task.add_done_callback(lambda fut: sem.release())
return await asyncio.gather(*tasks, return_exceptions=True)
async def limit_concurrency(aws, limit):
"""
Run provided awaitables concurrently,
ensuring that no more than `limit` are running at once.
"""
aws = iter(aws)
aws_ended = False
pending = set()
count = 0
logger.debug("Starting limited concurrency executor")
while pending or not aws_ended:
while len(pending) < limit and not aws_ended:
aw = next(aws, None)
if aw is None:
aws_ended = True
else:
pending.add(asyncio.create_task(aw))
count += 1
if not pending:
break
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED
)
while done:
yield done.pop()
logger.debug(f"Completed {count} tasks")

View File

@@ -141,8 +141,7 @@ class ScheduleUI(MessageUI):
'ui:schedule|button:clear|label',
"Clear Schedule"
))
if not self.schedule:
self.clear_button.disabled = True
self.clear_button.disabled = (not self.schedule)
@button(label='ABOUT_PLACEHOLDER', emoji=conf.emojis.question, style=ButtonStyle.grey)
async def about_button(self, press: discord.Interaction, pressed: Button):
@@ -220,7 +219,7 @@ class ScheduleUI(MessageUI):
try:
await self.cog.create_booking(self.guildid, self.userid, *slotids)
timestrings = [
discord.utils.format_dt(slotid_to_utc(slotid), style='T')
discord.utils.format_dt(slotid_to_utc(slotid), style='f')
for slotid in slotids
]
ack = t(_np(
@@ -264,6 +263,9 @@ class ScheduleUI(MessageUI):
# Populate with choices
nowid = self.nowid
if ((slotid_to_utc(nowid + 3600) - utc_now()).total_seconds() < 60):
# Start from next session instead
nowid += 3600
upcoming = [nowid + 3600 * i for i in range(1, 25)]
upcoming = [slotid for slotid in upcoming if slotid not in self.schedule]
options = self._format_slot_options(*upcoming)
@@ -298,7 +300,7 @@ class ScheduleUI(MessageUI):
slot_format = t(_p(
'ui:schedule|menu:slots|option|format',
"{day} {time} (in {until})"
"{day} {time} ({until})"
))
today_name = t(_p(
'ui:schedule|menu:slots|option|day:today',
@@ -325,12 +327,18 @@ class ScheduleUI(MessageUI):
def _format_until(self, distance):
t = self.bot.translator.t
if distance:
return t(_np(
'ui:schedule|format_until',
"<1 hour",
"{number} hours",
'ui:schedule|format_until|positive',
"in <1 hour",
"in {number} hours",
distance
)).format(number=distance)
else:
return t(_p(
'ui:schedule|format_until|now',
"right now!"
))
@select(cls=Select, placeholder='CANCEL_MENU_PLACEHOLDER')
async def cancel_menu(self, selection: discord.Interaction, selected):
@@ -379,7 +387,7 @@ class ScheduleUI(MessageUI):
embed = error_embed(error)
else:
timestrings = [
discord.utils.format_dt(slotid_to_utc(record['slotid']), style='T')
discord.utils.format_dt(slotid_to_utc(record['slotid']), style='f')
for record in booking_records
]
ack = t(_np(
@@ -407,8 +415,11 @@ class ScheduleUI(MessageUI):
'ui:schedule|menu:cancel|placeholder',
"Cancel booked sessions"
))
can_cancel = set(self.schedule.keys())
can_cancel.discard(self.nowid)
minid = self.nowid
if ((slotid_to_utc(self.nowid + 3600) - utc_now()).total_seconds() < 60):
minid = self.nowid + 3600
can_cancel = list(slotid for slotid in self.schedule.keys() if slotid > minid)
menu.options = self._format_slot_options(*can_cancel)
menu.max_values = len(menu.options)
@@ -520,11 +531,11 @@ class ScheduleUI(MessageUI):
t = self.bot.translator.t
short_format = t(_p(
'ui:schedule|booking_format:short',
"`in {until}` | {start} - {end}"
"`{until}` | {start} - {end}"
))
long_format = t(_p(
'ui:schedule|booking_format:long',
"> `in {until}` | {start} - {end}"
"> `{until}` | {start} - {end}"
))
items = []
format = long_format if show_guild else short_format

View File

@@ -92,6 +92,11 @@ class Bucket:
while self.full:
await asyncio.sleep(self.delay)
async def wrapped(self, coro):
await self.wait()
self.request()
await coro
class RateLimit:
def __init__(self, max_level, empty_time, error=None, cache=TTLCache(1000, 60 * 60)):