From 1b98b517d24646b19871870767c8eab619efc12f Mon Sep 17 00:00:00 2001 From: Conatum Date: Fri, 11 Nov 2022 08:05:05 +0200 Subject: [PATCH] rewrite: Setting abstract framework. --- bot/settings/__init__.py | 4 + bot/settings/base.py | 163 +++++ bot/settings/data.py | 216 +++++++ bot/settings/groups.py | 5 + bot/settings/mock.py | 13 + bot/settings/setting_types.py | 1065 +++++++++++++++++++++++++++++++++ bot/settings/ui.py | 355 +++++++++++ 7 files changed, 1821 insertions(+) create mode 100644 bot/settings/__init__.py create mode 100644 bot/settings/base.py create mode 100644 bot/settings/data.py create mode 100644 bot/settings/groups.py create mode 100644 bot/settings/mock.py create mode 100644 bot/settings/setting_types.py create mode 100644 bot/settings/ui.py diff --git a/bot/settings/__init__.py b/bot/settings/__init__.py new file mode 100644 index 00000000..71fb05aa --- /dev/null +++ b/bot/settings/__init__.py @@ -0,0 +1,4 @@ +from .data import ModelData +from .base import BaseSetting +from .ui import SettingWidget, InteractiveSetting +from .groups import SettingGroup diff --git a/bot/settings/base.py b/bot/settings/base.py new file mode 100644 index 00000000..86d8095a --- /dev/null +++ b/bot/settings/base.py @@ -0,0 +1,163 @@ +from typing import Generic, TypeVar, Type, Optional, overload + + +""" +Setting metclass? +Parse setting docstring to generate default info? +Or just put it in the decorator we are already using +""" + + +# Typing using Generic[parent_id_type, data_type, value_type] +# value generic, could be Union[?, UNSET] +ParentID = TypeVar('ParentID') +SettingData = TypeVar('SettingData') +SettingValue = TypeVar('SettingValue') + +T = TypeVar('T', bound='BaseSetting') + + +class BaseSetting(Generic[ParentID, SettingData, SettingValue]): + """ + Abstract base class describing a stored configuration setting. + A setting consists of logic to load the setting from storage, + present it in a readable form, understand user entered values, + and write it again in storage. + Additionally, the setting has attributes attached describing + the setting in a user-friendly manner for display purposes. + """ + _default: Optional[SettingData] = None # Default data value for the setting + + def __init__(self, parent_id: ParentID, data: Optional[SettingData], **kwargs): + self.parent_id = parent_id + self._data = data + + # Instance generation + @classmethod + async def get(cls: Type[T], parent_id: ParentID, **kwargs) -> T: + """ + Return a setting instance initialised from the stored value, associated with the given parent id. + """ + data = await cls._reader(parent_id, **kwargs) + return cls(parent_id, data, **kwargs) + + # Main interface + @property + def data(self) -> Optional[SettingData]: + """ + Retrieves the current internal setting data if it is set, otherwise the default data + """ + return self._data if self._data is not None else self.default + + @data.setter + def data(self, new_data: Optional[SettingData]): + """ + Sets the internal raw data. + Does not write the changes. + """ + self._data = new_data + + @property + def default(self) -> Optional[SettingData]: + """ + Retrieves the default value for this setting. + Settings should override this if the default depends on the object id. + """ + return self._default + + @property + def value(self) -> Optional[SettingValue]: + """ + Context-aware object or objects associated with the setting. + """ + return self._data_to_value(self.parent_id, self.data) + + @value.setter + def value(self, new_value: Optional[SettingValue]): + """ + Setter which reads the discord-aware object and converts it to data. + Does not write the new value. + """ + self._data = self._data_from_value(self.parent_id, new_value) + + async def write(self, **kwargs) -> None: + """ + Write current data to the database. + For settings which override this, + ensure you handle deletion of values when internal data is None. + """ + await self._writer(self.parent_id, self._data, **kwargs) + + # Raw converters + @overload + @classmethod + def _data_from_value(cls: Type[T], parent_id: ParentID, value: SettingValue, **kwargs) -> SettingData: + ... + + @overload + @classmethod + def _data_from_value(cls: Type[T], parent_id: ParentID, value: None, **kwargs) -> None: + ... + + @classmethod + def _data_from_value( + cls: Type[T], parent_id: ParentID, value: Optional[SettingValue], **kwargs + ) -> Optional[SettingData]: + """ + Convert a high-level setting value to internal data. + Must be overridden by the setting. + Be aware of UNSET values, these should always pass through as None + to provide an unsetting interface. + """ + raise NotImplementedError + + @overload + @classmethod + def _data_to_value(cls: Type[T], parent_id: ParentID, data: SettingData, **kwargs) -> SettingValue: + ... + + @overload + @classmethod + def _data_to_value(cls: Type[T], parent_id: ParentID, data: None, **kwargs) -> None: + ... + + @classmethod + def _data_to_value( + cls: Type[T], parent_id: ParentID, data: Optional[SettingData], **kwargs + ) -> Optional[SettingValue]: + """ + Convert internal data to high-level setting value. + Must be overriden by the setting. + """ + raise NotImplementedError + + # Database access + @classmethod + async def _reader(cls: Type[T], parent_id: ParentID, **kwargs) -> Optional[SettingData]: + """ + Retrieve the setting data associated with the given parent_id. + May be None if the setting is not set. + Must be overridden by the setting. + """ + raise NotImplementedError + + @classmethod + async def _writer(cls: Type[T], parent_id: ParentID, data: Optional[SettingData], **kwargs) -> None: + """ + Write provided setting data to storage. + Must be overridden by the setting unless the `write` method is overridden. + If the data is None, the setting is UNSET and should be deleted. + """ + raise NotImplementedError + + @classmethod + async def setup(cls, bot): + """ + Initialisation task to be executed during client initialisation. + May be used for e.g. populating a cache or required client setup. + + Main application must execute the initialisation task before the setting is used. + Further, the task must always be executable, if the setting is loaded. + Conditional initialisation should go in the relevant module's init tasks. + """ + return None diff --git a/bot/settings/data.py b/bot/settings/data.py new file mode 100644 index 00000000..a745e499 --- /dev/null +++ b/bot/settings/data.py @@ -0,0 +1,216 @@ +from typing import Type +import json + +from data import RowModel, Table, ORDER + + +class ModelData: + """ + Mixin for settings stored in a single row and column of a Model. + Assumes that the parent_id is the identity key of the Model. + + This does not create a reference to the Row. + """ + # Table storing the desired data + _model: Type[RowModel] + + # Column with the desired data + _column: str + + # Whether to create a row if not found + _create_row = False + + # High level data cache to use, leave as None to disable cache. + _cache = None # Map[id -> value] + + @classmethod + async def _reader(cls, parent_id, use_cache=True, **kwargs): + """ + Read in the requested column associated to the parent id. + """ + if cls._cache is not None and parent_id in cls._cache and use_cache: + return cls._cache[parent_id] + + model = cls._model + if cls._create_row: + row = await model.fetch_or_create(parent_id) + else: + row = await model.fetch(parent_id) + data = row[cls._column] if row else None + + if cls._cache is not None: + cls._cache[parent_id] = data + + return data + + @classmethod + async def _writer(cls, parent_id, data, **kwargs): + """ + Write the provided entry to the table. + This does *not* create the row if it does not exist. + It only updates. + """ + # TODO: Better way of getting the key? + if not isinstance(parent_id, tuple): + parent_id = (parent_id, ) + model = cls._model + rows = await model.table.update_where( + **model._dict_from_id(parent_id) + ).set( + **{cls._column: data} + ) + # If we didn't update any rows, create a new row + if not rows: + await model.table.fetch_or_create(**model._dict_from_id, **{cls._column: data}) + ... + + if cls._cache is not None: + cls._cache[parent_id] = data + + +class ListData: + """ + Mixin for list types implemented on a Table. + Implements a reader and writer. + This assumes the list is the only data stored in the table, + and removes list entries by deleting rows. + """ + # Table storing the setting data + _table_interface: Table + + # Name of the column storing the id + _id_column: str + + # Name of the column storing the data to read + _data_column: str + + # Name of column storing the order index to use, if any. Assumed to be Serial on writing. + _order_column: str + _order_type: ORDER = ORDER.ASC + + # High level data cache to use, set to None to disable cache. + _cache = None # Map[id -> value] + + @classmethod + async def _reader(cls, parent_id, use_cache=True, **kwargs): + """ + Read in all entries associated to the given id. + """ + if cls._cache is not None and parent_id in cls._cache and use_cache: + return cls._cache[parent_id] + + table = cls._table_interface # type: Table + query = table.select_where(**{cls._id_column: parent_id}).select(cls._data_column) + if cls._order_column: + query.order_by(cls._order_column, order=cls._order_type) + + rows = await query + data = [row[cls._data_column] for row in rows] + + if cls._cache is not None: + cls._cache[parent_id] = data + + return data + + @classmethod + async def _writer(cls, id, data, add_only=False, remove_only=False, **kwargs): + """ + Write the provided list to storage. + """ + table = cls._table_interface + conn = await table.connector.get_connection() + with conn.transaction(): + # Handle None input as an empty list + if data is None: + data = [] + + current = await cls._reader(id, use_cache=False, **kwargs) + if not cls._order_column and (add_only or remove_only): + to_insert = [item for item in data if item not in current] if not remove_only else [] + to_remove = data if remove_only else ( + [item for item in current if item not in data] if not add_only else [] + ) + + # Handle required deletions + if to_remove: + params = { + cls._id_column: id, + cls._data_column: to_remove + } + await table.delete_where(**params) + + # Handle required insertions + if to_insert: + columns = (cls._id_column, cls._data_column) + values = [(id, value) for value in to_insert] + await table.insert_many(columns, *values) + + if cls._cache is not None: + new_current = [item for item in current + to_insert if item not in to_remove] + cls._cache[id] = new_current + else: + # Remove all and add all to preserve order + delete_params = {cls._id_column: id} + await table.delete_where(**delete_params) + + if data: + columns = (cls._id_column, cls._data_column) + values = [(id, value) for value in data] + await table.insert_many(columns, *values) + + if cls._cache is not None: + cls._cache[id] = data + + +class KeyValueData: + """ + Mixin for settings implemented in a Key-Value table. + The underlying table should have a Unique constraint on the `(_id_column, _key_column)` pair. + """ + _table_interface: Table + + _id_column: str + + _key_column: str + + _value_column: str + + _key: str + + @classmethod + async def _reader(cls, id, **kwargs): + params = { + cls._id_column: id, + cls._key_column: cls._key + } + + row = await cls._table_interface.select_one_where(**params).select(cls._value_column) + data = row[cls._value_column] if row else None + + if data is not None: + data = json.loads(data) + + return data + + @classmethod + async def _writer(cls, id, data, **kwargs): + params = { + cls._id_column: id, + cls._key_column: cls._key + } + if data is not None: + values = { + cls._value_column: json.dumps(data) + } + rows = await cls._table_interface.update_where(**params).set(**values) + if not rows: + await cls._table_interface.insert_many( + (cls._id_column, cls._key_column, cls._value_column), + (id, cls._key, json.dumps(data)) + ) + else: + await cls._table_interface.delete_where(**params) + + +# class UserInputError(SafeCancellation): +# pass diff --git a/bot/settings/groups.py b/bot/settings/groups.py new file mode 100644 index 00000000..2fa75b41 --- /dev/null +++ b/bot/settings/groups.py @@ -0,0 +1,5 @@ +from discord.ui import View + + +class SettingGroup(View): + ... diff --git a/bot/settings/mock.py b/bot/settings/mock.py new file mode 100644 index 00000000..e80a4c93 --- /dev/null +++ b/bot/settings/mock.py @@ -0,0 +1,13 @@ +import discord +from discord import app_commands + + +class LocalString: + def __init__(self, string): + self.string = string + + def as_string(self): + return self.string + + +_ = LocalString diff --git a/bot/settings/setting_types.py b/bot/settings/setting_types.py new file mode 100644 index 00000000..b538f90e --- /dev/null +++ b/bot/settings/setting_types.py @@ -0,0 +1,1065 @@ +from typing import Optional, Union, TYPE_CHECKING, TypeVar, Generic, Any, TypeAlias, Type +from enum import Enum + +import pytz +import discord +import itertools +from discord import ui +from discord.ui.button import button, Button, ButtonStyle + +from meta.context import context +from utils.lib import strfdur, parse_dur +from meta.errors import UserInputError + +from .base import ParentID +from .ui import InteractiveSetting, SettingWidget + + +if TYPE_CHECKING: + from discord.guild import GuildChannel + + +class StringSetting(InteractiveSetting[ParentID, str, str]): + """ + Setting type mixin describing an arbitrary string type. + + Options + ------- + _maxlen: int + Maximum length of string to accept in `_parse_string`. + Default: 4000 + + _quote: bool + Whether to display the string with backticks. + Default: True + """ + + accepts = "Any text" + + _maxlen: int = 4000 + _quote: bool = True + + @property + def input_formatted(self) -> str: + """ + Return the current data string. + """ + if self._data is not None: + return str(self._data) + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Return the provided value string as the data string. + """ + return value + + @classmethod + def _data_to_value(cls, id, data, **kwargs): + """ + Return the provided data string as the value string. + """ + return data + + @classmethod + async def _parse_string(cls, parent_id, string: str, **kwargs): + """ + Parse the user input `string` into StringSetting data. + Provides some minor input validation. + Treats an empty string as a `None` value. + """ + if len(string) > cls._maxlen: + raise UserInputError("Provided string is too long! Maximum length: {} characters.".format(cls._maxlen)) + elif len(string) == 0: + return None + else: + return string + + @classmethod + def _format_data(cls, parent_id, data, **kwargs): + """ + Optionally (see `_quote`) wrap the data string in backticks. + """ + if data: + return "`{}`".format(data) if cls._quote else str(data) + else: + return None + + +CT = TypeVar('CT', 'GuildChannel', 'discord.Object', 'discord.Thread') +MCT = TypeVar('MCT', discord.TextChannel, discord.Thread, discord.VoiceChannel, discord.Object) + + +class ChannelSetting(Generic[ParentID, CT], InteractiveSetting[ParentID, int, CT]): + """ + Setting type mixin describing a Guild Channel. + + Options + ------- + _selector_placeholder: str + Placeholder to use in the Widget selector. + Default: "Select a channel" + + channel_types: list[discord.ChannelType] + List of guild channel types to accept. + Default: [] + """ + accepts = "Enter a channel name or id" + + _selector_placeholder = "Select a Channel" + channel_types: list[discord.ChannelType] = [] + + @classmethod + def _data_from_value(cls, parent_id, value, **kwargs): + """ + Returns the id of the provided channel. + """ + if value is not None: + return value.id + + @classmethod + def _data_to_value(cls, parent_id, data, **kwargs): + """ + Searches for the provided channel id in the current channel cache. + If the channel cannot be found, returns a `discord.Object` instead. + """ + if data is not None: + ctx = context.get() + channel = ctx.bot.get_channel(data) + if channel is None: + channel = discord.Object(id=data) + return channel + + @classmethod + async def _parse_string(cls, parent_id, string: str, **kwargs): + # TODO: Waiting on seeker utils. + ... + + @classmethod + def _format_data(cls, parent_id, data, **kwargs): + """ + Returns a manually formatted channel mention. + """ + if data: + return "<#{}>".format(data) + else: + return None + + @property + def input_formatted(self) -> str: + """ + Returns the channel name if possible, otherwise the id. + """ + if self._data is not None: + channel = self.value + if channel is not None: + if isinstance(channel, discord.Object): + return str(channel.id) + else: + return f"#{channel.name}" + else: + return "" + else: + return "" + + class Widget(SettingWidget['ChannelSetting']): + def update_children(self): + self.update_child( + self.channel_selector, { + 'channel_types': self.setting.channel_types, + 'placeholder': self.setting._selector_placeholder + } + ) + + def make_exports(self): + return [self.channel_selector] + + @ui.select( + cls=ui.ChannelSelect, + channel_types=[discord.ChannelType.text], + placeholder="Select a Channel", + max_values=1, + min_values=0 + ) + async def channel_selector(self, interaction: discord.Interaction, select: discord.ui.ChannelSelect) -> None: + await interaction.response.defer(thinking=True, ephemeral=True) + if select.values: + channel = select.values[0] + await self.setting.interactive_set(channel.id, interaction) + else: + await self.setting.interactive_set(None, interaction) + + +class VoiceChannelSetting(ChannelSetting): + """ + Setting type mixin representing a discord VoiceChannel. + Implemented as a narrowed `ChannelSetting`. + See `ChannelSetting` for options. + """ + channel_types = [discord.ChannelType.voice] + + +class MessageablelSetting(ChannelSetting): + """ + Setting type mixin representing a discord Messageable guild channel. + Implemented as a narrowed `ChannelSetting`. + See `ChannelSetting` for options. + """ + channel_types = [discord.ChannelType.text, discord.ChannelType.voice, discord.ChannelType.public_thread] + + @classmethod + def _data_to_value(cls, parent_id, data, **kwargs): + """ + Searches for the provided channel id in the current channel cache. + If the channel cannot be found, returns a `discord.PartialMessageable` instead. + """ + if data is not None: + ctx = context.get() + channel = ctx.bot.get_channel(data) + if channel is None: + channel = ctx.bot.get_partial_messageable(data, guild_id=parent_id) + return channel + + +class RoleSetting(InteractiveSetting[ParentID, int, Union[discord.Role, discord.Object]]): + """ + Setting type mixin describing a Guild Role. + + Options + ------- + _selector_placeholder: str + Placeholder to use in the Widget selector. + Default: "Select a Role" + """ + accepts = "Enter a role name or id" + + _selector_placeholder = "Select a Role" + + @classmethod + def _get_guildid(cls, parent_id: int, **kwargs) -> int: + """ + Fetch the current guildid. + Assumes that the guilid is either passed as a kwarg or is the object id. + Should be overridden in other cases. + """ + return kwargs.get('guildid', parent_id) + + @classmethod + def _data_from_value(cls, parent_id, value, **kwargs): + """ + Returns the id of the provided role. + """ + if value is not None: + return value.id + + @classmethod + def _data_to_value(cls, parent_id, data, **kwargs): + """ + Searches for the provided role id in the current channel cache. + If the channel cannot be found, returns a `discord.Object` instead. + """ + if data is not None: + role = None + + guildid = cls._get_guildid(parent_id) + ctx = context.get() + guild = ctx.bot.get_guild(guildid) + if guild is not None: + role = guild.get_role(data) + if role is None: + role = discord.Object(id=data) + return role + + @classmethod + async def _parse_string(cls, parent_id, string: str, **kwargs): + # TODO: Waiting on seeker utils. + ... + + @classmethod + def _format_data(cls, parent_id, data, **kwargs): + """ + Returns a manually formatted role mention. + """ + if data: + return "<@&{}>".format(data) + else: + return None + + @property + def input_formatted(self) -> str: + """ + Returns the role name if possible, otherwise the id. + """ + if self._data is not None: + role = self.value + if role is not None: + if isinstance(role, discord.Object): + return str(role.id) + else: + return f"@{role.name}" + else: + return "" + else: + return "" + + class Widget(SettingWidget['RoleSetting']): + def update_children(self): + self.update_child( + self.role_selector, + {'placeholder': self.setting._selector_placeholder} + ) + + def make_exports(self): + return [self.role_selector] + + @ui.select( + cls=ui.RoleSelect, + placeholder="Select a Role", + max_values=1, + min_values=0 + ) + async def role_selector(self, interaction: discord.Interaction, select: discord.ui.RoleSelect) -> None: + await interaction.response.defer(thinking=True, ephemeral=True) + if select.values: + role = select.values[0] + await self.setting.interactive_set(role.id, interaction) + else: + await self.setting.interactive_set(None, interaction) + + +class BoolSetting(InteractiveSetting[ParentID, bool, bool]): + """ + Setting type mixin describing a boolean. + + Options + ------- + _truthy: Set + Set of strings that are considered "truthy" in the parser. + Not case sensitive. + Default: {"yes", "true", "on", "enable", "enabled"} + + _falsey: Set + Set of strings that are considered "falsey" in the parser. + Not case sensitive. + Default: {"no", "false", "off", "disable", "disabled"} + + _outputs: tuple[str, str, str] + Strings to represent 'True', 'False', and 'None' values respectively. + Default: {True: "On", False: "Off", None: "Not Set"} + """ + + accepts = "True/False" + + # Values that are accepted as truthy and falsey by the parser + _truthy = {"yes", "true", "on", "enable", "enabled"} + _falsey = {"no", "false", "off", "disable", "disabled"} + + # The user-friendly output strings to use for each value + _outputs = {True: "On", False: "Off", None: "Not Set"} + + # Button labels + _true_button_args: dict[str, Any] = {} + _false_button_args: dict[str, Any] = {} + _reset_button_args: dict[str, Any] = {} + + @property + def input_formatted(self) -> str: + """ + Return the current data string. + """ + if self._data is not None: + output = self._outputs[self._data] + set = (self._falsey, self._truthy)[self._data] + + if output.lower() in set: + return output + else: + return next(iter(set)) + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Directly return provided value bool as data bool. + """ + return value + + @classmethod + def _data_to_value(cls, id, data, **kwargs): + """ + Directly return provided data bool as value bool. + """ + return data + + @classmethod + async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs): + """ + Looks up the provided string in the truthy and falsey tables. + """ + _userstr = string.lower() + if not _userstr or _userstr == "none": + return None + if _userstr in cls._truthy: + return True + elif _userstr in cls._falsey: + return False + else: + raise UserInputError("Could not parse `{}` as a boolean.".format(string)) + + @classmethod + def _format_data(cls, parent_id, data, **kwargs): + """ + Use provided _outputs dictionary to format data. + """ + return cls._outputs[data] + + class Widget(SettingWidget['BoolSetting']): + def update_children(self): + self.update_child(self.true_button, self.setting._true_button_args) + self.update_child(self.false_button, self.setting._false_button_args) + self.update_child(self.reset_button, self.setting._reset_button_args) + self.order_children(self.true_button, self.false_button, self.reset_button) + + def make_exports(self): + return [self.true_button, self.false_button, self.reset_button] + + @button(style=ButtonStyle.secondary, label="On", row=4) + async def true_button(self, interaction: discord.Interaction, button: Button): + await interaction.response.defer(thinking=True, ephemeral=True) + await self.setting.interactive_set(True, interaction) + + @button(style=ButtonStyle.secondary, label="Off", row=4) + async def false_button(self, interaction: discord.Interaction, button: Button): + await interaction.response.defer(thinking=True, ephemeral=True) + await self.setting.interactive_set(False, interaction) + + +class IntegerSetting(InteractiveSetting[ParentID, int, int]): + """ + Setting type mixin describing a ranged integer. + As usual, override `_parse_string` to customise error messages. + + Options + ------- + _min: int + A minimum integer to accept. + Default: -2147483647 + + _max: int + A maximum integer to accept. + Default: 2147483647 + """ + _min = -2147483647 + _max = 2147483647 + + accepts = "An integer" + + @property + def input_formatted(self) -> str: + """ + Return a string representation of the set integer. + """ + if self._data is not None: + return str(self._data) + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Directly return value integer as data integer. + """ + return value + + @classmethod + def _data_to_value(cls, id, data, **kwargs): + """ + Directly return data integer as value integer. + """ + return data + + @classmethod + async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs): + """ + Parse the user input into an integer. + """ + if not string: + return None + try: + num = int(string) + except Exception: + raise UserInputError("Couldn't parse provided integer.") from None + + if num > cls._max: + raise UserInputError("Provided integer was too large!") + elif num < cls._min: + raise UserInputError("Provided integer was too small!") + + return num + + @classmethod + def _format_data(cls, parent_id, data, **kwargs): + """ + Returns the stringified integer in backticks. + """ + if data is not None: + return f"`{data}`" + + +class EmojiSetting(InteractiveSetting[ParentID, str, discord.PartialEmoji]): + """ + Setting type mixin describing an Emoji string. + + Options + ------- + None + """ + + accepts = "Unicode or custom emoji" + + @staticmethod + def _parse_emoji(emojistr): + """ + Converts a provided string into a PartialEmoji. + If the string is badly formatted, returns None. + """ + if ":" in emojistr: + emojistr = emojistr.strip('<>') + splits = emojistr.split(":") + if len(splits) == 3: + animated, name, id = splits + animated = bool(animated) + return discord.PartialEmoji(name=name, animated=animated, id=int(id)) + else: + # TODO: Check whether this is a valid emoji + return discord.PartialEmoji(name=emojistr) + + @property + def input_formatted(self) -> str: + """ + Return the current data string. + """ + if self._data is not None: + return str(self._data) + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Stringify the value emoji into a consistent data string. + """ + return str(value) if value is not None else None + + @classmethod + def _data_to_value(cls, id, data, **kwargs): + """ + Convert the stored string into an emoji, through parse_emoji. + This may return None if the parsing syntax changes. + """ + return cls._parse_emoji(data) if data is not None else None + + @classmethod + async def _parse_string(cls, parent_id, string: str, **kwargs): + """ + Parse the provided string into a PartialEmoji if possible. + """ + if string: + emoji = cls._parse_emoji(string) + if emoji is None: + raise UserInputError("Could not understand provided emoji!") + return str(emoji) + return None + + @classmethod + def _format_data(cls, parent_id, data, **kwargs): + """ + Emojis are pretty much self-formatting. Just return the data directly. + """ + return data + + +class GuildIDSetting(InteractiveSetting[ParentID, int, int]): + """ + Setting type mixin describing a guildid. + This acts like a pure integer type, apart from the formatting. + + Options + ------- + """ + accepts = "Any Snowflake ID" + # TODO: Consider autocomplete for guilds the user is in + + @property + def input_formatted(self) -> str: + """ + Return a string representation of the stored snowflake. + """ + if self._data is not None: + return str(self._data) + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Directly return value integer as data integer. + """ + return value + + @classmethod + def _data_to_value(cls, id, data, **kwargs): + """ + Directly return data integer as value integer. + """ + return data + + @classmethod + async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs): + """ + Parse the user input into an integer. + """ + if not string: + return None + try: + num = int(string) + except Exception: + raise UserInputError("Couldn't parse provided guildid.") from None + return num + + @classmethod + def _format_data(cls, parent_id: ParentID, data, **kwargs): + """ + Return the stored snowflake as a string. + If the guild is in cache, attach the name as well. + """ + if data is not None: + ctx = context.get() + guild = ctx.bot.get_guild(data) + if guild is not None: + return f"`{data}` ({guild.name})" + else: + return f"`{data}`" + + +TZT: TypeAlias = pytz.BaseTzInfo + + +class TimezoneSetting(InteractiveSetting[ParentID, str, TZT]): + """ + Typed Setting ABC representing timezone information. + """ + # TODO Definitely need autocomplete here + accepts = "A timezone name." + _accepts = ( + "A timezone name from [this list](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) " + "(e.g. `Europe/London`)." + ) + + @property + def input_formatted(self) -> str: + """ + Return a string representation of the stored timezone. + """ + if self._data is not None: + return str(self._data) + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Use str to transform the pytz timezone into a string. + """ + if value: + return str(value) + + @classmethod + def _data_to_value(cls, id, data, **kwargs): + """ + Use pytz to convert the stored timezone string to a timezone. + """ + if data: + return pytz.timezone(data) + + @classmethod + async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs): + """ + Parse the user input into an integer. + """ + # TODO: Another selection case. + if not string: + return None + try: + timezone = pytz.timezone(string) + except pytz.exceptions.UnknownTimeZoneError: + timezones = [tz for tz in pytz.all_timezones if string.lower() in tz.lower()] + if len(timezones) == 1: + timezone = timezones[0] + elif timezones: + raise UserInputError("Multiple matching timezones found!") + # result = await ctx.selector( + # "Multiple matching timezones found, please select one.", + # timezones + # ) + # timezone = timezones[result] + else: + raise UserInputError( + "Unknown timezone `{}`. " + "Please provide a TZ name from " + "[this list](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones)".format(string) + ) from None + return timezone + + @classmethod + def _format_data(cls, parent_id: ParentID, data, **kwargs): + """ + Return the stored snowflake as a string. + If the guild is in cache, attach the name as well. + """ + if data is not None: + return f"`{data}`" + + +ET = TypeVar('ET', bound='Enum') + + +class EnumSetting(InteractiveSetting[ParentID, ET, ET]): + """ + Typed InteractiveSetting ABC representing a stored Enum. + The Enum is assumed to be data adapted (e.g. through RegisterEnum). + + The embed of an enum setting should usually be overridden to describe the options. + + The default widget is implemented as a select menu, + although it may also make sense to implement using colour-changing buttons. + + Options + ------- + _enum: Enum + The Enum to act as a setting interface to. + _outputs: dict[Enum, str] + A map of enum items to output strings. + Describes how the enum should be formatted. + _inputs: dict[Enum, str] + A map of accepted input strings (not case sensitive) to enum items. + This should almost always include the strings from `_outputs`. + """ + + _enum: ET + _outputs: dict[ET, str] + _inputs: dict[str, ET] + + accepts = "A valid option." + + @property + def input_formatted(self) -> str: + """ + Return the output string for the current data. + This assumes the output strings are accepted as inputs! + """ + if self._data is not None: + return self._outputs[self._data] + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Return the provided value enum item as the data enum item. + """ + return value + + @classmethod + def _data_to_value(cls, id, data, **kwargs): + """ + Return the provided data enum item as the value enum item. + """ + return data + + @classmethod + async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs): + """ + Parse the user input into an enum item. + """ + # TODO: Another selection case. + if not string: + return None + string = string.lower() + if string not in cls._inputs: + raise UserInputError("Invalid choice!") + return cls._inputs[string] + + @classmethod + def _format_data(cls, parent_id: ParentID, data, **kwargs): + """ + Format the enum using the provided output map. + """ + if data is not None: + if data not in cls._outputs: + raise ValueError(f"Enum item {data} unmapped.") + return cls._outputs[data] + + +class DurationSetting(InteractiveSetting[ParentID, int, int]): + """ + Typed InteractiveSetting ABC representing a stored duration. + Stored and retrieved as an integer number of seconds. + Shown and set as a "duration string", e.g. "24h 10m 20s". + + Options + ------- + _max: int + Upper limit on the stored duration, in seconds. + Default: 60 * 60 * 24 * 365 + _min: Optional[int] + Lower limit on the stored duration, in seconds. + The duration can never be negative. + _default_multiplier: int + Default multiplier to use to convert the number when it is provided alone. + E.g. 1 for seconds, or 60 for minutes. + Default: 1 + allow_zero: bool + Whether to allow a zero duration. + The duration parser typically returns 0 when no duration is found, + so this may be useful for error checking. + Default: False + _show_days: bool + Whether to show days in the formatted output. + Default: False + """ + + accepts = "A number of days, hours, minutes, and seconds, e.g. `2d 4h 10s`." + + # Set an upper limit on the duration + _max = 60 * 60 * 24 * 365 + _min = None + + # Default multiplier when the number is provided alone + # 1 for seconds, 60 from minutes, etc + _default_multiplier = None + + # Whether to allow empty durations + # This is particularly useful since the duration parser will return 0 for most non-duration strings + allow_zero = False + + # Whether to show days on the output + _show_days = False + + @property + def input_formatted(self) -> str: + """ + Return the formatted duration, which is accepted as input. + """ + if self._data is not None: + return strfdur(self._data, short=True, show_days=self._show_days) + else: + return "" + + @classmethod + def _data_from_value(cls, parent_id: ParentID, value, **kwargs): + """ + Passthrough the provided duration in seconds. + """ + return value + + @classmethod + def _data_to_value(cls, parent_id: ParentID, data, **kwargs): + """ + Passthrough the provided duration in seconds. + """ + return data + + @classmethod + async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs): + """ + Parse the user input into a duration. + """ + if not string: + return None + + if cls._default_multiplier and string.isdigit(): + num = int(string) * cls._default_multiplier + else: + num = parse_dur(string) + + if num == 0 and not cls.allow_zero: + raise UserInputError( + "The provided duration cannot be `0`! (Please enter in the format `1d 2h 3m 4s`.)" + ) + + if cls._max is not None and num > cls._max: + raise UserInputError( + "Duration cannot be longer than `{}`!".format( + strfdur(cls._max, short=False, show_days=cls._show_days) + ) + ) + if cls._min is not None and num < cls._min: + raise UserInputError( + "Duration cannot be shorter than `{}`!".format( + strfdur(cls._min, short=False, show_days=cls._show_days) + ) + ) + + return num + + @classmethod + def _format_data(cls, parent_id: ParentID, data, **kwargs): + """ + Format the enum using the provided output map. + """ + if data is not None: + return "`{}`".format(strfdur(data, short=False, show_days=cls._show_days)) + + +class MessageSetting(StringSetting): + """ + Typed Setting ABC representing a message sent to Discord. + + Placeholder implemented as a StringSetting until Context is built. + """ + ... + + +class ListSetting: + """ + Mixin to implement a setting type representing a list of existing settings. + + Does not implement a Widget, + since arbitrary combinations of setting widgets are undefined. + """ + # Base setting type to make the list from + _setting = None # type: Type[InteractiveSetting] + + # Whether 'None' values are filtered out of the data when creating values + _allow_null_values = False # type: bool + + # Whether duplicate data values should be filtered out + _force_unique = False + + @classmethod + def _data_from_value(cls, parent_id: ParentID, values, **kwargs): + """ + Returns the setting type data for each value in the value list + """ + if values is None: + # Special behaviour here, store an empty list instead of None + return [] + else: + return [cls._setting._data_from_value(parent_id, value) for value in values] + + @classmethod + def _data_to_value(cls, parent_id: ParentID, data, **kwargs): + """ + Returns the setting type value for each entry in the data list + """ + if data is None: + return [] + else: + values = [cls._setting._data_to_value(parent_id, entry) for entry in data] + + # Filter out null values if required + if not cls._allow_null_values: + values = [value for value in values if value is not None] + return values + + @classmethod + async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs): + """ + Splits the user string across `,` to break up the list. + """ + if not string: + return [] + else: + data = [] + items = (item.strip() for item in string.split(',')) + items = (item for item in items if item) + data = [await cls._setting._parse_string(parent_id, item, **kwargs) for item in items] + + if cls._force_unique: + data = list(set(data)) + return data + + @classmethod + def _format_data(cls, parent_id: ParentID, data, **kwargs): + """ + Format the list by adding `,` between each formatted item + """ + if not data: + return None + else: + formatted_items = [] + for item in data: + formatted_item = cls._setting._format_data(id, item) + if formatted_item is not None: + formatted_items.append(formatted_item) + return ", ".join(formatted_items) + + @property + def input_formatted(self): + """ + Format the list by adding `,` between each input formatted item. + """ + if self._data: + formatted_items = [] + for item in self._data: + formatted_item = self._setting(self.parent_id, item).input_formatted + if formatted_item: + formatted_items.append(formatted_item) + return ", ".join(formatted_items) + else: + return "" + + +class ChannelListSetting(ListSetting, InteractiveSetting): + """ + List of channels + """ + accepts = ( + "Comma separated list of channel mentions/ids/names. Use `None` to unset. " + "Write `--add` or `--remove` to add or remove channels." + ) + _setting = ChannelSetting + + +class RoleListSetting(InteractiveSetting, ListSetting): + """ + List of roles + """ + accepts = ( + "Comma separated list of role mentions/ids/names. Use `None` to unset. " + "Write `--add` or `--remove` to add or remove roles." + ) + _setting = RoleSetting + + @property + def members(self): + roles = self.value + return list(set(itertools.chain(*(role.members for role in roles)))) + + +class StringListSetting(InteractiveSetting, ListSetting): + """ + List of strings + """ + accepts = ( + "Comma separated list of strings. Use `None` to unset. " + "Write `--add` or `--remove` to add or remove strings." + ) + _setting = StringSetting + + +class GuildIDListSetting(InteractiveSetting, ListSetting): + """ + List of guildids. + """ + accepts = ( + "Comma separated list of guild ids. Use `None` to unset. " + "Write `--add` or `--remove` to add or remove ids. " + "The provided ids are not verified in any way." + ) + + _setting = GuildIDSetting diff --git a/bot/settings/ui.py b/bot/settings/ui.py new file mode 100644 index 00000000..f64625a6 --- /dev/null +++ b/bot/settings/ui.py @@ -0,0 +1,355 @@ +from typing import Optional, Callable, Any, Dict, Coroutine, Generic, TypeVar, List +import asyncio +from contextvars import copy_context + +import discord +from discord import ui +from discord.ui.button import ButtonStyle, Button, button +from discord.ui.modal import Modal +from discord.ui.text_input import TextInput + +from utils.lib import prop_tabulate, recover_context +from utils.ui import FastModal +from meta.config import conf + +from .base import BaseSetting, ParentID, SettingData, SettingValue + + +ST = TypeVar('ST', bound='InteractiveSetting') + + +class SettingModal(FastModal): + input_field: TextInput = TextInput(label="Edit Setting") + + def update_field(self, new_field): + self.remove_item(self.input_field) + self.add_item(new_field) + self.input_field = new_field + + +class SettingWidget(Generic[ST], ui.View): + # TODO: Permission restrictions and callback! + # Context variables for permitted user(s)? Subclass ui.View with PermittedView? + # Don't need to descend permissions to Modal + # Maybe combine with timeout manager + + def __init__(self, setting: ST, auto_write=True, **kwargs): + self.setting = setting + self.update_children() + super().__init__(**kwargs) + self.auto_write = auto_write + + self._interaction: Optional[discord.Interaction] = None + self._modal: Optional[SettingModal] = None + self._exports: List[ui.Item] = self.make_exports() + + self._context = copy_context() + + def update_children(self): + """ + Method called before base View initialisation. + Allows updating the children components (usually explicitly defined callbacks), + before Item instantiation. + """ + pass + + def order_children(self, *children): + """ + Helper method to set and order the children using bound methods. + """ + child_map = {child.__name__: child for child in self.__view_children_items__} + self.__view_children_items__ = [child_map[child.__name__] for child in children] + + def update_child(self, child, new_args): + args = getattr(child, '__discord_ui_model_kwargs__') + args |= new_args + + def make_exports(self): + """ + Called post-instantiation to populate self._exports. + """ + return self.children + + def refresh(self): + """ + Update widget components from current setting data, if applicable. + E.g. to update the default entry in a select list after a choice has been made, + or update button colours. + This does not trigger a discord ui update, + that is the responsibility of the interaction handler. + """ + pass + + async def show(self, interaction: discord.Interaction, key: Any = None, override=False, **kwargs): + """ + Complete standard setting widget UI flow for this setting. + The SettingWidget components may be attached to other messages as needed, + and they may be triggered individually, + but this coroutine defines the standard interface. + Intended for use by any interaction which wants to "open the setting". + + Extra keyword arguments are passed directly to the interaction reply (for e.g. ephemeral). + """ + if key is None: + # By default, only have one widget listener per interaction. + key = ('widget', interaction.id) + + # If there is already a widget listening on this key, respect override + if self.setting.get_listener(key) and not override: + # Refuse to spawn another widget + return + + async def update_callback(new_data): + self.setting.data = new_data + await interaction.edit_original_response(embed=self.setting.embed, view=self, **kwargs) + + self.setting.register_callback(key)(update_callback) + await interaction.response.send_message(embed=self.setting.embed, view=self, **kwargs) + await self.wait() + try: + # Try and detach the view, since we aren't handling events anymore. + await interaction.edit_original_response(view=None) + except discord.HTTPException: + pass + self.setting.deregister_callback(key) + + def attach(self, group_view: ui.View): + """ + Attach this setting widget to a view representing several settings. + """ + for item in self._exports: + group_view.add_item(item) + + @button(style=ButtonStyle.secondary, label="Edit", row=4) + async def edit_button(self, interaction: discord.Interaction, button: ui.Button): + """ + Spawn a simple edit modal, + populated with `setting.input_field`. + """ + recover_context(self._context) + # Spawn the setting modal + await interaction.response.send_modal(self.modal) + + @button(style=ButtonStyle.danger, label="Reset", row=4) + async def reset_button(self, interaction: discord.Interaction, button: Button): + recover_context(self._context) + await interaction.response.defer(thinking=True, ephemeral=True) + await self.setting.interactive_set(None, interaction) + + @property + def modal(self) -> Modal: + """ + Build a Modal dialogue for updating the setting. + Refreshes (and re-attaches) the input field each time this is called. + """ + if self._modal is not None: + self._modal.update_field(self.setting.input_field) + return self._modal + + # TODO: Attach shared timeouts to the modal + self._modal = modal = SettingModal( + title=f"Edit {self.setting.display_name}", + ) + modal.update_field(self.setting.input_field) + + @modal.submit_callback() + async def edit_submit(interaction: discord.Interaction): + # TODO: Catch and handle UserInputError + await interaction.response.defer(thinking=True, ephemeral=True) + data = await self.setting._parse_string(self.setting.parent_id, modal.input_field.value) + await self.setting.interactive_set(data, interaction) + + return modal + + +class InteractiveSetting(BaseSetting[ParentID, SettingData, SettingValue]): + __slots__ = ('_widget',) + + # Configuration interface descriptions + display_name: str # User readable name of the setting + desc: str # User readable brief description of the setting + long_desc: str # User readable long description of the setting + accepts: str # User readable description of the acceptable values + + Widget = SettingWidget + + # A list of callback coroutines to call when the setting updates + # This can be used globally to refresh state when the setting updates, + # Or locallly to e.g. refresh an active widget. + # The callbacks are called on write, so they may be bypassed by direct use of _writer! + _listeners_: Dict[Any, Callable[[Optional[SettingData]], Coroutine[Any, Any, None]]] = {} + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._widget: Optional[SettingWidget] = None + + async def write(self, **kwargs) -> None: + await super().write(**kwargs) + for listener in self._listeners_.values(): + asyncio.create_task(listener(self.data)) + + def get_listener(self, key): + return self._listeners_.get(key, None) + + @classmethod + def register_callback(cls, name=None): + def wrapped(coro): + cls._listeners_[name or coro.__name__] = coro + return coro + return wrapped + + @classmethod + def deregister_callback(cls, name): + cls._listeners_.pop(name, None) + + @property + def update_message(self): + """ + Response message sent when the setting has successfully been updated. + Should generally be one line. + """ + if self.data is None: + return "Setting reset!" + else: + return f"Setting Updated! New value: {self.formatted}" + + async def update_response(self, interaction: discord.Interaction, **kwargs): + """ + Respond to an interaction which triggered a setting update. + Usually just wraps `update_message` in an embed and sends it back. + Passes any extra `kwargs` to the message creation method. + """ + embed = discord.Embed( + description=f"{str(conf.emojis.tick)} {self.update_message}", + colour=discord.Color.green() + ) + if interaction.response.is_done(): + await interaction.edit_original_response(embed=embed, **kwargs) + else: + await interaction.response.send_message(embed=embed, **kwargs) + + async def interactive_set(self, new_data: Optional[SettingData], interaction: discord.Interaction, **kwargs): + self.data = new_data + await self.write() + await self.update_response(interaction, **kwargs) + + @property + def embed_field(self): + """ + Returns a {name, value} pair for use in an Embed field. + """ + name = self.display_name + value = f"{self.long_dec}\n{self.desc_table}" + return {'name': name, 'value': value} + + @property + def embed(self): + """ + Returns a full embed describing this setting. + """ + embed = discord.Embed( + title="Configuration options for `{}`".format(self.display_name), + ) + embed.description = "{}\n{}".format(self.long_desc.format(self=self), self.desc_table) + return embed + + @property + def desc_table(self): + fields = ("Current value", "Default value") + values = (self.formatted or "Not Set", + self._format_data(self.parent_id, self.default) or "None") + return prop_tabulate(fields, values) + + @property + def input_field(self) -> TextInput: + """ + TextInput field used for string-based setting modification. + May be added to external modal for grouped setting editing. + This property is not persistent, and creates a new field each time. + """ + return TextInput( + label=self.display_name, + placeholder=self.accepts, + default=self.input_formatted, + required=False + ) + + @property + def widget(self): + """ + Returns the Discord UI View associated with the current setting. + """ + if self._widget is None: + self._widget = self.Widget(self) + return self._widget + + @classmethod + def set_widget(cls, WidgetCls): + """ + Convenience decorator to create the widget class for this setting. + """ + cls.Widget = WidgetCls + return WidgetCls + + @property + def formatted(self): + """ + Default user-readable form of the setting. + Should be a short single line. + """ + return self._format_data(self.parent_id, self.data) + + @property + def input_formatted(self) -> str: + """ + Format the current value as a default value for an input field. + Returned string must be acceptable through parse_string. + Does not take into account defaults. + """ + if self._data is not None: + return str(self._data) + else: + return "" + + @property + def summary(self): + """ + Formatted summary of the data. + May be implemented in `_format_data(..., summary=True, ...)` or overidden. + """ + return self._format_data(self.parent_id, self.data, summary=True) + + @classmethod + async def from_string(cls, parent_id, userstr: str, **kwargs): + """ + Return a setting instance initialised from a parsed user string. + """ + data = await cls._parse_string(parent_id, userstr, **kwargs) + return cls(parent_id, data, **kwargs) + + @classmethod + async def _parse_string(cls, parent_id, string: str, **kwargs) -> Optional[SettingData]: + """ + Parse user provided string (usually from a TextInput) into raw setting data. + Must be overriden by the setting if the setting is user-configurable. + Returns None if the setting was unset. + """ + raise NotImplementedError + + @classmethod + def _format_data(cls, parent_id, data, **kwargs): + """ + Convert raw setting data into a formatted user-readable string, + representing the current value. + """ + raise NotImplementedError + + +""" +command callback for set command? +autocomplete for set command? + +Might be better in a ConfigSetting subclass. +But also mix into the base setting types. +"""