Merge branch 'moderation' into python-rewrite

This commit is contained in:
2021-09-30 21:56:40 +03:00
8 changed files with 809 additions and 0 deletions

View File

@@ -5,3 +5,5 @@ from . import admin
from . import tickets
from . import video
from . import commands

View File

@@ -0,0 +1,442 @@
"""
Shared commands for the moderation module.
"""
import asyncio
from collections import defaultdict
import discord
from cmdClient.lib import ResponseTimedOut
from wards import guild_moderator
from .module import module
from .tickets import Ticket, TicketType, TicketState
type_accepts = {
'note': TicketType.NOTE,
'notes': TicketType.NOTE,
'studyban': TicketType.STUDY_BAN,
'studybans': TicketType.STUDY_BAN,
'warn': TicketType.WARNING,
'warns': TicketType.WARNING,
'warning': TicketType.WARNING,
'warnings': TicketType.WARNING,
}
type_formatted = {
TicketType.NOTE: 'NOTE',
TicketType.STUDY_BAN: 'STUDYBAN',
TicketType.WARNING: 'WARNING',
}
type_summary_formatted = {
TicketType.NOTE: 'note',
TicketType.STUDY_BAN: 'studyban',
TicketType.WARNING: 'warning',
}
state_formatted = {
TicketState.OPEN: 'ACTIVE',
TicketState.EXPIRING: 'TEMP',
TicketState.EXPIRED: 'EXPIRED',
TicketState.PARDONED: 'PARDONED'
}
state_summary_formatted = {
TicketState.OPEN: 'Active',
TicketState.EXPIRING: 'Temporary',
TicketState.EXPIRED: 'Expired',
TicketState.REVERTED: 'Manually Reverted',
TicketState.PARDONED: 'Pardoned'
}
@module.cmd(
"tickets",
group="Moderation",
desc="View and filter the server moderation tickets.",
flags=('active', 'type=')
)
@guild_moderator()
async def cmd_tickets(ctx, flags):
"""
Usage``:
{prefix}tickets [@user] [--type <type>] [--active]
Description:
Display and optionally filter the moderation event history in this guild.
Flags::
type: Filter by ticket type. See **Ticket Types** below.
active: Only show in-effect tickets (i.e. hide expired and pardoned ones).
Ticket Types::
note: Moderation notes.
warn: Moderation warnings, both manual and automatic.
studyban: Bans from using study features from abusing the study system.
blacklist: Complete blacklisting from using my commands.
Ticket States::
Active: Active tickets that will not automatically expire.
Temporary: Active tickets that will automatically expire after a set duration.
Expired: Tickets that have automatically expired.
Reverted: Tickets with actions that have been reverted.
Pardoned: Tickets that have been pardoned and no longer apply to the user.
Examples:
{prefix}tickets {ctx.guild.owner.mention} --type warn --active
"""
# Parse filter fields
# First the user
if ctx.args:
userstr = ctx.args.strip('<@!&> ')
if not userstr.isdigit():
return await ctx.error_reply(
"**Usage:** `{prefix}tickets [@user] [--type <type>] [--active]`.\n"
"Please provide the `user` as a mention or id!".format(prefix=ctx.best_prefix)
)
filter_userid = int(userstr)
else:
filter_userid = None
if flags['type']:
typestr = flags['type'].lower()
if typestr not in type_accepts:
return await ctx.error_reply(
"Please see `{prefix}help tickets` for the valid ticket types!".format(prefix=ctx.best_prefix)
)
filter_type = type_accepts[typestr]
else:
filter_type = None
filter_active = flags['active']
# Build the filter arguments
filters = {}
if filter_userid:
filters['targetid'] = filter_userid
if filter_type:
filters['ticket_type'] = filter_type
if filter_active:
filters['ticket_state'] = [TicketState.OPEN, TicketState.EXPIRING]
# Fetch the tickets with these filters
tickets = Ticket.fetch_tickets(**filters)
if not tickets:
if filters:
return ctx.embed_reply("There are no tickets with these criteria!")
else:
return ctx.embed_reply("There are no moderation tickets in this server!")
tickets = sorted(tickets, key=lambda ticket: ticket.data.guild_ticketid, reverse=True)
ticket_map = {ticket.data.guild_ticketid: ticket for ticket in tickets}
# Build the format string based on the filters
components = []
# Ticket id with link to message in mod log
components.append("[#{ticket.data.guild_ticketid}]({ticket.link})")
# Ticket creation date
components.append("<t:{timestamp:.0f}:d>")
# Ticket type, with current state
if filter_type is None:
if not filter_active:
components.append("`{ticket_type}{ticket_state}`")
else:
components.append("`{ticket_type}`")
elif not filter_active:
components.append("`{ticket_real_state}`")
if not filter_userid:
# Ticket user
components.append("<@{ticket.data.targetid}>")
if filter_userid or (filter_active and filter_type):
# Truncated ticket content
components.append("{content}")
format_str = ' | '.join(components)
# Break tickets into blocks
blocks = [tickets[i:i+10] for i in range(0, len(tickets), 10)]
# Build pages of tickets
ticket_pages = []
for block in blocks:
ticket_page = []
type_len = max(len(type_formatted[ticket.type]) for ticket in block)
state_len = max(len(state_formatted[ticket.state]) for ticket in block)
for ticket in block:
# First truncate content if required
content = ticket.data.content
if len(content) > 40:
content = content[:37] + '...'
# Build ticket line
line = format_str.format(
ticket=ticket,
timestamp=ticket.data.created_at.timestamp(),
ticket_type=type_formatted[ticket.type],
type_len=type_len,
ticket_state=" [{}]".format(state_formatted[ticket.state]) if ticket.state != TicketState.OPEN else '',
ticket_real_state=state_formatted[ticket.state],
state_len=state_len,
content=content
)
if ticket.state == TicketState.PARDONED:
line = "~~{}~~".format(line)
# Add to current page
ticket_page.append(line)
# Combine lines and add page to pages
ticket_pages.append('\n'.join(ticket_page))
# Build active ticket type summary
freq = defaultdict(int)
for ticket in tickets:
if ticket.state != TicketState.PARDONED:
freq[ticket.type] += 1
summary_pairs = [
(num, type_summary_formatted[ttype] + ('s' if num > 1 else ''))
for ttype, num in freq.items()
]
summary_pairs.sort(key=lambda pair: pair[0])
# num_len = max(len(str(num)) for num in freq.values())
# type_summary = '\n'.join(
# "**`{:<{}}`** {}".format(pair[0], num_len, pair[1])
# for pair in summary_pairs
# )
# # Build status summary
# freq = defaultdict(int)
# for ticket in tickets:
# freq[ticket.state] += 1
# num_len = max(len(str(num)) for num in freq.values())
# status_summary = '\n'.join(
# "**`{:<{}}`** {}".format(freq[state], num_len, state_str)
# for state, state_str in state_summary_formatted.items()
# if state in freq
# )
summary_strings = [
"**`{}`** {}".format(*pair) for pair in summary_pairs
]
if len(summary_strings) > 2:
summary = ', '.join(summary_strings[:-1]) + ', and ' + summary_strings[-1]
elif len(summary_strings) == 2:
summary = ' and '.join(summary_strings)
else:
summary = ''.join(summary_strings)
if summary:
summary += '.'
# Build embed info
title = "{}{}{}".format(
"Active " if filter_active else '',
"{} tickets ".format(type_formatted[filter_type]) if filter_type else "Tickets ",
(" for {}".format(ctx.guild.get_member(filter_userid) or filter_userid)
if filter_userid else " in {}".format(ctx.guild.name))
)
footer = "Click a ticket id to jump to it, or type the number to show the full ticket."
page_count = len(blocks)
if page_count > 1:
footer += "\nPage {{page_num}}/{}".format(page_count)
# Create embeds
embeds = [
discord.Embed(
title=title,
description="{}\n{}".format(summary, page),
colour=discord.Colour.orange(),
).set_footer(text=footer.format(page_num=i+1))
for i, page in enumerate(ticket_pages)
]
# Run output with cancellation and listener
out_msg = await ctx.pager(embeds, add_cancel=True)
display_task = asyncio.create_task(_ticket_display(ctx, ticket_map))
ctx.tasks.append(display_task)
await ctx.cancellable(out_msg, add_reaction=False)
async def _ticket_display(ctx, ticket_map):
"""
Display tickets when the ticket number is entered.
"""
current_ticket_msg = None
try:
while True:
# Wait for a number
try:
result = await ctx.client.wait_for(
"message",
check=lambda msg: (msg.author == ctx.author
and msg.channel == ctx.ch
and msg.content.isdigit()
and int(msg.content) in ticket_map)
)
except asyncio.TimeoutError:
return
# Delete the response
try:
await result.delete()
except discord.HTTPException:
pass
# Display the ticket
embed = ticket_map[int(result.content)].msg_args['embed']
if current_ticket_msg:
try:
await current_ticket_msg.edit(embed=embed)
except discord.HTTPException:
current_ticket_msg = None
if not current_ticket_msg:
try:
current_ticket_msg = await ctx.reply(embed=embed)
except discord.HTTPException:
return
asyncio.create_task(ctx.offer_delete(current_ticket_msg))
except asyncio.CancelledError:
if current_ticket_msg:
try:
await current_ticket_msg.delete()
except discord.HTTPException:
pass
@module.cmd(
"pardon",
group="Moderation",
desc="Pardon a ticket, or clear a member's moderation history.",
flags=('type=',)
)
@guild_moderator()
async def cmd_pardon(ctx, flags):
"""
Usage``:
{prefix}pardon ticketid, ticketid, ticketid
{prefix}pardon @user [--type <type>]
Description:
Marks the given tickets as no longer applicable.
These tickets will not be considered when calculating automod actions such as automatic study bans.
This may be used to mark warns or other tickets as no longer in-effect.
If the ticket is active when it is pardoned, it will be reverted, and any expiry cancelled.
Use the `{prefix}tickets` command to view the relevant tickets.
Flags::
type: Filter by ticket type. See **Ticket Types** in `{prefix}help tickets`.
Examples:
{prefix}pardon 21
{prefix}pardon {ctx.guild.owner.mention} --type warn
"""
usage = "**Usage**: `{prefix}pardon ticketid` or `{prefix}pardon @user`.".format(prefix=ctx.best_prefix)
if not ctx.args:
return await ctx.error_reply(
usage
)
# Parse provided tickets or filters
targetid = None
ticketids = []
if ',' in ctx.args:
# Assume provided numbers are ticketids.
items = [item.strip() for item in ctx.args.split(',')]
if not all(item.isdigit() for item in items):
return await ctx.error_reply(usage)
ticketids = [int(item) for item in items]
args = {'guild_ticketid': ticketids}
else:
# Guess whether the provided numbers were ticketids or not
idstr = ctx.args.strip('<@!&> ')
if not idstr.isdigit():
return await ctx.error_reply(usage)
maybe_id = int(idstr)
if maybe_id > 4194304: # Testing whether it is greater than the minimum snowflake id
# Assume userid
targetid = maybe_id
args = {'targetid': maybe_id}
# Add the type filter if provided
if flags['type']:
typestr = flags['type'].lower()
if typestr not in type_accepts:
return await ctx.error_reply(
"Please see `{prefix}help tickets` for the valid ticket types!".format(prefix=ctx.best_prefix)
)
args['ticket_type'] = type_accepts[typestr]
else:
# Assume guild ticketid
ticketids = [maybe_id]
args = {'guild_ticketid': maybe_id}
# Fetch the matching tickets
tickets = Ticket.fetch_tickets(**args)
# Check whether we have the right selection of tickets
if targetid and not tickets:
return await ctx.error_reply(
"<@{}> has no matching tickets to pardon!"
)
if ticketids and len(ticketids) != len(tickets):
# Not all of the ticketids were valid
difference = list(set(ticketids).difference(ticket.ticketid for ticket in tickets))
if len(difference) == 1:
return await ctx.error_reply(
"Couldn't find ticket `{}`!".format(difference[0])
)
else:
return await ctx.error_reply(
"Couldn't find any of the following tickets:\n`{}`".format(
'`, `'.join(difference)
)
)
# Check whether there are any tickets left to pardon
to_pardon = [ticket for ticket in tickets if ticket.state != TicketState.PARDONED]
if not to_pardon:
if ticketids and len(tickets) == 1:
ticket = tickets[0]
return await ctx.error_reply(
"[Ticket #{}]({}) is already pardoned!".format(ticket.data.guild_ticketid, ticket.link)
)
else:
return await ctx.error_reply(
"All of these tickets are already pardoned!"
)
# We now know what tickets we want to pardon
# Request the pardon reason
try:
reason = await ctx.input("Please provide a reason for the pardon.")
except ResponseTimedOut:
raise ResponseTimedOut("Prompt timed out, no tickets were pardoned.")
# Pardon the tickets
for ticket in to_pardon:
await ticket.pardon(ctx.author, reason)
# Finally, ack the pardon
if targetid:
await ctx.embed_reply(
"The active {}s for <@{}> have been cleared.".format(
type_summary_formatted[args['ticket_type']] if flags['type'] else 'ticket',
targetid
)
)
elif len(to_pardon) == 1:
ticket = to_pardon[0]
await ctx.embed_reply(
"[Ticket #{}]({}) was pardoned.".format(
ticket.data.guild_ticketid,
ticket.link
)
)
else:
await ctx.embed_reply(
"The following tickets were pardoned.\n{}".format(
", ".join(
"[#{}]({})".format(ticket.data.guild_ticketid, ticket.link)
for ticket in to_pardon
)
)
)

View File

@@ -32,6 +32,7 @@ class TicketState(FieldEnum):
EXPIRING = 'EXPIRING', "Active"
EXPIRED = 'EXPIRED', "Expired"
PARDONED = 'PARDONED', "Pardoned"
REVERTED = 'REVERTED', "Reverted"
class Ticket:
@@ -200,6 +201,10 @@ class Ticket:
def state(self):
return TicketState(self.data.ticket_state)
@property
def type(self):
return TicketType(self.data.ticket_type)
async def update(self, **kwargs):
"""
Update ticket fields.

View File

@@ -1,2 +1,4 @@
from .Ticket import Ticket, TicketType, TicketState
from .studybans import StudyBanTicket
from .notes import NoteTicket
from .warns import WarnTicket

View File

@@ -0,0 +1,112 @@
"""
Note ticket implementation.
Guild moderators can add a note about a user, visible in their moderation history.
Notes appear in the moderation log and the user's ticket history, like any other ticket.
This module implements the Note TicketType and the `note` moderation command.
"""
from cmdClient.lib import ResponseTimedOut
from wards import guild_moderator
from ..module import module
from ..data import tickets
from .Ticket import Ticket, TicketType, TicketState
@Ticket.register_ticket_type
class NoteTicket(Ticket):
_ticket_type = TicketType.NOTE
@classmethod
async def create(cls, guildid, targetid, moderatorid, content, **kwargs):
"""
Create a new Note on a target.
`kwargs` are passed transparently to the table insert method.
"""
ticket_row = tickets.insert(
guildid=guildid,
targetid=targetid,
ticket_type=cls._ticket_type,
ticket_state=TicketState.OPEN,
moderator_id=moderatorid,
auto=False,
content=content,
**kwargs
)
# Create the note ticket
ticket = cls(ticket_row['ticketid'])
# Post the ticket and return
await ticket.post()
return ticket
@module.cmd(
"note",
group="Moderation",
desc="Add a Note to a member's record."
)
@guild_moderator()
async def cmd_note(ctx):
"""
Usage``:
{prefix}note @target
{prefix}note @target <content>
Description:
Add a note to the target's moderation record.
The note will appear in the moderation log and in the `tickets` command.
The `target` must be specificed by mention or user id.
If the `content` is not given, it will be prompted for.
Example:
{prefix}note {ctx.author.mention} Seen reading the `note` documentation.
"""
if not ctx.args:
return await ctx.error_reply(
"**Usage:** `{}note @target <content>`.".format(ctx.best_prefix)
)
# Extract the target. We don't require them to be in the server
splits = ctx.args.split(maxsplit=1)
target_str = splits[0].strip('<@!&> ')
if not target_str.isdigit():
return await ctx.error_reply(
"**Usage:** `{}note @target <content>`.\n"
"`target` must be provided by mention or userid.".format(ctx.best_prefix)
)
targetid = int(target_str)
# Extract or prompt for the content
if len(splits) != 2:
try:
content = await ctx.input("What note would you like to add?", timeout=300)
except ResponseTimedOut:
raise ResponseTimedOut("Prompt timed out, no note was created.")
else:
content = splits[1].strip()
# Create the note ticket
ticket = await NoteTicket.create(
ctx.guild.id,
targetid,
ctx.author.id,
content
)
if ticket.data.log_msg_id:
await ctx.embed_reply(
"Note on <@{}> created as [Ticket #{}]({}).".format(
targetid,
ticket.data.guild_ticketid,
ticket.link
)
)
else:
await ctx.embed_reply(
"Note on <@{}> created as Ticket #{}.".format(targetid, ticket.data.guild_ticketid)
)

View File

@@ -0,0 +1,154 @@
"""
Warn ticket implementation.
Guild moderators can officially warn a user via command.
This DMs the users with the warning.
"""
import datetime
import discord
from cmdClient.lib import ResponseTimedOut
from wards import guild_moderator
from ..module import module
from ..data import tickets
from .Ticket import Ticket, TicketType, TicketState
@Ticket.register_ticket_type
class WarnTicket(Ticket):
_ticket_type = TicketType.WARNING
@classmethod
async def create(cls, guildid, targetid, moderatorid, content, **kwargs):
"""
Create a new Warning for the target.
`kwargs` are passed transparently to the table insert method.
"""
ticket_row = tickets.insert(
guildid=guildid,
targetid=targetid,
ticket_type=cls._ticket_type,
ticket_state=TicketState.OPEN,
moderator_id=moderatorid,
auto=False,
content=content,
**kwargs
)
# Create the note ticket
ticket = cls(ticket_row['ticketid'])
# Post the ticket and return
await ticket.post()
return ticket
async def _revert(*args, **kwargs):
# Warnings don't have a revert process
pass
@module.cmd(
"warn",
group="Moderation",
desc="Officially warn a user for a misbehaviour."
)
@guild_moderator()
async def cmd_warn(ctx):
"""
Usage``:
{prefix}warn @target
{prefix}warn @target <reason>
Description:
The `target` must be specificed by mention or user id.
If the `reason` is not given, it will be prompted for.
Example:
{prefix}warn {ctx.author.mention} Don't actually read the documentation!
"""
if not ctx.args:
return await ctx.error_reply(
"**Usage:** `{}warn @target <reason>`.".format(ctx.best_prefix)
)
# Extract the target. We do require them to be in the server
splits = ctx.args.split(maxsplit=1)
target_str = splits[0].strip('<@!&> ')
if not target_str.isdigit():
return await ctx.error_reply(
"**Usage:** `{}warn @target <reason>`.\n"
"`target` must be provided by mention or userid.".format(ctx.best_prefix)
)
targetid = int(target_str)
target = ctx.guild.get_member(targetid)
if not target:
return await ctx.error_reply("Cannot warn a user who is not in the server!")
# Extract or prompt for the content
if len(splits) != 2:
try:
content = await ctx.input("Please give a reason for this warning!", timeout=300)
except ResponseTimedOut:
raise ResponseTimedOut("Prompt timed out, the member was not warned.")
else:
content = splits[1].strip()
# Create the warn ticket
ticket = await WarnTicket.create(
ctx.guild.id,
targetid,
ctx.author.id,
content
)
# Attempt to message the member
embed = discord.Embed(
title="You have received a warning!",
description=(
content
),
colour=discord.Colour.red(),
timestamp=datetime.datetime.utcnow()
)
embed.add_field(
name="Info",
value=(
"*Warnings appear in your moderation history. "
"Failure to comply, or repeated warnings, "
"may result in muting, studybanning, or server banning.*"
)
)
embed.set_footer(
icon_url=ctx.guild.icon_url,
text=ctx.guild.name
)
dm_msg = None
try:
dm_msg = await target.send(embed=embed)
except discord.HTTPException:
pass
# Get previous warnings
count = tickets.select_one_where(
guildid=ctx.guild.id,
targetid=targetid,
ticket_type=TicketType.WARNING,
ticket_state=[TicketState.OPEN, TicketState.EXPIRING],
select_columns=('COUNT(*)',)
)[0]
if count == 1:
prev_str = "This is their first warning."
else:
prev_str = "They now have `{}` warnings.".format(count)
await ctx.embed_reply(
"[Ticket #{}]({}): {} has been warned. {}\n{}".format(
ticket.data.guild_ticketid,
ticket.link,
target.mention,
prev_str,
"*Could not DM the user their warning!*" if not dm_msg else ''
)
)