(feature): Add basic streamalerts.

This commit is contained in:
2024-08-02 20:45:18 +10:00
parent 850c5d7abb
commit c3c8baa4b2
25 changed files with 5471 additions and 0 deletions

View File

@@ -51,6 +51,56 @@ CREATE TABLE channel_links(
); );
-- }}}
-- Stream Alerts {{{
-- DROP TABLE IF EXISTS stream_alerts;
-- DROP TABLE IF EXISTS streams;
-- DROP TABLE IF EXISTS alert_channels;
-- DROP TABLE IF EXISTS streamers;
CREATE TABLE streamers(
userid BIGINT PRIMARY KEY,
login_name TEXT NOT NULL,
display_name TEXT NOT NULL
);
CREATE TABLE alert_channels(
subscriptionid SERIAL PRIMARY KEY,
guildid BIGINT NOT NULL,
channelid BIGINT NOT NULL,
streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE,
created_by BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
paused BOOLEAN NOT NULL DEFAULT FALSE,
end_delete BOOLEAN NOT NULL DEFAULT FALSE,
live_message TEXT,
end_message TEXT
);
CREATE INDEX alert_channels_guilds ON alert_channels (guildid);
CREATE UNIQUE INDEX alert_channels_channelid_streamerid ON alert_channels (channelid, streamerid);
CREATE TABLE streams(
streamid SERIAL PRIMARY KEY,
streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE,
start_at TIMESTAMPTZ NOT NULL,
twitch_stream_id BIGINT,
game_name TEXT,
title TEXT,
end_at TIMESTAMPTZ
);
CREATE TABLE stream_alerts(
alertid SERIAL PRIMARY KEY,
streamid INTEGER NOT NULL REFERENCES streams (streamid) ON DELETE CASCADE,
subscriptionid INTEGER NOT NULL REFERENCES alert_channels (subscriptionid) ON DELETE CASCADE,
sent_at TIMESTAMPTZ NOT NULL,
messageid BIGINT NOT NULL,
resolved_at TIMESTAMPTZ
);
-- }}} -- }}}
-- vim: set fdm=marker: -- vim: set fdm=marker:

View File

@@ -5,3 +5,4 @@ discord.py [voice]
iso8601==0.1.16 iso8601==0.1.16
psycopg[pool] psycopg[pool]
pytz==2021.1 pytz==2021.1
twitchAPI

3
src/babel/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .translator import SOURCE_LOCALE, LeoBabel, LocalBabel, LazyStr, ctx_locale, ctx_translator
babel = LocalBabel('babel')

81
src/babel/enums.py Normal file
View File

@@ -0,0 +1,81 @@
from enum import Enum
from . import babel
_p = babel._p
class LocaleMap(Enum):
american_english = 'en-US'
british_english = 'en-GB'
bulgarian = 'bg'
chinese = 'zh-CN'
taiwan_chinese = 'zh-TW'
croatian = 'hr'
czech = 'cs'
danish = 'da'
dutch = 'nl'
finnish = 'fi'
french = 'fr'
german = 'de'
greek = 'el'
hindi = 'hi'
hungarian = 'hu'
italian = 'it'
japanese = 'ja'
korean = 'ko'
lithuanian = 'lt'
norwegian = 'no'
polish = 'pl'
brazil_portuguese = 'pt-BR'
romanian = 'ro'
russian = 'ru'
spain_spanish = 'es-ES'
swedish = 'sv-SE'
thai = 'th'
turkish = 'tr'
ukrainian = 'uk'
vietnamese = 'vi'
hebrew = 'he-IL'
# Original Discord names
locale_names = {
'id': (_p('localenames|locale:id', "Indonesian"), "Bahasa Indonesia"),
'da': (_p('localenames|locale:da', "Danish"), "Dansk"),
'de': (_p('localenames|locale:de', "German"), "Deutsch"),
'en-GB': (_p('localenames|locale:en-GB', "English, UK"), "English, UK"),
'en-US': (_p('localenames|locale:en-US', "English, US"), "English, US"),
'es-ES': (_p('localenames|locale:es-ES', "Spanish"), "Español"),
'fr': (_p('localenames|locale:fr', "French"), "Français"),
'hr': (_p('localenames|locale:hr', "Croatian"), "Hrvatski"),
'it': (_p('localenames|locale:it', "Italian"), "Italiano"),
'lt': (_p('localenames|locale:lt', "Lithuanian"), "Lietuviškai"),
'hu': (_p('localenames|locale:hu', "Hungarian"), "Magyar"),
'nl': (_p('localenames|locale:nl', "Dutch"), "Nederlands"),
'no': (_p('localenames|locale:no', "Norwegian"), "Norsk"),
'pl': (_p('localenames|locale:pl', "Polish"), "Polski"),
'pt-BR': (_p('localenames|locale:pt-BR', "Portuguese, Brazilian"), "Português do Brasil"),
'ro': (_p('localenames|locale:ro', "Romanian, Romania"), "Română"),
'fi': (_p('localenames|locale:fi', "Finnish"), "Suomi"),
'sv-SE': (_p('localenames|locale:sv-SE', "Swedish"), "Svenska"),
'vi': (_p('localenames|locale:vi', "Vietnamese"), "Tiếng Việt"),
'tr': (_p('localenames|locale:tr', "Turkish"), "Türkçe"),
'cs': (_p('localenames|locale:cs', "Czech"), "Čeština"),
'el': (_p('localenames|locale:el', "Greek"), "Ελληνικά"),
'bg': (_p('localenames|locale:bg', "Bulgarian"), "български"),
'ru': (_p('localenames|locale:ru', "Russian"), "Pусский"),
'uk': (_p('localenames|locale:uk', "Ukrainian"), "Українська"),
'hi': (_p('localenames|locale:hi', "Hindi"), "हिन्दी"),
'th': (_p('localenames|locale:th', "Thai"), "ไทย"),
'zh-CN': (_p('localenames|locale:zh-CN', "Chinese, China"), "中文"),
'ja': (_p('localenames|locale:ja', "Japanese"), "日本語"),
'zh-TW': (_p('localenames|locale:zh-TW', "Chinese, Taiwan"), "繁體中文"),
'ko': (_p('localenames|locale:ko', "Korean"), "한국어"),
}
# More names for languages not supported by Discord
locale_names |= {
'he': (_p('localenames|locale:he', "Hebrew"), "Hebrew"),
'he-IL': (_p('localenames|locale:he-IL', "Hebrew"), "Hebrew"),
'ceaser': (_p('localenames|locale:test', "Test Language"), "dfbtfs"),
}

108
src/babel/translator.py Normal file
View File

@@ -0,0 +1,108 @@
from typing import Optional
import logging
from contextvars import ContextVar
from collections import defaultdict
from enum import Enum
import gettext
from discord.app_commands import Translator, locale_str
from discord.enums import Locale
logger = logging.getLogger(__name__)
SOURCE_LOCALE = 'en_GB'
ctx_locale: ContextVar[str] = ContextVar('locale', default=SOURCE_LOCALE)
ctx_translator: ContextVar['LeoBabel'] = ContextVar('translator', default=None) # type: ignore
null = gettext.NullTranslations()
class LeoBabel(Translator):
def __init__(self):
self.supported_locales = {loc.name for loc in Locale}
self.supported_domains = {}
self.translators = defaultdict(dict) # locale -> domain -> GNUTranslator
async def load(self):
pass
async def unload(self):
self.translators.clear()
def get_translator(self, locale: Optional[str], domain):
return null
def t(self, lazystr, locale=None):
return lazystr._translate_with(null)
async def translate(self, string: locale_str, locale: Locale, context):
if not isinstance(string, LazyStr):
return string
else:
return string.message
ctx_translator.set(LeoBabel())
class Method(Enum):
GETTEXT = 'gettext'
NGETTEXT = 'ngettext'
PGETTEXT = 'pgettext'
NPGETTEXT = 'npgettext'
class LocalBabel:
def __init__(self, domain):
self.domain = domain
@property
def methods(self):
return (self._, self._n, self._p, self._np)
def _(self, message):
return LazyStr(Method.GETTEXT, message, domain=self.domain)
def _n(self, singular, plural, n):
return LazyStr(Method.NGETTEXT, singular, plural, n, domain=self.domain)
def _p(self, context, message):
return LazyStr(Method.PGETTEXT, context, message, domain=self.domain)
def _np(self, context, singular, plural, n):
return LazyStr(Method.NPGETTEXT, context, singular, plural, n, domain=self.domain)
class LazyStr(locale_str):
__slots__ = ('method', 'args', 'domain', 'locale')
def __init__(self, method, *args, locale=None, domain=None):
self.method = method
self.args = args
self.domain = domain
self.locale = locale
@property
def message(self):
return self._translate_with(null)
@property
def extras(self):
return {'locale': self.locale, 'domain': self.domain}
def __str__(self):
return self.message
def _translate_with(self, translator: gettext.GNUTranslations):
method = getattr(translator, self.method.value)
return method(*self.args)
def __repr__(self) -> str:
return f'{self.__class__.__name__}({self.method}, {self.args!r}, locale={self.locale}, domain={self.domain})'
def __eq__(self, obj: object) -> bool:
return isinstance(obj, locale_str) and self.message == obj.message
def __hash__(self) -> int:
return hash(self.args)

20
src/babel/utils.py Normal file
View File

@@ -0,0 +1,20 @@
from .translator import ctx_translator
from . import babel
_, _p, _np = babel._, babel._p, babel._np
MONTHS = _p(
'utils|months',
"January,February,March,April,May,June,July,August,September,October,November,December"
)
SHORT_MONTHS = _p(
'utils|short_months',
"Jan,Feb,Mar,Apr,May,Jun,Jul,Aug,Sep,Oct,Nov,Dec"
)
def local_month(month, short=False):
string = MONTHS if not short else SHORT_MONTHS
return ctx_translator.get().t(string).split(',')[month-1]

View File

@@ -1,4 +1,6 @@
from babel import LocalBabel
babel = LocalBabel('core')
async def setup(bot): async def setup(bot):
from .cog import CoreCog from .cog import CoreCog

227
src/core/setting_types.py Normal file
View File

@@ -0,0 +1,227 @@
"""
Additional abstract setting types useful for StudyLion settings.
"""
from typing import Optional
import json
import traceback
import discord
from discord.enums import TextStyle
from settings.base import ParentID
from settings.setting_types import IntegerSetting, StringSetting
from meta import conf
from meta.errors import UserInputError
from babel.translator import ctx_translator
from utils.lib import MessageArgs
from . import babel
_p = babel._p
class MessageSetting(StringSetting):
"""
Typed Setting ABC representing a message sent to Discord.
Data is a json-formatted string dict with at least one of the fields 'content', 'embed', 'embeds'
Value is the corresponding dictionary
"""
# TODO: Extend to support format keys
_accepts = _p(
'settype:message|accepts',
"JSON formatted raw message data"
)
@staticmethod
async def download_attachment(attached: discord.Attachment):
"""
Download a discord.Attachment with some basic filetype and file size validation.
"""
t = ctx_translator.get().t
error = None
decoded = None
if attached.content_type and not ('json' in attached.content_type):
error = t(_p(
'settype:message|download|error:not_json',
"The attached message data is not a JSON file!"
))
elif attached.size > 10000:
error = t(_p(
'settype:message|download|error:size',
"The attached message data is too large!"
))
else:
content = await attached.read()
try:
decoded = content.decode('UTF-8')
except UnicodeDecodeError:
error = t(_p(
'settype:message|download|error:decoding',
"Could not decode the message data. Please ensure it is saved with the `UTF-8` encoding."
))
if error is not None:
raise UserInputError(error)
else:
return decoded
@classmethod
def value_to_args(cls, parent_id: ParentID, value: dict, **kwargs) -> MessageArgs:
if not value:
return None
args = {}
args['content'] = value.get('content', "")
if 'embed' in value:
embed = discord.Embed.from_dict(value['embed'])
args['embed'] = embed
if 'embeds' in value:
embeds = []
for embed_data in value['embeds']:
embeds.append(discord.Embed.from_dict(embed_data))
args['embeds'] = embeds
return MessageArgs(**args)
@classmethod
def _data_from_value(cls, parent_id: ParentID, value: Optional[dict], **kwargs):
if value and any(value.get(key, None) for key in ('content', 'embed', 'embeds')):
data = json.dumps(value)
else:
data = None
return data
@classmethod
def _data_to_value(cls, parent_id: ParentID, data: Optional[str], **kwargs):
if data:
value = json.loads(data)
else:
value = None
return value
@classmethod
async def _parse_string(cls, parent_id: ParentID, string: str, **kwargs):
"""
Provided user string can be downright random.
If it isn't json-formatted, treat it as the content of the message.
If it is, do basic checking on the length and embeds.
"""
string = string.strip()
if not string or string.lower() == 'none':
return None
t = ctx_translator.get().t
error_tip = t(_p(
'settype:message|error_suffix',
"You can view, test, and fix your embed using the online [embed builder]({link})."
)).format(
link="https://glitchii.github.io/embedbuilder/?editor=json"
)
if string.startswith('{') and string.endswith('}'):
# Assume the string is a json-formatted message dict
try:
value = json.loads(string)
except json.JSONDecodeError as err:
error = t(_p(
'settype:message|error:invalid_json',
"The provided message data was not a valid JSON document!\n"
"`{error}`"
)).format(error=str(err))
raise UserInputError(error + '\n' + error_tip)
if not isinstance(value, dict) or not any(value.get(key, None) for key in ('content', 'embed', 'embeds')):
error = t(_p(
'settype:message|error:json_missing_keys',
"Message data must be a JSON object with at least one of the following fields: "
"`content`, `embed`, `embeds`"
))
raise UserInputError(error + '\n' + error_tip)
embed_data = value.get('embed', None)
if not isinstance(embed_data, dict):
error = t(_p(
'settype:message|error:json_embed_type',
"`embed` field must be a valid JSON object."
))
raise UserInputError(error + '\n' + error_tip)
embeds_data = value.get('embeds', [])
if not isinstance(embeds_data, list):
error = t(_p(
'settype:message|error:json_embeds_type',
"`embeds` field must be a list."
))
raise UserInputError(error + '\n' + error_tip)
if embed_data and embeds_data:
error = t(_p(
'settype:message|error:json_embed_embeds',
"Message data cannot include both `embed` and `embeds`."
))
raise UserInputError(error + '\n' + error_tip)
content_data = value.get('content', "")
if not isinstance(content_data, str):
error = t(_p(
'settype:message|error:json_content_type',
"`content` field must be a string."
))
raise UserInputError(error + '\n' + error_tip)
# Validate embeds, which is the most likely place for something to go wrong
embeds = [embed_data] if embed_data else embeds_data
try:
for embed in embeds:
discord.Embed.from_dict(embed)
except Exception as e:
# from_dict may raise a range of possible exceptions.
raw_error = ''.join(
traceback.TracebackException.from_exception(e).format_exception_only()
)
error = t(_p(
'ui:settype:message|error:embed_conversion',
"Could not parse the message embed data.\n"
"**Error:** `{exception}`"
)).format(exception=raw_error)
raise UserInputError(error + '\n' + error_tip)
# At this point, the message will at least successfully convert into MessageArgs
# There are numerous ways it could still be invalid, e.g. invalid urls, or too-long fields
# or the total message content being too long, or too many fields, etc
# This will need to be caught in anything which displays a message parsed from user data.
else:
# Either the string is not json formatted, or the formatting is broken
# Assume the string is a content message
value = {
'content': string
}
return json.dumps(value)
@classmethod
def _format_data(cls, parent_id: ParentID, data: Optional[str], **kwargs):
if not data:
return None
value = cls._data_to_value(parent_id, data, **kwargs)
content = value.get('content', "")
if 'embed' in value or 'embeds' in value or len(content) > 100:
t = ctx_translator.get().t
formatted = t(_p(
'settype:message|format:too_long',
"Too long to display! See Preview."
))
else:
formatted = content
return formatted
@property
def input_field(self):
field = super().input_field
field.style = TextStyle.long
return field

View File

@@ -12,6 +12,7 @@ from aiohttp import ClientSession
from data import Database from data import Database
from utils.lib import tabulate from utils.lib import tabulate
from babel.translator import LeoBabel
from .config import Conf from .config import Conf
from .logger import logging_context, log_context, log_action_stack, log_wrap, set_logging_context from .logger import logging_context, log_context, log_action_stack, log_wrap, set_logging_context
@@ -43,6 +44,7 @@ class LionBot(Bot):
self.shardname = shardname self.shardname = shardname
# self.appdata = appdata # self.appdata = appdata
self.config = config self.config = config
self.translator = LeoBabel()
self.system_monitor = SystemMonitor() self.system_monitor = SystemMonitor()
self.monitor = ComponentMonitor('LionBot', self._monitor_status) self.monitor = ComponentMonitor('LionBot', self._monitor_status)

View File

@@ -3,6 +3,7 @@ this_package = 'modules'
active = [ active = [
'.sysadmin', '.sysadmin',
'.voicefix', '.voicefix',
'.streamalerts',
] ]

View File

@@ -0,0 +1,8 @@
import logging
from meta import LionBot
logger = logging.getLogger(__name__)
async def setup(bot: LionBot):
from .cog import AlertCog
await bot.add_cog(AlertCog(bot))

View File

@@ -0,0 +1,609 @@
import asyncio
from typing import Optional
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
from twitchAPI.twitch import Twitch
from twitchAPI.helper import first
from meta import LionBot, LionCog, LionContext
from meta.errors import UserInputError
from meta.logger import log_wrap
from utils.lib import utc_now
from data.conditions import NULL
from . import logger
from .data import AlertsData
from .settings import AlertConfig, AlertSettings
from .editor import AlertEditorUI
class AlertCog(LionCog):
POLL_PERIOD = 60
def __init__(self, bot: LionBot):
self.bot = bot
self.data = bot.db.load_registry(AlertsData())
self.twitch = None
self.alert_settings = AlertSettings()
self.poll_task = None
self.event_tasks = set()
# Cache of currently live streams, maps streamerid -> stream
self.live_streams = {}
# Cache of streamers we are watching state changes for
# Map of streamerid -> streamer
self.watching = {}
async def cog_load(self):
await self.data.init()
await self.twitch_login()
await self.load_subs()
self.poll_task = asyncio.create_task(self.poll_live())
async def twitch_login(self):
# TODO: Probably abstract this out to core or a dedicated core cog
# Also handle refresh tokens
if self.twitch is not None:
await self.twitch.close()
self.twitch = None
self.twitch = await Twitch(
self.bot.config.twitch['app_id'].strip(),
self.bot.config.twitch['app_secret'].strip()
)
async def load_subs(self):
# Load active subscriptions
active_subs = await self.data.AlertChannel.fetch_where()
to_watch = {sub.streamerid for sub in active_subs}
live_streams = await self.data.Stream.fetch_where(
self.data.Stream.end_at != NULL
)
to_watch.union(stream.streamerid for stream in live_streams)
# Load associated streamers
watching = {}
if to_watch:
streamers = await self.data.Streamer.fetch_where(
userid=list(to_watch)
)
for streamer in streamers:
watching[streamer.userid] = streamer
self.watching = watching
self.live_streams = {stream.streamerid: stream for stream in live_streams}
logger.info(
f"Watching {len(watching)} streamers for state changes. "
f"Loaded {len(live_streams)} (previously) live streams into cache."
)
async def poll_live(self):
# Every PERIOD seconds,
# request get_streams for the streamers we are currently watching.
# Check if they are in the live_stream cache,
# and update cache and data and fire-and-forget start/stop events as required.
# TODO: Logging
# TODO: Error handling so the poll loop doesn't die from temporary errors
# And when it does die it gets logged properly.
if not self.twitch:
raise ValueError("Attempting to start alert poll-loop before twitch set.")
block_i = 0
self.polling = True
while self.polling:
await asyncio.sleep(self.POLL_PERIOD)
to_request = list(self.watching.keys())
if not to_request:
continue
# Each loop we request the 'next' slice of 100 userids
blocks = [to_request[i:i+100] for i in range(0, len(to_request), 100)]
block_i += 1
block_i %= len(blocks)
block = blocks[block_i]
streaming = {}
async for stream in self.twitch.get_streams(user_id=block, first=100):
# Note we set page size to 100
# So we should never get repeat or missed streams
# Since we can request a max of 100 userids anyway.
streaming[stream.user_id] = stream
started = set(streaming.keys()).difference(self.live_streams.keys())
ended = set(self.live_streams.keys()).difference(streaming.keys())
for streamerid in started:
stream = streaming[streamerid]
stream_data = await self.data.Stream.create(
streamerid=stream.user_id,
start_at=stream.started_at,
twitch_stream_id=stream.id,
game_name=stream.game_name,
title=stream.title,
)
self.live_streams[streamerid] = stream_data
task = asyncio.create_task(self.on_stream_start(stream_data))
self.event_tasks.add(task)
task.add_done_callback(self.event_tasks.discard)
for streamerid in ended:
stream_data = self.live_streams.pop(streamerid)
await stream_data.update(end_at=utc_now())
task = asyncio.create_task(self.on_stream_end(stream_data))
self.event_tasks.add(task)
task.add_done_callback(self.event_tasks.discard)
async def on_stream_start(self, stream_data):
# Get channel subscriptions listening for this streamer
uid = stream_data.streamerid
logger.info(f"Streamer <uid:{uid}> started streaming! {stream_data=}")
subbed = await self.data.AlertChannel.fetch_where(streamerid=uid)
# Fulfill those alerts
for sub in subbed:
try:
# If the sub is paused, don't create the alert
await self.sub_alert(sub, stream_data)
except discord.HTTPException:
# TODO: Needs to be handled more gracefully at user level
# Retry logic?
logger.warning(
f"Could not complete subscription {sub=} for {stream_data=}", exc_info=True
)
except Exception:
logger.exception(
f"Unexpected exception completing {sub=} for {stream_data=}"
)
raise
async def subscription_error(self, subscription, stream_data, err_msg):
"""
Handle a subscription fulfill failure.
Stores the error message for user display,
and deletes the subscription after some number of errors.
# TODO
"""
logger.warning(
f"Subscription error {subscription=} {stream_data=} {err_msg=}"
)
async def sub_alert(self, subscription, stream_data):
# Base alert behaviour is just to send a message
# and create an alert row
channel = self.bot.get_channel(subscription.channelid)
if channel is None or not isinstance(channel, discord.abc.Messageable):
# Subscription channel is gone!
# Or the Discord channel cache died
await self.subscription_error(
subscription, stream_data,
"Subscription channel no longer exists."
)
return
permissions = channel.permissions_for(channel.guild.me)
if not (permissions.send_messages and permissions.embed_links):
await self.subscription_error(
subscription, stream_data,
"Insufficient permissions to post alert message."
)
return
# Build message
streamer = await self.data.Streamer.fetch(stream_data.streamerid)
if not streamer:
# Streamer was deleted while handling the alert
# Just quietly ignore
# Don't error out because the stream data row won't exist anymore
logger.warning(
f"Cancelling alert for subscription {subscription.subscriptionid}"
" because the streamer no longer exists."
)
return
alert_config = AlertConfig(subscription.subscriptionid, subscription)
paused = alert_config.get(self.alert_settings.AlertPaused.setting_id)
if paused.value:
logger.info(f"Skipping alert for subscription {subscription=} because it is paused.")
return
live_message = alert_config.get(self.alert_settings.AlertMessage.setting_id)
formatter = await live_message.generate_formatter(self.bot, stream_data, streamer)
formatted = await formatter(live_message.value)
args = live_message.value_to_args(subscription.subscriptionid, formatted)
try:
message = await channel.send(**args.send_args)
except discord.HTTPException as e:
logger.warning(
f"Message send failure while sending streamalert {subscription.subscriptionid}",
exc_info=True
)
await self.subscription_error(
subscription, stream_data,
"Failed to post live alert."
)
return
# Store sent alert
alert = await self.data.StreamAlert.create(
streamid=stream_data.streamid,
subscriptionid=subscription.subscriptionid,
sent_at=utc_now(),
messageid=message.id
)
logger.debug(
f"Fulfilled subscription {subscription.subscriptionid} with alert {alert.alertid}"
)
async def on_stream_end(self, stream_data):
# Get channel subscriptions listening for this streamer
uid = stream_data.streamerid
logger.info(f"Streamer <uid:{uid}> stopped streaming! {stream_data=}")
subbed = await self.data.AlertChannel.fetch_where(streamerid=uid)
# Resolve subscriptions
for sub in subbed:
try:
await self.sub_resolve(sub, stream_data)
except discord.HTTPException:
# TODO: Needs to be handled more gracefully at user level
# Retry logic?
logger.warning(
f"Could not resolve subscription {sub=} for {stream_data=}", exc_info=True
)
except Exception:
logger.exception(
f"Unexpected exception resolving {sub=} for {stream_data=}"
)
raise
async def sub_resolve(self, subscription, stream_data):
# Check if there is a current active alert to resolve
alerts = await self.data.StreamAlert.fetch_where(
streamid=stream_data.streamid,
subscriptionid=subscription.subscriptionid,
)
if not alerts:
logger.info(
f"Resolution requested for subscription {subscription.subscriptionid} with stream {stream_data.streamid} "
"but no active alerts were found."
)
return
alert = alerts[0]
if alert.resolved_at is not None:
# Alert was already resolved
# This is okay, Twitch might have just sent the stream ending twice
logger.info(
f"Resolution requested for subscription {subscription.subscriptionid} with stream {stream_data.streamid} "
"but alert was already resolved."
)
return
# Check if message is to be deleted or edited (or nothing)
alert_config = AlertConfig(subscription.subscriptionid, subscription)
del_setting = alert_config.get(self.alert_settings.AlertEndDelete.setting_id)
edit_setting = alert_config.get(self.alert_settings.AlertEndMessage.setting_id)
if (delmsg := del_setting.value) or (edit_setting.value):
# Find the message
message = None
channel = self.bot.get_channel(subscription.channelid)
if channel:
try:
message = await channel.fetch_message(alert.messageid)
except discord.HTTPException:
# Message was probably deleted already
# Or permissions were changed
# Or Discord connection broke
pass
else:
# Channel went after posting the alert
# Or Discord cache sucks
# Nothing we can do, just mark it handled
pass
if message:
if delmsg:
# Delete the message
try:
await message.delete()
except discord.HTTPException:
logger.warning(
f"Discord exception while del-resolve live alert {alert=}",
exc_info=True
)
else:
# Edit message with custom arguments
streamer = await self.data.Streamer.fetch(stream_data.streamerid)
formatter = await edit_setting.generate_formatter(self.bot, stream_data, streamer)
formatted = await formatter(edit_setting.value)
args = edit_setting.value_to_args(subscription.subscriptionid, formatted)
try:
await message.edit(**args.edit_args)
except discord.HTTPException:
logger.warning(
f"Discord exception while edit-resolve live alert {alert=}",
exc_info=True
)
else:
# Explicitly don't need to do anything to the alert
pass
# Save alert as resolved
await alert.update(resolved_at=utc_now())
async def cog_unload(self):
if self.poll_task is not None and not self.poll_task.cancelled():
self.poll_task.cancel()
if self.twitch is not None:
await self.twitch.close()
self.twitch = None
# ----- Commands -----
@cmds.hybrid_group(
name='streamalert',
description=(
"Create and configure stream live-alerts."
)
)
@cmds.guild_only()
@appcmds.default_permissions(manage_channels=True)
async def streamalert_group(self, ctx: LionContext):
# Placeholder group, method not used
raise NotImplementedError
@streamalert_group.command(
name='create',
description=(
"Subscribe a Discord channel to notifications when a Twitch stream goes live."
)
)
@appcmds.describe(
streamer="Name of the twitch channel to watch.",
channel="Which Discord channel to send live alerts in.",
message="Custom message to send when the channel goes live (may be edited later)."
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_create_cmd(self, ctx: LionContext,
streamer: str,
channel: discord.TextChannel,
message: Optional[str]):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
# Wards
if not channel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"to add a stream alert to a channel."
)
return
# Look up the specified streamer
tw_user = await first(self.twitch.get_users(logins=[streamer]))
if not tw_user:
await ctx.error_reply(
f"Sorry, could not find `{streamer}` on Twitch! "
"Make sure you use the name in their channel url."
)
return
# Create streamer data if it doesn't already exist
streamer_data = await self.data.Streamer.fetch_or_create(
tw_user.id,
login_name=tw_user.login,
display_name=tw_user.display_name,
)
# Add subscription to alerts list
sub_data = await self.data.AlertChannel.create(
streamerid=streamer_data.userid,
guildid=channel.guild.id,
channelid=channel.id,
created_by=ctx.author.id,
paused=False
)
# Add to watchlist
self.watching[streamer_data.userid] = streamer_data
# Open AlertEditorUI for the new subscription
# TODO
await ctx.reply("StreamAlert Created.")
async def alert_acmpl(self, interaction: discord.Interaction, partial: str):
if not interaction.guild:
raise ValueError("Cannot acmpl alert in guildless interaction.")
# Get all alerts in the server
alerts = await self.data.AlertChannel.fetch_where(guildid=interaction.guild_id)
if not alerts:
# No alerts available
options = [
appcmds.Choice(
name="No stream alerts are set up in this server!",
value=partial
)
]
else:
options = []
for alert in alerts:
streamer = await self.data.Streamer.fetch(alert.streamerid)
if streamer is None:
# Should be impossible by foreign key condition
# Might be a stale cache
continue
channel = interaction.guild.get_channel(alert.channelid)
display = f"{streamer.display_name} in #{channel.name if channel else 'unknown'}"
if partial.lower() in display.lower():
# Matching option
options.append(appcmds.Choice(name=display, value=str(alert.subscriptionid)))
if not options:
options.append(
appcmds.Choice(
name=f"No stream alerts matching {partial}"[:25],
value=partial
)
)
return options
async def resolve_alert(self, interaction: discord.Interaction, alert_str: str):
if not interaction.guild:
raise ValueError("Resolving alert outside of a guild.")
# Expect alert_str to be the integer subscriptionid
if not alert_str.isdigit():
raise UserInputError(
f"No stream alerts in this server matching `{alert_str}`!"
)
alert = await self.data.AlertChannel.fetch(int(alert_str))
if not alert or not alert.guildid == interaction.guild_id:
raise UserInputError(
"Could not find the selected alert! Please try again."
)
return alert
@streamalert_group.command(
name='edit',
description=(
"Update settings for an existing Twitch stream alert."
)
)
@appcmds.describe(
alert="Which alert do you want to edit?",
# TODO: Other settings here
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_edit_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
# If edit options have been given, save edits and retouch cache if needed
# If not, open AlertEditorUI
ui = AlertEditorUI(bot=self.bot, sub_data=sub_data, callerid=ctx.author.id)
await ui.run(ctx.interaction)
await ui.wait()
@streamalert_edit_cmd.autocomplete('alert')
async def streamalert_edit_cmd_alert_acmpl(self, interaction, partial):
return await self.alert_acmpl(interaction, partial)
@streamalert_group.command(
name='pause',
description=(
"Pause a streamalert."
)
)
@appcmds.describe(
alert="Which alert do you want to pause?",
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_pause_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
await sub_data.update(paused=True)
await ctx.reply("This alert is now paused!")
@streamalert_group.command(
name='unpause',
description=(
"Resume a streamalert."
)
)
@appcmds.describe(
alert="Which alert do you want to unpause?",
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_unpause_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
await sub_data.update(paused=False)
await ctx.reply("This alert has been unpaused!")
@streamalert_group.command(
name='remove',
description=(
"Deactivate a streamalert entirely (see /streamalert pause to temporarily pause it)."
)
)
@appcmds.describe(
alert="Which alert do you want to remove?",
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_remove_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
await sub_data.delete()
await ctx.reply("This alert has been deleted.")

View File

@@ -0,0 +1,105 @@
from data import Registry, RowModel
from data.columns import Integer, Bool, Timestamp, String
from data.models import WeakCache
from cachetools import TTLCache
class AlertsData(Registry):
class Streamer(RowModel):
"""
Schema
------
CREATE TABLE streamers(
userid BIGINT PRIMARY KEY,
login_name TEXT NOT NULL,
display_name TEXT NOT NULL
);
"""
_tablename_ = 'streamers'
_cache_ = {}
userid = Integer(primary=True)
login_name = String()
display_name = String()
class AlertChannel(RowModel):
"""
Schema
------
CREATE TABLE alert_channels(
subscriptionid SERIAL PRIMARY KEY,
guildid BIGINT NOT NULL,
channelid BIGINT NOT NULL,
streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE,
created_by BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
paused BOOLEAN NOT NULL DEFAULT FALSE,
end_delete BOOLEAN NOT NULL DEFAULT FALSE,
live_message TEXT,
end_message TEXT
);
CREATE INDEX alert_channels_guilds ON alert_channels (guildid);
CREATE UNIQUE INDEX alert_channels_channelid_streamerid ON alert_channels (channelid, streamerid);
"""
_tablename_ = 'alert_channels'
_cache_ = {}
subscriptionid = Integer(primary=True)
guildid = Integer()
channelid = Integer()
streamerid = Integer()
display_name = Integer()
created_by = Integer()
created_at = Timestamp()
paused = Bool()
end_delete = Bool()
live_message = String()
end_message = String()
class Stream(RowModel):
"""
Schema
------
CREATE TABLE streams(
streamid SERIAL PRIMARY KEY,
streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE,
start_at TIMESTAMPTZ NOT NULL,
twitch_stream_id BIGINT,
game_name TEXT,
title TEXT,
end_at TIMESTAMPTZ
);
"""
_tablename_ = 'streams'
_cache_ = WeakCache(TTLCache(maxsize=100, ttl=24*60*60))
streamid = Integer(primary=True)
streamerid = Integer()
start_at = Timestamp()
twitch_stream_id = Integer()
game_name = String()
title = String()
end_at = Timestamp()
class StreamAlert(RowModel):
"""
Schema
------
CREATE TABLE stream_alerts(
alertid SERIAL PRIMARY KEY,
streamid INTEGER NOT NULL REFERENCES streams (streamid) ON DELETE CASCADE,
subscriptionid INTEGER NOT NULL REFERENCES alert_channels (subscriptionid) ON DELETE CASCADE,
sent_at TIMESTAMPTZ NOT NULL,
messageid BIGINT NOT NULL,
resolved_at TIMESTAMPTZ
);
"""
_tablename_ = 'stream_alerts'
_cache_ = WeakCache(TTLCache(maxsize=1000, ttl=24*60*60))
alertid = Integer(primary=True)
streamid = Integer()
subscriptionid = Integer()
sent_at = Timestamp()
messageid = Integer()
resolved_at = Timestamp()

View File

@@ -0,0 +1,369 @@
import asyncio
import datetime as dt
from collections import namedtuple
from functools import wraps
from typing import TYPE_CHECKING
import discord
from discord.ui.button import button, Button, ButtonStyle
from discord.ui.select import select, Select, SelectOption, ChannelSelect
from meta import LionBot, conf
from utils.lib import MessageArgs, tabulate, utc_now
from utils.ui import MessageUI
from utils.ui.msgeditor import MsgEditor
from .settings import AlertSettings as Settings
from .settings import AlertConfig as Config
from .data import AlertsData
if TYPE_CHECKING:
from .cog import AlertCog
FakeStream = namedtuple(
'FakeStream',
["streamid", "streamerid", "start_at", "twitch_stream_id", "game_name", "title", "end_at"]
)
class AlertEditorUI(MessageUI):
setting_classes = (
Settings.AlertPaused,
Settings.AlertEndDelete,
Settings.AlertEndMessage,
Settings.AlertMessage,
Settings.AlertChannel,
)
def __init__(self, bot: LionBot, sub_data: AlertsData.AlertChannel, **kwargs):
super().__init__(**kwargs)
self.bot = bot
self.sub_data = sub_data
self.subid = sub_data.subscriptionid
self.cog: 'AlertCog' = bot.get_cog('AlertCog')
self.config = Config(self.subid, sub_data)
# ----- UI API -----
def preview_stream_data(self):
# TODO: Probably makes sense to factor this out to the cog
# Or even generate it in the formatters themselves
data = self.sub_data
return FakeStream(
-1,
data.streamerid,
utc_now() - dt.timedelta(hours=1),
-1,
"Discord Admin",
"Testing Go Live Message",
utc_now()
)
def call_and_refresh(self, func):
"""
Generate a wrapper which runs coroutine 'func' and then refreshes the UI.
"""
# TODO: Check whether the UI has finished interaction
@wraps(func)
async def wrapped(*args, **kwargs):
await func(*args, **kwargs)
await self.refresh()
return wrapped
# ----- UI Components -----
# Pause button
@button(label="PAUSE_PLACEHOLDER", style=ButtonStyle.blurple)
async def pause_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True, ephemeral=True)
setting = self.config.get(Settings.AlertPaused.setting_id)
setting.value = not setting.value
await setting.write()
await self.refresh(thinking=press)
async def pause_button_refresh(self):
button = self.pause_button
if self.config.get(Settings.AlertPaused.setting_id).value:
button.label = "UnPause"
button.style = ButtonStyle.grey
else:
button.label = "Pause"
button.style = ButtonStyle.green
# Delete button
@button(label="Delete Alert", style=ButtonStyle.red)
async def delete_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True, ephemeral=True)
await self.sub_data.delete()
embed = discord.Embed(
colour=discord.Colour.brand_green(),
description="Stream alert removed."
)
await press.edit_original_response(embed=embed)
await self.close()
# Close button
@button(emoji=conf.emojis.cancel, style=ButtonStyle.red)
async def close_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=False)
await self.close()
# Edit Alert button
@button(label="Edit Alert", style=ButtonStyle.blurple)
async def edit_alert_button(self, press: discord.Interaction, pressed: Button):
# Spawn MsgEditor for the live alert
await press.response.defer(thinking=True, ephemeral=True)
setting = self.config.get(Settings.AlertMessage.setting_id)
stream = self.preview_stream_data()
streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid)
editor = MsgEditor(
self.bot,
setting.value,
callback=self.call_and_refresh(setting.editor_callback),
formatter=await setting.generate_formatter(self.bot, stream, streamer),
callerid=press.user.id
)
self._slaves.append(editor)
await editor.run(press)
# Edit End message
@button(label="Edit Ending Alert", style=ButtonStyle.blurple)
async def edit_end_button(self, press: discord.Interaction, pressed: Button):
# Spawn MsgEditor for the ending alert
await press.response.defer(thinking=True, ephemeral=True)
await self.open_end_editor(press)
async def open_end_editor(self, respond_to: discord.Interaction):
setting = self.config.get(Settings.AlertEndMessage.setting_id)
# Start from current live alert data if not set
if not setting.value:
alert_setting = self.config.get(Settings.AlertMessage.setting_id)
setting.value = alert_setting.value
stream = self.preview_stream_data()
streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid)
editor = MsgEditor(
self.bot,
setting.value,
callback=self.call_and_refresh(setting.editor_callback),
formatter=await setting.generate_formatter(self.bot, stream, streamer),
callerid=respond_to.user.id
)
self._slaves.append(editor)
await editor.run(respond_to)
return editor
# Ending Mode Menu
@select(
cls=Select,
placeholder="Select action to take when the stream ends",
options=[SelectOption(label="DUMMY")],
min_values=0, max_values=1
)
async def ending_mode_menu(self, selection: discord.Interaction, selected: Select):
if not selected.values:
await selection.response.defer()
return
await selection.response.defer(thinking=True, ephemeral=True)
value = selected.values[0]
if value == '0':
# In Do Nothing case,
# Ensure Delete is off and custom edit message is unset
setting = self.config.get(Settings.AlertEndDelete.setting_id)
if setting.value:
setting.value = False
await setting.write()
setting = self.config.get(Settings.AlertEndMessage.setting_id)
if setting.value:
setting.value = None
await setting.write()
await self.refresh(thinking=selection)
elif value == '1':
# In Delete Alert case,
# Set the delete setting to True
setting = self.config.get(Settings.AlertEndDelete.setting_id)
if not setting.value:
setting.value = True
await setting.write()
await self.refresh(thinking=selection)
elif value == '2':
# In Edit Message case,
# Set the delete setting to False,
setting = self.config.get(Settings.AlertEndDelete.setting_id)
if setting.value:
setting.value = False
await setting.write()
# And open the edit message editor
await self.open_end_editor(selection)
await self.refresh()
async def ending_mode_menu_refresh(self):
# Build menu options
options = [
SelectOption(
label="Do Nothing",
description="Don't modify the live alert message.",
value="0",
),
SelectOption(
label="Delete Alert After Stream",
description="Delete the live alert message.",
value="1",
),
SelectOption(
label="Edit Alert After Stream",
description="Edit the live alert message to a custom message. Opens editor.",
value="2",
),
]
# Calculate the correct default
if self.config.get(Settings.AlertEndDelete.setting_id).value:
options[1].default = True
elif self.config.get(Settings.AlertEndMessage.setting_id).value:
options[2].default = True
self.ending_mode_menu.options = options
# Edit channel menu
@select(cls=ChannelSelect,
placeholder="Select Alert Channel",
channel_types=[discord.ChannelType.text, discord.ChannelType.voice],
min_values=0, max_values=1)
async def channel_menu(self, selection: discord.Interaction, selected):
if selected.values:
await selection.response.defer(thinking=True, ephemeral=True)
setting = self.config.get(Settings.AlertChannel.setting_id)
setting.value = selected.values[0]
await setting.write()
await self.refresh(thinking=selection)
else:
await selection.response.defer(thinking=False)
async def channel_menu_refresh(self):
# current = self.config.get(Settings.AlertChannel.setting_id).value
# TODO: Check if discord-typed menus can have defaults yet
# Impl in stable dpy, but not released to pip yet
...
# ----- UI Flow -----
async def make_message(self) -> MessageArgs:
streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid)
if streamer is None:
raise ValueError("Streamer row does not exist in AlertEditor")
name = streamer.display_name
# Build relevant setting table
table_map = {}
table_map['Channel'] = self.config.get(Settings.AlertChannel.setting_id).formatted
table_map['Streamer'] = f"https://www.twitch.tv/{streamer.login_name}"
table_map['Paused'] = self.config.get(Settings.AlertPaused.setting_id).formatted
prop_table = '\n'.join(tabulate(*table_map.items()))
embed = discord.Embed(
colour=discord.Colour.dark_green(),
title=f"Stream Alert for {name}",
description=prop_table,
timestamp=utc_now()
)
message_setting = self.config.get(Settings.AlertMessage.setting_id)
message_desc_lines = [
f"An alert message will be posted to {table_map['Channel']}.",
f"Press `{self.edit_alert_button.label}`"
" to preview or edit the alert.",
"The following keys will be substituted in the alert message."
]
keytable = tabulate(*message_setting._subkey_desc.items())
for line in keytable:
message_desc_lines.append(f"> {line}")
embed.add_field(
name=f"When {name} goes live",
value='\n'.join(message_desc_lines),
inline=False
)
# Determine the ending behaviour
del_setting = self.config.get(Settings.AlertEndDelete.setting_id)
end_msg_setting = self.config.get(Settings.AlertEndMessage.setting_id)
if del_setting.value:
# Deleting
end_msg_desc = "The live alert message will be deleted."
...
elif end_msg_setting.value:
# Editing
lines = [
"The live alert message will edited to the configured message.",
f"Press `{self.edit_end_button.label}` to preview or edit the message.",
"The following substitution keys are supported "
"*in addition* to the live alert keys."
]
keytable = tabulate(
*[(k, v) for k, v in end_msg_setting._subkey_desc.items() if k not in message_setting._subkey_desc]
)
for line in keytable:
lines.append(f"> {line}")
end_msg_desc = '\n'.join(lines)
else:
# Doing nothing
end_msg_desc = "The live alert message will not be changed."
embed.add_field(
name=f"When {name} ends their stream",
value=end_msg_desc,
inline=False
)
return MessageArgs(embed=embed)
async def reload(self):
await self.sub_data.refresh()
# Note self.config references the sub_data, and doesn't need reloading.
async def refresh_layout(self):
to_refresh = (
self.pause_button_refresh(),
self.channel_menu_refresh(),
self.ending_mode_menu_refresh(),
)
await asyncio.gather(*to_refresh)
show_end_edit = (
not self.config.get(Settings.AlertEndDelete.setting_id).value
and
self.config.get(Settings.AlertEndMessage.setting_id).value
)
if not show_end_edit:
# Don't show edit end button
buttons = (
self.edit_alert_button,
self.pause_button, self.delete_button, self.close_button
)
else:
buttons = (
self.edit_alert_button, self.edit_end_button,
self.pause_button, self.delete_button, self.close_button
)
self.set_layout(
buttons,
(self.ending_mode_menu,),
(self.channel_menu,),
)

View File

@@ -0,0 +1,264 @@
from typing import Optional, Any
import json
from meta.LionBot import LionBot
from settings import ModelData
from settings.groups import SettingGroup, ModelConfig, SettingDotDict
from settings.setting_types import BoolSetting, ChannelSetting
from core.setting_types import MessageSetting
from babel.translator import LocalBabel
from utils.lib import recurse_map, replace_multiple, tabulate
from .data import AlertsData
babel = LocalBabel('streamalerts')
_p = babel._p
class AlertConfig(ModelConfig):
settings = SettingDotDict()
_model_settings = set()
model = AlertsData.AlertChannel
class AlertSettings(SettingGroup):
@AlertConfig.register_model_setting
class AlertMessage(ModelData, MessageSetting):
setting_id = 'alert_live_message'
_display_name = _p('', 'live_message')
_desc = _p(
'',
'Message sent to the channel when the streamer goes live.'
)
_long_desc = _p(
'',
'Message sent to the attached channel when the Twitch streamer goes live.'
)
_accepts = _p('', 'JSON formatted greeting message data')
_default = json.dumps({'content': "**{display_name}** just went live at {channel_link}"})
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.live_message.name
_subkey_desc = {
'{display_name}': "Twitch channel name (with capitalisation)",
'{login_name}': "Twitch channel login name (as in url)",
'{channel_link}': "Link to the live twitch channel",
'{stream_start}': "Numeric timestamp when stream went live",
}
# TODO: More stuff
@property
def update_message(self) -> str:
return "The go-live notification message has been updated!"
@classmethod
async def generate_formatter(cls, bot: LionBot, stream: AlertsData.Stream, streamer: AlertsData.Streamer, **kwargs):
"""
Generate a formatter function for this message
from the provided stream and streamer data.
The formatter function accepts and returns a message data dict.
"""
async def formatter(data_dict: Optional[dict[str, Any]]):
if not data_dict:
return None
mapping = {
'{display_name}': streamer.display_name,
'{login_name}': streamer.login_name,
'{channel_link}': f"https://www.twitch.tv/{streamer.login_name}",
'{stream_start}': int(stream.start_at.timestamp()),
}
recurse_map(
lambda loc, value: replace_multiple(value, mapping) if isinstance(value, str) else value,
data_dict,
)
return data_dict
return formatter
async def editor_callback(self, editor_data):
self.value = editor_data
await self.write()
def _desc_table(self, show_value: Optional[str] = None) -> list[tuple[str, str]]:
lines = super()._desc_table(show_value=show_value)
keytable = tabulate(*self._subkey_desc.items(), colon='')
expline = (
"The following placeholders will be substituted with their values."
)
keyfield = (
"Placeholders",
expline + '\n' + '\n'.join(f"> {line}" for line in keytable)
)
lines.append(keyfield)
return lines
@AlertConfig.register_model_setting
class AlertEndMessage(ModelData, MessageSetting):
"""
Custom ending message to edit the live alert to.
If not set, doesn't edit the alert.
"""
setting_id = 'alert_end_message'
_display_name = _p('', 'end_message')
_desc = _p(
'',
'Optional message to edit the live alert with when the stream ends.'
)
_long_desc = _p(
'',
"If set, and `end_delete` is not on, "
"the live alert will be edited with this custom message "
"when the stream ends."
)
_accepts = _p('', 'JSON formatted greeting message data')
_default = None
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.end_message.name
_subkey_desc = {
'{display_name}': "Twitch channel name (with capitalisation)",
'{login_name}': "Twitch channel login name (as in url)",
'{channel_link}': "Link to the live twitch channel",
'{stream_start}': "Numeric timestamp when stream went live",
'{stream_end}': "Numeric timestamp when stream ended",
}
@property
def update_message(self) -> str:
if self.value:
return "The stream ending message has been updated."
else:
return "The stream ending message has been unset."
@classmethod
async def generate_formatter(cls, bot: LionBot, stream: AlertsData.Stream, streamer: AlertsData.Streamer, **kwargs):
"""
Generate a formatter function for this message
from the provided stream and streamer data.
The formatter function accepts and returns a message data dict.
"""
# TODO: Fake stream data maker (namedtuple?) for previewing
async def formatter(data_dict: Optional[dict[str, Any]]):
if not data_dict:
return None
mapping = {
'{display_name}': streamer.display_name,
'{login_name}': streamer.login_name,
'{channel_link}': f"https://www.twitch.tv/{streamer.login_name}",
'{stream_start}': int(stream.start_at.timestamp()),
'{stream_end}': int(stream.end_at.timestamp()),
}
recurse_map(
lambda loc, value: replace_multiple(value, mapping) if isinstance(value, str) else value,
data_dict,
)
return data_dict
return formatter
async def editor_callback(self, editor_data):
self.value = editor_data
await self.write()
def _desc_table(self, show_value: Optional[str] = None) -> list[tuple[str, str]]:
lines = super()._desc_table(show_value=show_value)
keytable = tabulate(*self._subkey_desc.items(), colon='')
expline = (
"The following placeholders will be substituted with their values."
)
keyfield = (
"Placeholders",
expline + '\n' + '\n'.join(f"> {line}" for line in keytable)
)
lines.append(keyfield)
return lines
...
@AlertConfig.register_model_setting
class AlertEndDelete(ModelData, BoolSetting):
"""
Whether to delete the live alert after the stream ends.
"""
setting_id = 'alert_end_delete'
_display_name = _p('', 'end_delete')
_desc = _p(
'',
'Whether to delete the live alert after the stream ends.'
)
_long_desc = _p(
'',
"If enabled, the live alert message will be deleted when the stream ends. "
"This overrides the `end_message` setting."
)
_default = False
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.end_delete.name
@property
def update_message(self) -> str:
if self.value:
return "The live alert will be deleted at the end of the stream."
else:
return "The live alert will not be deleted when the stream ends."
@AlertConfig.register_model_setting
class AlertPaused(ModelData, BoolSetting):
"""
Whether this live alert is currently paused.
"""
setting_id = 'alert_paused'
_display_name = _p('', 'paused')
_desc = _p(
'',
"Whether the alert is currently paused."
)
_long_desc = _p(
'',
"Paused alerts will not trigger live notifications, "
"although the streams will still be tracked internally."
)
_default = False
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.paused.name
@property
def update_message(self):
if self.value:
return "This alert is now paused"
else:
return "This alert has been unpaused"
@AlertConfig.register_model_setting
class AlertChannel(ModelData, ChannelSetting):
"""
The channel associated to this alert.
"""
setting_id = 'alert_channel'
_display_name = _p('', 'channel')
_desc = _p(
'',
"The Discord channel this live alert will be sent in."
)
_long_desc = _desc
# Note that this cannot actually be None,
# as there is no UI pathway to unset the setting.
_default = None
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.channelid.name
@property
def update_message(self):
return f"This alert will now be posted to {self.value.channel.mention}"

7
src/settings/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
from babel.translator import LocalBabel
babel = LocalBabel('settings_base')
from .data import ModelData, ListData
from .base import BaseSetting
from .ui import SettingWidget, InteractiveSetting
from .groups import SettingDotDict, SettingGroup, ModelSettings, ModelSetting

166
src/settings/base.py Normal file
View File

@@ -0,0 +1,166 @@
from typing import Generic, TypeVar, Type, Optional, overload
"""
Setting metclass?
Parse setting docstring to generate default info?
Or just put it in the decorator we are already using
"""
# Typing using Generic[parent_id_type, data_type, value_type]
# value generic, could be Union[?, UNSET]
ParentID = TypeVar('ParentID')
SettingData = TypeVar('SettingData')
SettingValue = TypeVar('SettingValue')
T = TypeVar('T', bound='BaseSetting')
class BaseSetting(Generic[ParentID, SettingData, SettingValue]):
"""
Abstract base class describing a stored configuration setting.
A setting consists of logic to load the setting from storage,
present it in a readable form, understand user entered values,
and write it again in storage.
Additionally, the setting has attributes attached describing
the setting in a user-friendly manner for display purposes.
"""
setting_id: str # Unique source identifier for the setting
_default: Optional[SettingData] = None # Default data value for the setting
def __init__(self, parent_id: ParentID, data: Optional[SettingData], **kwargs):
self.parent_id = parent_id
self._data = data
self.kwargs = kwargs
# Instance generation
@classmethod
async def get(cls: Type[T], parent_id: ParentID, **kwargs) -> T:
"""
Return a setting instance initialised from the stored value, associated with the given parent id.
"""
data = await cls._reader(parent_id, **kwargs)
return cls(parent_id, data, **kwargs)
# Main interface
@property
def data(self) -> Optional[SettingData]:
"""
Retrieves the current internal setting data if it is set, otherwise the default data
"""
return self._data if self._data is not None else self.default
@data.setter
def data(self, new_data: Optional[SettingData]):
"""
Sets the internal raw data.
Does not write the changes.
"""
self._data = new_data
@property
def default(self) -> Optional[SettingData]:
"""
Retrieves the default value for this setting.
Settings should override this if the default depends on the object id.
"""
return self._default
@property
def value(self) -> SettingValue: # Actually optional *if* _default is None
"""
Context-aware object or objects associated with the setting.
"""
return self._data_to_value(self.parent_id, self.data) # type: ignore
@value.setter
def value(self, new_value: Optional[SettingValue]):
"""
Setter which reads the discord-aware object and converts it to data.
Does not write the new value.
"""
self._data = self._data_from_value(self.parent_id, new_value)
async def write(self, **kwargs) -> None:
"""
Write current data to the database.
For settings which override this,
ensure you handle deletion of values when internal data is None.
"""
await self._writer(self.parent_id, self._data, **kwargs)
# Raw converters
@overload
@classmethod
def _data_from_value(cls: Type[T], parent_id: ParentID, value: SettingValue, **kwargs) -> SettingData:
...
@overload
@classmethod
def _data_from_value(cls: Type[T], parent_id: ParentID, value: None, **kwargs) -> None:
...
@classmethod
def _data_from_value(
cls: Type[T], parent_id: ParentID, value: Optional[SettingValue], **kwargs
) -> Optional[SettingData]:
"""
Convert a high-level setting value to internal data.
Must be overridden by the setting.
Be aware of UNSET values, these should always pass through as None
to provide an unsetting interface.
"""
raise NotImplementedError
@overload
@classmethod
def _data_to_value(cls: Type[T], parent_id: ParentID, data: SettingData, **kwargs) -> SettingValue:
...
@overload
@classmethod
def _data_to_value(cls: Type[T], parent_id: ParentID, data: None, **kwargs) -> None:
...
@classmethod
def _data_to_value(
cls: Type[T], parent_id: ParentID, data: Optional[SettingData], **kwargs
) -> Optional[SettingValue]:
"""
Convert internal data to high-level setting value.
Must be overriden by the setting.
"""
raise NotImplementedError
# Database access
@classmethod
async def _reader(cls: Type[T], parent_id: ParentID, **kwargs) -> Optional[SettingData]:
"""
Retrieve the setting data associated with the given parent_id.
May be None if the setting is not set.
Must be overridden by the setting.
"""
raise NotImplementedError
@classmethod
async def _writer(cls: Type[T], parent_id: ParentID, data: Optional[SettingData], **kwargs) -> None:
"""
Write provided setting data to storage.
Must be overridden by the setting unless the `write` method is overridden.
If the data is None, the setting is UNSET and should be deleted.
"""
raise NotImplementedError
@classmethod
async def setup(cls, bot):
"""
Initialisation task to be executed during client initialisation.
May be used for e.g. populating a cache or required client setup.
Main application must execute the initialisation task before the setting is used.
Further, the task must always be executable, if the setting is loaded.
Conditional initialisation should go in the relevant module's init tasks.
"""
return None

233
src/settings/data.py Normal file
View File

@@ -0,0 +1,233 @@
from typing import Type
import json
from data import RowModel, Table, ORDER
from meta.logger import log_wrap, set_logging_context
class ModelData:
"""
Mixin for settings stored in a single row and column of a Model.
Assumes that the parent_id is the identity key of the Model.
This does not create a reference to the Row.
"""
# Table storing the desired data
_model: Type[RowModel]
# Column with the desired data
_column: str
# Whether to create a row if not found
_create_row = False
# High level data cache to use, leave as None to disable cache.
_cache = None # Map[id -> value]
@classmethod
def _read_from_row(cls, parent_id, row, **kwargs):
data = row[cls._column]
if cls._cache is not None:
cls._cache[parent_id] = data
return data
@classmethod
async def _reader(cls, parent_id, use_cache=True, **kwargs):
"""
Read in the requested column associated to the parent id.
"""
if cls._cache is not None and parent_id in cls._cache and use_cache:
return cls._cache[parent_id]
model = cls._model
if cls._create_row:
row = await model.fetch_or_create(parent_id)
else:
row = await model.fetch(parent_id)
data = row[cls._column] if row else None
if cls._cache is not None:
cls._cache[parent_id] = data
return data
@classmethod
async def _writer(cls, parent_id, data, **kwargs):
"""
Write the provided entry to the table.
This does *not* create the row if it does not exist.
It only updates.
"""
# TODO: Better way of getting the key?
# TODO: Transaction
if not isinstance(parent_id, tuple):
parent_id = (parent_id, )
model = cls._model
rows = await model.table.update_where(
**model._dict_from_id(parent_id)
).set(
**{cls._column: data}
)
# If we didn't update any rows, create a new row
if not rows:
await model.fetch_or_create(**model._dict_from_id(parent_id), **{cls._column: data})
if cls._cache is not None:
cls._cache[parent_id] = data
class ListData:
"""
Mixin for list types implemented on a Table.
Implements a reader and writer.
This assumes the list is the only data stored in the table,
and removes list entries by deleting rows.
"""
setting_id: str
# Table storing the setting data
_table_interface: Table
# Name of the column storing the id
_id_column: str
# Name of the column storing the data to read
_data_column: str
# Name of column storing the order index to use, if any. Assumed to be Serial on writing.
_order_column: str
_order_type: ORDER = ORDER.ASC
# High level data cache to use, set to None to disable cache.
_cache = None # Map[id -> value]
@classmethod
@log_wrap(isolate=True)
async def _reader(cls, parent_id, use_cache=True, **kwargs):
"""
Read in all entries associated to the given id.
"""
set_logging_context(action="Read cls.setting_id")
if cls._cache is not None and parent_id in cls._cache and use_cache:
return cls._cache[parent_id]
table = cls._table_interface # type: Table
query = table.select_where(**{cls._id_column: parent_id}).select(cls._data_column)
if cls._order_column:
query.order_by(cls._order_column, direction=cls._order_type)
rows = await query
data = [row[cls._data_column] for row in rows]
if cls._cache is not None:
cls._cache[parent_id] = data
return data
@classmethod
@log_wrap(isolate=True)
async def _writer(cls, id, data, add_only=False, remove_only=False, **kwargs):
"""
Write the provided list to storage.
"""
set_logging_context(action="Write cls.setting_id")
table = cls._table_interface
async with table.connector.connection() as conn:
table.connector.conn = conn
async with conn.transaction():
# Handle None input as an empty list
if data is None:
data = []
current = await cls._reader(id, use_cache=False, **kwargs)
if not cls._order_column and (add_only or remove_only):
to_insert = [item for item in data if item not in current] if not remove_only else []
to_remove = data if remove_only else (
[item for item in current if item not in data] if not add_only else []
)
# Handle required deletions
if to_remove:
params = {
cls._id_column: id,
cls._data_column: to_remove
}
await table.delete_where(**params)
# Handle required insertions
if to_insert:
columns = (cls._id_column, cls._data_column)
values = [(id, value) for value in to_insert]
await table.insert_many(columns, *values)
if cls._cache is not None:
new_current = [item for item in current + to_insert if item not in to_remove]
cls._cache[id] = new_current
else:
# Remove all and add all to preserve order
delete_params = {cls._id_column: id}
await table.delete_where(**delete_params)
if data:
columns = (cls._id_column, cls._data_column)
values = [(id, value) for value in data]
await table.insert_many(columns, *values)
if cls._cache is not None:
cls._cache[id] = data
class KeyValueData:
"""
Mixin for settings implemented in a Key-Value table.
The underlying table should have a Unique constraint on the `(_id_column, _key_column)` pair.
"""
_table_interface: Table
_id_column: str
_key_column: str
_value_column: str
_key: str
@classmethod
async def _reader(cls, id, **kwargs):
params = {
cls._id_column: id,
cls._key_column: cls._key
}
row = await cls._table_interface.select_one_where(**params).select(cls._value_column)
data = row[cls._value_column] if row else None
if data is not None:
data = json.loads(data)
return data
@classmethod
async def _writer(cls, id, data, **kwargs):
params = {
cls._id_column: id,
cls._key_column: cls._key
}
if data is not None:
values = {
cls._value_column: json.dumps(data)
}
rows = await cls._table_interface.update_where(**params).set(**values)
if not rows:
await cls._table_interface.insert_many(
(cls._id_column, cls._key_column, cls._value_column),
(id, cls._key, json.dumps(data))
)
else:
await cls._table_interface.delete_where(**params)
# class UserInputError(SafeCancellation):
# pass

204
src/settings/groups.py Normal file
View File

@@ -0,0 +1,204 @@
from typing import Generic, Type, TypeVar, Optional, overload
from data import RowModel
from .data import ModelData
from .ui import InteractiveSetting
from .base import BaseSetting
from utils.lib import tabulate
T = TypeVar('T', bound=InteractiveSetting)
class SettingDotDict(Generic[T], dict[str, Type[T]]):
"""
Dictionary structure allowing simple dot access to items.
"""
__getattr__ = dict.__getitem__ # type: ignore
__setattr__ = dict.__setitem__ # type: ignore
__delattr__ = dict.__delitem__ # type: ignore
class SettingGroup:
"""
A SettingGroup is a collection of settings under one name.
"""
__initial_settings__: list[Type[InteractiveSetting]] = []
_title: Optional[str] = None
_description: Optional[str] = None
def __init_subclass__(cls, title: Optional[str] = None):
cls._title = title or cls._title
cls._description = cls._description or cls.__doc__
settings: list[Type[InteractiveSetting]] = []
for item in cls.__dict__.values():
if isinstance(item, type) and issubclass(item, InteractiveSetting):
settings.append(item)
cls.__initial_settings__ = settings
def __init_settings__(self):
settings = SettingDotDict()
for setting in self.__initial_settings__:
settings[setting.__name__] = setting
return settings
def __init__(self, title=None, description=None) -> None:
self.title: str = title or self._title or self.__class__.__name__
self.description: str = description or self._description or ""
self.settings: SettingDotDict[InteractiveSetting] = self.__init_settings__()
def attach(self, cls: Type[T], name: Optional[str] = None):
name = name or cls.setting_id
self.settings[name] = cls
return cls
def detach(self, cls):
return self.settings.pop(cls.__name__, None)
def update(self, smap):
self.settings.update(smap.settings)
def reduce(self, *keys):
for key in keys:
self.settings.pop(key, None)
return
async def make_setting_table(self, parent_id, **kwargs):
"""
Convenience method for generating a rendered setting table.
"""
rows = []
for setting in self.settings.values():
if not setting._virtual:
set = await setting.get(parent_id, **kwargs)
name = set.display_name
value = str(set.formatted)
rows.append((name, value, set.hover_desc))
table_rows = tabulate(
*rows,
row_format="[`{invis}{key:<{pad}}{colon}`](https://lionbot.org \"{field[2]}\")\t{value}"
)
return '\n'.join(table_rows)
class ModelSetting(ModelData, BaseSetting):
...
class ModelConfig:
"""
A ModelConfig provides a central point of configuration for any object described by a single Model.
An instance of a ModelConfig represents configuration for a single object
(given by a single row of the corresponding Model).
The ModelConfig also supports registration of non-model configuration,
to support associated settings (e.g. list-settings) for the object.
This is an ABC, and must be subclassed for each object-type.
"""
settings: SettingDotDict
_model_settings: set
model: Type[RowModel]
def __init__(self, parent_id, row, **kwargs):
self.parent_id = parent_id
self.row = row
self.kwargs = kwargs
@classmethod
def register_setting(cls, setting_cls):
"""
Decorator to register a non-model setting as part of the object configuration.
The setting class may be re-accessed through the `settings` class attr.
Subclasses may provide alternative access pathways to key non-model settings.
"""
cls.settings[setting_cls.setting_id] = setting_cls
return setting_cls
@classmethod
def register_model_setting(cls, model_setting_cls):
"""
Decorator to register a model setting as part of the object configuration.
The setting class may be accessed through the `settings` class attr.
A fresh setting instance may also be retrieved (using cached data)
through the `get` instance method.
Subclasses are recommended to provide model settings as properties
for simplified access and type checking.
"""
cls._model_settings.add(model_setting_cls.setting_id)
return cls.register_setting(model_setting_cls)
def get(self, setting_id):
"""
Retrieve a freshly initialised copy of the given model-setting.
The given `setting_id` must have been previously registered through `register_model_setting`.
This uses cached data, and so is not guaranteed to be up-to-date.
"""
if setting_id not in self._model_settings:
# TODO: Log
raise ValueError
setting_cls = self.settings[setting_id]
data = setting_cls._read_from_row(self.parent_id, self.row, **self.kwargs)
return setting_cls(self.parent_id, data, **self.kwargs)
class ModelSettings:
"""
A ModelSettings instance aggregates multiple `ModelSetting` instances
bound to the same parent id on a single Model.
This enables a single point of access
for settings of a given Model,
with support for caching or deriving as needed.
This is an abstract base class,
and should be subclassed to define the contained settings.
"""
_settings: SettingDotDict = SettingDotDict()
model: Type[RowModel]
def __init__(self, parent_id, row, **kwargs):
self.parent_id = parent_id
self.row = row
self.kwargs = kwargs
@classmethod
async def fetch(cls, *parent_id, **kwargs):
"""
Load an instance of this ModelSetting with the given parent_id
and setting keyword arguments.
"""
row = await cls.model.fetch_or_create(*parent_id)
return cls(parent_id, row, **kwargs)
@classmethod
def attach(self, setting_cls):
"""
Decorator to attach the given setting class to this modelsetting.
"""
# This violates the interface principle, use structured typing instead?
if not (issubclass(setting_cls, BaseSetting) and issubclass(setting_cls, ModelData)):
raise ValueError(
f"The provided setting class must be `ModelSetting`, not {setting_cls.__class__.__name__}."
)
self._settings[setting_cls.setting_id] = setting_cls
return setting_cls
def get(self, setting_id):
setting_cls = self._settings.get(setting_id)
data = setting_cls._read_from_row(self.parent_id, self.row, **self.kwargs)
return setting_cls(self.parent_id, data, **self.kwargs)
def __getitem__(self, setting_id):
return self.get(setting_id)

13
src/settings/mock.py Normal file
View File

@@ -0,0 +1,13 @@
import discord
from discord import app_commands
class LocalString:
def __init__(self, string):
self.string = string
def as_string(self):
return self.string
_ = LocalString

File diff suppressed because it is too large Load Diff

512
src/settings/ui.py Normal file
View File

@@ -0,0 +1,512 @@
from typing import Optional, Callable, Any, Dict, Coroutine, Generic, TypeVar, List
import asyncio
from contextvars import copy_context
import discord
from discord import ui
from discord.ui.button import ButtonStyle, Button, button
from discord.ui.modal import Modal
from discord.ui.text_input import TextInput
from meta.errors import UserInputError
from utils.lib import tabulate, recover_context
from utils.ui import FastModal
from meta.config import conf
from meta.context import ctx_bot
from babel.translator import ctx_translator, LazyStr
from .base import BaseSetting, ParentID, SettingData, SettingValue
from . import babel
_p = babel._p
ST = TypeVar('ST', bound='InteractiveSetting')
class SettingModal(FastModal):
input_field: TextInput = TextInput(label="Edit Setting")
def update_field(self, new_field):
self.remove_item(self.input_field)
self.add_item(new_field)
self.input_field = new_field
class SettingWidget(Generic[ST], ui.View):
# TODO: Permission restrictions and callback!
# Context variables for permitted user(s)? Subclass ui.View with PermittedView?
# Don't need to descend permissions to Modal
# Maybe combine with timeout manager
def __init__(self, setting: ST, auto_write=True, **kwargs):
self.setting = setting
self.update_children()
super().__init__(**kwargs)
self.auto_write = auto_write
self._interaction: Optional[discord.Interaction] = None
self._modal: Optional[SettingModal] = None
self._exports: List[ui.Item] = self.make_exports()
self._context = copy_context()
def update_children(self):
"""
Method called before base View initialisation.
Allows updating the children components (usually explicitly defined callbacks),
before Item instantiation.
"""
pass
def order_children(self, *children):
"""
Helper method to set and order the children using bound methods.
"""
child_map = {child.__name__: child for child in self.__view_children_items__}
self.__view_children_items__ = [child_map[child.__name__] for child in children]
def update_child(self, child, new_args):
args = getattr(child, '__discord_ui_model_kwargs__')
args |= new_args
def make_exports(self):
"""
Called post-instantiation to populate self._exports.
"""
return self.children
def refresh(self):
"""
Update widget components from current setting data, if applicable.
E.g. to update the default entry in a select list after a choice has been made,
or update button colours.
This does not trigger a discord ui update,
that is the responsibility of the interaction handler.
"""
pass
async def show(self, interaction: discord.Interaction, key: Any = None, override=False, **kwargs):
"""
Complete standard setting widget UI flow for this setting.
The SettingWidget components may be attached to other messages as needed,
and they may be triggered individually,
but this coroutine defines the standard interface.
Intended for use by any interaction which wants to "open the setting".
Extra keyword arguments are passed directly to the interaction reply (for e.g. ephemeral).
"""
if key is None:
# By default, only have one widget listener per interaction.
key = ('widget', interaction.id)
# If there is already a widget listening on this key, respect override
if self.setting.get_listener(key) and not override:
# Refuse to spawn another widget
return
async def update_callback(new_data):
self.setting.data = new_data
await interaction.edit_original_response(embed=self.setting.embed, view=self, **kwargs)
self.setting.register_callback(key)(update_callback)
await interaction.response.send_message(embed=self.setting.embed, view=self, **kwargs)
await self.wait()
try:
# Try and detach the view, since we aren't handling events anymore.
await interaction.edit_original_response(view=None)
except discord.HTTPException:
pass
self.setting.deregister_callback(key)
def attach(self, group_view: ui.View):
"""
Attach this setting widget to a view representing several settings.
"""
for item in self._exports:
group_view.add_item(item)
@button(style=ButtonStyle.secondary, label="Edit", row=4)
async def edit_button(self, interaction: discord.Interaction, button: ui.Button):
"""
Spawn a simple edit modal,
populated with `setting.input_field`.
"""
recover_context(self._context)
# Spawn the setting modal
await interaction.response.send_modal(self.modal)
@button(style=ButtonStyle.danger, label="Reset", row=4)
async def reset_button(self, interaction: discord.Interaction, button: Button):
recover_context(self._context)
await interaction.response.defer(thinking=True, ephemeral=True)
await self.setting.interactive_set(None, interaction)
@property
def modal(self) -> Modal:
"""
Build a Modal dialogue for updating the setting.
Refreshes (and re-attaches) the input field each time this is called.
"""
if self._modal is not None:
self._modal.update_field(self.setting.input_field)
return self._modal
# TODO: Attach shared timeouts to the modal
self._modal = modal = SettingModal(
title=f"Edit {self.setting.display_name}",
)
modal.update_field(self.setting.input_field)
@modal.submit_callback()
async def edit_submit(interaction: discord.Interaction):
# TODO: Catch and handle UserInputError
await interaction.response.defer(thinking=True, ephemeral=True)
data = await self.setting._parse_string(self.setting.parent_id, modal.input_field.value)
await self.setting.interactive_set(data, interaction)
return modal
class InteractiveSetting(BaseSetting[ParentID, SettingData, SettingValue]):
__slots__ = ('_widget',)
# Configuration interface descriptions
_display_name: LazyStr # User readable name of the setting
_desc: LazyStr # User readable brief description of the setting
_long_desc: LazyStr # User readable long description of the setting
_accepts: LazyStr # User readable description of the acceptable values
_set_cmd: str = None
_notset_str: LazyStr = _p('setting|formatted|notset', "Not Set")
_virtual: bool = False # Whether the setting should be hidden from tables and dashboards
_required: bool = False
Widget = SettingWidget
# A list of callback coroutines to call when the setting updates
# This can be used globally to refresh state when the setting updates,
# Or locallly to e.g. refresh an active widget.
# The callbacks are called on write, so they may be bypassed by direct use of _writer!
_listeners_: Dict[Any, Callable[[Optional[SettingData]], Coroutine[Any, Any, None]]] = {}
# Optional client event to dispatch when theis setting has been written
# Event handlers should be of the form Callable[ParentID, SettingData]
_event: Optional[str] = None
# Interaction ward that should be validated via interaction_check
_write_ward: Optional[Callable[[discord.Interaction], Coroutine[Any, Any, bool]]] = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._widget: Optional[SettingWidget] = None
@property
def long_desc(self):
t = ctx_translator.get().t
bot = ctx_bot.get()
return t(self._long_desc).format(
bot=bot,
cmds=bot.core.mention_cache
)
@property
def display_name(self):
t = ctx_translator.get().t
return t(self._display_name)
@property
def desc(self):
t = ctx_translator.get().t
return t(self._desc)
@property
def accepts(self):
t = ctx_translator.get().t
return t(self._accepts)
async def write(self, **kwargs) -> None:
await super().write(**kwargs)
self.dispatch_update()
for listener in self._listeners_.values():
asyncio.create_task(listener(self.data))
def dispatch_update(self):
"""
Dispatch a client event along `self._event`, if set.
Override to modify the target event handler arguments.
By default, event handlers should be of the form:
Callable[[ParentID, SettingData], Coroutine[Any, Any, None]]
"""
if self._event is not None and (bot := ctx_bot.get()) is not None:
bot.dispatch(self._event, self.parent_id, self)
def get_listener(self, key):
return self._listeners_.get(key, None)
@classmethod
def register_callback(cls, name=None):
def wrapped(coro):
cls._listeners_[name or coro.__name__] = coro
return coro
return wrapped
@classmethod
def deregister_callback(cls, name):
cls._listeners_.pop(name, None)
@property
def update_message(self):
"""
Response message sent when the setting has successfully been updated.
Should generally be one line.
"""
if self.data is None:
return "Setting reset!"
else:
return f"Setting Updated! New value: {self.formatted}"
@property
def hover_desc(self):
"""
This no longer works since Discord changed the hover rules.
return '\n'.join((
self.display_name,
'=' * len(self.display_name),
self.desc,
f"\nAccepts: {self.accepts}"
))
"""
return self.desc
async def update_response(self, interaction: discord.Interaction, message: Optional[str] = None, **kwargs):
"""
Respond to an interaction which triggered a setting update.
Usually just wraps `update_message` in an embed and sends it back.
Passes any extra `kwargs` to the message creation method.
"""
embed = discord.Embed(
description=f"{str(conf.emojis.tick)} {message or self.update_message}",
colour=discord.Color.green()
)
if interaction.response.is_done():
await interaction.edit_original_response(embed=embed, **kwargs)
else:
await interaction.response.send_message(embed=embed, **kwargs)
async def interactive_set(self, new_data: Optional[SettingData], interaction: discord.Interaction, **kwargs):
self.data = new_data
await self.write()
await self.update_response(interaction, **kwargs)
async def format_in(self, bot, **kwargs):
"""
Formatted version of the setting given an asynchronous context with client.
"""
return self.formatted
@property
def embed_field(self):
"""
Returns a {name, value} pair for use in an Embed field.
"""
name = self.display_name
value = f"{self.long_desc}\n{self.desc_table}"
if len(value) > 1024:
t = ctx_translator.get().t
desc_table = '\n'.join(
tabulate(
*self._desc_table(
show_value=t(_p(
'setting|embed_field|too_long',
"Too long to display here!"
))
)
)
)
value = f"{self.long_desc}\n{desc_table}"
if len(value) > 1024:
# Forcibly trim
value = value[:1020] + '...'
return {'name': name, 'value': value}
@property
def set_str(self):
if self._set_cmd is not None:
bot = ctx_bot.get()
if bot:
return bot.core.mention_cmd(self._set_cmd)
else:
return f"`/{self._set_cmd}`"
@property
def notset_str(self):
t = ctx_translator.get().t
return t(self._notset_str)
@property
def embed(self):
"""
Returns a full embed describing this setting.
"""
t = ctx_translator.get().t
embed = discord.Embed(
title=t(_p(
'setting|summary_embed|title',
"Configuration options for `{name}`"
)).format(name=self.display_name),
)
embed.description = "{}\n{}".format(self.long_desc.format(self=self), self.desc_table)
return embed
def _desc_table(self, show_value: Optional[str] = None) -> list[tuple[str, str]]:
t = ctx_translator.get().t
lines = []
# Currently line
lines.append((
t(_p('setting|summary_table|field:currently|key', "Currently")),
show_value or (self.formatted or self.notset_str)
))
# Default line
if (default := self.default) is not None:
lines.append((
t(_p('setting|summary_table|field:default|key', "By Default")),
self._format_data(self.parent_id, default) or 'None'
))
# Set using line
if (set_str := self.set_str) is not None:
lines.append((
t(_p('setting|summary_table|field:set|key', "Set Using")),
set_str
))
return lines
@property
def desc_table(self) -> str:
return '\n'.join(tabulate(*self._desc_table()))
@property
def input_field(self) -> TextInput:
"""
TextInput field used for string-based setting modification.
May be added to external modal for grouped setting editing.
This property is not persistent, and creates a new field each time.
"""
return TextInput(
label=self.display_name,
placeholder=self.accepts,
default=self.input_formatted[:4000] if self.input_formatted else None,
required=self._required
)
@property
def widget(self):
"""
Returns the Discord UI View associated with the current setting.
"""
if self._widget is None:
self._widget = self.Widget(self)
return self._widget
@classmethod
def set_widget(cls, WidgetCls):
"""
Convenience decorator to create the widget class for this setting.
"""
cls.Widget = WidgetCls
return WidgetCls
@property
def formatted(self):
"""
Default user-readable form of the setting.
Should be a short single line.
"""
return self._format_data(self.parent_id, self.data, **self.kwargs) or self.notset_str
@property
def input_formatted(self) -> str:
"""
Format the current value as a default value for an input field.
Returned string must be acceptable through parse_string.
Does not take into account defaults.
"""
if self._data is not None:
return str(self._data)
else:
return ""
@property
def summary(self):
"""
Formatted summary of the data.
May be implemented in `_format_data(..., summary=True, ...)` or overidden.
"""
return self._format_data(self.parent_id, self.data, summary=True, **self.kwargs)
@classmethod
async def from_string(cls, parent_id, userstr: str, **kwargs):
"""
Return a setting instance initialised from a parsed user string.
"""
data = await cls._parse_string(parent_id, userstr, **kwargs)
return cls(parent_id, data, **kwargs)
@classmethod
async def from_value(cls, parent_id, value, **kwargs):
await cls._check_value(parent_id, value, **kwargs)
data = cls._data_from_value(parent_id, value, **kwargs)
return cls(parent_id, data, **kwargs)
@classmethod
async def _parse_string(cls, parent_id, string: str, **kwargs) -> Optional[SettingData]:
"""
Parse user provided string (usually from a TextInput) into raw setting data.
Must be overriden by the setting if the setting is user-configurable.
Returns None if the setting was unset.
"""
raise NotImplementedError
@classmethod
def _format_data(cls, parent_id, data, **kwargs):
"""
Convert raw setting data into a formatted user-readable string,
representing the current value.
"""
raise NotImplementedError
@classmethod
async def _check_value(cls, parent_id, value, **kwargs):
"""
Check the provided value is valid.
Many setting update methods now provide Discord objects instead of raw data or user strings.
This method may be used for value-checking such a value.
Raises UserInputError if the value fails validation.
"""
pass
@classmethod
async def interaction_check(cls, parent_id, interaction: discord.Interaction, **kwargs):
if cls._write_ward is not None and not await cls._write_ward(interaction):
# TODO: Combine the check system so we can do customised errors here
t = ctx_translator.get().t
raise UserInputError(t(_p(
'setting|interaction_check|error',
"You do not have sufficient permissions to do this!"
)))
"""
command callback for set command?
autocomplete for set command?
Might be better in a ConfigSetting subclass.
But also mix into the base setting types.
"""

View File

@@ -845,3 +845,35 @@ def write_records(records: list[dict[str, Any]], stream: StringIO):
for record in records: for record in records:
stream.write(','.join(map(str, record.values()))) stream.write(','.join(map(str, record.values())))
stream.write('\n') stream.write('\n')
parse_dur_exps = [
(
r"(?P<value>\d+)\s*(?:(d)|(day))",
60 * 60 * 24,
),
(
r"(?P<value>\d+)\s*(?:(h)|(hour))",
60 * 60
),
(
r"(?P<value>\d+)\s*(?:(m)|(min))",
60
),
(
r"(?P<value>\d+)\s*(?:(s)|(sec))",
1
)
]
def parse_duration(string: str) -> Optional[int]:
seconds = 0
found = False
for expr, multiplier in parse_dur_exps:
match = re.search(expr, string, flags=re.IGNORECASE)
if match:
found = True
seconds += int(match.group('value')) * multiplier
return seconds if found else None

View File

@@ -1,8 +1,12 @@
import asyncio import asyncio
import logging import logging
from babel.translator import LocalBabel
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
util_babel = LocalBabel('utils')
from .hooked import * from .hooked import *
from .leo import * from .leo import *
from .micros import * from .micros import *
from .msgeditor import *

1057
src/utils/ui/msgeditor.py Normal file

File diff suppressed because it is too large Load Diff