135 lines
4.7 KiB
Python
135 lines
4.7 KiB
Python
from typing import Literal
|
|
from collections import defaultdict
|
|
import datetime as dt
|
|
from datetime import datetime, timedelta, UTC
|
|
|
|
from data.queries import ORDER
|
|
import discord
|
|
from discord.ext import commands as cmds
|
|
from discord import app_commands as appcmds
|
|
|
|
from meta import LionBot, LionCog, LionContext
|
|
from meta.logger import log_wrap
|
|
from utils.lib import strfdur, utc_now, strfdur, paginate_list, pager
|
|
|
|
from modules.voicelog.plugin.data import VoiceLogSession
|
|
|
|
|
|
class YarnCog(LionCog):
    """
    Assorted toys for Lilac
    """

    # Channel id and earliest join time (Unix seconds) that `topvoice` filters
    # voice-log sessions by.
    TOPVOICE_CHANNELID = 1383707078740279366
    TOPVOICE_SINCE_STAMP = 1769832959

    # Upper bound on the padding width of the leaderboard's name column, and
    # the number of leaderboard rows passed to `paginate_list` as block_length.
    TOPVOICE_MAX_NAME_WIDTH = 30
    TOPVOICE_PAGE_LEN = 20

    def __init__(self, bot: LionBot):
        self.bot = bot
        # guildid -> {userid -> desired mute flag}. Entries are written by
        # `voicestate_cmd` for members who are not in voice, and consumed by
        # `voicestate_muter` on a later voice state update.
        self.desired_voice = defaultdict(dict)

    @LionCog.listener("on_voice_state_update")
    async def voicestate_muter(self, member, before, after):
        """Apply a saved mute state to a member who is now in a voice channel."""
        if not after.channel:
            return

        # Use .get() rather than indexing so that voice activity in guilds
        # with nothing queued does not create empty defaultdict entries.
        pending = self.desired_voice.get(member.guild.id)
        if not pending:
            return

        # Pop before editing so the saved state is applied at most once.
        target_state = pending.pop(member.id, None)
        if target_state is None:
            return

        await member.edit(mute=target_state)
        # TODO: Log using voicelog webhook

    @LionCog.listener("on_reaction_add")
    async def lilac_confirms(self, reaction: discord.Reaction, user: discord.User):
        """Mirror any reaction the bot has not already added to the message."""
        if not reaction.me:
            await reaction.message.add_reaction(reaction.emoji)

    @LionCog.listener("on_reaction_remove")
    async def lilac_unconfirms(self, reaction: discord.Reaction, user: discord.User):
        """Withdraw the bot's reaction once it is the only one left.

        The arrow emojis are exempt and are never removed here.
        """
        if (
            reaction.me
            and reaction.count == 1
            and str(reaction.emoji) not in ("⬅️", "➡️")
        ):
            await reaction.remove(self.bot.user)

    @cmds.hybrid_command(name="voicestate")
    @cmds.has_guild_permissions(mute_members=True)
    async def voicestate_cmd(
        self, ctx, user: discord.Member, state: Literal["muted", "unmuted", "clear"]
    ):
        """Mute or unmute a member now, or queue it for when they next join voice."""
        # Whatever was queued before is superseded by this invocation.
        self.desired_voice[ctx.guild.id].pop(user.id, None)

        if state == "clear":
            # We've already removed the saved state, don't do anything else.
            ack = f"{user.mention} target voice state cleared!"
        else:
            mute = state == "muted"
            if user.voice:
                # If user is currently in channel, apply the state
                await user.edit(mute=mute)
                # `state` is "muted" or "unmuted" here, so this renders the
                # same acknowledgement text as spelling each case out.
                ack = f"{user.mention} {state}!"
            else:
                # If user is not currently in channel, save the state
                self.desired_voice[ctx.guild.id][user.id] = mute
                ack = f"{user.mention} will be {state}!"

        await ctx.reply(
            embed=discord.Embed(colour=discord.Colour.brand_green(), description=ack)
        )

    @cmds.hybrid_command(name="topvoice")
    @cmds.guild_only()
    async def topvoice_cmd(self, ctx):
        """
        Show top voice members by total time.
        """
        # The command reads ctx.guild throughout, hence the guild_only check.
        voicelogger = ctx.bot.get_cog("VoiceLogCog")
        if voicelogger is None:
            # get_cog returns None when the cog is not loaded; report that
            # instead of failing on the attribute access below.
            await ctx.reply(
                embed=discord.Embed(
                    colour=discord.Colour.brand_red(),
                    description="Voice logging is not currently available.",
                )
            )
            return
        session_data = voicelogger.data.voicelog_sessions

        # Total time per user; sessions with no stored duration are counted
        # up to the database's NOW().
        query = (
            session_data.select_where(
                VoiceLogSession.joined_at
                >= datetime.fromtimestamp(self.TOPVOICE_SINCE_STAMP, tz=UTC),
                guildid=ctx.guild.id,
                channelid=self.TOPVOICE_CHANNELID,
            )
            .select(
                userid="userid",
                total_time="SUM(COALESCE(duration, EXTRACT(EPOCH FROM (NOW() - joined_at))))",
            )
            .order_by("total_time", ORDER.DESC)
            .group_by("userid")
            .with_no_adapter()
        )
        leaderboard = [(row["userid"], int(row["total_time"])) for row in await query]

        if not leaderboard:
            # Nothing to rank: bail out before max() below sees an empty
            # sequence.
            await ctx.reply(
                embed=discord.Embed(
                    colour=discord.Colour.brand_red(),
                    description="No voice sessions have been recorded yet.",
                )
            )
            return

        # Format for display and pager
        names = await self._leaderboard_names(
            ctx.guild, [uid for uid, _ in leaderboard]
        )
        max_name_len = min(
            self.TOPVOICE_MAX_NAME_WIDTH, max(len(name) for name in names.values())
        )
        lb_strings = [
            "{:<{}}\t{:<9}".format(
                names[uid], max_name_len, strfdur(total, short=False)
            )
            for uid, total in leaderboard
        ]

        title = "Voice Leaderboard"
        pages = paginate_list(
            lb_strings, block_length=self.TOPVOICE_PAGE_LEN, title=title
        )

        await pager(ctx, pages)

    async def _leaderboard_names(self, guild, userids):
        """Map each user id to a display name, falling back to the raw id.

        Tries the guild's member cache first and fetches from the API on a
        miss. A member that cannot be fetched is shown by id so one failed
        lookup does not abort the whole leaderboard.
        """
        names = {}
        for uid in userids:
            member = guild.get_member(uid)
            if member is None:
                try:
                    member = await guild.fetch_member(uid)
                except discord.HTTPException:
                    # Includes discord.NotFound (which the id fallback was
                    # originally written for) and other failed requests.
                    member = None
            names[uid] = member.display_name if member else str(uid)
        return names