feat (voice): Implement now command.

This commit is contained in:
2023-08-27 15:53:12 +03:00
parent 622d8b150d
commit 4773de53a2
4 changed files with 250 additions and 22 deletions

View File

@@ -23,14 +23,13 @@ async def get_timer_card(bot: LionBot, timer: 'Timer', stage: 'Stage'):
card_users = []
guildid = timer.data.guildid
for member in timer.members:
session_data = None
if voicecog is not None:
session = voicecog.get_session(guildid, member.id)
session_data = session.data
if session_data:
session_duration = (utc_now() - session_data.start_time).total_seconds()
tag = session_data.tag
tag = session.tag
if session.start_time:
session_duration = (utc_now() - session.start_time).total_seconds()
else:
session_duration = 0
else:
session_duration = 0
tag = None

View File

@@ -14,13 +14,13 @@ from meta.sharding import THIS_SHARD
from utils.lib import utc_now, error_embed
from core.lion_guild import VoiceMode
from wards import low_management_ward
from wards import low_management_ward, moderator_ctxward
from . import babel, logger
from .data import VoiceTrackerData
from .settings import VoiceTrackerSettings, VoiceTrackerConfigUI
from .session import VoiceSession, TrackedVoiceState
from .session import VoiceSession, TrackedVoiceState, SessionState
_p = babel._p
@@ -71,13 +71,13 @@ class VoiceTrackerCog(LionCog):
# Simultaneously!
...
def get_session(self, guildid, userid) -> VoiceSession:
def get_session(self, guildid, userid, **kwargs) -> VoiceSession:
"""
Get the VoiceSession for the given member.
Creates it if it does not exist.
"""
return VoiceSession.get(self.bot, guildid, userid)
return VoiceSession.get(self.bot, guildid, userid, **kwargs)
@LionCog.listener('on_ready')
@log_wrap(action='Init Voice Sessions')
@@ -635,6 +635,197 @@ class VoiceTrackerCog(LionCog):
f"Closed {len(to_close)} voice sessions after leaving guild '{guild.name}' <gid:{guild.id}>"
)
# ----- Commands -----
@cmds.hybrid_command(
name=_p('cmd:now', "now"),
description=_p(
'cmd:now|desc',
"Describe what you are working on, or see what your friends are working on!"
)
)
@appcmds.rename(
tag=_p('cmd:now|param:tag', "tag"),
user=_p('cmd:now|param:user', "user"),
clear=_p('cmd:now|param:clear', "clear"),
)
@appcmds.describe(
tag=_p(
'cmd:now|param:tag|desc',
"Describe what you are working on in 10 characters or less!"
),
user=_p(
'cmd:now|param:user|desc',
"Check what a friend is working on."
),
clear=_p(
'cmd:now|param:clear|desc',
"Unset your activity tag (or the target user's tag, for moderators)."
)
)
@appcmds.guild_only
async def now_cmd(self, ctx: LionContext,
tag: Optional[appcmds.Range[str, 0, 10]] = None,
user: Optional[discord.Member] = None,
clear: Optional[bool] = None
):
if not ctx.guild:
return
if not ctx.interaction:
return
t = self.bot.translator.t
await ctx.interaction.response.defer(thinking=True, ephemeral=True)
is_moderator = await moderator_ctxward(ctx)
target = user if user is not None else ctx.author
session = self.get_session(ctx.guild.id, target.id, create=False)
# Handle case where target is not active
if (session is None) or session.activity is SessionState.INACTIVE:
if target == ctx.author:
error = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:now|target:self|error:target_inactive',
"You have no running session! "
"Join a tracked voice channel to start a session."
)).format(mention=target.mention)
)
else:
error = discord.Embed(
colour=discord.Colour.brand_red(),
description=t(_p(
'cmd:now|target:other|error:target_inactive',
"{mention} has no running session!"
)).format(mention=target.mention)
)
await ctx.interaction.edit_original_response(embed=error)
return
if clear:
# Clear activity tag mode
if target == ctx.author:
# Clear the author's tag
await session.set_tag(None)
ack = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:now|target:self|mode:clear|success|title',
"Session Tag Cleared"
)),
description=t(_p(
'cmd:now|target:self|mode:clear|success|desc',
"Successfully unset your session tag."
))
)
elif not is_moderator:
# Trying to clear someone else's tag without being a moderator
ack = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'cmd:now|target:other|mode:clear|error:perms|title',
"You can't do that!"
)),
description=t(_p(
'cmd:now|target:other|mode:clear|error:perms|desc',
"You need to be a moderator to set or clear someone else's session tag."
))
)
else:
# Clearing someone else's tag as a moderator
await session.set_tag(None)
ack = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:now|target:other|mode:clear|success|title',
"Session Tag Cleared!"
)),
description=t(_p(
'cmd:now|target:other|mode:clear|success|desc',
"Cleared {target}'s session tag."
)).format(target=target.mention)
)
elif tag:
# Tag setting mode
if target == ctx.author:
# Set the author's tag
await session.set_tag(tag)
ack = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:now|target:self|mode:set|success|title',
"Session Tag Set!"
)),
description=t(_p(
'cmd:now|target:self|mode:set|success|desc',
"You are now working on `{new_tag}`. Good luck!"
)).format(new_tag=tag)
)
elif not is_moderator:
# Trying the set someone else's tag without being a moderator
ack = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'cmd:now|target:other|mode:set|error:perms|title',
"You can't do that!"
)),
description=t(_p(
'cmd:now|target:other|mode:set|error:perms|desc',
"You need to be a moderator to set or clear someone else's session tag!"
))
)
else:
# Setting someone else's tag as a moderator
await session.set_tag(tag)
ack = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:now|target:other|mode:set|success|title',
"Session Tag Set!"
)),
description=t(_p(
'cmd:now|target:other|mode:set|success|desc',
"Set {target}'s session tag to `{new_tag}`."
)).format(target=target.mention, new_tag=tag)
)
else:
# Display tag and voice time
if target == ctx.author:
if session.tag:
desc = t(_p(
'cmd:now|target:self|mode:show_with_tag|desc',
"You have been working on **`{tag}`** in {channel} since {time}!"
))
else:
desc = t(_p(
'cmd:now|target:self|mode:show_without_tag|desc',
"You have been working in {channel} since {time}!\n\n"
"Use `/now <tag>` to set what you are working on."
))
else:
if session.tag:
desc = t(_p(
'cmd:now|target:other|mode:show_with_tag|desc',
"{target} is current working in {channel}!\n"
"They have been working on **{tag}** since {time}."
))
else:
desc = t(_p(
'cmd:now|target:other|mode:show_without_tag|desc',
"{target} has been working in {channel} since {time}!"
))
desc = desc.format(
tag=session.tag,
channel=f"<#{session.state.channelid}>",
time=discord.utils.format_dt(session.start_time, 't'),
target=target.mention,
)
ack = discord.Embed(
colour=discord.Colour.orange(),
description=desc,
timestamp=utc_now()
)
await ctx.interaction.edit_original_response(embed=ack)
# ----- Configuration Commands -----
@LionCog.placeholder_group
@cmds.hybrid_group('configure', with_app_command=False)

View File

@@ -72,6 +72,7 @@ class VoiceSession:
'registry',
'start_task', 'expiry_task',
'data', 'state', 'hourly_rate',
'_tag', '_start_time',
'__weakref__'
)
@@ -92,6 +93,24 @@ class VoiceSession:
# Must match data when session in ongoing
self.state: Optional[TrackedVoiceState] = None
self.hourly_rate: Optional[float] = None
self._tag = None
self._start_time = None
@property
def tag(self) -> Optional[str]:
if self.data:
tag = self.data.tag
else:
tag = self._tag
return tag
@property
def start_time(self):
if self.data:
start_time = self.data.start_time
else:
start_time = self._start_time
return start_time
@property
def activity(self):
@@ -103,13 +122,13 @@ class VoiceSession:
return SessionState.INACTIVE
@classmethod
def get(cls, bot: LionBot, guildid: int, userid: int) -> 'VoiceSession':
def get(cls, bot: LionBot, guildid: int, userid: int, create=True) -> Optional['VoiceSession']:
"""
Fetch the VoiceSession for the given member. Respects cache.
Creates the session if it doesn't already exist.
"""
session = cls._sessions_[guildid].get(userid, None)
if session is None:
if session is None and create:
session = cls(bot, guildid, userid)
cls._sessions_[guildid][userid] = session
return session
@@ -129,6 +148,13 @@ class VoiceSession:
self._active_sessions_[self.guildid][self.userid] = self
return self
async def set_tag(self, new_tag):
if self.activity is SessionState.INACTIVE:
raise ValueError("Cannot set tag on an inactive voice session.")
self._tag = new_tag
if self.data is not None:
await self.data.update(tag=new_tag)
async def schedule_start(self, delay, start_time, expire_time, state, hourly_rate):
"""
Schedule the voice session to start at the given target time,
@@ -136,6 +162,8 @@ class VoiceSession:
"""
self.state = state
self.hourly_rate = hourly_rate
self._start_time = start_time
self._tag = None
self.start_task = asyncio.create_task(self._start_after(delay, start_time))
self.schedule_expiry(expire_time)
@@ -171,7 +199,8 @@ class VoiceSession:
last_update=start_time,
live_stream=state.stream,
live_video=state.video,
hourly_coins=self.hourly_rate
hourly_coins=self.hourly_rate,
tag=self._tag
)
self.bot.dispatch('voice_session_start', self.data)
self.start_task = None

View File

@@ -51,6 +51,20 @@ async def low_management_iward(interaction: discord.Interaction) -> bool:
return await low_management(interaction.client, interaction.user)
# High level ctx wards
async def moderator_ctxward(ctx: LionContext) -> bool:
if not ctx.guild:
return False
passed = await low_management(ctx.bot, ctx.author)
if passed:
return True
modrole = ctx.lguild.data.mod_role
roleids = [role.id for role in ctx.author.roles]
if not (modrole and modrole in roleids):
return False
return True
# Command Wards, raise CheckFailure with localised error message
@cmds.check
@@ -101,14 +115,8 @@ async def low_management_ward(ctx: LionContext) -> bool:
@cmds.check
async def moderator_ward(ctx: LionContext) -> bool:
if not ctx.guild:
return False
passed = await low_management(ctx.bot, ctx.author)
if passed:
return True
modrole = ctx.lguild.data.mod_role
roleids = [role.id for role in ctx.author.roles]
if not (modrole and modrole in roleids):
passed = await moderator_ctxward(ctx)
if not passed:
raise CheckFailure(
ctx.bot.translator.t(_p(
'ward:moderator|failed',
@@ -116,6 +124,7 @@ async def moderator_ward(ctx: LionContext) -> bool:
"or `MANAGE_GUILD` permissions to do this."
))
)
else:
return True
# ---- Assorted manual wards and checks ----