Adds auto-reaction to emotes. Adds 'voicestate' command for offline (un)muting. Adds 'topvoice' command for current voice channel challenge.
130 lines
4.5 KiB
Python
130 lines
4.5 KiB
Python
from typing import Literal
|
|
from collections import defaultdict
|
|
import datetime as dt
|
|
from datetime import datetime, timedelta, UTC
|
|
|
|
from data.queries import ORDER
|
|
import discord
|
|
from discord.ext import commands as cmds
|
|
from discord import app_commands as appcmds
|
|
|
|
from meta import LionBot, LionCog, LionContext
|
|
from meta.logger import log_wrap
|
|
from utils.lib import strfdur, utc_now, strfdur, paginate_list, pager
|
|
|
|
from modules.voicelog.plugin.data import VoiceLogSession
|
|
|
|
|
|
class YarnCog(LionCog):
    """
    Assorted toys for Lilac.

    Contains:
    - reaction mirroring: the bot copies reactions added by others and
      withdraws its own copy once it is the only one left,
    - the ``voicestate`` command: (un)mute a member now if they are in a
      voice channel, otherwise remember the request and apply it on a later
      voice state update,
    - the ``topvoice`` command: a leaderboard of time spent in one
      hard-coded voice channel since a hard-coded timestamp.
    """

    def __init__(self, bot: LionBot):
        self.bot = bot
        # guildid -> {userid -> desired server-mute flag (True = mute)}.
        # Held in memory only, so pending requests do not survive a reload.
        self.desired_voice = defaultdict(dict)

    @LionCog.listener("on_voice_state_update")
    async def voicestate_muter(self, member, before, after):
        """
        Apply a pending mute/unmute request once the member is in a channel.

        The pending entry is removed before the edit is attempted, so each
        request is applied at most once.
        """
        if not after.channel:
            # Member is not in a voice channel, a server mute cannot be applied.
            return

        # Use .get() so that unrelated guilds do not get empty entries
        # created in the defaultdict on every voice event.
        pending = self.desired_voice.get(member.guild.id)
        if not pending:
            return

        target_state = pending.pop(member.id, None)
        if target_state is None:
            return

        await member.edit(mute=target_state)
        # TODO: Log using voicelog webhook

    @LionCog.listener("on_reaction_add")
    async def lilac_confirms(self, reaction: discord.Reaction, user: discord.User):
        """
        Mirror a reaction onto the message unless the bot already reacted.
        """
        if not reaction.me:
            await reaction.message.add_reaction(reaction.emoji)

    @LionCog.listener("on_reaction_remove")
    async def lilac_unconfirms(self, reaction: discord.Reaction, user: discord.User):
        """
        Withdraw the bot's reaction when it is the only one remaining.
        """
        if reaction.me and reaction.count == 1:
            await reaction.remove(self.bot.user)

    # NOTE: documented with comments rather than a docstring on purpose:
    # discord.py uses a command callback's docstring as its user-facing
    # help text, so adding one would change what users see.
    #
    # voicestate <user> <muted|unmuted|clear>
    #   clear   - forget any pending request for the user.
    #   muted   - server-mute now if the user is in voice, else remember it.
    #   unmuted - server-unmute now if the user is in voice, else remember it.
    @cmds.hybrid_command(name="voicestate")
    @cmds.has_guild_permissions(mute_members=True)
    async def voicestate_cmd(
        self, ctx, user: discord.Member, state: Literal["muted", "unmuted", "clear"]
    ):
        # Any new request supersedes a previously saved one.
        self.desired_voice[ctx.guild.id].pop(user.id, None)

        if state == "clear":
            # We've already removed the saved state, don't do anything else.
            ack = f"{user.mention} target voice state cleared!"
        elif user.voice:
            # If user is currently in channel, apply the state
            if state == "muted":
                await user.edit(mute=True)
                ack = f"{user.mention} muted!"
            else:
                await user.edit(mute=False)
                ack = f"{user.mention} unmuted!"
        else:
            # If user is not currently in channel, save the state
            if state == "muted":
                self.desired_voice[ctx.guild.id][user.id] = True
                ack = f"{user.mention} will be muted!"
            else:
                self.desired_voice[ctx.guild.id][user.id] = False
                ack = f"{user.mention} will be unmuted!"

        await ctx.reply(
            embed=discord.Embed(colour=discord.Colour.brand_green(), description=ack)
        )

    @cmds.hybrid_command(name="topvoice")
    async def topvoice_cmd(self, ctx):
        """
        Show top voice members by total time.
        """
        # Hard-coded challenge parameters: the voice channel being ranked and
        # the Unix timestamp (seconds) from which sessions are counted.
        target_channelid = 1383707078740279366
        since_stamp = 1769832959

        if ctx.guild is None:
            # The query and the member lookups below both need a guild.
            await ctx.reply(
                embed=discord.Embed(
                    colour=discord.Colour.brand_red(),
                    description="This command can only be used in a server.",
                )
            )
            return

        voicelogger = ctx.bot.get_cog("VoiceLogCog")
        if voicelogger is None:
            # get_cog returns None when the cog is not loaded.
            await ctx.reply(
                embed=discord.Embed(
                    colour=discord.Colour.brand_red(),
                    description="Voice logging is not available right now.",
                )
            )
            return
        session_data = voicelogger.data.voicelog_sessions

        # Sessions still in progress have no duration yet, so their elapsed
        # time so far (NOW() - joined_at) is counted instead.
        # NOTE(review): no GROUP BY is visible here although total_time is an
        # aggregate - confirm the query builder groups by userid.
        query = (
            session_data.select_where(
                VoiceLogSession.joined_at
                >= datetime.fromtimestamp(since_stamp, tz=UTC),
                guildid=ctx.guild.id,
                channelid=target_channelid,
            )
            .select(
                userid="userid",
                total_time="SUM(COALESCE(duration, EXTRACT(EPOCH FROM (NOW() - joined_at))))",
            )
            .order_by("total_time", ORDER.DESC)
            .with_no_adapter()
        )
        leaderboard = [(row["userid"], row["total_time"]) for row in await query]

        if not leaderboard:
            # Nothing to rank; also avoids max() on an empty sequence below.
            await ctx.reply(
                embed=discord.Embed(
                    colour=discord.Colour.brand_red(),
                    description="No voice activity has been recorded yet.",
                )
            )
            return

        # Format for display and pager
        # First collect names
        names = {}
        for uid, _ in leaderboard:
            member = ctx.guild.get_member(uid)
            if member is None:
                try:
                    # fetch_member is a Guild method, not a Bot/Client method.
                    member = await ctx.guild.fetch_member(uid)
                except discord.HTTPException:
                    # Covers NotFound (member left) and other failed lookups;
                    # fall back to showing the raw id instead of aborting.
                    member = None
            names[uid] = member.display_name if member else str(uid)

        # Pad names to a common width, capped at 30 characters.
        max_name_len = min(30, max(len(name) for name in names.values()))
        lb_strings = [
            "{:<{}}\t{:<9}".format(
                names[uid], max_name_len, strfdur(total, short=False)
            )
            for uid, total in leaderboard
        ]

        page_len = 20
        title = "Voice Leaderboard"
        pages = paginate_list(lb_strings, block_length=page_len, title=title)

        await ctx.pager(pages)
|