rewrite: Setting abstract framework.

This commit is contained in:
2022-11-11 08:05:05 +02:00
parent 7249e25975
commit 1b98b517d2
7 changed files with 1821 additions and 0 deletions

4
bot/settings/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .data import ModelData
from .base import BaseSetting
from .ui import SettingWidget, InteractiveSetting
from .groups import SettingGroup

163
bot/settings/base.py Normal file
View File

@@ -0,0 +1,163 @@
from typing import Generic, TypeVar, Type, Optional, overload
"""
Setting metclass?
Parse setting docstring to generate default info?
Or just put it in the decorator we are already using
"""
# Typing using Generic[parent_id_type, data_type, value_type]
# value generic, could be Union[?, UNSET]
ParentID = TypeVar('ParentID')
SettingData = TypeVar('SettingData')
SettingValue = TypeVar('SettingValue')
T = TypeVar('T', bound='BaseSetting')
class BaseSetting(Generic[ParentID, SettingData, SettingValue]):
"""
Abstract base class describing a stored configuration setting.
A setting consists of logic to load the setting from storage,
present it in a readable form, understand user entered values,
and write it again in storage.
Additionally, the setting has attributes attached describing
the setting in a user-friendly manner for display purposes.
"""
_default: Optional[SettingData] = None # Default data value for the setting
def __init__(self, parent_id: ParentID, data: Optional[SettingData], **kwargs):
self.parent_id = parent_id
self._data = data
# Instance generation
@classmethod
async def get(cls: Type[T], parent_id: ParentID, **kwargs) -> T:
"""
Return a setting instance initialised from the stored value, associated with the given parent id.
"""
data = await cls._reader(parent_id, **kwargs)
return cls(parent_id, data, **kwargs)
# Main interface
@property
def data(self) -> Optional[SettingData]:
"""
Retrieves the current internal setting data if it is set, otherwise the default data
"""
return self._data if self._data is not None else self.default
@data.setter
def data(self, new_data: Optional[SettingData]):
"""
Sets the internal raw data.
Does not write the changes.
"""
self._data = new_data
@property
def default(self) -> Optional[SettingData]:
"""
Retrieves the default value for this setting.
Settings should override this if the default depends on the object id.
"""
return self._default
@property
def value(self) -> Optional[SettingValue]:
"""
Context-aware object or objects associated with the setting.
"""
return self._data_to_value(self.parent_id, self.data)
@value.setter
def value(self, new_value: Optional[SettingValue]):
"""
Setter which reads the discord-aware object and converts it to data.
Does not write the new value.
"""
self._data = self._data_from_value(self.parent_id, new_value)
async def write(self, **kwargs) -> None:
"""
Write current data to the database.
For settings which override this,
ensure you handle deletion of values when internal data is None.
"""
await self._writer(self.parent_id, self._data, **kwargs)
# Raw converters
@overload
@classmethod
def _data_from_value(cls: Type[T], parent_id: ParentID, value: SettingValue, **kwargs) -> SettingData:
...
@overload
@classmethod
def _data_from_value(cls: Type[T], parent_id: ParentID, value: None, **kwargs) -> None:
...
@classmethod
def _data_from_value(
cls: Type[T], parent_id: ParentID, value: Optional[SettingValue], **kwargs
) -> Optional[SettingData]:
"""
Convert a high-level setting value to internal data.
Must be overridden by the setting.
Be aware of UNSET values, these should always pass through as None
to provide an unsetting interface.
"""
raise NotImplementedError
@overload
@classmethod
def _data_to_value(cls: Type[T], parent_id: ParentID, data: SettingData, **kwargs) -> SettingValue:
...
@overload
@classmethod
def _data_to_value(cls: Type[T], parent_id: ParentID, data: None, **kwargs) -> None:
...
@classmethod
def _data_to_value(
cls: Type[T], parent_id: ParentID, data: Optional[SettingData], **kwargs
) -> Optional[SettingValue]:
"""
Convert internal data to high-level setting value.
Must be overriden by the setting.
"""
raise NotImplementedError
# Database access
@classmethod
async def _reader(cls: Type[T], parent_id: ParentID, **kwargs) -> Optional[SettingData]:
"""
Retrieve the setting data associated with the given parent_id.
May be None if the setting is not set.
Must be overridden by the setting.
"""
raise NotImplementedError
@classmethod
async def _writer(cls: Type[T], parent_id: ParentID, data: Optional[SettingData], **kwargs) -> None:
"""
Write provided setting data to storage.
Must be overridden by the setting unless the `write` method is overridden.
If the data is None, the setting is UNSET and should be deleted.
"""
raise NotImplementedError
@classmethod
async def setup(cls, bot):
"""
Initialisation task to be executed during client initialisation.
May be used for e.g. populating a cache or required client setup.
Main application must execute the initialisation task before the setting is used.
Further, the task must always be executable, if the setting is loaded.
Conditional initialisation should go in the relevant module's init tasks.
"""
return None

216
bot/settings/data.py Normal file
View File

@@ -0,0 +1,216 @@
from typing import Type
import json
from data import RowModel, Table, ORDER
class ModelData:
"""
Mixin for settings stored in a single row and column of a Model.
Assumes that the parent_id is the identity key of the Model.
This does not create a reference to the Row.
"""
# Table storing the desired data
_model: Type[RowModel]
# Column with the desired data
_column: str
# Whether to create a row if not found
_create_row = False
# High level data cache to use, leave as None to disable cache.
_cache = None # Map[id -> value]
@classmethod
async def _reader(cls, parent_id, use_cache=True, **kwargs):
"""
Read in the requested column associated to the parent id.
"""
if cls._cache is not None and parent_id in cls._cache and use_cache:
return cls._cache[parent_id]
model = cls._model
if cls._create_row:
row = await model.fetch_or_create(parent_id)
else:
row = await model.fetch(parent_id)
data = row[cls._column] if row else None
if cls._cache is not None:
cls._cache[parent_id] = data
return data
@classmethod
async def _writer(cls, parent_id, data, **kwargs):
"""
Write the provided entry to the table.
This does *not* create the row if it does not exist.
It only updates.
"""
# TODO: Better way of getting the key?
if not isinstance(parent_id, tuple):
parent_id = (parent_id, )
model = cls._model
rows = await model.table.update_where(
**model._dict_from_id(parent_id)
).set(
**{cls._column: data}
)
# If we didn't update any rows, create a new row
if not rows:
await model.table.fetch_or_create(**model._dict_from_id, **{cls._column: data})
...
if cls._cache is not None:
cls._cache[parent_id] = data
class ListData:
"""
Mixin for list types implemented on a Table.
Implements a reader and writer.
This assumes the list is the only data stored in the table,
and removes list entries by deleting rows.
"""
# Table storing the setting data
_table_interface: Table
# Name of the column storing the id
_id_column: str
# Name of the column storing the data to read
_data_column: str
# Name of column storing the order index to use, if any. Assumed to be Serial on writing.
_order_column: str
_order_type: ORDER = ORDER.ASC
# High level data cache to use, set to None to disable cache.
_cache = None # Map[id -> value]
@classmethod
async def _reader(cls, parent_id, use_cache=True, **kwargs):
"""
Read in all entries associated to the given id.
"""
if cls._cache is not None and parent_id in cls._cache and use_cache:
return cls._cache[parent_id]
table = cls._table_interface # type: Table
query = table.select_where(**{cls._id_column: parent_id}).select(cls._data_column)
if cls._order_column:
query.order_by(cls._order_column, order=cls._order_type)
rows = await query
data = [row[cls._data_column] for row in rows]
if cls._cache is not None:
cls._cache[parent_id] = data
return data
@classmethod
async def _writer(cls, id, data, add_only=False, remove_only=False, **kwargs):
"""
Write the provided list to storage.
"""
table = cls._table_interface
conn = await table.connector.get_connection()
with conn.transaction():
# Handle None input as an empty list
if data is None:
data = []
current = await cls._reader(id, use_cache=False, **kwargs)
if not cls._order_column and (add_only or remove_only):
to_insert = [item for item in data if item not in current] if not remove_only else []
to_remove = data if remove_only else (
[item for item in current if item not in data] if not add_only else []
)
# Handle required deletions
if to_remove:
params = {
cls._id_column: id,
cls._data_column: to_remove
}
await table.delete_where(**params)
# Handle required insertions
if to_insert:
columns = (cls._id_column, cls._data_column)
values = [(id, value) for value in to_insert]
await table.insert_many(columns, *values)
if cls._cache is not None:
new_current = [item for item in current + to_insert if item not in to_remove]
cls._cache[id] = new_current
else:
# Remove all and add all to preserve order
delete_params = {cls._id_column: id}
await table.delete_where(**delete_params)
if data:
columns = (cls._id_column, cls._data_column)
values = [(id, value) for value in data]
await table.insert_many(columns, *values)
if cls._cache is not None:
cls._cache[id] = data
class KeyValueData:
"""
Mixin for settings implemented in a Key-Value table.
The underlying table should have a Unique constraint on the `(_id_column, _key_column)` pair.
"""
_table_interface: Table
_id_column: str
_key_column: str
_value_column: str
_key: str
@classmethod
async def _reader(cls, id, **kwargs):
params = {
cls._id_column: id,
cls._key_column: cls._key
}
row = await cls._table_interface.select_one_where(**params).select(cls._value_column)
data = row[cls._value_column] if row else None
if data is not None:
data = json.loads(data)
return data
@classmethod
async def _writer(cls, id, data, **kwargs):
params = {
cls._id_column: id,
cls._key_column: cls._key
}
if data is not None:
values = {
cls._value_column: json.dumps(data)
}
rows = await cls._table_interface.update_where(**params).set(**values)
if not rows:
await cls._table_interface.insert_many(
(cls._id_column, cls._key_column, cls._value_column),
(id, cls._key, json.dumps(data))
)
else:
await cls._table_interface.delete_where(**params)
# class UserInputError(SafeCancellation):
# pass

5
bot/settings/groups.py Normal file
View File

@@ -0,0 +1,5 @@
from discord.ui import View
class SettingGroup(View):
...

13
bot/settings/mock.py Normal file
View File

@@ -0,0 +1,13 @@
import discord
from discord import app_commands
class LocalString:
def __init__(self, string):
self.string = string
def as_string(self):
return self.string
_ = LocalString

File diff suppressed because it is too large Load Diff

355
bot/settings/ui.py Normal file
View File

@@ -0,0 +1,355 @@
from typing import Optional, Callable, Any, Dict, Coroutine, Generic, TypeVar, List
import asyncio
from contextvars import copy_context
import discord
from discord import ui
from discord.ui.button import ButtonStyle, Button, button
from discord.ui.modal import Modal
from discord.ui.text_input import TextInput
from utils.lib import prop_tabulate, recover_context
from utils.ui import FastModal
from meta.config import conf
from .base import BaseSetting, ParentID, SettingData, SettingValue
ST = TypeVar('ST', bound='InteractiveSetting')
class SettingModal(FastModal):
input_field: TextInput = TextInput(label="Edit Setting")
def update_field(self, new_field):
self.remove_item(self.input_field)
self.add_item(new_field)
self.input_field = new_field
class SettingWidget(Generic[ST], ui.View):
# TODO: Permission restrictions and callback!
# Context variables for permitted user(s)? Subclass ui.View with PermittedView?
# Don't need to descend permissions to Modal
# Maybe combine with timeout manager
def __init__(self, setting: ST, auto_write=True, **kwargs):
self.setting = setting
self.update_children()
super().__init__(**kwargs)
self.auto_write = auto_write
self._interaction: Optional[discord.Interaction] = None
self._modal: Optional[SettingModal] = None
self._exports: List[ui.Item] = self.make_exports()
self._context = copy_context()
def update_children(self):
"""
Method called before base View initialisation.
Allows updating the children components (usually explicitly defined callbacks),
before Item instantiation.
"""
pass
def order_children(self, *children):
"""
Helper method to set and order the children using bound methods.
"""
child_map = {child.__name__: child for child in self.__view_children_items__}
self.__view_children_items__ = [child_map[child.__name__] for child in children]
def update_child(self, child, new_args):
args = getattr(child, '__discord_ui_model_kwargs__')
args |= new_args
def make_exports(self):
"""
Called post-instantiation to populate self._exports.
"""
return self.children
def refresh(self):
"""
Update widget components from current setting data, if applicable.
E.g. to update the default entry in a select list after a choice has been made,
or update button colours.
This does not trigger a discord ui update,
that is the responsibility of the interaction handler.
"""
pass
async def show(self, interaction: discord.Interaction, key: Any = None, override=False, **kwargs):
"""
Complete standard setting widget UI flow for this setting.
The SettingWidget components may be attached to other messages as needed,
and they may be triggered individually,
but this coroutine defines the standard interface.
Intended for use by any interaction which wants to "open the setting".
Extra keyword arguments are passed directly to the interaction reply (for e.g. ephemeral).
"""
if key is None:
# By default, only have one widget listener per interaction.
key = ('widget', interaction.id)
# If there is already a widget listening on this key, respect override
if self.setting.get_listener(key) and not override:
# Refuse to spawn another widget
return
async def update_callback(new_data):
self.setting.data = new_data
await interaction.edit_original_response(embed=self.setting.embed, view=self, **kwargs)
self.setting.register_callback(key)(update_callback)
await interaction.response.send_message(embed=self.setting.embed, view=self, **kwargs)
await self.wait()
try:
# Try and detach the view, since we aren't handling events anymore.
await interaction.edit_original_response(view=None)
except discord.HTTPException:
pass
self.setting.deregister_callback(key)
def attach(self, group_view: ui.View):
"""
Attach this setting widget to a view representing several settings.
"""
for item in self._exports:
group_view.add_item(item)
@button(style=ButtonStyle.secondary, label="Edit", row=4)
async def edit_button(self, interaction: discord.Interaction, button: ui.Button):
"""
Spawn a simple edit modal,
populated with `setting.input_field`.
"""
recover_context(self._context)
# Spawn the setting modal
await interaction.response.send_modal(self.modal)
@button(style=ButtonStyle.danger, label="Reset", row=4)
async def reset_button(self, interaction: discord.Interaction, button: Button):
recover_context(self._context)
await interaction.response.defer(thinking=True, ephemeral=True)
await self.setting.interactive_set(None, interaction)
@property
def modal(self) -> Modal:
"""
Build a Modal dialogue for updating the setting.
Refreshes (and re-attaches) the input field each time this is called.
"""
if self._modal is not None:
self._modal.update_field(self.setting.input_field)
return self._modal
# TODO: Attach shared timeouts to the modal
self._modal = modal = SettingModal(
title=f"Edit {self.setting.display_name}",
)
modal.update_field(self.setting.input_field)
@modal.submit_callback()
async def edit_submit(interaction: discord.Interaction):
# TODO: Catch and handle UserInputError
await interaction.response.defer(thinking=True, ephemeral=True)
data = await self.setting._parse_string(self.setting.parent_id, modal.input_field.value)
await self.setting.interactive_set(data, interaction)
return modal
class InteractiveSetting(BaseSetting[ParentID, SettingData, SettingValue]):
__slots__ = ('_widget',)
# Configuration interface descriptions
display_name: str # User readable name of the setting
desc: str # User readable brief description of the setting
long_desc: str # User readable long description of the setting
accepts: str # User readable description of the acceptable values
Widget = SettingWidget
# A list of callback coroutines to call when the setting updates
# This can be used globally to refresh state when the setting updates,
# Or locallly to e.g. refresh an active widget.
# The callbacks are called on write, so they may be bypassed by direct use of _writer!
_listeners_: Dict[Any, Callable[[Optional[SettingData]], Coroutine[Any, Any, None]]] = {}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._widget: Optional[SettingWidget] = None
async def write(self, **kwargs) -> None:
await super().write(**kwargs)
for listener in self._listeners_.values():
asyncio.create_task(listener(self.data))
def get_listener(self, key):
return self._listeners_.get(key, None)
@classmethod
def register_callback(cls, name=None):
def wrapped(coro):
cls._listeners_[name or coro.__name__] = coro
return coro
return wrapped
@classmethod
def deregister_callback(cls, name):
cls._listeners_.pop(name, None)
@property
def update_message(self):
"""
Response message sent when the setting has successfully been updated.
Should generally be one line.
"""
if self.data is None:
return "Setting reset!"
else:
return f"Setting Updated! New value: {self.formatted}"
async def update_response(self, interaction: discord.Interaction, **kwargs):
"""
Respond to an interaction which triggered a setting update.
Usually just wraps `update_message` in an embed and sends it back.
Passes any extra `kwargs` to the message creation method.
"""
embed = discord.Embed(
description=f"{str(conf.emojis.tick)} {self.update_message}",
colour=discord.Color.green()
)
if interaction.response.is_done():
await interaction.edit_original_response(embed=embed, **kwargs)
else:
await interaction.response.send_message(embed=embed, **kwargs)
async def interactive_set(self, new_data: Optional[SettingData], interaction: discord.Interaction, **kwargs):
self.data = new_data
await self.write()
await self.update_response(interaction, **kwargs)
@property
def embed_field(self):
"""
Returns a {name, value} pair for use in an Embed field.
"""
name = self.display_name
value = f"{self.long_dec}\n{self.desc_table}"
return {'name': name, 'value': value}
@property
def embed(self):
"""
Returns a full embed describing this setting.
"""
embed = discord.Embed(
title="Configuration options for `{}`".format(self.display_name),
)
embed.description = "{}\n{}".format(self.long_desc.format(self=self), self.desc_table)
return embed
@property
def desc_table(self):
fields = ("Current value", "Default value")
values = (self.formatted or "Not Set",
self._format_data(self.parent_id, self.default) or "None")
return prop_tabulate(fields, values)
@property
def input_field(self) -> TextInput:
"""
TextInput field used for string-based setting modification.
May be added to external modal for grouped setting editing.
This property is not persistent, and creates a new field each time.
"""
return TextInput(
label=self.display_name,
placeholder=self.accepts,
default=self.input_formatted,
required=False
)
@property
def widget(self):
"""
Returns the Discord UI View associated with the current setting.
"""
if self._widget is None:
self._widget = self.Widget(self)
return self._widget
@classmethod
def set_widget(cls, WidgetCls):
"""
Convenience decorator to create the widget class for this setting.
"""
cls.Widget = WidgetCls
return WidgetCls
@property
def formatted(self):
"""
Default user-readable form of the setting.
Should be a short single line.
"""
return self._format_data(self.parent_id, self.data)
@property
def input_formatted(self) -> str:
"""
Format the current value as a default value for an input field.
Returned string must be acceptable through parse_string.
Does not take into account defaults.
"""
if self._data is not None:
return str(self._data)
else:
return ""
@property
def summary(self):
"""
Formatted summary of the data.
May be implemented in `_format_data(..., summary=True, ...)` or overidden.
"""
return self._format_data(self.parent_id, self.data, summary=True)
@classmethod
async def from_string(cls, parent_id, userstr: str, **kwargs):
"""
Return a setting instance initialised from a parsed user string.
"""
data = await cls._parse_string(parent_id, userstr, **kwargs)
return cls(parent_id, data, **kwargs)
@classmethod
async def _parse_string(cls, parent_id, string: str, **kwargs) -> Optional[SettingData]:
"""
Parse user provided string (usually from a TextInput) into raw setting data.
Must be overriden by the setting if the setting is user-configurable.
Returns None if the setting was unset.
"""
raise NotImplementedError
@classmethod
def _format_data(cls, parent_id, data, **kwargs):
"""
Convert raw setting data into a formatted user-readable string,
representing the current value.
"""
raise NotImplementedError
"""
command callback for set command?
autocomplete for set command?
Might be better in a ConfigSetting subclass.
But also mix into the base setting types.
"""