diff --git a/bot/settings/__init__.py b/bot/settings/__init__.py new file mode 100644 index 00000000..3f72ea44 --- /dev/null +++ b/bot/settings/__init__.py @@ -0,0 +1,5 @@ +from .base import * # noqa +from .setting_types import * # noqa + +from .user_settings import UserSettings, UserSetting # noqa +from .guild_settings import GuildSettings, GuildSetting # noqa diff --git a/bot/settings/base.py b/bot/settings/base.py new file mode 100644 index 00000000..3c19709b --- /dev/null +++ b/bot/settings/base.py @@ -0,0 +1,431 @@ +import discord +from cmdClient.cmdClient import cmdClient, Context +from cmdClient.lib import SafeCancellation +from cmdClient.Check import Check + +from utils.lib import prop_tabulate, DotDict + +from meta import client +from data import Table, RowTable + + +class Setting: + """ + Abstract base class describing a stored configuration setting. + A setting consists of logic to load the setting from storage, + present it in a readable form, understand user entered values, + and write it again in storage. + Additionally, the setting has attributes attached describing + the setting in a user-friendly manner for display purposes. + """ + attr_name: str = None # Internal attribute name for the setting + _default: ... = None # Default data value for the setting.. this may be None if the setting overrides 'default'. + + write_ward: Check = None # Check that must be passed to write the setting. Not implemented internally. + + # Configuration interface descriptions + display_name: str = None # User readable name of the setting + desc: str = None # User readable brief description of the setting + long_desc: str = None # User readable long description of the setting + accepts: str = None # User readable description of the acceptable values + + def __init__(self, id, data: ..., **kwargs): + self.client: cmdClient = client + self.id = id + self._data = data + + # Configuration embeds + @property + def embed(self): + """ + Discord Embed showing an information summary about the setting. + """ + embed = discord.Embed( + title="Configuration options for `{}`".format(self.display_name), + ) + fields = ("Current value", "Default value", "Accepted input") + values = (self.formatted or "Not Set", + self._format_data(self.id, self.default) or "None", + self.accepts) + table = prop_tabulate(fields, values) + embed.description = "{}\n{}".format(self.long_desc.format(self=self, client=self.client), table) + return embed + + @property + def summary(self): + """ + Formatted summary of the data. + May be implemented in `_format_data(..., summary=True, ...)` or overidden. + """ + return self._format_data(self.id, self.data, summary=True) + + @property + def success_response(self): + """ + Response message sent when the setting has successfully been updated. + """ + return "Setting Updated!" + + # Instance generation + @classmethod + def get(cls, id: int, **kwargs): + """ + Return a setting instance initialised from the stored value. + """ + data = cls._reader(id, **kwargs) + return cls(id, data, **kwargs) + + @classmethod + async def parse(cls, id: int, ctx: Context, userstr: str, **kwargs): + """ + Return a setting instance initialised from a parsed user string. + """ + data = await cls._parse_userstr(ctx, id, userstr, **kwargs) + return cls(id, data, **kwargs) + + # Main interface + @property + def data(self): + """ + Retrieves the current internal setting data if it is set, otherwise the default data + """ + return self._data if self._data is not None else self.default + + @data.setter + def data(self, new_data): + """ + Sets the internal setting data and writes the changes. + """ + self._data = new_data + self.write() + + @property + def default(self): + """ + Retrieves the default value for this setting. + Settings should override this if the default depends on the object id. + """ + return self._default + + @property + def value(self): + """ + Discord-aware object or objects associated with the setting. + """ + return self._data_to_value(self.id, self.data) + + @value.setter + def value(self, new_value): + """ + Setter which reads the discord-aware object, converts it to data, and writes it. + """ + self._data = self._data_from_value(self.id, new_value) + self.write() + + @property + def formatted(self): + """ + User-readable form of the setting. + """ + return self._format_data(self.id, self.data) + + def write(self, **kwargs): + """ + Write value to the database. + For settings which override this, + ensure you handle deletion of values when internal data is None. + """ + self._writer(self.id, self._data, **kwargs) + + # Raw converters + @classmethod + def _data_from_value(cls, id: int, value, **kwargs): + """ + Convert a high-level setting value to internal data. + Must be overriden by the setting. + Be aware of None values, these should always pass through as None + to provide an unsetting interface. + """ + raise NotImplementedError + + @classmethod + def _data_to_value(cls, id: int, data: ..., **kwargs): + """ + Convert internal data to high-level setting value. + Must be overriden by the setting. + """ + raise NotImplementedError + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Parse user provided input into internal data. + Must be overriden by the setting if the setting is user-configurable. + """ + raise NotImplementedError + + @classmethod + def _format_data(cls, id: int, data: ..., **kwargs): + """ + Convert internal data into a formatted user-readable string. + Must be overriden by the setting if the setting is user-viewable. + """ + raise NotImplementedError + + # Database access classmethods + @classmethod + def _reader(cls, id: int, **kwargs): + """ + Read a setting from storage and return setting data or None. + Must be overriden by the setting. + """ + raise NotImplementedError + + @classmethod + def _writer(cls, id: int, data: ..., **kwargs): + """ + Write provided setting data to storage. + Must be overriden by the setting unless the `write` method is overidden. + If the data is None, the setting is empty and should be unset. + """ + raise NotImplementedError + + @classmethod + async def command(cls, ctx, id): + """ + Standardised command viewing/setting interface for the setting. + """ + if not ctx.args: + # View config embed for provided cls + await ctx.reply(embed=cls.get(id).embed) + else: + # Check the write ward + if cls.write_ward and not await cls.write_ward.run(ctx): + await ctx.error_reply(cls.write_ward.msg) + else: + # Attempt to set config cls + try: + cls = await cls.parse(id, ctx, ctx.args) + except UserInputError as e: + await ctx.reply(embed=discord.Embed( + description="{} {}".format('❌', e.msg), + Colour=discord.Colour.red() + )) + else: + cls.write() + await ctx.reply(embed=discord.Embed( + description="{} {}".format('✅', cls.success_response), + Colour=discord.Colour.green() + )) + + +class ObjectSettings: + """ + Abstract class representing a linked collection of settings for a single object. + Initialised settings are provided as instance attributes in the form of properties. + """ + __slots__ = ('id', 'params') + + settings: DotDict = None + + def __init__(self, id, **kwargs): + self.id = id + self.params = tuple(kwargs.items()) + + @classmethod + def _setting_property(cls, setting): + def wrapped_setting(self): + return setting.get(self.id, **dict(self.params)) + return wrapped_setting + + @classmethod + def attach_setting(cls, setting: Setting): + name = setting.attr_name or setting.__name__ + setattr(cls, name, property(cls._setting_property(setting))) + cls.settings[name] = setting + return setting + + +class ColumnData: + """ + Mixin for settings stored in a single row and column of a Table. + Intended to be used with tables where the only primary key is the object id. + """ + # Table storing the desired data + _table_interface: Table = None + + # Name of the column storing the setting object id + _id_column: str = None + + # Name of the column with the desired data + _data_column: str = None + + # Whether to use create a row if not found (only applies to TableRow) + _create_row = False + + # Whether to upsert or update for updates + _upsert: bool = True + + # High level data cache to use, set to None to disable cache. + _cache = None # Map[id -> value] + + @classmethod + def _reader(cls, id: int, use_cache=True, **kwargs): + """ + Read in the requested entry associated to the id. + Supports reading cached values from a `RowTable`. + """ + if cls._cache is not None and id in cls._cache and use_cache: + return cls._cache[id] + + table = cls._table_interface + if isinstance(table, RowTable) and cls._id_column == table.id_col: + if cls._create_row: + row = table.fetch_or_create(id) + else: + row = table.fetch(id) + data = row.data[cls._data_column] if row else None + else: + params = { + "select_columns": (cls._data_column,), + cls._id_column: id + } + row = table.select_one_where(**params) + data = row[cls._data_column] if row else None + + if cls._cache is not None: + cls._cache[id] = data + + return data + + @classmethod + def _writer(cls, id: int, data: ..., **kwargs): + """ + Write the provided entry to the table, allowing replacements. + """ + table = cls._table_interface + params = { + cls._id_column: id + } + values = { + cls._data_column: data + } + + # Update data + if cls._upsert: + # Upsert data + table.upsert( + constraint=cls._id_column, + **params, + **values + ) + else: + # Update data + table.update_where(values, **params) + + if cls._cache is not None: + cls._cache[id] = data + + +class ListData: + """ + Mixin for list types implemented on a Table. + Implements a reader and writer. + This assumes the list is the only data stored in the table, + and removes list entries by deleting rows. + """ + # Table storing the setting data + _table_interface: Table = None + + # Name of the column storing the id + _id_column: str = None + + # Name of the column storing the data to read + _data_column: str = None + + # Name of column storing the order index to use, if any. Assumed to be Serial on writing. + _order_column: str = None + _order_type: str = "ASC" + + # High level data cache to use, set to None to disable cache. + _cache = None # Map[id -> value] + + @classmethod + def _reader(cls, id: int, use_cache=True, **kwargs): + """ + Read in all entries associated to the given id. + """ + if cls._cache is not None and id in cls._cache and use_cache: + return cls._cache[id] + + table = cls._table_interface # type: Table + params = { + "select_columns": [cls._data_column], + cls._id_column: id + } + if cls._order_column: + params['_extra'] = "ORDER BY {} {}".format(cls._order_column, cls._order_type) + + rows = table.select_where(**params) + data = [row[cls._data_column] for row in rows] + + if cls._cache is not None: + cls._cache[id] = data + + return data + + @classmethod + def _writer(cls, id: int, data: ..., add_only=False, remove_only=False, **kwargs): + """ + Write the provided list to storage. + """ + # TODO: Transaction lock on the table so this is atomic + # Or just use the connection context manager + + table = cls._table_interface # type: Table + + # Handle None input as an empty list + if data is None: + data = [] + + current = cls._reader(id, **kwargs) + if not cls._order_column and (add_only or remove_only): + to_insert = [item for item in data if item not in current] if not remove_only else [] + to_remove = data if remove_only else ( + [item for item in current if item not in data] if not add_only else [] + ) + + # Handle required deletions + if to_remove: + params = { + cls._id_column: id, + cls._data_column: to_remove + } + table.delete_where(**params) + + # Handle required insertions + if to_insert: + columns = (cls._id_column, cls._data_column) + values = [(id, value) for value in to_insert] + table.insert_many(*values, insert_keys=columns) + + if cls._cache is not None: + new_current = [item for item in current + to_insert if item not in to_remove] + cls._cache[id] = new_current + else: + # Remove all and add all to preserve order + # TODO: This really really should be atomic if anything else reads this + delete_params = {cls._id_column: id} + table.delete_where(**delete_params) + + if data: + columns = (cls._id_column, cls._data_column) + values = [(id, value) for value in data] + table.insert_many(*values, insert_keys=columns) + + if cls._cache is not None: + cls._cache[id] = data + + +class UserInputError(SafeCancellation): + pass diff --git a/bot/settings/guild_settings.py b/bot/settings/guild_settings.py new file mode 100644 index 00000000..62436d27 --- /dev/null +++ b/bot/settings/guild_settings.py @@ -0,0 +1,170 @@ +import datetime +import asyncio +import discord + +import settings +from utils.lib import DotDict +from utils import seekers # noqa + +from wards import guild_admin +from data import tables as tb + + +class GuildSettings(settings.ObjectSettings): + settings = DotDict() + + +class GuildSetting(settings.ColumnData, settings.Setting): + _table_interface = tb.guild_config + _id_column = 'guildid' + _create_row = True + + category = None + + write_ward = guild_admin + + +@GuildSettings.attach_setting +class event_log(settings.Channel, GuildSetting): + category = "Meta" + + attr_name = 'event_log' + _data_column = 'event_log_channel' + + display_name = "event_log" + desc = "Bot event logging channel." + + long_desc = ( + "Channel to post 'events', such as workouts completing or members renting a room." + ) + + _chan_type = discord.ChannelType.text + + @property + def success_response(self): + if self.value: + return "The event log is now {}.".format(self.formatted) + else: + return "The event log has been unset." + + def log(self, description="", colour=discord.Color.orange(), **kwargs): + channel = self.value + if channel: + embed = discord.Embed( + description=description, + colour=colour, + timestamp=datetime.datetime.utcnow(), + **kwargs + ) + asyncio.create_task(channel.send(embed=embed)) + + +@GuildSettings.attach_setting +class admin_role(settings.Role, GuildSetting): + category = "Guild Roles" + + attr_name = 'admin_role' + _data_column = 'admin_role' + + display_name = "admin_role" + desc = "Server administrator role." + + long_desc = ( + "Server administrator role.\n" + "Allows usage of the administrative commands, such as `config`.\n" + "These commands may also be used by anyone with the discord adminitrator permission." + ) + # TODO Expand on what these are. + + @property + def success_response(self): + if self.value: + return "The administrator role is now {}.".format(self.formatted) + else: + return "The administrator role has been unset." + + +@GuildSettings.attach_setting +class mod_role(settings.Role, GuildSetting): + category = "Guild Roles" + + attr_name = 'mod_role' + _data_column = 'mod_role' + + display_name = "mod_role" + desc = "Server moderator role." + + long_desc = ( + "Server moderator role.\n" + "Allows usage of the modistrative commands." + ) + # TODO Expand on what these are. + + @property + def success_response(self): + if self.value: + return "The moderator role is now {}.".format(self.formatted) + else: + return "The moderator role has been unset." + + +@GuildSettings.attach_setting +class unranked_roles(settings.RoleList, settings.ListData, settings.Setting): + category = "Guild Roles" + + attr_name = 'unranked_roles' + + _table_interface = tb.unranked_roles + _id_column = 'guildid' + _data_column = 'roleid' + + write_ward = guild_admin + display_name = "unranked_roles" + desc = "Roles to exclude from the leaderboards." + + _force_unique = True + + long_desc = ( + "Roles to be excluded from the `top` and `topcoins` leaderboards." + ) + + # Flat cache, no need to expire objects + _cache = {} + + @property + def success_response(self): + if self.value: + return "The following roles will be excluded from the leaderboard:\n{}".format(self.formatted) + else: + return "The excluded roles have been removed." + + +@GuildSettings.attach_setting +class donator_roles(settings.RoleList, settings.ListData, settings.Setting): + category = "Guild Roles" + + attr_name = 'donator_roles' + + _table_interface = tb.donator_roles + _id_column = 'guildid' + _data_column = 'roleid' + + write_ward = guild_admin + display_name = "donator_roles" + desc = "Donator badge roles." + + _force_unique = True + + long_desc = ( + "Members with these roles will be considered donators and have access to premium features." + ) + + # Flat cache, no need to expire objects + _cache = {} + + @property + def success_response(self): + if self.value: + return "The donator badges are now:\n{}".format(self.formatted) + else: + return "The donator badges have been removed." diff --git a/bot/settings/setting_types.py b/bot/settings/setting_types.py new file mode 100644 index 00000000..a1060c02 --- /dev/null +++ b/bot/settings/setting_types.py @@ -0,0 +1,714 @@ +import itertools +from enum import IntEnum +from typing import Any, Optional + +import pytz +import discord +from cmdClient.Context import Context +from cmdClient.lib import SafeCancellation + +from meta import client + +from .base import UserInputError + + +class SettingType: + """ + Abstract class representing a setting type. + Intended to be used as a mixin for a Setting, + with the provided methods implementing converter methods for the setting. + """ + accepts: str = None # User readable description of the acceptable values + + # Raw converters + @classmethod + def _data_from_value(cls, id: int, value, **kwargs): + """ + Convert a high-level setting value to internal data. + """ + raise NotImplementedError + + @classmethod + def _data_to_value(cls, id: int, data: Any, **kwargs): + """ + Convert internal data to high-level setting value. + """ + raise NotImplementedError + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Parse user provided input into internal data. + """ + raise NotImplementedError + + @classmethod + def _format_data(cls, id: int, data: Any, **kwargs): + """ + Convert internal data into a formatted user-readable string. + """ + raise NotImplementedError + + +class Boolean(SettingType): + """ + Boolean type, supporting truthy and falsey user input. + Configurable to change truthy and falsey values, and the output map. + + Types: + data: Optional[bool] + The stored boolean value. + value: Optional[bool] + The stored boolean value. + """ + accepts = "Yes/No, On/Off, True/False, Enabled/Disabled" + + # Values that are accepted as truthy and falsey by the parser + _truthy = {"yes", "true", "on", "enable", "enabled"} + _falsey = {"no", "false", "off", "disable", "disabled"} + + # The user-friendly output strings to use for each value + _outputs = {True: "On", False: "Off", None: "Not Set"} + + @classmethod + def _data_from_value(cls, id: int, value: Optional[bool], **kwargs): + """ + Both data and value are of type Optional[bool]. + Directly return the provided value as data. + """ + return value + + @classmethod + def _data_to_value(cls, id: int, data: Optional[bool], **kwargs): + """ + Both data and value are of type Optional[bool]. + Directly return the internal data as the value. + """ + return data + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Looks up the provided string in the truthy and falsey tables. + """ + _userstr = userstr.lower() + if _userstr == "none": + return None + if _userstr in cls._truthy: + return True + elif _userstr in cls._falsey: + return False + else: + raise UserInputError("Unknown boolean type `{}`".format(userstr)) + + @classmethod + def _format_data(cls, id: int, data: bool, **kwargs): + """ + Pass the provided value through the outputs map. + """ + return cls._outputs[data] + + +class Integer(SettingType): + """ + Integer type. Storing any integer. + + Types: + data: Optional[int] + The stored integer value. + value: Optional[int] + The stored integer value. + """ + accepts = "An integer." + + # Set limits on the possible integers + _min = -4096 + _max = 4096 + + @classmethod + def _data_from_value(cls, id: int, value: Optional[bool], **kwargs): + """ + Both data and value are of type Optional[int]. + Directly return the provided value as data. + """ + return value + + @classmethod + def _data_to_value(cls, id: int, data: Optional[bool], **kwargs): + """ + Both data and value are of type Optional[int]. + Directly return the internal data as the value. + """ + return data + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Relies on integer casting to convert the user string + """ + if userstr.lower() == "none": + return None + + try: + num = int(userstr) + except Exception: + raise UserInputError("Couldn't parse provided integer.") from None + + if num > cls._max: + raise UserInputError("Provided integer was too large!") + elif num < cls._min: + raise UserInputError("Provided integer was too small!") + + return num + + @classmethod + def _format_data(cls, id: int, data: Optional[int], **kwargs): + """ + Return the string version of the data. + """ + if data is None: + return None + else: + return str(data) + + +class String(SettingType): + """ + String type, storing arbitrary text. + Configurable to limit text length and restrict input options. + + Types: + data: Optional[str] + The stored string. + value: Optional[str] + The stored string. + """ + accepts = "Any text" + + # Maximum length of string to accept + _maxlen: int = None + + # Set of input options to accept + _options: set = None + + # Whether to quote the string as code + _quote: bool = True + + @classmethod + def _data_from_value(cls, id: int, value: Optional[str], **kwargs): + """ + Return the provided value string as the data string. + """ + return value + + @classmethod + def _data_to_value(cls, id: int, data: Optional[str], **kwargs): + """ + Return the provided data string as the value string. + """ + return data + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Check that the user-entered string is of the correct length. + Accept "None" to unset. + """ + if userstr.lower() == "none": + # Unsetting case + return None + elif cls._maxlen is not None and len(userstr) > cls._maxlen: + raise UserInputError("Provided string was too long! Maximum length is `{}`".format(cls._maxlen)) + elif cls._options is not None and not userstr.lower() in cls._options: + raise UserInputError("Invalid option! Valid options are `{}`".format("`, `".join(cls._options))) + else: + return userstr + + @classmethod + def _format_data(cls, id: int, data: str, **kwargs): + """ + Wrap the string in backtics for formatting. + Handle the special case where the string is empty. + """ + if data: + return "`{}`".format(data) if cls._quote else str(data) + else: + return None + + +class Channel(SettingType): + """ + Channel type, storing a single `discord.Channel`. + + Types: + data: Optional[int] + The id of the stored Channel. + value: Optional[discord.abc.GuildChannel] + The stored Channel. + """ + accepts = "Channel mention/id/name, or 'None' to unset" + + # Type of channel, if any + _chan_type: discord.ChannelType = None + + @classmethod + def _data_from_value(cls, id: int, value: Optional[discord.abc.GuildChannel], **kwargs): + """ + Returns the channel id. + """ + return value.id if value is not None else None + + @classmethod + def _data_to_value(cls, id: int, data: Optional[int], **kwargs): + """ + Uses the client to look up the channel id. + Returns the Channel if found, otherwise None. + """ + # Always passthrough None + if data is None: + return None + + return client.get_channel(data) + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Pass to the channel seeker utility to find the requested channel. + Handle `0` and variants of `None` to unset. + """ + if userstr.lower() in ('0', 'none'): + return None + else: + channel = await ctx.find_channel(userstr, interactive=True, chan_type=cls._chan_type) + if channel is None: + raise SafeCancellation + else: + return channel.id + + @classmethod + def _format_data(cls, id: int, data: Optional[int], **kwargs): + """ + Retrieve an artificially created channel mention. + If the channel does not exist, this will show up as invalid-channel. + """ + if data is None: + return None + else: + return "<#{}>".format(data) + + +class VoiceChannel(Channel): + _chan_type = discord.ChannelType.voice + + +class TextChannel(Channel): + _chan_type = discord.ChannelType.text + + +class Role(SettingType): + """ + Role type, storing a single `discord.Role`. + Configurably allows returning roles which don't exist or are not seen by the client + as `discord.Object`. + + Settings may override `get_guildid` if the setting object `id` is not the guildid. + + Types: + data: Optional[int] + The id of the stored Role. + value: Optional[Union[discord.Role, discord.Object]] + The stored Role, or, if the role wasn't found and `_strict` is not set, + a discord Object with the role id set. + """ + accepts = "Role mention/id/name, or 'None' to unset" + + # Whether to disallow returning roles which don't exist as `discord.Object`s + _strict = True + + @classmethod + def _data_from_value(cls, id: int, value: Optional[discord.Role], **kwargs): + """ + Returns the role id. + """ + return value.id if value is not None else None + + @classmethod + def _data_to_value(cls, id: int, data: Optional[int], **kwargs): + """ + Uses the client to look up the guild and role id. + Returns the role if found, otherwise returns a `discord.Object` with the id set, + depending on the `_strict` setting. + """ + # Always passthrough None + if data is None: + return None + + # Fetch guildid + guildid = cls._get_guildid(id, **kwargs) + + # Search for the role + role = None + guild = client.get_guild(guildid) + if guild is not None: + role = guild.get_role(data) + + if role is not None: + return role + elif not cls._strict: + return discord.Object(id=data) + else: + return None + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Pass to the role seeker utility to find the requested role. + Handle `0` and variants of `None` to unset. + """ + if userstr.lower() in ('0', 'none'): + return None + else: + role = await ctx.find_role(userstr, create=False, interactive=True) + if role is None: + raise SafeCancellation + else: + return role.id + + @classmethod + def _format_data(cls, id: int, data: Optional[int], **kwargs): + """ + Retrieve the role mention if found, otherwise the role id or None depending on `_strict`. + """ + role = cls._data_to_value(id, data, **kwargs) + if role is None: + return "Not Set" + elif isinstance(role, discord.Role): + return role.mention + else: + return "`{}`".format(role.id) + + @classmethod + def _get_guildid(cls, id: int, **kwargs): + """ + Fetch the current guildid. + Assumes that the guilid is either passed as a kwarg or is the object id. + Should be overriden in other cases. + """ + return kwargs.get('guildid', id) + + +class Emoji(SettingType): + """ + Emoji type. Stores both custom and unicode emojis. + """ + accepts = "Emoji, either built in or custom. Use 'None' to unset." + + @staticmethod + def _parse_emoji(emojistr): + """ + Converts a provided string into a PartialEmoji. + If the string is badly formatted, returns None. + """ + if ":" in emojistr: + emojistr = emojistr.strip('<>') + splits = emojistr.split(":") + if len(splits) == 3: + animated, name, id = splits + animated = bool(animated) + return discord.PartialEmoji(name, animated=animated, id=int(id)) + else: + # TODO: Check whether this is a valid emoji + return discord.PartialEmoji(emojistr) + + @classmethod + def _data_from_value(cls, id: int, value: Optional[discord.PartialEmoji], **kwargs): + """ + Both data and value are of type Optional[discord.PartialEmoji]. + Directly return the provided value as data. + """ + return value + + @classmethod + def _data_to_value(cls, id: int, data: Optional[discord.PartialEmoji], **kwargs): + """ + Both data and value are of type Optional[discord.PartialEmoji]. + Directly return the internal data as the value. + """ + return data + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Pass to the emoji string parser to get the emoji. + Handle `0` and variants of `None` to unset. + """ + if userstr.lower() in ('0', 'none'): + return None + else: + return cls._parse_emoji(userstr) + + @classmethod + def _format_data(cls, id: int, data: Optional[discord.PartialEmoji], **kwargs): + """ + Return a string form of the partial emoji, which generally displays the emoji. + """ + if data is None: + return None + else: + return str(data) + + +class Timezone(SettingType): + """ + Timezone type, storing a valid timezone string. + + Types: + data: Optional[str] + The string representing the timezone in POSIX format. + value: Optional[timezone] + The pytz timezone. + """ + accepts = ( + "A timezone name from [this list](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) " + "(e.g. `Europe/London`)." + ) + + @classmethod + def _data_from_value(cls, id: int, value: Optional[str], **kwargs): + """ + Return the provided value string as the data string. + """ + if value is not None: + return str(value) + + @classmethod + def _data_to_value(cls, id: int, data: Optional[str], **kwargs): + """ + Return the provided data string as the value string. + """ + if data is not None: + return pytz.timezone(data) + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Check that the user-entered string is of the correct length. + Accept "None" to unset. + """ + if userstr.lower() == "none": + # Unsetting case + return None + try: + timezone = pytz.timezone(userstr) + except pytz.exceptions.UnknownTimeZoneError: + timezones = [tz for tz in pytz.all_timezones if userstr.lower() in tz.lower()] + if len(timezones) == 1: + timezone = timezones[0] + elif timezones: + result = await ctx.selector( + "Multiple matching timezones found, please select one.", + timezones + ) + timezone = timezones[result] + else: + raise UserInputError( + "Unknown timezone `{}`. " + "Please provide a TZ name from " + "[this list](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones)".format(userstr) + ) from None + + return str(timezone) + + @classmethod + def _format_data(cls, id: int, data: str, **kwargs): + """ + Wrap the string in backtics for formatting. + Handle the special case where the string is empty. + """ + if data: + return "`{}`".format(data) + else: + return 'Not Set' + + +class IntegerEnum(SettingType): + """ + Integer Enum type, accepting limited strings, storing an integer, and returning an IntEnum value + + Types: + data: Optional[int] + The stored integer. + value: Optional[Any] + The corresponding Enum member + """ + accepts = "A valid option." + + # Enum to use for mapping values + _enum: IntEnum = None + + # Custom map to format the value. If None, uses the enum names. + _output_map = None + + @classmethod + def _data_from_value(cls, id: int, value: ..., **kwargs): + """ + Return the value corresponding to the enum member + """ + if value is not None: + return value.value + + @classmethod + def _data_to_value(cls, id: int, data: ..., **kwargs): + """ + Return the enum member corresponding to the provided integer + """ + if data is not None: + return cls._enum(data) + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Find the corresponding enum member's value to the provided user input. + Accept "None" to unset. + """ + userstr = userstr.lower() + + options = {name.lower(): mem.value for name, mem in cls._enum.__members__.items()} + + if userstr == "none": + # Unsetting case + return None + elif userstr not in options: + raise UserInputError("Invalid option!") + else: + return options[userstr] + + @classmethod + def _format_data(cls, id: int, data: int, **kwargs): + """ + Format the data using either the `_enum` or the provided output map. + """ + if data is not None: + value = cls._enum(data) + if cls._output_map: + return cls._output_map[value] + else: + return "`{}`".format(value.name) + + +class SettingList(SettingType): + """ + List of a particular type of setting. + + Types: + data: List[SettingType.data] + List of data types of the specified SettingType. + Some of the data may be None. + value: List[SettingType.value] + List of the value types of the specified SettingType. + Some of the values may be None. + """ + # Base setting type to make the list from + _setting = None # type: SettingType + + # Whether 'None' values are filtered out of the data when creating values + _allow_null_values = False # type: bool + + # Whether duplicate data values should be filtered out + _force_unique = False + + @classmethod + def _data_from_value(cls, id: int, values: ..., **kwargs): + """ + Returns the setting type data for each value in the value list + """ + if values is None: + # Special behaviour here, store an empty list instead of None + return [] + else: + return [cls._setting._data_from_value(id, value) for value in values] + + @classmethod + def _data_to_value(cls, id: int, data: ..., **kwargs): + """ + Returns the setting type value for each entry in the data list + """ + if data is None: + return [] + else: + values = [cls._setting._data_to_value(id, entry) for entry in data] + + # Filter out null values if required + if not cls._allow_null_values: + values = [value for value in values if value is not None] + return values + + @classmethod + async def _parse_userstr(cls, ctx: Context, id: int, userstr: str, **kwargs): + """ + Splits the user string across `,` to break up the list. + Handle `0` and variants of `None` to unset. + """ + if userstr.lower() in ('0', 'none'): + return [] + else: + data = [] + for item in userstr.split(','): + data.append(await cls._setting._parse_userstr(ctx, id, item.strip())) + + if cls._force_unique: + data = list(set(data)) + return data + + @classmethod + def _format_data(cls, id: int, data: ..., **kwargs): + """ + Format the list by adding `,` between each formatted item + """ + if not data: + return None + else: + formatted_items = [] + for item in data: + formatted_item = cls._setting._format_data(id, item) + if formatted_item is not None: + formatted_items.append(formatted_item) + return ", ".join(formatted_items) + + +class ChannelList(SettingList): + """ + List of channels + """ + accepts = ( + "Comma separated list of channel mentions/ids/names. Use `None` to unset. " + "Write `--add` or `--remove` to add or remove channels." + ) + _setting = Channel + + +class RoleList(SettingList): + """ + List of roles + """ + accepts = ( + "Comma separated list of role mentions/ids/names. Use `None` to unset. " + "Write `--add` or `--remove` to add or remove roles." + ) + _setting = Role + + @property + def members(self): + roles = self.value + return list(set(itertools.chain(*(role.members for role in roles)))) + + +class StringList(SettingList): + """ + List of strings + """ + accepts = ( + "Comma separated list of strings. Use `None` to unset. " + "Write `--add` or `--remove` to add or remove strings." + ) + _setting = String diff --git a/bot/settings/user_settings.py b/bot/settings/user_settings.py new file mode 100644 index 00000000..c9c49fc7 --- /dev/null +++ b/bot/settings/user_settings.py @@ -0,0 +1,42 @@ +import datetime + +import settings +from utils.lib import DotDict + +from data import tables as tb + + +class UserSettings(settings.ObjectSettings): + settings = DotDict() + + +class UserSetting(settings.ColumnData, settings.Setting): + _table_interface = tb.user_config + _id_column = 'userid' + _create_row = True + + write_ward = None + + +@UserSettings.attach_setting +class timezone(settings.Timezone, UserSetting): + attr_name = 'timezone' + _data_column = 'timezone' + + _default = 'UTC' + + display_name = 'timezone' + desc = "Timezone to display prompts in." + long_desc = ( + "Timezone used for displaying certain prompts (e.g. selecting an accountability room)." + ) + + @property + def success_response(self): + if self.value: + return ( + "Your personal timezone is now {}.\n" + "Your current time is **{}**." + ).format(self.formatted, datetime.datetime.now(tz=self.value).strftime("%H:%M")) + else: + return "Your personal timezone has been unset." diff --git a/bot/utils/ctx_addons.py b/bot/utils/ctx_addons.py new file mode 100644 index 00000000..5833961a --- /dev/null +++ b/bot/utils/ctx_addons.py @@ -0,0 +1,64 @@ +import discord +from cmdClient import Context + +from data import tables +from core import Lion +from settings import GuildSettings, UserSettings + + +@Context.util +async def embed_reply(ctx, desc, colour=discord.Colour(0x9b59b6), **kwargs): + """ + Simple helper to embed replies. + All arguments are passed to the embed constructor. + `desc` is passed as the `description` kwarg. + """ + embed = discord.Embed(description=desc, colour=colour, **kwargs) + return await ctx.reply(embed=embed) + + +@Context.util +async def error_reply(ctx, error_str, **kwargs): + """ + Notify the user of a user level error. + Typically, this will occur in a red embed, posted in the command channel. + """ + embed = discord.Embed( + colour=discord.Colour.red(), + description=error_str + ) + try: + message = await ctx.ch.send(embed=embed, reference=ctx.msg, **kwargs) + ctx.sent_messages.append(message) + return message + except discord.Forbidden: + message = await ctx.reply(error_str) + ctx.sent_messages.append(message) + return message + + +def context_property(func): + setattr(Context, func.__name__, property(func)) + return func + + +@context_property +def best_prefix(ctx): + return ctx.client.prefix + + +@context_property +def guild_settings(ctx): + if ctx.guild: + tables.guild_config.fetch_or_create(ctx.guild.id) + return GuildSettings(ctx.guild.id if ctx.guild else 0) + + +@context_property +def author_settings(ctx): + return UserSettings(ctx.author.id) + + +@context_property +def alion(ctx): + return Lion.fetch(ctx.guild.id if ctx.guild else 0, ctx.author.id) diff --git a/bot/utils/seekers.py b/bot/utils/seekers.py new file mode 100644 index 00000000..f6480593 --- /dev/null +++ b/bot/utils/seekers.py @@ -0,0 +1,331 @@ +import discord + +from cmdClient import Context +from cmdClient.lib import InvalidContext, UserCancelled, ResponseTimedOut, SafeCancellation +from . import interactive + + +@Context.util +async def find_role(ctx, userstr, create=False, interactive=False, collection=None, allow_notfound=True): + """ + Find a guild role given a partial matching string, + allowing custom role collections and several behavioural switches. + + Parameters + ---------- + userstr: str + String obtained from a user, expected to partially match a role in the collection. + The string will be tested against both the id and the name of the role. + create: bool + Whether to offer to create the role if it does not exist. + The bot will only offer to create the role if it has the `manage_channels` permission. + interactive: bool + Whether to offer the user a list of roles to choose from, + or pick the first matching role. + collection: List[Union[discord.Role, discord.Object]] + Collection of roles to search amongst. + If none, uses the guild role list. + allow_notfound: bool + Whether to return `None` when there are no matches, instead of raising `SafeCancellation`. + Overriden by `create`, if it is set. + + Returns + ------- + discord.Role: + If a valid role is found. + None: + If no valid role has been found. + + Raises + ------ + cmdClient.lib.UserCancelled: + If the user cancels interactive role selection. + cmdClient.lib.ResponseTimedOut: + If the user fails to respond to interactive role selection within `60` seconds` + cmdClient.lib.SafeCancellation: + If `allow_notfound` is `False`, and the search returned no matches. + """ + # Handle invalid situations and input + if not ctx.guild: + raise InvalidContext("Attempt to use find_role outside of a guild.") + + if userstr == "": + raise ValueError("User string passed to find_role was empty.") + + # Create the collection to search from args or guild roles + collection = collection if collection is not None else ctx.guild.roles + + # If the unser input was a number or possible role mention, get it out + userstr = userstr.strip() + roleid = userstr.strip('<#@&!> ') + roleid = int(roleid) if roleid.isdigit() else None + searchstr = userstr.lower() + + # Find the role + role = None + + # Check method to determine whether a role matches + def check(role): + return (role.id == roleid) or (searchstr in role.name.lower()) + + # Get list of matching roles + roles = list(filter(check, collection)) + + if len(roles) == 0: + # Nope + role = None + elif len(roles) == 1: + # Select our lucky winner + role = roles[0] + else: + # We have multiple matching roles! + if interactive: + # Interactive prompt with the list of roles, handle `Object`s + role_names = [ + role.name if isinstance(role, discord.Role) else str(role.id) for role in roles + ] + + try: + selected = await ctx.selector( + "`{}` roles found matching `{}`!".format(len(roles), userstr), + role_names, + timeout=60 + ) + except UserCancelled: + raise UserCancelled("User cancelled role selection.") from None + except ResponseTimedOut: + raise ResponseTimedOut("Role selection timed out.") from None + + role = roles[selected] + else: + # Just select the first one + role = roles[0] + + # Handle non-existence of the role + if role is None: + msgstr = "Couldn't find a role matching `{}`!".format(userstr) + if create: + # Inform the user + msg = await ctx.error_reply(msgstr) + if ctx.guild.me.guild_permissions.manage_roles: + # Offer to create it + resp = await ctx.ask("Would you like to create this role?", timeout=30) + if resp: + # They accepted, create the role + # Before creation, check if the role name is too long + if len(userstr) > 100: + await ctx.error_reply("Could not create a role with a name over 100 characters long!") + else: + role = await ctx.guild.create_role( + name=userstr, + reason="Interactive role creation for {} (uid:{})".format(ctx.author, ctx.author.id) + ) + await msg.delete() + await ctx.reply("You have created the role `{}`!".format(userstr)) + + # If we still don't have a role, cancel unless allow_notfound is set + if role is None and not allow_notfound: + raise SafeCancellation + elif not allow_notfound: + raise SafeCancellation(msgstr) + else: + await ctx.error_reply(msgstr) + + return role + + +@Context.util +async def find_channel(ctx, userstr, interactive=False, collection=None, chan_type=None): + """ + Find a guild channel given a partial matching string, + allowing custom channel collections and several behavioural switches. + + Parameters + ---------- + userstr: str + String obtained from a user, expected to partially match a channel in the collection. + The string will be tested against both the id and the name of the channel. + interactive: bool + Whether to offer the user a list of channels to choose from, + or pick the first matching channel. + collection: List(discord.Channel) + Collection of channels to search amongst. + If none, uses the full guild channel list. + chan_type: discord.ChannelType + Type of channel to restrict the collection to. + + Returns + ------- + discord.Channel: + If a valid channel is found. + None: + If no valid channel has been found. + + Raises + ------ + cmdClient.lib.UserCancelled: + If the user cancels interactive channel selection. + cmdClient.lib.ResponseTimedOut: + If the user fails to respond to interactive channel selection within `60` seconds` + """ + # Handle invalid situations and input + if not ctx.guild: + raise InvalidContext("Attempt to use find_channel outside of a guild.") + + if userstr == "": + raise ValueError("User string passed to find_channel was empty.") + + # Create the collection to search from args or guild channels + collection = collection if collection else ctx.guild.channels + if chan_type is not None: + collection = [chan for chan in collection if chan.type == chan_type] + + # If the user input was a number or possible channel mention, extract it + chanid = userstr.strip('<#@&!>') + chanid = int(chanid) if chanid.isdigit() else None + searchstr = userstr.lower() + + # Find the channel + chan = None + + # Check method to determine whether a channel matches + def check(chan): + return (chan.id == chanid) or (searchstr in chan.name.lower()) + + # Get list of matching roles + channels = list(filter(check, collection)) + + if len(channels) == 0: + # Nope + chan = None + elif len(channels) == 1: + # Select our lucky winner + chan = channels[0] + else: + # We have multiple matching channels! + if interactive: + # Interactive prompt with the list of channels + chan_names = [chan.name for chan in channels] + + try: + selected = await ctx.selector( + "`{}` channels found matching `{}`!".format(len(channels), userstr), + chan_names, + timeout=60 + ) + except UserCancelled: + raise UserCancelled("User cancelled channel selection.") from None + except ResponseTimedOut: + raise ResponseTimedOut("Channel selection timed out.") from None + + chan = channels[selected] + else: + # Just select the first one + chan = channels[0] + + if chan is None: + await ctx.error_reply("Couldn't find a channel matching `{}`!".format(userstr)) + + return chan + +@Context.util +async def find_member(ctx, userstr, interactive=False, collection=None, silent=False): + """ + Find a guild member given a partial matching string, + allowing custom member collections. + + Parameters + ---------- + userstr: str + String obtained from a user, expected to partially match a member in the collection. + The string will be tested against both the userid, full user name and user nickname. + interactive: bool + Whether to offer the user a list of members to choose from, + or pick the first matching channel. + collection: List(discord.Member) + Collection of members to search amongst. + If none, uses the full guild member list. + silent: bool + Whether to reply with an error when there are no matches. + + Returns + ------- + discord.Member: + If a valid member is found. + None: + If no valid member has been found. + + Raises + ------ + cmdClient.lib.UserCancelled: + If the user cancels interactive member selection. + cmdClient.lib.ResponseTimedOut: + If the user fails to respond to interactive member selection within `60` seconds` + """ + # Handle invalid situations and input + if not ctx.guild: + raise InvalidContext("Attempt to use find_member outside of a guild.") + + if userstr == "": + raise ValueError("User string passed to find_member was empty.") + + # Create the collection to search from args or guild members + collection = collection if collection else ctx.guild.members + + # If the user input was a number or possible member mention, extract it + userid = userstr.strip('<#@&!>') + userid = int(userid) if userid.isdigit() else None + searchstr = userstr.lower() + + # Find the member + member = None + + # Check method to determine whether a member matches + def check(member): + return ( + member.id == userid + or searchstr in member.display_name.lower() + or searchstr in str(member).lower() + ) + + # Get list of matching roles + members = list(filter(check, collection)) + + if len(members) == 0: + # Nope + member = None + elif len(members) == 1: + # Select our lucky winner + member = members[0] + else: + # We have multiple matching members! + if interactive: + # Interactive prompt with the list of members + member_names = [ + "{} {}".format( + member.nick if member.nick else (member if members.count(member) > 1 + else member.name), + ("<{}>".format(member)) if member.nick else "" + ) for member in members + ] + + try: + selected = await ctx.selector( + "`{}` members found matching `{}`!".format(len(members), userstr), + member_names, + timeout=60 + ) + except UserCancelled: + raise UserCancelled("User cancelled member selection.") from None + except ResponseTimedOut: + raise ResponseTimedOut("Member selection timed out.") from None + + member = members[selected] + else: + # Just select the first one + member = members[0] + + if member is None and not silent: + await ctx.error_reply("Couldn't find a member matching `{}`!".format(userstr)) + + return member