rewrite: New Video channels and moderation.

This commit is contained in:
2023-08-15 14:03:23 +03:00
parent 7e6217a2ae
commit 2cc90375c7
21 changed files with 2227 additions and 11 deletions

View File

@@ -0,0 +1,10 @@
import logging
from babel.translator import LocalBabel
logger = logging.getLogger(__name__)
babel = LocalBabel('video')
async def setup(bot):
from .cog import VideoCog
await bot.add_cog(VideoCog(bot))

View File

@@ -0,0 +1,575 @@
from typing import Optional
from collections import defaultdict
from weakref import WeakValueDictionary
import datetime as dt
import asyncio
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
from discord.app_commands import Range
from meta import LionCog, LionBot, LionContext
from meta.logger import log_wrap
from meta.sharding import THIS_SHARD
from core.data import CoreData
from utils.lib import utc_now
from wards import high_management_ward, low_management_ward, equippable_role
from modules.moderation.cog import ModerationCog
from . import babel, logger
from .data import VideoData
from .settings import VideoSettings
from .settingui import VideoSettingUI
from .ticket import VideoTicket
_p = babel._p
class VideoCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.data = bot.db.load_registry(VideoData())
self.settings = VideoSettings()
self.ready = asyncio.Event()
self._video_tasks: dict[tuple[int, int], asyncio.Task] = {}
self._event_locks: dict[tuple[int, int], asyncio.Lock] = WeakValueDictionary()
async def cog_load(self):
await self.data.init()
# TODO: Register Video Ticket type here
modcog = self.bot.get_cog('ModerationCog')
if modcog is None:
raise ValueError("Cannot load VideoCog before ModerationCog!")
self.bot.core.guild_config.register_model_setting(self.settings.VideoBlacklist)
self.bot.core.guild_config.register_model_setting(self.settings.VideoGracePeriod)
await self.settings.VideoChannels.setup(self.bot)
await self.settings.VideoExempt.setup(self.bot)
configcog = self.bot.get_cog('ConfigCog')
if configcog is None:
logger.warning(
"Could not load ConfigCog. VideoCog configuration will not crossload."
)
else:
self.crossload_group(self.configure_group, configcog.configure_group)
if self.bot.is_ready():
await self.initialise()
async def cog_unload(self):
...
@LionCog.listener('on_ready')
async def initialise(self):
"""
Read all current voice channel members.
Ensure that all video channel members have tasks running or are valid.
Note that we do start handling events before the bot cache is ready.
This is because the event data carries all required member data with it.
However, members who were already present and didn't fire an event
may still need to be handled.
"""
# Re-cache, now using the actual client guilds
await self.settings.VideoChannels.setup(self.bot)
await self.settings.VideoExempt.setup(self.bot)
# Collect members that need handling
active = [channel for guild in self.bot.guilds for channel in guild.voice_channels if channel.members]
tasks = []
for channel in active:
if await self.check_video_channel(channel):
for member in list(channel.members):
key = (channel.guild.id, member.id)
async with self.event_lock(key):
if key in self._video_tasks:
pass
elif await self.check_member_exempt(member):
pass
elif await self.check_member_blacklist(member):
task = asyncio.create_task(
self._remove_blacklisted(member, channel)
)
tasks.append(task)
else:
task = asyncio.create_task(
self._joined_video_channel(member, channel)
)
tasks.append(task)
self._video_tasks[key] = task
if tasks:
await asyncio.gather(*tasks)
# ----- Event Handlers -----
def event_lock(self, key) -> asyncio.Lock:
"""
Get an asyncio.Lock for the given key.
Guarantees sequential event handling.
"""
lock = self._event_locks.get(key, None)
if lock is None:
lock = self._event_locks[key] = asyncio.Lock()
logger.debug(f"Getting video event lock {key} (locked: {lock.locked()})")
return lock
@LionCog.listener('on_voice_state_update')
@log_wrap(action='Video Watchdog')
async def video_watchdog(self, member: discord.Member,
before: discord.VoiceState, after: discord.VoiceState):
if member.bot:
return
task_key = (member.guild.id, member.id)
# Freeze the state so it doesn't get updated by other events
after_channel = after.channel
before_channel = before.channel
after_video = after.self_video
async with self.event_lock(task_key):
if after_channel != before_channel:
# Channel changed, cancel any running tasks
task = self._video_tasks.pop(task_key, None)
if task and not task.done() and not task.cancelled():
task.cancel()
# If they are joining a video channel, run join logic
run_join = (
after_channel and not after_video
and await self.check_video_channel(after_channel)
and not await self.check_member_exempt(member)
)
if run_join:
# Check if the member is blacklisted
if await self.check_member_blacklist(member):
# Kick them from the channel
await self._remove_blacklisted(member, after_channel)
join_task = asyncio.create_task(
self._joined_video_channel(member, after_channel)
)
self._video_tasks[task_key] = join_task
logger.debug(
f"Launching video channel join task for <uid:{member.id}> "
f"in <cid:{after_channel.id}> of guild <gid:{member.guild.id}>."
)
elif after_channel and (before.self_video != after_video):
# Video state changed
channel = after_channel
if (await self.check_video_channel(channel) and not await self.check_member_exempt(member)):
# Relevant video event
if after_video:
# They turned their video on!
# Cancel any running tasks
task = self._video_tasks.pop(task_key, None)
if task and not task.done() and not task.cancelled():
task.cancel()
elif (task := self._video_tasks.get(task_key, None)) is None or task.done():
# They turned their video off, and there are no tasks handling the member
# Give them a brief grace period and then kick them
kick_task = asyncio.create_task(
self._disabled_video_kick(member, channel)
)
self._video_tasks[task_key] = kick_task
logger.debug(
f"Launching video channel kick task for <uid:{member.id}> "
f"in <cid:{channel.id}> of guild <gid:{member.guild.id}>"
)
async def check_member_exempt(self, member: discord.Member) -> bool:
"""
Check whether a member is video-exempt.
Should almost always hit cache.
"""
exempt_setting = await self.settings.VideoExempt.get(member.guild.id)
exempt_ids = set(exempt_setting.data)
return any(role.id in exempt_ids for role in member.roles)
async def check_member_blacklist(self, member: discord.Member) -> bool:
"""
Check whether a member is video blacklisted.
(i.e. check whether they have the blacklist role)
"""
blacklistid = (await self.settings.VideoBlacklist.get(member.guild.id)).data
return (blacklistid and any(role.id == blacklistid for role in member.roles))
async def check_video_channel(self, channel: discord.VoiceChannel) -> bool:
"""
Check whether a given channel is a video only channel.
Should almost always hit cache.
"""
channel_setting = await self.settings.VideoChannels.get(channel.guild.id)
channelids = set(channel_setting.data)
return (channel.id in channelids) or (channel.category_id and channel.category_id in channelids)
async def _remove_blacklisted(self, member: discord.Member, channel: discord.VoiceChannel):
"""
Remove a video blacklisted member from the channel.
"""
logger.info(
f"Removing video blacklisted member <uid:{member.id}> from <cid:{channel.id}> in "
f"<gid:{member.guild.id}>"
)
t = self.bot.translator.t
try:
# Kick the member from the channel
await asyncio.shield(
member.edit(
voice_channel=None,
reason=t(_p(
'video_watchdog|kick_blacklisted_member|audit_reason',
"Removing video blacklisted member from a video channel."
))
)
)
except discord.HTTPException:
# TODO: Event log
...
except asyncio.CancelledError:
# This shouldn't happen because we don't wait for this task the same way
# And the event lock should wait for this to be complete anyway
pass
# TODO: Notify through the moderation alert API
embed = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'video_watchdog|kick_blacklisted_member|notification|title',
"You have been disconnected."
)),
description=t(_p(
'video_watchdog|kick_blacklisted_member|notification|desc',
"You were disconnected from the video channel {channel} because you are "
"blacklisted from video channels in **{server}**."
)).format(channel=channel.mention, server=channel.guild.name),
)
modcog: ModerationCog = self.bot.get_cog('ModerationCog')
await modcog.send_alert(
member,
embed=embed
)
async def _joined_video_channel(self, member: discord.Member, channel: discord.VoiceChannel):
"""
Handle a (non-exempt, non-blacklisted) member joining a video channel.
"""
if not member.voice or not member.voice.channel:
# In case the member already left
return
if member.voice.self_video:
# In case they already turned video on
return
try:
# First wait for 15 seconds for them to turn their video on (without prompting)
await asyncio.sleep(15)
# Fetch the required setting data (allow cancellation while we fetch or create)
lion = await self.bot.core.lions.fetch_member(member.guild.id, member.id)
except asyncio.CancelledError:
# They left the video channel or turned their video on
return
t = self.bot.translator.t
modcog: ModerationCog = self.bot.get_cog('ModerationCog')
now = utc_now()
# Important that we use a sync request here
grace = lion.lguild.config.get(self.settings.VideoGracePeriod.setting_id).value
disconnect_at = now + dt.timedelta(seconds=grace)
jump_field = t(_p(
'video_watchdog|join_task|jump_field',
"[Click to jump back]({link})"
)).format(link=channel.jump_url)
request = discord.Embed(
colour=discord.Colour.orange(),
title=t(_p(
'video_watchdog|join_task|initial_request:title',
"Please enable your video!"
)),
description=t(_p(
'video_watchdog|join_task|initial_request:description',
"**You have joined the video channel {channel}!**\n"
"Please **enable your video** or **leave the channel** "
"or you will be disconnected {timestamp} and "
"potentially **blacklisted**."
)).format(
channel=channel.mention,
timestamp=discord.utils.format_dt(disconnect_at, 'R'),
),
timestamp=now
).add_field(name='', value=jump_field)
thanks = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'video_watchdog|join_task|thanks:title',
"Thanks for enabling your video!"
)),
).add_field(name='', value=jump_field)
bye = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'video_watchdog|join_task|bye:title',
"Thanks for leaving the channel promptly!"
))
)
alert_task = asyncio.create_task(
modcog.send_alert(
member,
embed=request
)
)
try:
message = await asyncio.shield(alert_task)
await discord.utils.sleep_until(disconnect_at)
except asyncio.CancelledError:
# Member enabled video or moved to another channel or left the server
# Wait for the message to finish sending if we need to
message = await alert_task
# Fetch a new member to check voice state
member = member.guild.get_member(member.id)
if member and message:
if member.voice and (member.voice.channel == channel) and member.voice.self_video:
# Assume member enabled video
embed = thanks
else:
# Assume member left channel
embed = bye
embed.timestamp = utc_now()
try:
await message.edit(embed=embed)
except discord.HTTPException:
pass
else:
# Member never enabled video in the grace period.
# No longer accept cancellation
self._video_tasks.pop((member.guild.id, member.id), None)
# Disconnect user
try:
await member.edit(
voice_channel=None,
reason=t(_p(
'video_watchdog|join_task|kick_after_grace|audit_reason',
"Member never enabled their video in video channel."
))
)
except discord.HTTPException:
# TODO: Event log
...
# Assign warn/blacklist ticket as needed
blacklist = lion.lguild.config.get(self.settings.VideoBlacklist.setting_id)
only_warn = (not lion.data.video_warned) and blacklist
ticket = None
if not only_warn:
# Try to apply blacklist
try:
ticket = await self.blacklist_member(
member,
reason=t(_p(
'video_watchdog|join_task|kick_after_grace|ticket_reason',
"Failed to enable their video in time in the video channel {channel}"
)).format(channel=channel.mention)
)
except discord.HTTPException as e:
logger.debug(
f"Could not create blacklist ticket on member <uid:{member.id}> "
f"in <gid:{member.guild.id}>: {e.text}"
)
only_warn = True
# Ack based on ticket created
alert_ref = message.to_reference(fail_if_not_exists=False)
if only_warn:
# TODO: Warn ticket
warning = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'video_watchdog|join_task|kick_after_grace|warning|title',
"You have received a warning!"
)),
description=t(_p(
'video_watchdog|join_task|kick_after_grace|warning|desc',
"**You must enable your camera in camera-only rooms.**\n"
"You have been disconnected from the video {channel} for not "
"enabling your camera."
)).format(channel=channel.mention),
timestamp=utc_now()
).add_field(name='', value=jump_field)
await modcog.send_alert(member, embed=warning, reference=alert_ref)
if not lion.data.video_warned:
await lion.data.update(video_warned=True)
else:
alert = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'video_watchdog|join_task|kick_after_grace|blacklist|title',
"You have been blacklisted!"
)),
description=t(_p(
'video_watchdog|join_task|kick_after_grace|blacklist|desc',
"You have been blacklisted from the video channels in this server."
)),
timestamp=utc_now()
).add_field(name='', value=jump_field)
# TODO: Add duration
await modcog.send_alert(member, embed=alert, reference=alert_ref)
async def _disabled_video_kick(self, member: discord.Member, channel: discord.VoiceChannel):
"""
Kick a video channel member who has disabled their video.
"""
# Give them 15 seconds to re-enable
try:
await asyncio.sleep(15)
except asyncio.CancelledError:
# Member left the channel or turned on their video
return
# Member did not turn on their video, actually kick and notify
t = self.bot.translator.t
logger.info(
f"Removing member <uid:{member.id}> from video channel <cid:{channel.id}> in "
f"<gid:{member.guild.id}> because they disabled their video."
)
# Disconnection is now inevitable
# We also don't want our own disconnection to cancel the task
self._video_tasks.pop((member.guild.id, member.id), None)
try:
await asyncio.shield(
member.edit(
voice_channel=None,
reason=t(_p(
'video_watchdog|disabled_video_kick|audit_reason',
"Disconnected for disabling video for more than {number} seconds in video channel."
)).format(number=15)
)
)
except asyncio.CancelledError:
# Ignore cancelled error at this point
pass
except discord.HTTPException:
# TODO: Event logging
pass
embed = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'video_watchdog|disabled_video_kick|notification|title',
"You have been disconnected."
)),
description=t(_p(
'video_watchdog|disabled_video_kick|notification|desc',
"You were disconnected from the video channel {channel} because "
"you disabled your video.\n"
"Please keep your video on at all times, and leave the channel if you need "
"to disable it!"
))
)
modcog: ModerationCog = self.bot.get_cog('ModerationCog')
await modcog.send_alert(
member,
embed=embed
)
async def blacklist_member(self, member: discord.Member, reason: str):
"""
Create a VideoBlacklist ticket with the appropriate duration,
and apply the video blacklist role.
Propagates any exceptions that may arise.
"""
return await VideoTicket.autocreate(
self.bot, member, reason
)
# ----- Commands -----
# ------ Configuration -----
@LionCog.placeholder_group
@cmds.hybrid_group('configure', with_app_command=False)
async def configure_group(self, ctx: LionContext):
...
@configure_group.command(
name=_p('cmd:configure_video', "video_channels"),
description=_p(
'cmd:configure_video|desc', "Configure video-only channels and blacklisting."
)
)
@appcmds.rename(
video_blacklist=VideoSettings.VideoBlacklist._display_name,
video_blacklist_durations=VideoSettings.VideoBlacklistDurations._display_name,
video_grace_period=VideoSettings.VideoGracePeriod._display_name,
)
@appcmds.describe(
video_blacklist=VideoSettings.VideoBlacklist._desc,
video_blacklist_durations=VideoSettings.VideoBlacklistDurations._desc,
video_grace_period=VideoSettings.VideoGracePeriod._desc,
)
@low_management_ward
async def configure_video(self, ctx: LionContext,
video_blacklist: Optional[discord.Role] = None,
video_blacklist_durations: Optional[str] = None,
video_grace_period: Optional[str] = None,
):
if not ctx.guild:
return
if not ctx.interaction:
return
await ctx.interaction.response.defer(thinking=True)
modified = []
if video_blacklist is not None:
await equippable_role(self.bot, video_blacklist, ctx.author)
setting = self.settings.VideoBlacklist
await setting._check_value(ctx.guild.id, video_blacklist)
instance = setting(ctx.guild.id, video_blacklist.id)
modified.append(instance)
if video_blacklist_durations is not None:
setting = self.settings.VideoBlacklistDurations
instance = await setting.from_string(ctx.guild.id, video_blacklist_durations)
modified.append(instance)
if video_grace_period is not None:
setting = self.settings.VideoGracePeriod
instance = await setting.from_string(ctx.guild.id, video_grace_period)
modified.append(instance)
if modified:
ack_lines = []
for instance in modified:
await instance.write()
ack_lines.append(instance.update_message)
# Ack modified
tick = self.bot.config.emojis.tick
embed = discord.Embed(
colour=discord.Colour.brand_green(),
description='\n'.join(f"{tick} {line}" for line in ack_lines),
)
await ctx.reply(embed=embed)
if ctx.channel.id not in VideoSettingUI._listening or not modified:
ui = VideoSettingUI(self.bot, ctx.guild.id, ctx.channel.id)
await ui.run(ctx.interaction)
await ui.wait()

View File

@@ -0,0 +1,7 @@
from data import Registry, Table
class VideoData(Registry):
video_channels = Table('video_channels')
video_exempt_roles = Table('video_exempt_roles')
video_blacklist_durations = Table('studyban_durations')

View File

@@ -0,0 +1,315 @@
from cachetools import LRUCache
from collections import defaultdict
from settings import ModelData, ListData
from settings.groups import SettingGroup
from settings.ui import InteractiveSetting
from settings.setting_types import (
DurationSetting, RoleSetting, RoleListSetting, ChannelListSetting,
ListSetting
)
from meta import conf
from meta.sharding import THIS_SHARD
from meta.logger import log_wrap
from core.data import CoreData
from babel.translator import ctx_translator
from . import babel, logger
from .data import VideoData
_p = babel._p
class VideoSettings(SettingGroup):
class VideoChannels(ListData, ChannelListSetting):
setting_id = "video_channels"
_event = 'guildset_video_channels'
_display_name = _p('guildset:video_channels', "video_channels")
_desc = _p(
'guildset:video_channels|desc',
"List of voice channels and categories in which to enforce video."
)
_long_desc = _p(
'guildset:video_channels|long_desc',
"Member will be required to turn on their video in these channels.\n"
"If they do not enable their video with `15` seconds of joining, "
"they will be asked to enable it "
"through a notification in direct messages or the `alert_channel`. "
"If they still have not enabled it after the `video_grace_period` has passed, "
"they will be kicked from the channel. "
"Further, after the first offence (which is considered a warning), "
"they will be given the `video_blacklist` role, if configured, "
"which will stop them from joining video channels.\n"
"As usual, if a category is configured, this will apply to all voice channels "
"under the category."
)
_accepts = _p(
'guildset:video_channels|accepts',
"Comma separated channel ids or names."
)
_cache = LRUCache(maxsize=2500)
_table_interface = VideoData.video_channels
_id_column = 'guildid'
_data_column = 'channelid'
_order_column = 'channelid'
@property
def update_message(self) -> str:
t = ctx_translator.get().t
value = self.value
if value:
resp = t(_p(
'guildset:video_channels|set_response:set',
"Members will be asked to turn on their video in the following channels: {channels}"
)).format(channels=self.formatted)
else:
resp = t(_p(
'guildset:video_channels|set_response:unset',
"Members will not be asked to turn on their video in any channels."
))
return resp
@classmethod
@log_wrap(action="Cache video_channels")
async def setup(cls, bot):
"""
Preload video channels for every guild on the current shard.
"""
data: VideoData = bot.db.registries[VideoData._name]
if bot.is_ready():
rows = await data.video_channels.select_where(
guildid=[guild.id for guild in bot.guilds]
)
else:
rows = await data.video_channels.select_where(THIS_SHARD)
new_cache = defaultdict(list)
count = 0
for row in rows:
new_cache[row['guildid']].append(row['channelid'])
count += 1
if cls._cache is None:
cls._cache = LRUCache(2500)
cls._cache.clear()
cls._cache.update(new_cache)
logger.info(f"Loaded {count} video channels on this shard.")
class VideoBlacklist(ModelData, RoleSetting):
setting_id = "video_blacklist"
_event = 'guildset_video_blacklist'
_display_name = _p('guildset:video_blacklist', "video_blacklist")
_desc = _p(
'guildset:video_blacklist|desc',
"Role given when members are blacklisted from video channels."
)
_long_desc = _p(
'guildset:video_blacklist|long_desc',
"This role will be automatically given after a member has failed to keep their video "
"enabled in a video channel (see above).\n"
"Members who have this role will not be able to join configured video channels. "
"The role permissions may be freely configured by server admins "
"to place further restrictions on the offender.\n"
"The role may also be manually assigned, to the same effect.\n"
"If this role is not set, no video blacklist will occur, "
"and members will only be kicked from the channel and warned."
)
_accepts = _p(
'guildset:video_blacklist|accepts',
"Blacklist role name or id."
)
_default = None
_model = CoreData.Guild
_column = CoreData.Guild.studyban_role.name
_allow_object = False
@property
def update_message(self) -> str:
t = ctx_translator.get().t
value = self.value
if value:
resp = t(_p(
'guildset:video_blacklist|set_response:set',
"Members who fail to keep their video on will be given {role}"
)).format(role=f"<@&{self.data}>")
else:
resp = t(_p(
'guildset:video_blacklist|set_response:unset',
"Members will no longer be automatically blacklisted from video channels."
))
return resp
@classmethod
def _format_data(cls, parent_id, data, **kwargs):
t = ctx_translator.get().t
if data is not None:
return super()._format_data(parent_id, data, **kwargs)
else:
return t(_p(
'guildset:video_blacklist|formatted:unset',
"Not Set. (Members will not be automatically blacklisted.)"
))
class VideoBlacklistDurations(ListData, ListSetting, InteractiveSetting):
setting_id = 'video_durations'
_setting = DurationSetting
_display_name = _p('guildset:video_durations', "video_blacklist_durations")
_desc = _p(
'guildset:video_durations|desc',
"Sequence of durations for automatic video blacklists."
)
_long_desc = _p(
'guildset:video_durations|long_desc',
"When `video_blacklist` is set and members fail to turn on their video within "
"the configured `video_grace_period`, they will be automatically blacklisted "
"(i.e. given the `video_blacklist` role).\n"
"This setting describes *how long* the member will be blacklisted for, "
"for each offence.\n"
"E.g. if this is set to `1d, 7d, 30d`, "
"then on the first offence the member will be blacklisted for 1 day, "
"on the second for 7 days, and on the third for 30 days. "
"A subsequent offence will result in an infinite blacklist."
)
_accepts = _p(
'guildset:video_durations|accepts',
"Comma separated list of durations."
)
_default = [
5 * 60,
60 * 60,
6 * 60 * 60,
24 * 60 * 60,
168 * 60 * 60,
720 * 60 * 60
]
# No need to expire
_cache = {}
_table_interface = VideoData.video_blacklist_durations
_id_column = 'guildid'
_data_column = 'duration'
_order_column = 'rowid'
@property
def update_message(self) -> str:
t = ctx_translator.get().t
value = self.value
if value:
resp = t(_p(
'guildset:video_durations|set_response:set',
"Members will be automatically blacklisted for: {durations}"
)).format(durations=self.formatted)
else:
resp = t(_p(
'guildset:video_durations|set_response:unset',
"Video blacklists are now always permanent."
))
return resp
class VideoGracePeriod(ModelData, DurationSetting):
setting_id = "video_grace_period"
_event = 'guildset_video_grace_period'
_display_name = _p('guildset:video_grace_period', "video_grace_period")
_desc = _p(
'guildset:video_grace_period|desc',
"How long to wait (in seconds) before kicking/blacklist members who don't enable their video."
)
_long_desc = _p(
'guildset:video_grace_period|long_desc',
"The length of time a member has to enable their video after joining a video channel. "
"After this time, if they have not enabled their video, they will be kicked from the channel "
"and potentially blacklisted from video channels."
)
_accepts = _p(
'guildset:video_grace_period|accepts',
"How many seconds to wait for a member to enable video."
)
_default = 90
_default_multiplier = 1
_model = CoreData.Guild
_column = CoreData.Guild.video_grace_period.name
_cache = LRUCache(2500)
@property
def update_message(self) -> str:
t = ctx_translator.get().t
resp = t(_p(
'guildset:video_grace_period|set_response:set',
"Members will now have **{duration}** to enable their video."
)).format(duration=self.formatted)
return resp
class VideoExempt(ListData, RoleListSetting):
setting_id = "video_exempt"
_event = 'guildset_video_exempt'
_display_name = _p('guildset:video_exempt', "video_exempt")
_desc = _p(
'guildset:video_exempt|desc',
"List of roles which are exempt from video channels."
)
_long_desc = _p(
'guildset:video_exempt|long_desc',
"Members who have **any** of these roles "
"will not be required to enable their video in the `video_channels`. "
"This also overrides the `video_blacklist` role."
)
_accepts = _p(
'guildset:video_exempt|accepts',
"List of exempt role names or ids."
)
_table_interface = VideoData.video_exempt_roles
_id_column = 'guildid'
_data_column = 'roleid'
_order_column = 'roleid'
@property
def update_message(self) -> str:
t = ctx_translator.get().t
value = self.value
if value:
resp = t(_p(
'guildset:video_exempt|set_response:set',
"The following roles will now be exempt from video channels: {roles}"
)).format(roles=self.formatted)
else:
resp = t(_p(
'guildset:video_exempt|set_response:unset',
"No members will be exempt from video channel requirements."
))
return resp
@classmethod
@log_wrap(action="Cache video_exempt")
async def setup(cls, bot):
"""
Preload video exempt roles for every guild on the current shard.
"""
data: VideoData = bot.db.registries[VideoData._name]
if bot.is_ready():
rows = await data.video_exempt_roles.select_where(
guildid=[guild.id for guild in bot.guilds]
)
else:
rows = await data.video_exempt_roles.select_where(THIS_SHARD)
new_cache = defaultdict(list)
count = 0
for row in rows:
new_cache[row['guildid']].append(row['roleid'])
count += 1
if cls._cache is None:
cls._cache = LRUCache(2500)
cls._cache.clear()
cls._cache.update(new_cache)
logger.info(f"Loaded {count} video exempt roles on this shard.")

View File

@@ -0,0 +1,163 @@
import asyncio
import discord
from discord.ui.button import button, Button, ButtonStyle
from discord.ui.select import select, ChannelSelect, RoleSelect
from meta import LionBot
from wards import equippable_role
from utils.ui import ConfigUI, DashboardSection
from utils.lib import MessageArgs
from . import babel
from .settings import VideoSettings
_p = babel._p
class VideoSettingUI(ConfigUI):
setting_classes = (
VideoSettings.VideoChannels,
VideoSettings.VideoExempt,
VideoSettings.VideoGracePeriod,
VideoSettings.VideoBlacklist,
VideoSettings.VideoBlacklistDurations,
)
def __init__(self, bot: LionBot, guildid: int, channelid: int, **kwargs):
self.settings = bot.get_cog('VideoCog').settings
super().__init__(bot, guildid, channelid, **kwargs)
# ----- UI Components -----
# Video Channels channel selector
@select(
cls=ChannelSelect,
channel_types=[discord.ChannelType.voice, discord.ChannelType.category],
placeholder="CHANNELS_MENU_PLACEHOLDER",
min_values=0, max_values=25
)
async def channels_menu(self, selection: discord.Interaction, selected: RoleSelect):
"""
Multi-channel selector for the `video_channels` setting.
"""
await selection.response.defer(thinking=True, ephemeral=True)
setting = self.get_instance(VideoSettings.VideoChannels)
setting.value = selected.values
await setting.write()
await selection.delete_original_response()
async def channels_menu_refresh(self):
menu = self.channels_menu
t = self.bot.translator.t
menu.placeholder = t(_p(
'ui:video_config|menu:channels|placeholder',
"Select Video Channels"
))
# Video exempt role selector
@select(
cls=RoleSelect,
placeholder="EXEMPT_MENU_PLACEHOLDER",
min_values=0, max_values=25
)
async def exempt_menu(self, selection: discord.Interaction, selected: RoleSelect):
"""
Multi-role selector for the `video_exempt` setting.
"""
await selection.response.defer(thinking=True, ephemeral=True)
setting = self.get_instance(VideoSettings.VideoExempt)
setting.value = selected.values
await setting.write()
await selection.delete_original_response()
async def exempt_menu_refresh(self):
menu = self.exempt_menu
t = self.bot.translator.t
menu.placeholder = t(_p(
'ui:video_config|menu:exempt|placeholder',
"Select Exempt Roles"
))
# Video blacklist role selector
@select(
cls=RoleSelect,
placeholder="VIDEO_BLACKLIST_MENU_PLACEHOLDER",
min_values=0, max_values=1
)
async def video_blacklist_menu(self, selection: discord.Interaction, selected: RoleSelect):
"""
Single role selector for the `video_blacklist` setting.
"""
await selection.response.defer(thinking=True, ephemeral=True)
setting = self.get_instance(VideoSettings.VideoBlacklist)
setting.value = selected.values[0] if selected.values else None
if setting.value:
await equippable_role(self.bot, setting.value, selection.user)
await setting.write()
await selection.delete_original_response()
async def video_blacklist_menu_refresh(self):
menu = self.video_blacklist_menu
t = self.bot.translator.t
menu.placeholder = t(_p(
'ui:video_config|menu:video_blacklist|placeholder',
"Select Blacklist Role"
))
# ----- UI Flow -----
async def make_message(self) -> MessageArgs:
t = self.bot.translator.t
title = t(_p(
'ui:video_config|embed|title',
"Video Channel Configuration Panel"
))
embed = discord.Embed(
title=title,
colour=discord.Colour.orange()
)
for setting in self.instances:
embed.add_field(**setting.embed_field, inline=False)
return MessageArgs(embed=embed)
async def reload(self):
self.instances = [
await setting.get(self.guildid)
for setting in self.setting_classes
]
async def refresh_components(self):
component_refresh = (
self.edit_button_refresh(),
self.close_button_refresh(),
self.reset_button_refresh(),
self.channels_menu_refresh(),
self.exempt_menu_refresh(),
self.video_blacklist_menu_refresh(),
)
await asyncio.gather(*component_refresh)
self.set_layout(
(self.channels_menu,),
(self.exempt_menu,),
(self.video_blacklist_menu,),
(self.edit_button, self.reset_button, self.close_button,),
)
class VideoDashboard(DashboardSection):
section_name = _p(
"dash:video|title",
"Video Channel Settings ({commands[configure video_channels]})"
)
_option_name = _p(
"dash:video|option|name",
"Video Channel Panel"
)
configui = VideoSettingUI
setting_classes = VideoSettingUI.setting_classes

View File

@@ -0,0 +1,110 @@
import datetime as dt
import discord
from meta import LionBot
from utils.lib import utc_now
from modules.moderation.cog import ModerationCog
from modules.moderation.data import TicketType, TicketState, ModerationData
from modules.moderation.ticket import Ticket, ticket_factory
from . import babel, logger
from .settings import VideoSettings
@ticket_factory(TicketType.STUDY_BAN)
class VideoTicket(Ticket):
__slots__ = ()
@classmethod
async def create(
cls, bot: LionBot, member: discord.Member,
moderatorid: int, reason: str, expiry=None,
**kwargs
):
modcog: ModerationCog = bot.get_cog('ModerationCog')
ticket_data = await modcog.data.Ticket.create(
guildid=member.guild.id,
targetid=member.id,
ticket_type=TicketType.STUDY_BAN,
ticket_state=TicketState.EXPIRING if expiry else TicketState.OPEN,
moderator_id=moderatorid,
auto=(moderatorid == bot.user.id),
content=reason,
expiry=expiry,
**kwargs
)
lguild = await bot.core.lions.fetch_guild(member.guild.id, guild=member.guild)
new_ticket = cls(lguild, ticket_data)
# Schedule expiry if required
if expiry:
cls.expiring.schedule_task(ticket_data.ticketid, expiry.timestamp())
await new_ticket.post()
# Cancel any existent expiring video blacklists
tickets = await cls.fetch_tickets(
bot,
(modcog.data.Ticket.ticketid != new_ticket.data.ticketid),
guildid=member.guild.id,
targetid=member.id,
ticket_state=TicketState.EXPIRING
)
for ticket in tickets:
await ticket.cancel_expiry()
return new_ticket
@classmethod
async def autocreate(cls, bot: LionBot, target: discord.Member, reason: str, **kwargs):
modcog: ModerationCog = bot.get_cog('ModerationCog')
lguild = await bot.core.lions.fetch_guild(target.guild.id, guild=target.guild)
blacklist = lguild.config.get(VideoSettings.VideoBlacklist.setting_id).value
if not blacklist:
return
# This will propagate HTTPException if needed
await target.add_roles(blacklist, reason=reason)
Ticket = modcog.data.Ticket
row = await Ticket.table.select_one_where(
(Ticket.ticket_state != TicketState.PARDONED),
guildid=target.guild.id,
targetid=target.id,
ticket_type=TicketType.STUDY_BAN,
).with_no_adapter().select(ticket_count="COUNT(*)")
count = row[0]['ticket_count'] if row else 0
durations = (await VideoSettings.VideoBlacklistDurations.get(target.guild.id)).value
if count < len(durations):
durations.sort()
duration = durations[count]
expiry = utc_now() + dt.timedelta(seconds=duration)
else:
duration = None
expiry = None
return await cls.create(
bot, target,
bot.user.id, reason,
duration=duration, expiry=expiry,
**kwargs
)
async def _revert(self, reason=None):
target = self.target
blacklist = self.lguild.config.get(VideoSettings.VideoBlacklist.setting_id).value
# TODO: User lion.remove_role instead
if target and blacklist in target.roles:
try:
await target.remove_roles(
blacklist,
reason=reason
)
except discord.HTTPException as e:
logger.debug(f"Revert failed for ticket {self.data.ticketid}: {e.text}")