Cleaning up migrated components.

This commit is contained in:
2023-08-16 12:33:56 +03:00
parent 0904b2deb7
commit 5799b51a32
15 changed files with 0 additions and 1319 deletions

View File

@@ -1,7 +0,0 @@
from .module import module
from . import guild_config
from . import statreset
from . import new_members
from . import reaction_roles
from . import economy

View File

@@ -1,3 +0,0 @@
from ..module import module
from . import set_coins

View File

@@ -1,104 +0,0 @@
import discord
import datetime
from wards import guild_admin
from settings import GuildSettings
from core import Lion
from ..module import module
POSTGRES_INT_MAX = 2147483647
@module.cmd(
"set_coins",
group="Guild Admin",
desc="Set coins on a member."
)
@guild_admin()
async def cmd_set(ctx):
"""
Usage``:
{prefix}set_coins <user mention> <amount>
Description:
Sets the given number of coins on the mentioned user.
If a number greater than 0 is mentioned, will add coins.
If a number less than 0 is mentioned, will remove coins.
Note: LionCoins on a member cannot be negative.
Example:
{prefix}set_coins {ctx.author.mention} 100
{prefix}set_coins {ctx.author.mention} -100
"""
# Extract target and amount
# Handle a slightly more flexible input than stated
splits = ctx.args.split()
digits = [isNumber(split) for split in splits[:2]]
mentions = ctx.msg.mentions
if len(splits) < 2 or not any(digits) or not (all(digits) or mentions):
return await _send_usage(ctx)
if all(digits):
# Both are digits, hopefully one is a member id, and one is an amount.
target, amount = ctx.guild.get_member(int(splits[0])), int(splits[1])
if not target:
amount, target = int(splits[0]), ctx.guild.get_member(int(splits[1]))
if not target:
return await _send_usage(ctx)
elif digits[0]:
amount, target = int(splits[0]), mentions[0]
elif digits[1]:
target, amount = mentions[0], int(splits[1])
# Fetch the associated lion
target_lion = Lion.fetch(ctx.guild.id, target.id)
# Check sanity conditions
if target == ctx.client.user:
return await ctx.embed_reply("Thanks, but Ari looks after all my needs!")
if target.bot:
return await ctx.embed_reply("We are still waiting for {} to open an account.".format(target.mention))
# Finally, send the amount and the ack message
# Postgres `coins` column is `integer`, sanity check postgres int limits - which are smalled than python int range
target_coins_to_set = target_lion.coins + amount
if target_coins_to_set >= 0 and target_coins_to_set <= POSTGRES_INT_MAX:
target_lion.addCoins(amount)
elif target_coins_to_set < 0:
target_coins_to_set = -target_lion.coins # Coins cannot go -ve, cap to 0
target_lion.addCoins(target_coins_to_set)
target_coins_to_set = 0
else:
return await ctx.embed_reply("Member coins cannot be more than {}".format(POSTGRES_INT_MAX))
embed = discord.Embed(
title="Funds Set",
description="You have set LionCoins on {} to **{}**!".format(target.mention,target_coins_to_set),
colour=discord.Colour.orange(),
timestamp=datetime.datetime.utcnow()
).set_footer(text=str(ctx.author), icon_url=ctx.author.avatar_url)
await ctx.reply(embed=embed, reference=ctx.msg)
GuildSettings(ctx.guild.id).event_log.log(
"{} set {}'s LionCoins to`{}`.".format(
ctx.author.mention,
target.mention,
target_coins_to_set
),
title="Funds Set"
)
def isNumber(var):
try:
return isinstance(int(var), int)
except:
return False
async def _send_usage(ctx):
return await ctx.error_reply(
"**Usage:** `{prefix}set_coins <mention> <amount>`\n"
"**Example:**\n"
" {prefix}set_coins {ctx.author.mention} 100\n"
" {prefix}set_coins {ctx.author.mention} -100".format(
prefix=ctx.best_prefix,
ctx=ctx
)
)

View File

@@ -1,163 +0,0 @@
import difflib
import discord
from cmdClient.lib import SafeCancellation
from wards import guild_admin, guild_moderator
from settings import UserInputError, GuildSettings
from utils.lib import prop_tabulate
import utils.ctx_addons # noqa
from .module import module
# Pages of configuration categories to display
cat_pages = {
'Administration': ('Meta', 'Guild Roles', 'New Members'),
'Moderation': ('Moderation', 'Video Channels'),
'Productivity': ('Study Tracking', 'TODO List', 'Workout'),
'Study Rooms': ('Rented Rooms', 'Scheduled Sessions'),
}
# Descriptions of each configuration category
descriptions = {
}
@module.cmd("config",
desc="View and modify the server settings.",
flags=('add', 'remove'),
group="Guild Configuration")
@guild_moderator()
async def cmd_config(ctx, flags):
"""
Usage``:
{prefix}config
{prefix}config info
{prefix}config <setting>
{prefix}config <setting> <value>
Description:
Display the server configuration panel, and view/modify the server settings.
Use `{prefix}config` to see the settings with their current values, or `{prefix}config info` to \
show brief descriptions instead.
Use `{prefix}config <setting>` (e.g. `{prefix}config event_log`) to view a more detailed description for each setting, \
including the possible values.
Finally, use `{prefix}config <setting> <value>` to set the setting to the given value.
To unset a setting, or set it to the default, use `{prefix}config <setting> None`.
Additional usage for settings which accept a list of values:
`{prefix}config <setting> <value1>, <value2>, ...`
`{prefix}config <setting> --add <value1>, <value2>, ...`
`{prefix}config <setting> --remove <value1>, <value2>, ...`
Note that the first form *overwrites* the setting completely,\
while the second two will only *add* and *remove* values, respectively.
Examples``:
{prefix}config event_log
{prefix}config event_log {ctx.ch.name}
{prefix}config autoroles Member, Level 0, Level 10
{prefix}config autoroles --remove Level 10
"""
# Cache and map some info for faster access
setting_displaynames = {setting.display_name.lower(): setting for setting in GuildSettings.settings.values()}
if not ctx.args or ctx.args.lower() in ('info', 'help'):
# Fill the setting cats
cats = {}
for setting in GuildSettings.settings.values():
cat = cats.get(setting.category, [])
cat.append(setting)
cats[setting.category] = cat
# Format the cats
sections = {}
for catname, cat in cats.items():
catprops = {
setting.display_name: setting.get(ctx.guild.id).summary if not ctx.args else setting.desc
for setting in cat
}
# TODO: Add cat description here
sections[catname] = prop_tabulate(*zip(*catprops.items()))
# Put the cats on the correct pages
pages = []
for page_name, cat_names in cat_pages.items():
page = {
cat_name: sections[cat_name] for cat_name in cat_names if cat_name in sections
}
if page:
embed = discord.Embed(
colour=discord.Colour.orange(),
title=page_name,
description=(
"View brief setting descriptions with `{prefix}config info`.\n"
"Use e.g. `{prefix}config event_log` to see more details.\n"
"Modify a setting with e.g. `{prefix}config event_log {ctx.ch.name}`.\n"
"See the [Online Tutorial]({tutorial}) for a complete setup guide.".format(
prefix=ctx.best_prefix,
ctx=ctx,
tutorial="https://discord.studylions.com/tutorial"
)
)
)
for name, value in page.items():
embed.add_field(name=name, value=value, inline=False)
pages.append(embed)
if len(pages) > 1:
[
embed.set_footer(text="Page {} of {}".format(i+1, len(pages)))
for i, embed in enumerate(pages)
]
await ctx.pager(pages)
elif pages:
await ctx.reply(embed=pages[0])
else:
await ctx.reply("No configuration options set up yet!")
else:
# Some args were given
parts = ctx.args.split(maxsplit=1)
name = parts[0]
setting = setting_displaynames.get(name.lower(), None)
if setting is None:
matches = difflib.get_close_matches(name, setting_displaynames.keys(), n=2)
match = "`{}`".format('` or `'.join(matches)) if matches else None
return await ctx.error_reply(
"Couldn't find a setting called `{}`!\n"
"{}"
"Use `{}config info` to see all the server settings.".format(
name,
"Maybe you meant {}?\n".format(match) if match else "",
ctx.best_prefix
)
)
if len(parts) == 1 and not ctx.msg.attachments:
# config <setting>
# View config embed for provided setting
await setting.get(ctx.guild.id).widget(ctx, flags=flags)
else:
# config <setting> <value>
# Ignoring the write ward currently and just enforcing admin
# Check the write ward
# if not await setting.write_ward.run(ctx):
# raise SafeCancellation(setting.write_ward.msg)
if not await guild_admin.run(ctx):
raise SafeCancellation("You need to be a server admin to modify settings!")
# Attempt to set config setting
try:
parsed = await setting.parse(ctx.guild.id, ctx, parts[1] if len(parts) > 1 else '')
parsed.write(add_only=flags['add'], remove_only=flags['remove'])
except UserInputError as e:
await ctx.reply(embed=discord.Embed(
description="{} {}".format('', e.msg),
colour=discord.Colour.red()
))
else:
await ctx.reply(embed=discord.Embed(
description="{} {}".format('', setting.get(ctx.guild.id).success_response),
colour=discord.Colour.green()
))

View File

@@ -1,4 +0,0 @@
from LionModule import LionModule
module = LionModule("Guild_Admin")

View File

@@ -1,3 +0,0 @@
from . import settings
from . import greetings
from . import roles

View File

@@ -1,6 +0,0 @@
from data import Table, RowTable
autoroles = Table('autoroles')
bot_autoroles = Table('bot_autoroles')
past_member_roles = Table('past_member_roles')

View File

@@ -1,29 +0,0 @@
import discord
from LionContext import LionContext as Context
from meta import client
from .settings import greeting_message, greeting_channel, returning_message
@client.add_after_event('member_join')
async def send_greetings(client, member):
guild = member.guild
returning = bool(client.data.lions.fetch((guild.id, member.id)))
# Handle greeting message
channel = greeting_channel.get(guild.id).value
if channel is not None:
if channel == greeting_channel.DMCHANNEL:
channel = member
ctx = Context(client, guild=guild, author=member)
if returning:
args = returning_message.get(guild.id).args(ctx)
else:
args = greeting_message.get(guild.id).args(ctx)
try:
await channel.send(**args)
except discord.HTTPException:
pass

View File

@@ -1,115 +0,0 @@
import asyncio
import discord
from collections import defaultdict
from meta import client
from core import Lion
from settings import GuildSettings
from .settings import autoroles, bot_autoroles, role_persistence
from .data import past_member_roles
# Locks to avoid storing the roles while adding them
# The locking is cautious, leaving data unchanged upon collision
locks = defaultdict(asyncio.Lock)
@client.add_after_event('member_join')
async def join_role_tracker(client, member):
"""
Add autoroles or saved roles as needed.
"""
guild = member.guild
if not guild.me.guild_permissions.manage_roles:
# We can't manage the roles here, don't try to give/restore the member roles
return
async with locks[(guild.id, member.id)]:
if role_persistence.get(guild.id).value and client.data.lions.fetch((guild.id, member.id)):
# Lookup stored roles
role_rows = past_member_roles.select_where(
guildid=guild.id,
userid=member.id
)
# Identify roles from roleids
roles = (guild.get_role(row['roleid']) for row in role_rows)
# Remove non-existent roles
roles = (role for role in roles if role is not None)
# Remove roles the client can't add
roles = [role for role in roles if role < guild.me.top_role]
if roles:
try:
await member.add_roles(
*roles,
reason="Restoring saved roles.",
)
except discord.HTTPException:
# This shouldn't ususally happen, but there are valid cases where it can
# E.g. the user left while we were restoring their roles
pass
# Event log!
GuildSettings(guild.id).event_log.log(
"Restored the following roles for returning member {}:\n{}".format(
member.mention,
', '.join(role.mention for role in roles)
),
title="Saved roles restored"
)
else:
# Add autoroles
roles = bot_autoroles.get(guild.id).value if member.bot else autoroles.get(guild.id).value
# Remove roles the client can't add
roles = [role for role in roles if role < guild.me.top_role]
if roles:
try:
await member.add_roles(
*roles,
reason="Adding autoroles.",
)
except discord.HTTPException:
# This shouldn't ususally happen, but there are valid cases where it can
# E.g. the user left while we were adding autoroles
pass
# Event log!
GuildSettings(guild.id).event_log.log(
"Gave {} the guild autoroles:\n{}".format(
member.mention,
', '.join(role.mention for role in roles)
),
titles="Autoroles added"
)
@client.add_after_event('member_remove')
async def left_role_tracker(client, member):
"""
Delete and re-store member roles when they leave the server.
"""
if (member.guild.id, member.id) in locks and locks[(member.guild.id, member.id)].locked():
# Currently processing a join event
# Which means the member left while we were adding their roles
# Cautiously return, not modifying the saved role data
return
# Delete existing member roles for this user
# NOTE: Not concurrency-safe
past_member_roles.delete_where(
guildid=member.guild.id,
userid=member.id,
)
if role_persistence.get(member.guild.id).value:
# Make sure the user has an associated lion, so we can detect when they rejoin
Lion.fetch(member.guild.id, member.id)
# Then insert the current member roles
values = [
(member.guild.id, member.id, role.id)
for role in member.roles
if not role.is_bot_managed() and not role.is_integration() and not role.is_default()
]
if values:
past_member_roles.insert_many(
*values,
insert_keys=('guildid', 'userid', 'roleid')
)

View File

@@ -1,303 +0,0 @@
import datetime
import discord
import settings
from settings import GuildSettings, GuildSetting
import settings.setting_types as stypes
from wards import guild_admin
from .data import autoroles, bot_autoroles
@GuildSettings.attach_setting
class greeting_channel(stypes.Channel, GuildSetting):
"""
Setting describing the destination of the greeting message.
Extended to support the following special values, with input and output supported.
Data `None` corresponds to `Off`.
Data `1` corresponds to `DM`.
"""
DMCHANNEL = object()
category = "New Members"
attr_name = 'greeting_channel'
_data_column = 'greeting_channel'
display_name = "welcome_channel"
desc = "Channel to send the welcome message in"
long_desc = (
"Channel to post the `welcome_message` in when a new user joins the server. "
"Accepts `DM` to indicate the welcome should be sent via direct message."
)
_accepts = (
"Text Channel name/id/mention, or `DM`, or `None` to disable."
)
_chan_type = discord.ChannelType.text
@classmethod
def _data_to_value(cls, id, data, **kwargs):
if data is None:
return None
elif data == 1:
return cls.DMCHANNEL
else:
return super()._data_to_value(id, data, **kwargs)
@classmethod
def _data_from_value(cls, id, value, **kwargs):
if value is None:
return None
elif value == cls.DMCHANNEL:
return 1
else:
return super()._data_from_value(id, value, **kwargs)
@classmethod
async def _parse_userstr(cls, ctx, id, userstr, **kwargs):
lower = userstr.lower()
if lower in ('0', 'none', 'off'):
return None
elif lower == 'dm':
return 1
else:
return await super()._parse_userstr(ctx, id, userstr, **kwargs)
@classmethod
def _format_data(cls, id, data, **kwargs):
if data is None:
return "Off"
elif data == 1:
return "DM"
else:
return "<#{}>".format(data)
@property
def success_response(self):
value = self.value
if not value:
return "Welcome messages are disabled."
elif value == self.DMCHANNEL:
return "Welcome messages will be sent via direct message."
else:
return "Welcome messages will be posted in {}".format(self.formatted)
@GuildSettings.attach_setting
class greeting_message(stypes.Message, GuildSetting):
category = "New Members"
attr_name = 'greeting_message'
_data_column = 'greeting_message'
display_name = 'welcome_message'
desc = "Welcome message sent to welcome new members."
long_desc = (
"Message to send to the configured `welcome_channel` when a member joins the server for the first time."
)
_default = r"""
{
"embed": {
"title": "Welcome!",
"thumbnail": {"url": "{guild_icon}"},
"description": "Hi {mention}!\nWelcome to **{guild_name}**! You are the **{member_count}**th member.\nThere are currently **{studying_count}** people studying.\nGood luck and stay productive!",
"color": 15695665
}
}
"""
_substitution_desc = {
'{mention}': "Mention the new member.",
'{user_name}': "Username of the new member.",
'{user_avatar}': "Avatar of the new member.",
'{guild_name}': "Name of this server.",
'{guild_icon}': "Server icon url.",
'{member_count}': "Number of members in the server.",
'{studying_count}': "Number of current voice channel members.",
}
def substitution_keys(self, ctx, **kwargs):
return {
'{mention}': ctx.author.mention,
'{user_name}': ctx.author.name,
'{user_avatar}': str(ctx.author.avatar_url),
'{guild_name}': ctx.guild.name,
'{guild_icon}': str(ctx.guild.icon_url),
'{member_count}': str(len(ctx.guild.members)),
'{studying_count}': str(len([member for ch in ctx.guild.voice_channels for member in ch.members]))
}
@property
def success_response(self):
return "The welcome message has been set!"
@GuildSettings.attach_setting
class returning_message(stypes.Message, GuildSetting):
category = "New Members"
attr_name = 'returning_message'
_data_column = 'returning_message'
display_name = 'returning_message'
desc = "Welcome message sent to returning members."
long_desc = (
"Message to send to the configured `welcome_channel` when a member returns to the server."
)
_default = r"""
{
"embed": {
"title": "Welcome Back {user_name}!",
"thumbnail": {"url": "{guild_icon}"},
"description": "Welcome back to **{guild_name}**!\nYou last studied with us <t:{last_time}:R>.\nThere are currently **{studying_count}** people studying.\nGood luck and stay productive!",
"color": 15695665
}
}
"""
_substitution_desc = {
'{mention}': "Mention the returning member.",
'{user_name}': "Username of the member.",
'{user_avatar}': "Avatar of the member.",
'{guild_name}': "Name of this server.",
'{guild_icon}': "Server icon url.",
'{member_count}': "Number of members in the server.",
'{studying_count}': "Number of current voice channel members.",
'{last_time}': "Unix timestamp of the last time the member studied.",
}
def substitution_keys(self, ctx, **kwargs):
return {
'{mention}': ctx.author.mention,
'{user_name}': ctx.author.name,
'{user_avatar}': str(ctx.author.avatar_url),
'{guild_name}': ctx.guild.name,
'{guild_icon}': str(ctx.guild.icon_url),
'{member_count}': str(len(ctx.guild.members)),
'{studying_count}': str(len([member for ch in ctx.guild.voice_channels for member in ch.members])),
'{last_time}': int(ctx.alion.data._timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()),
}
@property
def success_response(self):
return "The returning message has been set!"
@GuildSettings.attach_setting
class starting_funds(stypes.Integer, GuildSetting):
category = "New Members"
attr_name = 'starting_funds'
_data_column = 'starting_funds'
display_name = 'starting_funds'
desc = "Coins given when a user first joins."
long_desc = (
"Members will be given this number of coins the first time they join the server."
)
_default = 1000
@property
def success_response(self):
return "Members will be given `{}` coins when they first join the server.".format(self.formatted)
@GuildSettings.attach_setting
class autoroles(stypes.RoleList, settings.ListData, settings.Setting):
category = "New Members"
write_ward = guild_admin
attr_name = 'autoroles'
_table_interface = autoroles
_id_column = 'guildid'
_data_column = 'roleid'
display_name = "autoroles"
desc = "Roles to give automatically to new members."
_force_unique = True
long_desc = (
"These roles will be given automatically to users when they join the server. "
"If `role_persistence` is enabled, the roles will only be given the first time a user joins the server."
)
# Flat cache, no need to expire
_cache = {}
@property
def success_response(self):
if self.value:
return "New members will be given the following roles:\n{}".format(self.formatted)
else:
return "New members will not automatically be given any roles."
@GuildSettings.attach_setting
class bot_autoroles(stypes.RoleList, settings.ListData, settings.Setting):
category = "New Members"
write_ward = guild_admin
attr_name = 'bot_autoroles'
_table_interface = bot_autoroles
_id_column = 'guildid'
_data_column = 'roleid'
display_name = "bot_autoroles"
desc = "Roles to give automatically to new bots."
_force_unique = True
long_desc = (
"These roles will be given automatically to bots when they join the server. "
"If `role_persistence` is enabled, the roles will only be given the first time a bot joins the server."
)
# Flat cache, no need to expire
_cache = {}
@property
def success_response(self):
if self.value:
return "New bots will be given the following roles:\n{}".format(self.formatted)
else:
return "New bots will not automatically be given any roles."
@GuildSettings.attach_setting
class role_persistence(stypes.Boolean, GuildSetting):
category = "New Members"
attr_name = "role_persistence"
_data_column = 'persist_roles'
display_name = "role_persistence"
desc = "Whether to remember member roles when they leave the server."
_outputs = {True: "Enabled", False: "Disabled"}
_default = True
long_desc = (
"When enabled, restores member roles when they rejoin the server.\n"
"This enables profile roles and purchased roles, such as field of study and colour roles, "
"as well as moderation roles, "
"such as the studyban and mute roles, to persist even when a member leaves and rejoins.\n"
"Note: Members who leave while this is disabled will not have their roles restored."
)
@property
def success_response(self):
if self.value:
return "Roles will now be restored when a member rejoins."
else:
return "Member roles will no longer be saved or restored."

View File

@@ -1,65 +0,0 @@
from io import StringIO
import discord
from wards import guild_admin
from data import tables
from core import Lion
from .module import module
@module.cmd("studyreset",
desc="Perform a reset of the server's study statistics.",
group="Guild Admin")
@guild_admin()
async def cmd_statreset(ctx):
"""
Usage``:
{prefix}studyreset
Description:
Perform a complete reset of the server's study statistics.
That is, deletes the tracked time of all members and removes their study badges.
This may be used to set "seasons" of study.
Before the reset, I will send a csv file with the current member statistics.
**This is not reversible.**
"""
if not await ctx.ask("Are you sure you want to reset the study time and badges for all members? "
"**THIS IS NOT REVERSIBLE!**"):
return
# Build the data csv
rows = tables.lions.select_where(
select_columns=('userid', 'tracked_time', 'coins', 'workout_count', 'b.roleid AS badge_roleid'),
_extra=(
"LEFT JOIN study_badges b ON last_study_badgeid = b.badgeid "
"WHERE members.guildid={}"
).format(ctx.guild.id)
)
header = "userid, tracked_time, coins, workouts, rank_roleid\n"
csv_rows = [
','.join(str(data) for data in row)
for row in rows
]
with StringIO() as stats_file:
stats_file.write(header)
stats_file.write('\n'.join(csv_rows))
stats_file.seek(0)
out_file = discord.File(stats_file, filename="guild_{}_member_statistics.csv".format(ctx.guild.id))
await ctx.reply(file=out_file)
# Reset the statistics
tables.lions.update_where(
{'tracked_time': 0},
guildid=ctx.guild.id
)
Lion.sync()
await ctx.embed_reply(
"The member study times have been reset!\n"
"(It may take a while for the studybadges to update.)"
)

View File

@@ -1,4 +0,0 @@
from . import data
from . import admin
from . import watchdog

View File

@@ -1,128 +0,0 @@
from collections import defaultdict
from settings import GuildSettings, GuildSetting
from wards import guild_admin
import settings
from .data import video_channels
@GuildSettings.attach_setting
class video_channels(settings.ChannelList, settings.ListData, settings.Setting):
category = "Video Channels"
attr_name = 'video_channels'
_table_interface = video_channels
_id_column = 'guildid'
_data_column = 'channelid'
_setting = settings.VoiceChannel
write_ward = guild_admin
display_name = "video_channels"
desc = "Channels where members are required to enable their video."
_force_unique = True
long_desc = (
"Members must keep their video enabled in these channels.\n"
"If they do not keep their video enabled, they will be asked to enable it in their DMS after `15` seconds, "
"and then kicked from the channel with another warning after the `video_grace_period` duration has passed.\n"
"After the first offence, if the `video_studyban` is enabled and the `studyban_role` is set, "
"they will also be automatically studybanned."
)
# Flat cache, no need to expire objects
_cache = {}
@property
def success_response(self):
if self.value:
return "Members must enable their video in the following channels:\n{}".format(self.formatted)
else:
return "There are no video-required channels set up."
@classmethod
async def launch_task(cls, client):
"""
Launch initialisation step for the `video_channels` setting.
Pre-fill cache for the guilds with currently active voice channels.
"""
active_guildids = [
guild.id
for guild in client.guilds
if any(channel.members for channel in guild.voice_channels)
]
if active_guildids:
cache = {guildid: [] for guildid in active_guildids}
rows = cls._table_interface.select_where(
guildid=active_guildids
)
for row in rows:
cache[row['guildid']].append(row['channelid'])
cls._cache.update(cache)
@GuildSettings.attach_setting
class video_studyban(settings.Boolean, GuildSetting):
category = "Video Channels"
attr_name = 'video_studyban'
_data_column = 'video_studyban'
display_name = "video_studyban"
desc = "Whether to studyban members if they don't enable their video."
long_desc = (
"If enabled, members who do not enable their video in the configured `video_channels` will be "
"study-banned after a single warning.\n"
"When disabled, members will only be warned and removed from the channel."
)
_default = True
_outputs = {True: "Enabled", False: "Disabled"}
@property
def success_response(self):
if self.value:
return "Members will now be study-banned if they don't enable their video in the configured video channels."
else:
return "Members will not be study-banned if they don't enable their video in video channels."
@GuildSettings.attach_setting
class video_grace_period(settings.Duration, GuildSetting):
category = "Video Channels"
attr_name = 'video_grace_period'
_data_column = 'video_grace_period'
display_name = "video_grace_period"
desc = "How long to wait before kicking/studybanning members who don't enable their video."
long_desc = (
"The period after a member has been asked to enable their video in a video-only channel "
"before they will be kicked from the channel, and warned or studybanned (if enabled)."
)
_default = 90
_default_multiplier = 1
@classmethod
def _format_data(cls, id: int, data, **kwargs):
"""
Return the string version of the data.
"""
if data is None:
return None
else:
return "`{} seconds`".format(data)
@property
def success_response(self):
return (
"Members who do not enable their video will "
"be disconnected after {}.".format(self.formatted)
)

View File

@@ -1,4 +0,0 @@
from data import Table, RowTable
video_channels = Table('video_channels')

View File

@@ -1,381 +0,0 @@
"""
Implements a tracker to warn, kick, and studyban members in video channels without video enabled.
"""
import asyncio
import logging
import datetime
import discord
from meta import client
from core import Lion
from utils.lib import strfdelta
from settings import GuildSettings
from ..tickets import StudyBanTicket, WarnTicket
from ..module import module
_tasks = {} # (guildid, userid) -> Task
async def _send_alert(member, embed, alert_channel):
"""
Sends an embed to the member.
If we can't reach the member, send it via alert_channel, if it exists.
Returns the message, if it was sent, otherwise None.
"""
try:
return await member.send(embed=embed)
except discord.Forbidden:
if alert_channel:
try:
return await alert_channel.send(
content=(
"{} (Please enable your DMs with me to get alerts privately!)"
).format(member.mention),
embed=embed
)
except discord.HTTPException:
pass
async def _join_video_channel(member, channel):
# Sanity checks
if not member.voice and member.voice.channel:
# Not in a voice channel
return
if member.voice.self_video:
# Already have video on
return
# First wait for 15 seconds for them to turn their video on
try:
await asyncio.sleep(15)
except asyncio.CancelledError:
# They left the channel or turned their video on
return
# Fetch the relevant settings and build embeds
guild_settings = GuildSettings(member.guild.id)
grace_period = guild_settings.video_grace_period.value
studyban = guild_settings.video_studyban.value
studyban_role = guild_settings.studyban_role.value
alert_channel = guild_settings.alert_channel.value
lion = Lion.fetch(member.guild.id, member.id)
previously_warned = lion.data.video_warned
request_embed = discord.Embed(
title="Please enable your video!",
description=(
"**You have joined the video-only channel {}!**\n"
"Please **enable your video** or **leave the channel** in the next `{}` seconds, "
"otherwise you will be **disconnected** and "
"potentially **banned** from using this server's study facilities."
).format(
channel.mention,
grace_period
),
colour=discord.Colour.orange(),
timestamp=datetime.datetime.utcnow()
).set_footer(
text=member.guild.name,
icon_url=member.guild.icon_url
)
thanks_embed = discord.Embed(
title="Thanks for enabling your video! Best of luck with your study.",
colour=discord.Colour.green(),
timestamp=datetime.datetime.utcnow()
).set_footer(
text=member.guild.name,
icon_url=member.guild.icon_url
)
bye_embed = discord.Embed(
title="Thanks for leaving the channel promptly!",
colour=discord.Colour.green(),
timestamp=datetime.datetime.utcnow()
).set_footer(
text=member.guild.name,
icon_url=member.guild.icon_url
)
# Send the notification message and wait for the grace period
out_msg = None
alert_task = asyncio.create_task(_send_alert(
member,
request_embed,
alert_channel
))
try:
out_msg = await asyncio.shield(alert_task)
await asyncio.sleep(grace_period)
except asyncio.CancelledError:
# They left the channel or turned their video on
# Finish the message task if it wasn't complete
if not alert_task.done():
out_msg = await alert_task
# Update the notification message
# The out_msg may be None here, if we have no way of reaching the member
if out_msg is not None:
try:
if not member.voice or not (member.voice.channel == channel):
await out_msg.edit(embed=bye_embed)
elif member.voice.self_video:
await out_msg.edit(embed=thanks_embed)
except discord.HTTPException:
pass
return
# Disconnect, notify, warn, and potentially study ban
# Don't allow this to be cancelled any more
_tasks.pop((member.guild.id, member.id), None)
# First disconnect
client.log(
("Disconnecting member {} (uid: {}) in guild {} (gid: {}) from video channel {} (cid:{}) "
"for not enabling their video.").format(
member.name,
member.id,
member.guild.name,
member.guild.id,
channel.name,
channel.id
),
context="VIDEO_WATCHDOG"
)
try:
await member.edit(
voice_channel=None,
reason="Member in video-only channel did not enable video."
)
except discord.HTTPException:
# TODO: Add it to the moderation ticket
# Error log?
...
# Then warn or study ban, with appropriate notification
only_warn = not previously_warned or not studyban or not studyban_role
if only_warn:
# Give them an official warning
embed = discord.Embed(
title="You have received a warning!",
description=(
"You must enable your camera in camera-only rooms."
),
colour=discord.Colour.red(),
timestamp=datetime.datetime.utcnow()
)
embed.add_field(
name="Info",
value=(
"*Warnings appear in your moderation history. "
"Failure to comply, or repeated warnings, "
"may result in muting, studybanning, or server banning.*"
)
)
embed.set_footer(
icon_url=member.guild.icon_url,
text=member.guild.name
)
await _send_alert(member, embed, alert_channel)
await WarnTicket.create(
member.guild.id,
member.id,
client.user.id,
"Failed to enable their video in time in the video channel {}.".format(channel.mention),
auto=True
)
# TODO: Warning ticket and related embed.
lion.data.video_warned = True
else:
# Apply an automatic studyban
ticket = await StudyBanTicket.autoban(
member.guild,
member,
"Failed to enable their video in time in the video channel {}.".format(channel.mention)
)
if ticket:
tip = "TIP: When joining a video only study room, always be ready to enable your video immediately!"
embed = discord.Embed(
title="You have been studybanned!",
description=(
"You have been banned from studying in **{}**.\n"
"Study features, including access to the server **study channels**, "
"will ***not be available to you until this ban is lifted.***".format(
member.guild.name,
)
),
colour=discord.Colour.red(),
timestamp=datetime.datetime.utcnow()
)
embed.add_field(
name="Reason",
value="Failure to enable your video in time in a video-only channel.\n\n*{}*".format(tip)
)
if ticket.data.duration:
embed.add_field(
name="Duration",
value="`{}` (Expires <t:{:.0f}>)".format(
strfdelta(datetime.timedelta(seconds=ticket.data.duration)),
ticket.data.expiry.timestamp()
),
inline=False
)
embed.set_footer(
text=member.guild.name,
icon_url=member.guild.icon_url
)
await _send_alert(member, embed, alert_channel)
else:
# This should be impossible
# TODO: Cautionary error logging
pass
@client.add_after_event("voice_state_update")
async def video_watchdog(client, member, before, after):
if member.bot:
return
task_key = (member.guild.id, member.id)
if after.channel != before.channel:
# Channel change, cancel any running tasks for the member
task = _tasks.pop(task_key, None)
if task and not task.done():
task.cancel()
# Check whether they are joining a video channel, run join logic if so
if after.channel and not after.self_video:
video_channel_ids = GuildSettings(member.guild.id).video_channels.data
if after.channel.id in video_channel_ids:
client.log(
("Launching join task for member {} (uid: {}) "
"in guild {} (gid: {}) and video channel {} (cid:{}).").format(
member.name,
member.id,
member.guild.name,
member.guild.id,
after.channel.name,
after.channel.id
),
context="VIDEO_WATCHDOG",
level=logging.DEBUG
)
_tasks[task_key] = asyncio.create_task(_join_video_channel(member, after.channel))
else:
video_channel_ids = GuildSettings(member.guild.id).video_channels.data
if after.channel and after.channel.id in video_channel_ids:
channel = after.channel
if after.self_video:
# If they have their video on, cancel any running tasks
task = _tasks.pop(task_key, None)
if task and not task.done():
task.cancel()
else:
# They have their video off
# Don't do anything if there are running tasks, the tasks will handle it
task = _tasks.get(task_key, None)
if task and not task.done():
return
# Otherwise, give them 10 seconds
_tasks[task_key] = task = asyncio.create_task(asyncio.sleep(10))
try:
await task
except asyncio.CancelledError:
# Task was cancelled, they left the channel or turned their video on
return
# Then kick them out, alert them, and event log it
client.log(
("Disconnecting member {} (uid: {}) in guild {} (gid: {}) from video channel {} (cid:{}) "
"for disabling their video.").format(
member.name,
member.id,
member.guild.name,
member.guild.id,
channel.name,
channel.id
),
context="VIDEO_WATCHDOG"
)
try:
await member.edit(
voice_channel=None,
reason="Removing non-video member from video-only channel."
)
await _send_alert(
member,
discord.Embed(
title="You have been kicked from the video channel.",
description=(
"You were disconnected from the video-only channel {} for disabling your video.\n"
"Please keep your video on at all times, and leave the channel if you need "
"to make adjustments!"
).format(
channel.mention,
),
colour=discord.Colour.red(),
timestamp=datetime.datetime.utcnow()
).set_footer(
text=member.guild.name,
icon_url=member.guild.icon_url
),
GuildSettings(member.guild.id).alert_channel.value
)
except discord.Forbidden:
GuildSettings(member.guild.id).event_log.log(
"I attempted to disconnect {} from the video-only channel {} "
"because they disabled their video, but I didn't have the required permissions!\n".format(
member.mention,
channel.mention
)
)
else:
GuildSettings(member.guild.id).event_log.log(
"{} was disconnected from the video-only channel {} "
"because they disabled their video.".format(
member.mention,
channel.mention
)
)
@module.launch_task
async def load_video_channels(client):
"""
Process existing video channel members.
Pre-fills the video channel cache by running the setting launch task.
Treats members without video on as having just joined.
"""
# Run the video channel initialisation to populate the setting cache
await GuildSettings.settings.video_channels.launch_task(client)
# Launch join tasks for all members in video channels without video enabled
video_channels = (
channel
for guild in client.guilds
for channel in guild.voice_channels
if channel.members and channel.id in GuildSettings.settings.video_channels.get(guild.id).data
)
to_task = [
(member, channel)
for channel in video_channels
for member in channel.members
if not member.voice.self_video
]
for member, channel in to_task:
_tasks[(member.guild.id, member.id)] = asyncio.create_task(_join_video_channel(member, channel))
if to_task:
client.log(
"Launched {} join tasks for members who need to enable their video.".format(len(to_task)),
context="VIDEO_CHANNEL_LAUNCH"
)