Added guild and user settings module.

This commit is contained in:
2021-09-12 11:27:37 +03:00
parent 0183b63c55
commit 572a3a8688
7 changed files with 1757 additions and 0 deletions

5
bot/settings/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from .base import * # noqa
from .setting_types import * # noqa
from .user_settings import UserSettings, UserSetting # noqa
from .guild_settings import GuildSettings, GuildSetting # noqa

431
bot/settings/base.py Normal file
View File

@@ -0,0 +1,431 @@
import discord
from cmdClient.cmdClient import cmdClient, Context
from cmdClient.lib import SafeCancellation
from cmdClient.Check import Check
from utils.lib import prop_tabulate, DotDict
from meta import client
from data import Table, RowTable
class Setting:
"""
Abstract base class describing a stored configuration setting.
A setting consists of logic to load the setting from storage,
present it in a readable form, understand user entered values,
and write it again in storage.
Additionally, the setting has attributes attached describing
the setting in a user-friendly manner for display purposes.
"""
attr_name: str = None # Internal attribute name for the setting
_default: ... = None # Default data value for the setting.. this may be None if the setting overrides 'default'.
write_ward: Check = None # Check that must be passed to write the setting. Not implemented internally.
# Configuration interface descriptions
display_name: str = None # User readable name of the setting
desc: str = None # User readable brief description of the setting
long_desc: str = None # User readable long description of the setting
accepts: str = None # User readable description of the acceptable values
def __init__(self, id, data: ..., **kwargs):
self.client: cmdClient = client
self.id = id
self._data = data
# Configuration embeds
@property
def embed(self):
"""
Discord Embed showing an information summary about the setting.
"""
embed = discord.Embed(
title="Configuration options for `{}`".format(self.display_name),
)
fields = ("Current value", "Default value", "Accepted input")
values = (self.formatted or "Not Set",
self._format_data(self.id, self.default) or "None",
self.accepts)
table = prop_tabulate(fields, values)
embed.description = "{}\n{}".format(self.long_desc.format(self=self, client=self.client), table)
return embed
@property
def summary(self):
"""
Formatted summary of the data.
May be implemented in `_format_data(..., summary=True, ...)` or overidden.
"""
return self._format_data(self.id, self.data, summary=True)
@property
def success_response(self):
"""
Response message sent when the setting has successfully been updated.
"""
return "Setting Updated!"
# Instance generation
@classmethod
def get(cls, id: int, **kwargs):
"""
Return a setting instance initialised from the stored value.
"""
data = cls._reader(id, **kwargs)
return cls(id, data, **kwargs)
@classmethod
async def parse(cls, id: int, ctx: Context, userstr: str, **kwargs):
"""
Return a setting instance initialised from a parsed user string.
"""
data = await cls._parse_userstr(ctx, id, userstr, **kwargs)
return cls(id, data, **kwargs)
# Main interface
@property
def data(self):
"""
Retrieves the current internal setting data if it is set, otherwise the default data
"""
return self._data if self._data is not None else self.default
@data.setter
def data(self, new_data):
"""
Sets the internal setting data and writes the changes.
"""
self._data = new_data
self.write()
@property
def default(self):
"""
Retrieves the default value for this setting.
Settings should override this if the default depends on the object id.
"""
return self._default
@property
def value(self):
"""
Discord-aware object or objects associated with the setting.
"""
return self._data_to_value(self.id, self.data)
@value.setter
def value(self, new_value):
"""
Setter which reads the discord-aware object, converts it to data, and writes it.
"""
self._data = self._data_from_value(self.id, new_value)
self.write()
@property
def formatted(self):
"""
User-readable form of the setting.
"""
return self._format_data(self.id, self.data)
def write(self, **kwargs):
"""
Write value to the database.
For settings which override this,
ensure you handle deletion of values when internal data is None.
"""
self._writer(self.id, self._data, **kwargs)
# Raw converters
@classmethod
def _data_from_value(cls, id: int, value, **kwargs):
"""
Convert a high-level setting value to internal data.
Must be overriden by the setting.
Be aware of None values, these should always pass through as None
to provide an unsetting interface.
"""
raise NotImplementedError
@classmethod
def _data_to_value(cls, id: int, data: ..., **kwargs):
"""
Convert internal data to high-level setting value.
Must be overriden by the setting.
"""
raise NotImplementedError
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Parse user provided input into internal data.
Must be overriden by the setting if the setting is user-configurable.
"""
raise NotImplementedError
@classmethod
def _format_data(cls, id: int, data: ..., **kwargs):
"""
Convert internal data into a formatted user-readable string.
Must be overriden by the setting if the setting is user-viewable.
"""
raise NotImplementedError
# Database access classmethods
@classmethod
def _reader(cls, id: int, **kwargs):
"""
Read a setting from storage and return setting data or None.
Must be overriden by the setting.
"""
raise NotImplementedError
@classmethod
def _writer(cls, id: int, data: ..., **kwargs):
"""
Write provided setting data to storage.
Must be overriden by the setting unless the `write` method is overidden.
If the data is None, the setting is empty and should be unset.
"""
raise NotImplementedError
@classmethod
async def command(cls, ctx, id):
"""
Standardised command viewing/setting interface for the setting.
"""
if not ctx.args:
# View config embed for provided cls
await ctx.reply(embed=cls.get(id).embed)
else:
# Check the write ward
if cls.write_ward and not await cls.write_ward.run(ctx):
await ctx.error_reply(cls.write_ward.msg)
else:
# Attempt to set config cls
try:
cls = await cls.parse(id, ctx, ctx.args)
except UserInputError as e:
await ctx.reply(embed=discord.Embed(
description="{} {}".format('', e.msg),
Colour=discord.Colour.red()
))
else:
cls.write()
await ctx.reply(embed=discord.Embed(
description="{} {}".format('', cls.success_response),
Colour=discord.Colour.green()
))
class ObjectSettings:
"""
Abstract class representing a linked collection of settings for a single object.
Initialised settings are provided as instance attributes in the form of properties.
"""
__slots__ = ('id', 'params')
settings: DotDict = None
def __init__(self, id, **kwargs):
self.id = id
self.params = tuple(kwargs.items())
@classmethod
def _setting_property(cls, setting):
def wrapped_setting(self):
return setting.get(self.id, **dict(self.params))
return wrapped_setting
@classmethod
def attach_setting(cls, setting: Setting):
name = setting.attr_name or setting.__name__
setattr(cls, name, property(cls._setting_property(setting)))
cls.settings[name] = setting
return setting
class ColumnData:
"""
Mixin for settings stored in a single row and column of a Table.
Intended to be used with tables where the only primary key is the object id.
"""
# Table storing the desired data
_table_interface: Table = None
# Name of the column storing the setting object id
_id_column: str = None
# Name of the column with the desired data
_data_column: str = None
# Whether to use create a row if not found (only applies to TableRow)
_create_row = False
# Whether to upsert or update for updates
_upsert: bool = True
# High level data cache to use, set to None to disable cache.
_cache = None # Map[id -> value]
@classmethod
def _reader(cls, id: int, use_cache=True, **kwargs):
"""
Read in the requested entry associated to the id.
Supports reading cached values from a `RowTable`.
"""
if cls._cache is not None and id in cls._cache and use_cache:
return cls._cache[id]
table = cls._table_interface
if isinstance(table, RowTable) and cls._id_column == table.id_col:
if cls._create_row:
row = table.fetch_or_create(id)
else:
row = table.fetch(id)
data = row.data[cls._data_column] if row else None
else:
params = {
"select_columns": (cls._data_column,),
cls._id_column: id
}
row = table.select_one_where(**params)
data = row[cls._data_column] if row else None
if cls._cache is not None:
cls._cache[id] = data
return data
@classmethod
def _writer(cls, id: int, data: ..., **kwargs):
"""
Write the provided entry to the table, allowing replacements.
"""
table = cls._table_interface
params = {
cls._id_column: id
}
values = {
cls._data_column: data
}
# Update data
if cls._upsert:
# Upsert data
table.upsert(
constraint=cls._id_column,
**params,
**values
)
else:
# Update data
table.update_where(values, **params)
if cls._cache is not None:
cls._cache[id] = data
class ListData:
"""
Mixin for list types implemented on a Table.
Implements a reader and writer.
This assumes the list is the only data stored in the table,
and removes list entries by deleting rows.
"""
# Table storing the setting data
_table_interface: Table = None
# Name of the column storing the id
_id_column: str = None
# Name of the column storing the data to read
_data_column: str = None
# Name of column storing the order index to use, if any. Assumed to be Serial on writing.
_order_column: str = None
_order_type: str = "ASC"
# High level data cache to use, set to None to disable cache.
_cache = None # Map[id -> value]
@classmethod
def _reader(cls, id: int, use_cache=True, **kwargs):
"""
Read in all entries associated to the given id.
"""
if cls._cache is not None and id in cls._cache and use_cache:
return cls._cache[id]
table = cls._table_interface # type: Table
params = {
"select_columns": [cls._data_column],
cls._id_column: id
}
if cls._order_column:
params['_extra'] = "ORDER BY {} {}".format(cls._order_column, cls._order_type)
rows = table.select_where(**params)
data = [row[cls._data_column] for row in rows]
if cls._cache is not None:
cls._cache[id] = data
return data
@classmethod
def _writer(cls, id: int, data: ..., add_only=False, remove_only=False, **kwargs):
"""
Write the provided list to storage.
"""
# TODO: Transaction lock on the table so this is atomic
# Or just use the connection context manager
table = cls._table_interface # type: Table
# Handle None input as an empty list
if data is None:
data = []
current = cls._reader(id, **kwargs)
if not cls._order_column and (add_only or remove_only):
to_insert = [item for item in data if item not in current] if not remove_only else []
to_remove = data if remove_only else (
[item for item in current if item not in data] if not add_only else []
)
# Handle required deletions
if to_remove:
params = {
cls._id_column: id,
cls._data_column: to_remove
}
table.delete_where(**params)
# Handle required insertions
if to_insert:
columns = (cls._id_column, cls._data_column)
values = [(id, value) for value in to_insert]
table.insert_many(*values, insert_keys=columns)
if cls._cache is not None:
new_current = [item for item in current + to_insert if item not in to_remove]
cls._cache[id] = new_current
else:
# Remove all and add all to preserve order
# TODO: This really really should be atomic if anything else reads this
delete_params = {cls._id_column: id}
table.delete_where(**delete_params)
if data:
columns = (cls._id_column, cls._data_column)
values = [(id, value) for value in data]
table.insert_many(*values, insert_keys=columns)
if cls._cache is not None:
cls._cache[id] = data
class UserInputError(SafeCancellation):
pass

View File

@@ -0,0 +1,170 @@
import datetime
import asyncio
import discord
import settings
from utils.lib import DotDict
from utils import seekers # noqa
from wards import guild_admin
from data import tables as tb
class GuildSettings(settings.ObjectSettings):
settings = DotDict()
class GuildSetting(settings.ColumnData, settings.Setting):
_table_interface = tb.guild_config
_id_column = 'guildid'
_create_row = True
category = None
write_ward = guild_admin
@GuildSettings.attach_setting
class event_log(settings.Channel, GuildSetting):
category = "Meta"
attr_name = 'event_log'
_data_column = 'event_log_channel'
display_name = "event_log"
desc = "Bot event logging channel."
long_desc = (
"Channel to post 'events', such as workouts completing or members renting a room."
)
_chan_type = discord.ChannelType.text
@property
def success_response(self):
if self.value:
return "The event log is now {}.".format(self.formatted)
else:
return "The event log has been unset."
def log(self, description="", colour=discord.Color.orange(), **kwargs):
channel = self.value
if channel:
embed = discord.Embed(
description=description,
colour=colour,
timestamp=datetime.datetime.utcnow(),
**kwargs
)
asyncio.create_task(channel.send(embed=embed))
@GuildSettings.attach_setting
class admin_role(settings.Role, GuildSetting):
category = "Guild Roles"
attr_name = 'admin_role'
_data_column = 'admin_role'
display_name = "admin_role"
desc = "Server administrator role."
long_desc = (
"Server administrator role.\n"
"Allows usage of the administrative commands, such as `config`.\n"
"These commands may also be used by anyone with the discord adminitrator permission."
)
# TODO Expand on what these are.
@property
def success_response(self):
if self.value:
return "The administrator role is now {}.".format(self.formatted)
else:
return "The administrator role has been unset."
@GuildSettings.attach_setting
class mod_role(settings.Role, GuildSetting):
category = "Guild Roles"
attr_name = 'mod_role'
_data_column = 'mod_role'
display_name = "mod_role"
desc = "Server moderator role."
long_desc = (
"Server moderator role.\n"
"Allows usage of the modistrative commands."
)
# TODO Expand on what these are.
@property
def success_response(self):
if self.value:
return "The moderator role is now {}.".format(self.formatted)
else:
return "The moderator role has been unset."
@GuildSettings.attach_setting
class unranked_roles(settings.RoleList, settings.ListData, settings.Setting):
category = "Guild Roles"
attr_name = 'unranked_roles'
_table_interface = tb.unranked_roles
_id_column = 'guildid'
_data_column = 'roleid'
write_ward = guild_admin
display_name = "unranked_roles"
desc = "Roles to exclude from the leaderboards."
_force_unique = True
long_desc = (
"Roles to be excluded from the `top` and `topcoins` leaderboards."
)
# Flat cache, no need to expire objects
_cache = {}
@property
def success_response(self):
if self.value:
return "The following roles will be excluded from the leaderboard:\n{}".format(self.formatted)
else:
return "The excluded roles have been removed."
@GuildSettings.attach_setting
class donator_roles(settings.RoleList, settings.ListData, settings.Setting):
category = "Guild Roles"
attr_name = 'donator_roles'
_table_interface = tb.donator_roles
_id_column = 'guildid'
_data_column = 'roleid'
write_ward = guild_admin
display_name = "donator_roles"
desc = "Donator badge roles."
_force_unique = True
long_desc = (
"Members with these roles will be considered donators and have access to premium features."
)
# Flat cache, no need to expire objects
_cache = {}
@property
def success_response(self):
if self.value:
return "The donator badges are now:\n{}".format(self.formatted)
else:
return "The donator badges have been removed."

View File

@@ -0,0 +1,714 @@
import itertools
from enum import IntEnum
from typing import Any, Optional
import pytz
import discord
from cmdClient.Context import Context
from cmdClient.lib import SafeCancellation
from meta import client
from .base import UserInputError
class SettingType:
"""
Abstract class representing a setting type.
Intended to be used as a mixin for a Setting,
with the provided methods implementing converter methods for the setting.
"""
accepts: str = None # User readable description of the acceptable values
# Raw converters
@classmethod
def _data_from_value(cls, id: int, value, **kwargs):
"""
Convert a high-level setting value to internal data.
"""
raise NotImplementedError
@classmethod
def _data_to_value(cls, id: int, data: Any, **kwargs):
"""
Convert internal data to high-level setting value.
"""
raise NotImplementedError
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Parse user provided input into internal data.
"""
raise NotImplementedError
@classmethod
def _format_data(cls, id: int, data: Any, **kwargs):
"""
Convert internal data into a formatted user-readable string.
"""
raise NotImplementedError
class Boolean(SettingType):
"""
Boolean type, supporting truthy and falsey user input.
Configurable to change truthy and falsey values, and the output map.
Types:
data: Optional[bool]
The stored boolean value.
value: Optional[bool]
The stored boolean value.
"""
accepts = "Yes/No, On/Off, True/False, Enabled/Disabled"
# Values that are accepted as truthy and falsey by the parser
_truthy = {"yes", "true", "on", "enable", "enabled"}
_falsey = {"no", "false", "off", "disable", "disabled"}
# The user-friendly output strings to use for each value
_outputs = {True: "On", False: "Off", None: "Not Set"}
@classmethod
def _data_from_value(cls, id: int, value: Optional[bool], **kwargs):
"""
Both data and value are of type Optional[bool].
Directly return the provided value as data.
"""
return value
@classmethod
def _data_to_value(cls, id: int, data: Optional[bool], **kwargs):
"""
Both data and value are of type Optional[bool].
Directly return the internal data as the value.
"""
return data
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Looks up the provided string in the truthy and falsey tables.
"""
_userstr = userstr.lower()
if _userstr == "none":
return None
if _userstr in cls._truthy:
return True
elif _userstr in cls._falsey:
return False
else:
raise UserInputError("Unknown boolean type `{}`".format(userstr))
@classmethod
def _format_data(cls, id: int, data: bool, **kwargs):
"""
Pass the provided value through the outputs map.
"""
return cls._outputs[data]
class Integer(SettingType):
"""
Integer type. Storing any integer.
Types:
data: Optional[int]
The stored integer value.
value: Optional[int]
The stored integer value.
"""
accepts = "An integer."
# Set limits on the possible integers
_min = -4096
_max = 4096
@classmethod
def _data_from_value(cls, id: int, value: Optional[bool], **kwargs):
"""
Both data and value are of type Optional[int].
Directly return the provided value as data.
"""
return value
@classmethod
def _data_to_value(cls, id: int, data: Optional[bool], **kwargs):
"""
Both data and value are of type Optional[int].
Directly return the internal data as the value.
"""
return data
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Relies on integer casting to convert the user string
"""
if userstr.lower() == "none":
return None
try:
num = int(userstr)
except Exception:
raise UserInputError("Couldn't parse provided integer.") from None
if num > cls._max:
raise UserInputError("Provided integer was too large!")
elif num < cls._min:
raise UserInputError("Provided integer was too small!")
return num
@classmethod
def _format_data(cls, id: int, data: Optional[int], **kwargs):
"""
Return the string version of the data.
"""
if data is None:
return None
else:
return str(data)
class String(SettingType):
"""
String type, storing arbitrary text.
Configurable to limit text length and restrict input options.
Types:
data: Optional[str]
The stored string.
value: Optional[str]
The stored string.
"""
accepts = "Any text"
# Maximum length of string to accept
_maxlen: int = None
# Set of input options to accept
_options: set = None
# Whether to quote the string as code
_quote: bool = True
@classmethod
def _data_from_value(cls, id: int, value: Optional[str], **kwargs):
"""
Return the provided value string as the data string.
"""
return value
@classmethod
def _data_to_value(cls, id: int, data: Optional[str], **kwargs):
"""
Return the provided data string as the value string.
"""
return data
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Check that the user-entered string is of the correct length.
Accept "None" to unset.
"""
if userstr.lower() == "none":
# Unsetting case
return None
elif cls._maxlen is not None and len(userstr) > cls._maxlen:
raise UserInputError("Provided string was too long! Maximum length is `{}`".format(cls._maxlen))
elif cls._options is not None and not userstr.lower() in cls._options:
raise UserInputError("Invalid option! Valid options are `{}`".format("`, `".join(cls._options)))
else:
return userstr
@classmethod
def _format_data(cls, id: int, data: str, **kwargs):
"""
Wrap the string in backtics for formatting.
Handle the special case where the string is empty.
"""
if data:
return "`{}`".format(data) if cls._quote else str(data)
else:
return None
class Channel(SettingType):
"""
Channel type, storing a single `discord.Channel`.
Types:
data: Optional[int]
The id of the stored Channel.
value: Optional[discord.abc.GuildChannel]
The stored Channel.
"""
accepts = "Channel mention/id/name, or 'None' to unset"
# Type of channel, if any
_chan_type: discord.ChannelType = None
@classmethod
def _data_from_value(cls, id: int, value: Optional[discord.abc.GuildChannel], **kwargs):
"""
Returns the channel id.
"""
return value.id if value is not None else None
@classmethod
def _data_to_value(cls, id: int, data: Optional[int], **kwargs):
"""
Uses the client to look up the channel id.
Returns the Channel if found, otherwise None.
"""
# Always passthrough None
if data is None:
return None
return client.get_channel(data)
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Pass to the channel seeker utility to find the requested channel.
Handle `0` and variants of `None` to unset.
"""
if userstr.lower() in ('0', 'none'):
return None
else:
channel = await ctx.find_channel(userstr, interactive=True, chan_type=cls._chan_type)
if channel is None:
raise SafeCancellation
else:
return channel.id
@classmethod
def _format_data(cls, id: int, data: Optional[int], **kwargs):
"""
Retrieve an artificially created channel mention.
If the channel does not exist, this will show up as invalid-channel.
"""
if data is None:
return None
else:
return "<#{}>".format(data)
class VoiceChannel(Channel):
_chan_type = discord.ChannelType.voice
class TextChannel(Channel):
_chan_type = discord.ChannelType.text
class Role(SettingType):
"""
Role type, storing a single `discord.Role`.
Configurably allows returning roles which don't exist or are not seen by the client
as `discord.Object`.
Settings may override `get_guildid` if the setting object `id` is not the guildid.
Types:
data: Optional[int]
The id of the stored Role.
value: Optional[Union[discord.Role, discord.Object]]
The stored Role, or, if the role wasn't found and `_strict` is not set,
a discord Object with the role id set.
"""
accepts = "Role mention/id/name, or 'None' to unset"
# Whether to disallow returning roles which don't exist as `discord.Object`s
_strict = True
@classmethod
def _data_from_value(cls, id: int, value: Optional[discord.Role], **kwargs):
"""
Returns the role id.
"""
return value.id if value is not None else None
@classmethod
def _data_to_value(cls, id: int, data: Optional[int], **kwargs):
"""
Uses the client to look up the guild and role id.
Returns the role if found, otherwise returns a `discord.Object` with the id set,
depending on the `_strict` setting.
"""
# Always passthrough None
if data is None:
return None
# Fetch guildid
guildid = cls._get_guildid(id, **kwargs)
# Search for the role
role = None
guild = client.get_guild(guildid)
if guild is not None:
role = guild.get_role(data)
if role is not None:
return role
elif not cls._strict:
return discord.Object(id=data)
else:
return None
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Pass to the role seeker utility to find the requested role.
Handle `0` and variants of `None` to unset.
"""
if userstr.lower() in ('0', 'none'):
return None
else:
role = await ctx.find_role(userstr, create=False, interactive=True)
if role is None:
raise SafeCancellation
else:
return role.id
@classmethod
def _format_data(cls, id: int, data: Optional[int], **kwargs):
"""
Retrieve the role mention if found, otherwise the role id or None depending on `_strict`.
"""
role = cls._data_to_value(id, data, **kwargs)
if role is None:
return "Not Set"
elif isinstance(role, discord.Role):
return role.mention
else:
return "`{}`".format(role.id)
@classmethod
def _get_guildid(cls, id: int, **kwargs):
"""
Fetch the current guildid.
Assumes that the guilid is either passed as a kwarg or is the object id.
Should be overriden in other cases.
"""
return kwargs.get('guildid', id)
class Emoji(SettingType):
"""
Emoji type. Stores both custom and unicode emojis.
"""
accepts = "Emoji, either built in or custom. Use 'None' to unset."
@staticmethod
def _parse_emoji(emojistr):
"""
Converts a provided string into a PartialEmoji.
If the string is badly formatted, returns None.
"""
if ":" in emojistr:
emojistr = emojistr.strip('<>')
splits = emojistr.split(":")
if len(splits) == 3:
animated, name, id = splits
animated = bool(animated)
return discord.PartialEmoji(name, animated=animated, id=int(id))
else:
# TODO: Check whether this is a valid emoji
return discord.PartialEmoji(emojistr)
@classmethod
def _data_from_value(cls, id: int, value: Optional[discord.PartialEmoji], **kwargs):
"""
Both data and value are of type Optional[discord.PartialEmoji].
Directly return the provided value as data.
"""
return value
@classmethod
def _data_to_value(cls, id: int, data: Optional[discord.PartialEmoji], **kwargs):
"""
Both data and value are of type Optional[discord.PartialEmoji].
Directly return the internal data as the value.
"""
return data
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Pass to the emoji string parser to get the emoji.
Handle `0` and variants of `None` to unset.
"""
if userstr.lower() in ('0', 'none'):
return None
else:
return cls._parse_emoji(userstr)
@classmethod
def _format_data(cls, id: int, data: Optional[discord.PartialEmoji], **kwargs):
"""
Return a string form of the partial emoji, which generally displays the emoji.
"""
if data is None:
return None
else:
return str(data)
class Timezone(SettingType):
"""
Timezone type, storing a valid timezone string.
Types:
data: Optional[str]
The string representing the timezone in POSIX format.
value: Optional[timezone]
The pytz timezone.
"""
accepts = (
"A timezone name from [this list](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) "
"(e.g. `Europe/London`)."
)
@classmethod
def _data_from_value(cls, id: int, value: Optional[str], **kwargs):
"""
Return the provided value string as the data string.
"""
if value is not None:
return str(value)
@classmethod
def _data_to_value(cls, id: int, data: Optional[str], **kwargs):
"""
Return the provided data string as the value string.
"""
if data is not None:
return pytz.timezone(data)
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Check that the user-entered string is of the correct length.
Accept "None" to unset.
"""
if userstr.lower() == "none":
# Unsetting case
return None
try:
timezone = pytz.timezone(userstr)
except pytz.exceptions.UnknownTimeZoneError:
timezones = [tz for tz in pytz.all_timezones if userstr.lower() in tz.lower()]
if len(timezones) == 1:
timezone = timezones[0]
elif timezones:
result = await ctx.selector(
"Multiple matching timezones found, please select one.",
timezones
)
timezone = timezones[result]
else:
raise UserInputError(
"Unknown timezone `{}`. "
"Please provide a TZ name from "
"[this list](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones)".format(userstr)
) from None
return str(timezone)
@classmethod
def _format_data(cls, id: int, data: str, **kwargs):
"""
Wrap the string in backtics for formatting.
Handle the special case where the string is empty.
"""
if data:
return "`{}`".format(data)
else:
return 'Not Set'
class IntegerEnum(SettingType):
"""
Integer Enum type, accepting limited strings, storing an integer, and returning an IntEnum value
Types:
data: Optional[int]
The stored integer.
value: Optional[Any]
The corresponding Enum member
"""
accepts = "A valid option."
# Enum to use for mapping values
_enum: IntEnum = None
# Custom map to format the value. If None, uses the enum names.
_output_map = None
@classmethod
def _data_from_value(cls, id: int, value: ..., **kwargs):
"""
Return the value corresponding to the enum member
"""
if value is not None:
return value.value
@classmethod
def _data_to_value(cls, id: int, data: ..., **kwargs):
"""
Return the enum member corresponding to the provided integer
"""
if data is not None:
return cls._enum(data)
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Find the corresponding enum member's value to the provided user input.
Accept "None" to unset.
"""
userstr = userstr.lower()
options = {name.lower(): mem.value for name, mem in cls._enum.__members__.items()}
if userstr == "none":
# Unsetting case
return None
elif userstr not in options:
raise UserInputError("Invalid option!")
else:
return options[userstr]
@classmethod
def _format_data(cls, id: int, data: int, **kwargs):
"""
Format the data using either the `_enum` or the provided output map.
"""
if data is not None:
value = cls._enum(data)
if cls._output_map:
return cls._output_map[value]
else:
return "`{}`".format(value.name)
class SettingList(SettingType):
"""
List of a particular type of setting.
Types:
data: List[SettingType.data]
List of data types of the specified SettingType.
Some of the data may be None.
value: List[SettingType.value]
List of the value types of the specified SettingType.
Some of the values may be None.
"""
# Base setting type to make the list from
_setting = None # type: SettingType
# Whether 'None' values are filtered out of the data when creating values
_allow_null_values = False # type: bool
# Whether duplicate data values should be filtered out
_force_unique = False
@classmethod
def _data_from_value(cls, id: int, values: ..., **kwargs):
"""
Returns the setting type data for each value in the value list
"""
if values is None:
# Special behaviour here, store an empty list instead of None
return []
else:
return [cls._setting._data_from_value(id, value) for value in values]
@classmethod
def _data_to_value(cls, id: int, data: ..., **kwargs):
"""
Returns the setting type value for each entry in the data list
"""
if data is None:
return []
else:
values = [cls._setting._data_to_value(id, entry) for entry in data]
# Filter out null values if required
if not cls._allow_null_values:
values = [value for value in values if value is not None]
return values
@classmethod
async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs):
"""
Splits the user string across `,` to break up the list.
Handle `0` and variants of `None` to unset.
"""
if userstr.lower() in ('0', 'none'):
return []
else:
data = []
for item in userstr.split(','):
data.append(await cls._setting._parse_userstr(ctx, id, item.strip()))
if cls._force_unique:
data = list(set(data))
return data
@classmethod
def _format_data(cls, id: int, data: ..., **kwargs):
"""
Format the list by adding `,` between each formatted item
"""
if not data:
return None
else:
formatted_items = []
for item in data:
formatted_item = cls._setting._format_data(id, item)
if formatted_item is not None:
formatted_items.append(formatted_item)
return ", ".join(formatted_items)
class ChannelList(SettingList):
"""
List of channels
"""
accepts = (
"Comma separated list of channel mentions/ids/names. Use `None` to unset. "
"Write `--add` or `--remove` to add or remove channels."
)
_setting = Channel
class RoleList(SettingList):
"""
List of roles
"""
accepts = (
"Comma separated list of role mentions/ids/names. Use `None` to unset. "
"Write `--add` or `--remove` to add or remove roles."
)
_setting = Role
@property
def members(self):
roles = self.value
return list(set(itertools.chain(*(role.members for role in roles))))
class StringList(SettingList):
"""
List of strings
"""
accepts = (
"Comma separated list of strings. Use `None` to unset. "
"Write `--add` or `--remove` to add or remove strings."
)
_setting = String

View File

@@ -0,0 +1,42 @@
import datetime
import settings
from utils.lib import DotDict
from data import tables as tb
class UserSettings(settings.ObjectSettings):
settings = DotDict()
class UserSetting(settings.ColumnData, settings.Setting):
_table_interface = tb.user_config
_id_column = 'userid'
_create_row = True
write_ward = None
@UserSettings.attach_setting
class timezone(settings.Timezone, UserSetting):
attr_name = 'timezone'
_data_column = 'timezone'
_default = 'UTC'
display_name = 'timezone'
desc = "Timezone to display prompts in."
long_desc = (
"Timezone used for displaying certain prompts (e.g. selecting an accountability room)."
)
@property
def success_response(self):
if self.value:
return (
"Your personal timezone is now {}.\n"
"Your current time is **{}**."
).format(self.formatted, datetime.datetime.now(tz=self.value).strftime("%H:%M"))
else:
return "Your personal timezone has been unset."

64
bot/utils/ctx_addons.py Normal file
View File

@@ -0,0 +1,64 @@
import discord
from cmdClient import Context
from data import tables
from core import Lion
from settings import GuildSettings, UserSettings
@Context.util
async def embed_reply(ctx, desc, colour=discord.Colour(0x9b59b6), **kwargs):
"""
Simple helper to embed replies.
All arguments are passed to the embed constructor.
`desc` is passed as the `description` kwarg.
"""
embed = discord.Embed(description=desc, colour=colour, **kwargs)
return await ctx.reply(embed=embed)
@Context.util
async def error_reply(ctx, error_str, **kwargs):
"""
Notify the user of a user level error.
Typically, this will occur in a red embed, posted in the command channel.
"""
embed = discord.Embed(
colour=discord.Colour.red(),
description=error_str
)
try:
message = await ctx.ch.send(embed=embed, reference=ctx.msg, **kwargs)
ctx.sent_messages.append(message)
return message
except discord.Forbidden:
message = await ctx.reply(error_str)
ctx.sent_messages.append(message)
return message
def context_property(func):
setattr(Context, func.__name__, property(func))
return func
@context_property
def best_prefix(ctx):
return ctx.client.prefix
@context_property
def guild_settings(ctx):
if ctx.guild:
tables.guild_config.fetch_or_create(ctx.guild.id)
return GuildSettings(ctx.guild.id if ctx.guild else 0)
@context_property
def author_settings(ctx):
return UserSettings(ctx.author.id)
@context_property
def alion(ctx):
return Lion.fetch(ctx.guild.id if ctx.guild else 0, ctx.author.id)

331
bot/utils/seekers.py Normal file
View File

@@ -0,0 +1,331 @@
import discord
from cmdClient import Context
from cmdClient.lib import InvalidContext, UserCancelled, ResponseTimedOut, SafeCancellation
from . import interactive
@Context.util
async def find_role(ctx, userstr, create=False, interactive=False, collection=None, allow_notfound=True):
"""
Find a guild role given a partial matching string,
allowing custom role collections and several behavioural switches.
Parameters
----------
userstr: str
String obtained from a user, expected to partially match a role in the collection.
The string will be tested against both the id and the name of the role.
create: bool
Whether to offer to create the role if it does not exist.
The bot will only offer to create the role if it has the `manage_channels` permission.
interactive: bool
Whether to offer the user a list of roles to choose from,
or pick the first matching role.
collection: List[Union[discord.Role, discord.Object]]
Collection of roles to search amongst.
If none, uses the guild role list.
allow_notfound: bool
Whether to return `None` when there are no matches, instead of raising `SafeCancellation`.
Overriden by `create`, if it is set.
Returns
-------
discord.Role:
If a valid role is found.
None:
If no valid role has been found.
Raises
------
cmdClient.lib.UserCancelled:
If the user cancels interactive role selection.
cmdClient.lib.ResponseTimedOut:
If the user fails to respond to interactive role selection within `60` seconds`
cmdClient.lib.SafeCancellation:
If `allow_notfound` is `False`, and the search returned no matches.
"""
# Handle invalid situations and input
if not ctx.guild:
raise InvalidContext("Attempt to use find_role outside of a guild.")
if userstr == "":
raise ValueError("User string passed to find_role was empty.")
# Create the collection to search from args or guild roles
collection = collection if collection is not None else ctx.guild.roles
# If the unser input was a number or possible role mention, get it out
userstr = userstr.strip()
roleid = userstr.strip('<#@&!> ')
roleid = int(roleid) if roleid.isdigit() else None
searchstr = userstr.lower()
# Find the role
role = None
# Check method to determine whether a role matches
def check(role):
return (role.id == roleid) or (searchstr in role.name.lower())
# Get list of matching roles
roles = list(filter(check, collection))
if len(roles) == 0:
# Nope
role = None
elif len(roles) == 1:
# Select our lucky winner
role = roles[0]
else:
# We have multiple matching roles!
if interactive:
# Interactive prompt with the list of roles, handle `Object`s
role_names = [
role.name if isinstance(role, discord.Role) else str(role.id) for role in roles
]
try:
selected = await ctx.selector(
"`{}` roles found matching `{}`!".format(len(roles), userstr),
role_names,
timeout=60
)
except UserCancelled:
raise UserCancelled("User cancelled role selection.") from None
except ResponseTimedOut:
raise ResponseTimedOut("Role selection timed out.") from None
role = roles[selected]
else:
# Just select the first one
role = roles[0]
# Handle non-existence of the role
if role is None:
msgstr = "Couldn't find a role matching `{}`!".format(userstr)
if create:
# Inform the user
msg = await ctx.error_reply(msgstr)
if ctx.guild.me.guild_permissions.manage_roles:
# Offer to create it
resp = await ctx.ask("Would you like to create this role?", timeout=30)
if resp:
# They accepted, create the role
# Before creation, check if the role name is too long
if len(userstr) > 100:
await ctx.error_reply("Could not create a role with a name over 100 characters long!")
else:
role = await ctx.guild.create_role(
name=userstr,
reason="Interactive role creation for {} (uid:{})".format(ctx.author, ctx.author.id)
)
await msg.delete()
await ctx.reply("You have created the role `{}`!".format(userstr))
# If we still don't have a role, cancel unless allow_notfound is set
if role is None and not allow_notfound:
raise SafeCancellation
elif not allow_notfound:
raise SafeCancellation(msgstr)
else:
await ctx.error_reply(msgstr)
return role
@Context.util
async def find_channel(ctx, userstr, interactive=False, collection=None, chan_type=None):
"""
Find a guild channel given a partial matching string,
allowing custom channel collections and several behavioural switches.
Parameters
----------
userstr: str
String obtained from a user, expected to partially match a channel in the collection.
The string will be tested against both the id and the name of the channel.
interactive: bool
Whether to offer the user a list of channels to choose from,
or pick the first matching channel.
collection: List(discord.Channel)
Collection of channels to search amongst.
If none, uses the full guild channel list.
chan_type: discord.ChannelType
Type of channel to restrict the collection to.
Returns
-------
discord.Channel:
If a valid channel is found.
None:
If no valid channel has been found.
Raises
------
cmdClient.lib.UserCancelled:
If the user cancels interactive channel selection.
cmdClient.lib.ResponseTimedOut:
If the user fails to respond to interactive channel selection within `60` seconds`
"""
# Handle invalid situations and input
if not ctx.guild:
raise InvalidContext("Attempt to use find_channel outside of a guild.")
if userstr == "":
raise ValueError("User string passed to find_channel was empty.")
# Create the collection to search from args or guild channels
collection = collection if collection else ctx.guild.channels
if chan_type is not None:
collection = [chan for chan in collection if chan.type == chan_type]
# If the user input was a number or possible channel mention, extract it
chanid = userstr.strip('<#@&!>')
chanid = int(chanid) if chanid.isdigit() else None
searchstr = userstr.lower()
# Find the channel
chan = None
# Check method to determine whether a channel matches
def check(chan):
return (chan.id == chanid) or (searchstr in chan.name.lower())
# Get list of matching roles
channels = list(filter(check, collection))
if len(channels) == 0:
# Nope
chan = None
elif len(channels) == 1:
# Select our lucky winner
chan = channels[0]
else:
# We have multiple matching channels!
if interactive:
# Interactive prompt with the list of channels
chan_names = [chan.name for chan in channels]
try:
selected = await ctx.selector(
"`{}` channels found matching `{}`!".format(len(channels), userstr),
chan_names,
timeout=60
)
except UserCancelled:
raise UserCancelled("User cancelled channel selection.") from None
except ResponseTimedOut:
raise ResponseTimedOut("Channel selection timed out.") from None
chan = channels[selected]
else:
# Just select the first one
chan = channels[0]
if chan is None:
await ctx.error_reply("Couldn't find a channel matching `{}`!".format(userstr))
return chan
@Context.util
async def find_member(ctx, userstr, interactive=False, collection=None, silent=False):
"""
Find a guild member given a partial matching string,
allowing custom member collections.
Parameters
----------
userstr: str
String obtained from a user, expected to partially match a member in the collection.
The string will be tested against both the userid, full user name and user nickname.
interactive: bool
Whether to offer the user a list of members to choose from,
or pick the first matching channel.
collection: List(discord.Member)
Collection of members to search amongst.
If none, uses the full guild member list.
silent: bool
Whether to reply with an error when there are no matches.
Returns
-------
discord.Member:
If a valid member is found.
None:
If no valid member has been found.
Raises
------
cmdClient.lib.UserCancelled:
If the user cancels interactive member selection.
cmdClient.lib.ResponseTimedOut:
If the user fails to respond to interactive member selection within `60` seconds`
"""
# Handle invalid situations and input
if not ctx.guild:
raise InvalidContext("Attempt to use find_member outside of a guild.")
if userstr == "":
raise ValueError("User string passed to find_member was empty.")
# Create the collection to search from args or guild members
collection = collection if collection else ctx.guild.members
# If the user input was a number or possible member mention, extract it
userid = userstr.strip('<#@&!>')
userid = int(userid) if userid.isdigit() else None
searchstr = userstr.lower()
# Find the member
member = None
# Check method to determine whether a member matches
def check(member):
return (
member.id == userid
or searchstr in member.display_name.lower()
or searchstr in str(member).lower()
)
# Get list of matching roles
members = list(filter(check, collection))
if len(members) == 0:
# Nope
member = None
elif len(members) == 1:
# Select our lucky winner
member = members[0]
else:
# We have multiple matching members!
if interactive:
# Interactive prompt with the list of members
member_names = [
"{} {}".format(
member.nick if member.nick else (member if members.count(member) > 1
else member.name),
("<{}>".format(member)) if member.nick else ""
) for member in members
]
try:
selected = await ctx.selector(
"`{}` members found matching `{}`!".format(len(members), userstr),
member_names,
timeout=60
)
except UserCancelled:
raise UserCancelled("User cancelled member selection.") from None
except ResponseTimedOut:
raise ResponseTimedOut("Member selection timed out.") from None
member = members[selected]
else:
# Just select the first one
member = members[0]
if member is None and not silent:
await ctx.error_reply("Couldn't find a member matching `{}`!".format(userstr))
return member